Skip to content

Commit

Permalink
Additional Metrics collected and exposed via prometheus (paritytech#5414
Browse files Browse the repository at this point in the history
)

This PR refactors the metrics measuring and Prometheus exposing entity in sc-service into its own submodule and extends the parameters it exposes by:

- system load average (over one, five and 15min)
- the TCP connection state of the process (lsof), refs paritytech#5304
- number of tokio threads
- number of known forks
- counter for items in each unbounded queue (with internal unbounded channels)
- number of file descriptors opened by this process (*nix only at this point)
- number of system threads (*nix only at this point)

refs paritytech#4679

Co-authored-by: Max Inden <[email protected]>
Co-authored-by: Ashley <[email protected]>
  • Loading branch information
3 people authored Apr 4, 2020
1 parent e33dae6 commit 8991aab
Show file tree
Hide file tree
Showing 60 changed files with 1,343 additions and 525 deletions.
601 changes: 365 additions & 236 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ members = [
"primitives/test-primitives",
"primitives/transaction-pool",
"primitives/trie",
"primitives/utils",
"primitives/wasm-interface",
"test-utils/client",
"test-utils/runtime",
Expand Down
1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ sp-std = { version = "2.0.0-alpha.5", path = "../primitives/std" }
sp-version = { version = "2.0.0-alpha.5", path = "../primitives/version" }
sp-api = { version = "2.0.0-alpha.5", path = "../primitives/api" }
sp-runtime = { version = "2.0.0-alpha.5", path = "../primitives/runtime" }
sp-utils = { version = "2.0.0-alpha.5", path = "../primitives/utils" }
sp-blockchain = { version = "2.0.0-alpha.5", path = "../primitives/blockchain" }
sp-state-machine = { version = "0.8.0-alpha.5", path = "../primitives/state-machine" }
sc-telemetry = { version = "2.0.0-alpha.5", path = "telemetry" }
Expand Down
2 changes: 2 additions & 0 deletions client/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ sp-keyring = { version = "2.0.0-alpha.5", path = "../../primitives/keyring" }
kvdb = "0.5.0"
log = { version = "0.4.8" }
parking_lot = "0.10.0"
lazy_static = "1.4.0"
sp-core = { version = "2.0.0-alpha.5", default-features = false, path = "../../primitives/core" }
sp-std = { version = "2.0.0-alpha.5", default-features = false, path = "../../primitives/std" }
sp-version = { version = "2.0.0-alpha.5", default-features = false, path = "../../primitives/version" }
sp-api = { version = "2.0.0-alpha.5", path = "../../primitives/api" }
sp-utils = { version = "2.0.0-alpha.5", path = "../../primitives/utils" }
sp-runtime = { version = "2.0.0-alpha.5", default-features = false, path = "../../primitives/runtime" }
sp-state-machine = { version = "0.8.0-alpha.5", path = "../../primitives/state-machine" }
sc-telemetry = { version = "2.0.0-alpha.5", path = "../telemetry" }
Expand Down
6 changes: 3 additions & 3 deletions client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
//! A set of APIs supported by the client along with their primitives.
use std::{fmt, collections::HashSet};
use futures::channel::mpsc;
use sp_core::storage::StorageKey;
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
Expand All @@ -28,13 +27,14 @@ use sp_consensus::BlockOrigin;

use crate::blockchain::Info;
use crate::notifications::StorageEventStream;
use sp_utils::mpsc::TracingUnboundedReceiver;
use sp_blockchain;

/// Type that implements `futures::Stream` of block import events.
pub type ImportNotifications<Block> = mpsc::UnboundedReceiver<BlockImportNotification<Block>>;
pub type ImportNotifications<Block> = TracingUnboundedReceiver<BlockImportNotification<Block>>;

/// A stream of block finality notifications.
pub type FinalityNotifications<Block> = mpsc::UnboundedReceiver<FinalityNotification<Block>>;
pub type FinalityNotifications<Block> = TracingUnboundedReceiver<FinalityNotification<Block>>;

/// Expected hashes of blocks at given heights.
///
Expand Down
8 changes: 4 additions & 4 deletions client/api/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use std::{
};

use fnv::{FnvHashSet, FnvHashMap};
use futures::channel::mpsc;
use sp_core::storage::{StorageKey, StorageData};
use sp_runtime::traits::Block as BlockT;
use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_unbounded};

