Skip to content

Commit

Permalink
Add more information to diagnose timeouts on NMV (not-my-vbucket) responses
Browse files Browse the repository at this point in the history
  • Loading branch information
avsej committed Nov 6, 2023
1 parent fb93ca8 commit a6b2964
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 24 deletions.
15 changes: 15 additions & 0 deletions core/bucket.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,15 @@ class bucket_impl
return config_->map_key(id.key(), id.node_index());
}

/**
 * Returns the revision string of the configuration currently held by this bucket.
 *
 * The configuration is read while holding config_mutex_, so the value is a
 * consistent snapshot taken at the time of the call.
 *
 * @return the result of config_->rev_str() when a configuration is present,
 *         otherwise the literal string "<no-config>".
 */
[[nodiscard]] auto config_rev() const -> std::string
{
    std::scoped_lock lock(config_mutex_);
    if (config_) {
        return config_->rev_str();
    }
    // No configuration is present: return a recognizable placeholder instead of an empty string.
    return "<no-config>";
}

[[nodiscard]] auto map_id(const std::vector<std::byte>& key, std::size_t node_index)
-> std::pair<std::uint16_t, std::optional<std::size_t>>
{
Expand Down Expand Up @@ -909,6 +918,12 @@ bucket::map_id(const document_id& id) -> std::pair<std::uint16_t, std::optional<
return impl_->map_id(id);
}

// Facade accessor: the revision string is produced by the implementation object (impl_),
// this method only forwards the call and hands the result back unchanged.
auto bucket::config_rev() const -> std::string
{
    return impl_->config_rev();
}

auto
bucket::direct_dispatch(std::shared_ptr<mcbp::queue_request> req) -> std::error_code
{
Expand Down
42 changes: 29 additions & 13 deletions core/bucket.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,12 @@ class bucket
} else {
auto [partition, server] = map_id(cmd->request.id);
if (!server.has_value()) {
CB_LOG_TRACE(
R"({} unable to map key="{}" to the node, id={}, partition={})", log_prefix(), cmd->request.id, cmd->id_, partition);
CB_LOG_TRACE(R"([{}] unable to map key="{}" to the node, id={}, partition={}, rev={})",
log_prefix(),
cmd->request.id,
cmd->id_,
partition,
config_rev());
return io::retry_orchestrator::maybe_retry(
cmd->manager_, cmd, retry_reason::node_not_available, errc::common::request_canceled);
}
Expand All @@ -116,32 +120,43 @@ class bucket
}
auto session = find_session_by_index(index);
if (!session || !session->has_config()) {
CB_LOG_TRACE(R"({} defer operation id={}, key="{}", partition={}, index={}, session={}, address="{}", has_config={})",
log_prefix(),
cmd->id_,
cmd->request.id,
cmd->request.partition,
index,
session.has_value(),
session.has_value() ? session->bootstrap_address() : "",
session.has_value() && session->has_config());
CB_LOG_TRACE(
R"([{}] defer operation id="{}", key="{}", partition={}, index={}, session={}, address="{}", has_config={}, rev={})",
log_prefix(),
cmd->id_,
cmd->request.id,
cmd->request.partition,
index,
session.has_value(),
session.has_value() ? session->bootstrap_address() : "",
session.has_value() && session->has_config(),
config_rev());
return defer_command([self = shared_from_this(), cmd]() { self->map_and_send(cmd); });
}
if (session->is_stopped()) {
CB_LOG_TRACE(
R"({} the session has been found for idx={}, but it is stopped, retrying id={}, key="{}", partition={}, session={}, address="{}")",
R"([{}] the session has been found for idx={}, but it is stopped, retrying id={}, key="{}", partition={}, session={}, address="{}", rev={})",
log_prefix(),
index,
cmd->id_,
cmd->request.id,
cmd->request.partition,
session->id(),
session->bootstrap_address());
session->bootstrap_address(),
config_rev());
return io::retry_orchestrator::maybe_retry(
cmd->manager_, cmd, retry_reason::node_not_available, errc::common::request_canceled);
}
cmd->last_dispatched_from_ = session->local_address();
cmd->last_dispatched_to_ = session->bootstrap_address();
CB_LOG_TRACE(R"([{}] send operation id="{}", key="{}", partition={}, index={}, address="{}", rev={})",
session->log_prefix(),
cmd->id_,
cmd->request.id,
cmd->request.partition,
index,
session->bootstrap_address(),
config_rev());
cmd->send_to(session.value());
}

Expand Down Expand Up @@ -186,6 +201,7 @@ class bucket
[[nodiscard]] auto next_session_index() -> std::size_t;
[[nodiscard]] auto find_session_by_index(std::size_t index) const -> std::optional<io::mcbp_session>;
[[nodiscard]] auto map_id(const document_id& id) -> std::pair<std::uint16_t, std::optional<std::size_t>>;
[[nodiscard]] auto config_rev() const -> std::string;

