Skip to content

Commit

Permalink
Performance and stability improvements
Browse files Browse the repository at this point in the history
- remove kawa delimiters (it overly fragments the writes and slows h2
  tremendously)
- check rustls buffers before the socket (to reduce syscalls)
- ignore empty data frames with end stream on close stream (all the
  stream management should be revised honestly)
- only count the active streams when checking if opening a new one would
  overflow the max concurrent allowed (again... stream management = bad)
- log the precise reason of any goaway
- properly reset metrics
- display total time and backend response time in access logs (will
  soon changed when rebase on main)

Signed-off-by: Eloi DEMOLIS <[email protected]>
  • Loading branch information
Wonshtrum committed Jun 20, 2024
1 parent 07040fb commit 2c9620e
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 107 deletions.
2 changes: 1 addition & 1 deletion command/src/logging/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl fmt::Display for LogDuration {

let ms = duration.as_millis();
if ms < 10 {
let us = duration.as_millis();
let us = duration.as_micros();
if us >= 10 {
return write!(f, "{us}μs");
}
Expand Down
4 changes: 2 additions & 2 deletions lib/src/protocol/mux/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl<'a, T: AsBuffer> BlockConverter<T> for H2BlockConverter<'a> {
.unwrap();
kawa.push_out(Store::from_slice(&header));
kawa.push_out(data);
kawa.push_delimiter();
// kawa.push_delimiter();
return can_continue;
}
Block::Flags(Flags {
Expand Down Expand Up @@ -189,7 +189,7 @@ impl<'a, T: AsBuffer> BlockConverter<T> for H2BlockConverter<'a> {
kawa.push_out(Store::from_slice(&header));
}
if end_header || end_stream {
kawa.push_delimiter()
// kawa.push_delimiter()
}
}
}
Expand Down
15 changes: 9 additions & 6 deletions lib/src/protocol/mux/h1.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Instant;

use sozu_command::ready::Ready;

use crate::{
Expand Down Expand Up @@ -42,6 +44,9 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
println_!("======= MUX H1 READABLE {:?}", self.position);
self.timeout_container.reset();
let stream = &mut context.streams[self.stream];
if stream.metrics.start.is_none() {
stream.metrics.start = Some(Instant::now());
}
let parts = stream.split(&self.position);
let kawa = parts.rbuffer;
let (size, status) = self.socket.socket_read(kawa.storage.space());
Expand Down Expand Up @@ -144,19 +149,19 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
let kawa = &mut stream.back;
match kawa.detached.status_line {
kawa::StatusLine::Response { code: 101, .. } => {
println!("============== HANDLE UPGRADE!");
debug!("============== HANDLE UPGRADE!");
return MuxResult::Upgrade;
}
kawa::StatusLine::Response { code: 100, .. } => {
println!("============== HANDLE CONTINUE!");
debug!("============== HANDLE CONTINUE!");
// after a 100 continue, we expect the client to continue with its request
self.timeout_container.reset();
self.readiness.interest.insert(Ready::READABLE);
kawa.clear();
return MuxResult::Continue;
}
kawa::StatusLine::Response { code: 103, .. } => {
println!("============== HANDLE EARLY HINT!");
debug!("============== HANDLE EARLY HINT!");
if let StreamState::Linked(token) = stream.state {
// after a 103 early hints, we expect the backend to send its response
endpoint
Expand All @@ -181,9 +186,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
let old_state = std::mem::replace(&mut stream.state, StreamState::Unlinked);
if stream.context.keep_alive_frontend {
self.timeout_container.reset();
// println!("{old_state:?} {:?}", self.readiness);
if let StreamState::Linked(token) = old_state {
// println!("{:?}", endpoint.readiness(token));
endpoint.end_stream(token, self.stream, context);
}
self.readiness.interest.insert(Ready::READABLE);
Expand Down Expand Up @@ -285,7 +288,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
}
(false, false) => {
// we do not have an answer, but the request is untouched so we can retry
println!("H1 RECONNECT");
debug!("H1 RECONNECT");
stream.state = StreamState::Link;
}
(false, true) => unreachable!(),
Expand Down
118 changes: 75 additions & 43 deletions lib/src/protocol/mux/h2.rs

Large diffs are not rendered by default.

47 changes: 28 additions & 19 deletions lib/src/protocol/mux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ pub use crate::protocol::mux::{
#[macro_export]
macro_rules! println_ {
($($t:expr),*) => {
print!("{}:{} ", file!(), line!());
println!($($t),*)
// $(let _ = &$t;)*
// print!("{}:{} ", file!(), line!());
// println!($($t),*)
$(let _ = &$t;)*
};
}
fn debug_kawa(_kawa: &GenericHttpStream) {
Expand Down Expand Up @@ -737,7 +737,7 @@ impl Stream {
tags,
client_rtt: None, //socket_rtt(self.front_socket()),
server_rtt: None, //self.backend_socket.as_ref().and_then(socket_rtt),
service_time: Duration::from_secs(0), // self.metrics.service_time(),
service_time: self.metrics.backend_connection_time().unwrap_or(Duration::from_secs(0)), // self.metrics.service_time(),
response_time: self.metrics.response_time(),
bytes_in: self.metrics.bin,
bytes_out: self.metrics.bout,
Expand Down Expand Up @@ -770,6 +770,13 @@ impl<L: ListenerHandler + L7ListenerHandler> Context<L> {
}
}

pub fn active_len(&self) -> usize {
self.streams
.iter()
.filter(|s| !matches!(s.state, StreamState::Recycle))
.count()
}

pub fn create_stream(&mut self, request_id: Ulid, window: u32) -> Option<GlobalStreamId> {
let listener = self.listener.borrow();
let http_context = HttpContext::new(
Expand All @@ -784,12 +791,15 @@ impl<L: ListenerHandler + L7ListenerHandler> Context<L> {
println_!("Reuse stream: {stream_id}");
stream.state = StreamState::Idle;
stream.attempts = 0;
stream.received_end_of_stream = false;
stream.window = window as i32;
stream.context = http_context;
stream.back.clear();
stream.back.storage.clear();
stream.front.clear();
stream.front.storage.clear();
stream.metrics.reset();
stream.metrics.start = Some(Instant::now());
return Some(stream_id);
}
}
Expand Down Expand Up @@ -919,8 +929,8 @@ impl Router {

let token = if let Some(token) = reuse_token {
println_!("reused backend: {:#?}", self.backends.get(&token).unwrap());
stream.metrics.backend_start();
stream.metrics.backend_connected();
// stream.metrics.backend_start();
// stream.metrics.backend_connected();
token
} else {
let (mut socket, backend) = self.backend_from_request(
Expand Down Expand Up @@ -995,7 +1005,7 @@ impl Router {
Err(cluster_error) => {
// we are past kawa parsing if it succeeded this can't fail
// if the request was malformed it was caught by kawa and we sent a 400
panic!("{cluster_error}");
unreachable!("{cluster_error}");
}
};

Expand All @@ -1004,7 +1014,7 @@ impl Router {
let route = match route_result {
Ok(route) => route,
Err(frontend_error) => {
println!("{}", frontend_error);
println_!("{}", frontend_error);
// self.set_answer(DefaultAnswerStatus::Answer404, None);
return Err(RetrieveClusterError::RetrieveFrontend(frontend_error));
}
Expand All @@ -1013,7 +1023,7 @@ impl Router {
let cluster_id = match route {
Route::Cluster(id) => id,
Route::Deny => {
println!("Route::Deny");
println_!("Route::Deny");
// self.set_answer(DefaultAnswerStatus::Answer401, None);
return Err(RetrieveClusterError::UnauthorizedRoute);
}
Expand All @@ -1038,7 +1048,7 @@ impl Router {
proxy,
)
.map_err(|backend_error| {
println!("{backend_error}");
println_!("{backend_error}");
// self.set_answer(DefaultAnswerStatus::Answer503, None);
BackendConnectionError::Backend(backend_error)
})?;
Expand Down Expand Up @@ -1135,7 +1145,6 @@ impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHand
let mut dead_backends = Vec::new();
for (token, client) in self.router.backends.iter_mut() {
let readiness = client.readiness_mut();
// println!("{token:?} -> {readiness:?}");
let dead = readiness.filter_interest().is_hup()
|| readiness.filter_interest().is_error();
if dead {
Expand Down Expand Up @@ -1292,8 +1301,6 @@ impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHand
}
if !proxy_borrow.remove_session(*token) {
error!("session {:?} was already removed!", token);
} else {
// println!("SUCCESS: session {token:?} was removed!");
}
}
println_!("FRONTEND: {:#?}", self.frontend);
Expand Down Expand Up @@ -1384,7 +1391,7 @@ impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHand
}

fn update_readiness(&mut self, token: Token, events: Ready) {
println!("EVENTS: {events:?} on {token:?}");
println_!("EVENTS: {events:?} on {token:?}");
if token == self.frontend_token {
self.frontend.readiness_mut().event |= events;
} else if let Some(c) = self.router.backends.get_mut(&token) {
Expand Down Expand Up @@ -1451,7 +1458,7 @@ impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHand
if token == back_token {
// This stream is linked to the backend that timedout
if stream.back.is_terminated() || stream.back.is_error() {
println!(
println_!(
"Stream terminated or in error, do nothing, just wait a bit more"
);
// Nothing to do, simply wait for the remaining bytes to be proxied
Expand All @@ -1460,11 +1467,13 @@ impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHand
}
} else if !stream.back.consumed {
// The response has not started yet
println!("Stream still waiting for response, send 504");
println_!("Stream still waiting for response, send 504");
set_default_answer(stream, front_readiness, 504);
should_write = true;
} else {
println!("Stream waiting for end of response, forcefully terminate it");
println_!(
"Stream waiting for end of response, forcefully terminate it"
);
forcefully_terminate_answer(
stream,
front_readiness,
Expand Down Expand Up @@ -1527,6 +1536,7 @@ impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHand
}

fn close<P: L7Proxy>(&mut self, proxy: Rc<RefCell<P>>, _metrics: &mut SessionMetrics) {
println!("MUX CLOSE");
println_!("FRONTEND: {:#?}", self.frontend);
println_!("BACKENDS: {:#?}", self.router.backends);

Expand All @@ -1544,8 +1554,6 @@ impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHand
}
if !proxy_borrow.remove_session(*token) {
error!("session {:?} was already removed!", token);
} else {
// println!("SUCCESS: session {token:?} was removed!");
}

match client.position() {
Expand Down Expand Up @@ -1573,6 +1581,7 @@ impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHand
Position::Server => unreachable!(),
}
}
return;
let s = match &mut self.frontend {

Check warning on line 1585 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Build Sozu 🦀

unreachable statement
Connection::H1(c) => &mut c.socket,
Connection::H2(c) => &mut c.socket,
Expand Down
5 changes: 2 additions & 3 deletions lib/src/protocol/mux/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ pub fn frame_header(input: &[u8], max_frame_size: u32) -> IResult<&[u8], FrameHe
FrameType::WindowUpdate => true,
};
if !valid_stream_id {
println!("invalid stream_id: {stream_id}");
error!("invalid stream_id: {}", stream_id);
return Err(Err::Failure(ParserError::new_h2(i, H2Error::ProtocolError)));
}

Expand All @@ -223,7 +223,7 @@ pub fn frame_header(input: &[u8], max_frame_size: u32) -> IResult<&[u8], FrameHe
}

fn convert_frame_type(t: u8) -> Option<FrameType> {
info!("got frame type: {}", t);
debug!("got frame type: {}", t);
match t {
0 => Some(FrameType::Data),
1 => Some(FrameType::Headers),
Expand Down Expand Up @@ -353,7 +353,6 @@ pub fn data_frame<'a>(
header: &FrameHeader,
) -> IResult<&'a [u8], Frame, ParserError<'a>> {
let (remaining, i) = take(header.payload_len)(input)?;
println!("{i:?}");

let (i, pad_length) = if header.flags & 0x8 != 0 {
let (i, pad_length) = be_u8(i)?;
Expand Down
34 changes: 26 additions & 8 deletions lib/src/protocol/mux/pkawa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ where
return handle_trailer(kawa, input, end_stream, decoder);
}
kawa.push_block(Block::StatusLine);
// kawa.detached.status_line = match kawa.kind {
// Kind::Request => StatusLine::Request {
// version: Version::V20,
// method: Store::Static(b"GET"),
// uri: Store::Static(b"/"),
// authority: Store::Static(b"lolcatho.st:8443"),
// path: Store::Static(b"/"),
// },
// Kind::Response => StatusLine::Response {
// version: Version::V20,
// code: 200,
// status: Store::Static(b"200"),
// reason: Store::Static(b"FromH2"),
// },
// };
kawa.detached.status_line = match kawa.kind {
Kind::Request => {
let mut method = Store::Empty;
Expand Down Expand Up @@ -87,12 +102,15 @@ where
invalid_headers = true;
}
} else if compare_no_case(&k, b"priority") {
todo!("decode priority");
// todo!("decode priority");
warn!("DECODE PRIORITY: {}", unsafe {
std::str::from_utf8_unchecked(v.as_ref())
});
prioriser.push_priority(
stream_id,
PriorityPart::Rfc9218 {
urgency: todo!(),
incremental: todo!(),
urgency: 0,
incremental: false,
},
);
}
Expand All @@ -105,7 +123,7 @@ where
}
});
if let Err(error) = decode_status {
println!("INVALID FRAGMENT: {error:?}");
error!("INVALID FRAGMENT: {:?}", error);
return Err((H2Error::CompressionError, true));
}
if invalid_headers
Expand All @@ -114,7 +132,7 @@ where
|| path.len() == 0
|| scheme.len() == 0
{
println!("INVALID HEADERS");
error!("INVALID HEADERS");
return Err((H2Error::ProtocolError, false));
}
// uri is only used by H1 statusline, in most cases it only consists of the path
Expand Down Expand Up @@ -177,11 +195,11 @@ where
}
});
if let Err(error) = decode_status {
println!("INVALID FRAGMENT: {error:?}");
error!("INVALID FRAGMENT: {:?}", error);
return Err((H2Error::CompressionError, true));
}
if invalid_headers || status.len() == 0 {
println!("INVALID HEADERS");
error!("INVALID HEADERS");
return Err((H2Error::ProtocolError, false));
}
StatusLine::Response {
Expand All @@ -195,7 +213,7 @@ where

// everything has been parsed
kawa.storage.head = kawa.storage.end;
println!(
debug!(
"index: {}/{}/{}",
kawa.storage.start, kawa.storage.head, kawa.storage.end
);
Expand Down
Loading

0 comments on commit 2c9620e

Please sign in to comment.