/// Storage change set
#[derive(Debug)]
Expand Down Expand Up @@ -67,7 +67,7 @@ impl StorageChangeSet {
}

/// Type that implements `futures::Stream` of storage change events.
pub type StorageEventStream<H> = mpsc::UnboundedReceiver<(H, StorageChangeSet)>;
pub type StorageEventStream<H> = TracingUnboundedReceiver<(H, StorageChangeSet)>;

type SubscriberId = u64;

Expand All @@ -82,7 +82,7 @@ pub struct StorageNotifications<Block: BlockT> {
FnvHashSet<SubscriberId>
)>,
sinks: FnvHashMap<SubscriberId, (
mpsc::UnboundedSender<(Block::Hash, StorageChangeSet)>,
TracingUnboundedSender<(Block::Hash, StorageChangeSet)>,
Option<HashSet<StorageKey>>,
Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>,
)>,
Expand Down Expand Up @@ -299,7 +299,7 @@ impl<Block: BlockT> StorageNotifications<Block> {


// insert sink
let (tx, rx) = mpsc::unbounded();
let (tx, rx) = tracing_unbounded("mpsc_storage_notification_items");
self.sinks.insert(current_id, (tx, keys, child_keys));
rx
}
Expand Down
1 change: 1 addition & 0 deletions client/authority-discovery/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl<Block: BlockT> HeaderBackend<Block> for TestApi {
finalized_hash: Default::default(),
finalized_number: Zero::zero(),
genesis_hash: Default::default(),
number_leaves: Default::default(),
}
}

Expand Down
1 change: 1 addition & 0 deletions client/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ sc-client-api = { version = "2.0.0-alpha.5", path = "../api" }
sp-blockchain = { version = "2.0.0-alpha.5", path = "../../primitives/blockchain" }
sc-network = { version = "0.8.0-alpha.5", path = "../network" }
sp-runtime = { version = "2.0.0-alpha.5", path = "../../primitives/runtime" }
sp-utils = { version = "2.0.0-alpha.5", path = "../../primitives/utils" }
sp-core = { version = "2.0.0-alpha.5", path = "../../primitives/core" }
sc-service = { version = "0.8.0-alpha.5", default-features = false, path = "../service" }
sp-state-machine = { version = "0.8.0-alpha.5", path = "../../primitives/state-machine" }
Expand Down
8 changes: 8 additions & 0 deletions client/cli/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use futures::{Future, future, future::FutureExt};
use futures::select;
use futures::pin_mut;
use sc_service::{AbstractService, Configuration};
use sp_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
use crate::error;

#[cfg(target_family = "unix")]
Expand Down Expand Up @@ -73,6 +74,13 @@ fn build_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
tokio::runtime::Builder::new()
.thread_name("main-tokio-")
.threaded_scheduler()
.on_thread_start(||{
TOKIO_THREADS_ALIVE.inc();
TOKIO_THREADS_TOTAL.inc();
})
.on_thread_stop(||{
TOKIO_THREADS_ALIVE.dec();
})
.enable_all()
.build()
}
Expand Down
1 change: 1 addition & 0 deletions client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ impl<Block: BlockT> sc_client::blockchain::HeaderBackend<Block> for BlockchainDb
genesis_hash: meta.genesis_hash,
finalized_hash: meta.finalized_hash,
finalized_number: meta.finalized_number,
number_leaves: self.leaves.read().count(),
}
}

