diff --git a/crates/net/network/src/budget.rs b/crates/net/network/src/budget.rs index e20d882fe827..0b5e3d3a90bb 100644 --- a/crates/net/network/src/budget.rs +++ b/crates/net/network/src/budget.rs @@ -3,6 +3,11 @@ /// Default is 10 iterations. pub const DEFAULT_BUDGET_TRY_DRAIN_STREAM: u32 = 10; +/// Default budget to try and drain headers and bodies download streams. +/// +/// Default is 4 iterations. +pub const DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS: u32 = 4; + /// Default budget to try and drain [`Swarm`](crate::swarm::Swarm). /// /// Default is 10 [`SwarmEvent`](crate::swarm::SwarmEvent)s. @@ -68,8 +73,8 @@ macro_rules! poll_nested_stream_with_budget { /// Metered poll of the given stream. Breaks with `true` if there maybe is more work. #[macro_export] macro_rules! metered_poll_nested_stream_with_budget { - ($acc:ident, $target:literal, $label:literal, $budget:ident, $poll_stream:expr, $on_ready_some:expr $(, $on_ready_none:expr;)? $(,)?) => {{ - duration_metered_exec!( + ($acc:expr, $target:literal, $label:literal, $budget:ident, $poll_stream:expr, $on_ready_some:expr $(, $on_ready_none:expr;)? $(,)?) => {{ + $crate::duration_metered_exec!( { $crate::poll_nested_stream_with_budget!($target, $label, $budget, $poll_stream, $on_ready_some $(, $on_ready_none;)?) }, diff --git a/crates/net/network/src/eth_requests.rs b/crates/net/network/src/eth_requests.rs index 4f54cbdba7be..9e7b373cac35 100644 --- a/crates/net/network/src/eth_requests.rs +++ b/crates/net/network/src/eth_requests.rs @@ -1,8 +1,8 @@ //! Blocks/Headers management for the p2p network. use crate::{ - budget::DEFAULT_BUDGET_TRY_DRAIN_STREAM, metrics::EthRequestHandlerMetrics, peers::PeersHandle, - poll_nested_stream_with_budget, + budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget, + metrics::EthRequestHandlerMetrics, peers::PeersHandle, }; use alloy_rlp::Encodable; use futures::StreamExt; @@ -18,6 +18,7 @@ use std::{ future::Future, pin::Pin, task::{Context, Poll}, + time::Duration, }; use tokio::sync::{mpsc::Receiver, oneshot}; use tokio_stream::wrappers::ReceiverStream; @@ -239,10 +240,12 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - let maybe_more_incoming_requests = poll_nested_stream_with_budget!( + let mut acc = Duration::ZERO; + let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!( + acc, "net::eth", "Incoming eth requests stream", - DEFAULT_BUDGET_TRY_DRAIN_STREAM, + DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, this.incoming_requests.poll_next_unpin(cx), |incoming| { match incoming { @@ -262,6 +265,8 @@ where }, ); + this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64()); + // stream is fully drained and import futures pending if maybe_more_incoming_requests { // make sure we're woken up again diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index a219b6492e96..602dca0987a2 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -161,8 +161,8 @@ impl NetworkManager { // update metrics for whole poll function metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64()); // update poll metrics for nested items - metrics.duration_poll_network_handle.set(acc_network_handle.as_secs_f64()); - metrics.duration_poll_swarm.set(acc_swarm.as_secs_f64()); + metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64()); + metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64()); } } diff --git a/crates/net/network/src/metrics.rs b/crates/net/network/src/metrics.rs index 989dc431b64b..1ed8452baaaa 100644 --- a/crates/net/network/src/metrics.rs +++ b/crates/net/network/src/metrics.rs @@ -69,12 +69,12 @@ pub struct NetworkMetrics { /// [`TransactionsManager`](crate::transactions::TransactionsManager) holds this handle. /// /// Duration in seconds. - pub(crate) duration_poll_network_handle: Gauge, + pub(crate) acc_duration_poll_network_handle: Gauge, /// Time spent polling [`Swarm`](crate::swarm::Swarm), in one call to poll the /// [`NetworkManager`](crate::NetworkManager) future. /// /// Duration in seconds. - pub(crate) duration_poll_swarm: Gauge, + pub(crate) acc_duration_poll_swarm: Gauge, } /// Metrics for SessionManager @@ -226,12 +226,12 @@ pub struct TransactionFetcherMetrics { /// accumulator value passed as a mutable reference. #[macro_export] macro_rules! duration_metered_exec { - ($code:expr, $acc:ident) => {{ - let start = Instant::now(); + ($code:expr, $acc:expr) => {{ + let start = std::time::Instant::now(); let res = $code; - *$acc += start.elapsed(); + $acc += start.elapsed(); res }}; @@ -321,6 +321,10 @@ pub struct EthRequestHandlerMetrics { /// Number of GetNodeData requests received pub(crate) eth_node_data_requests_received_total: Counter, + + /// Duration in seconds of call to poll + /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler). + pub(crate) acc_duration_poll_eth_req_handler: Gauge, } /// Eth67 announcement metrics, track entries by TxType diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index 363e3c046294..b95b25071d72 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -50,7 +50,7 @@ use std::{ collections::HashMap, pin::Pin, task::{ready, Context, Poll}, - time::{Duration, Instant}, + time::Duration, }; use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError}; use tracing::{debug, trace}; @@ -430,7 +430,6 @@ impl TransactionFetcher { let budget_find_idle_fallback_peer = self .search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports); - let acc = &mut search_durations.find_idle_peer; let peer_id = duration_metered_exec!( { let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash( @@ -444,7 +443,7 @@ impl TransactionFetcher { peer_id }, - acc + search_durations.find_idle_peer ); // peer should always exist since `is_session_active` already checked @@ -460,7 +459,6 @@ impl TransactionFetcher { &has_capacity_wrt_pending_pool_imports, ); - let acc = &mut search_durations.fill_request; duration_metered_exec!( { self.fill_request_from_hashes_pending_fetch( @@ -469,7 +467,7 @@ impl TransactionFetcher { budget_fill_request, ) }, - acc + search_durations.fill_request ); // free unused memory diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 48e32c0886e8..1ce5dc94343a 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -1231,9 +1231,8 @@ where // this can potentially validate >200k transactions. More if the message size // is bigger than the soft limit on a `PooledTransactions` response which is // 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB). - let acc = &mut poll_durations.acc_pending_imports; let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!( - acc, + poll_durations.acc_pending_imports, "net::tx", "Batched pool imports stream", DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, @@ -1242,9 +1241,8 @@ where ); // Advance network/peer related events (update peers map). - let acc = &mut poll_durations.acc_network_events; let maybe_more_network_events = metered_poll_nested_stream_with_budget!( - acc, + poll_durations.acc_network_events, "net::tx", "Network events stream", DEFAULT_BUDGET_TRY_DRAIN_STREAM, @@ -1261,9 +1259,8 @@ where // We don't expect this buffer to be large, since only pending transactions are // emitted here. let mut new_txs = Vec::new(); - let acc = &mut poll_durations.acc_imported_txns; let maybe_more_pending_txns = metered_poll_nested_stream_with_budget!( - acc, + poll_durations.acc_imported_txns, "net::tx", "Pending transactions stream", DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS, @@ -1284,9 +1281,8 @@ where // this can potentially queue >200k transactions for insertion to pool. More // if the message size is bigger than the soft limit on a `PooledTransactions` // response which is 2 MiB. - let acc = &mut poll_durations.acc_fetch_events; let maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!( - acc, + poll_durations.acc_fetch_events, "net::tx", "Transaction fetch events stream", DEFAULT_BUDGET_TRY_DRAIN_STREAM, @@ -1308,9 +1304,8 @@ where // validated until they are inserted into the pool, this can potentially queue // >13k transactions for insertion to pool. More if the message size is bigger // than the soft limit on a `Transactions` broadcast message, which is 128 KiB. - let acc = &mut poll_durations.acc_tx_events; let maybe_more_tx_events = metered_poll_nested_stream_with_budget!( - acc, + poll_durations.acc_tx_events, "net::tx", "Network transaction events stream", DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS, @@ -1322,20 +1317,18 @@ where // capacity for this (fetch txns). // // Sends at most one request. - let acc = &mut poll_durations.acc_pending_fetch; duration_metered_exec!( { if this.has_capacity_for_fetching_pending_hashes() { this.on_fetch_hashes_pending_fetch(); } }, - acc + poll_durations.acc_pending_fetch ); // Advance commands (propagate/fetch/serve txns). - let acc = &mut poll_durations.acc_cmds; let maybe_more_commands = metered_poll_nested_stream_with_budget!( - acc, + poll_durations.acc_cmds, "net::tx", "Commands channel", DEFAULT_BUDGET_TRY_DRAIN_STREAM, diff --git a/etc/grafana/dashboards/reth-mempool.json b/etc/grafana/dashboards/reth-mempool.json index ff45a5bf03a8..f93276e509a4 100644 --- a/etc/grafana/dashboards/reth-mempool.json +++ b/etc/grafana/dashboards/reth-mempool.json @@ -2521,7 +2521,7 @@ }, "disableTextWrap": false, "editorMode": "builder", - "expr": "reth_network_duration_poll_network_handle{instance=\"$instance\"}", + "expr": "reth_network_acc_duration_poll_network_handle{instance=\"$instance\"}", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, @@ -2538,7 +2538,7 @@ }, "disableTextWrap": false, "editorMode": "builder", - "expr": "reth_network_duration_poll_swarm{instance=\"$instance\"}", + "expr": "reth_network_acc_duration_poll_swarm{instance=\"$instance\"}", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true,