Skip to content

Commit

Permalink
Merge pull request #2 from github/timings
Browse files Browse the repository at this point in the history
Implement and provide access to request/response timing information.
  • Loading branch information
patrickt authored Jan 18, 2024
2 parents 58b6deb + ce9fe2e commit bfa64f0
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 9 deletions.
2 changes: 1 addition & 1 deletion crates/twirp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub mod test;

pub use client::{Client, ClientBuilder, ClientError, Middleware, Next, Result};
pub use error::*; // many constructors like `invalid_argument()`
pub use server::{serve, Router};
pub use server::{serve, Router, Timings};

// Re-export `reqwest` so that it's easy to implement middleware.
pub use reqwest;
Expand Down
121 changes: 113 additions & 8 deletions crates/twirp/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,25 @@ use futures::Future;
use hyper::{header, Body, Method, Request, Response};
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::time::{Duration, Instant};

use crate::headers::{CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF};
use crate::{error, to_proto_body, GenericError, TwirpErrorResponse};

/// A function that handles a request and returns a response.
type HandlerFn = Box<dyn Fn(Request<Body>) -> HandlerResponse + Send + Sync>;

/// Type alias for a handler response.
type HandlerResponse =
Box<dyn Future<Output = Result<Response<Body>, GenericError>> + Unpin + Send>;

type HandlerFn = Box<dyn Fn(Request<Body>) -> HandlerResponse + Send + Sync>;

/// A Router maps a request to a handler.
/// A Router maps a request (method, path) tuple to a handler.
pub struct Router {
routes: HashMap<(Method, String), HandlerFn>,
prefix: &'static str,
}

/// The canonical twirp path prefix. You don't have to use this, but it's the default.
pub const DEFAULT_TWIRP_PATH_PREFIX: &str = "/twirp";

impl Default for Router {
Expand Down Expand Up @@ -48,7 +52,7 @@ impl Router {
}
}

/// Adds a handler to the router for the given method and path.
/// Adds a sync handler to the router for the given method and path.
pub fn add_sync_handler<F>(&mut self, method: Method, path: &str, f: F)
where
F: Fn(Request<Body>) -> Result<Response<Body>, GenericError>
Expand Down Expand Up @@ -96,8 +100,16 @@ impl Router {
> {
let f = f.clone();
Box::new(Box::pin(async move {
match parse_request(req).await {
Ok((req, resp_fmt)) => write_response(f(req).await, resp_fmt),
let mut timings = *req
.extensions()
.get::<Timings>()
.expect("invariant violated: timing info not present in request");
match parse_request(req, &mut timings).await {
Ok((req, resp_fmt)) => {
let res = f(req).await;
timings.set_response_handled();
write_response(res, resp_fmt)
}
Err(err) => {
// This is the only place we use tracing (would be nice to remove)
// tracing::error!(?err, "failed to parse request");
Expand All @@ -109,17 +121,27 @@ impl Router {
twirp_err.to_response()
}
}
.map(|mut resp| {
timings.set_response_written();
resp.extensions_mut().insert(timings);
resp
})
}))
};
let key = (Method::POST, [self.prefix, path].join("/"));
self.routes.insert(key, Box::new(g));
}
}

/// Serve a request using the given router.
pub async fn serve(
router: Arc<Router>,
req: Request<Body>,
mut req: Request<Body>,
) -> Result<Response<Body>, GenericError> {
if req.extensions().get::<Timings>().is_none() {
let start = tokio::time::Instant::now();
req.extensions_mut().insert(Timings::new(start));
}
let key = (req.method().clone(), req.uri().path().to_string());
if let Some(handler) = router.routes.get(&key) {
handler(req).await
Expand Down Expand Up @@ -150,16 +172,21 @@ impl BodyFormat {
}
}

async fn parse_request<T>(req: Request<Body>) -> Result<(T, BodyFormat), GenericError>
async fn parse_request<T>(
req: Request<Body>,
timings: &mut Timings,
) -> Result<(T, BodyFormat), GenericError>
where
T: prost::Message + Default + DeserializeOwned,
{
let format = BodyFormat::from_content_type(&req);
let bytes = hyper::body::to_bytes(req.into_body()).await?;
timings.set_received();
let request = match format {
BodyFormat::Pb => T::decode(bytes)?,
BodyFormat::JsonPb => serde_json::from_slice(&bytes)?,
};
timings.set_parsed();
Ok((request, format))
}

Expand Down Expand Up @@ -191,6 +218,84 @@ where
Ok(res)
}

/// Contains timing information associated with a request.
/// To access the timings in a given request, use the [extensions](Request::extensions)
/// method and specialize to `Timings` appropriately.
#[derive(Debug, Clone, Copy)]
pub struct Timings {
// When the request started.
pub start: Instant,
// When the request was received (headers and body).
pub request_received: Option<Instant>,
// When the request body was parsed.
pub request_parsed: Option<Instant>,
// When the response handler returned.
pub response_handled: Option<Instant>,
// When the response was written.
pub response_written: Option<Instant>,
}

impl Timings {
#[allow(clippy::new_without_default)]
pub fn new(start: Instant) -> Self {
Self {
start,
request_received: None,
request_parsed: None,
response_handled: None,
response_written: None,
}
}

fn set_received(&mut self) {
self.request_received = Some(Instant::now());
}

fn set_parsed(&mut self) {
self.request_parsed = Some(Instant::now());
}

fn set_response_handled(&mut self) {
self.response_handled = Some(Instant::now());
}

fn set_response_written(&mut self) {
self.response_written = Some(Instant::now());
}

pub fn received(&self) -> Option<Duration> {
self.request_received.map(|x| x - self.start)
}

pub fn parsed(&self) -> Option<Duration> {
match (self.request_parsed, self.request_received) {
(Some(parsed), Some(received)) => Some(parsed - received),
_ => None,
}
}

pub fn response_handled(&self) -> Option<Duration> {
match (self.response_handled, self.request_parsed) {
(Some(handled), Some(parsed)) => Some(handled - parsed),
_ => None,
}
}

pub fn response_written(&self) -> Option<Duration> {
match (self.response_written, self.response_handled) {
(Some(written), Some(handled)) => Some(written - handled),
(Some(written), None) => {
if let Some(parsed) = self.request_parsed {
Some(written - parsed)
} else {
self.request_received.map(|received| written - received)
}
}
_ => None,
}
}
}

#[cfg(test)]
mod tests {

Expand Down

0 comments on commit bfa64f0

Please sign in to comment.