Skip to content

Commit

Permalink
[Narwhal] simplify GC logic in Synchronizer (MystenLabs#13579)
Browse files Browse the repository at this point in the history
## Description 

GC logic in Synchronizer is a bit more complicated than necessary.
Refactor it to just accept all certificates at and below gc round + 1.

## Test Plan 

CI

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
mwtian authored Sep 5, 2023
1 parent 2309e6c commit 4aa519a
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 53 deletions.
89 changes: 46 additions & 43 deletions narwhal/primary/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl Inner {
let digest = certificate.digest();

// Validate that certificates are accepted in causal order.
// Currently it is relatively cheap because of certificate store caching.
// This should be relatively cheap because of certificate store caching.
if certificate.round() > self.gc_round.load(Ordering::Acquire) + 1 {
let existence = self
.certificate_store
Expand Down Expand Up @@ -283,6 +283,12 @@ impl Inner {
}
Ok(result)
}

#[cfg(test)]
async fn get_suspended_stats(&self) -> (usize, usize) {
let state = self.state.lock().await;
(state.num_suspended(), state.num_missing())
}
}

/// `Synchronizer` helps this primary and other peers stay in sync with each other,
Expand Down Expand Up @@ -421,16 +427,16 @@ impl Synchronizer {
.certificates_aggregators
.lock()
.retain(|k, _| k > &gc_round);
// Accept certificates at gc round + 1, if there is any.
// Accept certificates at and below gc round + 1, if there is any.
let mut state = inner.state.lock().await;
for suspended_cert in state.run_gc(gc_round) {
let suspended_certs = state.accept_children(
while let Some(suspended_cert) = state.run_gc_once(gc_round) {
let suspended_children_certs = state.accept_children(
suspended_cert.certificate.round(),
suspended_cert.certificate.digest(),
);
// Iteration must be in causal order.
for suspended in
iter::once(suspended_cert).chain(suspended_certs.into_iter())
iter::once(suspended_cert).chain(suspended_children_certs.into_iter())
{
match inner.accept_suspended_certificate(&state, suspended).await {
Ok(()) => {}
Expand Down Expand Up @@ -1097,12 +1103,18 @@ impl Synchronizer {
/// certificate to `CertificateFetcher` which will trigger range fetching of missing
/// certificates.
#[cfg(test)]
pub async fn get_missing_parents(
pub(crate) async fn get_missing_parents(
&self,
certificate: &Certificate,
) -> DagResult<Vec<CertificateDigest>> {
self.inner.get_missing_parents(certificate).await
}

/// Returns the number of suspended certificates and missing certificates.
#[cfg(test)]
pub(crate) async fn get_suspended_stats(&self) -> (usize, usize) {
self.inner.get_suspended_stats().await
}
}

/// Holds information for a suspended certificate. The certificate can be accepted into the DAG
Expand Down Expand Up @@ -1235,49 +1247,40 @@ impl State {
to_accept
}

/// Runs GC on the suspended certificates, returns a list that can be accepted at gc round + 1.
/// It is caller's responsibility to check if some children of the returned certificates can
/// also be accepted.
fn run_gc(&mut self, gc_round: Round) -> Vec<SuspendedCertificate> {
// Remove suspended certificates below gc round, and collect digests for certificates just
// above the gc round.
let mut gc_certificates = Vec::new();
let mut certificates_above_gc_round = HashSet::new();
while let Some(((round, digest), children)) = self.missing.iter().next() {
if *round > gc_round {
break;
}
if *round == gc_round {
certificates_above_gc_round.extend(children.iter().cloned());
/// Runs GC on the suspended certificates.
/// Returns one certificate that can be GC'ed and accepted, or None.
///
/// It is the caller's responsibility to check if any children of the returned certificate
/// can also be accepted.
fn run_gc_once(&mut self, gc_round: Round) -> Option<SuspendedCertificate> {
// Accept suspended certificates at and below gc round + 1, because their parents will not
// be accepted into the DAG store anymore, in sanitize_certificate().
while let Some(((round, digest), _children)) = self.missing.first_key_value() {
// Note that gc_round is the highest round where certificates are gc'ed, and which will
// never be in a consensus commit.
if *round > gc_round + 1 {
return None;
}
// It is ok to notify waiters here (via Drop). The certificate will never and does
// not need to get into certificate store.
if let Some(suspended) = self.suspended.remove(digest) {
gc_certificates.push(suspended);
if let Some(mut suspended) = self.suspended.remove(digest) {
// Clear the missing_parents field to be consistent with other accepted
// certificates.
suspended.missing_parents.clear();
return Some(suspended);
}
self.missing.remove(&(*round, *digest));
}
// Notify waiters on GC'ed certificates.
for suspended in gc_certificates {
suspended
.notify
.notify()
.expect("Suspended certificate should be notified once.");
}
// All certificates at gc round + 1 can be accepted.
let mut to_accept = Vec::new();
for digest in certificates_above_gc_round {
let mut suspended_cert = self
.suspended
.remove(&digest)
.expect("Inconsistency found!");
suspended_cert.missing_parents.clear();
to_accept.push(suspended_cert);
// GC the missing children info even if there is no corresponding suspended certificate.
// NOTE: when there is a corresponding suspended certificate, the missing children info
// will be read and cleared in accept_children().
self.missing.pop_first();
}
to_accept
None
}

fn num_suspended(&self) -> usize {
self.suspended.len()
}

#[cfg(test)]
fn num_missing(&self) -> usize {
self.missing.len()
}
}
34 changes: 24 additions & 10 deletions narwhal/primary/src/tests/synchronizer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ async fn gc_suspended_certificates() {
&primary_channel_metrics,
));

// Make fake certificates.
// Make 5 rounds of fake certificates.
let committee: Committee = fixture.committee();
let genesis = Certificate::genesis(&committee)
.iter()
Expand Down Expand Up @@ -878,8 +878,14 @@ async fn gc_suspended_certificates() {
Err(e) => panic!("Unexpected error {e}"),
}
}
// Round 2~5 certificates are suspended.
// Round 1~4 certificates are missing and referenced as parents.
assert_eq!(
synchronizer.get_suspended_stats().await,
(NUM_AUTHORITIES * 4, NUM_AUTHORITIES * 4)
);

// Re-insertion of missing certificate as fetched certificates should be ok.
// Re-insertion of missing certificate as fetched certificates should be suspended too.
for cert in &certificates[NUM_AUTHORITIES * 2..NUM_AUTHORITIES * 4] {
match synchronizer
.try_accept_fetched_certificate(cert.clone())
Expand All @@ -892,22 +898,30 @@ async fn gc_suspended_certificates() {
Err(e) => panic!("Unexpected error {e}"),
}
}
assert_eq!(
synchronizer.get_suspended_stats().await,
(NUM_AUTHORITIES * 4, NUM_AUTHORITIES * 4)
);

// At commit round 8, round 3 becomes the GC round. Round 4 and 5 will be accepted.
// At commit round 8, round 3 becomes the GC round.
let _ = tx_consensus_round_updates.send(ConsensusRound::new(8, gc_round(8, GC_DEPTH)));

// Wait for all notifications to arrive.
accept.collect::<Vec<()>>().await;

// Compare received and expected certificates.
// Expected to receive:
// Round 2~4 certificates will be accepted because of GC.
// Round 5 certificates will be accepted because of no missing dependencies.
let expected_certificates: HashMap<_, _> = certificates[NUM_AUTHORITIES..]
.iter()
.map(|cert| (cert.digest(), cert.clone()))
.collect();
let mut received_certificates = HashMap::new();
for _ in 0..NUM_AUTHORITIES * 2 {
for _ in 0..expected_certificates.len() {
let cert = rx_new_certificates.try_recv().unwrap();
received_certificates.insert(cert.digest(), cert);
}
let expected_certificates: HashMap<_, _> = certificates[NUM_AUTHORITIES * 3..]
.iter()
.map(|cert| (cert.digest(), cert.clone()))
.collect();
assert_eq!(received_certificates, expected_certificates);
assert_eq!(expected_certificates, received_certificates);
// Suspended and missing certificates are cleared.
assert_eq!(synchronizer.get_suspended_stats().await, (0, 0));
}

0 comments on commit 4aa519a

Please sign in to comment.