chore(stages): reduce the progress logging (paradigmxyz#11653)
Signed-off-by: jsvisa <[email protected]>
jsvisa authored Oct 12, 2024
1 parent 9ec4c00 commit de736a5
Showing 5 changed files with 74 additions and 42 deletions.
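In short: each stage previously computed an item-count interval (`(total / 10).max(1)`, `/ 100`, or `/ 1000` depending on the stage) and logged on every Nth entry, which could mean hundreds of progress lines regardless of how fast the loop ran. After this change the stages share a time-based `log_progress!` macro gated by a 5-second `LOG_INTERVAL`. Below is a minimal, self-contained sketch of that pattern, using local names and `println!` in place of `tracing::info!`; it is an illustration of the technique, not the repository's code.

use std::time::{Duration, Instant};

/// Minimum gap between two progress lines, matching the new LOG_INTERVAL
/// added to crates/stages/stages/src/stages/utils.rs.
const LOG_INTERVAL: Duration = Duration::from_secs(5);

/// Local stand-in for the exported `log_progress!` macro: report the current
/// percentage at most once per LOG_INTERVAL, however many items go by.
macro_rules! log_progress {
    ($target:expr, $index:expr, $total:expr, $last_log:expr, $message:expr) => {
        let now = Instant::now();
        if now.duration_since($last_log) >= LOG_INTERVAL {
            println!(
                "{}: {} ({:.2}%)",
                $target,
                $message,
                ($index as f64 / $total as f64) * 100.0
            );
            $last_log = now;
        }
    };
}

fn insert_all(items: &[u64]) {
    let total = items.len();
    let mut last_log = Instant::now();
    for (index, _item) in items.iter().enumerate() {
        // Old pattern: `if index > 0 && index % interval == 0 { info!(...) }`
        // with `interval = (total / 10).max(1)`. New pattern: purely time-based.
        log_progress!("sync::stages::example", index, total, last_log, "Inserting items");
        // ... per-item work (DB writes in the real stages) goes here ...
    }
}

fn main() {
    // With no real work per item this loop finishes well under LOG_INTERVAL,
    // so it may print nothing; in the stages, per-item writes make it matter.
    insert_all(&(0..5_000_000u64).collect::<Vec<_>>());
}

Note that tx_lookup.rs keeps an inline version of the same time check instead of the macro, since its log line also carries the `append_only` field.
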
20 changes: 11 additions & 9 deletions crates/stages/stages/src/stages/hashing_account.rs
@@ -1,3 +1,4 @@
use crate::log_progress;
use alloy_primitives::{keccak256, B256};
use itertools::Itertools;
use reth_config::config::{EtlConfig, HashingConfig};
@@ -18,6 +19,7 @@ use std::{
fmt::Debug,
ops::{Range, RangeInclusive},
sync::mpsc::{self, Receiver},
time::Instant,
};
use tracing::*;

@@ -186,16 +188,16 @@ where
let mut hashed_account_cursor =
tx.cursor_write::<RawTable<tables::HashedAccounts>>()?;

let total_hashes = collector.len();
let interval = (total_hashes / 10).max(1);
let total = collector.len();
let mut last_log = Instant::now();
for (index, item) in collector.iter()?.enumerate() {
if index > 0 && index % interval == 0 {
info!(
target: "sync::stages::hashing_account",
progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
"Inserting hashes"
);
}
log_progress!(
"sync::stages::hashing_account",
index,
total,
last_log,
"Inserting hashes"
);

let (key, value) = item?;
hashed_account_cursor
20 changes: 11 additions & 9 deletions crates/stages/stages/src/stages/hashing_storage.rs
@@ -1,3 +1,4 @@
use crate::log_progress;
use alloy_primitives::{bytes::BufMut, keccak256, B256};
use itertools::Itertools;
use reth_config::config::{EtlConfig, HashingConfig};
@@ -19,6 +20,7 @@ use reth_storage_errors::provider::ProviderResult;
use std::{
fmt::Debug,
sync::mpsc::{self, Receiver},
time::Instant,
};
use tracing::*;

@@ -117,17 +119,17 @@ where

collect(&mut channels, &mut collector)?;

let total_hashes = collector.len();
let interval = (total_hashes / 10).max(1);
let total = collector.len();
let mut last_log = Instant::now();
let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
for (index, item) in collector.iter()?.enumerate() {
if index > 0 && index % interval == 0 {
info!(
target: "sync::stages::hashing_storage",
progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
"Inserting hashes"
);
}
log_progress!(
"sync::stages::hashing_storage",
index,
total,
last_log,
"Inserting hashes"
);

let (addr_key, value) = item?;
cursor.append_dup(
24 changes: 14 additions & 10 deletions crates/stages/stages/src/stages/headers.rs
@@ -1,3 +1,4 @@
use crate::log_progress;
use alloy_primitives::{BlockHash, BlockNumber, Bytes, B256};
use futures_util::StreamExt;
use reth_config::config::EtlConfig;
@@ -25,6 +26,7 @@ use reth_storage_errors::provider::ProviderError;
use std::{
sync::Arc,
task::{ready, Context, Poll},
time::Instant,
};
use tokio::sync::watch;
use tracing::*;
@@ -95,9 +97,9 @@ where
provider: &impl DBProvider<Tx: DbTxMut>,
static_file_provider: StaticFileProvider,
) -> Result<BlockNumber, StageError> {
let total_headers = self.header_collector.len();
let total = self.header_collector.len();

info!(target: "sync::stages::headers", total = total_headers, "Writing headers");
info!(target: "sync::stages::headers", total, "Writing headers");

// Consistency check of expected headers in static files vs DB is done on provider::sync_gap
// when poll_execute_ready is polled.
@@ -113,13 +115,11 @@ where
// Although headers were downloaded in reverse order, the collector iterates it in ascending
// order
let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
let interval = (total_headers / 10).max(1);
let mut last_log = Instant::now();
for (index, header) in self.header_collector.iter()?.enumerate() {
let (_, header_buf) = header?;

if index > 0 && index % interval == 0 && total_headers > 100 {
info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers");
}
log_progress!("sync::stages::headers", index, total, last_log, "Writing headers");

let sealed_header: SealedHeader =
bincode::deserialize::<serde_bincode_compat::SealedHeader<'_>>(&header_buf)
@@ -147,7 +147,7 @@ where
writer.append_header(&header, td, &header_hash)?;
}

info!(target: "sync::stages::headers", total = total_headers, "Writing headers hash index");
info!(target: "sync::stages::headers", total, "Writing headers hash index");

let mut cursor_header_numbers =
provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
@@ -168,9 +168,13 @@ where
for (index, hash_to_number) in self.hash_collector.iter()?.enumerate() {
let (hash, number) = hash_to_number?;

if index > 0 && index % interval == 0 && total_headers > 100 {
info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
}
log_progress!(
"sync::stages::headers",
index,
total,
last_log,
"Writing headers hash index"
);

if first_sync {
cursor_header_numbers.append(
8 changes: 6 additions & 2 deletions crates/stages/stages/src/stages/tx_lookup.rs
@@ -1,3 +1,4 @@
use super::utils::LOG_INTERVAL;
use alloy_primitives::{TxHash, TxNumber};
use num_traits::Zero;
use reth_config::config::{EtlConfig, TransactionLookupConfig};
@@ -17,6 +18,7 @@ use reth_stages_api::{
UnwindInput, UnwindOutput,
};
use reth_storage_errors::provider::ProviderError;
use std::time::Instant;
use tracing::*;

/// The transaction lookup stage.
@@ -147,16 +149,18 @@ where
.cursor_write::<tables::RawTable<tables::TransactionHashNumbers>>()?;

let total_hashes = hash_collector.len();
let interval = (total_hashes / 10).max(1);
let mut last_log = Instant::now();
for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
let (hash, number) = hash_to_number?;
if index > 0 && index % interval == 0 {
let now = Instant::now();
if now.duration_since(last_log) >= LOG_INTERVAL {
info!(
target: "sync::stages::transaction_lookup",
?append_only,
progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
"Inserting hashes"
);
last_log = now;
}

let key = RawKey::<TxHash>::from_vec(hash);
44 changes: 32 additions & 12 deletions crates/stages/stages/src/stages/utils.rs
@@ -12,12 +12,36 @@ use reth_db_api::{
use reth_etl::Collector;
use reth_provider::DBProvider;
use reth_stages_api::StageError;
use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
use std::{
collections::HashMap,
hash::Hash,
ops::RangeBounds,
time::{Duration, Instant},
};
use tracing::info;

/// Number of blocks before pushing indices from cache to [`Collector`]
const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;

/// Log interval for progress.
pub(crate) const LOG_INTERVAL: Duration = Duration::from_secs(5);

/// Log progress at a regular interval.
#[macro_export]
macro_rules! log_progress {
($target:expr, $index:expr, $total:expr, $last_log:expr, $message:expr) => {
let now = std::time::Instant::now();
if now.duration_since($last_log) >= $crate::stages::utils::LOG_INTERVAL {
info!(
target: $target,
progress = %format!("{:.2}%", ($index as f64 / $total as f64) * 100.0),
$message
);
$last_log = now;
}
}
}

/// Collects all history (`H`) indices for a range of changesets (`CS`) and stores them in a
/// [`Collector`].
///
@@ -65,18 +89,16 @@ where
};

// observability
let total_changesets = provider.tx_ref().entries::<CS>()?;
let interval = (total_changesets / 1000).max(1);
let total = provider.tx_ref().entries::<CS>()?;
let mut last_log = Instant::now();

let mut flush_counter = 0;
let mut current_block_number = u64::MAX;
for (idx, entry) in changeset_cursor.walk_range(range)?.enumerate() {
for (index, entry) in changeset_cursor.walk_range(range)?.enumerate() {
let (block_number, key) = partial_key_factory(entry?);
cache.entry(key).or_default().push(block_number);

if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
}
log_progress!("sync::stages::index_history", index, total, last_log, "Collecting indices");

// Make sure we only flush the cache every DEFAULT_CACHE_THRESHOLD blocks.
if current_block_number != block_number {
@@ -120,17 +142,15 @@ where
let mut current_list = Vec::<u64>::new();

// observability
let total_entries = collector.len();
let interval = (total_entries / 100).max(1);
let total = collector.len();
let mut last_log = Instant::now();

for (index, element) in collector.iter()?.enumerate() {
let (k, v) = element?;
let sharded_key = decode_key(k)?;
let new_list = BlockNumberList::decompress_owned(v)?;

if index > 0 && index % interval == 0 && total_entries > 100 {
info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
}
log_progress!("sync::stages::index_history", index, total, last_log, "Writing indices");

// AccountsHistory: `Address`.
// StorageHistory: `Address.StorageKey`.