Skip to content

Commit

Permalink
Update metrics, backend status and events
Browse files Browse the repository at this point in the history
Signed-off-by: Eloi DEMOLIS <[email protected]>
  • Loading branch information
Wonshtrum committed Jun 12, 2024
1 parent d2bc46f commit 07040fb
Show file tree
Hide file tree
Showing 9 changed files with 408 additions and 154 deletions.
6 changes: 6 additions & 0 deletions bin/src/ctl/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ impl CommandManager {
if !self.json {
debug!("Processing: {}", response.message);
}
if let Some(ResponseContent {
content_type: Some(ContentType::Event(event)),
}) = response.content
{
println!("{event:?}");
}
}
ResponseStatus::Failure => return Err(CtlError::Failure(response.message)),
ResponseStatus::Ok => return Ok(response),
Expand Down
2 changes: 1 addition & 1 deletion command/src/proto/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl Response {
}

impl ResponseContent {
fn display(&self, json: bool) -> Result<(), DisplayError> {
pub fn display(&self, json: bool) -> Result<(), DisplayError> {
let content_type = match &self.content_type {
Some(content_type) => content_type,
None => return Ok(println!("No content")),
Expand Down
3 changes: 2 additions & 1 deletion lib/src/backends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ impl BackendMap {
})?;
self.available = true;

Ok((next_backend.clone(), tcp_stream))
drop(borrowed_backend);
Ok((next_backend, tcp_stream))
}

pub fn backend_from_sticky_session(
Expand Down
13 changes: 7 additions & 6 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,16 @@ impl HttpSession {
return None;
};
let backend = mux.router.backends.remove(&back_token).unwrap();
let (cluster_id, backend_readiness, backend_socket, mut container_backend_timeout) =
let (cluster_id, backend, backend_readiness, backend_socket, mut container_backend_timeout) =
match backend {
mux::Connection::H1(mux::ConnectionH1 {
position: mux::Position::Client(mux::BackendStatus::Connected(cluster_id)),
position:
mux::Position::Client(cluster_id, backend, mux::BackendStatus::Connected),
readiness,
socket,
timeout_container,
..
}) => (cluster_id, readiness, socket, timeout_container),
}) => (cluster_id, backend, readiness, socket, timeout_container),
mux::Connection::H1(_) => {
error!("The backend disconnected just after upgrade, abort");
return None;
Expand All @@ -283,11 +284,12 @@ impl HttpSession {
container_frontend_timeout.reset();
container_backend_timeout.reset();

let backend_id = backend.borrow().backend_id.clone();
let mut pipe = Pipe::new(
stream.back.storage.buffer,
None,
Some(backend_id),
Some(backend_socket),
None,
Some(backend),
Some(container_backend_timeout),
Some(container_frontend_timeout),
Some(cluster_id),
Expand All @@ -307,7 +309,6 @@ impl HttpSession {

gauge_add!("protocol.http", -1);
gauge_add!("protocol.ws", 1);
gauge_add!("http.active_requests", -1);
gauge_add!("websocket.active_requests", 1);
Some(HttpStateMachine::WebSocket(pipe))
}
Expand Down
13 changes: 7 additions & 6 deletions lib/src/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,15 +343,16 @@ impl HttpsSession {
return None;
};
let backend = mux.router.backends.remove(&back_token).unwrap();
let (cluster_id, backend_readiness, backend_socket, mut container_backend_timeout) =
let (cluster_id, backend, backend_readiness, backend_socket, mut container_backend_timeout) =
match backend {
mux::Connection::H1(mux::ConnectionH1 {
position: mux::Position::Client(mux::BackendStatus::Connected(cluster_id)),
position:
mux::Position::Client(cluster_id, backend, mux::BackendStatus::Connected),
readiness,
socket,
timeout_container,
..
}) => (cluster_id, readiness, socket, timeout_container),
}) => (cluster_id, backend, readiness, socket, timeout_container),
mux::Connection::H1(_) => {
error!("The backend disconnected just after upgrade, abort");
return None;
Expand All @@ -367,11 +368,12 @@ impl HttpsSession {
container_frontend_timeout.reset();
container_backend_timeout.reset();

let backend_id = backend.borrow().backend_id.clone();
let mut pipe = Pipe::new(
stream.back.storage.buffer,
None,
Some(backend_id),
Some(backend_socket),
None,
Some(backend),
Some(container_backend_timeout),
Some(container_frontend_timeout),
Some(cluster_id),
Expand All @@ -391,7 +393,6 @@ impl HttpsSession {

gauge_add!("protocol.https", -1);
gauge_add!("protocol.wss", 1);
gauge_add!("http.active_requests", -1);
gauge_add!("websocket.active_requests", 1);
Some(HttpsStateMachine::WebSocket(pipe))
}
Expand Down
2 changes: 1 addition & 1 deletion lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ pub enum BackendConnectAction {
pub enum BackendConnectionError {
#[error("Not found: {0:?}")]
NotFound(ObjectKind),
#[error("Too many connections on cluster {0:?}")]
#[error("Too many failed attemps on cluster {0:?}")]
MaxConnectionRetries(Option<String>),
#[error("the sessions slab has reached maximum capacity")]
MaxSessionsMemory,
Expand Down
94 changes: 62 additions & 32 deletions lib/src/protocol/mux/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use sozu_command::ready::Ready;
use crate::{
println_,
protocol::mux::{
debug_kawa, forcefully_terminate_answer, parser::H2Error, set_default_answer, update_readiness_after_read, update_readiness_after_write, BackendStatus, Context, Endpoint, GlobalStreamId, MuxResult, Position, StreamState
debug_kawa, forcefully_terminate_answer, parser::H2Error, set_default_answer,
update_readiness_after_read, update_readiness_after_write, BackendStatus, Context,
Endpoint, GlobalStreamId, MuxResult, Position, StreamState,
},
socket::SocketHandler,
timer::TimeoutContainer,
Expand Down Expand Up @@ -44,6 +46,16 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
let kawa = parts.rbuffer;
let (size, status) = self.socket.socket_read(kawa.storage.space());
kawa.storage.fill(size);
match self.position {
Position::Client(..) => {
count!("back_bytes_in", size as i64);
parts.metrics.backend_bin += size;
}
Position::Server => {
count!("bytes_in", size as i64);
parts.metrics.bin += size;
}
}
if update_readiness_after_read(size, status, &mut self.readiness) {
return MuxResult::Continue;
}
Expand All @@ -53,7 +65,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
debug_kawa(kawa);
if kawa.is_error() {
match self.position {
Position::Client(_) => {
Position::Client(..) => {
let StreamState::Linked(token) = stream.state else {
unreachable!()
};
Expand All @@ -79,15 +91,11 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
.interest
.insert(Ready::WRITABLE)
}
match self.position {
Position::Server => {
if !was_main_phase {
self.requests += 1;
println_!("REQUESTS: {}", self.requests);
stream.state = StreamState::Link
}
}
Position::Client(_) => {}
if !was_main_phase && self.position.is_server() {
self.requests += 1;
println_!("REQUESTS: {}", self.requests);
gauge_add!("http.active_requests", 1);
stream.state = StreamState::Link
}
};
MuxResult::Continue
Expand All @@ -101,7 +109,8 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
println_!("======= MUX H1 WRITABLE {:?}", self.position);
self.timeout_container.reset();
let stream = &mut context.streams[self.stream];
let kawa = stream.wbuffer(&self.position);
let parts = stream.split(&self.position);
let kawa = parts.wbuffer;
kawa.prepare(&mut kawa::h1::BlockConverter);
debug_kawa(kawa);
let bufs = kawa.as_io_slice();
Expand All @@ -111,13 +120,23 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
}
let (size, status) = self.socket.socket_write_vectored(&bufs);
kawa.consume(size);
match self.position {
Position::Client(..) => {
count!("back_bytes_out", size as i64);
parts.metrics.backend_bout += size;
}
Position::Server => {
count!("bytes_out", size as i64);
parts.metrics.bout += size;
}
}
if update_readiness_after_write(size, status, &mut self.readiness) {
return MuxResult::Continue;
}

if kawa.is_terminated() && kawa.is_completed() {
match self.position {
Position::Client(_) => self.readiness.interest.insert(Ready::READABLE),
Position::Client(..) => self.readiness.interest.insert(Ready::READABLE),
Position::Server => {
if stream.context.closing {
return MuxResult::CloseSession;
Expand Down Expand Up @@ -153,7 +172,12 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
_ => {}
}
// ACCESS LOG
stream.generate_access_log(false, Some(String::from("H1")), context.listener.clone());
stream.generate_access_log(
false,
Some(String::from("H1")),
context.listener.clone(),
);
stream.metrics.reset();
let old_state = std::mem::replace(&mut stream.state, StreamState::Unlinked);
if stream.context.keep_alive_frontend {
self.timeout_container.reset();
Expand All @@ -180,9 +204,9 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
}

pub fn force_disconnect(&mut self) -> MuxResult {
match self.position {
Position::Client(_) => {
self.position = Position::Client(BackendStatus::Disconnecting);
match &mut self.position {
Position::Client(_, _, status) => {
*status = BackendStatus::Disconnecting;
self.readiness.event = Ready::HUP;
MuxResult::Continue
}
Expand All @@ -196,13 +220,13 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
L: ListenerHandler + L7ListenerHandler,
{
match self.position {
Position::Client(BackendStatus::KeepAlive(_))
| Position::Client(BackendStatus::Disconnecting) => {
Position::Client(_, _, BackendStatus::KeepAlive)
| Position::Client(_, _, BackendStatus::Disconnecting) => {
println_!("close detached client ConnectionH1");
return;
}
Position::Client(BackendStatus::Connecting(_))
| Position::Client(BackendStatus::Connected(_)) => {}
Position::Client(_, _, BackendStatus::Connecting(_))
| Position::Client(_, _, BackendStatus::Connected) => {}
Position::Server => unreachable!(),
}
// reconnection is handled by the server
Expand All @@ -221,28 +245,34 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
let stream_context = &mut stream.context;
println_!("end H1 stream {}: {stream_context:#?}", self.stream);
match &mut self.position {
Position::Client(BackendStatus::Connected(cluster_id))
| Position::Client(BackendStatus::Connecting(cluster_id)) => {
Position::Client(_, _, BackendStatus::Connecting(_)) => {
self.stream = usize::MAX;
self.force_disconnect();
}
Position::Client(_, _, status @ BackendStatus::Connected) => {
self.stream = usize::MAX;
// keep alive should probably be used only if the http context is fully reset
// in case end_stream occurs due to an error the connection state is probably
// unrecoverable and should be terminated
if stream_context.keep_alive_backend {
self.position =
Position::Client(BackendStatus::KeepAlive(std::mem::take(cluster_id)))
*status = BackendStatus::KeepAlive;
} else {
self.force_disconnect();
}
}
Position::Client(BackendStatus::KeepAlive(_))
| Position::Client(BackendStatus::Disconnecting) => unreachable!(),
Position::Client(_, _, BackendStatus::KeepAlive)
| Position::Client(_, _, BackendStatus::Disconnecting) => unreachable!(),
Position::Server => match (stream.front.consumed, stream.back.is_main_phase()) {
(true, true) => {
// we have a "forwardable" answer from the back
// if the answer is not terminated we send an RstStream to properly clean the stream
// if it is terminated, we finish the transfer, the backend is not necessary anymore
if !stream.back.is_terminated() {
forcefully_terminate_answer(stream, &mut self.readiness, H2Error::InternalError);
forcefully_terminate_answer(
stream,
&mut self.readiness,
H2Error::InternalError,
);
} else {
stream.state = StreamState::Unlinked;
self.readiness.interest.insert(Ready::WRITABLE);
Expand Down Expand Up @@ -271,11 +301,11 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
self.readiness.interest.insert(Ready::ALL);
self.stream = stream;
match &mut self.position {
Position::Client(BackendStatus::KeepAlive(cluster_id)) => {
self.position =
Position::Client(BackendStatus::Connecting(std::mem::take(cluster_id)))
Position::Client(_, _, status @ BackendStatus::KeepAlive) => {
*status = BackendStatus::Connected;
}
Position::Client(_) => {}
Position::Client(_, _, BackendStatus::Disconnecting) => unreachable!(),
Position::Client(_, _, _) => {}
Position::Server => unreachable!(),
}
}
Expand Down
Loading

0 comments on commit 07040fb

Please sign in to comment.