diff --git a/core/io/mcbp_session.cxx b/core/io/mcbp_session.cxx index 8820466e..a38172cc 100644 --- a/core/io/mcbp_session.cxx +++ b/core/io/mcbp_session.cxx @@ -1084,20 +1084,21 @@ class mcbp_session_impl return; } bootstrapped_ = false; - if (bootstrap_handler_) { - last_bootstrap_error_ = std::move(bootstrap_handler_)->last_bootstrap_error(); + if (auto handler = std::move(bootstrap_handler_); handler) { + last_bootstrap_error_ = handler->last_bootstrap_error(); } - bootstrap_handler_ = nullptr; state_ = diag::endpoint_state::connecting; if (stream_->is_open()) { std::string old_id = stream_->id(); - stream_->reopen(); - CB_LOG_TRACE(R"({} reopen socket connection "{}" -> "{}", host="{}", port={})", - log_prefix_, - old_id, - stream_->id(), - bootstrap_hostname_, - bootstrap_port_); + return stream_->close([self = shared_from_this(), old_id](std::error_code) { + CB_LOG_DEBUG(R"({} reopened socket connection "{}" -> "{}", host="{}", port={})", + self->log_prefix_, + old_id, + self->stream_->id(), + self->bootstrap_hostname_, + self->bootstrap_port_); + return self->initiate_bootstrap(); + }); } if (origin_.exhausted()) { #ifdef COUCHBASE_CXX_CLIENT_COLUMNAR @@ -1174,6 +1175,20 @@ class mcbp_session_impl if (stopped_) { return; } + + if (reason == retry_reason::socket_closed_while_in_flight && !bootstrapped_) { + return stream_->close([self = shared_from_this(), old_id = stream_->id()](std::error_code) { + CB_LOG_DEBUG( + R"({} reopened socket connection due to IO error, "{}" -> "{}", host="{}", port={})", + self->log_prefix_, + old_id, + self->stream_->id(), + self->bootstrap_hostname_, + self->bootstrap_port_); + return self->initiate_bootstrap(); + }); + } + state_ = diag::endpoint_state::disconnecting; CB_LOG_DEBUG("{} stop MCBP connection, reason={}", log_prefix_, reason); stopped_ = true; @@ -1653,6 +1668,7 @@ class mcbp_session_impl private: void invoke_bootstrap_handler(std::error_code ec) { + connection_deadline_.cancel(); retry_backoff_.cancel(); if (ec 
&& state_listener_) { @@ -1716,6 +1732,7 @@ class mcbp_session_impl if (ec == asio::error::operation_aborted || stopped_) { return; } + connection_deadline_.cancel(); last_active_ = std::chrono::steady_clock::now(); if (ec) { CB_LOG_ERROR("{} error on resolve: {} ({})", log_prefix_, ec.value(), ec.message()); @@ -1734,9 +1751,7 @@ class mcbp_session_impl if (timer_ec == asio::error::operation_aborted || self->stopped_) { return; } - return self->stream_->close([self](std::error_code) { - self->initiate_bootstrap(); - }); + self->initiate_bootstrap(); }); } @@ -1768,12 +1783,7 @@ class mcbp_session_impl port, self->bootstrap_hostname_, self->bootstrap_port_); - return self->stream_->close([self](std::error_code ec) { - self->last_bootstrap_error_ = { - ec, ec.message(), self->bootstrap_hostname_, self->bootstrap_port_ - }; - self->initiate_bootstrap(); - }); + self->initiate_bootstrap(); }); stream_->async_connect(it->endpoint(), [capture0 = shared_from_this(), it](auto&& PH1) { capture0->on_connect(std::forward(PH1), it); @@ -1801,6 +1811,7 @@ class mcbp_session_impl if (ec == asio::error::operation_aborted || stopped_) { return; } + connection_deadline_.cancel(); last_active_ = std::chrono::steady_clock::now(); if (!stream_->is_open() || ec) { #ifdef COUCHBASE_CXX_CLIENT_STATIC_BORINGSSL @@ -1853,23 +1864,26 @@ class mcbp_session_impl connection_endpoints_.remote_address, connection_endpoints_.remote.port()); parser_.reset(); + { + const std::scoped_lock lock(output_buffer_mutex_); + output_buffer_.clear(); + } bootstrap_handler_ = std::make_shared(shared_from_this()); - connection_deadline_.cancel(); - } - } - void check_deadline(std::error_code ec) - { - if (ec == asio::error::operation_aborted || stopped_) { - return; - } - if (connection_deadline_.expiry() <= asio::steady_timer::clock_type::now()) { - stream_->close([](std::error_code) { + connection_deadline_.expires_after(origin_.options().key_value_timeout); + connection_deadline_.async_wait([self = 
shared_from_this()](const auto timer_ec) { + if (timer_ec == asio::error::operation_aborted || self->stopped_) { + return; + } + CB_LOG_DEBUG("{} unable to bootstrap single node at {}:{} (\"{}:{}\") in time, reconnecting", + self->log_prefix_, + self->connection_endpoints_.remote_address, + self->connection_endpoints_.remote.port(), + self->bootstrap_hostname_, + self->bootstrap_port_); + return self->initiate_bootstrap(); }); } - connection_deadline_.async_wait([capture0 = shared_from_this()](auto&& PH1) { - capture0->check_deadline(std::forward(PH1)); - }); } void do_read() @@ -1883,6 +1897,7 @@ class mcbp_session_impl [self = shared_from_this(), stream_id = stream_->id()](std::error_code ec, std::size_t bytes_transferred) { if (ec == asio::error::operation_aborted || self->stopped_) { + self->reading_ = false; CB_LOG_PROTOCOL("[MCBP, IN] host=\"{}\", port={}, rc={}, bytes_received={}", self->connection_endpoints_.remote_address, self->connection_endpoints_.remote.port(), @@ -1901,6 +1916,7 @@ class mcbp_session_impl self->last_active_ = std::chrono::steady_clock::now(); if (ec) { + self->reading_ = false; if (stream_id != self->stream_->id()) { CB_LOG_ERROR( R"({} ignore IO error while reading from the socket: {} ({}), old_id="{}", new_id="{}")", diff --git a/core/io/streams.hxx b/core/io/streams.hxx index 1510f35e..5584956a 100644 --- a/core/io/streams.hxx +++ b/core/io/streams.hxx @@ -59,7 +59,6 @@ protected: asio::strand strand_; bool tls_; std::string id_{}; - std::atomic_bool open_{ false }; public: stream_impl(asio::io_context& ctx, bool is_tls) @@ -81,21 +80,16 @@ public: return id_; } - [[nodiscard]] auto is_open() const -> bool - { - return open_; - } - - auto get_executor() const noexcept + [[nodiscard]] auto get_executor() const noexcept { return strand_; } [[nodiscard]] virtual auto local_endpoint() const -> asio::ip::tcp::endpoint = 0; - virtual void close(utils::movable_function&& handler) = 0; + [[nodiscard]] virtual auto is_open() const -> bool = 
0; - virtual void reopen() = 0; + virtual void close(utils::movable_function&& handler) = 0; virtual void set_options() = 0; @@ -133,10 +127,20 @@ public: return res; } + [[nodiscard]] auto is_open() const -> bool override + { + if (stream_) { + return stream_->is_open(); + } + return false; + } + void close(utils::movable_function&& handler) override { - open_ = false; - return asio::post(strand_, [stream = stream_, h = std::move(handler)]() { + id_ = uuid::to_string(uuid::random()); + auto s = std::make_shared(strand_); + std::swap(stream_, s); + return asio::post(strand_, [stream = std::move(s), h = std::move(handler)]() { asio::error_code ec{}; stream->shutdown(asio::socket_base::shutdown_both, ec); stream->close(ec); @@ -144,17 +148,9 @@ public: }); } - void reopen() override - { - return close([this](std::error_code) { - id_ = uuid::to_string(uuid::random()); - stream_ = std::make_shared(strand_); - }); - } - void set_options() override { - if (!open_ || !stream_) { + if (!is_open()) { return; } std::error_code ec{}; @@ -165,10 +161,7 @@ public: void async_connect(const asio::ip::tcp::resolver::results_type::endpoint_type& endpoint, utils::movable_function&& handler) override { - return stream_->async_connect(endpoint, [this, h = std::move(handler)](std::error_code ec) { - open_ = stream_->is_open(); - h(ec); - }); + return stream_->async_connect(endpoint, std::move(handler)); } void async_write(std::vector& buffers, @@ -211,10 +204,21 @@ public: return res; } + [[nodiscard]] auto is_open() const -> bool override + { + if (stream_) { + return stream_->lowest_layer().is_open(); + } + return false; + } + void close(utils::movable_function&& handler) override { - open_ = false; - return asio::post(strand_, [stream = stream_, h = std::move(handler)]() { + id_ = uuid::to_string(uuid::random()); + auto s = std::make_shared>( + asio::ip::tcp::socket(strand_), tls_); + std::swap(stream_, s); + return asio::post(strand_, [stream = std::move(s), h = 
std::move(handler)]() { asio::error_code ec{}; stream->lowest_layer().shutdown(asio::socket_base::shutdown_both, ec); stream->lowest_layer().close(ec); @@ -222,18 +226,9 @@ public: }); } - void reopen() override - { - return close([this](std::error_code) { - id_ = uuid::to_string(uuid::random()); - stream_ = std::make_shared>( - asio::ip::tcp::socket(strand_), tls_); - }); - } - void set_options() override { - if (!open_ || !stream_) { + if (!is_open()) { return; } std::error_code ec{}; @@ -245,15 +240,15 @@ public: utils::movable_function&& handler) override { return stream_->lowest_layer().async_connect( - endpoint, [this, handler = std::move(handler)](std::error_code ec_connect) mutable { + endpoint, + [stream = stream_, handler = std::move(handler)](std::error_code ec_connect) mutable { if (ec_connect == asio::error::operation_aborted) { return; } if (ec_connect) { return handler(ec_connect); } - open_ = stream_->lowest_layer().is_open(); - stream_->async_handshake( + stream->async_handshake( asio::ssl::stream_base::client, [handler = std::move(handler)](std::error_code ec_handshake) mutable { if (ec_handshake == asio::error::operation_aborted) {