Skip to content

Commit

Permalink
CXXCBC-606: Fix detection of dysfunctional node
Browse files Browse the repository at this point in the history
* remove the "reopen()" API of the stream: the close is asynchronous
  anyway, so it is not correct to assume that the socket will be ready
  immediately.

* remove the open_ flag from the stream, and instead proxy it directly to
  the socket to avoid "operation is already in progress" type of errors
  when trying to open the same socket twice.

* re-arm connection_deadline_ when the socket is ready for data, and use
  the KV timeout for the whole handshake with a single node. Re-initiate
  bootstrap using the next bootstrap address if a single node times out.

* reset the reading_ flag in case of an error. Ensure that the flag is
  reset if the session is not reading.

* handle the socket_closed_while_in_flight error: if it happens during
  bootstrap, try the next node in the list; otherwise preserve the old
  behavior (close the connection, and let the bucket reopen it).
  • Loading branch information
avsej committed Oct 10, 2024
1 parent fdba46a commit a5cae8c
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 71 deletions.
78 changes: 46 additions & 32 deletions core/io/mcbp_session.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1084,20 +1084,21 @@ class mcbp_session_impl
return;
}
bootstrapped_ = false;
if (bootstrap_handler_) {
last_bootstrap_error_ = std::move(bootstrap_handler_)->last_bootstrap_error();
if (auto handler = std::move(bootstrap_handler_); handler) {
last_bootstrap_error_ = handler->last_bootstrap_error();
}
bootstrap_handler_ = nullptr;
state_ = diag::endpoint_state::connecting;
if (stream_->is_open()) {
std::string old_id = stream_->id();
stream_->reopen();
CB_LOG_TRACE(R"({} reopen socket connection "{}" -> "{}", host="{}", port={})",
log_prefix_,
old_id,
stream_->id(),
bootstrap_hostname_,
bootstrap_port_);
return stream_->close([self = shared_from_this(), old_id](std::error_code) {
CB_LOG_DEBUG(R"({} reopened socket connection "{}" -> "{}", host="{}", port={})",
self->log_prefix_,
old_id,
self->stream_->id(),
self->bootstrap_hostname_,
self->bootstrap_port_);
return self->initiate_bootstrap();
});
}
if (origin_.exhausted()) {
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
Expand Down Expand Up @@ -1174,6 +1175,20 @@ class mcbp_session_impl
if (stopped_) {
return;
}

if (reason == retry_reason::socket_closed_while_in_flight && !bootstrapped_) {
return stream_->close([self = shared_from_this(), old_id = stream_->id()](std::error_code) {
CB_LOG_DEBUG(
R"({} reopened socket connection due to IO error, "{}" -> "{}", host="{}", port={})",
self->log_prefix_,
old_id,
self->stream_->id(),
self->bootstrap_hostname_,
self->bootstrap_port_);
return self->initiate_bootstrap();
});
}

state_ = diag::endpoint_state::disconnecting;
CB_LOG_DEBUG("{} stop MCBP connection, reason={}", log_prefix_, reason);
stopped_ = true;
Expand Down Expand Up @@ -1653,6 +1668,7 @@ class mcbp_session_impl
private:
void invoke_bootstrap_handler(std::error_code ec)
{
connection_deadline_.cancel();
retry_backoff_.cancel();

if (ec && state_listener_) {
Expand Down Expand Up @@ -1734,9 +1750,7 @@ class mcbp_session_impl
if (timer_ec == asio::error::operation_aborted || self->stopped_) {
return;
}
return self->stream_->close([self](std::error_code) {
self->initiate_bootstrap();
});
self->initiate_bootstrap();
});
}

Expand Down Expand Up @@ -1768,12 +1782,7 @@ class mcbp_session_impl
port,
self->bootstrap_hostname_,
self->bootstrap_port_);
return self->stream_->close([self](std::error_code ec) {
self->last_bootstrap_error_ = {
ec, ec.message(), self->bootstrap_hostname_, self->bootstrap_port_
};
self->initiate_bootstrap();
});
self->initiate_bootstrap();
});
stream_->async_connect(it->endpoint(), [capture0 = shared_from_this(), it](auto&& PH1) {
capture0->on_connect(std::forward<decltype(PH1)>(PH1), it);
Expand Down Expand Up @@ -1853,23 +1862,26 @@ class mcbp_session_impl
connection_endpoints_.remote_address,
connection_endpoints_.remote.port());
parser_.reset();
{
const std::scoped_lock lock(output_buffer_mutex_);
output_buffer_.clear();
}
bootstrap_handler_ = std::make_shared<bootstrap_handler>(shared_from_this());
connection_deadline_.cancel();
}
}

