diff --git a/core/bucket.cxx b/core/bucket.cxx index 4e9e97d37..0fb8dd2ba 100644 --- a/core/bucket.cxx +++ b/core/bucket.cxx @@ -273,6 +273,15 @@ class bucket_impl return config_->map_key(id.key(), id.node_index()); } + auto config_rev() const -> std::string + { + std::scoped_lock lock(config_mutex_); + if (config_) { + return config_->rev_str(); + } + return ""; + } + [[nodiscard]] auto map_id(const std::vector& key, std::size_t node_index) -> std::pair> { @@ -909,6 +918,12 @@ bucket::map_id(const document_id& id) -> std::pairmap_id(id); } +auto +bucket::config_rev() const -> std::string +{ + return impl_->config_rev(); +} + auto bucket::direct_dispatch(std::shared_ptr req) -> std::error_code { diff --git a/core/bucket.hxx b/core/bucket.hxx index 1a8c69da3..02ad0164f 100644 --- a/core/bucket.hxx +++ b/core/bucket.hxx @@ -106,8 +106,12 @@ class bucket } else { auto [partition, server] = map_id(cmd->request.id); if (!server.has_value()) { - CB_LOG_TRACE( - R"({} unable to map key="{}" to the node, id={}, partition={})", log_prefix(), cmd->request.id, cmd->id_, partition); + CB_LOG_TRACE(R"([{}] unable to map key="{}" to the node, id={}, partition={}, rev={})", + log_prefix(), + cmd->request.id, + cmd->id_, + partition, + config_rev()); return io::retry_orchestrator::maybe_retry( cmd->manager_, cmd, retry_reason::node_not_available, errc::common::request_canceled); } @@ -116,32 +120,43 @@ class bucket } auto session = find_session_by_index(index); if (!session || !session->has_config()) { - CB_LOG_TRACE(R"({} defer operation id={}, key="{}", partition={}, index={}, session={}, address="{}", has_config={})", - log_prefix(), - cmd->id_, - cmd->request.id, - cmd->request.partition, - index, - session.has_value(), - session.has_value() ? session->bootstrap_address() : "", - session.has_value() && session->has_config()); + CB_LOG_TRACE( + R"([{}] defer operation id="{}", key="{}", partition={}, index={}, session={}, address="{}", has_config={}, rev={})", + log_prefix(), + cmd->id_, + cmd->request.id, + cmd->request.partition, + index, + session.has_value(), + session.has_value() ? session->bootstrap_address() : "", + session.has_value() && session->has_config(), + config_rev()); return defer_command([self = shared_from_this(), cmd]() { self->map_and_send(cmd); }); } if (session->is_stopped()) { CB_LOG_TRACE( - R"({} the session has been found for idx={}, but it is stopped, retrying id={}, key="{}", partition={}, session={}, address="{}")", + R"([{}] the session has been found for idx={}, but it is stopped, retrying id={}, key="{}", partition={}, session={}, address="{}", rev={})", log_prefix(), index, cmd->id_, cmd->request.id, cmd->request.partition, session->id(), - session->bootstrap_address()); + session->bootstrap_address(), + config_rev()); return io::retry_orchestrator::maybe_retry( cmd->manager_, cmd, retry_reason::node_not_available, errc::common::request_canceled); } cmd->last_dispatched_from_ = session->local_address(); cmd->last_dispatched_to_ = session->bootstrap_address(); + CB_LOG_TRACE(R"([{}] send operation id="{}", key="{}", partition={}, index={}, address="{}", rev={})", + session->log_prefix(), + cmd->id_, + cmd->request.id, + cmd->request.partition, + index, + session->bootstrap_address(), + config_rev()); cmd->send_to(session.value()); } @@ -186,6 +201,7 @@ class bucket [[nodiscard]] auto next_session_index() -> std::size_t; [[nodiscard]] auto find_session_by_index(std::size_t index) const -> std::optional; [[nodiscard]] auto map_id(const document_id& id) -> std::pair>; + [[nodiscard]] auto config_rev() const -> std::string; asio::io_context& ctx_; std::shared_ptr impl_; diff --git a/core/io/mcbp_command.hxx b/core/io/mcbp_command.hxx index d160d5f6f..0e3719f9a 100644 --- a/core/io/mcbp_command.hxx +++ b/core/io/mcbp_command.hxx @@ -35,6 +35,7 @@ #include #include +#include #include #include @@ -136,6 +137,16 @@ struct mcbp_command : public std::enable_shared_from_thislog_prefix() : manager_->log_prefix(), + id_, + encoded_request_type::body_type::opcode, + request.id, + request.partition, + time_left); + } handler(ec, std::move(msg)); } } @@ -273,7 +284,6 @@ struct mcbp_command : public std::enable_shared_from_thissession_->decode_error_code(msg.header.status()); } if (status == key_value_status_code::not_my_vbucket) { - self->session_->handle_not_my_vbucket(msg); return io::retry_orchestrator::maybe_retry(self->manager_, self, retry_reason::key_value_not_my_vbucket, ec); } if (status == key_value_status_code::unknown_collection) { diff --git a/core/io/mcbp_session.cxx b/core/io/mcbp_session.cxx index 38df41e86..2c5405aa5 100644 --- a/core/io/mcbp_session.cxx +++ b/core/io/mcbp_session.cxx @@ -59,6 +59,94 @@ #include #include +namespace +{ +template +struct mcbp_header_view { + const Container& buf_; + + mcbp_header_view(const Container& buf) + : buf_{ buf } + { + } +}; + +struct mcbp_header_layout { + std::uint8_t magic; + std::uint8_t opcode; + union { + std::uint16_t normal; + struct { + std::uint8_t framing_extras; + std::uint8_t key; + } alt; + } keylen; + std::uint8_t extlen; + std::uint8_t datatype; + std::uint16_t specific; + std::uint32_t bodylen; + std::uint32_t opaque; + std::uint64_t cas; + + [[nodiscard]] constexpr auto specific_name() const -> std::string_view + { + if (magic == 0x18 || magic == 0x81) { + return "status"; + } + return "vbucket"; + } + + [[nodiscard]] constexpr auto key_length() const -> std::uint16_t + { + if (magic == 0x18 || magic == 0x08) { + return keylen.alt.key; + } + return couchbase::core::utils::byte_swap(keylen.normal); + } + + [[nodiscard]] constexpr auto framing_extras_length() const -> std::uint8_t + { + if (magic == 0x18 || magic == 0x08) { + return keylen.alt.framing_extras; + } + return 0; + } +}; +} // namespace + +template +struct fmt::formatter> { + template + constexpr auto parse(ParseContext& ctx) + { + return ctx.begin(); + } + + template + auto format(const mcbp_header_view& view, FormatContext& ctx) const + { + if (view.buf_.size() < sizeof(couchbase::core::io::binary_header)) { + return format_to(ctx.out(), "{:n}", spdlog::to_hex(view.buf_)); + } + + const auto* header = reinterpret_cast(view.buf_.data()); + return format_to( + ctx.out(), + "{{magic=0x{:x}, opcode=0x{:x}, fextlen={}, keylen={}, extlen={}, datatype={}, {}={}, bodylen={}, opaque={}, cas={}}}", + header->magic, + header->opcode, + header->framing_extras_length(), + header->key_length(), + header->extlen, + header->datatype, + header->specific_name(), + couchbase::core::utils::byte_swap(header->specific), + couchbase::core::utils::byte_swap(header->bodylen), + couchbase::core::utils::byte_swap(header->opaque), + couchbase::core::utils::byte_swap(header->cas)); + } +}; + namespace couchbase::core::io { struct connection_endpoints { @@ -887,9 +975,7 @@ class mcbp_session_impl if (stopped_) { return; } - std::uint32_t opaque{ 0 }; - std::memcpy(&opaque, buf.data() + 12, sizeof(opaque)); - CB_LOG_TRACE("{} MCBP send, opaque={}, {:n}", log_prefix_, utils::byte_swap(opaque), spdlog::to_hex(buf.begin(), buf.begin() + 24)); + CB_LOG_TRACE("{} MCBP send {}", log_prefix_, mcbp_header_view(buf)); std::scoped_lock lock(output_buffer_mutex_); output_buffer_.emplace_back(std::move(buf)); } @@ -1126,11 +1212,17 @@ class mcbp_session_impl return; } if (config == config_) { - CB_LOG_TRACE("{} received a configuration with identical revision (rev={}), ignoring", log_prefix_, config.rev_str()); + CB_LOG_TRACE("{} received a configuration with identical revision (new={}, old={}), ignoring", + log_prefix_, + config.rev_str(), + config_->rev_str()); return; } if (config < config_) { - CB_LOG_DEBUG("{} received a configuration with older revision, ignoring", log_prefix_); + CB_LOG_DEBUG("{} received a configuration with older revision (new={}, old={}), ignoring", + log_prefix_, + config.rev_str(), + config_->rev_str()); return; } } @@ -1429,10 +1521,7 @@ class mcbp_session_impl if (self->stopped_) { return; } - CB_LOG_TRACE("{} MCBP recv, opaque={}, {:n}", - self->log_prefix_, - utils::byte_swap(msg.header.opaque), - spdlog::to_hex(msg.header_data())); + CB_LOG_TRACE("{} MCBP recv {}", self->log_prefix_, mcbp_header_view(msg.header_data())); if (self->bootstrapped_) { self->handler_->handle(std::move(msg)); } else { diff --git a/core/io/retry_orchestrator.hxx b/core/io/retry_orchestrator.hxx index d25142bee..bc440ec55 100644 --- a/core/io/retry_orchestrator.hxx +++ b/core/io/retry_orchestrator.hxx @@ -55,11 +55,12 @@ retry_with_duration(std::shared_ptr manager, std::chrono::milliseconds duration) { command->request.retries.record_retry_attempt(reason); - CB_LOG_TRACE(R"({} retrying operation {} (duration={}ms, id="{}", reason={}, attempts={}, last_dispatched_to=\"{}\"))", + CB_LOG_TRACE(R"({} retrying operation {} (duration={}ms, id="{}", vbucket_id={}, reason={}, attempts={}, last_dispatched_to="{}"))", manager->log_prefix(), decltype(command->request)::encoded_request_type::body_type::opcode, duration.count(), command->id_, + command->request.partition, reason, command->request.retries.retry_attempts(), command->session_ ? command->session_->remote_address() : ""); diff --git a/core/protocol/client_request.hxx b/core/protocol/client_request.hxx index d4c95da59..74077e42d 100644 --- a/core/protocol/client_request.hxx +++ b/core/protocol/client_request.hxx @@ -91,6 +91,11 @@ class client_request partition_ = val; } + [[nodiscard]] auto partition() const -> std::uint16_t + { + return partition_; + } + Body& body() { return body_;