Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CXXCBC-606: Fix detection of disfunctional node #673

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 48 additions & 32 deletions core/io/mcbp_session.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1084,20 +1084,21 @@ class mcbp_session_impl
return;
}
bootstrapped_ = false;
if (bootstrap_handler_) {
last_bootstrap_error_ = std::move(bootstrap_handler_)->last_bootstrap_error();
if (auto handler = std::move(bootstrap_handler_); handler) {
last_bootstrap_error_ = handler->last_bootstrap_error();
}
bootstrap_handler_ = nullptr;
state_ = diag::endpoint_state::connecting;
if (stream_->is_open()) {
std::string old_id = stream_->id();
stream_->reopen();
CB_LOG_TRACE(R"({} reopen socket connection "{}" -> "{}", host="{}", port={})",
log_prefix_,
old_id,
stream_->id(),
bootstrap_hostname_,
bootstrap_port_);
return stream_->close([self = shared_from_this(), old_id](std::error_code) {
CB_LOG_DEBUG(R"({} reopened socket connection "{}" -> "{}", host="{}", port={})",
self->log_prefix_,
old_id,
self->stream_->id(),
self->bootstrap_hostname_,
self->bootstrap_port_);
return self->initiate_bootstrap();
});
}
if (origin_.exhausted()) {
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
Expand Down Expand Up @@ -1174,6 +1175,20 @@ class mcbp_session_impl
if (stopped_) {
return;
}

if (reason == retry_reason::socket_closed_while_in_flight && !bootstrapped_) {
return stream_->close([self = shared_from_this(), old_id = stream_->id()](std::error_code) {
CB_LOG_DEBUG(
R"({} reopened socket connection due to IO error, "{}" -> "{}", host="{}", port={})",
self->log_prefix_,
old_id,
self->stream_->id(),
self->bootstrap_hostname_,
self->bootstrap_port_);
return self->initiate_bootstrap();
});
}

state_ = diag::endpoint_state::disconnecting;
CB_LOG_DEBUG("{} stop MCBP connection, reason={}", log_prefix_, reason);
stopped_ = true;
Expand Down Expand Up @@ -1653,6 +1668,7 @@ class mcbp_session_impl
private:
void invoke_bootstrap_handler(std::error_code ec)
{
connection_deadline_.cancel();
retry_backoff_.cancel();

if (ec && state_listener_) {
Expand Down Expand Up @@ -1716,6 +1732,7 @@ class mcbp_session_impl
if (ec == asio::error::operation_aborted || stopped_) {
return;
}
connection_deadline_.cancel();
last_active_ = std::chrono::steady_clock::now();
if (ec) {
CB_LOG_ERROR("{} error on resolve: {} ({})", log_prefix_, ec.value(), ec.message());
Expand All @@ -1734,9 +1751,7 @@ class mcbp_session_impl
if (timer_ec == asio::error::operation_aborted || self->stopped_) {
return;
}
return self->stream_->close([self](std::error_code) {
self->initiate_bootstrap();
});
self->initiate_bootstrap();
});
}

Expand Down Expand Up @@ -1768,12 +1783,7 @@ class mcbp_session_impl
port,
self->bootstrap_hostname_,
self->bootstrap_port_);
return self->stream_->close([self](std::error_code ec) {
self->last_bootstrap_error_ = {
ec, ec.message(), self->bootstrap_hostname_, self->bootstrap_port_
};
self->initiate_bootstrap();
});
self->initiate_bootstrap();
});
stream_->async_connect(it->endpoint(), [capture0 = shared_from_this(), it](auto&& PH1) {
capture0->on_connect(std::forward<decltype(PH1)>(PH1), it);
Expand Down Expand Up @@ -1801,6 +1811,7 @@ class mcbp_session_impl
if (ec == asio::error::operation_aborted || stopped_) {
return;
}
connection_deadline_.cancel();
last_active_ = std::chrono::steady_clock::now();
if (!stream_->is_open() || ec) {
#ifdef COUCHBASE_CXX_CLIENT_STATIC_BORINGSSL
Expand Down Expand Up @@ -1853,23 +1864,26 @@ class mcbp_session_impl
connection_endpoints_.remote_address,
connection_endpoints_.remote.port());
parser_.reset();
{
const std::scoped_lock lock(output_buffer_mutex_);
output_buffer_.clear();
}
bootstrap_handler_ = std::make_shared<bootstrap_handler>(shared_from_this());
connection_deadline_.cancel();
}
}

