Skip to content

Commit

Permalink
feat(pruner): prune receipts based on log emitters during live sync (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo authored Aug 18, 2023
1 parent 890eacb commit 24632ac
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 10 deletions.
47 changes: 42 additions & 5 deletions crates/primitives/src/prune/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,61 @@ impl ContractLogsPruneConfig {
/// Given the `tip` block number, consolidates the structure so it can easily be queried for
/// filtering across a range of blocks.
///
/// Example:
///
/// `{ addrA: Before(872), addrB: Before(500), addrC: Distance(128) }`
///
/// for `tip: 1000`, gets transformed to a map such as:
///
/// `{ 500: [addrB], 872: [addrA, addrC] }`
///
/// The [`BlockNumber`] key of the new map should be viewed as `PruneMode::Before(block)`, which
/// makes the previous result equivalent to
///
/// `{ Before(500): [addrB], Before(872): [addrA, addrC] }`
///
/// Every key is clamped to be at least `pruned_block + 1`, where a `pruned_block` of `None` is
/// treated as `0`.
///
/// # Errors
///
/// Propagates any error returned by `PruneMode::prune_target_block` for one of the configured
/// modes.
pub fn group_by_block(
    &self,
    tip: BlockNumber,
    pruned_block: Option<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<&Address>>, PrunePartError> {
    let mut map = BTreeMap::new();
    let pruned_block = pruned_block.unwrap_or_default();

    for (address, mode) in self.0.iter() {
        // Getting `None` means that there is nothing to prune yet for this address. It still
        // has to be included in the BTreeMap, otherwise it would be excluded: the target
        // falls back to 0 and is then raised to the first block after `pruned_block`.
        // Reminder that this BTreeMap works as an inclusion list that excludes (prunes) all
        // other receipts.
        let target_block = mode
            .prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PrunePart::ContractLogs)?
            .map(|(block, _)| block)
            .unwrap_or_default();
        let block = (pruned_block + 1).max(target_block);

        map.entry(block).or_default().push(address)
    }
    Ok(map)
}

/// Returns the lowest prune target block among the entries configured with
/// `PruneMode::Distance(_)`, i.e. the lowest block where we start filtering their logs.
///
/// The result is never lower than `pruned_block` (a `None` is treated as `0`). Returns
/// `Ok(None)` when no `Distance` entry currently has a prune target.
pub fn lowest_block_with_distance(
    &self,
    tip: BlockNumber,
    pruned_block: Option<BlockNumber>,
) -> Result<Option<BlockNumber>, PrunePartError> {
    let floor = pruned_block.unwrap_or_default();
    let mut lowest: Option<BlockNumber> = None;

    for (_, mode) in self.0.iter() {
        // Only distance-based modes move forward together with the tip.
        if !matches!(mode, PruneMode::Distance(_)) {
            continue
        }

        let target =
            mode.prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PrunePart::ContractLogs)?;
        if let Some((block, _)) = target {
            lowest = Some(lowest.map_or(block, |current| current.min(block)));
        }
    }

    Ok(lowest.map(|block| block.max(floor)))
}
}
165 changes: 165 additions & 0 deletions crates/prune/src/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use reth_db::{
};
use reth_primitives::{
BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber,
MINIMUM_PRUNING_DISTANCE,
};
use reth_provider::{
BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
Expand Down Expand Up @@ -102,6 +103,15 @@ impl<DB: Database> Pruner<DB> {
.record(part_start.elapsed())
}

if !self.modes.contract_logs_filter.is_empty() {
let part_start = Instant::now();
self.prune_receipts_by_logs(&provider, tip_block_number)?;
self.metrics
.get_prune_part_metrics(PrunePart::ContractLogs)
.duration_seconds
.record(part_start.elapsed())
}

if let Some((to_block, prune_mode)) =
self.modes.prune_target_block_transaction_lookup(tip_block_number)?
{
Expand Down Expand Up @@ -251,13 +261,168 @@ impl<DB: Database> Pruner<DB> {
"Pruned receipts"
);
},
|_| false,
)?;

provider.save_prune_checkpoint(
PrunePart::Receipts,
PruneCheckpoint { block_number: to_block, prune_mode },
)?;

