Skip to content

Commit

Permalink
fix(stages): transaction lookup stage checkpoint calculation (paradig…
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin authored Jul 26, 2023
1 parent aa5d39d commit 74bbe5a
Showing 1 changed file with 73 additions and 6 deletions.
79 changes: 73 additions & 6 deletions crates/stages/src/stages/tx_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use reth_db::{
use reth_primitives::{
keccak256,
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
TransactionSignedNoHash, TxNumber, H256,
PrunePart, TransactionSignedNoHash, TxNumber, H256,
};
use reth_provider::DatabaseProviderRW;
use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointReader};
use tokio::sync::mpsc;
use tracing::*;

Expand Down Expand Up @@ -183,9 +183,20 @@ fn calculate_hash(

fn stage_checkpoint<DB: Database>(
provider: &DatabaseProviderRW<'_, &DB>,
) -> Result<EntitiesCheckpoint, DatabaseError> {
) -> Result<EntitiesCheckpoint, StageError> {
let pruned_entries = provider
.get_prune_checkpoint(PrunePart::TransactionLookup)?
.map(|checkpoint| provider.block_body_indices(checkpoint.block_number))
.transpose()?
.flatten()
// +1 is needed because TxNumber is 0-indexed
.map(|body| body.last_tx_num() + 1)
.unwrap_or_default();
Ok(EntitiesCheckpoint {
processed: provider.tx_ref().entries::<tables::TxHashNumber>()? as u64,
// If `TxHashNumber` table was pruned, we will have a number of entries in it not matching
// the actual number of processed transactions. To fix that, we add the number of pruned
// `TxHashNumber` entries.
processed: provider.tx_ref().entries::<tables::TxHashNumber>()? as u64 + pruned_entries,
total: provider.tx_ref().entries::<tables::Transactions>()? as u64,
})
}
Expand All @@ -202,8 +213,13 @@ mod tests {
generators,
generators::{random_block, random_block_range},
};
use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256};
use reth_provider::{BlockReader, ProviderError, TransactionsProvider};
use reth_primitives::{
stage::StageUnitCheckpoint, BlockNumber, PruneCheckpoint, PruneMode, SealedBlock, H256,
MAINNET,
};
use reth_provider::{
BlockReader, ProviderError, ProviderFactory, PruneCheckpointWriter, TransactionsProvider,
};

// Implement stage test suite.
stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
Expand Down Expand Up @@ -321,6 +337,57 @@ mod tests {
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
}

#[test]
fn stage_checkpoint_pruned() {
let tx = TestTransaction::default();
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 0..=100, H256::zero(), 0..10);
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");

let max_pruned_block = 30;
let max_processed_block = 70;

let mut tx_hash_numbers = Vec::new();
let mut tx_hash_number = 0;
for block in &blocks[..=max_processed_block] {
for transaction in &block.body {
if block.number > max_pruned_block {
tx_hash_numbers.push((transaction.hash, tx_hash_number));
}
tx_hash_number += 1;
}
}
tx.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

let provider = tx.inner_rw();
provider
.save_prune_checkpoint(
PrunePart::TransactionLookup,
PruneCheckpoint {
block_number: max_pruned_block as BlockNumber,
prune_mode: PruneMode::Full,
},
)
.expect("save stage checkpoint");
provider.commit().expect("commit");

let db = tx.inner_raw();
let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
let provider = factory.provider_rw().expect("provider rw");

assert_eq!(
stage_checkpoint(&provider).expect("stage checkpoint"),
EntitiesCheckpoint {
processed: blocks[..=max_processed_block]
.iter()
.map(|block| block.body.len() as u64)
.sum::<u64>(),
total: blocks.iter().map(|block| block.body.len() as u64).sum::<u64>()
}
);
}

struct TransactionLookupTestRunner {
tx: TestTransaction,
threshold: u64,
Expand Down

0 comments on commit 74bbe5a

Please sign in to comment.