diff --git a/crates/sui-core/src/authority_active.rs b/crates/sui-core/src/authority_active.rs
index 6cca383b370ca..24c43599e0e1d 100644
--- a/crates/sui-core/src/authority_active.rs
+++ b/crates/sui-core/src/authority_active.rs
@@ -32,6 +32,7 @@
use arc_swap::ArcSwap;
use std::{
collections::{BTreeMap, HashMap},
+ ops::Deref,
sync::Arc,
time::Duration,
};
@@ -48,7 +49,7 @@ use crate::{
use tokio::time::Instant;
pub mod gossip;
-use gossip::gossip_process;
+use gossip::{gossip_process, node_sync_process};
pub mod checkpoint_driver;
use checkpoint_driver::checkpoint_process;
@@ -233,9 +234,26 @@ where
pub async fn spawn_gossip_process(self, degree: usize) -> JoinHandle<()> {
let active = Arc::new(self);
- let gossip_locals = active;
+ // Number of tasks at most "degree" and no more than committee - 1
+ // (validators do not follow themselves for gossip)
+ let committee = active.state.committee.load().deref().clone();
+ let target_num_tasks = usize::min(committee.voting_rights.len() - 1, degree);
+
+ tokio::task::spawn(async move {
+ gossip_process(&active, target_num_tasks).await;
+ })
+ }
+
+ pub async fn spawn_node_sync_process(self) -> JoinHandle<()> {
+ let active = Arc::new(self);
+ let committee = active.state.committee.load().deref().clone();
+ // nodes follow all validators to ensure they can eventually determine
+ // finality of certs. We need to follow 2f+1 _honest_ validators to
+ // eventually find finality, therefore we must follow all validators.
+ let target_num_tasks = committee.voting_rights.len();
+
tokio::task::spawn(async move {
- gossip_process(&gossip_locals, degree).await;
+ node_sync_process(&active, target_num_tasks).await;
})
}
}
diff --git a/crates/sui-core/src/authority_active/gossip/mod.rs b/crates/sui-core/src/authority_active/gossip/mod.rs
index 38bc7230dc593..efe589c6dc0be 100644
--- a/crates/sui-core/src/authority_active/gossip/mod.rs
+++ b/crates/sui-core/src/authority_active/gossip/mod.rs
@@ -8,15 +8,17 @@ use crate::{
safe_client::SafeClient,
};
use async_trait::async_trait;
-use futures::stream::FuturesOrdered;
-use futures::{stream::FuturesUnordered, StreamExt};
+use futures::{
+ stream::{FuturesOrdered, FuturesUnordered},
+ StreamExt,
+};
use std::future::Future;
use std::ops::Deref;
use std::{collections::HashSet, sync::Arc, time::Duration};
use sui_storage::follower_store::FollowerStore;
use sui_types::committee::StakeUnit;
use sui_types::{
- base_types::AuthorityName,
+ base_types::{AuthorityName, ExecutionDigests},
batch::{TxSequenceNumber, UpdateItem},
error::{SuiError, SuiResult},
messages::{
@@ -32,7 +34,7 @@ mod configurable_batch_action_client;
#[cfg(test)]
pub(crate) mod tests;
-struct PeerGossip {
+struct Follower {
peer_name: AuthorityName,
client: SafeClient,
state: Arc,
@@ -50,6 +52,24 @@ use super::ActiveAuthority;
pub async fn gossip_process(active_authority: &ActiveAuthority, degree: usize)
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
+{
+ follower_process(active_authority, degree, GossipDigestHandler::new()).await;
+}
+
+pub async fn node_sync_process(active_authority: &ActiveAuthority, degree: usize)
+where
+ A: AuthorityAPI + Send + Sync + 'static + Clone,
+{
+ // TODO: special case follower for node sync.
+ follower_process(active_authority, degree, GossipDigestHandler::new()).await;
+}
+
+async fn follower_process + Copy>(
+ active_authority: &ActiveAuthority,
+ degree: usize,
+ handler: Handler,
+) where
+ A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// Make a clone of the active authority and committee, and keep using it until epoch changes.
let mut local_active = Arc::new(active_authority.clone());
@@ -109,11 +129,14 @@ where
peer_names.insert(name);
let local_active_ref_copy = local_active.clone();
gossip_tasks.push(async move {
- let peer_gossip = PeerGossip::new(name, &local_active_ref_copy);
+ let follower = Follower::new(name, &local_active_ref_copy);
// Add more duration if we make more than 1 to ensure overlap
debug!(peer = ?name, "Starting gossip from peer");
- peer_gossip
- .start(Duration::from_secs(REFRESH_FOLLOWER_PERIOD_SECS + k * 15))
+ follower
+ .start(
+ Duration::from_secs(REFRESH_FOLLOWER_PERIOD_SECS + k * 15),
+ handler,
+ )
.await
});
k += 1;
@@ -208,11 +231,77 @@ where
})
}
-impl PeerGossip
+#[async_trait]
+trait DigestHandler {
+ async fn handle_digest(&self, follower: &Follower, digest: ExecutionDigests) -> SuiResult;
+}
+
+#[derive(Clone, Copy)]
+struct GossipDigestHandler {}
+
+impl GossipDigestHandler {
+ fn new() -> Self {
+ Self {}
+ }
+
+ async fn process_response(
+ follower: &Follower,
+ response: TransactionInfoResponse,
+ ) -> Result<(), SuiError>
+ where
+ A: AuthorityAPI + Send + Sync + 'static + Clone,
+ {
+ if let Some(certificate) = response.certified_transaction {
+ // Process the certificate from one authority to ourselves
+ follower
+ .aggregator
+ .sync_authority_source_to_destination(
+ ConfirmationTransaction { certificate },
+ follower.peer_name,
+ LocalConfirmationTransactionHandler {
+ state: follower.state.clone(),
+ },
+ )
+ .await?;
+ follower.state.metrics.gossip_sync_count.inc();
+ Ok(())
+ } else {
+ // The authority did not return the certificate, despite returning info
+ // But it should know the certificate!
+ Err(SuiError::ByzantineAuthoritySuspicion {
+ authority: follower.peer_name,
+ })
+ }
+ }
+}
+
+#[async_trait]
+impl DigestHandler for GossipDigestHandler
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
- pub fn new(peer_name: AuthorityName, active_authority: &ActiveAuthority) -> PeerGossip {
+ async fn handle_digest(&self, follower: &Follower, digest: ExecutionDigests) -> SuiResult {
+ if !follower
+ .state
+ .database
+ .effects_exists(&digest.transaction)?
+ {
+ // Download the certificate
+ let response = follower
+ .client
+ .handle_transaction_info_request(TransactionInfoRequest::from(digest.transaction))
+ .await?;
+ Self::process_response(follower, response).await?;
+ }
+ Ok(())
+ }
+}
+
+impl Follower
+where
+ A: AuthorityAPI + Send + Sync + 'static + Clone,
+{
+ pub fn new(peer_name: AuthorityName, active_authority: &ActiveAuthority) -> Self {
// TODO: for validator gossip, we should always use None as the start_seq, but we should
// consult the start_seq we retrieved from the db to make sure that the peer is giving
// us new txes.
@@ -239,13 +328,21 @@ where
}
}
- pub async fn start(mut self, duration: Duration) -> (AuthorityName, Result<(), SuiError>) {
+ pub async fn start>(
+ self,
+ duration: Duration,
+ handler: Handler,
+ ) -> (AuthorityName, Result<(), SuiError>) {
let peer_name = self.peer_name;
- let result = self.peer_gossip_for_duration(duration).await;
+ let result = self.follow_peer_for_duration(duration, handler).await;
(peer_name, result)
}
- async fn peer_gossip_for_duration(&mut self, duration: Duration) -> Result<(), SuiError> {
+ async fn follow_peer_for_duration<'a, Handler: DigestHandler>(
+ &self,
+ duration: Duration,
+ handler: Handler,
+ ) -> SuiResult {
// Global timeout, we do not exceed this time in this task.
let mut timeout = Box::pin(tokio::time::sleep(duration));
let mut queue = FuturesOrdered::new();
@@ -307,38 +404,10 @@ where
}
},
digest = &mut queue.next() , if !queue.is_empty() => {
- let digest = digest.unwrap();
- if !self.state.database.effects_exists(&digest.transaction)? {
- // Download the certificate
- let response = self.client.handle_transaction_info_request(TransactionInfoRequest::from(digest.transaction)).await?;
- self.process_response(response).await?;
- }
+ handler.handle_digest(self, digest.unwrap()).await?;
}
};
}
Ok(())
}
-
- async fn process_response(&self, response: TransactionInfoResponse) -> Result<(), SuiError> {
- if let Some(certificate) = response.certified_transaction {
- // Process the certificate from one authority to ourselves
- self.aggregator
- .sync_authority_source_to_destination(
- ConfirmationTransaction { certificate },
- self.peer_name,
- LocalConfirmationTransactionHandler {
- state: self.state.clone(),
- },
- )
- .await?;
- self.state.metrics.gossip_sync_count.inc();
- Ok(())
- } else {
- // The authority did not return the certificate, despite returning info
- // But it should know the certificate!
- Err(SuiError::ByzantineAuthoritySuspicion {
- authority: self.peer_name,
- })
- }
- }
}