Expand Down
1 change: 1 addition & 0 deletions client/db/src/light.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ impl<Block> BlockchainHeaderBackend<Block> for LightStorage<Block>
genesis_hash: meta.genesis_hash,
finalized_hash: meta.finalized_hash,
finalized_number: meta.finalized_number,
number_leaves: 1,
}
}

Expand Down
1 change: 1 addition & 0 deletions client/finality-grandpa/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ assert_matches = "1.3.0"
parity-scale-codec = { version = "1.3.0", features = ["derive"] }
sp-arithmetic = { version = "2.0.0-alpha.5", path = "../../primitives/arithmetic" }
sp-runtime = { version = "2.0.0-alpha.5", path = "../../primitives/runtime" }
sp-utils = { version = "2.0.0-alpha.5", path = "../../primitives/utils" }
sp-consensus = { version = "0.8.0-alpha.5", path = "../../primitives/consensus/common" }
sp-core = { version = "2.0.0-alpha.5", path = "../../primitives/core" }
sp-api = { version = "2.0.0-alpha.5", path = "../../primitives/api" }
Expand Down
8 changes: 4 additions & 4 deletions client/finality-grandpa/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ use sp_finality_grandpa::AuthorityId;

use sc_telemetry::{telemetry, CONSENSUS_DEBUG};
use log::{trace, debug};
use futures::channel::mpsc;
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use prometheus_endpoint::{CounterVec, Opts, PrometheusError, register, Registry, U64};
use rand::seq::SliceRandom;

