
Commit

chore: bump metrics version
yair-starkware committed Jan 12, 2025
1 parent c337e9f commit 7c0574c
Showing 13 changed files with 231 additions and 160 deletions.
223 changes: 164 additions & 59 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -149,8 +149,8 @@ log = "0.4"
lru = "0.12.0"
memmap2 = "0.8.0"
mempool_test_utils = { path = "crates/mempool_test_utils", version = "0.0.0" }
metrics = "0.21.0"
metrics-exporter-prometheus = "0.12.1"
metrics = "0.24.1"
metrics-exporter-prometheus = "0.16.1"
metrics-process = "1.0.11"
mockall = "0.12.1"
mockito = "1.4.0"
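Context for the file-by-file changes below (a reviewer note, not part of the commit): as of metrics 0.22, and therefore in 0.24, the `register_*!`, `increment_counter!`, `absolute_counter!`, and value-taking `gauge!`/`histogram!` forms are gone; the macros now return `Counter`, `Gauge`, and `Histogram` handles whose methods do the recording. A minimal sketch of the new style, with made-up metric names:

```rust
use metrics::{counter, gauge, histogram};

fn emit_examples() {
    // Counters are monotonic; `increment` adds, `absolute` sets an externally tracked total.
    counter!("requests_total").increment(1);
    counter!("pages_written_total").absolute(42);

    // Gauges hold a value that can move in both directions.
    gauge!("connected_peers").set(3.0);

    // Histograms record individual observations, e.g. a latency in seconds.
    histogram!("request_latency_seconds").record(0.012);
}
```

Without a recorder installed these calls are no-ops; the matching metrics-exporter-prometheus bump to 0.16.1 keeps the Prometheus recorder compatible with this metrics version.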
5 changes: 2 additions & 3 deletions crates/papyrus_monitoring_gateway/src/gateway_test.rs
@@ -5,7 +5,7 @@ use axum::body::Body;
use axum::http::{Request, StatusCode};
use axum::response::Response;
use axum::Router;
-use metrics::{absolute_counter, describe_counter, register_counter};
+use metrics::{counter, describe_counter};
use metrics_exporter_prometheus::PrometheusBuilder;
use papyrus_storage::{table_names, test_utils};
use pretty_assertions::assert_eq;
@@ -182,9 +182,8 @@ async fn with_metrics() {
let metric_name = "metric_name";
let metric_help = "metric_help";
let metric_value = 8224;
-register_counter!(metric_name);
+counter!(metric_name).absolute(metric_value);
describe_counter!(metric_name, metric_help);
-absolute_counter!(metric_name, metric_value);

let response = request_app(app, "metrics").await;

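For reference, a self-contained sketch of the updated test pattern (not the repo's test; the metric name and value are invented, and it assumes `PrometheusBuilder::install_recorder` / `PrometheusHandle::render` from metrics-exporter-prometheus 0.16):

```rust
use metrics::{counter, describe_counter};
use metrics_exporter_prometheus::PrometheusBuilder;

fn main() {
    // Install a Prometheus recorder for this process and keep a handle for rendering.
    let handle = PrometheusBuilder::new()
        .install_recorder()
        .expect("failed to install Prometheus recorder");

    // New-style registration: the macro creates the counter, `absolute` sets its value.
    counter!("example_metric").absolute(8224);
    describe_counter!("example_metric", "An example counter");

    // `render` returns the Prometheus text exposition, which should now list the counter.
    let rendered = handle.render();
    assert!(rendered.contains("example_metric"));
    println!("{rendered}");
}
```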
38 changes: 13 additions & 25 deletions crates/papyrus_network/src/network_manager/mod.rs
@@ -99,7 +99,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
// TODO(shahak): remove the advertised_multiaddr arg once we manage external addresses
// in a behaviour.
pub(crate) fn generic_new(mut swarm: SwarmT, advertised_multiaddr: Option<Multiaddr>) -> Self {
-gauge!(papyrus_metrics::PAPYRUS_NUM_CONNECTED_PEERS, 0f64);
+gauge!(papyrus_metrics::PAPYRUS_NUM_CONNECTED_PEERS).set(0f64);
let reported_peer_receivers = FuturesUnordered::new();
reported_peer_receivers.push(futures::future::pending().boxed());
if let Some(address) = advertised_multiaddr.clone() {
@@ -268,10 +268,8 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
match event {
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
debug!("Connected to peer id: {peer_id:?}");
-gauge!(
-    papyrus_metrics::PAPYRUS_NUM_CONNECTED_PEERS,
-    self.swarm.num_connected_peers() as f64
-);
+gauge!(papyrus_metrics::PAPYRUS_NUM_CONNECTED_PEERS)
+    .set(self.swarm.num_connected_peers() as f64);
}
SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
match cause {
@@ -280,10 +278,8 @@
}
None => debug!("Connection to {peer_id:?} closed."),
}
-gauge!(
-    papyrus_metrics::PAPYRUS_NUM_CONNECTED_PEERS,
-    self.swarm.num_connected_peers() as f64
-);
+gauge!(papyrus_metrics::PAPYRUS_NUM_CONNECTED_PEERS)
+    .set(self.swarm.num_connected_peers() as f64);
}
SwarmEvent::Behaviour(event) => {
self.handle_behaviour_event(event)?;
@@ -405,10 +401,8 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
query: Vec<u8>,
) {
self.num_active_inbound_sessions += 1;
-gauge!(
-    papyrus_metrics::PAPYRUS_NUM_ACTIVE_INBOUND_SESSIONS,
-    self.num_active_inbound_sessions as f64
-);
+gauge!(papyrus_metrics::PAPYRUS_NUM_ACTIVE_INBOUND_SESSIONS)
+    .set(self.num_active_inbound_sessions as f64);
let (report_sender, report_receiver) = oneshot::channel::<()>();
self.handle_new_report_receiver(peer_id, report_receiver);
// TODO: consider returning error instead of panic.
@@ -572,10 +566,8 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
outbound_session_id: {outbound_session_id:?}"
);
self.num_active_outbound_sessions += 1;
-gauge!(
-    papyrus_metrics::PAPYRUS_NUM_ACTIVE_OUTBOUND_SESSIONS,
-    self.num_active_outbound_sessions as f64
-);
+gauge!(papyrus_metrics::PAPYRUS_NUM_ACTIVE_OUTBOUND_SESSIONS)
+    .set(self.num_active_outbound_sessions as f64);
self.sqmr_outbound_response_senders.insert(outbound_session_id, responses_sender);
self.sqmr_outbound_report_receivers_awaiting_assignment
.insert(outbound_session_id, report_receiver);
@@ -598,17 +590,13 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
match session_id {
SessionId::InboundSessionId(_) => {
self.num_active_inbound_sessions -= 1;
-gauge!(
-    papyrus_metrics::PAPYRUS_NUM_ACTIVE_INBOUND_SESSIONS,
-    self.num_active_inbound_sessions as f64
-);
+gauge!(papyrus_metrics::PAPYRUS_NUM_ACTIVE_INBOUND_SESSIONS)
+    .set(self.num_active_inbound_sessions as f64);
}
SessionId::OutboundSessionId(_) => {
self.num_active_outbound_sessions += 1;
-gauge!(
-    papyrus_metrics::PAPYRUS_NUM_ACTIVE_OUTBOUND_SESSIONS,
-    self.num_active_outbound_sessions as f64
-);
+gauge!(papyrus_metrics::PAPYRUS_NUM_ACTIVE_OUTBOUND_SESSIONS)
+    .set(self.num_active_outbound_sessions as f64);
}
}
}
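A side note on the converted call sites above: `gauge!` now returns a `metrics::Gauge` handle, so code that updates the same metric on every event can also grab the handle once instead of re-invoking the macro each time. An illustrative sketch, not code from this crate (struct, field, and metric names are invented):

```rust
use metrics::{gauge, Gauge};

/// Hypothetical tracker mirroring the "number of connected peers" pattern.
struct PeerTracker {
    connected_peers: usize,
    peers_gauge: Gauge,
}

impl PeerTracker {
    fn new() -> Self {
        // Creating the handle up front also makes the series visible at 0.
        let peers_gauge = gauge!("num_connected_peers");
        peers_gauge.set(0.0);
        Self { connected_peers: 0, peers_gauge }
    }

    fn on_connection_established(&mut self) {
        self.connected_peers += 1;
        self.peers_gauge.set(self.connected_peers as f64);
    }

    fn on_connection_closed(&mut self) {
        self.connected_peers = self.connected_peers.saturating_sub(1);
        self.peers_gauge.set(self.connected_peers as f64);
    }
}
```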
7 changes: 3 additions & 4 deletions crates/papyrus_p2p_sync/src/client/header.rs
@@ -44,9 +44,8 @@ impl BlockData for SignedBlockHeader {
.expect("Vec::first should return a value on a vector of size 1"),
)?
.commit()?;
-gauge!(
-    papyrus_metrics::PAPYRUS_HEADER_MARKER,
-    self.block_header.block_header_without_hash.block_number.unchecked_next().0 as f64
+gauge!(papyrus_metrics::PAPYRUS_HEADER_MARKER).set(
+    self.block_header.block_header_without_hash.block_number.unchecked_next().0 as f64,
);
// TODO(shahak): Fix code dup with central sync
let time_delta = Utc::now()
@@ -57,7 +56,7 @@ impl BlockData for SignedBlockHeader {
let header_latency = time_delta.num_seconds();
debug!("Header latency: {}.", header_latency);
if header_latency >= 0 {
-gauge!(papyrus_metrics::PAPYRUS_HEADER_LATENCY_SEC, header_latency as f64);
+gauge!(papyrus_metrics::PAPYRUS_HEADER_LATENCY_SEC).set(header_latency as f64);
}
Ok(())
}
2 changes: 1 addition & 1 deletion crates/papyrus_p2p_sync/src/client/state_diff.rs
@@ -31,7 +31,7 @@ impl BlockData for (ThinStateDiff, BlockNumber) {
storage_writer: &mut StorageWriter,
) -> Result<(), StorageError> {
storage_writer.begin_rw_txn()?.append_state_diff(self.1, self.0)?.commit()?;
-gauge!(papyrus_metrics::PAPYRUS_STATE_MARKER, self.1.unchecked_next().0 as f64);
+gauge!(papyrus_metrics::PAPYRUS_STATE_MARKER).set(self.1.unchecked_next().0 as f64);
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/papyrus_proc_macros/src/lib.rs
@@ -166,7 +166,7 @@ pub fn latency_histogram(attr: TokenStream, input: TokenStream) -> TokenStream {
let return_value=#origin_block;
if let Some(start_time) = start_function_time {
let exec_time = start_time.elapsed().as_secs_f64();
-metrics::histogram!(#metric_name, exec_time);
+metrics::histogram!(#metric_name).record(exec_time);
tracing::debug!("{}: {}", #metric_name, exec_time);
}
return_value
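The `latency_histogram` macro now emits a `record` call on the histogram handle. A hand-written sketch of the equivalent pattern (function and metric names invented; the `Option`-guarded start time from the real macro is simplified away):

```rust
use std::time::Instant;

use metrics::histogram;

fn do_work() -> u64 {
    // Stand-in for the wrapped function body.
    (0..1_000u64).sum()
}

fn do_work_with_latency_histogram() -> u64 {
    let start_function_time = Instant::now();
    let return_value = do_work();
    let exec_time = start_function_time.elapsed().as_secs_f64();
    // New API: `histogram!` returns a handle and `record` replaces the old value argument.
    histogram!("do_work_latency_seconds").record(exec_time);
    return_value
}
```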
24 changes: 13 additions & 11 deletions crates/papyrus_rpc/src/rpc_metrics/mod.rs
@@ -8,7 +8,7 @@ use std::time::Instant;
use jsonrpsee::server::logger::{HttpRequest, Logger, MethodKind, TransportProtocol};
use jsonrpsee::types::Params;
use jsonrpsee::Methods;
-use metrics::{histogram, increment_counter, register_counter, register_histogram};
+use metrics::{counter, histogram};

// Name of the metrics.
const INCOMING_REQUEST: &str = "rpc_incoming_requests";
@@ -23,14 +23,15 @@ const ILLEGAL_METHOD: &str = "illegal_method";
// Register the metrics and returns a set of the method names.
fn init_metrics(methods: &Methods) -> HashSet<String> {
let mut methods_set: HashSet<String> = HashSet::new();
-register_counter!(INCOMING_REQUEST, METHOD_LABEL => ILLEGAL_METHOD);
-register_counter!(FAILED_REQUESTS, METHOD_LABEL => ILLEGAL_METHOD);
+counter!(INCOMING_REQUEST, METHOD_LABEL => ILLEGAL_METHOD).absolute(0);
+counter!(FAILED_REQUESTS, METHOD_LABEL => ILLEGAL_METHOD).absolute(0);
for method in methods.method_names() {
methods_set.insert(method.to_string());
let (method_name, version) = get_method_and_version(method);
-register_counter!(FAILED_REQUESTS, METHOD_LABEL => method_name.clone(), VERSION_LABEL => version.clone());
-register_counter!(INCOMING_REQUEST, METHOD_LABEL => method_name.clone(), VERSION_LABEL => version.clone());
-register_histogram!(REQUEST_LATENCY, METHOD_LABEL => method_name, VERSION_LABEL => version);
+counter!(FAILED_REQUESTS, METHOD_LABEL => method_name.clone(), VERSION_LABEL => version.clone()).absolute(0);
+counter!(INCOMING_REQUEST, METHOD_LABEL => method_name.clone(), VERSION_LABEL => version.clone()).absolute(0);
+histogram!(REQUEST_LATENCY, METHOD_LABEL => method_name, VERSION_LABEL => version)
+    .record(0);
}
methods_set
}
@@ -61,14 +62,15 @@ impl Logger for MetricLogger {
if self.methods_set.contains(method_name) {
let (method, version) = get_method_and_version(method_name);
if let jsonrpsee::helpers::MethodResponseResult::Failed(_) = success_or_error {
-increment_counter!(FAILED_REQUESTS, METHOD_LABEL=> method.clone(), VERSION_LABEL=> version.clone());
+counter!(FAILED_REQUESTS, METHOD_LABEL=> method.clone(), VERSION_LABEL=> version.clone()).increment(1);
}
-increment_counter!(INCOMING_REQUEST, METHOD_LABEL=> method.clone(), VERSION_LABEL=> version.clone());
+counter!(INCOMING_REQUEST, METHOD_LABEL=> method.clone(), VERSION_LABEL=> version.clone()).increment(1);
let latency = started_at.elapsed().as_secs_f64();
-histogram!(REQUEST_LATENCY, latency,METHOD_LABEL=> method, VERSION_LABEL=> version);
+histogram!(REQUEST_LATENCY, METHOD_LABEL=> method, VERSION_LABEL=> version)
+    .record(latency);
} else {
-increment_counter!(INCOMING_REQUEST, METHOD_LABEL => ILLEGAL_METHOD);
-increment_counter!(FAILED_REQUESTS, METHOD_LABEL => ILLEGAL_METHOD);
+counter!(INCOMING_REQUEST, METHOD_LABEL => ILLEGAL_METHOD).increment(1);
+counter!(FAILED_REQUESTS, METHOD_LABEL => ILLEGAL_METHOD).increment(1);
}
}

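Labeled metrics keep the same `KEY => value` macro syntax; only the value emission moves onto the returned handle, and in the histogram case the value argument is simply dropped from the macro call. A sketch of the labeled pattern with placeholder metric and label names:

```rust
use metrics::{counter, histogram};

fn record_request(method: &str, version: &str, latency_secs: f64, failed: bool) {
    // Labels are passed to the macro exactly as before; the handle methods do the recording.
    counter!("incoming_requests", "method" => method.to_string(), "version" => version.to_string())
        .increment(1);
    if failed {
        counter!("failed_requests", "method" => method.to_string(), "version" => version.to_string())
            .increment(1);
    }
    histogram!("request_latency_seconds", "method" => method.to_string(), "version" => version.to_string())
        .record(latency_secs);
}
```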
16 changes: 6 additions & 10 deletions crates/papyrus_storage/src/storage_metrics.rs
@@ -3,7 +3,7 @@
#[path = "storage_metrics_test.rs"]
mod storage_metrics_test;

-use metrics::{absolute_counter, gauge};
+use metrics::{counter, gauge};
use tracing::debug;

use crate::{StorageReader, StorageResult};
@@ -17,15 +17,11 @@ use crate::{StorageReader, StorageResult};
#[allow(clippy::as_conversions)]
pub fn update_storage_metrics(reader: &StorageReader) -> StorageResult<()> {
debug!("updating storage metrics");
gauge!("storage_free_pages_number", reader.db_reader.get_free_pages()? as f64);
gauge!("storage_free_pages_number").set(reader.db_reader.get_free_pages()? as f64);
let info = reader.db_reader.get_db_info()?;
-absolute_counter!(
-    "storage_last_page_number",
-    u64::try_from(info.last_pgno()).expect("usize should fit in u64")
-);
-absolute_counter!(
-    "storage_last_transaction_index",
-    u64::try_from(info.last_txnid()).expect("usize should fit in u64")
-);
+counter!("storage_last_page_number")
+    .absolute(u64::try_from(info.last_pgno()).expect("usize should fit in u64"));
+counter!("storage_last_transaction_index")
+    .absolute(u64::try_from(info.last_txnid()).expect("usize should fit in u64"));
Ok(())
}
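`.absolute()` is the direct replacement for `absolute_counter!`: it sets the counter to a total tracked outside the process (here the LMDB page and transaction counters) instead of adding a delta, and since counters are monotonic an update that would move the value backwards has no effect, at least as I read the metrics API. A small sketch with an invented statistic:

```rust
use metrics::counter;

/// Hypothetical stand-in for an externally maintained total (e.g. a DB page count).
fn report_external_total(observed: usize) {
    // `absolute` mirrors the conversion above: set the counter to the observed total.
    counter!("example_external_total")
        .absolute(u64::try_from(observed).expect("usize should fit in u64"));
}
```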
42 changes: 14 additions & 28 deletions crates/papyrus_sync/src/lib.rs
@@ -424,14 +424,10 @@ impl<
.append_block_signature(block_number, signature)?
.append_body(block_number, block.body)?
.commit()?;
-metrics::gauge!(
-    papyrus_metrics::PAPYRUS_HEADER_MARKER,
-    block_number.unchecked_next().0 as f64
-);
-metrics::gauge!(
-    papyrus_metrics::PAPYRUS_BODY_MARKER,
-    block_number.unchecked_next().0 as f64
-);
+metrics::gauge!(papyrus_metrics::PAPYRUS_HEADER_MARKER)
+    .set(block_number.unchecked_next().0 as f64);
+metrics::gauge!(papyrus_metrics::PAPYRUS_BODY_MARKER)
+    .set(block_number.unchecked_next().0 as f64);
let time_delta = Utc::now()
- Utc
.timestamp_opt(block.header.block_header_without_hash.timestamp.0 as i64, 0)
@@ -440,7 +436,7 @@ impl<
let header_latency = time_delta.num_seconds();
debug!("Header latency: {}.", header_latency);
if header_latency >= 0 {
-metrics::gauge!(papyrus_metrics::PAPYRUS_HEADER_LATENCY_SEC, header_latency as f64);
+metrics::gauge!(papyrus_metrics::PAPYRUS_HEADER_LATENCY_SEC).set(header_latency as f64);
}
Ok(())
}
@@ -477,15 +473,11 @@ impl<
)?
.commit()?;

-metrics::gauge!(
-    papyrus_metrics::PAPYRUS_STATE_MARKER,
-    block_number.unchecked_next().0 as f64
-);
+metrics::gauge!(papyrus_metrics::PAPYRUS_STATE_MARKER)
+    .set(block_number.unchecked_next().0 as f64);
let compiled_class_marker = self.reader.begin_ro_txn()?.get_compiled_class_marker()?;
-metrics::gauge!(
-    papyrus_metrics::PAPYRUS_COMPILED_CLASS_MARKER,
-    compiled_class_marker.0 as f64
-);
+metrics::gauge!(papyrus_metrics::PAPYRUS_COMPILED_CLASS_MARKER)
+    .set(compiled_class_marker.0 as f64);

// Info the user on syncing the block once all the data is stored.
info!("Added block {} with hash {:#064x}.", block_number, block_hash.0);
@@ -509,10 +501,8 @@ impl<
txn.commit()?;
let compiled_class_marker =
self.reader.begin_ro_txn()?.get_compiled_class_marker()?;
-metrics::gauge!(
-    papyrus_metrics::PAPYRUS_COMPILED_CLASS_MARKER,
-    compiled_class_marker.0 as f64
-);
+metrics::gauge!(papyrus_metrics::PAPYRUS_COMPILED_CLASS_MARKER)
+    .set(compiled_class_marker.0 as f64);
debug!("Added compiled class.");
Ok(())
}
@@ -554,10 +544,8 @@ impl<
if txn.get_base_layer_block_marker()? != block_number.unchecked_next() {
info!("Verified block {block_number} hash against base layer.");
txn.update_base_layer_block_marker(&block_number.unchecked_next())?.commit()?;
-metrics::gauge!(
-    papyrus_metrics::PAPYRUS_BASE_LAYER_MARKER,
-    block_number.unchecked_next().0 as f64
-);
+metrics::gauge!(papyrus_metrics::PAPYRUS_BASE_LAYER_MARKER)
+    .set(block_number.unchecked_next().0 as f64);
}
Ok(())
}
@@ -691,9 +679,7 @@ fn stream_new_blocks<
let central_block_marker = latest_central_block.map_or(
BlockNumber::default(), |block_hash_and_number| block_hash_and_number.number.unchecked_next()
);
-metrics::gauge!(
-    papyrus_metrics::PAPYRUS_CENTRAL_BLOCK_MARKER, central_block_marker.0 as f64
-);
+metrics::gauge!(papyrus_metrics::PAPYRUS_CENTRAL_BLOCK_MARKER).set(central_block_marker.0 as f64);
if header_marker == central_block_marker {
// Only if the node have the last block and state (without casms), sync pending data.
if collect_pending_data && reader.begin_ro_txn()?.get_state_marker()? == header_marker{
5 changes: 3 additions & 2 deletions crates/sequencing/papyrus_consensus/src/manager.rs
@@ -15,6 +15,7 @@ use std::time::Duration;
use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
+use metrics::counter;
use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
@@ -78,7 +79,7 @@
let mut manager = MultiHeightManager::new(validator_id, timeouts);
#[allow(clippy::as_conversions)] // FIXME: use int metrics so `as f64` may be removed.
loop {
metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64);
metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT).set(current_height.0 as f64);

let must_observer = current_height < start_active_height;
match manager
Expand All @@ -101,7 +102,7 @@ where
}
RunHeightRes::Sync(sync_height) => {
info!("Sync to height: {}. current_height={}", sync_height, current_height);
-metrics::increment_counter!(PAPYRUS_CONSENSUS_SYNC_COUNT);
+counter!(PAPYRUS_CONSENSUS_SYNC_COUNT).increment(1);
current_height = sync_height.unchecked_next();
}
}
17 changes: 7 additions & 10 deletions crates/starknet_http_server/src/metrics.rs
@@ -1,4 +1,4 @@
-use metrics::{absolute_counter, describe_counter, register_counter};
+use metrics::{counter, describe_counter};
use tracing::info;

#[cfg(test)]
@@ -14,27 +14,24 @@ pub(crate) const ADDED_TRANSACTIONS_FAILURE: (&str, &str, u64) =

pub(crate) fn init_metrics() {
info!("Initializing HTTP Server metrics");
-register_counter!(ADDED_TRANSACTIONS_TOTAL.0);
+counter!(ADDED_TRANSACTIONS_TOTAL.0).absolute(ADDED_TRANSACTIONS_TOTAL.2);
describe_counter!(ADDED_TRANSACTIONS_TOTAL.0, ADDED_TRANSACTIONS_TOTAL.1);
-absolute_counter!(ADDED_TRANSACTIONS_TOTAL.0, ADDED_TRANSACTIONS_TOTAL.2);

-register_counter!(ADDED_TRANSACTIONS_SUCCESS.0);
+counter!(ADDED_TRANSACTIONS_SUCCESS.0).absolute(ADDED_TRANSACTIONS_SUCCESS.2);
describe_counter!(ADDED_TRANSACTIONS_SUCCESS.0, ADDED_TRANSACTIONS_SUCCESS.1);
-absolute_counter!(ADDED_TRANSACTIONS_SUCCESS.0, ADDED_TRANSACTIONS_SUCCESS.2);

-register_counter!(ADDED_TRANSACTIONS_FAILURE.0);
+counter!(ADDED_TRANSACTIONS_FAILURE.0).absolute(ADDED_TRANSACTIONS_FAILURE.2);
describe_counter!(ADDED_TRANSACTIONS_FAILURE.0, ADDED_TRANSACTIONS_FAILURE.1);
-absolute_counter!(ADDED_TRANSACTIONS_FAILURE.0, ADDED_TRANSACTIONS_FAILURE.2);
}

pub(crate) fn record_added_transaction() {
-metrics::increment_counter!(ADDED_TRANSACTIONS_TOTAL.0);
+counter!(ADDED_TRANSACTIONS_TOTAL.0).increment(1);
}

pub(crate) fn record_added_transaction_status(add_tx_success: bool) {
if add_tx_success {
-metrics::increment_counter!(ADDED_TRANSACTIONS_SUCCESS.0);
+counter!(ADDED_TRANSACTIONS_SUCCESS.0).increment(1);
} else {
-metrics::increment_counter!(ADDED_TRANSACTIONS_FAILURE.0);
+counter!(ADDED_TRANSACTIONS_FAILURE.0).increment(1);
}
}
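One consequence of dropping `register_counter!`: under the new API the `counter!` macro registers a metric lazily on first use, so the explicit `.absolute(0)` in `init_metrics` keeps the old behaviour of exposing each series at 0 before any transaction arrives (my reading of the change, not text from the commit). A self-contained sketch with an invented metric name:

```rust
use metrics::{counter, describe_counter};

const EXAMPLE_ADDED_TXS_TOTAL: &str = "example_added_transactions_total";

fn init_example_metrics() {
    // Touch the counter up front so the series is exported at 0 before any traffic.
    counter!(EXAMPLE_ADDED_TXS_TOTAL).absolute(0);
    describe_counter!(EXAMPLE_ADDED_TXS_TOTAL, "Total transactions received by the example server");
}

fn record_example_transaction() {
    counter!(EXAMPLE_ADDED_TXS_TOTAL).increment(1);
}
```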