// `PrunePart::Receipts` overrides `PrunePart::ContractLogs`, so we can preemptively
// limit their pruning start point.
provider.save_prune_checkpoint(
PrunePart::ContractLogs,
PruneCheckpoint { block_number: to_block, prune_mode },
)?;

Ok(())
}

/// Prune receipts up to the provided block by filtering logs. Works as an inclusion list, and
/// removes every receipt not belonging to it.
///
/// The work is split into consecutive block ranges derived from
/// `contract_logs_filter.group_by_block`. Each range keeps the receipts that contain at least
/// one log emitted by an address whose filter starts at or before that range, and deletes all
/// the others. An intermediate `PrunePart::ContractLogs` checkpoint is saved after every range
/// but the last one, and a final checkpoint is saved at the end.
///
/// # Errors
///
/// Propagates errors from computing the prune targets, from reading or saving prune
/// checkpoints and from pruning the `Receipts` table.
#[instrument(level = "trace", skip(self, provider), target = "pruner")]
fn prune_receipts_by_logs(
    &self,
    provider: &DatabaseProviderRW<'_, DB>,
    tip_block_number: BlockNumber,
) -> PrunerResult {
    // Contract log filtering removes every receipt possible except the ones in the list. So,
    // for the other receipts it's as if they had a `PruneMode::Distance()` of
    // `MINIMUM_PRUNING_DISTANCE`.
    let to_block = PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)
        .prune_target_block(
            tip_block_number,
            MINIMUM_PRUNING_DISTANCE,
            PrunePart::ContractLogs,
        )?
        .map(|(bn, _)| bn)
        .unwrap_or_default();

    // Figure out what receipts have already been pruned, so we can have an accurate
    // `address_filter`
    let pruned = provider
        .get_prune_checkpoint(PrunePart::ContractLogs)?
        .map(|checkpoint| checkpoint.block_number);

    let address_filter =
        self.modes.contract_logs_filter.group_by_block(tip_block_number, pruned)?;

    // Splits all transactions in different block ranges. Each block range will have its own
    // filter address list and will check it while going through the table
    //
    // Example:
    // For an `address_filter` such as:
    // { block9: [a1, a2], block20: [a3, a4, a5] }
    //
    // The following structures will be created in the exact order as shown:
    // `block_ranges`: [
    //    (block0, block8, 0 addresses),
    //    (block9, block19, 2 addresses),
    //    (block20, to_block, 5 addresses)
    //  ]
    // `filtered_addresses`: [a1, a2, a3, a4, a5]
    //
    // The first range will delete all receipts between block0 - block8
    // The second range will delete all receipts between block9 - 19, except the ones with
    //     emitter logs from these addresses: [a1, a2].
    // The third range will delete all receipts between block20 - to_block, except the ones with
    //     emitter logs from these addresses: [a1, a2, a3, a4, a5]
    let mut block_ranges = vec![];
    let mut blocks_iter = address_filter.iter().peekable();
    let mut filtered_addresses = vec![];

    // `while let` instead of `for`: the loop body needs to peek at the next entry.
    while let Some((start_block, addresses)) = blocks_iter.next() {
        filtered_addresses.extend_from_slice(addresses);

        // This will clear all receipts before the first appearance of a contract log.
        // `group_by_block` clamps every key to at least `pruned + 1`, so `start_block >= 1`
        // and the subtraction cannot underflow.
        if block_ranges.is_empty() {
            block_ranges.push((0, *start_block - 1, 0));
        }

        // Keys are strictly increasing, so `next_block > start_block >= 1`.
        let end_block =
            blocks_iter.peek().map(|(next_block, _)| *next_block - 1).unwrap_or(to_block);

        // Addresses in lower block ranges, are still included in the inclusion list for future
        // ranges.
        block_ranges.push((*start_block, end_block, filtered_addresses.len()));
    }

    for (start_block, end_block, num_addresses) in block_ranges {
        let range = match self.get_next_tx_num_range_from_checkpoint(
            provider,
            PrunePart::ContractLogs,
            end_block,
        )? {
            Some(range) => range,
            None => {
                trace!(
                    target: "pruner",
                    block_range = format!("{start_block}..={end_block}"),
                    "No receipts to prune."
                );
                continue
            }
        };

        let total = range.clone().count();
        let mut processed = 0;

        provider.prune_table_with_iterator_in_batches::<tables::Receipts>(
            range,
            self.batch_sizes.receipts,
            |rows| {
                processed += rows;
                trace!(
                    target: "pruner",
                    %rows,
                    block_range = format!("{start_block}..={end_block}"),
                    progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64),
                    "Pruned receipts"
                );
            },
            // Skip (keep) a receipt when any of its logs was emitted by an address that is
            // part of the inclusion list for this range.
            |receipt| {
                num_addresses > 0 &&
                    receipt.logs.iter().any(|log| {
                        filtered_addresses[..num_addresses].contains(&&log.address)
                    })
            },
        )?;

        // If this is the last block range, avoid writing an unused checkpoint
        if end_block != to_block {
            // This allows us to query for the transactions in the next block range with
            // [`get_next_tx_num_range_from_checkpoint`]. It's just a temporary intermediate
            // checkpoint, which should be adjusted in the end.
            provider.save_prune_checkpoint(
                PrunePart::ContractLogs,
                PruneCheckpoint {
                    block_number: end_block,
                    prune_mode: PruneMode::Before(end_block + 1),
                },
            )?;
        }
    }

    // If there are contracts using `PruneMode::Distance(_)` there will be receipts before
    // `to_block` that become eligible to be pruned in future runs. Therefore, our
    // checkpoint is not actually `to_block`, but the `lowest_block_with_distance` from any
    // contract. This ensures that in future pruner runs we can
    // prune all these receipts between the previous `lowest_block_with_distance` and the new
    // one using `get_next_tx_num_range_from_checkpoint`.
    let checkpoint_block = self
        .modes
        .contract_logs_filter
        .lowest_block_with_distance(tip_block_number, pruned)?
        .unwrap_or(to_block);

    provider.save_prune_checkpoint(
        PrunePart::ContractLogs,
        PruneCheckpoint {
            // `checkpoint_block` can be 0 (`to_block` defaults to 0 when there is no prune
            // target yet), so saturate instead of underflowing and persisting a bogus
            // checkpoint.
            block_number: checkpoint_block.saturating_sub(1),
            prune_mode: PruneMode::Before(checkpoint_block),
        },
    )?;

    Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion crates/storage/provider/src/post_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ impl PostState {
let contract_log_pruner = self
.prune_modes
.contract_logs_filter
.group_by_block(tip)
.group_by_block(tip, None)
.map_err(|e| Error::Custom(e.to_string()))?;

// Empty implies that there is going to be
Expand Down
13 changes: 9 additions & 4 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,27 +629,32 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
&self,
keys: impl IntoIterator<Item = T::Key>,
) -> std::result::Result<usize, DatabaseError> {
self.prune_table_with_iterator_in_batches::<T>(keys, usize::MAX, |_| {})
self.prune_table_with_iterator_in_batches::<T>(keys, usize::MAX, |_| {}, |_| false)
}

/// Prune the table for the specified pre-sorted key iterator, calling `chunk_callback` after
/// every `batch_size` pruned rows with number of total rows pruned.
///
/// `skip_filter` can be used to skip pruning certain elements.
///
/// Returns number of rows pruned.
pub fn prune_table_with_iterator_in_batches<T: Table>(
&self,
keys: impl IntoIterator<Item = T::Key>,
batch_size: usize,
mut batch_callback: impl FnMut(usize),
skip_filter: impl Fn(&T::Value) -> bool,
) -> std::result::Result<usize, DatabaseError> {
let mut cursor = self.tx.cursor_write::<T>()?;
let mut deleted = 0;

for key in keys {
if cursor.seek_exact(key)?.is_some() {
cursor.delete_current()?;
if let Some((_, value)) = cursor.seek_exact(key)? {
if !skip_filter(&value) {
cursor.delete_current()?;
deleted += 1;
}
}
deleted += 1;

if deleted % batch_size == 0 {
batch_callback(deleted);
Expand Down

0 comments on commit 24632ac

Please sign in to comment.