Skip to content

Commit

Permalink
perf(net): decrease budget EthRequestHandler + metrics (paradigmxyz…
Browse files Browse the repository at this point in the history
  • Loading branch information
emhane authored May 30, 2024
1 parent 2a8a693 commit a8a4f67
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 34 deletions.
9 changes: 7 additions & 2 deletions crates/net/network/src/budget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
/// Default is 10 iterations.
pub const DEFAULT_BUDGET_TRY_DRAIN_STREAM: u32 = 10;

/// Default budget to try and drain headers and bodies download streams.
///
/// Default is 4 iterations.
pub const DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS: u32 = 4;

/// Default budget to try and drain [`Swarm`](crate::swarm::Swarm).
///
/// Default is 10 [`SwarmEvent`](crate::swarm::SwarmEvent)s.
Expand Down Expand Up @@ -68,8 +73,8 @@ macro_rules! poll_nested_stream_with_budget {
/// Metered poll of the given stream. Breaks with `true` if there maybe is more work.
#[macro_export]
macro_rules! metered_poll_nested_stream_with_budget {
($acc:ident, $target:literal, $label:literal, $budget:ident, $poll_stream:expr, $on_ready_some:expr $(, $on_ready_none:expr;)? $(,)?) => {{
duration_metered_exec!(
($acc:expr, $target:literal, $label:literal, $budget:ident, $poll_stream:expr, $on_ready_some:expr $(, $on_ready_none:expr;)? $(,)?) => {{
$crate::duration_metered_exec!(
{
$crate::poll_nested_stream_with_budget!($target, $label, $budget, $poll_stream, $on_ready_some $(, $on_ready_none;)?)
},
Expand Down
13 changes: 9 additions & 4 deletions crates/net/network/src/eth_requests.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Blocks/Headers management for the p2p network.
use crate::{
budget::DEFAULT_BUDGET_TRY_DRAIN_STREAM, metrics::EthRequestHandlerMetrics, peers::PeersHandle,
poll_nested_stream_with_budget,
budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
metrics::EthRequestHandlerMetrics, peers::PeersHandle,
};
use alloy_rlp::Encodable;
use futures::StreamExt;
Expand All @@ -18,6 +18,7 @@ use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::sync::{mpsc::Receiver, oneshot};
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -239,10 +240,12 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

let maybe_more_incoming_requests = poll_nested_stream_with_budget!(
let mut acc = Duration::ZERO;
let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
acc,
"net::eth",
"Incoming eth requests stream",
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
this.incoming_requests.poll_next_unpin(cx),
|incoming| {
match incoming {
Expand All @@ -262,6 +265,8 @@ where
},
);

this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());

// stream is fully drained and import futures pending
if maybe_more_incoming_requests {
// make sure we're woken up again
Expand Down
4 changes: 2 additions & 2 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ impl<C> NetworkManager<C> {
// update metrics for whole poll function
metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
// update poll metrics for nested items
metrics.duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
metrics.duration_poll_swarm.set(acc_swarm.as_secs_f64());
metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
}
}

Expand Down
14 changes: 9 additions & 5 deletions crates/net/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ pub struct NetworkMetrics {
/// [`TransactionsManager`](crate::transactions::TransactionsManager) holds this handle.
///
/// Duration in seconds.
pub(crate) duration_poll_network_handle: Gauge,
pub(crate) acc_duration_poll_network_handle: Gauge,
/// Time spent polling [`Swarm`](crate::swarm::Swarm), in one call to poll the
/// [`NetworkManager`](crate::NetworkManager) future.
///
/// Duration in seconds.
pub(crate) duration_poll_swarm: Gauge,
pub(crate) acc_duration_poll_swarm: Gauge,
}

/// Metrics for SessionManager
Expand Down Expand Up @@ -226,12 +226,12 @@ pub struct TransactionFetcherMetrics {
/// accumulator value passed as a mutable reference.
#[macro_export]
macro_rules! duration_metered_exec {
($code:expr, $acc:ident) => {{
let start = Instant::now();
($code:expr, $acc:expr) => {{
let start = std::time::Instant::now();

let res = $code;

*$acc += start.elapsed();
$acc += start.elapsed();

res
}};
Expand Down Expand Up @@ -321,6 +321,10 @@ pub struct EthRequestHandlerMetrics {

/// Number of GetNodeData requests received
pub(crate) eth_node_data_requests_received_total: Counter,

/// Duration in seconds of call to poll
/// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
pub(crate) acc_duration_poll_eth_req_handler: Gauge,
}

/// Eth67 announcement metrics, track entries by TxType
Expand Down
8 changes: 3 additions & 5 deletions crates/net/network/src/transactions/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use std::{
collections::HashMap,
pin::Pin,
task::{ready, Context, Poll},
time::{Duration, Instant},
time::Duration,
};
use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
use tracing::{debug, trace};
Expand Down Expand Up @@ -430,7 +430,6 @@ impl TransactionFetcher {
let budget_find_idle_fallback_peer = self
.search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports);

let acc = &mut search_durations.find_idle_peer;
let peer_id = duration_metered_exec!(
{
let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
Expand All @@ -444,7 +443,7 @@ impl TransactionFetcher {

peer_id
},
acc
search_durations.find_idle_peer
);

// peer should always exist since `is_session_active` already checked
Expand All @@ -460,7 +459,6 @@ impl TransactionFetcher {
&has_capacity_wrt_pending_pool_imports,
);

let acc = &mut search_durations.fill_request;
duration_metered_exec!(
{
self.fill_request_from_hashes_pending_fetch(
Expand All @@ -469,7 +467,7 @@ impl TransactionFetcher {
budget_fill_request,
)
},
acc
search_durations.fill_request
);

// free unused memory
Expand Down
21 changes: 7 additions & 14 deletions crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1231,9 +1231,8 @@ where
// this can potentially validate >200k transactions. More if the message size
// is bigger than the soft limit on a `PooledTransactions` response which is
// 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB).
let acc = &mut poll_durations.acc_pending_imports;
let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
acc,
poll_durations.acc_pending_imports,
"net::tx",
"Batched pool imports stream",
DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
Expand All @@ -1242,9 +1241,8 @@ where
);

// Advance network/peer related events (update peers map).
let acc = &mut poll_durations.acc_network_events;
let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
acc,
poll_durations.acc_network_events,
"net::tx",
"Network events stream",
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
Expand All @@ -1261,9 +1259,8 @@ where
// We don't expect this buffer to be large, since only pending transactions are
// emitted here.
let mut new_txs = Vec::new();
let acc = &mut poll_durations.acc_imported_txns;
let maybe_more_pending_txns = metered_poll_nested_stream_with_budget!(
acc,
poll_durations.acc_imported_txns,
"net::tx",
"Pending transactions stream",
DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
Expand All @@ -1284,9 +1281,8 @@ where
// this can potentially queue >200k transactions for insertion to pool. More
// if the message size is bigger than the soft limit on a `PooledTransactions`
// response which is 2 MiB.
let acc = &mut poll_durations.acc_fetch_events;
let maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
acc,
poll_durations.acc_fetch_events,
"net::tx",
"Transaction fetch events stream",
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
Expand All @@ -1308,9 +1304,8 @@ where
// validated until they are inserted into the pool, this can potentially queue
// >13k transactions for insertion to pool. More if the message size is bigger
// than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
let acc = &mut poll_durations.acc_tx_events;
let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
acc,
poll_durations.acc_tx_events,
"net::tx",
"Network transaction events stream",
DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
Expand All @@ -1322,20 +1317,18 @@ where
// capacity for this (fetch txns).
//
// Sends at most one request.
let acc = &mut poll_durations.acc_pending_fetch;
duration_metered_exec!(
{
if this.has_capacity_for_fetching_pending_hashes() {
this.on_fetch_hashes_pending_fetch();
}
},
acc
poll_durations.acc_pending_fetch
);

// Advance commands (propagate/fetch/serve txns).
let acc = &mut poll_durations.acc_cmds;
let maybe_more_commands = metered_poll_nested_stream_with_budget!(
acc,
poll_durations.acc_cmds,
"net::tx",
"Commands channel",
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
Expand Down
4 changes: 2 additions & 2 deletions etc/grafana/dashboards/reth-mempool.json
Original file line number Diff line number Diff line change
Expand Up @@ -2521,7 +2521,7 @@
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "reth_network_duration_poll_network_handle{instance=\"$instance\"}",
"expr": "reth_network_acc_duration_poll_network_handle{instance=\"$instance\"}",
"fullMetaSearch": false,
"hide": false,
"includeNullMetadata": true,
Expand All @@ -2538,7 +2538,7 @@
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "reth_network_duration_poll_swarm{instance=\"$instance\"}",
"expr": "reth_network_acc_duration_poll_swarm{instance=\"$instance\"}",
"fullMetaSearch": false,
"hide": false,
"includeNullMetadata": true,
Expand Down

0 comments on commit a8a4f67

Please sign in to comment.