Postpone flushing aggregated counters to maintenance.
aterentic-ethernal committed Jun 7, 2024
1 parent 4be858c commit fe408af
Showing 11 changed files with 303 additions and 230 deletions.
4 changes: 2 additions & 2 deletions .gitignore
@@ -3,6 +3,6 @@
.env
.DS_Store
config*.yaml
avail_light_store
avail_path*
avail_*
debug.plist
/identity.toml
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -1,6 +1,10 @@
# Changelog

## v1.9.0
## 1.9.1

- Postpone flushing aggregated counters to maintenance step

## [v1.9.0](https://github.com/availproject/avail-light/releases/tag/v1.9.0) - 2024-04-06

- Add metric aggregation on client side in order to decrease the telemetry server load
- Add `avail.light.starts` metric counter which allows measuring number of restarts
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "avail-light"
version = "1.9.0"
version = "1.9.1"
authors = ["Avail Team"]
default-run = "avail-light"
edition = "2021"
6 changes: 4 additions & 2 deletions src/bin/avail-light.rs
@@ -254,7 +254,6 @@ async fn run(shutdown: Controller<String>) -> Result<()> {
result
},
)));
ot_metrics.count(MetricCounter::Starts).await;

info!("Waiting for first finalized header...");
let block_header = match shutdown
@@ -400,6 +399,7 @@ async fn run(shutdown: Controller<String>) -> Result<()> {
replication_factor: cfg.replication_factor,
query_timeout: cfg.query_timeout,
pruning_interval: cfg.store_pruning_interval,
telemetry_flush_interval: cfg.ot_flush_block_interval,
};

tokio::task::spawn(shutdown.with_cancel(avail_light::maintenance::run(
@@ -434,13 +434,15 @@ async fn run(shutdown: Controller<String>) -> Result<()> {
db.clone(),
light_network_client,
(&cfg).into(),
ot_metrics,
ot_metrics.clone(),
state.clone(),
channels,
shutdown.clone(),
)));
}

ot_metrics.count(MetricCounter::Starts).await;

Ok(())
}

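In the main binary, the `Starts` counter is now emitted at the very end of `run`, after the light client task has been spawned with a cloned metrics handle. Below is a minimal sketch of that sharing pattern; it assumes `ot_metrics` is an `Arc`-wrapped handle implementing the `Metrics` trait, the import paths are assumed, and the task body is a placeholder:

```rust
use std::sync::Arc;

use avail_light::telemetry::{MetricCounter, Metrics}; // paths assumed for illustration

async fn spawn_and_count(ot_metrics: Arc<dyn Metrics + Send + Sync>) {
    // A clone of the handle is moved into the spawned light client task...
    let task_metrics = ot_metrics.clone();
    tokio::spawn(async move {
        // ...which keeps counting and recording per-block metrics (placeholder body).
        task_metrics.count(MetricCounter::SessionBlockCounter).await;
    });

    // ...while the original handle emits `Starts` once setup has finished,
    // matching the reordering in this commit.
    ot_metrics.count(MetricCounter::Starts).await;
}
```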
13 changes: 5 additions & 8 deletions src/fat_client.rs
@@ -84,7 +84,7 @@ pub async fn process_block(
metrics.count(MetricCounter::SessionBlockCounter).await;
metrics
.record(MetricValue::TotalBlockNumber(header.number))
.await?;
.await;

let block_number = header.number;
let header_hash: H256 = Encode::using_encoded(header, blake2_256).into();
@@ -168,7 +168,7 @@ pub async fn process_block(
.record(MetricValue::RPCCallDuration(
partition_rpc_retrieve_time_elapsed.as_secs_f64(),
))
.await?;
.await;

if rpc_fetched.len() >= dimensions.cols().get().into() {
let data_cells = rpc_fetched
@@ -224,12 +224,9 @@ pub async fn run(
};

if let Some(seconds) = cfg.block_processing_delay.sleep_duration(received_at) {
if let Err(error) = metrics
metrics
.record(MetricValue::BlockProcessingDelay(seconds.as_secs_f64()))
.await
{
error!("Cannot record block processing delay: {}", error);
}
.await;
info!("Sleeping for {seconds:?} seconds");
tokio::time::sleep(seconds).await;
}
@@ -382,7 +379,7 @@ mod tests {

let mut mock_metrics = telemetry::MockMetrics::new();
mock_metrics.expect_count().returning(|_| ());
mock_metrics.expect_record().returning(|_| Ok(()));
mock_metrics.expect_record().returning(|_| ());

process_block(
&mock_client,
23 changes: 10 additions & 13 deletions src/light_client.rs
@@ -52,7 +52,7 @@ pub async fn process_block(
metrics.count(MetricCounter::SessionBlockCounter).await;
metrics
.record(MetricValue::TotalBlockNumber(header.number))
.await?;
.await;

let block_number = header.number;
let header_hash: H256 = Encode::using_encoded(&header, blake2_256).into();
@@ -109,30 +109,30 @@ pub async fn process_block(

metrics
.record(MetricValue::DHTFetched(fetch_stats.dht_fetched))
.await?;
.await;

metrics
.record(MetricValue::DHTFetchedPercentage(
fetch_stats.dht_fetched_percentage,
))
.await?;
.await;

metrics
.record(MetricValue::DHTFetchDuration(
fetch_stats.dht_fetch_duration,
))
.await?;
.await;

if let Some(rpc_fetched) = fetch_stats.rpc_fetched {
metrics
.record(MetricValue::NodeRPCFetched(rpc_fetched))
.await?;
.await;
}

if let Some(rpc_fetch_duration) = fetch_stats.rpc_fetch_duration {
metrics
.record(MetricValue::NodeRPCFetchDuration(rpc_fetch_duration))
.await?;
.await;
}
(positions.len(), fetched.len(), unfetched.len())
},
@@ -158,7 +158,7 @@ pub async fn process_block(
);
metrics
.record(MetricValue::BlockConfidence(confidence))
.await?;
.await;

// push latest mined block's header into column family specified
// for keeping block headers, to be used
@@ -210,12 +210,9 @@ pub async fn run(
};

if let Some(seconds) = cfg.block_processing_delay.sleep_duration(received_at) {
if let Err(error) = metrics
metrics
.record(MetricValue::BlockProcessingDelay(seconds.as_secs_f64()))
.await
{
error!("Cannot record block processing delay: {}", error);
}
.await;
info!("Sleeping for {seconds:?} seconds");
tokio::time::sleep(seconds).await;
}
@@ -354,7 +351,7 @@ mod tests {

let mut mock_metrics = telemetry::MockMetrics::new();
mock_metrics.expect_count().returning(|_| ());
mock_metrics.expect_record().returning(|_| Ok(()));
mock_metrics.expect_record().returning(|_| ());
process_block(
db,
&mock_network_client,
25 changes: 16 additions & 9 deletions src/maintenance.rs
@@ -1,9 +1,7 @@
use color_eyre::{eyre::WrapErr, Result};
use std::sync::Arc;
use tokio::sync::broadcast;
#[cfg(not(feature = "kademlia-rocksdb"))]
use tracing::error;
use tracing::{debug, info};
use tracing::{debug, error, info};

use crate::{
network::p2p::Client as P2pClient,
@@ -18,6 +16,7 @@ pub struct StaticConfigParams {
pub replication_factor: u16,
pub query_timeout: u32,
pub pruning_interval: u32,
pub telemetry_flush_interval: u32,
}

pub async fn process_block(
@@ -35,6 +34,14 @@ pub async fn process_block(
}
}

if block_number % static_config_params.telemetry_flush_interval == 0 {
info!(block_number, "Flushing metrics...");
match metrics.flush().await {
Ok(()) => info!(block_number, "Flushing metrics finished"),
Err(error) => error!(block_number, "Flushing metrics failed: {error:#}"),
}
}

p2p_client
.shrink_kademlia_map()
.await
@@ -52,24 +59,24 @@ pub async fn process_block(
debug!("Connected peers: {:?}", connected_peers);

let peers_num_metric = MetricValue::ConnectedPeersNum(peers_num);
metrics.record(peers_num_metric).await?;
metrics.record(peers_num_metric).await;

metrics
.record(MetricValue::BlockConfidenceTreshold(
.record(MetricValue::BlockConfidenceThreshold(
static_config_params.block_confidence_treshold,
))
.await?;
.await;
metrics
.record(MetricValue::ReplicationFactor(
static_config_params.replication_factor,
))
.await?;
.await;
metrics
.record(MetricValue::QueryTimeout(
static_config_params.query_timeout,
))
.await?;
metrics.record(MetricValue::HealthCheck()).await?;
.await;
metrics.record(MetricValue::HealthCheck()).await;

info!(block_number, map_size, "Maintenance completed");
Ok(())
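The new maintenance-side gate above flushes the aggregated metrics once every `telemetry_flush_interval` blocks. As a standalone illustration, here is roughly the same check extracted into a helper; the helper, its name, and the import path are hypothetical (the commit inlines this logic in `maintenance::process_block`), and the sketch adds a zero-interval guard since the check as written assumes `ot_flush_block_interval` is configured non-zero (`block_number % 0` panics in Rust):

```rust
use avail_light::telemetry::Metrics; // path assumed; the trait is defined in src/telemetry/mod.rs
use tracing::{error, info};

// Hypothetical helper, not part of the commit.
pub async fn maybe_flush_metrics(
    metrics: &impl Metrics,
    block_number: u32,
    flush_interval: u32,
) {
    // Guard added for illustration: `block_number % 0` would panic at runtime.
    if flush_interval != 0 && block_number % flush_interval == 0 {
        info!(block_number, "Flushing metrics...");
        match metrics.flush().await {
            Ok(()) => info!(block_number, "Flushing metrics finished"),
            Err(error) => error!(block_number, "Flushing metrics failed: {error:#}"),
        }
    }
}
```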
11 changes: 9 additions & 2 deletions src/telemetry/mod.rs
@@ -12,6 +12,7 @@ use crate::types::Origin;

pub mod otlp;

#[derive(Debug)]
pub enum MetricCounter {
Starts,
SessionBlockCounter,
@@ -39,6 +40,10 @@ impl Display for MetricCounter {
}

impl MetricCounter {
fn is_buffered(&self) -> bool {
!matches!(self, MetricCounter::Starts)
}

fn is_allowed(&self, origin: &Origin) -> bool {
match (origin, self) {
(Origin::External, MetricCounter::Starts) => true,
@@ -70,6 +75,7 @@ impl MetricCounter {
}
}

#[derive(Clone, Debug)]
pub enum MetricValue {
TotalBlockNumber(u32),
DHTFetched(f64),
@@ -78,7 +84,7 @@ pub enum MetricValue {
NodeRPCFetched(f64),
NodeRPCFetchDuration(f64),
BlockConfidence(f64),
BlockConfidenceTreshold(f64),
BlockConfidenceThreshold(f64),
RPCCallDuration(f64),
DHTPutDuration(f64),
DHTPutSuccess(f64),
@@ -116,5 +122,6 @@ impl MetricValue {
#[async_trait]
pub trait Metrics {
async fn count(&self, counter: MetricCounter);
async fn record(&self, value: MetricValue) -> Result<()>;
async fn record(&self, value: MetricValue);
async fn flush(&self) -> Result<()>;
}
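The implementation of this revised trait lives in `src/telemetry/otlp.rs`, which is not rendered on this page. The sketch below only illustrates the shape the new API implies, using simplified stand-in types: buffered `count` calls and `record` calls accumulate in memory and cannot fail, while aggregation and the fallible export are deferred to `flush`, which the maintenance step now drives on a block interval.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

use async_trait::async_trait;
use color_eyre::Result;

// Simplified stand-ins for the real enums in src/telemetry/mod.rs.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum MetricCounter {
    Starts,
    SessionBlockCounter,
}

impl MetricCounter {
    // Mirrors the new `is_buffered`: everything except `Starts` is aggregated locally.
    fn is_buffered(&self) -> bool {
        !matches!(self, MetricCounter::Starts)
    }
}

#[derive(Clone, Debug)]
pub enum MetricValue {
    BlockConfidence(f64),
}

#[async_trait]
pub trait Metrics {
    async fn count(&self, counter: MetricCounter);
    async fn record(&self, value: MetricValue);
    async fn flush(&self) -> Result<()>;
}

// Hypothetical buffering implementation, for illustration only.
#[derive(Default)]
pub struct BufferedMetrics {
    counters: Mutex<HashMap<MetricCounter, u64>>,
    values: Mutex<Vec<MetricValue>>,
}

#[async_trait]
impl Metrics for BufferedMetrics {
    async fn count(&self, counter: MetricCounter) {
        if counter.is_buffered() {
            // Aggregated locally; reported only on the next `flush`.
            *self.counters.lock().unwrap().entry(counter).or_insert(0) += 1;
        } else {
            // A non-buffered counter such as `Starts` would be submitted immediately here.
        }
    }

    async fn record(&self, value: MetricValue) {
        // Infallible: the value is only appended to an in-memory buffer.
        self.values.lock().unwrap().push(value);
    }

    async fn flush(&self) -> Result<()> {
        // Buffered counters and values are drained here; aggregation and the
        // fallible export to the telemetry backend would happen at this point.
        let _counters = std::mem::take(&mut *self.counters.lock().unwrap());
        let _values = std::mem::take(&mut *self.values.lock().unwrap());
        Ok(())
    }
}
```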