diff --git a/consensus/core/src/commit_observer.rs b/consensus/core/src/commit_observer.rs index 50ff07c84812d..6a7c6ef6a4cbb 100644 --- a/consensus/core/src/commit_observer.rs +++ b/consensus/core/src/commit_observer.rs @@ -169,14 +169,13 @@ mod tests { use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use super::*; + use crate::test_dag_builder::DagBuilder; use crate::{ block::{BlockRef, Round}, commit::DEFAULT_WAVE_LENGTH, context::Context, dag_state::DagState, - leader_schedule::{LeaderSchedule, LeaderSwapTable}, storage::mem_store::MemStore, - test_dag::{build_dag, get_all_uncommitted_leader_blocks}, }; #[test] @@ -189,7 +188,6 @@ mod tests { context.clone(), mem_store.clone(), ))); - let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default()); let last_processed_commit_round = 0; let last_processed_commit_index = 0; let (sender, mut receiver) = unbounded_channel(); @@ -207,15 +205,17 @@ mod tests { // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3. let num_rounds = 10; - build_dag(context.clone(), dag_state.clone(), None, num_rounds); - let leaders = get_all_uncommitted_leader_blocks( - dag_state.clone(), - leader_schedule, - num_rounds, - DEFAULT_WAVE_LENGTH, - false, - 1, - ); + let mut builder = DagBuilder::new(context.clone()); + builder + .layers(1..=num_rounds) + .build() + .persist_layers(dag_state.clone()); + + let leaders = builder + .leader_blocks(1..=num_rounds) + .into_iter() + .map(Option::unwrap) + .collect::>(); let commits = observer.handle_commit(leaders.clone()).unwrap(); @@ -235,17 +235,11 @@ mod tests { if idx == 0 { // First subdag includes the leader block plus all ancestor blocks // of the leader minus the genesis round blocks - assert_eq!( - subdag.blocks.len(), - (num_authorities * (DEFAULT_WAVE_LENGTH - 1) as usize) + 1 - ); + assert_eq!(subdag.blocks.len(), 1); } else { // Every subdag after will be missing the leader block from the previous // committed subdag - assert_eq!( - subdag.blocks.len(), - (num_authorities * DEFAULT_WAVE_LENGTH as usize) - ); + assert_eq!(subdag.blocks.len(), num_authorities); } for block in subdag.blocks.iter() { expected_stored_refs.push(block.reference()); @@ -288,7 +282,6 @@ mod tests { context.clone(), mem_store.clone(), ))); - let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default()); let last_processed_commit_round = 0; let last_processed_commit_index = 0; let (sender, mut receiver) = unbounded_channel(); @@ -306,15 +299,17 @@ mod tests { // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3. let num_rounds = 10; - build_dag(context.clone(), dag_state.clone(), None, num_rounds); - let leaders = get_all_uncommitted_leader_blocks( - dag_state.clone(), - leader_schedule, - num_rounds, - DEFAULT_WAVE_LENGTH, - false, - 1, - ); + let mut builder = DagBuilder::new(context.clone()); + builder + .layers(1..=num_rounds) + .build() + .persist_layers(dag_state.clone()); + + let leaders = builder + .leader_blocks(1..=num_rounds) + .into_iter() + .map(Option::unwrap) + .collect::>(); // Commit first batch of leaders (2) and "receive" the subdags as the // consumer of the consensus output channel. @@ -367,7 +362,7 @@ mod tests { .unwrap(), ); - let expected_last_sent_index = 3; + let expected_last_sent_index = num_rounds as usize; while let Ok(subdag) = receiver.try_recv() { tracing::info!("{subdag} was sent but not processed by consumer"); assert_eq!(subdag, commits[processed_subdag_index]); @@ -425,7 +420,6 @@ mod tests { context.clone(), mem_store.clone(), ))); - let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default()); let last_processed_commit_round = 0; let last_processed_commit_index = 0; let (sender, mut receiver) = unbounded_channel(); @@ -443,19 +437,21 @@ mod tests { // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3. let num_rounds = 10; - build_dag(context.clone(), dag_state.clone(), None, num_rounds); - let leaders = get_all_uncommitted_leader_blocks( - dag_state.clone(), - leader_schedule, - num_rounds, - DEFAULT_WAVE_LENGTH, - false, - 1, - ); + let mut builder = DagBuilder::new(context.clone()); + builder + .layers(1..=num_rounds) + .build() + .persist_layers(dag_state.clone()); + + let leaders = builder + .leader_blocks(1..=num_rounds) + .into_iter() + .map(Option::unwrap) + .collect::>(); // Commit all of the leaders and "receive" the subdags as the consumer of // the consensus output channel. - let expected_last_processed_index: usize = 3; + let expected_last_processed_index: usize = 10; let expected_last_processed_round = expected_last_processed_index as u32 * DEFAULT_WAVE_LENGTH; let commits = observer.handle_commit(leaders.clone()).unwrap(); diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index 4d3b4ce319863..87d17dfc4630e 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -302,6 +302,7 @@ impl Core { if !self.leaders_exist(quorum_round) { return None; } + if Duration::from_millis( self.context .clock diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index cbaf3875b4e46..8810fcff5c0bd 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -736,10 +736,10 @@ mod test { use parking_lot::RwLock; use super::*; + use crate::test_dag_builder::DagBuilder; use crate::{ block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock}, storage::{mem_store::MemStore, WriteBatch}, - test_dag::build_dag, }; #[test] @@ -1493,7 +1493,16 @@ mod test { // WHEN a fully connected DAG up to round 4 is created, then round 4 blocks should be returned as quorum { - let round_4_blocks = build_dag(context, dag_state.clone(), None, 4); + let mut dag_builder = DagBuilder::new(context.clone()); + dag_builder + .layers(1..=4) + .build() + .persist_layers(dag_state.clone()); + let round_4_blocks: Vec<_> = dag_builder + .blocks(4..=4) + .into_iter() + .map(|block| block.reference()) + .collect(); let last_quorum = dag_state.read().last_quorum(); @@ -1546,7 +1555,11 @@ mod test { // WHEN adding some blocks for authorities, only the last ones should be returned { // add blocks up to round 4 - build_dag(context.clone(), dag_state.clone(), None, 4); + let mut dag_builder = DagBuilder::new(context.clone()); + dag_builder + .layers(1..=4) + .build() + .persist_layers(dag_state.clone()); // add block 5 for authority 0 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()); diff --git a/consensus/core/src/linearizer.rs b/consensus/core/src/linearizer.rs index 173dbc419279d..4ac1374af0c5c 100644 --- a/consensus/core/src/linearizer.rs +++ b/consensus/core/src/linearizer.rs @@ -147,12 +147,12 @@ impl Linearizer { #[cfg(test)] mod tests { use super::*; + use crate::test_dag_builder::DagBuilder; use crate::{ commit::{CommitAPI as _, CommitDigest, DEFAULT_WAVE_LENGTH}, context::Context, leader_schedule::{LeaderSchedule, LeaderSwapTable}, storage::mem_store::MemStore, - test_dag::{build_dag, get_all_uncommitted_leader_blocks}, }; #[test] @@ -165,19 +165,20 @@ mod tests { Arc::new(MemStore::new()), ))); let mut linearizer = Linearizer::new(dag_state.clone()); - let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default()); // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3. let num_rounds: u32 = 10; - build_dag(context.clone(), dag_state.clone(), None, num_rounds); - let leaders = get_all_uncommitted_leader_blocks( - dag_state.clone(), - leader_schedule, - num_rounds, - DEFAULT_WAVE_LENGTH, - false, - 1, - ); + let mut dag_builder = DagBuilder::new(context.clone()); + dag_builder + .layers(1..=num_rounds) + .build() + .persist_layers(dag_state.clone()); + + let leaders = dag_builder + .leader_blocks(1..=num_rounds) + .into_iter() + .map(Option::unwrap) + .collect::>(); let commits = linearizer.handle_commit(leaders.clone()); for (idx, subdag) in commits.into_iter().enumerate() { @@ -185,19 +186,12 @@ mod tests { assert_eq!(subdag.leader, leaders[idx].reference()); assert_eq!(subdag.timestamp_ms, leaders[idx].timestamp_ms()); if idx == 0 { - // First subdag includes the leader block plus all ancestor blocks - // of the leader minus the genesis round blocks - assert_eq!( - subdag.blocks.len(), - (num_authorities * (DEFAULT_WAVE_LENGTH - 1) as usize) + 1 - ); + // First subdag includes the leader block only + assert_eq!(subdag.blocks.len(), 1); } else { // Every subdag after will be missing the leader block from the previous // committed subdag - assert_eq!( - subdag.blocks.len(), - (num_authorities * DEFAULT_WAVE_LENGTH as usize) - ); + assert_eq!(subdag.blocks.len(), num_authorities); } for block in subdag.blocks.iter() { assert!(block.round() <= leaders[idx].round()); @@ -219,112 +213,76 @@ mod tests { let mut linearizer = Linearizer::new(dag_state.clone()); let wave_length = DEFAULT_WAVE_LENGTH; - let mut blocks = vec![]; - let mut ancestors = None; let leader_round_wave_1 = 3; + let leader_round_wave_2 = leader_round_wave_1 + wave_length; - // Build dag layers for rounds 0 ~ 2 and maintain list of blocks to be included - // in the subdag of the leader of wave 1. - for n in 0..leader_round_wave_1 { - ancestors = Some(build_dag(context.clone(), dag_state.clone(), ancestors, n)); - blocks.extend(ancestors.clone().unwrap()); - } - - // Build dag layer for round 3 which is the leader round of wave 1 - ancestors = Some(build_dag( - context.clone(), - dag_state.clone(), - ancestors, - leader_round_wave_1, - )); - - let leaders = get_all_uncommitted_leader_blocks( - dag_state.clone(), - leader_schedule.clone(), - leader_round_wave_1, - wave_length, - false, - 1, + // Build a Dag from round 1..=6 + let mut dag_builder = DagBuilder::new(context.clone()); + dag_builder.layers(1..=leader_round_wave_2).build(); + + // Now retrieve all the blocks up to round leader_round_wave_1 - 1 + // And then only the leader of round leader_round_wave_1 + // Also store those to DagState + let mut blocks = dag_builder.blocks(0..=leader_round_wave_1 - 1); + blocks.push( + dag_builder + .leader_block(leader_round_wave_1) + .expect("Leader block should have been found"), ); + dag_state.write().accept_blocks(blocks.clone()); - // Add leader block to first committed subdag blocks - blocks.push(leaders[0].reference()); - - let mut last_committed_rounds = vec![0; num_authorities]; - for block in blocks.iter() { - let last_committed_round = last_committed_rounds[block.author]; - last_committed_rounds[block.author] = std::cmp::max(last_committed_round, block.round); - } - - let first_leader = leaders[0].clone(); + let first_leader = dag_builder + .leader_block(leader_round_wave_1) + .expect("Wave 1 leader round block should exist"); let mut last_commit_index = 1; let first_commit_data = TrustedCommit::new_for_test( last_commit_index, CommitDigest::MIN, 0, first_leader.reference(), - blocks.clone(), + blocks.into_iter().map(|block| block.reference()).collect(), ); dag_state.write().add_commit(first_commit_data); - blocks.clear(); - let leader_round_wave_2 = leader_round_wave_1 + wave_length; - - // Add all nonleader blocks from round 3 to the blocks which will be part - // of the second committed subdag - blocks.extend( - ancestors - .clone() - .unwrap() - .into_iter() - .filter(|block_ref| block_ref.author != first_leader.author()) - .collect::>(), - ); - - // Build dag layers for rounds 4 ~ 5 and maintain list of blocks to be included - // in the subdag of the leader of wave 1. - for n in leader_round_wave_1 + 1..leader_round_wave_2 { - ancestors = Some(build_dag(context.clone(), dag_state.clone(), ancestors, n)); - blocks.extend(ancestors.clone().unwrap()); - } - - // Build dag layer for round 6 which is the leader round of wave 2 - build_dag( - context.clone(), - dag_state.clone(), - ancestors, - leader_round_wave_2, + // Now take all the blocks from round `leader_round_wave_1` up to round `leader_round_wave_2-1` + let mut blocks = dag_builder.blocks(leader_round_wave_1..=leader_round_wave_2 - 1); + // Filter out leader block of round `leader_round_wave_1` + blocks.retain(|block| { + !(block.round() == leader_round_wave_1 + && block.author() == leader_schedule.elect_leader(leader_round_wave_1, 0)) + }); + // Add the leader block of round `leader_round_wave_2` + blocks.push( + dag_builder + .leader_block(leader_round_wave_2) + .expect("Leader block should have been found"), ); + // Write them in dag state + dag_state.write().accept_blocks(blocks.clone()); - let leaders = get_all_uncommitted_leader_blocks( - dag_state.clone(), - leader_schedule, - leader_round_wave_2, - wave_length, - false, - 1, - ); + let mut blocks: Vec<_> = blocks.into_iter().map(|block| block.reference()).collect(); - // Add leader block to second committed subdag blocks - blocks.push(leaders[1].reference()); + // Now get the latest leader which is the leader round of wave 2 + let leader = dag_builder + .leader_block(leader_round_wave_2) + .expect("Leader block should exist"); last_commit_index += 1; - let second_leader = leaders[1].clone(); let expected_second_commit = TrustedCommit::new_for_test( last_commit_index, CommitDigest::MIN, 0, - second_leader.reference(), + leader.reference(), blocks.clone(), ); - let commit = linearizer.handle_commit(vec![second_leader.clone()]); + let commit = linearizer.handle_commit(vec![leader.clone()]); assert_eq!(commit.len(), 1); let subdag = &commit[0]; tracing::info!("{subdag:?}"); - assert_eq!(subdag.leader, second_leader.reference()); - assert_eq!(subdag.timestamp_ms, second_leader.timestamp_ms()); + assert_eq!(subdag.leader, leader.reference()); + assert_eq!(subdag.timestamp_ms, leader.timestamp_ms()); assert_eq!(subdag.commit_index, expected_second_commit.index()); // Using the same sorting as used in CommittedSubDag::sort diff --git a/consensus/core/src/test_dag.rs b/consensus/core/src/test_dag.rs index 9a4f8a6322289..06e138eb67ff5 100644 --- a/consensus/core/src/test_dag.rs +++ b/consensus/core/src/test_dag.rs @@ -7,10 +7,9 @@ use consensus_config::AuthorityIndex; use parking_lot::RwLock; use crate::{ - block::{genesis_blocks, BlockRef, BlockTimestampMs, Round, Slot, TestBlock, VerifiedBlock}, + block::{genesis_blocks, BlockRef, BlockTimestampMs, Round, TestBlock, VerifiedBlock}, context::Context, dag_state::DagState, - leader_schedule::LeaderSchedule, }; // todo: remove this once tests have been refactored to use DagBuilder/DagParser @@ -89,28 +88,3 @@ pub(crate) fn build_dag_layer( } references } - -// Leader blocks start from round 1 as we do not consider any blocks from genesis -// round as a leader block. -// TODO: confirm pipelined & multi-leader cases work properly and interaction with -// DagState flush -pub(crate) fn get_all_uncommitted_leader_blocks( - dag_state: Arc>, - leader_schedule: LeaderSchedule, - num_rounds: u32, - wave_length: u32, - pipelined: bool, - num_leaders: u32, -) -> Vec { - let mut blocks = Vec::new(); - for round in 1..=num_rounds { - for leader_offset in 0..num_leaders { - if pipelined || round % wave_length == 0 { - let slot = Slot::new(round, leader_schedule.elect_leader(round, leader_offset)); - let uncommitted_blocks = dag_state.read().get_uncommitted_blocks_at_slot(slot); - blocks.extend(uncommitted_blocks); - } - } - } - blocks -} diff --git a/consensus/core/src/test_dag_builder.rs b/consensus/core/src/test_dag_builder.rs index 2ca3264a1d74f..f8d84280b09b7 100644 --- a/consensus/core/src/test_dag_builder.rs +++ b/consensus/core/src/test_dag_builder.rs @@ -107,6 +107,46 @@ impl DagBuilder { } } + pub(crate) fn blocks(&self, rounds: RangeInclusive) -> Vec { + assert!( + !self.blocks.is_empty(), + "No blocks have been created, please make sure that you have called build method" + ); + self.blocks + .iter() + .filter_map(|(block_ref, block)| rounds.contains(&block_ref.round).then_some(block)) + .cloned() + .collect::>() + } + + pub(crate) fn leader_blocks( + &self, + rounds: RangeInclusive, + ) -> Vec> { + assert!( + !self.blocks.is_empty(), + "No blocks have been created, please make sure that you have called build method" + ); + rounds + .into_iter() + .map(|round| self.leader_block(round)) + .collect() + } + + pub(crate) fn leader_block(&self, round: Round) -> Option { + assert!( + !self.blocks.is_empty(), + "No blocks have been created, please make sure that you have called build method" + ); + self.blocks + .iter() + .find(|(block_ref, block)| { + block_ref.round == round + && block_ref.author == self.leader_schedule.elect_leader(round, 0) + }) + .map(|(_block_ref, block)| block.clone()) + } + pub(crate) fn with_wave_length(mut self, wave_length: Round) -> Self { self.wave_length = wave_length; self @@ -392,6 +432,7 @@ impl<'a> LayerBuilder<'a> { } pub fn persist_layers(&self, dag_state: Arc>) { + assert!(!self.blocks.is_empty(), "Called to persist layers although no blocks have been created. Make sure you have called build before."); dag_state.write().accept_blocks(self.blocks.clone()); }