Skip to content

Commit

Permalink
[tx-finalizer] Cap validator wait time (MystenLabs#18739)
Browse files Browse the repository at this point in the history
## Description 

This PR adds a cap to how long a validator can wait before waking up. It
makes it slightly easier to reason about the max amount of time these
kinds of threads will stay alive.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
lxfind authored Jul 20, 2024
1 parent e23823d commit c166079
Showing 1 changed file with 37 additions and 46 deletions.
83 changes: 37 additions & 46 deletions crates/sui-core/src/validator_tx_finalizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use prometheus::{
register_histogram_with_registry, register_int_counter_with_registry, Histogram, IntCounter,
Registry,
};
use std::cmp::min;
use std::ops::Add;
#[cfg(any(msim, test))]
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
Expand All @@ -32,11 +33,13 @@ const FINALIZATION_TIMEOUT: Duration = Duration::from_secs(60);
/// Incremental delay for validators to wake up to finalize a transaction.
const VALIDATOR_DELAY_INCREMENTS_SEC: u64 = 10;

const VALIDATOR_MAX_DELAY: Duration = Duration::from_secs(180);

struct ValidatorTxFinalizerMetrics {
num_finalization_attempts: IntCounter,
num_successful_finalizations: IntCounter,
finalization_latency: Histogram,
validator_tx_finalizer_attempt_position: Histogram,
validator_tx_finalizer_attempt_delay: Histogram,
#[cfg(any(msim, test))]
num_finalization_attempts_for_testing: AtomicU64,
#[cfg(test)]
Expand Down Expand Up @@ -65,10 +68,10 @@ impl ValidatorTxFinalizerMetrics {
registry,
)
.unwrap(),
validator_tx_finalizer_attempt_position: register_histogram_with_registry!(
"validator_tx_finalizer_attempt_position",
"Position of the validator in the committee that attempted to finalize the transaction after waking up",
vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0],
validator_tx_finalizer_attempt_delay: register_histogram_with_registry!(
"validator_tx_finalizer_attempt_delay",
"Duration that a validator in the committee waited before attempting to finalize the transaction",
vec![60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0],
registry,
)
.unwrap(),
Expand Down Expand Up @@ -182,7 +185,9 @@ where
tx: VerifiedSignedTransaction,
) -> anyhow::Result<bool> {
let tx_digest = *tx.digest();
let (position, tx_finalization_delay) = self.determine_finalization_delay(&tx_digest)?;
let Some(tx_finalization_delay) = self.determine_finalization_delay(&tx_digest) else {
return Ok(false);
};
let digests = [tx_digest];
select! {
_ = tokio::time::sleep(tx_finalization_delay) => {
Expand All @@ -193,9 +198,10 @@ where
return Ok(false);
}
}

self.metrics
.validator_tx_finalizer_attempt_position
.observe(position as f64);
.validator_tx_finalizer_attempt_delay
.observe(tx_finalization_delay.as_secs_f64());
let start_time = self.metrics.start_finalization();
debug!(
?tx_digest,
Expand All @@ -218,29 +224,21 @@ where
// Validators will wake up one by one with incremental delays to finalize the transaction.
// The hope is that the first few should be able to finalize the transaction,
// and the rest will see it already executed and do not need to do anything.
fn determine_finalization_delay(
&self,
tx_digest: &TransactionDigest,
) -> anyhow::Result<(usize, Duration)> {
let order = self
.agg
.load()
.committee
.shuffle_by_stake_from_tx_digest(tx_digest);
let position = order
.iter()
.position(|&name| name == self.name)
.ok_or_else(|| {
// Somehow the validator is not found in the committee. This should never happen.
// TODO: This is where we should report system invariant violation.
anyhow::anyhow!("Validator {} not found in the committee", self.name)
})?;

fn determine_finalization_delay(&self, tx_digest: &TransactionDigest) -> Option<Duration> {
let agg = self.agg.load();
let order = agg.committee.shuffle_by_stake_from_tx_digest(tx_digest);
let Some(position) = order.iter().position(|&name| name == self.name) else {
// Somehow the validator is not found in the committee. This should never happen.
// TODO: This is where we should report system invariant violation.
error!("Validator {} not found in the committee", self.name);
return None;
};
// TODO: As an optimization, we could also limit the number of validators that would do this.
let extra_delay = position as u64 * VALIDATOR_DELAY_INCREMENTS_SEC;
let delay = self
.tx_finalization_delay
.add(Duration::from_secs(extra_delay));
Ok((position, delay))
Some(min(delay, VALIDATOR_MAX_DELAY))
}
}

Expand All @@ -250,10 +248,11 @@ mod tests {
use crate::authority::AuthorityState;
use crate::authority_aggregator::{AuthorityAggregator, AuthorityAggregatorBuilder};
use crate::authority_client::AuthorityAPI;
use crate::validator_tx_finalizer::ValidatorTxFinalizer;
use crate::validator_tx_finalizer::{ValidatorTxFinalizer, VALIDATOR_MAX_DELAY};
use arc_swap::ArcSwap;
use async_trait::async_trait;
use prometheus::Registry;
use std::cmp::min;
use std::collections::BTreeMap;
use std::iter;
use std::net::SocketAddr;
Expand Down Expand Up @@ -577,13 +576,14 @@ mod tests {

#[tokio::test]
async fn test_validator_tx_finalizer_determine_finalization_delay() {
const COMMITTEE_SIZE: usize = 15;
let network_config = ConfigBuilder::new_with_temp_dir()
.committee_size(NonZeroUsize::new(10).unwrap())
.committee_size(NonZeroUsize::new(COMMITTEE_SIZE).unwrap())
.build();
let (auth_agg, _) = AuthorityAggregatorBuilder::from_network_config(&network_config)
.build_network_clients();
let auth_agg = Arc::new(auth_agg);
let finalizers = (0..10)
let finalizers = (0..COMMITTEE_SIZE)
.map(|idx| {
ValidatorTxFinalizer::new(
Arc::new(ArcSwap::new(auth_agg.clone())),
Expand All @@ -599,26 +599,17 @@ mod tests {
.map(|finalizer| {
finalizer
.determine_finalization_delay(&tx_digest)
.map(|(pos, delay)| (pos, delay.as_secs()))
.map(|delay| delay.as_secs())
.unwrap()
})
.collect();
delays.sort();
assert_eq!(
delays,
vec![
(0, 60),
(1, 70),
(2, 80),
(3, 90),
(4, 100),
(5, 110),
(6, 120),
(7, 130),
(8, 140),
(9, 150)
]
)
for (idx, delay) in delays.iter().enumerate() {
assert_eq!(
*delay,
min(VALIDATOR_MAX_DELAY.as_secs(), 60 + idx as u64 * 10)
);
}
}
}

Expand Down

0 comments on commit c166079

Please sign in to comment.