From 5d45325e43afaf1e7cd60142ae764494db105602 Mon Sep 17 00:00:00 2001 From: Andrew Kirillov <20803092+akirillo@users.noreply.github.com> Date: Tue, 17 Jan 2023 17:41:05 -0800 Subject: [PATCH] feat(net): Metered senders (#726) Co-authored-by: Bjerg --- Cargo.lock | 9 +++ Cargo.toml | 1 + crates/metrics/common/Cargo.toml | 19 ++++++ crates/metrics/common/src/lib.rs | 10 ++++ crates/metrics/common/src/metered_sender.rs | 64 +++++++++++++++++++++ crates/net/common/Cargo.toml | 2 +- crates/net/common/src/bandwidth_meter.rs | 26 ++++----- 7 files changed, 117 insertions(+), 14 deletions(-) create mode 100644 crates/metrics/common/Cargo.toml create mode 100644 crates/metrics/common/src/lib.rs create mode 100644 crates/metrics/common/src/metered_sender.rs diff --git a/Cargo.lock b/Cargo.lock index fd195d1aa604..390af41439ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4260,6 +4260,15 @@ dependencies = [ "libc", ] +[[package]] +name = "reth-metrics-common" +version = "0.1.0" +dependencies = [ + "metrics", + "reth-metrics-derive", + "tokio", +] + [[package]] name = "reth-metrics-derive" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index ec9a87ecaff3..7808f7dab021 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ members = [ "crates/tasks", "crates/transaction-pool", "crates/metrics/metrics-derive", + "crates/metrics/common", "crates/cli/utils", ] default-members = ["bin/reth"] diff --git a/crates/metrics/common/Cargo.toml b/crates/metrics/common/Cargo.toml new file mode 100644 index 000000000000..ced323c4f325 --- /dev/null +++ b/crates/metrics/common/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "reth-metrics-common" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" +repository = "https://github.com/paradigmxyz/reth" +description = """ +Common metric types across the Reth codebase +""" + +[dependencies] +# reth +reth-metrics-derive = { path = "../../metrics/metrics-derive" } + +# async +tokio = { version = "1.21.2", features = ["full"] } + +# metrics +metrics = "0.20.1" diff --git a/crates/metrics/common/src/lib.rs b/crates/metrics/common/src/lib.rs new file mode 100644 index 000000000000..b4cae9b049a5 --- /dev/null +++ b/crates/metrics/common/src/lib.rs @@ -0,0 +1,10 @@ +#![warn(missing_docs, unreachable_pub)] +#![deny(unused_must_use, rust_2018_idioms)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] + +//! Common metric types that can be used across the Reth codebase + +pub mod metered_sender; diff --git a/crates/metrics/common/src/metered_sender.rs b/crates/metrics/common/src/metered_sender.rs new file mode 100644 index 000000000000..d7bb6324736e --- /dev/null +++ b/crates/metrics/common/src/metered_sender.rs @@ -0,0 +1,64 @@ +//! Support for metering senders. Facilitates debugging by exposing metrics for number of messages +//! sent, number of errors, etc. + +use metrics::Counter; +use reth_metrics_derive::Metrics; +use tokio::sync::mpsc::{ + error::{SendError, TrySendError}, + Sender, +}; + +/// Network throughput metrics +#[derive(Metrics)] +#[metrics(dynamic = true)] +struct MeteredSenderMetrics { + /// Number of messages sent + messages_sent: Counter, + /// Number of failed message deliveries + send_errors: Counter, +} + +/// Manages updating the network throughput metrics for a metered stream +pub struct MeteredSender { + /// The [`Sender`] that this wraps around + sender: Sender, + /// Holds the gauges for inbound and outbound throughput + metrics: MeteredSenderMetrics, +} + +impl MeteredSender { + /// Creates a new [`MeteredSender`] wrapping around the provided [`Sender`] + pub fn new(sender: Sender, scope: &'static str) -> Self { + Self { sender, metrics: MeteredSenderMetrics::new(scope) } + } + + /// Calls the underlying [`Sender`]'s `try_send`, incrementing the appropriate + /// metrics depending on the result. + pub fn try_send(&mut self, message: T) -> Result<(), TrySendError> { + match self.sender.try_send(message) { + Ok(()) => { + self.metrics.messages_sent.increment(1); + Ok(()) + } + Err(error) => { + self.metrics.send_errors.increment(1); + Err(error) + } + } + } + + /// Calls the underlying [`Sender`]'s `send`, incrementing the appropriate + /// metrics depending on the result. + pub async fn send(&mut self, value: T) -> Result<(), SendError> { + match self.sender.send(value).await { + Ok(()) => { + self.metrics.messages_sent.increment(1); + Ok(()) + } + Err(error) => { + self.metrics.send_errors.increment(1); + Err(error) + } + } + } +} diff --git a/crates/net/common/Cargo.toml b/crates/net/common/Cargo.toml index 2630a1a0bfe2..7994f6503929 100644 --- a/crates/net/common/Cargo.toml +++ b/crates/net/common/Cargo.toml @@ -14,4 +14,4 @@ reth-primitives = { path = "../../primitives" } # async pin-project = "1.0" -tokio = { version = "1.21.2", features = ["full"] } \ No newline at end of file +tokio = { version = "1.21.2", features = ["full"] } diff --git a/crates/net/common/src/bandwidth_meter.rs b/crates/net/common/src/bandwidth_meter.rs index 6b4fd366851b..c98f9097141f 100644 --- a/crates/net/common/src/bandwidth_meter.rs +++ b/crates/net/common/src/bandwidth_meter.rs @@ -1,4 +1,4 @@ -//! Support for monitoring bandwidth. Takes heavy inspiration from https://github.com/libp2p/rust-libp2p/blob/master/src/bandwidth.rs +//! Support for metering bandwidth. Takes heavy inspiration from https://github.com/libp2p/rust-libp2p/blob/master/src/bandwidth.rs // Copyright 2019 Parity Technologies (UK) Ltd. // @@ -90,7 +90,7 @@ pub struct MeteredStream { /// The stream this instruments #[pin] inner: S, - /// The [`BandwidthMeter`] struct this uses to monitor bandwidth + /// The [`BandwidthMeter`] struct this uses to meter bandwidth meter: BandwidthMeter, } @@ -209,13 +209,13 @@ mod tests { // Taken in large part from https://docs.rs/tokio/latest/tokio/io/struct.DuplexStream.html#example let (client, server) = duplex(64); - let mut monitored_client = MeteredStream::new(client); - let mut monitored_server = MeteredStream::new(server); + let mut metered_client = MeteredStream::new(client); + let mut metered_server = MeteredStream::new(server); - duplex_stream_ping_pong(&mut monitored_client, &mut monitored_server).await; + duplex_stream_ping_pong(&mut metered_client, &mut metered_server).await; - assert_bandwidth_counts(monitored_client.get_bandwidth_meter(), 4, 4); - assert_bandwidth_counts(monitored_server.get_bandwidth_meter(), 4, 4); + assert_bandwidth_counts(metered_client.get_bandwidth_meter(), 4, 4); + assert_bandwidth_counts(metered_server.get_bandwidth_meter(), 4, 4); } #[tokio::test] @@ -252,18 +252,18 @@ mod tests { let shared_client_bandwidth_meter = BandwidthMeter::default(); let shared_server_bandwidth_meter = BandwidthMeter::default(); - let mut monitored_client_1 = + let mut metered_client_1 = MeteredStream::new_with_meter(client_1, shared_client_bandwidth_meter.clone()); - let mut monitored_server_1 = + let mut metered_server_1 = MeteredStream::new_with_meter(server_1, shared_server_bandwidth_meter.clone()); - let mut monitored_client_2 = + let mut metered_client_2 = MeteredStream::new_with_meter(client_2, shared_client_bandwidth_meter.clone()); - let mut monitored_server_2 = + let mut metered_server_2 = MeteredStream::new_with_meter(server_2, shared_server_bandwidth_meter.clone()); - duplex_stream_ping_pong(&mut monitored_client_1, &mut monitored_server_1).await; - duplex_stream_ping_pong(&mut monitored_client_2, &mut monitored_server_2).await; + duplex_stream_ping_pong(&mut metered_client_1, &mut metered_server_1).await; + duplex_stream_ping_pong(&mut metered_client_2, &mut metered_server_2).await; assert_bandwidth_counts(&shared_client_bandwidth_meter, 8, 8); assert_bandwidth_counts(&shared_server_bandwidth_meter, 8, 8);