Skip to content

Commit

Permalink
[Narwal] Added metric to report total use of channel (MystenLabs#4963)
Browse files Browse the repository at this point in the history
* Added total metering facility

* Added total metrics to worker channels

* Intrument the primary

* Gauge -> Counter

* Fix fmt

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
gdanezis and George Danezis authored Oct 4, 2022
1 parent 37eae48 commit 79fc963
Show file tree
Hide file tree
Showing 6 changed files with 386 additions and 53 deletions.
171 changes: 169 additions & 2 deletions narwhal/primary/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use network::metrics::NetworkMetrics;
use prometheus::{
core::{AtomicI64, GenericGauge},
default_registry, register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
register_int_gauge_vec_with_registry, register_int_gauge_with_registry, HistogramVec,
IntCounterVec, IntGauge, IntGaugeVec, Registry,
register_int_counter_with_registry, register_int_gauge_vec_with_registry,
register_int_gauge_with_registry, HistogramVec, IntCounter, IntCounterVec, IntGauge,
IntGaugeVec, Registry,
};
use std::time::Duration;
use tonic::Code;
Expand Down Expand Up @@ -89,6 +90,50 @@ pub struct PrimaryChannelMetrics {
pub tx_committed_certificates: IntGauge,
/// occupancy of the channel from the `primary::Core` to the `Consensus`
pub tx_new_certificates: IntGauge,

// totals
/// total received on channel from the `primary::WorkerReceiverHandler` to the `primary::PayloadReceiver`
pub tx_others_digests_total: IntCounter,
/// total received on channel from the `primary::WorkerReceiverHandler` to the `primary::Proposer`
pub tx_our_digests_total: IntCounter,
/// total received on channel from the `primary::Core` to the `primary::Proposer`
pub tx_parents_total: IntCounter,
/// total received on channel from the `primary::Proposer` to the `primary::Core`
pub tx_headers_total: IntCounter,
/// total received on channel from the `primary::Synchronizer` to the `primary::HeaderWaiter`
pub tx_sync_headers_total: IntCounter,
/// total received on channel from the `primary::Synchronizer` to the `primary::CertificaterWaiter`
pub tx_sync_certificates_total: IntCounter,
/// total received on channel from the `primary::HeaderWaiter` to the `primary::Core`
pub tx_headers_loopback_total: IntCounter,
/// total received on channel from the `primary::CertificateWaiter` to the `primary::Core`
pub tx_certificates_loopback_total: IntCounter,
/// total received on channel from the `primary::PrimaryReceiverHandler` to the `primary::Core`
pub tx_primary_messages_total: IntCounter,
/// total received on channel from the `primary::PrimaryReceiverHandler` to the `primary::Helper`
pub tx_helper_requests_total: IntCounter,
/// total received on channel from the `primary::ConsensusAPIGrpc` (when external consensus is being
/// used) & `executor::Subscriber` (when internal consensus, ex Bullshark, is being used) to
/// the `primary::BlockWaiter`.
pub tx_get_block_commands_total: IntCounter,
/// total received on channel from the `primary::WorkerReceiverHandler` to the `primary::BlockWaiter`
pub tx_batches_total: IntCounter,
/// total received on channel from the `primary::ConsensusAPIGrpc` to the `primary::BlockRemover`
pub tx_block_removal_commands_total: IntCounter,
/// total received on channel from the `primary::WorkerReceiverHandler` to the `primary::BlockRemover`
pub tx_batch_removal_total: IntCounter,
/// total received on channel from the `primary::BlockSynchronizerHandler` to the `primary::BlockSynchronizer`
pub tx_block_synchronizer_commands_total: IntCounter,
/// total received on channel from the `primary::PrimaryReceiverHandler` to the `primary::BlockSynchronizer`
pub tx_availability_responses_total: IntCounter,
/// total received on channel from the `primary::WorkerReceiverHandler` to the `primary::StateHandler`
pub tx_state_handler_total: IntCounter,
/// total received on channel from the reconfigure notification to most components.
pub tx_reconfigure_total: IntCounter,
/// total received on channel from the `Consensus` to the `primary::Core`
pub tx_committed_certificates_total: IntCounter,
/// total received on channel from the `primary::Core` to the `Consensus`
pub tx_new_certificates_total: IntCounter,
}

impl PrimaryChannelMetrics {
Expand All @@ -108,6 +153,22 @@ impl PrimaryChannelMetrics {
pub const DESC_GET_BLOCK_COMMANDS: &'static str =
"occupancy of the channel from the `primary::ConsensusAPIGrpc` & `executor::Subscriber` to the `primary::BlockWaiter`";

// The consistent use of this constant in the below, as well as in `node::spawn_primary` is
// load-bearing, see `replace_registered_committed_certificates_metric`.
pub const NAME_COMMITTED_CERTS_TOTAL: &'static str = "tx_committed_certificates_total";
pub const DESC_COMMITTED_CERTS_TOTAL: &'static str =
"total received on channel from the `Consensus` to the `primary::Core`";
// The consistent use of this constant in the below, as well as in `node::spawn_primary` is
// load-bearing, see `replace_registered_new_certificates_metric`.
pub const NAME_NEW_CERTS_TOTAL: &'static str = "tx_new_certificates_total";
pub const DESC_NEW_CERTS_TOTAL: &'static str =
"total received on channel from the `primary::Core` to the `Consensus`";
// The consistent use of this constant in the below, as well as in `node::spawn_primary` is
// load-bearing, see `replace_registered_tx_get_block_commands_metric`.
pub const NAME_GET_BLOCK_COMMANDS_TOTAL: &'static str = "tx_get_block_commands_total";
pub const DESC_GET_BLOCK_COMMANDS_TOTAL: &'static str =
"total received on channel from the `primary::ConsensusAPIGrpc` & `executor::Subscriber` to the `primary::BlockWaiter`";

pub fn new(registry: &Registry) -> Self {
Self {
tx_others_digests: register_int_gauge_with_registry!(
Expand Down Expand Up @@ -210,6 +271,112 @@ impl PrimaryChannelMetrics {
Self::DESC_NEW_CERTS,
registry
).unwrap(),

// totals

tx_others_digests_total: register_int_counter_with_registry!(
"tx_others_digests_total",
"total received on channel from the `primary::WorkerReceiverHandler` to the `primary::PayloadReceiver`",
registry
).unwrap(),
tx_our_digests_total: register_int_counter_with_registry!(
"tx_our_digests_total",
"total received on channel from the `primary::WorkerReceiverHandler` to the `primary::Proposer`",
registry
).unwrap(),
tx_parents_total: register_int_counter_with_registry!(
"tx_parents_total",
"total received on channel from the `primary::Core` to the `primary::Proposer`",
registry
).unwrap(),
tx_headers_total: register_int_counter_with_registry!(
"tx_headers_total",
"total received on channel from the `primary::Proposer` to the `primary::Core`",
registry
).unwrap(),
tx_sync_headers_total: register_int_counter_with_registry!(
"tx_sync_headers_total",
"total received on channel from the `primary::Synchronizer` to the `primary::HeaderWaiter`",
registry
).unwrap(),
tx_sync_certificates_total: register_int_counter_with_registry!(
"tx_sync_certificates_total",
"total received on channel from the `primary::Synchronizer` to the `primary::CertificaterWaiter`",
registry
).unwrap(),
tx_headers_loopback_total: register_int_counter_with_registry!(
"tx_headers_loopback_total",
"total received on channel from the `primary::HeaderWaiter` to the `primary::Core`",
registry
).unwrap(),
tx_certificates_loopback_total: register_int_counter_with_registry!(
"tx_certificates_loopback_total",
"total received on channel from the `primary::CertificateWaiter` to the `primary::Core`",
registry
).unwrap(),
tx_primary_messages_total: register_int_counter_with_registry!(
"tx_primary_messages_total",
"total received on channel from the `primary::PrimaryReceiverHandler` to the `primary::Core`",
registry
).unwrap(),
tx_helper_requests_total: register_int_counter_with_registry!(
"tx_helper_requests_total",
"total received on channel from the `primary::PrimaryReceiverHandler` to the `primary::Helper`",
registry
).unwrap(),
tx_get_block_commands_total: register_int_counter_with_registry!(
"tx_get_block_commands_total",
"total received on channel from the `primary::ConsensusAPIGrpc` & `executor::Subscriber` to the `primary::BlockWaiter`",
registry
).unwrap(),
tx_batches_total: register_int_counter_with_registry!(
"tx_batches_total",
"total received on channel from the `primary::WorkerReceiverHandler` to the `primary::BlockWaiter`",
registry
).unwrap(),
tx_block_removal_commands_total: register_int_counter_with_registry!(
"tx_block_removal_commands_total",
"total received on channel from the `primary::ConsensusAPIGrpc` to the `primary::BlockRemover`",
registry
).unwrap(),
tx_batch_removal_total: register_int_counter_with_registry!(
"tx_batch_removal_total",
"total received on channel from the `primary::WorkerReceiverHandler` to the `primary::BlockRemover`",
registry
).unwrap(),
tx_block_synchronizer_commands_total: register_int_counter_with_registry!(
"tx_block_synchronizer_commands_total",
"total received on channel from the `primary::BlockSynchronizerHandler` to the `primary::BlockSynchronizer`",
registry
).unwrap(),
tx_availability_responses_total: register_int_counter_with_registry!(
"tx_availability_responses_total",
"total received on channel from the `primary::PrimaryReceiverHandler` to the `primary::BlockSynchronizer`",
registry
).unwrap(),
tx_state_handler_total: register_int_counter_with_registry!(
"tx_state_handler_total",
"total received on channel from the `primary::WorkerReceiverHandler` to the `primary::StateHandler`",
registry
).unwrap(),
tx_reconfigure_total: register_int_counter_with_registry!(
"tx_reconfigure_total",
"total received on channel from the reconfigure notification to most components.",
registry
).unwrap(),
tx_committed_certificates_total: register_int_counter_with_registry!(
Self::NAME_COMMITTED_CERTS_TOTAL,
Self::DESC_COMMITTED_CERTS_TOTAL,
registry
).unwrap(),
tx_new_certificates_total: register_int_counter_with_registry!(
Self::NAME_NEW_CERTS_TOTAL,
Self::DESC_NEW_CERTS_TOTAL,
registry
).unwrap(),



}
}

Expand Down
82 changes: 57 additions & 25 deletions narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use tower::ServiceBuilder;
use tracing::info;
use types::{
error::DagError,
metered_channel::{channel, Receiver, Sender},
metered_channel::{channel_with_total, Receiver, Sender},
BatchDigest, BatchMessage, Certificate, Header, HeaderDigest, PrimaryToPrimary,
PrimaryToPrimaryServer, ReconfigureNotification, RoundVoteDigestPair, WorkerInfoResponse,
WorkerPrimaryError, WorkerPrimaryMessage, WorkerToPrimary, WorkerToPrimaryServer,
Expand Down Expand Up @@ -106,54 +106,86 @@ impl Primary {
let outbound_network_metrics = Arc::new(metrics.outbound_network_metrics.unwrap());
let node_metrics = Arc::new(metrics.node_metrics.unwrap());

let (tx_others_digests, rx_others_digests) =
channel(CHANNEL_CAPACITY, &primary_channel_metrics.tx_others_digests);
let (tx_our_digests, rx_our_digests) =
channel(CHANNEL_CAPACITY, &primary_channel_metrics.tx_our_digests);
let (tx_parents, rx_parents) =
channel(CHANNEL_CAPACITY, &primary_channel_metrics.tx_parents);
let (tx_headers, rx_headers) =
channel(CHANNEL_CAPACITY, &primary_channel_metrics.tx_headers);
let (tx_sync_headers, rx_sync_headers) =
channel(CHANNEL_CAPACITY, &primary_channel_metrics.tx_sync_headers);
let (tx_sync_certificates, rx_sync_certificates) = channel(
let (tx_others_digests, rx_others_digests) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_others_digests,
&primary_channel_metrics.tx_others_digests_total,
);
let (tx_our_digests, rx_our_digests) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_our_digests,
&primary_channel_metrics.tx_our_digests_total,
);
let (tx_parents, rx_parents) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_parents,
&primary_channel_metrics.tx_parents_total,
);
let (tx_headers, rx_headers) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_headers,
&primary_channel_metrics.tx_headers_total,
);
let (tx_sync_headers, rx_sync_headers) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_sync_headers,
&primary_channel_metrics.tx_sync_headers_total,
);
let (tx_sync_certificates, rx_sync_certificates) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_sync_certificates,
&primary_channel_metrics.tx_sync_certificates_total,
);
let (tx_headers_loopback, rx_headers_loopback) = channel(
let (tx_headers_loopback, rx_headers_loopback) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_headers_loopback,
&primary_channel_metrics.tx_headers_loopback_total,
);
let (tx_certificates_loopback, rx_certificates_loopback) = channel(
let (tx_certificates_loopback, rx_certificates_loopback) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_certificates_loopback,
&primary_channel_metrics.tx_certificates_loopback_total,
);
let (tx_primary_messages, rx_primary_messages) = channel(
let (tx_primary_messages, rx_primary_messages) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_primary_messages,
&primary_channel_metrics.tx_primary_messages_total,
);
let (tx_helper_requests, rx_helper_requests) = channel(
let (tx_helper_requests, rx_helper_requests) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_helper_requests,
&primary_channel_metrics.tx_helper_requests_total,
);
let (tx_batches, rx_batches) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_batches,
&primary_channel_metrics.tx_batches_total,
);
let (tx_batches, rx_batches) =
channel(CHANNEL_CAPACITY, &primary_channel_metrics.tx_batches);
let (tx_block_removal_commands, rx_block_removal_commands) = channel(
let (tx_block_removal_commands, rx_block_removal_commands) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_block_removal_commands,
&primary_channel_metrics.tx_block_removal_commands_total,
);
let (tx_batch_removal, rx_batch_removal) =
channel(CHANNEL_CAPACITY, &primary_channel_metrics.tx_batch_removal);
let (tx_block_synchronizer_commands, rx_block_synchronizer_commands) = channel(
let (tx_batch_removal, rx_batch_removal) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_batch_removal,
&primary_channel_metrics.tx_batch_removal_total,
);
let (tx_block_synchronizer_commands, rx_block_synchronizer_commands) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_block_synchronizer_commands,
&primary_channel_metrics.tx_block_synchronizer_commands_total,
);
let (tx_availability_responses, rx_availability_responses) = channel(
let (tx_availability_responses, rx_availability_responses) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_availability_responses,
&primary_channel_metrics.tx_availability_responses_total,
);
let (tx_state_handler, rx_state_handler) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_state_handler,
&primary_channel_metrics.tx_state_handler_total,
);
let (tx_state_handler, rx_state_handler) =
channel(CHANNEL_CAPACITY, &primary_channel_metrics.tx_state_handler);

// we need to hack the gauge from this consensus channel into the primary registry
// This avoids a cyclic dependency in the initialization of consensus and primary
Expand Down
Loading

0 comments on commit 79fc963

Please sign in to comment.