From b0a5da6a1fcf8f58028ec77ec4701f14e951ae80 Mon Sep 17 00:00:00 2001 From: Mark Logan <103447440+mystenmark@users.noreply.github.com> Date: Mon, 27 Jun 2022 11:24:42 -0700 Subject: [PATCH] Fix node sync follower (#2741) * Wait until we have processed all txes in a batch before updating follower store * NodeSyncDigestHandler::handle_digest returns final status of processing tx --- .../src/authority_active/gossip/mod.rs | 43 +++++++++++-------- .../src/authority_active/gossip/node_sync.rs | 27 +++++++++--- 2 files changed, 46 insertions(+), 24 deletions(-) diff --git a/crates/sui-core/src/authority_active/gossip/mod.rs b/crates/sui-core/src/authority_active/gossip/mod.rs index 11c8e020b784d..89b114d3bb37e 100644 --- a/crates/sui-core/src/authority_active/gossip/mod.rs +++ b/crates/sui-core/src/authority_active/gossip/mod.rs @@ -46,7 +46,6 @@ struct Follower { aggregator: Arc>, } -const EACH_ITEM_DELAY_MS: u64 = 1_000; const REQUEST_FOLLOW_NUM_DIGESTS: u64 = 100_000; const REFRESH_FOLLOWER_PERIOD_SECS: u64 = 60; @@ -247,6 +246,7 @@ where #[async_trait] trait DigestHandler { + /// handle_digest async fn handle_digest(&self, follower: &Follower, digest: ExecutionDigests) -> SuiResult; } @@ -359,7 +359,8 @@ where ) -> SuiResult { // Global timeout, we do not exceed this time in this task. let mut timeout = Box::pin(tokio::time::sleep(duration)); - let mut queue = FuturesOrdered::new(); + let mut results = FuturesOrdered::new(); + let mut batch_seq_to_record = None; let req = BatchInfoRequest { start: self.max_seq, @@ -379,26 +380,23 @@ where match items { Some(Ok(BatchInfoResponseItem(UpdateItem::Batch(signed_batch)) )) => { let next_seq = signed_batch.batch.next_sequence_number; - self.follower_store.record_next_sequence(&self.peer_name, next_seq)?; - match self.max_seq { - Some(max_seq) => { - if next_seq < max_seq { - info!("Gossip sequence number unexpected: found {:?} but previously received {:?}", next_seq, max_seq); - } + batch_seq_to_record = Some(next_seq); + if let Some(max_seq) = self.max_seq { + if next_seq < max_seq { + info!("Gossip sequence number unexpected: found {:?} but previously received {:?}", next_seq, max_seq); } - None => {} } }, // Upon receiving a transaction digest, store it if it is not processed already. - Some(Ok(BatchInfoResponseItem(UpdateItem::Transaction((_seq, digest))))) => { - if !self.state.database.effects_exists(&digest.transaction)? { - queue.push(async move { - tokio::time::sleep(Duration::from_millis(EACH_ITEM_DELAY_MS)).await; - digest - }); - self.state.metrics.gossip_queued_count.inc(); - } + Some(Ok(BatchInfoResponseItem(UpdateItem::Transaction((seq, digest))))) => { + let fut = handler.handle_digest(self, digest); + results.push(async move { + fut.await?; + Ok::(seq) + }); + + self.state.metrics.gossip_queued_count.inc(); }, // Return any errors. @@ -417,8 +415,15 @@ where }, } }, - digest = &mut queue.next() , if !queue.is_empty() => { - handler.handle_digest(self, digest.unwrap()).await?; + + result = &mut results.next() , if !results.is_empty() => { + let seq = result.unwrap()?; + if let Some(batch_seq) = batch_seq_to_record { + if seq >= batch_seq { + self.follower_store.record_next_sequence(&self.peer_name, batch_seq)?; + batch_seq_to_record = None; + } + } } }; } diff --git a/crates/sui-core/src/authority_active/gossip/node_sync.rs b/crates/sui-core/src/authority_active/gossip/node_sync.rs index ed5d92f4d9615..5eda219cefcae 100644 --- a/crates/sui-core/src/authority_active/gossip/node_sync.rs +++ b/crates/sui-core/src/authority_active/gossip/node_sync.rs @@ -20,10 +20,10 @@ use sui_types::{ use std::ops::Deref; use std::sync::{Arc, Mutex}; -use tokio::sync::{broadcast, mpsc, Semaphore}; +use tokio::sync::{broadcast, mpsc, oneshot, Semaphore}; use tokio::task::JoinHandle; -use tracing::{error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; const NODE_SYNC_QUEUE_LEN: usize = 500; @@ -149,6 +149,7 @@ where struct DigestsMessage { digests: ExecutionDigests, peer: AuthorityName, + tx: oneshot::Sender, } /// NodeSyncState is shared by any number of NodeSyncDigestHandler's, and receives DigestsMessage @@ -179,16 +180,27 @@ where // https://github.com/tokio-rs/tokio/discussions/2648 let limit = Arc::new(Semaphore::new(MAX_NODE_SYNC_CONCURRENCY)); - while let Some(DigestsMessage { digests, peer }) = receiver.recv().await { + while let Some(DigestsMessage { digests, peer, tx }) = receiver.recv().await { let permit = Arc::clone(&limit).acquire_owned().await; let state = state.clone(); tokio::spawn(async move { let _permit = permit; // hold semaphore permit until task completes // TODO: must send status back to follower so that it knows whether to advance // the watermark. - if let Err(error) = state.process_digest(peer, digests).await { + let res = state.process_digest(peer, digests).await; + if let Err(error) = &res { error!(?digests, ?peer, "process_digest failed: {}", error); } + if tx.send(res).is_err() { + // This will happen any time the follower times out and restarts, but + // that's ok - the follower won't have marked this digest as processed so it + // will be retried. + debug!( + ?digests, + ?peer, + "could not send process_digest response to caller", + ); + } }); } }) @@ -396,15 +408,20 @@ where A: AuthorityAPI + Send + Sync + 'static + Clone, { async fn handle_digest(&self, follower: &Follower, digests: ExecutionDigests) -> SuiResult { + let (tx, rx) = oneshot::channel(); self.sender .send(DigestsMessage { digests, peer: follower.peer_name, + tx, }) .await .map_err(|e| SuiError::GenericAuthorityError { error: e.to_string(), - }) + })?; + rx.await.map_err(|e| SuiError::GenericAuthorityError { + error: e.to_string(), + })? } }