diff --git a/bin/src/ctl/command.rs b/bin/src/ctl/command.rs index 59e0b07a0..474ee5ceb 100644 --- a/bin/src/ctl/command.rs +++ b/bin/src/ctl/command.rs @@ -46,6 +46,12 @@ impl CommandManager { if !self.json { debug!("Processing: {}", response.message); } + if let Some(ResponseContent { + content_type: Some(ContentType::Event(event)), + }) = response.content + { + println!("{event:?}"); + } } ResponseStatus::Failure => return Err(CtlError::Failure(response.message)), ResponseStatus::Ok => return Ok(response), diff --git a/command/src/proto/display.rs b/command/src/proto/display.rs index af97c8e10..9a6123ffe 100644 --- a/command/src/proto/display.rs +++ b/command/src/proto/display.rs @@ -149,7 +149,7 @@ impl Response { } impl ResponseContent { - fn display(&self, json: bool) -> Result<(), DisplayError> { + pub fn display(&self, json: bool) -> Result<(), DisplayError> { let content_type = match &self.content_type { Some(content_type) => content_type, None => return Ok(println!("No content")), diff --git a/lib/src/backends.rs b/lib/src/backends.rs index 96c587efd..5baf2cd14 100644 --- a/lib/src/backends.rs +++ b/lib/src/backends.rs @@ -297,7 +297,8 @@ impl BackendMap { })?; self.available = true; - Ok((next_backend.clone(), tcp_stream)) + drop(borrowed_backend); + Ok((next_backend, tcp_stream)) } pub fn backend_from_sticky_session( diff --git a/lib/src/http.rs b/lib/src/http.rs index abd081111..63fea8fb4 100644 --- a/lib/src/http.rs +++ b/lib/src/http.rs @@ -259,15 +259,16 @@ impl HttpSession { return None; }; let backend = mux.router.backends.remove(&back_token).unwrap(); - let (cluster_id, backend_readiness, backend_socket, mut container_backend_timeout) = + let (cluster_id, backend, backend_readiness, backend_socket, mut container_backend_timeout) = match backend { mux::Connection::H1(mux::ConnectionH1 { - position: mux::Position::Client(mux::BackendStatus::Connected(cluster_id)), + position: + mux::Position::Client(cluster_id, backend, mux::BackendStatus::Connected), readiness, socket, timeout_container, .. - }) => (cluster_id, readiness, socket, timeout_container), + }) => (cluster_id, backend, readiness, socket, timeout_container), mux::Connection::H1(_) => { error!("The backend disconnected just after upgrade, abort"); return None; @@ -283,11 +284,12 @@ impl HttpSession { container_frontend_timeout.reset(); container_backend_timeout.reset(); + let backend_id = backend.borrow().backend_id.clone(); let mut pipe = Pipe::new( stream.back.storage.buffer, - None, + Some(backend_id), Some(backend_socket), - None, + Some(backend), Some(container_backend_timeout), Some(container_frontend_timeout), Some(cluster_id), @@ -307,7 +309,6 @@ impl HttpSession { gauge_add!("protocol.http", -1); gauge_add!("protocol.ws", 1); - gauge_add!("http.active_requests", -1); gauge_add!("websocket.active_requests", 1); Some(HttpStateMachine::WebSocket(pipe)) } diff --git a/lib/src/https.rs b/lib/src/https.rs index 1d57e2169..f8a134b8b 100644 --- a/lib/src/https.rs +++ b/lib/src/https.rs @@ -343,15 +343,16 @@ impl HttpsSession { return None; }; let backend = mux.router.backends.remove(&back_token).unwrap(); - let (cluster_id, backend_readiness, backend_socket, mut container_backend_timeout) = + let (cluster_id, backend, backend_readiness, backend_socket, mut container_backend_timeout) = match backend { mux::Connection::H1(mux::ConnectionH1 { - position: mux::Position::Client(mux::BackendStatus::Connected(cluster_id)), + position: + mux::Position::Client(cluster_id, backend, mux::BackendStatus::Connected), readiness, socket, timeout_container, .. - }) => (cluster_id, readiness, socket, timeout_container), + }) => (cluster_id, backend, readiness, socket, timeout_container), mux::Connection::H1(_) => { error!("The backend disconnected just after upgrade, abort"); return None; @@ -367,11 +368,12 @@ impl HttpsSession { container_frontend_timeout.reset(); container_backend_timeout.reset(); + let backend_id = backend.borrow().backend_id.clone(); let mut pipe = Pipe::new( stream.back.storage.buffer, - None, + Some(backend_id), Some(backend_socket), - None, + Some(backend), Some(container_backend_timeout), Some(container_frontend_timeout), Some(cluster_id), @@ -391,7 +393,6 @@ impl HttpsSession { gauge_add!("protocol.https", -1); gauge_add!("protocol.wss", 1); - gauge_add!("http.active_requests", -1); gauge_add!("websocket.active_requests", 1); Some(HttpsStateMachine::WebSocket(pipe)) } diff --git a/lib/src/lib.rs b/lib/src/lib.rs index e5a2ee499..5645870fb 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -583,7 +583,7 @@ pub enum BackendConnectAction { pub enum BackendConnectionError { #[error("Not found: {0:?}")] NotFound(ObjectKind), - #[error("Too many connections on cluster {0:?}")] + #[error("Too many failed attemps on cluster {0:?}")] MaxConnectionRetries(Option), #[error("the sessions slab has reached maximum capacity")] MaxSessionsMemory, diff --git a/lib/src/protocol/mux/h1.rs b/lib/src/protocol/mux/h1.rs index 81d1637e9..c0a93840c 100644 --- a/lib/src/protocol/mux/h1.rs +++ b/lib/src/protocol/mux/h1.rs @@ -3,7 +3,9 @@ use sozu_command::ready::Ready; use crate::{ println_, protocol::mux::{ - debug_kawa, forcefully_terminate_answer, parser::H2Error, set_default_answer, update_readiness_after_read, update_readiness_after_write, BackendStatus, Context, Endpoint, GlobalStreamId, MuxResult, Position, StreamState + debug_kawa, forcefully_terminate_answer, parser::H2Error, set_default_answer, + update_readiness_after_read, update_readiness_after_write, BackendStatus, Context, + Endpoint, GlobalStreamId, MuxResult, Position, StreamState, }, socket::SocketHandler, timer::TimeoutContainer, @@ -44,6 +46,16 @@ impl ConnectionH1 { let kawa = parts.rbuffer; let (size, status) = self.socket.socket_read(kawa.storage.space()); kawa.storage.fill(size); + match self.position { + Position::Client(..) => { + count!("back_bytes_in", size as i64); + parts.metrics.backend_bin += size; + } + Position::Server => { + count!("bytes_in", size as i64); + parts.metrics.bin += size; + } + } if update_readiness_after_read(size, status, &mut self.readiness) { return MuxResult::Continue; } @@ -53,7 +65,7 @@ impl ConnectionH1 { debug_kawa(kawa); if kawa.is_error() { match self.position { - Position::Client(_) => { + Position::Client(..) => { let StreamState::Linked(token) = stream.state else { unreachable!() }; @@ -79,15 +91,11 @@ impl ConnectionH1 { .interest .insert(Ready::WRITABLE) } - match self.position { - Position::Server => { - if !was_main_phase { - self.requests += 1; - println_!("REQUESTS: {}", self.requests); - stream.state = StreamState::Link - } - } - Position::Client(_) => {} + if !was_main_phase && self.position.is_server() { + self.requests += 1; + println_!("REQUESTS: {}", self.requests); + gauge_add!("http.active_requests", 1); + stream.state = StreamState::Link } }; MuxResult::Continue @@ -101,7 +109,8 @@ impl ConnectionH1 { println_!("======= MUX H1 WRITABLE {:?}", self.position); self.timeout_container.reset(); let stream = &mut context.streams[self.stream]; - let kawa = stream.wbuffer(&self.position); + let parts = stream.split(&self.position); + let kawa = parts.wbuffer; kawa.prepare(&mut kawa::h1::BlockConverter); debug_kawa(kawa); let bufs = kawa.as_io_slice(); @@ -111,13 +120,23 @@ impl ConnectionH1 { } let (size, status) = self.socket.socket_write_vectored(&bufs); kawa.consume(size); + match self.position { + Position::Client(..) => { + count!("back_bytes_out", size as i64); + parts.metrics.backend_bout += size; + } + Position::Server => { + count!("bytes_out", size as i64); + parts.metrics.bout += size; + } + } if update_readiness_after_write(size, status, &mut self.readiness) { return MuxResult::Continue; } if kawa.is_terminated() && kawa.is_completed() { match self.position { - Position::Client(_) => self.readiness.interest.insert(Ready::READABLE), + Position::Client(..) => self.readiness.interest.insert(Ready::READABLE), Position::Server => { if stream.context.closing { return MuxResult::CloseSession; @@ -153,7 +172,12 @@ impl ConnectionH1 { _ => {} } // ACCESS LOG - stream.generate_access_log(false, Some(String::from("H1")), context.listener.clone()); + stream.generate_access_log( + false, + Some(String::from("H1")), + context.listener.clone(), + ); + stream.metrics.reset(); let old_state = std::mem::replace(&mut stream.state, StreamState::Unlinked); if stream.context.keep_alive_frontend { self.timeout_container.reset(); @@ -180,9 +204,9 @@ impl ConnectionH1 { } pub fn force_disconnect(&mut self) -> MuxResult { - match self.position { - Position::Client(_) => { - self.position = Position::Client(BackendStatus::Disconnecting); + match &mut self.position { + Position::Client(_, _, status) => { + *status = BackendStatus::Disconnecting; self.readiness.event = Ready::HUP; MuxResult::Continue } @@ -196,13 +220,13 @@ impl ConnectionH1 { L: ListenerHandler + L7ListenerHandler, { match self.position { - Position::Client(BackendStatus::KeepAlive(_)) - | Position::Client(BackendStatus::Disconnecting) => { + Position::Client(_, _, BackendStatus::KeepAlive) + | Position::Client(_, _, BackendStatus::Disconnecting) => { println_!("close detached client ConnectionH1"); return; } - Position::Client(BackendStatus::Connecting(_)) - | Position::Client(BackendStatus::Connected(_)) => {} + Position::Client(_, _, BackendStatus::Connecting(_)) + | Position::Client(_, _, BackendStatus::Connected) => {} Position::Server => unreachable!(), } // reconnection is handled by the server @@ -221,28 +245,34 @@ impl ConnectionH1 { let stream_context = &mut stream.context; println_!("end H1 stream {}: {stream_context:#?}", self.stream); match &mut self.position { - Position::Client(BackendStatus::Connected(cluster_id)) - | Position::Client(BackendStatus::Connecting(cluster_id)) => { + Position::Client(_, _, BackendStatus::Connecting(_)) => { + self.stream = usize::MAX; + self.force_disconnect(); + } + Position::Client(_, _, status @ BackendStatus::Connected) => { self.stream = usize::MAX; // keep alive should probably be used only if the http context is fully reset // in case end_stream occurs due to an error the connection state is probably // unrecoverable and should be terminated if stream_context.keep_alive_backend { - self.position = - Position::Client(BackendStatus::KeepAlive(std::mem::take(cluster_id))) + *status = BackendStatus::KeepAlive; } else { self.force_disconnect(); } } - Position::Client(BackendStatus::KeepAlive(_)) - | Position::Client(BackendStatus::Disconnecting) => unreachable!(), + Position::Client(_, _, BackendStatus::KeepAlive) + | Position::Client(_, _, BackendStatus::Disconnecting) => unreachable!(), Position::Server => match (stream.front.consumed, stream.back.is_main_phase()) { (true, true) => { // we have a "forwardable" answer from the back // if the answer is not terminated we send an RstStream to properly clean the stream // if it is terminated, we finish the transfer, the backend is not necessary anymore if !stream.back.is_terminated() { - forcefully_terminate_answer(stream, &mut self.readiness, H2Error::InternalError); + forcefully_terminate_answer( + stream, + &mut self.readiness, + H2Error::InternalError, + ); } else { stream.state = StreamState::Unlinked; self.readiness.interest.insert(Ready::WRITABLE); @@ -271,11 +301,11 @@ impl ConnectionH1 { self.readiness.interest.insert(Ready::ALL); self.stream = stream; match &mut self.position { - Position::Client(BackendStatus::KeepAlive(cluster_id)) => { - self.position = - Position::Client(BackendStatus::Connecting(std::mem::take(cluster_id))) + Position::Client(_, _, status @ BackendStatus::KeepAlive) => { + *status = BackendStatus::Connected; } - Position::Client(_) => {} + Position::Client(_, _, BackendStatus::Disconnecting) => unreachable!(), + Position::Client(_, _, _) => {} Position::Server => unreachable!(), } } diff --git a/lib/src/protocol/mux/h2.rs b/lib/src/protocol/mux/h2.rs index 160dfa132..2987af681 100644 --- a/lib/src/protocol/mux/h2.rs +++ b/lib/src/protocol/mux/h2.rs @@ -159,7 +159,9 @@ impl ConnectionH2 { let kawa = match stream_id { H2StreamId::Zero => &mut self.zero, H2StreamId::Other(stream_id, global_stream_id) => { - context.streams[global_stream_id].rbuffer(&self.position) + context.streams[global_stream_id] + .split(&self.position) + .rbuffer } }; println_!("{:?}({stream_id:?}, {amount})", self.state); @@ -170,6 +172,14 @@ impl ConnectionH2 { } let (size, status) = self.socket.socket_read(&mut kawa.storage.space()[..amount]); kawa.storage.fill(size); + match self.position { + Position::Client(..) => { + count!("back_bytes_in", size as i64); + } + Position::Server => { + count!("bytes_in", size as i64); + } + } if update_readiness_after_read(size, status, &mut self.readiness) { return MuxResult::Continue; } else { @@ -202,8 +212,8 @@ impl ConnectionH2 { (H2State::Error, _) | (H2State::GoAway, _) | (H2State::ServerSettings, Position::Server) - | (H2State::ClientPreface, Position::Client(_)) - | (H2State::ClientSettings, Position::Client(_)) => unreachable!( + | (H2State::ClientPreface, Position::Client(..)) + | (H2State::ClientSettings, Position::Client(..)) => unreachable!( "Unexpected combination: (Readable, {:?}, {:?})", self.state, self.position ), @@ -267,7 +277,7 @@ impl ConnectionH2 { self.expect_write = Some(H2StreamId::Zero); return self.handle_frame(settings, context, endpoint); } - (H2State::ServerSettings, Position::Client(_)) => { + (H2State::ServerSettings, Position::Client(..)) => { let i = kawa.storage.data(); match parser::frame_header(i, self.local_settings.settings_max_frame_size) { Ok(( @@ -432,6 +442,14 @@ impl ConnectionH2 { while !kawa.storage.is_empty() { let (size, status) = self.socket.socket_write(kawa.storage.data()); kawa.storage.consume(size); + match self.position { + Position::Client(..) => { + count!("back_bytes_out", size as i64); + } + Position::Server => { + count!("bytes_out", size as i64); + } + } if update_readiness_after_write(size, status, &mut self.readiness) { return MuxResult::Continue; } @@ -446,12 +464,12 @@ impl ConnectionH2 { | (H2State::Discard, _) | (H2State::ClientPreface, Position::Server) | (H2State::ClientSettings, Position::Server) - | (H2State::ServerSettings, Position::Client(_)) => unreachable!( + | (H2State::ServerSettings, Position::Client(..)) => unreachable!( "Unexpected combination: (Writable, {:?}, {:?})", self.state, self.position ), (H2State::GoAway, _) => self.force_disconnect(), - (H2State::ClientPreface, Position::Client(_)) => { + (H2State::ClientPreface, Position::Client(..)) => { println_!("Preparing preface and settings"); let pri = serializer::H2_PRI.as_bytes(); let kawa = &mut self.zero; @@ -470,7 +488,7 @@ impl ConnectionH2 { self.expect_write = Some(H2StreamId::Zero); MuxResult::Continue } - (H2State::ClientSettings, Position::Client(_)) => { + (H2State::ClientSettings, Position::Client(..)) => { println_!("Sent preface and settings"); self.state = H2State::ServerSettings; self.readiness.interest.remove(Ready::WRITABLE); @@ -494,11 +512,22 @@ impl ConnectionH2 { self.expect_write { let stream = &mut context.streams[global_stream_id]; - let kawa = stream.wbuffer(&self.position); + let parts = stream.split(&self.position); + let kawa = parts.wbuffer; while !kawa.out.is_empty() { let bufs = kawa.as_io_slice(); let (size, status) = self.socket.socket_write_vectored(&bufs); kawa.consume(size); + match self.position { + Position::Client(..) => { + count!("back_bytes_out", size as i64); + parts.metrics.backend_bout += size; + } + Position::Server => { + count!("bytes_out", size as i64); + parts.metrics.bout += size; + } + } if let Some((read_stream, amount)) = self.expect_read { if write_stream == read_stream && kawa.storage.available_space() >= amount @@ -513,7 +542,7 @@ impl ConnectionH2 { self.expect_write = None; if (kawa.is_terminated() || kawa.is_error()) && kawa.is_completed() { match self.position { - Position::Client(_) => {} + Position::Client(..) => {} Position::Server => { // mark stream as reusable println_!("Recycle stream: {global_stream_id}"); @@ -563,6 +592,16 @@ impl ConnectionH2 { let bufs = kawa.as_io_slice(); let (size, status) = self.socket.socket_write_vectored(&bufs); kawa.consume(size); + match self.position { + Position::Client(..) => { + count!("back_bytes_out", size as i64); + parts.metrics.backend_bout += size; + } + Position::Server => { + count!("bytes_out", size as i64); + parts.metrics.bout += size; + } + } if update_readiness_after_write(size, status, &mut self.readiness) { self.expect_write = Some(H2StreamId::Other(*stream_id, global_stream_id)); @@ -572,7 +611,7 @@ impl ConnectionH2 { self.expect_write = None; if (kawa.is_terminated() || kawa.is_error()) && kawa.is_completed() { match self.position { - Position::Client(_) => {} + Position::Client(..) => {} Position::Server => { // mark stream as reusable println_!("Recycle1 stream: {global_stream_id}"); @@ -646,7 +685,7 @@ impl ConnectionH2 { pub fn new_stream_id(&mut self) -> StreamId { self.last_stream_id += 2; match self.position { - Position::Client(_) => self.last_stream_id - 1, + Position::Client(..) => self.last_stream_id - 1, Position::Server => self.last_stream_id - 2, } } @@ -670,7 +709,12 @@ impl ConnectionH2 { None => panic!("stream error"), }; let stream = &mut context.streams[global_stream_id]; - let kawa = stream.rbuffer(&self.position); + let parts = stream.split(&self.position); + let kawa = parts.rbuffer; + match self.position { + Position::Client(..) => parts.metrics.backend_bin += slice.len(), + Position::Server => parts.metrics.bin += slice.len(), + } slice.start += kawa.storage.head as u32; kawa.storage.head += slice.len(); kawa.push_block(kawa::Block::Chunk(kawa::Chunk { @@ -720,6 +764,10 @@ impl ConnectionH2 { let buffer = headers.header_block_fragment.data(kawa.storage.buffer()); let stream = &mut context.streams[global_stream_id]; let parts = &mut stream.split(&self.position); + match self.position { + Position::Client(..) => parts.metrics.backend_bin += buffer.len(), + Position::Server => parts.metrics.bin += buffer.len(), + } let was_initial = parts.rbuffer.is_initial(); let status = pkawa::handle_header( &mut self.decoder, @@ -746,15 +794,14 @@ impl ConnectionH2 { .interest .insert(Ready::WRITABLE) } - if was_initial { - match self.position { - Position::Server => stream.state = StreamState::Link, - Position::Client(_) => {} - }; + // was_initial prevents trailers from triggering connection + if was_initial && self.position.is_server() { + gauge_add!("http.active_requests", 1); + stream.state = StreamState::Link; } } Frame::PushPromise(push_promise) => match self.position { - Position::Client(_) => { + Position::Client(..) => { if self.local_settings.settings_enable_push { todo!("forward the push") } else { @@ -796,7 +843,7 @@ impl ConnectionH2 { } let stream = &mut context.streams[stream_id]; match self.position { - Position::Client(_) => {} + Position::Client(..) => {} Position::Server => { // This is a special case, normally, all stream are terminated by the server // when the last byte of the response is written. Here, the reset is requested @@ -945,9 +992,9 @@ impl ConnectionH2 { pub fn force_disconnect(&mut self) -> MuxResult { self.state = H2State::Error; - match self.position { - Position::Client(_) => { - self.position = Position::Client(BackendStatus::Disconnecting); + match &mut self.position { + Position::Client(_, _, status) => { + *status = BackendStatus::Disconnecting; self.readiness.event = Ready::HUP; MuxResult::Continue } @@ -961,10 +1008,8 @@ impl ConnectionH2 { L: ListenerHandler + L7ListenerHandler, { match self.position { - Position::Client(BackendStatus::Connected(_)) - | Position::Client(BackendStatus::Connecting(_)) - | Position::Client(BackendStatus::Disconnecting) => {} - Position::Client(BackendStatus::KeepAlive(_)) => unreachable!(), + Position::Client(_, _, BackendStatus::KeepAlive) => unreachable!(), + Position::Client(..) => {} Position::Server => unreachable!(), } // reconnection is handled by the server for each stream separately @@ -1005,7 +1050,7 @@ impl ConnectionH2 { let stream_context = &mut context.streams[stream].context; println_!("end H2 stream {stream}: {stream_context:#?}"); match self.position { - Position::Client(_) => { + Position::Client(..) => { for (stream_id, global_stream_id) in &self.streams { if *global_stream_id == stream { let id = *stream_id; diff --git a/lib/src/protocol/mux/mod.rs b/lib/src/protocol/mux/mod.rs index 9561f5f7e..0d2761dae 100644 --- a/lib/src/protocol/mux/mod.rs +++ b/lib/src/protocol/mux/mod.rs @@ -1,15 +1,20 @@ use std::{ cell::RefCell, collections::HashMap, + fmt::Debug, io::ErrorKind, net::{Shutdown, SocketAddr}, rc::{Rc, Weak}, - time::Duration, + time::{Duration, Instant}, }; use mio::{net::TcpStream, Interest, Token}; use rusty_ulid::Ulid; -use sozu_command::{logging::EndpointRecord, proto::command::ListenerType, ready::Ready}; +use sozu_command::{ + logging::EndpointRecord, + proto::command::{Event, EventKind, ListenerType}, + ready::Ready, +}; mod converter; mod h1; @@ -28,8 +33,9 @@ use crate::{ mux::h2::{H2Settings, H2State, H2StreamId, Prioriser}, SessionState, }, + retry::RetryPolicy, router::Route, - server::CONN_RETRIES, + server::{push_event, CONN_RETRIES}, socket::{FrontRustls, SocketHandler, SocketResult}, timer::TimeoutContainer, BackendConnectionError, L7ListenerHandler, L7Proxy, ListenerHandler, Protocol, ProxySession, @@ -148,22 +154,34 @@ fn forcefully_terminate_answer(stream: &mut Stream, readiness: &mut Readiness, e readiness.interest.insert(Ready::WRITABLE); } -#[derive(Debug)] pub enum Position { - Client(BackendStatus), + Client(String, Rc>, BackendStatus), Server, } +impl Debug for Position { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Client(cluster_id, _, status) => f + .debug_tuple("Client") + .field(cluster_id) + .field(status) + .finish(), + Self::Server => write!(f, "Server"), + } + } +} + impl Position { fn is_server(&self) -> bool { match self { - Position::Client(_) => false, + Position::Client(..) => false, Position::Server => true, } } fn is_client(&self) -> bool { match self { - Position::Client(_) => true, + Position::Client(..) => true, Position::Server => false, } } @@ -171,9 +189,9 @@ impl Position { #[derive(Debug)] pub enum BackendStatus { - Connecting(String), - Connected(String), - KeepAlive(String), + Connecting(Instant), + Connected, + KeepAlive, Disconnecting, } @@ -234,11 +252,16 @@ impl Connection { pub fn new_h1_client( front_stream: Front, cluster_id: String, + backend: Rc>, timeout_container: TimeoutContainer, ) -> Connection { Connection::H1(ConnectionH1 { socket: front_stream, - position: Position::Client(BackendStatus::Connecting(cluster_id)), + position: Position::Client( + cluster_id, + backend, + BackendStatus::Connecting(Instant::now()), + ), readiness: Readiness { interest: Ready::WRITABLE | Ready::READABLE | Ready::HUP | Ready::ERROR, event: Ready::EMPTY, @@ -282,6 +305,7 @@ impl Connection { pub fn new_h2_client( front_stream: Front, cluster_id: String, + backend: Rc>, pool: Weak>, timeout_container: TimeoutContainer, ) -> Option> { @@ -296,7 +320,11 @@ impl Connection { last_stream_id: 0, local_settings: H2Settings::default(), peer_settings: H2Settings::default(), - position: Position::Client(BackendStatus::Connecting(cluster_id)), + position: Position::Client( + cluster_id, + backend, + BackendStatus::Connecting(Instant::now()), + ), prioriser: Prioriser::new(), readiness: Readiness { interest: Ready::WRITABLE | Ready::HUP | Ready::ERROR, @@ -386,6 +414,21 @@ impl Connection { E: Endpoint, L: ListenerHandler + L7ListenerHandler, { + match self.position() { + Position::Client(cluster_id, backend, _) => { + let mut backend_borrow = backend.borrow_mut(); + backend_borrow.dec_connections(); + gauge_add!("backend.connections", -1); + gauge_add!( + "connections_per_backend", + -1, + Some(cluster_id), + Some(&backend_borrow.backend_id) + ); + println_!("--------------- CONNECTION CLOSE: {backend_borrow:#?}"); + } + Position::Server => todo!(), + } match self { Connection::H1(c) => c.close(context, endpoint), Connection::H2(c) => c.close(context, endpoint), @@ -396,6 +439,14 @@ impl Connection { where L: ListenerHandler + L7ListenerHandler, { + match self.position() { + Position::Client(_, backend, BackendStatus::Connected) => { + let mut backend_borrow = backend.borrow_mut(); + backend_borrow.active_requests = backend_borrow.active_requests.saturating_sub(1); + println_!("--------------- CONNECTION END STREAM: {backend_borrow:#?}"); + } + _ => {} + } match self { Connection::H1(c) => c.end_stream(stream, context), Connection::H2(c) => c.end_stream(stream, context), @@ -406,6 +457,14 @@ impl Connection { where L: ListenerHandler + L7ListenerHandler, { + match self.position() { + Position::Client(_, backend, BackendStatus::Connected) => { + let mut backend_borrow = backend.borrow_mut(); + backend_borrow.active_requests += 1; + println_!("--------------- CONNECTION START STREAM: {backend_borrow:#?}"); + } + _ => {} + } match self { Connection::H1(c) => c.start_stream(stream, context), Connection::H2(c) => c.start_stream(stream, context), @@ -581,15 +640,17 @@ pub struct Stream { pub front: GenericHttpStream, pub back: GenericHttpStream, pub context: HttpContext, + pub metrics: SessionMetrics, } /// This struct allows to mutably borrow the read and write buffers (dependant on the position) -/// as well as the context of a Stream at the same time +/// as well as the context and metrics of a Stream at the same time pub struct StreamParts<'a> { pub window: &'a mut i32, pub rbuffer: &'a mut GenericHttpStream, pub wbuffer: &'a mut GenericHttpStream, pub context: &'a mut HttpContext, + pub metrics: &'a mut SessionMetrics, } impl Stream { @@ -612,36 +673,27 @@ impl Stream { front: GenericHttpStream::new(kawa::Kind::Request, kawa::Buffer::new(front_buffer)), back: GenericHttpStream::new(kawa::Kind::Response, kawa::Buffer::new(back_buffer)), context, + metrics: SessionMetrics::new(None), // FIXME }) } pub fn split(&mut self, position: &Position) -> StreamParts<'_> { match position { - Position::Client(_) => StreamParts { + Position::Client(..) => StreamParts { window: &mut self.window, rbuffer: &mut self.back, wbuffer: &mut self.front, context: &mut self.context, + metrics: &mut self.metrics, }, Position::Server => StreamParts { window: &mut self.window, rbuffer: &mut self.front, wbuffer: &mut self.back, context: &mut self.context, + metrics: &mut self.metrics, }, } } - pub fn rbuffer(&mut self, position: &Position) -> &mut GenericHttpStream { - match position { - Position::Client(_) => &mut self.back, - Position::Server => &mut self.front, - } - } - pub fn wbuffer(&mut self, position: &Position) -> &mut GenericHttpStream { - match position { - Position::Client(_) => &mut self.front, - Position::Server => &mut self.back, - } - } pub fn generate_access_log( &mut self, error: bool, @@ -650,6 +702,7 @@ impl Stream { ) where L: ListenerHandler + L7ListenerHandler, { + gauge_add!("http.active_requests", -1); let protocol = match self.context.protocol { Protocol::HTTP => "http", Protocol::HTTPS => "https", @@ -684,10 +737,10 @@ impl Stream { tags, client_rtt: None, //socket_rtt(self.front_socket()), server_rtt: None, //self.backend_socket.as_ref().and_then(socket_rtt), - service_time: Duration::from_micros(0), //metrics.service_time(), - response_time: Duration::from_micros(0), //metrics.response_time(), - bytes_in: 0, //metrics.bin, - bytes_out: 0, //metrics.bout, + service_time: Duration::from_secs(0), // self.metrics.service_time(), + response_time: self.metrics.response_time(), + bytes_in: self.metrics.bin, + bytes_out: self.metrics.bout, user_agent: self.context.user_agent.as_deref(), }; } @@ -767,7 +820,6 @@ impl Router { context: &mut Context, session: Rc>, proxy: Rc>, - metrics: &mut SessionMetrics, ) -> Result<(), BackendConnectionError> { let stream = &mut context.streams[stream_id]; // when reused, a stream should be detached from its old connection, if not we could end @@ -826,28 +878,32 @@ impl Router { (_, _, Position::Server) => { unreachable!("Backend connection behaves like a server") } - (_, _, Position::Client(BackendStatus::Disconnecting)) => {} - (true, false, Position::Client(BackendStatus::Connecting(_))) => {} + (_, _, Position::Client(_, _, BackendStatus::Disconnecting)) => {} + (true, false, Position::Client(_, _, BackendStatus::Connecting(_))) => {} - (true, _, Position::Client(BackendStatus::Connected(other_cluster_id))) => { + (true, _, Position::Client(other_cluster_id, _, BackendStatus::Connected)) => { if *other_cluster_id == cluster_id { reuse_token = Some(*token); reuse_connecting = false; break; } } - (true, true, Position::Client(BackendStatus::Connecting(other_cluster_id))) => { + ( + true, + true, + Position::Client(other_cluster_id, _, BackendStatus::Connecting(_)), + ) => { if *other_cluster_id == cluster_id { reuse_token = Some(*token) } } - (true, _, Position::Client(BackendStatus::KeepAlive(other_cluster_id))) => { + (true, _, Position::Client(other_cluster_id, _, BackendStatus::KeepAlive)) => { if *other_cluster_id == cluster_id { unreachable!("ConnectionH2 behaves like H1") } } - (false, _, Position::Client(BackendStatus::KeepAlive(old_cluster_id))) => { + (false, _, Position::Client(old_cluster_id, _, BackendStatus::KeepAlive)) => { if *old_cluster_id == cluster_id { reuse_token = Some(*token); reuse_connecting = false; @@ -855,24 +911,33 @@ impl Router { } } // can't bundle H1 streams together - (false, _, Position::Client(BackendStatus::Connected(_))) - | (false, _, Position::Client(BackendStatus::Connecting(_))) => {} + (false, _, Position::Client(_, _, BackendStatus::Connected)) + | (false, _, Position::Client(_, _, BackendStatus::Connecting(_))) => {} } } println_!("connect: {cluster_id} (stick={frontend_should_stick}, h2={h2}) -> (reuse={reuse_token:?})"); let token = if let Some(token) = reuse_token { println_!("reused backend: {:#?}", self.backends.get(&token).unwrap()); + stream.metrics.backend_start(); + stream.metrics.backend_connected(); token } else { - let mut socket = self.backend_from_request( + let (mut socket, backend) = self.backend_from_request( &cluster_id, frontend_should_stick, stream_context, proxy.clone(), &context.listener, - metrics, )?; + stream.metrics.backend_start(); + gauge_add!("backend.connections", 1); + gauge_add!( + "connections_per_backend", + 1, + Some(&cluster_id), + Some(&backend.borrow().backend_id) + ); if let Err(e) = socket.set_nodelay(true) { error!( @@ -880,8 +945,6 @@ impl Router { socket, e ); } - // self.backend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR; - // self.backend_connection_status = BackendConnectionStatus::Connecting(Instant::now()); let token = proxy.borrow().add_session(session); @@ -898,6 +961,7 @@ impl Router { match Connection::new_h2_client( socket, cluster_id, + backend, context.pool.clone(), timeout_container, ) { @@ -905,7 +969,7 @@ impl Router { None => return Err(BackendConnectionError::MaxBuffers), } } else { - Connection::new_h1_client(socket, cluster_id, timeout_container) + Connection::new_h1_client(socket, cluster_id, backend, timeout_container) }; self.backends.insert(token, connection); token @@ -965,8 +1029,7 @@ impl Router { context: &mut HttpContext, proxy: Rc>, listener: &Rc>, - _metrics: &mut SessionMetrics, - ) -> Result { + ) -> Result<(TcpStream, Rc>), BackendConnectionError> { let (backend, conn) = self .get_backend_for_sticky_session( cluster_id, @@ -993,14 +1056,10 @@ impl Router { ); } - // metrics.backend_id = Some(backend.borrow().backend_id.clone()); - // metrics.backend_start(); - // self.set_backend_id(backend.borrow().backend_id.clone()); - // self.backend = Some(backend); context.backend_id = Some(backend.borrow().backend_id.clone()); context.backend_address = Some(backend.borrow().address); - Ok(conn) + Ok((conn, backend)) } fn get_backend_for_sticky_session( @@ -1074,8 +1133,8 @@ impl {readiness:?}"); let dead = readiness.filter_interest().is_hup() || readiness.filter_interest().is_error(); @@ -1084,50 +1143,139 @@ impl { - *position = Position::Client(BackendStatus::Connected( + Position::Client( + cluster_id, + backend, + BackendStatus::Connecting(start), + ) => { + let mut backend_borrow = backend.borrow_mut(); + if backend_borrow.retry_policy.is_down() { + info!( + "backend server {} at {} is up", + backend_borrow.backend_id, backend_borrow.address + ); + incr!( + "backend.up", + Some(cluster_id), + Some(&backend_borrow.backend_id) + ); + push_event(Event { + kind: EventKind::BackendUp as i32, + backend_id: Some(backend_borrow.backend_id.to_owned()), + address: Some(backend_borrow.address.into()), + cluster_id: Some(cluster_id.to_owned()), + }); + } + + //successful connection, reset failure counter + backend_borrow.failures = 0; + backend_borrow.set_connection_time(start.elapsed()); + backend_borrow.retry_policy.succeed(); + + for stream in &mut context.streams { + match stream.state { + StreamState::Linked(back_token) if back_token == *token => { + stream.metrics.backend_connected(); + backend_borrow.active_requests += 1; + } + _ => {} + } + } + println_!( + "--------------- CONNECTION SUCCESS: {backend_borrow:#?}" + ); + drop(backend_borrow); + *position = Position::Client( std::mem::take(cluster_id), - )); - backend + backend.clone(), + BackendStatus::Connected, + ); + client .timeout_container() .set_duration(self.router.configured_backend_timeout); } - _ => {} + Position::Client(..) => {} + Position::Server => unreachable!(), } - match backend.writable(context, EndpointServer(&mut self.frontend)) { + match client.writable(context, EndpointServer(&mut self.frontend)) { MuxResult::Continue => {} MuxResult::Upgrade => unreachable!(), // only frontend can upgrade MuxResult::CloseSession => return SessionResult::Close, } } - if backend.readiness().filter_interest().is_readable() { - match backend.readable(context, EndpointServer(&mut self.frontend)) { + if client.readiness().filter_interest().is_readable() { + match client.readable(context, EndpointServer(&mut self.frontend)) { MuxResult::Continue => {} MuxResult::Upgrade => unreachable!(), // only frontend can upgrade MuxResult::CloseSession => return SessionResult::Close, } } - if dead && !backend.readiness().filter_interest().is_readable() { - println_!("Closing {:#?}", backend); - backend.close(context, EndpointServer(&mut self.frontend)); + if dead && !client.readiness().filter_interest().is_readable() { + println_!("Closing {:#?}", client); + match client.position() { + Position::Client(cluster_id, backend, BackendStatus::Connecting(_)) => { + let mut backend_borrow = backend.borrow_mut(); + backend_borrow.failures += 1; + + let already_unavailable = backend_borrow.retry_policy.is_down(); + backend_borrow.retry_policy.fail(); + incr!( + "backend.connections.error", + Some(cluster_id), + Some(&backend_borrow.backend_id) + ); + if !already_unavailable && backend_borrow.retry_policy.is_down() { + error!( + "backend server {} at {} is down", + backend_borrow.backend_id, backend_borrow.address + ); + incr!( + "backend.down", + Some(cluster_id), + Some(&backend_borrow.backend_id) + ); + push_event(Event { + kind: EventKind::BackendDown as i32, + backend_id: Some(backend_borrow.backend_id.to_owned()), + address: Some(backend_borrow.address.into()), + cluster_id: Some(cluster_id.to_owned()), + }); + } + println_!("--------------- CONNECTION FAIL: {backend_borrow:#?}"); + } + Position::Client(_, backend, _) => { + let mut backend_borrow = backend.borrow_mut(); + for stream in &mut context.streams { + match stream.state { + StreamState::Linked(back_token) if back_token == *token => { + backend_borrow.active_requests = + backend_borrow.active_requests.saturating_sub(1); + } + _ => {} + } + } + } + Position::Server => unreachable!(), + } + client.close(context, EndpointServer(&mut self.frontend)); dead_backends.push(*token); } - if !backend.readiness().filter_interest().is_empty() { + if !client.readiness().filter_interest().is_empty() { all_backends_readiness_are_empty = false; } } if !dead_backends.is_empty() { for token in &dead_backends { let proxy_borrow = proxy.borrow(); - if let Some(mut backend) = self.router.backends.remove(token) { - backend.timeout_container().cancel(); - let socket = backend.socket_mut(); + if let Some(mut client) = self.router.backends.remove(token) { + client.timeout_container().cancel(); + let socket = client.socket_mut(); if let Err(e) = proxy_borrow.deregister_socket(socket) { error!("error deregistering back socket({:?}): {:?}", socket, e); } @@ -1188,13 +1336,10 @@ impl {} Err(error) => { println_!("Connection error: {error}"); @@ -1385,10 +1530,10 @@ impl { + let mut backend_borrow = backend.borrow_mut(); + backend_borrow.dec_connections(); + gauge_add!("backend.connections", -1); + gauge_add!( + "connections_per_backend", + -1, + Some(cluster_id), + Some(&backend_borrow.backend_id) + ); + for stream in &mut self.context.streams { + match stream.state { + StreamState::Linked(back_token) if back_token == *token => { + backend_borrow.active_requests = + backend_borrow.active_requests.saturating_sub(1); + } + _ => {} + } + } + println_!("--------------- CONNECTION(SESSION) CLOSED: {backend_borrow:#?}"); + } + Position::Server => unreachable!(), + } } let s = match &mut self.frontend { Connection::H1(c) => &mut c.socket,