Skip to content

Commit

Permalink
Storages: Shutdown the LocalIndexScheduler before shutting down PageS…
Browse files Browse the repository at this point in the history
…torage/DeltaMergeStore (release-8.5) (#9713)

close #9714

Storages: Shutdown the LocalIndexScheduler before shutting down PageStorage/DeltaMergeStore
* Add a method `LocalIndexerScheduler::shutdown()` and ensure that all running tasks have finished before shutting down the GlobalPageStorage in `ContextShared::shutdown()`.

Signed-off-by: JaySon-Huang <[email protected]>
  • Loading branch information
JaySon-Huang authored Dec 11, 2024
1 parent 990562e commit 6e12ba2
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 27 deletions.
7 changes: 7 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,13 @@ struct ContextShared
return;
shutdown_called = true;

// The local index scheduler must be shut down to stop all
// running tasks before shutting down `global_storage_pool`.
if (global_local_indexer_scheduler)
{
global_local_indexer_scheduler->shutdown();
}

if (global_storage_pool)
{
// shutdown the gc task of global storage pool before
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ std::tuple<ASTPtr, BlockIO> executeQueryImpl(

if (elem.read_rows != 0)
{
LOG_INFO(
LOG_DEBUG(
execute_query_logger,
"Read {} rows, {} in {:.3f} sec., {} rows/sec., {}/sec.",
elem.read_rows,
Expand Down Expand Up @@ -421,7 +421,7 @@ void logQueryPipeline(const LoggerPtr & logger, const BlockInputStreamPtr & in)
in->dumpTree(log_buffer);
return log_buffer.toString();
};
LOG_INFO(logger, pipeline_log_str());
LOG_DEBUG(logger, pipeline_log_str());
}

BlockIO executeQuery(const String & query, Context & context, bool internal, QueryProcessingStage::Enum stage)
Expand Down
17 changes: 14 additions & 3 deletions dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ LocalIndexerScheduler::LocalIndexerScheduler(const Options & options)
start();
}

LocalIndexerScheduler::~LocalIndexerScheduler()
void LocalIndexerScheduler::shutdown()
{
LOG_INFO(logger, "LocalIndexerScheduler is destroying. Waiting scheduler and tasks to finish...");
LOG_INFO(logger, "LocalIndexerScheduler is shutting down. Waiting scheduler and tasks to finish...");

// First quit the scheduler. Don't schedule more tasks.
is_shutting_down = true;
Expand All @@ -81,7 +81,15 @@ LocalIndexerScheduler::~LocalIndexerScheduler()

// Then wait all running tasks to finish.
pool.reset();
LOG_INFO(logger, "LocalIndexerScheduler is shutdown.");
}

/// Destructor. Calls `shutdown()` unless `is_shutting_down` is already set,
/// then logs that the scheduler has been destroyed.
LocalIndexerScheduler::~LocalIndexerScheduler()
{
    // `shutdown()` sets `is_shutting_down`, so a set flag means the
    // shutdown sequence has already been initiated elsewhere.
    const bool needs_shutdown = !is_shutting_down;
    if (needs_shutdown)
        shutdown();

    LOG_INFO(logger, "LocalIndexerScheduler is destroyed");
}

Expand Down Expand Up @@ -295,7 +303,10 @@ bool LocalIndexerScheduler::tryAddTaskToPool(std::unique_lock<std::mutex> & lock
}
};

RUNTIME_CHECK(pool);
if (is_shutting_down || !pool)
// shutting down, retry again
return false;

if (!pool->trySchedule(real_job))
// Concurrent task limit reached
return false;
Expand Down
15 changes: 11 additions & 4 deletions dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ class LocalIndexerScheduler

~LocalIndexerScheduler();

/**
* @brief Stop the scheduler and wait for running tasks to finish.
* Note that this method won't clear the tasks that have already been pushed.
*/
void shutdown();

/**
* @brief Start the scheduler. In some tests we need to start scheduler
* after some tasks are pushed.
Expand All @@ -101,7 +107,7 @@ class LocalIndexerScheduler

/**
* @brief Blocks until there are no tasks remaining in the queue and no tasks running.
* Should be only used in tests.
* **Should be only used in tests**.
*/
void waitForFinish();

Expand All @@ -114,6 +120,7 @@ class LocalIndexerScheduler

/**
* @brief Drop all tasks matching specified keyspace id and table id.
* Note that this method won't drop the running tasks.
*/
size_t dropTasks(KeyspaceID keyspace_id, TableID table_id);

Expand Down Expand Up @@ -147,9 +154,6 @@ class LocalIndexerScheduler
void moveBackReadyTasks(std::unique_lock<std::mutex> & lock);

private:
bool is_started = false;
std::thread scheduler_thread;

/// Try to add a task to the pool. Returns false if the pool is full
/// (for example, reaches concurrent task limit or memory limit).
/// When pool is full, we will not try to schedule any more tasks at this moment.
Expand All @@ -160,6 +164,9 @@ class LocalIndexerScheduler
/// heavy pressure.
bool tryAddTaskToPool(std::unique_lock<std::mutex> & lock, const InternalTaskPtr & task);

std::thread scheduler_thread;
bool is_started = false;

KeyspaceID last_schedule_keyspace_id = 0;
std::map<KeyspaceID, TableID> last_schedule_table_id_by_ks;

Expand Down
40 changes: 25 additions & 15 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,24 +417,34 @@ SegmentPtr Segment::restoreSegment( //
DMContext & context,
PageIdU64 segment_id)
{
Page page = context.storage_pool->metaReader()->read(segment_id); // not limit restore

ReadBufferFromMemory buf(page.data.begin(), page.data.size());
Segment::SegmentMetaInfo segment_info;
readSegmentMetaInfo(buf, segment_info);
try
{
Page page = context.storage_pool->metaReader()->read(segment_id); // not limit restore

auto delta = DeltaValueSpace::restore(context, segment_info.range, segment_info.delta_id);
auto stable = StableValueSpace::restore(context, segment_info.stable_id);
auto segment = std::make_shared<Segment>(
parent_log,
segment_info.epoch,
segment_info.range,
segment_id,
segment_info.next_segment_id,
delta,
stable);
ReadBufferFromMemory buf(page.data.begin(), page.data.size());
readSegmentMetaInfo(buf, segment_info);

return segment;
auto delta = DeltaValueSpace::restore(context, segment_info.range, segment_info.delta_id);
auto stable = StableValueSpace::restore(context, segment_info.stable_id);
auto segment = std::make_shared<Segment>(
parent_log,
segment_info.epoch,
segment_info.range,
segment_id,
segment_info.next_segment_id,
delta,
stable);

return segment;
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("while restoreSegment, segment_id={}", segment_id));
e.rethrow();
}
RUNTIME_CHECK_MSG(false, "unreachable");
return {};
}

Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( //
Expand Down
55 changes: 52 additions & 3 deletions dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Checksum.h>
#include <IO/Encryption/MockKeyManager.h>
#include <Interpreters/Context.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/PatternFormatter.h>
#include <Server/CLIService.h>
#include <Storages/Page/PageDefinesBase.h>
#include <Storages/Page/V3/PageDefines.h>
#include <Storages/Page/V3/PageDirectory.h>
#include <Storages/Page/V3/PageDirectoryFactory.h>
#include <Storages/Page/V3/PageStorageImpl.h>
#include <Storages/Page/V3/Universal/RaftDataReader.h>
#include <Storages/Page/V3/Universal/UniversalPageId.h>
#include <Storages/Page/V3/Universal/UniversalPageIdFormatImpl.h>
#include <Storages/Page/V3/Universal/UniversalPageStorage.h>
#include <Storages/PathPool.h>
Expand All @@ -29,6 +33,7 @@
#include <common/types.h>

#include <boost/program_options.hpp>
#include <cstdint>
#include <magic_enum.hpp>
#include <unordered_set>

Expand All @@ -47,12 +52,15 @@ struct ControlOptions
CHECK_ALL_DATA_CRC = 4,
DISPLAY_WAL_ENTRIES = 5,
DISPLAY_REGION_INFO = 6,
DISPLAY_BLOB_DATA = 7,
};

std::vector<std::string> paths;
DisplayType mode = DisplayType::DISPLAY_SUMMARY_INFO;
UInt64 page_id = UINT64_MAX;
UInt32 blob_id = UINT32_MAX;
BlobFileOffset blob_offset = INVALID_BLOBFILE_OFFSET;
size_t blob_size = UINT64_MAX;
UInt64 namespace_id = DB::TEST_NAMESPACE_ID;
StorageType storage_type = StorageType::Unknown; // only useful for universal page storage
UInt32 keyspace_id = NullspaceID; // only useful for universal page storage
Expand Down Expand Up @@ -85,6 +93,7 @@ ControlOptions ControlOptions::parse(int argc, char ** argv)
4 is check every data is valid
5 is dump entries in WAL log files
6 is display all region info
7 is display blob data (in hex)
)") //
("show_entries",
value<bool>()->default_value(true),
Expand All @@ -106,8 +115,14 @@ ControlOptions ControlOptions::parse(int argc, char ** argv)
value<UInt64>()->default_value(UINT64_MAX),
"Query a single Page id, and print its version chain.") //
("blob_id,B",
value<UInt32>()->default_value(UINT32_MAX),
"Query a single Blob id, and print its data distribution.") //
value<BlobFileId>()->default_value(INVALID_BLOBFILE_ID),
"Specify the blob_id") //
("blob_offset",
value<BlobFileOffset>()->default_value(INVALID_BLOBFILE_OFFSET),
"Specify the offset.") //
("blob_size",
value<size_t>()->default_value(0),
"Specify the size.") //
//
("imitative,I",
value<bool>()->default_value(true),
Expand Down Expand Up @@ -140,7 +155,9 @@ ControlOptions ControlOptions::parse(int argc, char ** argv)
opt.paths = options["paths"].as<std::vector<std::string>>();
auto mode_int = options["mode"].as<int>();
opt.page_id = options["page_id"].as<UInt64>();
opt.blob_id = options["blob_id"].as<UInt32>();
opt.blob_id = options["blob_id"].as<BlobFileId>();
opt.blob_offset = options["blob_offset"].as<BlobFileOffset>();
opt.blob_size = options["blob_size"].as<size_t>();
opt.show_entries = options["show_entries"].as<bool>();
opt.check_fields = options["check_fields"].as<bool>();
auto storage_type_int = options["storage_type"].as<int>();
Expand Down Expand Up @@ -346,6 +363,12 @@ class PageStorageControlV3
}
break;
}
case ControlOptions::DisplayType::DISPLAY_BLOB_DATA:
{
String hex_data = getBlobData(blob_store, opts.blob_id, opts.blob_offset, opts.blob_size);
fmt::println("hex:{}", hex_data);
break;
}
default:
std::cout << "Invalid display mode." << std::endl;
break;
Expand Down Expand Up @@ -821,6 +844,32 @@ class PageStorageControlV3
return error_msg.toString();
}

