From 2d49a28c156e1e4ab0a8eb7dbda09d4b2a26d801 Mon Sep 17 00:00:00 2001 From: adiholden Date: Sun, 10 Nov 2024 09:34:40 +0200 Subject: [PATCH] fix(server): handle running script load inside multi (#4074) Signed-off-by: adi_holden --- src/server/common.cc | 27 +++++++++++ src/server/common.h | 25 +++++++++++ src/server/conn_context.cc | 7 +++ src/server/conn_context.h | 2 + src/server/main_service.cc | 90 ++++++++++--------------------------- src/server/multi_test.cc | 23 ++++++++++ src/server/script_mgr.cc | 12 ++--- src/server/script_mgr.h | 5 ++- src/server/server_family.cc | 2 +- src/server/transaction.cc | 8 ++-- 10 files changed, 123 insertions(+), 78 deletions(-) diff --git a/src/server/common.cc b/src/server/common.cc index cdaf41ebe2e9..bb78d4e4668f 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -18,6 +18,8 @@ extern "C" { #include "base/flags.h" #include "base/logging.h" #include "core/compact_object.h" +#include "core/interpreter.h" +#include "server/conn_context.h" #include "server/engine_shard_set.h" #include "server/error.h" #include "server/journal/journal.h" @@ -453,4 +455,29 @@ void ThreadLocalMutex::unlock() { } } +BorrowedInterpreter::BorrowedInterpreter(Transaction* tx, ConnectionState* state) { + // Ensure squashing ignores EVAL. We can't run on a stub context, because it doesn't have our + // preborrowed interpreter (which can't be shared on multiple threads). + CHECK(!state->squashing_info); + + if (auto borrowed = state->exec_info.preborrowed_interpreter; borrowed) { + // Ensure a preborrowed interpreter is only set for an already running MULTI transaction. + CHECK_EQ(state->exec_info.state, ConnectionState::ExecInfo::EXEC_RUNNING); + + interpreter_ = borrowed; + } else { + // A scheduled transaction occupies a place in the transaction queue and holds locks, + // preventing other transactions from progressing. Blocking below can deadlock! + CHECK(!tx->IsScheduled()); + + interpreter_ = ServerState::tlocal()->BorrowInterpreter(); + owned_ = true; + } +} + +BorrowedInterpreter::~BorrowedInterpreter() { + if (owned_) + ServerState::tlocal()->ReturnInterpreter(interpreter_); +} + } // namespace dfly diff --git a/src/server/common.h b/src/server/common.h index efe66d6ee684..1ab29b27023f 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -47,6 +47,8 @@ using RdbTypeFreqMap = absl::flat_hash_map; class CommandId; class Transaction; class EngineShard; +class ConnectionState; +class Interpreter; struct LockTagOptions { bool enabled = false; @@ -353,6 +355,29 @@ template class ABSL_SCOPED_LOCKABLE SharedLock { bool is_locked_; }; +// Ensures availability of an interpreter for EVAL-like commands and it's automatic release. +// If it's part of MULTI, the preborrowed interpreter is returned, otherwise a new is acquired. +struct BorrowedInterpreter { + BorrowedInterpreter(Transaction* tx, ConnectionState* state); + + ~BorrowedInterpreter(); + + // Give up ownership of the interpreter, it must be returned manually. + Interpreter* Release() && { + DCHECK(owned_); + owned_ = false; + return interpreter_; + } + + operator Interpreter*() { + return interpreter_; + } + + private: + Interpreter* interpreter_ = nullptr; + bool owned_ = false; +}; + extern size_t serialization_max_chunk_size; } // namespace dfly diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index 6cdd66f1a12b..40652c34e00d 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -72,6 +72,13 @@ size_t StoredCmd::NumArgs() const { return sizes_.size(); } +std::string StoredCmd::FirstArg() const { + if (sizes_.size() == 0) { + return {}; + } + return buffer_.substr(0, sizes_[0]); +} + facade::ReplyMode StoredCmd::ReplyMode() const { return reply_mode_; } diff --git a/src/server/conn_context.h b/src/server/conn_context.h index dc8c50442179..58e586e6e789 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -46,6 +46,8 @@ class StoredCmd { Fill(absl::MakeSpan(*dest)); } + std::string FirstArg() const; + const CommandId* Cid() const; facade::ReplyMode ReplyMode() const; diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 96b5ead33822..d87265a4e704 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -684,32 +684,33 @@ void ClusterHtmlPage(const http::QueryArgs& args, HttpContext* send, send->Invoke(std::move(resp)); } -enum class ExecEvalState { +enum class ExecScriptUse { NONE = 0, - ALL = 1, - SOME = 2, + SCRIPT_LOAD = 1, + SCRIPT_RUN = 2, }; -ExecEvalState DetermineEvalPresense(const std::vector& body) { - unsigned eval_cnt = 0; +ExecScriptUse DetermineScriptPresense(const std::vector& body) { + bool script_load = false; for (const auto& scmd : body) { if (CO::IsEvalKind(scmd.Cid()->name())) { - eval_cnt++; + return ExecScriptUse::SCRIPT_RUN; } - } - if (eval_cnt == 0) - return ExecEvalState::NONE; + if ((scmd.Cid()->name() == "SCRIPT") && (absl::AsciiStrToUpper(scmd.FirstArg()) == "LOAD")) { + script_load = true; + } + } - if (eval_cnt == body.size()) - return ExecEvalState::ALL; + if (script_load) + return ExecScriptUse::SCRIPT_LOAD; - return ExecEvalState::SOME; + return ExecScriptUse::NONE; } // Returns the multi mode for that transaction. Returns NOT_DETERMINED if no scheduling // is required. -Transaction::MultiMode DeduceExecMode(ExecEvalState state, +Transaction::MultiMode DeduceExecMode(ExecScriptUse state, const ConnectionState::ExecInfo& exec_info, const ScriptMgr& script_mgr) { // Check if script most LIKELY has global eval transactions @@ -717,7 +718,7 @@ Transaction::MultiMode DeduceExecMode(ExecEvalState state, Transaction::MultiMode multi_mode = static_cast(absl::GetFlag(FLAGS_multi_exec_mode)); - if (state != ExecEvalState::NONE) { + if (state == ExecScriptUse::SCRIPT_RUN) { contains_global = script_mgr.AreGlobalByDefault(); } @@ -765,50 +766,6 @@ string CreateExecDescriptor(const std::vector& stored_cmds, unsigned return result; } -// Ensures availability of an interpreter for EVAL-like commands and it's automatic release. -// If it's part of MULTI, the preborrowed interpreter is returned, otherwise a new is acquired. -struct BorrowedInterpreter { - BorrowedInterpreter(Transaction* tx, ConnectionContext* cntx) { - // Ensure squashing ignores EVAL. We can't run on a stub context, because it doesn't have our - // preborrowed interpreter (which can't be shared on multiple threads). - CHECK(!cntx->conn_state.squashing_info); - - if (auto borrowed = cntx->conn_state.exec_info.preborrowed_interpreter; borrowed) { - // Ensure a preborrowed interpreter is only set for an already running MULTI transaction. - CHECK_EQ(cntx->conn_state.exec_info.state, ConnectionState::ExecInfo::EXEC_RUNNING); - - interpreter_ = borrowed; - } else { - // A scheduled transaction occupies a place in the transaction queue and holds locks, - // preventing other transactions from progressing. Blocking below can deadlock! - CHECK(!tx->IsScheduled()); - - interpreter_ = ServerState::tlocal()->BorrowInterpreter(); - owned_ = true; - } - } - - ~BorrowedInterpreter() { - if (owned_) - ServerState::tlocal()->ReturnInterpreter(interpreter_); - } - - // Give up ownership of the interpreter, it must be returned manually. - Interpreter* Release() && { - DCHECK(owned_); - owned_ = false; - return interpreter_; - } - - operator Interpreter*() { - return interpreter_; - } - - private: - Interpreter* interpreter_ = nullptr; - bool owned_ = false; -}; - string ConnectionLogContext(const facade::Connection* conn) { if (conn == nullptr) { return "(null-conn)"; @@ -1873,7 +1830,7 @@ void Service::Eval(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, return rb->SendNull(); } - BorrowedInterpreter interpreter{tx, cntx}; + BorrowedInterpreter interpreter{tx, &cntx->conn_state}; auto res = server_family_.script_mgr()->Insert(body, interpreter); if (!res) return builder->SendError(res.error().Format(), facade::kScriptErrType); @@ -1887,7 +1844,7 @@ void Service::EvalSha(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde ConnectionContext* cntx) { string sha = absl::AsciiStrToLower(ArgS(args, 0)); - BorrowedInterpreter interpreter{cntx->transaction, cntx}; + BorrowedInterpreter interpreter{cntx->transaction, &cntx->conn_state}; CallSHA(args, sha, interpreter, builder, cntx); } @@ -2254,12 +2211,13 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, cntx->last_command_debug.exec_body_len = exec_info.body.size(); - // The transaction can contain scripts, determine their presence ahead to customize logic below. - ExecEvalState state = DetermineEvalPresense(exec_info.body); + // The transaction can contain script load script execution, determine their presence ahead to + // customize logic below. + ExecScriptUse state = DetermineScriptPresense(exec_info.body); - // We borrow a single interpreter for all the EVALs inside. Returned by MultiCleanup - if (state != ExecEvalState::NONE) { - exec_info.preborrowed_interpreter = BorrowedInterpreter(tx, cntx).Release(); + // We borrow a single interpreter for all the EVALs/Script load inside. Returned by MultiCleanup + if (state != ExecScriptUse::NONE) { + exec_info.preborrowed_interpreter = BorrowedInterpreter(tx, &cntx->conn_state).Release(); } // Determine according multi mode, not only only flag, but based on presence of global commands @@ -2293,7 +2251,7 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ServerState::tlocal()->exec_freq_count[descr]++; } - if (absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE && + if (absl::GetFlag(FLAGS_multi_exec_squash) && state != ExecScriptUse::SCRIPT_RUN && !cntx->conn_state.tracking_info_.IsTrackingOn()) { MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), rb, cntx, this); } else { diff --git a/src/server/multi_test.cc b/src/server/multi_test.cc index a1ed70faea9c..161f291de04e 100644 --- a/src/server/multi_test.cc +++ b/src/server/multi_test.cc @@ -1147,6 +1147,29 @@ TEST_F(MultiEvalTest, MultiAndEval) { Run({"eval", "return 'OK';", "0"}); auto resp = Run({"exec"}); EXPECT_EQ(resp, "OK"); + + // We had a bug running script load inside multi + Run({"multi"}); + Run({"script", "load", "return '5'"}); + Run({"exec"}); + + Run({"multi"}); + Run({"script", "load", "return '5'"}); + Run({"get", "x"}); + Run({"exec"}); + + Run({"multi"}); + Run({"script", "load", "return '5'"}); + Run({"mset", "x1", "y1", "x2", "y2"}); + Run({"exec"}); + + Run({"multi"}); + Run({"script", "load", "return '5'"}); + Run({"eval", "return redis.call('set', 'x', 'y')", "1", "x"}); + Run({"get", "x"}); + Run({"exec"}); + + Run({"get", "x"}); } TEST_F(MultiTest, MultiTypes) { diff --git a/src/server/script_mgr.cc b/src/server/script_mgr.cc index f6bb61a920d7..0053a02f470f 100644 --- a/src/server/script_mgr.cc +++ b/src/server/script_mgr.cc @@ -67,7 +67,8 @@ ScriptMgr::ScriptKey::ScriptKey(string_view sha) : array{} { memcpy(data(), sha.data(), size()); } -void ScriptMgr::Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void ScriptMgr::Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { string subcmd = absl::AsciiStrToUpper(ArgS(args, 0)); if (subcmd == "HELP") { @@ -110,7 +111,7 @@ void ScriptMgr::Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) return LatencyCmd(tx, builder); if (subcmd == "LOAD" && args.size() == 2) - return LoadCmd(args, tx, builder); + return LoadCmd(args, tx, builder, cntx); if (subcmd == "FLAGS" && args.size() > 2) return ConfigCmd(args, tx, builder); @@ -144,7 +145,8 @@ void ScriptMgr::FlushCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui return builder->SendOk(); } -void ScriptMgr::LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void ScriptMgr::LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { string_view body = ArgS(args, 1); auto rb = static_cast(builder); if (body.empty()) { @@ -153,9 +155,7 @@ void ScriptMgr::LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil return rb->SendBulkString(sha); } - ServerState* ss = ServerState::tlocal(); - auto interpreter = ss->BorrowInterpreter(); - absl::Cleanup clean = [ss, interpreter]() { ss->ReturnInterpreter(interpreter); }; + BorrowedInterpreter interpreter{tx, &cntx->conn_state}; auto res = Insert(body, interpreter); if (!res) diff --git a/src/server/script_mgr.h b/src/server/script_mgr.h index 7cad5ceea02c..63241682b925 100644 --- a/src/server/script_mgr.h +++ b/src/server/script_mgr.h @@ -48,7 +48,7 @@ class ScriptMgr { ScriptMgr(); - void Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + void Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); // Insert script and return sha. Get possible error from compilation or parsing script flags. io::Result Insert(std::string_view body, Interpreter* interpreter); @@ -69,7 +69,8 @@ class ScriptMgr { private: void ExistsCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) const; void FlushCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - void LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + void LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx); void ConfigCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); void ListCmd(Transaction* tx, SinkReplyBuilder* builder) const; void LatencyCmd(Transaction* tx, SinkReplyBuilder* builder) const; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 03cae22afdd3..83f9ea39e967 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -3008,7 +3008,7 @@ void ServerFamily::Role(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil void ServerFamily::Script(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) { - script_mgr_->Run(std::move(args), tx, builder); + script_mgr_->Run(std::move(args), tx, builder, cntx); } void ServerFamily::LastSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, diff --git a/src/server/transaction.cc b/src/server/transaction.cc index cb5935ac4596..3064cd51296a 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -399,10 +399,12 @@ OpStatus Transaction::InitByArgs(Namespace* ns, DbIndex index, CmdArgList args) } if ((cid_->opt_mask() & CO::NO_KEY_TRANSACTIONAL) > 0) { - if ((cid_->opt_mask() & CO::NO_KEY_TX_SPAN_ALL) > 0) + if (((cid_->opt_mask() & CO::NO_KEY_TX_SPAN_ALL) > 0)) { EnableAllShards(); - else + } else { EnableShard(0); + } + return OpStatus::OK; } @@ -976,7 +978,7 @@ string Transaction::DEBUG_PrintFailState(ShardId sid) const { void Transaction::EnableShard(ShardId sid) { unique_shard_cnt_ = 1; unique_shard_id_ = sid; - shard_data_.resize(1); + shard_data_.resize(IsActiveMulti() ? shard_set->size() : 1); shard_data_.front().local_mask |= ACTIVE; }