asio::io_context& ctx_;
std::shared_ptr<bucket_impl> impl_;
Expand Down
12 changes: 11 additions & 1 deletion core/io/mcbp_command.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <couchbase/key_value_error_map_info.hxx>

#include <asio/steady_timer.hpp>
#include <fmt/chrono.h>

#include <functional>
#include <utility>
Expand Down Expand Up @@ -136,6 +137,16 @@ struct mcbp_command : public std::enable_shared_from_this<mcbp_command<Manager,
span_ = nullptr;
}
if (handler) {
if (ec == errc::common::unambiguous_timeout || ec == errc::common::ambiguous_timeout) {
auto time_left = deadline.expiry() - std::chrono::steady_clock::now();
CB_LOG_TRACE(R"([{}] timeout operation id="{}", {}, key="{}", partition={}, time_left={})",
session_ ? session_->log_prefix() : manager_->log_prefix(),
id_,
encoded_request_type::body_type::opcode,
request.id,
request.partition,
time_left);
}
handler(ec, std::move(msg));
}
}
Expand Down Expand Up @@ -273,7 +284,6 @@ struct mcbp_command : public std::enable_shared_from_this<mcbp_command<Manager,
error_code = self->session_->decode_error_code(msg.header.status());
}
if (status == key_value_status_code::not_my_vbucket) {
self->session_->handle_not_my_vbucket(msg);
return io::retry_orchestrator::maybe_retry(self->manager_, self, retry_reason::key_value_not_my_vbucket, ec);
}
if (status == key_value_status_code::unknown_collection) {
Expand Down
107 changes: 98 additions & 9 deletions core/io/mcbp_session.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,94 @@
#include <cstring>
#include <utility>

namespace
{
/**
 * Non-owning wrapper that tags a byte container so it can be given a dedicated formatter.
 *
 * The view stores only a reference to the container, therefore it must not outlive
 * the container it was created from. It is intended to be built in place as an argument
 * of a logging call.
 *
 * @tparam Container type of the wrapped buffer.
 */
template<typename Container>
struct mcbp_header_view {
    const Container& buf_;

    // explicit: wrapping a buffer must be a visible decision at the call site, because
    // an implicit conversion would silently produce an object that holds a reference.
    explicit mcbp_header_view(const Container& buf)
      : buf_{ buf }
    {
    }
};

/**
 * Field-by-field image of the fixed-size MCBP packet header.
 *
 * Multi-byte fields are kept exactly as they appear in the buffer; users apply
 * byte_swap before presenting them. The meaning of bytes 2-3 (keylen) and of the
 * "specific" field depends on the magic byte, which the helpers below decode.
 */
struct mcbp_header_layout {
    std::uint8_t magic;
    std::uint8_t opcode;
    union {
        // magic other than 0x18/0x08: a single 16-bit key length
        std::uint16_t normal;
        // magic 0x18/0x08: one byte of framing extras length followed by one byte of key length
        struct {
            std::uint8_t framing_extras;
            std::uint8_t key;
        } alt;
    } keylen;
    std::uint8_t extlen;
    std::uint8_t datatype;
    std::uint16_t specific;
    std::uint32_t bodylen;
    std::uint32_t opaque;
    std::uint64_t cas;

    /// True when the magic byte selects the alternative layout of the keylen field.
    [[nodiscard]] constexpr auto has_alt_framing() const -> bool
    {
        return magic == 0x18 || magic == 0x08;
    }

    /// Label for the "specific" field: "status" for magic 0x18/0x81, "vbucket" for everything else.
    [[nodiscard]] constexpr auto specific_name() const -> std::string_view
    {
        if (magic == 0x18 || magic == 0x81) {
            return "status";
        }
        return "vbucket";
    }

    /// Key length in host byte order, taking the alternative layout into account.
    [[nodiscard]] constexpr auto key_length() const -> std::uint16_t
    {
        if (has_alt_framing()) {
            return keylen.alt.key;
        }
        return couchbase::core::utils::byte_swap(keylen.normal);
    }

    /// Framing extras length; always zero when the alternative layout is not in use.
    [[nodiscard]] constexpr auto framing_extras_length() const -> std::uint8_t
    {
        if (has_alt_framing()) {
            return keylen.alt.framing_extras;
        }
        return 0;
    }
};

// The structure is laid over raw packet bytes, so any padding inserted by the compiler
// would silently shift every field. The header occupies 24 bytes.
static_assert(sizeof(mcbp_header_layout) == 24, "mcbp_header_layout must match the 24-byte MCBP header exactly");
} // namespace

/**
 * Formatter that renders the MCBP header found at the beginning of the wrapped buffer.
 *
 * When the buffer is too short to contain a header, its content is printed as a hex
 * dump instead. Otherwise the header fields are decoded and printed as
 * "{magic=..., opcode=..., ...}".
 *
 * NOTE(review): size() is compared against byte counts, so the container is assumed
 * to hold one-byte elements -- confirm for any new Container type.
 */
template<typename Container>
struct fmt::formatter<mcbp_header_view<Container>> {
    /// No format specifiers are supported; the spec is left untouched.
    template<typename ParseContext>
    constexpr auto parse(ParseContext& ctx)
    {
        return ctx.begin();
    }

    template<typename FormatContext>
    auto format(const mcbp_header_view<Container>& view, FormatContext& ctx) const
    {
        // The second condition guarantees that the copy below never reads past the end
        // of the buffer, even if binary_header and mcbp_header_layout differ in size.
        if (view.buf_.size() < sizeof(couchbase::core::io::binary_header) || view.buf_.size() < sizeof(mcbp_header_layout)) {
            return fmt::format_to(ctx.out(), "{:n}", spdlog::to_hex(view.buf_));
        }

        // Copy the bytes into a properly aligned local object instead of reinterpreting
        // the buffer in place: the buffer carries no alignment guarantee for the 16/32/64-bit
        // fields, and accessing it through an unrelated struct type violates aliasing rules.
        mcbp_header_layout header{};
        std::memcpy(&header, view.buf_.data(), sizeof(header));

        // fmt::format_to is qualified so that argument-dependent lookup cannot pull in
        // std::format_to when building as C++20.
        return fmt::format_to(
          ctx.out(),
          "{{magic=0x{:x}, opcode=0x{:x}, fextlen={}, keylen={}, extlen={}, datatype={}, {}={}, bodylen={}, opaque={}, cas={}}}",
          header.magic,
          header.opcode,
          header.framing_extras_length(),
          header.key_length(),
          header.extlen,
          header.datatype,
          header.specific_name(),
          couchbase::core::utils::byte_swap(header.specific),
          couchbase::core::utils::byte_swap(header.bodylen),
          couchbase::core::utils::byte_swap(header.opaque),
          couchbase::core::utils::byte_swap(header.cas));
    }
};

namespace couchbase::core::io
{
struct connection_endpoints {
Expand Down Expand Up @@ -887,9 +975,7 @@ class mcbp_session_impl
if (stopped_) {
return;
}
std::uint32_t opaque{ 0 };
std::memcpy(&opaque, buf.data() + 12, sizeof(opaque));
CB_LOG_TRACE("{} MCBP send, opaque={}, {:n}", log_prefix_, utils::byte_swap(opaque), spdlog::to_hex(buf.begin(), buf.begin() + 24));
CB_LOG_TRACE("{} MCBP send {}", log_prefix_, mcbp_header_view(buf));
std::scoped_lock lock(output_buffer_mutex_);
output_buffer_.emplace_back(std::move(buf));
}
Expand Down Expand Up @@ -1126,11 +1212,17 @@ class mcbp_session_impl
return;
}
if (config == config_) {
CB_LOG_TRACE("{} received a configuration with identical revision (rev={}), ignoring", log_prefix_, config.rev_str());
CB_LOG_TRACE("{} received a configuration with identical revision (new={}, old={}), ignoring",
log_prefix_,
config.rev_str(),
config_->rev_str());
return;
}
if (config < config_) {
CB_LOG_DEBUG("{} received a configuration with older revision, ignoring", log_prefix_);
CB_LOG_DEBUG("{} received a configuration with older revision (new={}, old={}), ignoring",
log_prefix_,
config.rev_str(),
config_->rev_str());
return;
}
}
Expand Down Expand Up @@ -1429,10 +1521,7 @@ class mcbp_session_impl
if (self->stopped_) {
return;
}
CB_LOG_TRACE("{} MCBP recv, opaque={}, {:n}",
self->log_prefix_,
utils::byte_swap(msg.header.opaque),
spdlog::to_hex(msg.header_data()));
CB_LOG_TRACE("{} MCBP recv {}", self->log_prefix_, mcbp_header_view(msg.header_data()));
if (self->bootstrapped_) {
self->handler_->handle(std::move(msg));
} else {
Expand Down
3 changes: 2 additions & 1 deletion core/io/retry_orchestrator.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ retry_with_duration(std::shared_ptr<Manager> manager,
std::chrono::milliseconds duration)
{
command->request.retries.record_retry_attempt(reason);
CB_LOG_TRACE(R"({} retrying operation {} (duration={}ms, id="{}", reason={}, attempts={}, last_dispatched_to=\"{}\"))",
CB_LOG_TRACE(R"({} retrying operation {} (duration={}ms, id="{}", vbucket_id={}, reason={}, attempts={}, last_dispatched_to="{}"))",
manager->log_prefix(),
decltype(command->request)::encoded_request_type::body_type::opcode,
duration.count(),
command->id_,
command->request.partition,
reason,
command->request.retries.retry_attempts(),
command->session_ ? command->session_->remote_address() : "");
Expand Down
5 changes: 5 additions & 0 deletions core/protocol/client_request.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ class client_request
partition_ = val;
}

/// Read accessor paired with the setter above: yields the value currently stored in partition_.
[[nodiscard]] auto partition() const -> std::uint16_t { return partition_; }

Body& body()
{
return body_;
Expand Down

0 comments on commit a6b2964

Please sign in to comment.