void check_deadline(std::error_code ec)
{
if (ec == asio::error::operation_aborted || stopped_) {
return;
}
if (connection_deadline_.expiry() <= asio::steady_timer::clock_type::now()) {
stream_->close([](std::error_code) {
connection_deadline_.expires_after(origin_.options().key_value_timeout);
connection_deadline_.async_wait([self = shared_from_this()](const auto timer_ec) {
if (timer_ec == asio::error::operation_aborted || self->stopped_) {
return;
}
CB_LOG_DEBUG("{} unable to boostrap single node at {}:{} (\"{}:{}\") in time, reconnecting",
self->log_prefix_,
self->connection_endpoints_.remote_address,
self->connection_endpoints_.remote.port(),
self->bootstrap_hostname_,
self->bootstrap_port_);
return self->initiate_bootstrap();
});
}
connection_deadline_.async_wait([capture0 = shared_from_this()](auto&& PH1) {
capture0->check_deadline(std::forward<decltype(PH1)>(PH1));
});
}

void do_read()
Expand All @@ -1883,6 +1895,7 @@ class mcbp_session_impl
[self = shared_from_this(), stream_id = stream_->id()](std::error_code ec,
std::size_t bytes_transferred) {
if (ec == asio::error::operation_aborted || self->stopped_) {
self->reading_ = false;
CB_LOG_PROTOCOL("[MCBP, IN] host=\"{}\", port={}, rc={}, bytes_received={}",
self->connection_endpoints_.remote_address,
self->connection_endpoints_.remote.port(),
Expand All @@ -1901,6 +1914,7 @@ class mcbp_session_impl

self->last_active_ = std::chrono::steady_clock::now();
if (ec) {
self->reading_ = false;
if (stream_id != self->stream_->id()) {
CB_LOG_ERROR(
R"({} ignore IO error while reading from the socket: {} ({}), old_id="{}", new_id="{}")",
Expand Down
73 changes: 34 additions & 39 deletions core/io/streams.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ protected:
asio::strand<asio::io_context::executor_type> strand_;
bool tls_;
std::string id_{};
std::atomic_bool open_{ false };

public:
stream_impl(asio::io_context& ctx, bool is_tls)
Expand All @@ -81,21 +80,16 @@ public:
return id_;
}

[[nodiscard]] auto is_open() const -> bool
{
return open_;
}

auto get_executor() const noexcept
[[nodiscard]] auto get_executor() const noexcept
{
return strand_;
}

[[nodiscard]] virtual auto local_endpoint() const -> asio::ip::tcp::endpoint = 0;

virtual void close(utils::movable_function<void(std::error_code)>&& handler) = 0;
[[nodiscard]] virtual auto is_open() const -> bool = 0;

virtual void reopen() = 0;
virtual void close(utils::movable_function<void(std::error_code)>&& handler) = 0;

virtual void set_options() = 0;

Expand Down Expand Up @@ -133,28 +127,30 @@ public:
return res;
}

// Reports whether the socket currently held in stream_ is open.
// The answer is taken directly from the socket on every call; when
// stream_ holds no socket object, the stream is reported as closed.
[[nodiscard]] auto is_open() const -> bool override
{
if (stream_) {
return stream_->is_open();
}
// No socket object is held, so there is nothing that could be open.
return false;
}

void close(utils::movable_function<void(std::error_code)>&& handler) override
{
open_ = false;
return asio::post(strand_, [stream = stream_, h = std::move(handler)]() {
id_ = uuid::to_string(uuid::random());
auto s = std::make_shared<asio::ip::tcp::socket>(strand_);
std::swap(stream_, s);
return asio::post(strand_, [stream = std::move(s), h = std::move(handler)]() {
asio::error_code ec{};
stream->shutdown(asio::socket_base::shutdown_both, ec);
stream->close(ec);
h(ec);
});
}

void reopen() override
{
return close([this](std::error_code) {
id_ = uuid::to_string(uuid::random());
stream_ = std::make_shared<asio::ip::tcp::socket>(strand_);
});
}

void set_options() override
{
if (!open_ || !stream_) {
if (!is_open()) {
return;
}
std::error_code ec{};
Expand All @@ -165,10 +161,7 @@ public:
void async_connect(const asio::ip::tcp::resolver::results_type::endpoint_type& endpoint,
utils::movable_function<void(std::error_code)>&& handler) override
{
return stream_->async_connect(endpoint, [this, h = std::move(handler)](std::error_code ec) {
open_ = stream_->is_open();
h(ec);
});
return stream_->async_connect(endpoint, std::move(handler));
}

void async_write(std::vector<asio::const_buffer>& buffers,
Expand Down Expand Up @@ -211,29 +204,31 @@ public:
return res;
}

// Reports whether the lowest layer of the TLS stream held in stream_ is
// open. The answer is taken directly from that layer on every call; when
// stream_ holds no stream object, the stream is reported as closed.
[[nodiscard]] auto is_open() const -> bool override
{
if (stream_) {
return stream_->lowest_layer().is_open();
}
// No stream object is held, so there is nothing that could be open.
return false;
}

void close(utils::movable_function<void(std::error_code)>&& handler) override
{
open_ = false;
return asio::post(strand_, [stream = stream_, h = std::move(handler)]() {
id_ = uuid::to_string(uuid::random());
auto s = std::make_shared<asio::ssl::stream<asio::ip::tcp::socket>>(
asio::ip::tcp::socket(strand_), tls_);
std::swap(stream_, s);
return asio::post(strand_, [stream = std::move(s), h = std::move(handler)]() {
asio::error_code ec{};
stream->lowest_layer().shutdown(asio::socket_base::shutdown_both, ec);
stream->lowest_layer().close(ec);
h(ec);
});
}

void reopen() override
{
return close([this](std::error_code) {
id_ = uuid::to_string(uuid::random());
stream_ = std::make_shared<asio::ssl::stream<asio::ip::tcp::socket>>(
asio::ip::tcp::socket(strand_), tls_);
});
}

void set_options() override
{
if (!open_ || !stream_) {
if (!is_open()) {
return;
}
std::error_code ec{};
Expand All @@ -245,15 +240,15 @@ public:
utils::movable_function<void(std::error_code)>&& handler) override
{
return stream_->lowest_layer().async_connect(
endpoint, [this, handler = std::move(handler)](std::error_code ec_connect) mutable {
endpoint,
[stream = stream_, handler = std::move(handler)](std::error_code ec_connect) mutable {
if (ec_connect == asio::error::operation_aborted) {
return;
}
if (ec_connect) {
return handler(ec_connect);
}
open_ = stream_->lowest_layer().is_open();
stream_->async_handshake(
stream->async_handshake(
asio::ssl::stream_base::client,
[handler = std::move(handler)](std::error_code ec_handshake) mutable {
if (ec_handshake == asio::error::operation_aborted) {
Expand Down

0 comments on commit a5cae8c

Please sign in to comment.