From 3a9f58c98e84e3c6014908b7c669be230153f4ed Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 16 Dec 2024 03:51:33 +0800 Subject: [PATCH 1/2] feat: Add Summary metric type Signed-off-by: tison --- Cargo.toml | 9 ++- src/encoding.rs | 16 ++++ src/encoding/protobuf.rs | 72 +++++++++++++++++ src/encoding/text.rs | 62 +++++++++++++++ src/metrics.rs | 7 +- src/metrics/counter.rs | 2 +- src/metrics/summary.rs | 167 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 331 insertions(+), 4 deletions(-) create mode 100644 src/metrics/summary.rs diff --git a/Cargo.toml b/Cargo.toml index 18dd395f..550e9688 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ documentation = "https://docs.rs/prometheus-client" [features] default = [] protobuf = ["dep:prost", "dep:prost-types", "dep:prost-build"] +summary = ["dep:fastant", "dep:quantiles"] [workspace] members = ["derive-encode"] @@ -22,8 +23,12 @@ dtoa = "1.0" itoa = "1.0" parking_lot = "0.12" prometheus-client-derive-encode = { version = "0.4.1", path = "derive-encode" } -prost = { version = "0.12.0", optional = true } -prost-types = { version = "0.12.0", optional = true } + +# Optional dependencies +fastant = { version = "0.1", optional = true } +prost = { version = "0.12", optional = true } +prost-types = { version = "0.12", optional = true } +quantiles = { version = "0.7", optional = true } [dev-dependencies] async-std = { version = "1", features = ["attributes"] } diff --git a/src/encoding.rs b/src/encoding.rs index 9e2acac0..f8174353 100644 --- a/src/encoding.rs +++ b/src/encoding.rs @@ -186,6 +186,22 @@ impl MetricEncoder<'_> { ) } + /// Encode a summary. + #[cfg(feature = "summary")] + pub fn encode_summary( + &mut self, + sum: f64, + count: u64, + quantiles: &[(f64, f64)], + ) -> Result<(), std::fmt::Error> { + for_both_mut!( + self, + MetricEncoderInner, + e, + e.encode_summary::(sum, count, quantiles) + ) + } + /// Encode a metric family. pub fn encode_family<'s, S: EncodeLabelSet>( &'s mut self, diff --git a/src/encoding/protobuf.rs b/src/encoding/protobuf.rs index 5ec54c91..48b62b18 100644 --- a/src/encoding/protobuf.rs +++ b/src/encoding/protobuf.rs @@ -53,6 +53,8 @@ impl From for openmetrics_data_model::MetricType { MetricType::Counter => openmetrics_data_model::MetricType::Counter, MetricType::Gauge => openmetrics_data_model::MetricType::Gauge, MetricType::Histogram => openmetrics_data_model::MetricType::Histogram, + #[cfg(feature = "summary")] + MetricType::Summary => openmetrics_data_model::MetricType::Summary, MetricType::Info => openmetrics_data_model::MetricType::Info, MetricType::Unknown => openmetrics_data_model::MetricType::Unknown, } @@ -288,6 +290,42 @@ impl MetricEncoder<'_> { Ok(()) } + + #[cfg(feature = "summary")] + pub fn encode_summary( + &mut self, + sum: f64, + count: u64, + quantiles: &[(f64, f64)], + ) -> Result<(), std::fmt::Error> { + let quantile = quantiles + .iter() + .enumerate() + .map(|(_, (quantile, value))| { + Ok(openmetrics_data_model::summary_value::Quantile { + quantile: *quantile, + value: *value, + }) + }) + .collect::, std::fmt::Error>>()?; + + self.family.push(openmetrics_data_model::Metric { + labels: self.labels.clone(), + metric_points: vec![openmetrics_data_model::MetricPoint { + value: Some(openmetrics_data_model::metric_point::Value::SummaryValue( + openmetrics_data_model::SummaryValue { + count, + created: None, + quantile, + sum: Some(openmetrics_data_model::summary_value::Sum::DoubleValue(sum)), + }, + )), + ..Default::default() + }], + }); + + Ok(()) + } } impl TryFrom<&Exemplar> @@ -749,6 +787,40 @@ mod tests { } } + #[cfg(feature = "summary")] + #[test] + fn encode_summary() { + use crate::metrics::summary::Summary; + + let mut registry = Registry::default(); + let summary = Summary::new(5, 10, vec![0.5, 0.9, 0.99], 0.01); + registry.register("my_summary", "My summary", summary.clone()); + summary.observe(1.0); + + let metric_set = encode(®istry).unwrap(); + + let family = metric_set.metric_families.first().unwrap(); + assert_eq!("my_summary", family.name); + assert_eq!("My summary.", family.help); + + assert_eq!( + openmetrics_data_model::MetricType::Summary as i32, + extract_metric_type(&metric_set) + ); + + match extract_metric_point_value(&metric_set) { + openmetrics_data_model::metric_point::Value::SummaryValue(value) => { + assert_eq!( + Some(openmetrics_data_model::summary_value::Sum::DoubleValue(1.0)), + value.sum + ); + assert_eq!(1, value.count); + assert_eq!(11, value.quantile.len()); + } + _ => panic!("wrong value type"), + } + } + #[test] fn encode_histogram() { let mut registry = Registry::default(); diff --git a/src/encoding/text.rs b/src/encoding/text.rs index 8f3c63df..0710ceb8 100644 --- a/src/encoding/text.rs +++ b/src/encoding/text.rs @@ -395,6 +395,40 @@ impl MetricEncoder<'_> { }) } + #[cfg(feature = "summary")] + pub fn encode_summary( + &mut self, + sum: f64, + count: u64, + quantiles: &[(f64, f64)], + ) -> Result<(), std::fmt::Error> { + self.write_prefix_name_unit()?; + self.write_suffix("sum")?; + self.encode_labels::(None)?; + self.writer.write_str(" ")?; + self.writer.write_str(dtoa::Buffer::new().format(sum))?; + self.newline()?; + + self.write_prefix_name_unit()?; + self.write_suffix("count")?; + self.encode_labels::(None)?; + self.writer.write_str(" ")?; + self.writer.write_str(itoa::Buffer::new().format(count))?; + self.newline()?; + + for (_, (quantile, result)) in quantiles.iter().enumerate() { + self.write_prefix_name_unit()?; + self.encode_labels(Some(&[("quantile", *quantile)]))?; + + self.writer.write_str(" ")?; + self.writer.write_str(result.to_string().as_str())?; + + self.newline()?; + } + + Ok(()) + } + pub fn encode_histogram( &mut self, sum: f64, @@ -1160,6 +1194,34 @@ mod tests { assert_eq!(&response[response.len() - 20..], "ogins_total 0\n# EOF\n"); } + #[cfg(feature = "summary")] + #[test] + fn encode_summary() { + use crate::metrics::summary::Summary; + + let mut registry = Registry::default(); + let summary = Summary::new(3, 10, vec![0.5, 0.9, 0.99], 0.0); + registry.register("my_summary", "My summary", summary.clone()); + summary.observe(0.10); + summary.observe(0.20); + summary.observe(0.30); + + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let expected = "# HELP my_summary My summary.\n".to_owned() + + "# TYPE my_summary summary\n" + + "my_summary_sum 0.6000000000000001\n" + + "my_summary_count 3\n" + + "my_summary{quantile=\"0.5\"} 0.2\n" + + "my_summary{quantile=\"0.9\"} 0.3\n" + + "my_summary{quantile=\"0.99\"} 0.3\n" + + "# EOF\n"; + assert_eq!(expected, encoded); + + parse_with_python_client(encoded); + } + fn parse_with_python_client(input: String) { pyo3::prepare_freethreaded_python(); diff --git a/src/metrics.rs b/src/metrics.rs index cd389527..b43ac812 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -6,6 +6,8 @@ pub mod family; pub mod gauge; pub mod histogram; pub mod info; +#[cfg(feature = "summary")] +pub mod summary; /// A metric that is aware of its Open Metrics metric type. pub trait TypedMetric { @@ -21,12 +23,13 @@ pub enum MetricType { Gauge, Histogram, Info, + #[cfg(feature = "summary")] + Summary, Unknown, // Not (yet) supported metric types. // // GaugeHistogram, // StateSet, - // Summary } impl MetricType { @@ -37,6 +40,8 @@ impl MetricType { MetricType::Gauge => "gauge", MetricType::Histogram => "histogram", MetricType::Info => "info", + #[cfg(feature = "summary")] + MetricType::Summary => "summary", MetricType::Unknown => "unknown", } } diff --git a/src/metrics/counter.rs b/src/metrics/counter.rs index 7016c361..c4cecd27 100644 --- a/src/metrics/counter.rs +++ b/src/metrics/counter.rs @@ -265,7 +265,7 @@ mod tests { // Map infinite, subnormal and NaN to 0.0. .map(|f| if f.is_normal() { f } else { 0.0 }) .collect(); - let sum = fs.iter().sum(); + let sum: f64 = fs.iter().sum(); let counter = Counter::::default(); for f in fs { counter.inc_by(f); diff --git a/src/metrics/summary.rs b/src/metrics/summary.rs new file mode 100644 index 00000000..59a3ff30 --- /dev/null +++ b/src/metrics/summary.rs @@ -0,0 +1,167 @@ +//! Module implementing an Open Metrics summary. +//! +//! See [`Summary`] for details. + +use crate::encoding::{EncodeMetric, MetricEncoder, NoLabelSet}; +use crate::metrics::{MetricType, TypedMetric}; +use fastant::Instant; +use parking_lot::RwLock; +use quantiles::ckms::CKMS; +use std::sync::Arc; +use std::time::Duration; + +/// Open Metrics [`Summary`] to measure distributions of discrete events. +#[derive(Debug)] +pub struct Summary { + target_quantile: Vec, + target_error: f64, + max_age_buckets: usize, + max_age_seconds: u64, + stream_duration: Duration, + inner: Arc>, +} + +impl Clone for Summary { + fn clone(&self) -> Self { + Summary { + target_quantile: self.target_quantile.clone(), + target_error: self.target_error, + max_age_buckets: self.max_age_buckets, + max_age_seconds: self.max_age_seconds, + stream_duration: self.stream_duration, + inner: self.inner.clone(), + } + } +} + +#[derive(Debug)] +pub(crate) struct InnerSummary { + sum: f64, + count: u64, + quantile_streams: Vec>, + // head_stream is like a cursor which carries the index + // of the stream in the quantile_streams that we want to query. + head_stream_idx: usize, + // timestamp at which the head_stream_idx was last rotated. + last_rotated_timestamp: Instant, +} + +impl Summary { + /// Create a new [`Summary`]. + pub fn new( + max_age_buckets: usize, + max_age_seconds: u64, + target_quantile: Vec, + target_error: f64, + ) -> Self { + let mut streams: Vec> = Vec::new(); + for _ in 0..max_age_buckets { + streams.push(CKMS::new(target_error)); + } + + let stream_duration = Duration::from_secs(max_age_seconds / (max_age_buckets as u64)); + let last_rotated_timestamp = Instant::now(); + + if target_quantile.iter().any(|&x| x > 1.0 || x < 0.0) { + panic!("Quantile value out of range"); + } + + Summary { + max_age_buckets, + max_age_seconds, + stream_duration, + target_quantile, + target_error, + inner: Arc::new(RwLock::new(InnerSummary { + sum: Default::default(), + count: Default::default(), + quantile_streams: streams, + head_stream_idx: 0, + last_rotated_timestamp, + })), + } + } + + /// Observe the given value. + pub fn observe(&self, v: f64) { + self.rotate_buckets(); + + let mut inner = self.inner.write(); + inner.sum += v; + inner.count += 1; + + // insert quantiles into all streams/buckets. + for stream in inner.quantile_streams.iter_mut() { + stream.insert(v); + } + } + + /// Retrieve the values of the summary metric. + pub(crate) fn get(&self) -> (f64, u64, Vec<(f64, f64)>) { + self.rotate_buckets(); + + let inner = self.inner.read(); + let sum = inner.sum; + let count = inner.count; + let mut quantile_values: Vec<(f64, f64)> = Vec::new(); + + for q in self.target_quantile.iter() { + match inner.quantile_streams[inner.head_stream_idx].query(*q) { + Some((_, v)) => quantile_values.push((*q, v)), + None => continue, + }; + } + (sum, count, quantile_values) + } + + fn rotate_buckets(&self) { + let mut inner = self.inner.write(); + if inner.last_rotated_timestamp.elapsed() >= self.stream_duration { + inner.last_rotated_timestamp = Instant::now(); + if inner.head_stream_idx == self.max_age_buckets { + inner.head_stream_idx = 0; + } else { + inner.head_stream_idx += 1; + } + }; + } +} + +impl TypedMetric for Summary { + const TYPE: MetricType = MetricType::Summary; +} + +impl EncodeMetric for Summary { + fn encode(&self, mut encoder: MetricEncoder) -> Result<(), std::fmt::Error> { + let (sum, count, quantiles) = self.get(); + encoder.encode_summary::(sum, count, &quantiles) + } + + fn metric_type(&self) -> MetricType { + Self::TYPE + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn summary() { + let summary = Summary::new(5, 10, vec![0.5, 0.9, 0.99], 0.01); + summary.observe(1.0); + summary.observe(5.0); + summary.observe(10.0); + + let (s, c, q) = summary.get(); + assert_eq!(16.0, s); + assert_eq!(3, c); + assert_eq!(vec![(0.5, 5.0), (0.9, 10.0), (0.99, 10.0)], q); + } + + #[test] + #[should_panic(expected = "Quantile value out of range")] + fn summary_panic() { + Summary::new(5, 10, vec![1.0, 5.0, 9.0], 0.01); + } +} From 22865b4d3d01d6870360c3ffe3ce4d100ccc36d5 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 16 Dec 2024 03:56:03 +0800 Subject: [PATCH 2/2] fixup continous lock Signed-off-by: tison --- src/metrics/summary.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/metrics/summary.rs b/src/metrics/summary.rs index 59a3ff30..4a980408 100644 --- a/src/metrics/summary.rs +++ b/src/metrics/summary.rs @@ -84,9 +84,9 @@ impl Summary { /// Observe the given value. pub fn observe(&self, v: f64) { - self.rotate_buckets(); - let mut inner = self.inner.write(); + self.rotate_buckets(&mut inner); + inner.sum += v; inner.count += 1; @@ -98,7 +98,9 @@ impl Summary { /// Retrieve the values of the summary metric. pub(crate) fn get(&self) -> (f64, u64, Vec<(f64, f64)>) { - self.rotate_buckets(); + let mut inner = self.inner.write(); + self.rotate_buckets(&mut inner); + drop(inner); let inner = self.inner.read(); let sum = inner.sum; @@ -114,8 +116,7 @@ impl Summary { (sum, count, quantile_values) } - fn rotate_buckets(&self) { - let mut inner = self.inner.write(); + fn rotate_buckets(&self, inner: &mut InnerSummary) { if inner.last_rotated_timestamp.elapsed() >= self.stream_duration { inner.last_rotated_timestamp = Instant::now(); if inner.head_stream_idx == self.max_age_buckets {