Skip to content

Commit

Permalink
[Consensus 2.0] trigger leader timeout on min round delay as well (My…
Browse files Browse the repository at this point in the history
…stenLabs#17282)

## Description 

This PR is amending the leader timeout component to trigger a block
creation both when:
* the min round delay has pass - without though trying to `force` the
block creation. This ensures that even though all the blocks have been
received on a round before the min round, an block creation attempt will
be made without having to reach the max leader timeout
* when the leader timeout has passed, as before, it will attempt to
create a new block

The approach for now has been kept simple and both timers will trigger
unless a new round has advanced. More smart decisions could be made as
follow ups.

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
akichidis authored Apr 22, 2024
1 parent 4af4f0a commit 2fa3972
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 44 deletions.
2 changes: 1 addition & 1 deletion consensus/core/src/authority_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ mod tests {
Ok(block_refs)
}

async fn force_new_block(&self, _round: Round) -> Result<(), CoreError> {
async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
unimplemented!()
}

Expand Down
22 changes: 15 additions & 7 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,23 @@ impl Core {
.set(self.threshold_clock.get_round() as i64);
}

/// Force creating a new block for the dictated round. This is used when a leader timeout occurs.
pub(crate) fn force_new_block(
/// Creating a new block for the dictated round. This is used when a leader timeout occurs, either
/// when the min timeout expires or max. When `force = true` , then any checks like previous round
/// leader existence will get skipped.
pub(crate) fn new_block(
&mut self,
round: Round,
force: bool,
) -> ConsensusResult<Option<VerifiedBlock>> {
let _scope = monitored_scope("Core::force_new_block");
let _scope = monitored_scope("Core::new_block");
if self.last_proposed_round() < round {
self.context.metrics.node_metrics.leader_timeout_total.inc();
return self.try_propose(true);
self.context
.metrics
.node_metrics
.leader_timeout_total
.with_label_values(&[&format!("{force}")])
.inc();
return self.try_propose(force);
}
Ok(None)
}
Expand Down Expand Up @@ -1081,7 +1089,7 @@ mod test {
// Now try to create the blocks for round 4 via the leader timeout method which should
// ignore any leader checks or min round delay.
for (core, _, _, _, store) in cores.iter_mut() {
assert!(core.force_new_block(4).unwrap().is_some());
assert!(core.new_block(4, true).unwrap().is_some());
assert_eq!(core.last_proposed_round(), 4);

// Check commits have been persisted to store
Expand Down Expand Up @@ -1195,7 +1203,7 @@ mod test {

// try to propose to ensure that we are covering the case where we miss the leader authority 3
core.add_blocks(last_round_blocks.clone()).unwrap();
core.force_new_block(round).unwrap();
core.new_block(round, true).unwrap();

let block = core.last_proposed_block();
assert_eq!(block.round(), round);
Expand Down
16 changes: 9 additions & 7 deletions consensus/core/src/core_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ const CORE_THREAD_COMMANDS_CHANNEL_SIZE: usize = 2000;
enum CoreThreadCommand {
/// Add blocks to be processed and accepted
AddBlocks(Vec<VerifiedBlock>, oneshot::Sender<BTreeSet<BlockRef>>),
/// Called when a leader timeout occurs and a block should be produced
ForceNewBlock(Round, oneshot::Sender<()>),
/// Called when the min round has passed or the leader timeout occurred and a block should be produced.
/// When the command is called with `force = true`, then the block will be created for `round` skipping
/// any checks (ex leader existence of previous round). More information can be found on the `Core` component.
NewBlock(Round, oneshot::Sender<()>, bool),
/// Request missing blocks that need to be synced.
GetMissing(oneshot::Sender<BTreeSet<BlockRef>>),
}
Expand All @@ -41,7 +43,7 @@ pub trait CoreThreadDispatcher: Sync + Send + 'static {
async fn add_blocks(&self, blocks: Vec<VerifiedBlock>)
-> Result<BTreeSet<BlockRef>, CoreError>;

async fn force_new_block(&self, round: Round) -> Result<(), CoreError>;
async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError>;

async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError>;
}
Expand Down Expand Up @@ -77,8 +79,8 @@ impl CoreThread {
let missing_blocks = self.core.add_blocks(blocks)?;
sender.send(missing_blocks).ok();
}
CoreThreadCommand::ForceNewBlock(round, sender) => {
self.core.force_new_block(round)?;
CoreThreadCommand::NewBlock(round, sender, force) => {
self.core.new_block(round, force)?;
sender.send(()).ok();
}
CoreThreadCommand::GetMissing(sender) => {
Expand Down Expand Up @@ -159,9 +161,9 @@ impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
receiver.await.map_err(Shutdown)
}

async fn force_new_block(&self, round: Round) -> Result<(), CoreError> {
async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> {
let (sender, receiver) = oneshot::channel();
self.send(CoreThreadCommand::ForceNewBlock(round, sender))
self.send(CoreThreadCommand::NewBlock(round, sender, force))
.await;
receiver.await.map_err(Shutdown)
}
Expand Down
98 changes: 73 additions & 25 deletions consensus/core/src/leader_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub(crate) struct LeaderTimeoutTask<D: CoreThreadDispatcher> {
dispatcher: Arc<D>,
new_round_receiver: watch::Receiver<Round>,
leader_timeout: Duration,
min_round_delay: Duration,
stop: Receiver<()>,
}

Expand All @@ -43,6 +44,7 @@ impl<D: CoreThreadDispatcher> LeaderTimeoutTask<D> {
stop,
new_round_receiver: signals_receivers.new_round_receiver(),
leader_timeout: context.parameters.leader_timeout,
min_round_delay: context.parameters.min_round_delay,
};
let handle = tokio::spawn(async move { me.run().await });

Expand All @@ -55,35 +57,56 @@ impl<D: CoreThreadDispatcher> LeaderTimeoutTask<D> {
async fn run(&mut self) {
let new_round = &mut self.new_round_receiver;
let mut leader_round: Round = *new_round.borrow_and_update();
let mut leader_round_timed_out = false;
let mut min_leader_round_timed_out = false;
let mut max_leader_round_timed_out = false;
let timer_start = Instant::now();
let leader_timeout = sleep_until(timer_start + self.leader_timeout);
let min_leader_timeout = sleep_until(timer_start + self.min_round_delay);
let max_leader_timeout = sleep_until(timer_start + self.leader_timeout);

tokio::pin!(leader_timeout);
tokio::pin!(min_leader_timeout);
tokio::pin!(max_leader_timeout);

loop {
tokio::select! {
// when leader timer expires then we attempt to trigger the creation of a new block.
// when the min leader timer expires then we attempt to trigger the creation of a new block.
// If we already timed out before then the branch gets disabled so we don't attempt
// all the time to produce already produced blocks for that round.
() = &mut leader_timeout, if !leader_round_timed_out => {
if let Err(err) = self.dispatcher.force_new_block(leader_round).await {
() = &mut min_leader_timeout, if !min_leader_round_timed_out => {
if let Err(err) = self.dispatcher.new_block(leader_round, false).await {
warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
return;
}
leader_round_timed_out = true;
min_leader_round_timed_out = true;
},
// When the max leader timer expires then we attempt to trigger the creation of a new block. This
// call is made with `force = true` to bypass any checks that allow to propose immediately if block
// not already produced.
// Keep in mind that first the min timeout should get triggered and then the max timeout, only
// if the round has not advanced in the meantime. Otherwise, the max timeout will not get
// triggered at all.
() = &mut max_leader_timeout, if !max_leader_round_timed_out => {
if let Err(err) = self.dispatcher.new_block(leader_round, true).await {
warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
return;
}
max_leader_round_timed_out = true;
}

// a new round has been produced. Reset the leader timeout.
Ok(_) = new_round.changed() => {
leader_round = *new_round.borrow_and_update();
debug!("New round has been received {leader_round}, resetting timer");

leader_round_timed_out = false;
min_leader_round_timed_out = false;
max_leader_round_timed_out = false;

leader_timeout
let now = Instant::now();
min_leader_timeout
.as_mut()
.reset(Instant::now() + self.leader_timeout);
.reset(now + self.min_round_delay);
max_leader_timeout
.as_mut()
.reset(now + self.leader_timeout);
},
_ = &mut self.stop => {
debug!("Stop signal has been received, now shutting down");
Expand Down Expand Up @@ -113,12 +136,12 @@ mod tests {

#[derive(Clone, Default)]
struct MockCoreThreadDispatcher {
force_new_block_calls: Arc<Mutex<Vec<(Round, Instant)>>>,
new_block_calls: Arc<Mutex<Vec<(Round, bool, Instant)>>>,
}

impl MockCoreThreadDispatcher {
async fn get_force_new_block_calls(&self) -> Vec<(Round, Instant)> {
let mut binding = self.force_new_block_calls.lock();
async fn get_new_block_calls(&self) -> Vec<(Round, bool, Instant)> {
let mut binding = self.new_block_calls.lock();
let all_calls = binding.drain(0..);
all_calls.into_iter().collect()
}
Expand All @@ -133,10 +156,10 @@ mod tests {
todo!()
}

async fn force_new_block(&self, round: Round) -> Result<(), CoreError> {
self.force_new_block_calls
async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> {
self.new_block_calls
.lock()
.push((round, Instant::now()));
.push((round, force, Instant::now()));
Ok(())
}

Expand All @@ -150,8 +173,10 @@ mod tests {
let (context, _signers) = Context::new_for_test(4);
let dispatcher = Arc::new(MockCoreThreadDispatcher::default());
let leader_timeout = Duration::from_millis(500);
let min_round_delay = Duration::from_millis(50);
let parameters = Parameters {
leader_timeout,
min_round_delay,
..Default::default()
};
let context = Arc::new(context.with_parameters(parameters));
Expand All @@ -165,14 +190,29 @@ mod tests {
// send a signal that a new round has been produced.
signals.new_round(10);

// wait enough until a force_new_block has been received
sleep(2 * leader_timeout).await;
let all_calls = dispatcher.get_force_new_block_calls().await;
// wait enough until the min round delay has passed and a new_block call is triggered
sleep(2 * min_round_delay).await;
let all_calls = dispatcher.get_new_block_calls().await;
assert_eq!(all_calls.len(), 1);

let (round, force, timestamp) = all_calls[0];
assert_eq!(round, 10);
assert!(!force);
assert!(
min_round_delay <= timestamp - start,
"Leader timeout min setting {:?} should be less than actual time difference {:?}",
min_round_delay,
timestamp - start
);

// wait enough until a new_block has been received
sleep(2 * leader_timeout).await;
let all_calls = dispatcher.get_new_block_calls().await;
assert_eq!(all_calls.len(), 1);

let (round, timestamp) = all_calls[0];
let (round, force, timestamp) = all_calls[0];
assert_eq!(round, 10);
assert!(force);
assert!(
leader_timeout <= timestamp - start,
"Leader timeout setting {:?} should be less than actual time difference {:?}",
Expand All @@ -182,7 +222,7 @@ mod tests {

// now wait another 2 * leader_timeout, no other call should be received
sleep(2 * leader_timeout).await;
let all_calls = dispatcher.get_force_new_block_calls().await;
let all_calls = dispatcher.get_new_block_calls().await;

assert_eq!(all_calls.len(), 0);
}
Expand All @@ -192,8 +232,10 @@ mod tests {
let (context, _signers) = Context::new_for_test(4);
let dispatcher = Arc::new(MockCoreThreadDispatcher::default());
let leader_timeout = Duration::from_millis(500);
let min_round_delay = Duration::from_millis(50);
let parameters = Parameters {
leader_timeout,
min_round_delay,
..Default::default()
};
let context = Arc::new(context.with_parameters(parameters));
Expand All @@ -207,16 +249,22 @@ mod tests {
// now send some signals with some small delay between them, but not enough so every round
// manages to timeout and call the force new block method.
signals.new_round(13);
sleep(leader_timeout / 2).await;
sleep(min_round_delay / 2).await;
signals.new_round(14);
sleep(leader_timeout / 2).await;
sleep(min_round_delay / 2).await;
signals.new_round(15);
sleep(2 * leader_timeout).await;

// only the last one should be received
let all_calls = dispatcher.get_force_new_block_calls().await;
let (round, timestamp) = all_calls[0];
let all_calls = dispatcher.get_new_block_calls().await;
let (round, force, timestamp) = all_calls[0];
assert_eq!(round, 15);
assert!(!force);
assert!(min_round_delay < timestamp - now);

let (round, force, timestamp) = all_calls[1];
assert_eq!(round, 15);
assert!(force);
assert!(leader_timeout < timestamp - now);
}
}
7 changes: 4 additions & 3 deletions consensus/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub(crate) struct NodeMetrics {
pub last_committed_leader_round: IntGauge,
pub commit_round_advancement_interval: Histogram,
pub last_decided_leader_round: IntGauge,
pub leader_timeout_total: IntCounter,
pub leader_timeout_total: IntCounterVec,
pub missing_blocks_total: IntCounter,
pub missing_blocks_after_fetch_total: IntCounter,
pub quorum_receive_latency: Histogram,
Expand Down Expand Up @@ -275,9 +275,10 @@ impl NodeMetrics {
"The last round where a commit decision was made.",
registry,
).unwrap(),
leader_timeout_total: register_int_counter_with_registry!(
leader_timeout_total: register_int_counter_vec_with_registry!(
"leader_timeout_total",
"Total number of leader timeouts",
"Total number of leader timeouts, either when the min round time has passed, or max leader timeout",
&["timeout_type"],
registry,
).unwrap(),
missing_blocks_total: register_int_counter_with_registry!(
Expand Down
2 changes: 1 addition & 1 deletion consensus/core/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ mod tests {
Ok(BTreeSet::new())
}

async fn force_new_block(&self, _round: Round) -> Result<(), CoreError> {
async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
todo!()
}

Expand Down

0 comments on commit 2fa3972

Please sign in to comment.