Skip to content

Commit

Permalink
ddl: Fix unstable test "fullstack-test/mpp/rollup_tpcds.test" (#9628)
Browse files Browse the repository at this point in the history
close #9613

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
JaySon-Huang and ti-chi-bot[bot] authored Nov 18, 2024
1 parent 99ab3c5 commit 2c221d8
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 67 deletions.
169 changes: 105 additions & 64 deletions dbms/src/Interpreters/InterpreterCreateQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
#include <Poco/FileStream.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageLog.h>
#include <common/logger_useful.h>

#include <boost/range/join.hpp>
#include <memory>
#include <string_view>


namespace DB
Expand All @@ -67,9 +69,13 @@ extern const char exception_between_create_database_meta_and_directory[];
}


InterpreterCreateQuery::InterpreterCreateQuery(const ASTPtr & query_ptr_, Context & context_)
InterpreterCreateQuery::InterpreterCreateQuery(
const ASTPtr & query_ptr_,
Context & context_,
std::string_view log_suffix_)
: query_ptr(query_ptr_)
, context(context_)
, log_suffix(log_suffix_)
{}


Expand Down Expand Up @@ -447,7 +453,6 @@ ColumnsDescription InterpreterCreateQuery::setColumns(
return res;
}


void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
if (create.storage)
Expand Down Expand Up @@ -488,6 +493,89 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
}


/**
* Try to acquire a DDLGuard to execute the "CREATE TABLE" actions.
*
* Return the gurad if this thread become the owner to execute "CREATE TABLE".
* If the thread does not is the owner to execute "CREATE TABLE".
* - If the table already exists, and the request specifies IF NOT EXISTS,
* then we allow concurrent CREATE queries (which do nothing).
* - Otherwise, concurrent queries for creating a table, if the table does not exist,
* wait for `timeout_seconds` at max to check whether the table creation is completly
* created. If the table has been created within timeout, then do nothing and return.
* If timeout happen at last, throw an exception.
*/
std::unique_ptr<DDLGuard> tryGetDDLGuard(
Context & context,
const String & database_name,
const String & table_name,
bool create_if_not_exists,
size_t timeout_seconds,
std::string_view log_suffix)
{
constexpr int wait_useconds = 50'000;
const size_t max_retries = timeout_seconds * 1'000'000 / wait_useconds;
try
{
auto guard = context.getDDLGuardIfTableDoesntExist(
database_name,
table_name,
"Table " + database_name + "." + table_name + " is creating or attaching right now");

if (!guard)
{
if (create_if_not_exists)
return {}; // return a null guard
else
throw Exception(
"Table " + database_name + "." + table_name + " already exists.",
ErrorCodes::TABLE_ALREADY_EXISTS);
}
return guard;
}
catch (Exception & e)
{
// Concurrent queries for creating the same table may run into this branch.
// We have to wait for the table created completely, then return to use the table.
// Thus, we choose to do a retry here to wait the table created completed.
if (e.code() == ErrorCodes::TABLE_ALREADY_EXISTS || e.code() == ErrorCodes::DDL_GUARD_IS_ACTIVE)
{
auto log = Logger::get(log_suffix);
LOG_WARNING(log, "Concurrent create table happens, error_code={} error_msg={}", e.code(), e.message());
for (size_t i = 0; i < max_retries; ++i)
{
// Once we can get the table from `context`, consider the table create has been "completed"
// and return a null guard
if (context.isTableExist(database_name, table_name))
return {};

// sleep a while and retry
LOG_WARNING(
log,
"Waiting for the completion of concurrent table creation action"
", sleep for {} ms and try again",
wait_useconds / 1000);
usleep(wait_useconds);
}

// timeout, throw an exception
LOG_ERROR(
log,
"still failed to wait for the completion of concurrent table creation in InterpreterCreateQuery, "
"max_retries={} stack_info={}",
max_retries,
e.getStackTrace().toString());
e.rethrow();
}
else
{
e.addMessage(std::string(log_suffix));
e.rethrow();
}
}
return {}; // not reachable
}

BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
{
String path = context.getPath();
Expand Down Expand Up @@ -534,8 +622,6 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
/// Set the table engine if it was not specified explicitly.
setEngine(create);

StoragePtr res;

{
std::unique_ptr<DDLGuard> guard;

Expand All @@ -547,72 +633,25 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
database = context.getDatabase(database_name);
data_path = database->getDataPath();

/** If the table already exists, and the request specifies IF NOT EXISTS,
* then we allow concurrent CREATE queries (which do nothing).
* Otherwise, concurrent queries for creating a table, if the table does not exist,
* can throw an exception, even if IF NOT EXISTS is specified.
*/
try
guard = tryGetDDLGuard(
context,
database_name,
table_name,
create.if_not_exists,
/*timeout_seconds=*/5,
log_suffix);
if (!guard)
{
guard = context.getDDLGuardIfTableDoesntExist(
database_name,
table_name,
"Table " + database_name + "." + table_name + " is creating or attaching right now");

if (!guard)
{
if (create.if_not_exists)
return {};
else
throw Exception(
"Table " + database_name + "." + table_name + " already exists.",
ErrorCodes::TABLE_ALREADY_EXISTS);
}
}
catch (Exception & e)
{
// Due to even if it throws this two error code, it can't ensure the table is completely created
// So we have to wait for the table created completely, then return to use the table.
// Thus, we choose to do a retry here to wait the table created completed.
if (e.code() == ErrorCodes::TABLE_ALREADY_EXISTS || e.code() == ErrorCodes::DDL_GUARD_IS_ACTIVE)
{
auto log = Logger::get(fmt::format("InterpreterCreateQuery {} {}", database_name, table_name));
LOG_WARNING(
log,
"createTable failed with error code is {}, error info is {}, stack_info is {}",
e.code(),
e.displayText(),
e.getStackTrace().toString());
const size_t max_retry = 50;
const int wait_useconds = 20000;
for (size_t i = 0; i < max_retry; i++) // retry
{
if (context.isTableExist(database_name, table_name))
return {};

// sleep a while and retry
LOG_ERROR(
log,
"createTable failed but table not exist now, \nWe will sleep for {} ms and try again.",
wait_useconds / 1000);
usleep(wait_useconds); // sleep 20ms
}
LOG_ERROR(
log,
"still failed to createTable in InterpreterCreateQuery for retry {} times",
max_retry);
e.rethrow();
}
else
{
e.rethrow();
}
// Not the owner to create IStorage instance, and the table is created
// completely, let's return
return {};
}
}
else if (context.tryGetExternalTable(table_name) && create.if_not_exists)
return {};

res = StorageFactory::instance().get(
// Guard is acquired, let's create the IStorage instance
StoragePtr res = StorageFactory::instance().get(
create,
data_path,
table_name,
Expand Down Expand Up @@ -642,6 +681,8 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)

if (!create.is_temporary)
database->attachTable(table_name, res);

// the table has been created completely
}

/// If the query is a CREATE SELECT, insert the data into the table.
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/InterpreterCreateQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ using StoragePtr = std::shared_ptr<IStorage>;
class InterpreterCreateQuery : public IInterpreter
{
public:
InterpreterCreateQuery(const ASTPtr & query_ptr_, Context & context_);
InterpreterCreateQuery(const ASTPtr & query_ptr_, Context & context_, std::string_view log_suffix_ = "");

BlockIO execute() override;

Expand Down Expand Up @@ -68,6 +68,7 @@ class InterpreterCreateQuery : public IInterpreter

ASTPtr query_ptr;
Context & context;
std::string_view log_suffix;

/// Using while loading database.
ThreadPool * thread_pool = nullptr;
Expand Down
15 changes: 13 additions & 2 deletions dbms/src/TiDB/Schema/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,10 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateDatabaseByInfo(const TiDB::DB

ASTPtr ast = parseCreateStatement(statement);

InterpreterCreateQuery interpreter(ast, context);
InterpreterCreateQuery interpreter(
ast,
context,
fmt::format("keyspace={} database_id={}", keyspace_id, db_info->id));
interpreter.setInternal(true);
interpreter.setForceRestoreData(false);
interpreter.execute();
Expand Down Expand Up @@ -1169,7 +1172,15 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateStorageInstance(
ast_create_query->if_not_exists = true;
ast_create_query->database = database_mapped_name;

InterpreterCreateQuery interpreter(ast, context);
InterpreterCreateQuery interpreter(
ast,
context,
fmt::format(
"keyspace={} database_id={} table_id={} action={}",
keyspace_id,
database_id,
table_info->id,
action));
interpreter.setInternal(true);
interpreter.setForceRestoreData(false);
interpreter.execute();
Expand Down

0 comments on commit 2c221d8

Please sign in to comment.