Skip to content

Commit

Permalink
[Consensus 2.0] use DagBuilder (MystenLabs#17411)
Browse files Browse the repository at this point in the history
## Description 

This PR swaps the use of the `build_dag` method with the `DagBuilder` in
the `linearizer, commit_observer and dag_state`

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
akichidis authored May 2, 2024
1 parent 6bae3a6 commit c7a380c
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 170 deletions.
80 changes: 38 additions & 42 deletions consensus/core/src/commit_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,13 @@ mod tests {
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};

use super::*;
use crate::test_dag_builder::DagBuilder;
use crate::{
block::{BlockRef, Round},
commit::DEFAULT_WAVE_LENGTH,
context::Context,
dag_state::DagState,
leader_schedule::{LeaderSchedule, LeaderSwapTable},
storage::mem_store::MemStore,
test_dag::{build_dag, get_all_uncommitted_leader_blocks},
};

#[test]
Expand All @@ -189,7 +188,6 @@ mod tests {
context.clone(),
mem_store.clone(),
)));
let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
let last_processed_commit_round = 0;
let last_processed_commit_index = 0;
let (sender, mut receiver) = unbounded_channel();
Expand All @@ -207,15 +205,17 @@ mod tests {

// Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
let num_rounds = 10;
build_dag(context.clone(), dag_state.clone(), None, num_rounds);
let leaders = get_all_uncommitted_leader_blocks(
dag_state.clone(),
leader_schedule,
num_rounds,
DEFAULT_WAVE_LENGTH,
false,
1,
);
let mut builder = DagBuilder::new(context.clone());
builder
.layers(1..=num_rounds)
.build()
.persist_layers(dag_state.clone());

let leaders = builder
.leader_blocks(1..=num_rounds)
.into_iter()
.map(Option::unwrap)
.collect::<Vec<_>>();

let commits = observer.handle_commit(leaders.clone()).unwrap();

Expand All @@ -235,17 +235,11 @@ mod tests {
if idx == 0 {
// First subdag includes the leader block plus all ancestor blocks
// of the leader minus the genesis round blocks
assert_eq!(
subdag.blocks.len(),
(num_authorities * (DEFAULT_WAVE_LENGTH - 1) as usize) + 1
);
assert_eq!(subdag.blocks.len(), 1);
} else {
// Every subdag after will be missing the leader block from the previous
// committed subdag
assert_eq!(
subdag.blocks.len(),
(num_authorities * DEFAULT_WAVE_LENGTH as usize)
);
assert_eq!(subdag.blocks.len(), num_authorities);
}
for block in subdag.blocks.iter() {
expected_stored_refs.push(block.reference());
Expand Down Expand Up @@ -288,7 +282,6 @@ mod tests {
context.clone(),
mem_store.clone(),
)));
let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
let last_processed_commit_round = 0;
let last_processed_commit_index = 0;
let (sender, mut receiver) = unbounded_channel();
Expand All @@ -306,15 +299,17 @@ mod tests {

// Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
let num_rounds = 10;
build_dag(context.clone(), dag_state.clone(), None, num_rounds);
let leaders = get_all_uncommitted_leader_blocks(
dag_state.clone(),
leader_schedule,
num_rounds,
DEFAULT_WAVE_LENGTH,
false,
1,
);
let mut builder = DagBuilder::new(context.clone());
builder
.layers(1..=num_rounds)
.build()
.persist_layers(dag_state.clone());

let leaders = builder
.leader_blocks(1..=num_rounds)
.into_iter()
.map(Option::unwrap)
.collect::<Vec<_>>();

// Commit first batch of leaders (2) and "receive" the subdags as the
// consumer of the consensus output channel.
Expand Down Expand Up @@ -367,7 +362,7 @@ mod tests {
.unwrap(),
);

let expected_last_sent_index = 3;
let expected_last_sent_index = num_rounds as usize;
while let Ok(subdag) = receiver.try_recv() {
tracing::info!("{subdag} was sent but not processed by consumer");
assert_eq!(subdag, commits[processed_subdag_index]);
Expand Down Expand Up @@ -425,7 +420,6 @@ mod tests {
context.clone(),
mem_store.clone(),
)));
let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
let last_processed_commit_round = 0;
let last_processed_commit_index = 0;
let (sender, mut receiver) = unbounded_channel();
Expand All @@ -443,19 +437,21 @@ mod tests {

// Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
let num_rounds = 10;
build_dag(context.clone(), dag_state.clone(), None, num_rounds);
let leaders = get_all_uncommitted_leader_blocks(
dag_state.clone(),
leader_schedule,
num_rounds,
DEFAULT_WAVE_LENGTH,
false,
1,
);
let mut builder = DagBuilder::new(context.clone());
builder
.layers(1..=num_rounds)
.build()
.persist_layers(dag_state.clone());

let leaders = builder
.leader_blocks(1..=num_rounds)
.into_iter()
.map(Option::unwrap)
.collect::<Vec<_>>();

// Commit all of the leaders and "receive" the subdags as the consumer of
// the consensus output channel.
let expected_last_processed_index: usize = 3;
let expected_last_processed_index: usize = 10;
let expected_last_processed_round =
expected_last_processed_index as u32 * DEFAULT_WAVE_LENGTH;
let commits = observer.handle_commit(leaders.clone()).unwrap();
Expand Down
1 change: 1 addition & 0 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ impl Core {
if !self.leaders_exist(quorum_round) {
return None;
}

if Duration::from_millis(
self.context
.clock
Expand Down
19 changes: 16 additions & 3 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,10 +736,10 @@ mod test {
use parking_lot::RwLock;

use super::*;
use crate::test_dag_builder::DagBuilder;
use crate::{
block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock},
storage::{mem_store::MemStore, WriteBatch},
test_dag::build_dag,
};

#[test]
Expand Down Expand Up @@ -1493,7 +1493,16 @@ mod test {

// WHEN a fully connected DAG up to round 4 is created, then round 4 blocks should be returned as quorum
{
let round_4_blocks = build_dag(context, dag_state.clone(), None, 4);
let mut dag_builder = DagBuilder::new(context.clone());
dag_builder
.layers(1..=4)
.build()
.persist_layers(dag_state.clone());
let round_4_blocks: Vec<_> = dag_builder
.blocks(4..=4)
.into_iter()
.map(|block| block.reference())
.collect();

let last_quorum = dag_state.read().last_quorum();

Expand Down Expand Up @@ -1546,7 +1555,11 @@ mod test {
// WHEN adding some blocks for authorities, only the last ones should be returned
{
// add blocks up to round 4
build_dag(context.clone(), dag_state.clone(), None, 4);
let mut dag_builder = DagBuilder::new(context.clone());
dag_builder
.layers(1..=4)
.build()
.persist_layers(dag_state.clone());

// add block 5 for authority 0
let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
Expand Down
Loading

0 comments on commit c7a380c

Please sign in to comment.