[Consensus 2.0] register failure points for new consensus for simtests (MystenLabs#17275)

## Description 

* Adds failure points for the new consensus so that simtests can inject failures and delays at those sites (a usage sketch follows below)
* Verifies that commit timestamps increase monotonically, and panics if the invariant is violated
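
A minimal sketch of the fail point pattern these changes rely on, based on the `sui_macros` calls in the diff below; the function names and handler body here are illustrative, not part of this commit:

```rust
use sui_macros::{fail_point, register_fail_points};

// In consensus code: mark a site with a named fail point. This is a no-op
// unless a simtest registers a handler for that name.
fn persist_commit() {
    fail_point!("consensus-store-before-write");
    // ... perform the actual store write ...
}

// In the simtest harness: attach behaviour (e.g. killing the node with a
// small probability) to one or more named fail points.
fn register_consensus_fail_points() {
    register_fail_points(&["consensus-store-before-write"], move || {
        // Illustrative handler; the real test calls handle_failpoint(...) here.
    });
}
```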

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
akichidis authored Apr 24, 2024
1 parent 0d2d137 commit 1293197
Showing 10 changed files with 56 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions consensus/core/Cargo.toml
@@ -33,6 +33,7 @@ prost.workspace = true
rand.workspace = true
serde.workspace = true
shared-crypto.workspace = true
sui-macros.workspace = true
sui-protocol-config.workspace = true
tap.workspace = true
thiserror.workspace = true

13 changes: 13 additions & 0 deletions consensus/core/src/authority_service.rs
@@ -8,6 +8,7 @@ use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{ready, stream, task, Stream, StreamExt};
use parking_lot::RwLock;
use sui_macros::{fail_point, fail_point_async};
use tokio::{sync::broadcast, time::sleep};
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, warn};
@@ -71,6 +72,8 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
peer: AuthorityIndex,
serialized_block: Bytes,
) -> ConsensusResult<()> {
fail_point_async!("consensus-rpc-response");

let peer_hostname = &self.context.committee.authority(peer).hostname;

// TODO: dedup block verifications, here and with fetched blocks.
@@ -221,6 +224,8 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
peer: AuthorityIndex,
last_received: Round,
) -> ConsensusResult<BlockStream> {
fail_point_async!("consensus-rpc-response");

let dag_state = self.dag_state.read();
// Find recent own blocks that have not been received by the peer.
// If last_received is a valid and more blocks have been proposed since then, this call is
@@ -233,6 +238,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
);
let broadcasted_blocks =
BroadcastedBlockStream::new(peer, self.rx_block_broadcaster.resubscribe());

// Return a stream of blocks that first yields missed blocks as requested, then new blocks.
Ok(Box::pin(missed_blocks.chain(
broadcasted_blocks.map(|block| block.serialized().clone()),
@@ -245,6 +251,8 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
block_refs: Vec<BlockRef>,
highest_accepted_rounds: Vec<Round>,
) -> ConsensusResult<Vec<Bytes>> {
fail_point_async!("consensus-rpc-response");

const MAX_ADDITIONAL_BLOCKS: usize = 10;
if block_refs.len() > self.context.parameters.max_blocks_per_fetch {
return Err(ConsensusError::TooManyFetchBlocksRequested(peer));
@@ -309,6 +317,8 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
start: CommitIndex,
end: CommitIndex,
) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
fail_point_async!("consensus-rpc-response");

// Compute an exclusive end index and bound the maximum number of commits scanned.
let exclusive_end =
(end + 1).min(start + self.context.parameters.commit_sync_batch_size as CommitIndex);
@@ -376,6 +386,9 @@ impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
let maybe_item = loop {
let (result, rx) = ready!(self.inner.poll(cx));
self.inner.set(make_recv_future(rx));

fail_point!("consensus-rpc-response");

match result {
Ok(item) => break Some(item),
Err(broadcast::error::RecvError::Closed) => {

5 changes: 3 additions & 2 deletions consensus/core/src/commit_observer.rs
@@ -116,8 +116,9 @@ impl CommitObserver {
for commit in unsent_commits {
// Commit index must be continuous.
assert_eq!(commit.index(), last_sent_commit_index + 1);
let committed_subdag = load_committed_subdag_from_store(self.store.as_ref(), commit);
self.sender.send(committed_subdag).unwrap_or_else(|e| {
let committed_sub_dag = load_committed_subdag_from_store(self.store.as_ref(), commit);

self.sender.send(committed_sub_dag).unwrap_or_else(|e| {
panic!(
"Failed to send commit during recovery, probably due to shutdown: {:?}",
e

4 changes: 4 additions & 0 deletions consensus/core/src/core.rs
@@ -7,6 +7,7 @@ use consensus_config::ProtocolKeyPair;
use itertools::Itertools as _;
use mysten_metrics::monitored_scope;
use parking_lot::RwLock;
use sui_macros::fail_point;
use tokio::sync::{broadcast, watch};
use tracing::{debug, info, warn};

@@ -263,6 +264,9 @@ impl Core {
fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
if let Some(block) = self.try_new_block(force) {
self.signals.new_block(block.clone())?;

fail_point!("consensus-after-propose");

// The new block may help commit.
self.try_commit()?;
return Ok(Some(block));

4 changes: 4 additions & 0 deletions consensus/core/src/dag_state.rs
@@ -511,6 +511,10 @@ impl DagState {
return;
}
assert_eq!(commit.index(), last_commit.index() + 1);

if commit.timestamp_ms() < last_commit.timestamp_ms() {
panic!("Commit timestamps do not monotonically increment, prev commit {:?}, new commit {:?}", last_commit, commit);
}
} else {
assert_eq!(commit.index(), 1);
}

1 change: 1 addition & 0 deletions consensus/core/src/linearizer.rs
@@ -131,6 +131,7 @@ impl Linearizer {

committed_sub_dags.push(sub_dag);
}

// Committed blocks must be persisted to storage before sending them to Sui and executing
// their transactions.
// Commit metadata can be persisted more lazily because they are recoverable. Uncommitted

4 changes: 4 additions & 0 deletions consensus/core/src/storage/rocksdb_store.rs
@@ -10,6 +10,7 @@ use std::{

use bytes::Bytes;
use consensus_config::AuthorityIndex;
use sui_macros::fail_point;
use typed_store::{
metrics::SamplingInterval,
reopen,
@@ -99,6 +100,8 @@ impl RocksDBStore {

impl Store for RocksDBStore {
fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
fail_point!("consensus-store-before-write");

let mut batch = self.blocks.batch();
for block in write_batch.blocks {
let block_ref = block.reference();
@@ -149,6 +152,7 @@ }
}
}
batch.write()?;
fail_point!("consensus-store-after-write");
Ok(())
}


5 changes: 5 additions & 0 deletions consensus/core/src/synchronizer.rs
@@ -27,6 +27,7 @@ use crate::error::{ConsensusError, ConsensusResult};
use crate::network::NetworkClient;
use crate::{BlockAPI, Round};
use consensus_config::AuthorityIndex;
use sui_macros::fail_point_async;

/// The number of concurrent fetch blocks requests per authority
const FETCH_BLOCKS_CONCURRENCY: usize = 5;
@@ -599,6 +600,8 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
)
.await;

fail_point_async!("consensus-delay");

let resp = match resp {
Ok(Err(err)) => {
// Add a delay before retrying - if that is needed. If request has timed out then eventually
@@ -642,6 +645,8 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
context.metrics.node_metrics.fetch_blocks_scheduler_inflight.inc();
let total_requested = missing_blocks.len();

fail_point_async!("consensus-delay");

// Fetch blocks from peers
let results = Self::fetch_blocks_from_authorities(context.clone(), blocks_to_fetch.clone(), network_client, missing_blocks, core_dispatcher.clone(), dag_state).await;
if results.is_empty() {

21 changes: 20 additions & 1 deletion crates/sui-benchmark/tests/simtest.rs
@@ -264,14 +264,17 @@ mod test {
}
});

// Narwhal fail points.
// Narwhal & Consensus 2.0 fail points.
let dead_validator = dead_validator_orig.clone();
let keep_alive_nodes_clone = keep_alive_nodes.clone();
register_fail_points(
&[
"narwhal-rpc-response",
"narwhal-store-before-write",
"narwhal-store-after-write",
"consensus-store-before-write",
"consensus-store-after-write",
"consensus-after-propose",
],
move || {
handle_failpoint(
@@ -282,6 +285,22 @@ },
},
);
register_fail_point_async("narwhal-delay", || delay_failpoint(10..20, 0.001));

let dead_validator = dead_validator_orig.clone();
let keep_alive_nodes_clone = keep_alive_nodes.clone();
register_fail_point_async("consensus-rpc-response", move || {
let dead_validator = dead_validator.clone();
let keep_alive_nodes_clone = keep_alive_nodes_clone.clone();
async move {
handle_failpoint(
dead_validator.clone(),
keep_alive_nodes_clone.clone(),
0.001,
);
}
});
register_fail_point_async("consensus-delay", || delay_failpoint(10..20, 0.001));

register_fail_point_async("writeback-cache-commit", || delay_failpoint(10..20, 0.001));

test_simulated_load(TestInitData::new(&test_cluster).await, 120).await;