Expand Down Expand Up @@ -1254,7 +1254,7 @@ impl Metrics {
pub(super) struct GossipValidator<Block: BlockT> {
inner: parking_lot::RwLock<Inner<Block>>,
set_state: environment::SharedVoterSetState<Block>,
report_sender: mpsc::UnboundedSender<PeerReport>,
report_sender: TracingUnboundedSender<PeerReport>,
metrics: Option<Metrics>,
}

Expand All @@ -1266,7 +1266,7 @@ impl<Block: BlockT> GossipValidator<Block> {
config: crate::Config,
set_state: environment::SharedVoterSetState<Block>,
prometheus_registry: Option<&Registry>,
) -> (GossipValidator<Block>, mpsc::UnboundedReceiver<PeerReport>) {
) -> (GossipValidator<Block>, TracingUnboundedReceiver<PeerReport>) {
let metrics = match prometheus_registry.map(Metrics::register) {
Some(Ok(metrics)) => Some(metrics),
Some(Err(e)) => {
Expand All @@ -1276,7 +1276,7 @@ impl<Block: BlockT> GossipValidator<Block> {
None => None,
};

let (tx, rx) = mpsc::unbounded();
let (tx, rx) = tracing_unbounded("mpsc_grandpa_gossip_validator");
let val = GossipValidator {
inner: parking_lot::RwLock::new(Inner::new(config)),
set_state,
Expand Down
3 changes: 2 additions & 1 deletion client/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use gossip::{
use sp_finality_grandpa::{
AuthorityPair, AuthorityId, AuthoritySignature, SetId as SetIdNumber, RoundNumber,
};
use sp_utils::mpsc::TracingUnboundedReceiver;

pub mod gossip;
mod periodic;
Expand Down Expand Up @@ -165,7 +166,7 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
// thus one has to wrap gossip_validator_report_stream with an `Arc` `Mutex`. Given that it is
// just an `UnboundedReceiver`, one could also switch to a multi-producer-*multi*-consumer
// channel implementation.
gossip_validator_report_stream: Arc<Mutex<mpsc::UnboundedReceiver<PeerReport>>>,
gossip_validator_report_stream: Arc<Mutex<TracingUnboundedReceiver<PeerReport>>>,
}

impl<B: BlockT, N: Network<B>> Unpin for NetworkBridge<B, N> {}
Expand Down
10 changes: 6 additions & 4 deletions client/finality-grandpa/src/communication/periodic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
//! Periodic rebroadcast of neighbor packets.
use futures_timer::Delay;
use futures::{channel::mpsc, future::{FutureExt as _}, prelude::*, ready, stream::Stream};
use futures::{future::{FutureExt as _}, prelude::*, ready, stream::Stream};
use log::debug;
use std::{pin::Pin, task::{Context, Poll}, time::Duration};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};

use sc_network::PeerId;
use sp_runtime::traits::{NumberFor, Block as BlockT};
Expand All @@ -31,7 +32,7 @@ const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60);
/// A sender used to send neighbor packets to a background job.
#[derive(Clone)]
pub(super) struct NeighborPacketSender<B: BlockT>(
mpsc::UnboundedSender<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>
TracingUnboundedSender<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>
);

impl<B: BlockT> NeighborPacketSender<B> {
Expand All @@ -54,14 +55,15 @@ impl<B: BlockT> NeighborPacketSender<B> {
pub(super) struct NeighborPacketWorker<B: BlockT> {
last: Option<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
delay: Delay,
rx: mpsc::UnboundedReceiver<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
rx: TracingUnboundedReceiver<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
}

impl<B: BlockT> Unpin for NeighborPacketWorker<B> {}

impl<B: BlockT> NeighborPacketWorker<B> {
pub(super) fn new() -> (Self, NeighborPacketSender<B>){
let (tx, rx) = mpsc::unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>();
let (tx, rx) = tracing_unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>
("mpsc_grandpa_neighbor_packet_worker");
let delay = Delay::new(REBROADCAST_AFTER);

(NeighborPacketWorker {
Expand Down
12 changes: 6 additions & 6 deletions client/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

//! Tests for the communication portion of the GRANDPA crate.
use futures::channel::mpsc;
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use futures::prelude::*;
use sc_network::{Event as NetworkEvent, ObservedRole, PeerId};
use sc_network_test::{Block, Hash};
Expand All @@ -33,20 +33,20 @@ use super::{AuthorityId, VoterSet, Round, SetId};

#[derive(Debug)]
pub(crate) enum Event {
EventStream(mpsc::UnboundedSender<NetworkEvent>),
EventStream(TracingUnboundedSender<NetworkEvent>),
WriteNotification(sc_network::PeerId, Vec<u8>),
Report(sc_network::PeerId, sc_network::ReputationChange),
Announce(Hash),
}

#[derive(Clone)]
pub(crate) struct TestNetwork {
sender: mpsc::UnboundedSender<Event>,
sender: TracingUnboundedSender<Event>,
}

impl sc_network_gossip::Network<Block> for TestNetwork {
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = NetworkEvent> + Send>> {
let (tx, rx) = mpsc::unbounded();
let (tx, rx) = tracing_unbounded("test");
let _ = self.sender.unbounded_send(Event::EventStream(tx));
Box::pin(rx)
}
Expand Down Expand Up @@ -97,7 +97,7 @@ impl sc_network_gossip::ValidatorContext<Block> for TestNetwork {
pub(crate) struct Tester {
pub(crate) net_handle: super::NetworkBridge<Block, TestNetwork>,
gossip_validator: Arc<GossipValidator<Block>>,
pub(crate) events: mpsc::UnboundedReceiver<Event>,
pub(crate) events: TracingUnboundedReceiver<Event>,
}

impl Tester {
Expand Down Expand Up @@ -161,7 +161,7 @@ pub(crate) fn make_test_network() -> (
impl Future<Output = Tester>,
TestNetwork,
) {
let (tx, rx) = mpsc::unbounded();
let (tx, rx) = tracing_unbounded("test");
let net = TestNetwork { sender: tx };

#[derive(Clone)]
Expand Down
6 changes: 3 additions & 3 deletions client/finality-grandpa/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ use std::{sync::Arc, collections::HashMap};

use log::{debug, trace, info};
use parity_scale_codec::Encode;
use futures::channel::mpsc;
use parking_lot::RwLockWriteGuard;

use sp_blockchain::{BlockStatus, well_known_cache_keys};
use sc_client_api::{backend::Backend, utils::is_descendent_of};
use sp_utils::mpsc::TracingUnboundedSender;
use sp_api::{TransactionFor};

use sp_consensus::{
Expand Down Expand Up @@ -57,7 +57,7 @@ pub struct GrandpaBlockImport<Backend, Block: BlockT, Client, SC> {
inner: Arc<Client>,
select_chain: SC,
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
send_voter_commands: mpsc::UnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
send_voter_commands: TracingUnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
consensus_changes: SharedConsensusChanges<Block::Hash, NumberFor<Block>>,
authority_set_hard_forks: HashMap<Block::Hash, PendingChange<Block::Hash, NumberFor<Block>>>,
_phantom: PhantomData<Backend>,
Expand Down Expand Up @@ -536,7 +536,7 @@ impl<Backend, Block: BlockT, Client, SC> GrandpaBlockImport<Backend, Block, Clie
inner: Arc<Client>,
select_chain: SC,
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
send_voter_commands: mpsc::UnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
send_voter_commands: TracingUnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
consensus_changes: SharedConsensusChanges<Block::Hash, NumberFor<Block>>,
authority_set_hard_forks: Vec<(SetId, PendingChange<Block::Hash, NumberFor<Block>>)>,
) -> GrandpaBlockImport<Backend, Block, Client, SC> {
Expand Down
12 changes: 6 additions & 6 deletions client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
use futures::prelude::*;
use futures::StreamExt;
use log::{debug, info};
use futures::channel::mpsc;
use sc_client_api::{
backend::{AuxStore, Backend},
LockImportRun, BlockchainEvents, CallExecutor,
Expand All @@ -70,6 +69,7 @@ use sc_keystore::KeyStorePtr;
use sp_inherents::InherentDataProviders;
use sp_consensus::{SelectChain, BlockImport};
use sp_core::Pair;
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
use sc_telemetry::{telemetry, CONSENSUS_INFO, CONSENSUS_DEBUG};
use serde_json;

Expand Down Expand Up @@ -379,7 +379,7 @@ pub struct LinkHalf<Block: BlockT, C, SC> {
client: Arc<C>,
select_chain: SC,
persistent_data: PersistentData<Block>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
}

/// Provider for the Grandpa authority set configured on the genesis block.
Expand Down Expand Up @@ -476,7 +476,7 @@ where
}
)?;

let (voter_commands_tx, voter_commands_rx) = mpsc::unbounded();
let (voter_commands_tx, voter_commands_rx) = tracing_unbounded("mpsc_grandpa_voter_command");

// create pending change objects with 0 delay and enacted on finality
// (i.e. standard changes) for each authority set hard fork.
Expand Down Expand Up @@ -598,7 +598,7 @@ pub struct GrandpaParams<Block: BlockT, C, N, SC, VR> {
/// The inherent data providers.
pub inherent_data_providers: InherentDataProviders,
/// If supplied, can be used to hook on telemetry connection established events.
pub telemetry_on_connect: Option<futures::channel::mpsc::UnboundedReceiver<()>>,
pub telemetry_on_connect: Option<TracingUnboundedReceiver<()>>,
/// A voting rule used to potentially restrict target votes.
pub voting_rule: VR,
/// The prometheus metrics registry.
Expand Down Expand Up @@ -718,7 +718,7 @@ impl Metrics {
struct VoterWork<B, Block: BlockT, C, N: NetworkT<Block>, SC, VR> {
voter: Pin<Box<dyn Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>> + Send>>,
env: Arc<Environment<B, Block, C, N, SC, VR>>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
network: NetworkBridge<Block, N>,

/// Prometheus metrics.
Expand All @@ -742,7 +742,7 @@ where
select_chain: SC,
voting_rule: VR,
persistent_data: PersistentData<Block>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
prometheus_registry: Option<prometheus_endpoint::Registry>,
) -> Self {
let metrics = match prometheus_registry.as_ref().map(Metrics::register) {
Expand Down
Loading

0 comments on commit 8991aab

Please sign in to comment.