void check_deadline(std::error_code ec)
{
if (ec == asio::error::operation_aborted || stopped_) {
return;
}
if (connection_deadline_.expiry() <= asio::steady_timer::clock_type::now()) {
stream_->close([](std::error_code) {
connection_deadline_.expires_after(origin_.options().key_value_timeout);
connection_deadline_.async_wait([self = shared_from_this()](const auto timer_ec) {
if (timer_ec == asio::error::operation_aborted || self->stopped_) {
return;
}
CB_LOG_DEBUG("{} unable to boostrap single node at {}:{} (\"{}:{}\") in time, reconnecting",
self->log_prefix_,
self->connection_endpoints_.remote_address,
self->connection_endpoints_.remote.port(),
self->bootstrap_hostname_,
self->bootstrap_port_);
return self->initiate_bootstrap();
});
}
connection_deadline_.async_wait([capture0 = shared_from_this()](auto&& PH1) {
capture0->check_deadline(std::forward<decltype(PH1)>(PH1));
});
}

void do_read()
Expand All @@ -1883,6 +1897,7 @@ class mcbp_session_impl
[self = shared_from_this(), stream_id = stream_->id()](std::error_code ec,
std::size_t bytes_transferred) {
if (ec == asio::error::operation_aborted || self->stopped_) {
self->reading_ = false;
CB_LOG_PROTOCOL("[MCBP, IN] host=\"{}\", port={}, rc={}, bytes_received={}",
self->connection_endpoints_.remote_address,
self->connection_endpoints_.remote.port(),
Expand All @@ -1901,6 +1916,7 @@ class mcbp_session_impl

self->last_active_ = std::chrono::steady_clock::now();
if (ec) {
self->reading_ = false;
if (stream_id != self->stream_->id()) {
CB_LOG_ERROR(
R"({} ignore IO error while reading from the socket: {} ({}), old_id="{}", new_id="{}")",
Expand Down
73 changes: 34 additions & 39 deletions core/io/streams.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ protected:
asio::strand<asio::io_context::executor_type> strand_;
bool tls_;
std::string id_{};
std::atomic_bool open_{ false };

public:
stream_impl(asio::io_context& ctx, bool is_tls)
Expand All @@ -81,21 +80,16 @@ public:
return id_;
}

[[nodiscard]] auto is_open() const -> bool
{
return open_;
}

auto get_executor() const noexcept
[[nodiscard]] auto get_executor() const noexcept
{
return strand_;
}

[[nodiscard]] virtual auto local_endpoint() const -> asio::ip::tcp::endpoint = 0;

virtual void close(utils::movable_function<void(std::error_code)>&& handler) = 0;
[[nodiscard]] virtual auto is_open() const -> bool = 0;

virtual void reopen() = 0;
virtual void close(utils::movable_function<void(std::error_code)>&& handler) = 0;

virtual void set_options() = 0;

Expand Down Expand Up @@ -133,28 +127,30 @@ public:
return res;
}

[[nodiscard]] auto is_open() const -> bool override
{
if (stream_) {
return stream_->is_open();
}
return false;
}

void close(utils::movable_function<void(std::error_code)>&& handler) override
{
open_ = false;
return asio::post(strand_, [stream = stream_, h = std::move(handler)]() {
id_ = uuid::to_string(uuid::random());
auto s = std::make_shared<asio::ip::tcp::socket>(strand_);
std::swap(stream_, s);
return asio::post(strand_, [stream = std::move(s), h = std::move(handler)]() {
asio::error_code ec{};
stream->shutdown(asio::socket_base::shutdown_both, ec);
stream->close(ec);
h(ec);
});
}

void reopen() override
{
return close([this](std::error_code) {
id_ = uuid::to_string(uuid::random());
stream_ = std::make_shared<asio::ip::tcp::socket>(strand_);
});
}

void set_options() override
{
if (!open_ || !stream_) {
if (!is_open()) {
return;
}
std::error_code ec{};
Expand All @@ -165,10 +161,7 @@ public:
void async_connect(const asio::ip::tcp::resolver::results_type::endpoint_type& endpoint,
utils::movable_function<void(std::error_code)>&& handler) override
{
return stream_->async_connect(endpoint, [this, h = std::move(handler)](std::error_code ec) {
open_ = stream_->is_open();
h(ec);
});
return stream_->async_connect(endpoint, std::move(handler));
}

void async_write(std::vector<asio::const_buffer>& buffers,
Expand Down Expand Up @@ -211,29 +204,31 @@ public:
return res;
}

[[nodiscard]] auto is_open() const -> bool override
{
if (stream_) {
return stream_->lowest_layer().is_open();
}
return false;
}

void close(utils::movable_function<void(std::error_code)>&& handler) override
{
open_ = false;
return asio::post(strand_, [stream = stream_, h = std::move(handler)]() {
id_ = uuid::to_string(uuid::random());
auto s = std::make_shared<asio::ssl::stream<asio::ip::tcp::socket>>(
asio::ip::tcp::socket(strand_), tls_);
std::swap(stream_, s);
return asio::post(strand_, [stream = std::move(s), h = std::move(handler)]() {
asio::error_code ec{};
stream->lowest_layer().shutdown(asio::socket_base::shutdown_both, ec);
stream->lowest_layer().close(ec);
h(ec);
});
}

void reopen() override
{
return close([this](std::error_code) {
id_ = uuid::to_string(uuid::random());
stream_ = std::make_shared<asio::ssl::stream<asio::ip::tcp::socket>>(
asio::ip::tcp::socket(strand_), tls_);
});
}

void set_options() override
{
if (!open_ || !stream_) {
if (!is_open()) {
return;
}
std::error_code ec{};
Expand All @@ -245,15 +240,15 @@ public:
utils::movable_function<void(std::error_code)>&& handler) override
{
return stream_->lowest_layer().async_connect(
endpoint, [this, handler = std::move(handler)](std::error_code ec_connect) mutable {
endpoint,
[stream = stream_, handler = std::move(handler)](std::error_code ec_connect) mutable {
if (ec_connect == asio::error::operation_aborted) {
return;
}
if (ec_connect) {
return handler(ec_connect);
}
open_ = stream_->lowest_layer().is_open();
stream_->async_handshake(
stream->async_handshake(
asio::ssl::stream_base::client,
[handler = std::move(handler)](std::error_code ec_handshake) mutable {
if (ec_handshake == asio::error::operation_aborted) {
Expand Down
Loading