/// Read `size` bytes starting at `offset` from the blob file `blob_id` via
/// `blob_store`, print the CRC64 checksum of those bytes to stdout, and
/// return the bytes encoded by `Redact::keyToHexString`.
///
/// @param blob_store  The blob store to read from.
/// @param blob_id     Id of the blob file to read.
/// @param offset      Offset inside the blob file where reading starts.
/// @param size        Number of bytes to read.
/// @return The bytes read, as produced by `Redact::keyToHexString`.
///
/// Any exception raised by `blob_store.read` propagates to the caller; the
/// temporary buffer is owned by a `String`, so it is released in that case too.
static String getBlobData(
    typename Trait::BlobStore & blob_store,
    BlobFileId blob_id,
    BlobFileOffset offset,
    size_t size)
{
    // Placeholder page id whose type matches the storage trait; it is passed
    // as the first argument of `read`.
    const auto page_id = []() {
        if constexpr (std::is_same_v<Trait, u128::PageStorageControlV3Trait>)
            return PageIdV3Internal(0, 0);
        else
            return UniversalPageId("");
    }();

    // Own the read buffer with a String instead of a raw `new[]`/`delete[]`
    // pair so that it cannot leak when `read` (or anything below) throws.
    String buffer(size, '\0');
    // NOTE(review): the trailing `nullptr, false` arguments are forwarded
    // unchanged from the original call; confirm their meaning against
    // `BlobStore::read`.
    blob_store.read(page_id, blob_id, offset, buffer.data(), size, nullptr, false);

    using ChecksumClass = Digest::CRC64;
    ChecksumClass digest;
    digest.update(buffer.data(), size);
    auto checksum = digest.checksum();
    fmt::println("checksum: 0x{:X}", checksum);

    return Redact::keyToHexString(buffer.data(), size);
}

private:
ControlOptions options;
};
Expand Down

0 comments on commit 6e12ba2

Please sign in to comment.