Skip to content

Commit

Permalink
Implement stop method (#5)
Browse files Browse the repository at this point in the history
* drop -> stop

Signed-off-by: Dan Bond <[email protected]>

* assert error enum

Signed-off-by: Dan Bond <[email protected]>

Signed-off-by: Dan Bond <[email protected]>
  • Loading branch information
loshz authored Sep 25, 2022
1 parent 1f2db21 commit 6873b43
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 50 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
- main

env:
RUST_VERSION: 1.61
RUST_VERSION: 1.64

jobs:
lint:
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "metrics_server"
version = "0.9.0"
version = "0.10.0"
authors = ["Dan Bond <[email protected]>"]
edition = "2021"
rust-version = "1.58"
Expand All @@ -22,6 +22,7 @@ log = "0.4"
tiny_http = "0.11"

[dev-dependencies]
ctrlc = { version = "3.2", features = ["termination"] }
prometheus-client = "0.18"
reqwest = { version = "0.11", features = ["blocking"] }

Expand Down
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ This crate provides a thread safe, minimalstic HTTP/S server used to buffer metr
Include the lib in your `Cargo.toml` dependencies:
```toml
[dependencies]
metrics_server = "0.9"
metrics_server = "0.10"
```

To enable TLS support, pass the optional feature flag:
```toml
[dependencies]
metrics_server = { version = "0.9", features = ["tls"] }
metrics_server = { version = "0.10", features = ["tls"] }
```

### HTTP
Expand All @@ -35,6 +35,9 @@ let server = MetricsServer::http("localhost:8001");
// Publish you application metrics.
let bytes = server.update(Vec::from([1, 2, 3, 4]));
assert_eq!(4, bytes);

// Stop the server.
server.stop().unwrap();
```

### HTTPS
Expand All @@ -52,6 +55,9 @@ let server = MetricsServer::https("localhost:8443", cert, key);
// Publish you application metrics.
let bytes = server.update(Vec::from([1, 2, 3, 4]));
assert_eq!(4, bytes);

// Stop the server.
server.stop().unwrap();
```

For more comprehensive usage, see the included [examples](./examples).
61 changes: 41 additions & 20 deletions examples/prometheus.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,56 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{thread, time};

use metrics_server::MetricsServer;
use prometheus_client::encoding::text::encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::registry::Registry;

fn main() {
// Create a metrics registry.
let mut registry = Registry::default();

// Create a metric that represents a single monotonically increasing counter.
let counter: Counter = Counter::default();
use metrics_server::MetricsServer;

// Register a metric with the Registry.
registry.register("some_count", "Number of random counts", counter.clone());
fn main() {
// Register stop handler.
let stop = Arc::new(AtomicBool::new(false));
let s = stop.clone();
ctrlc::set_handler(move || {
s.store(true, Ordering::Relaxed);
})
.unwrap();

// Expose the Prometheus metrics.
let server = MetricsServer::http("localhost:8001");
println!("Starting metrics server: http://localhost:8001/metrics");

std::thread::scope(|s| {
let handle = s.spawn(|| {
// Create a metrics registry and counter that represents a single monotonically
// increasing counter.
let mut registry = Registry::default();
let counter: Counter = Counter::default();
registry.register("some_count", "Number of random counts", counter.clone());

// Increment the counter every 5 seconds.
loop {
if stop.load(Ordering::Relaxed) {
break;
}

counter.inc();

// Encode the current Registry in Prometheus format.
let mut encoded = Vec::new();
encode(&mut encoded, &registry).unwrap();

// Increment the counter every 5 seconds.
loop {
counter.inc();
// Update the Metrics Server with the current encoded data.
server.update(encoded);

// Encode the current Registry in Prometheus format.
let mut encoded = Vec::new();
encode(&mut encoded, &registry).unwrap();
thread::sleep(time::Duration::from_secs(5));
}
});

// Update the Metrics Server with the current encoded data.
server.update(encoded);
handle.join().unwrap();
});

let five_secs = time::Duration::from_secs(5);
thread::sleep(five_secs);
}
// Stop server.
server.stop().unwrap();
}
5 changes: 4 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ use std::fmt;
/// The error type for MetricsServer operations.
#[derive(Debug)]
pub enum ServerError {
/// Represents an error while creating a new server.
/// Represents an error encountered while creating a new server.
Create(String),
/// Represents an error encountered while stopping the server.
Stop(String),
}

impl fmt::Display for ServerError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
ServerError::Create(s) => write!(f, "error creating metrics server: {}", s),
ServerError::Stop(s) => write!(f, "error stopping metrics server: {}", s),
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
//! // Publish your application metrics.
//! let bytes = server.update(Vec::from([1, 2, 3, 4]));
//! assert_eq!(4, bytes);
//!
//! // Stop the server.
//! server.stop().unwrap();
//! ```
//!
//! Start a HTTPS server:
Expand All @@ -37,6 +40,9 @@
//! // Publish your application metrics.
//! let bytes = server.update(Vec::from([1, 2, 3, 4]));
//! assert_eq!(4, bytes);
//!
//! // Stop the server.
//! server.stop().unwrap();
//! ```
mod error;
mod server;
Expand Down
19 changes: 12 additions & 7 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,26 @@ impl MetricsServer {

self
}
}

impl Drop for MetricsServer {
// TODO: should we really be doing this inside drop? It _could_ panic,
// so maybe a shutdown/stop method would be better?
fn drop(&mut self) {
/// Stop serving requests and free thread resources.
pub fn stop(mut self) -> Result<(), ServerError> {
// Signal that we should stop handling requests and unblock the server.
self.shared.stop.store(true, Ordering::Relaxed);
self.shared.server.unblock();

// Because join takes ownership of the thread, we need to call the take method
// on the Option to move the value out of the Some variant and leave a None
// variant in its place.
if let Some(thread) = self.thread.take() {
thread.join().unwrap();
match self.thread.take() {
Some(thread) => thread.join().map_err(|e| {
let err = match e.downcast_ref::<String>() {
Some(s) => s,
None => "unknown",
};

ServerError::Stop(err.to_string())
}),
None => Ok(()),
}
}
}
42 changes: 24 additions & 18 deletions tests/server.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use metrics_server::MetricsServer;
use metrics_server::{MetricsServer, ServerError};

#[test]
fn test_new_server_invalid_address() {
let _ = MetricsServer::new("invalid:99999999", None, None);
let server = MetricsServer::new("invalid:99999999", None, None);
assert!(server.is_err());
assert!(matches!(server, Err(ServerError::Create(_))));
}

#[test]
fn test_new_http_server() {
let _ = MetricsServer::new("localhost:8001", None, None);
let server = MetricsServer::new("localhost:8001", None, None);
assert!(server.is_ok());
}

#[test]
Expand All @@ -23,10 +26,7 @@ invaid certificate

let server = MetricsServer::new("localhost:8441", Some(cert), Some(key));
assert!(server.is_err());

if let Err(error) = server {
assert!(error.to_string().contains("error creating metrics server"))
}
assert!(matches!(server, Err(ServerError::Create(_))));
}

#[test]
Expand All @@ -41,21 +41,21 @@ fn test_new_server_invalid_private_key() {

let server = MetricsServer::new("localhost:8442", Some(cert), Some(key));
assert!(server.is_err());

if let Err(error) = server {
assert!(error.to_string().contains("error creating metrics server"))
}
assert!(matches!(server, Err(ServerError::Create(_))));
}

#[test]
fn test_new_server_already_running() {
let srv = MetricsServer::new("localhost:8002", None, None)
let server = MetricsServer::new("localhost:8002", None, None)
.unwrap()
.serve();

// Attempt to start an already running server should be ok
// as we will return the pre-existing thread.
srv.serve();
let server = server.serve();

// Stop the server.
server.stop().unwrap();
}

#[test]
Expand All @@ -71,7 +71,7 @@ fn test_new_https_server() {
#[test]
#[should_panic]
fn test_http_server_invalid_address() {
let _ = MetricsServer::http("invalid:99999999");
_ = MetricsServer::http("invalid:99999999");
}

#[test]
Expand Down Expand Up @@ -103,6 +103,9 @@ fn test_http_server_serve() {
res.copy_to(&mut buf).unwrap();
assert_eq!(v, buf);
}

// Stop the server.
server.stop().unwrap();
}

#[test]
Expand All @@ -113,7 +116,7 @@ fn test_https_server_invalid_address() {
let cert = include_bytes!("./certs/certificate.pem").to_vec();
let key = include_bytes!("./certs/private_key.pem").to_vec();

let _ = MetricsServer::https("invalid:99999999", cert, key);
_ = MetricsServer::https("invalid:99999999", cert, key);
}

#[test]
Expand All @@ -124,7 +127,7 @@ fn test_https_server_invalid_certificate() {
let cert = Vec::new();
let key = include_bytes!("./certs/private_key.pem").to_vec();

let _ = MetricsServer::https("localhost:8441", cert, key);
_ = MetricsServer::https("localhost:8441", cert, key);
}

#[test]
Expand All @@ -135,7 +138,7 @@ fn test_https_server_invalid_private_key() {
let cert = include_bytes!("./certs/certificate.pem").to_vec();
let key = Vec::new();

let _ = MetricsServer::https("localhost:8442", cert, key);
_ = MetricsServer::https("localhost:8442", cert, key);
}

#[test]
Expand All @@ -145,5 +148,8 @@ fn test_https_server_serve() {
let cert = include_bytes!("./certs/certificate.pem").to_vec();
let key = include_bytes!("./certs/private_key.pem").to_vec();

let _ = MetricsServer::https("localhost:8443", cert, key);
let server = MetricsServer::https("localhost:8443", cert, key);

// Stop the server.
server.stop().unwrap();
}

0 comments on commit 6873b43

Please sign in to comment.