Skip to content

Commit

Permalink
Execution driver uses NodeSyncState for certificate execution (MystenLabs#3323)
Browse files Browse the repository at this point in the history

* Use a single NodeSyncState owned by ActiveAuthority

* Refactor node_sync to support execution driver

* Execution driver uses NodeSync for cert execution

* Work around rust-lang/rust#99492

* Increase limit so tests pass
  • Loading branch information
mystenmark authored Jul 21, 2022
1 parent 5889a4a commit 6e78098
Show file tree
Hide file tree
Showing 13 changed files with 333 additions and 235 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ prometheus = "0.13.1"
arc-swap = "1.5.0"
tokio-retry = "0.3"
scopeguard = "1.1"
once_cell = "1.11.0"

sui-adapter = { path = "../sui-adapter" }
sui-framework = { path = "../sui-framework" }
Expand Down
10 changes: 10 additions & 0 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,10 @@ pub struct AuthorityState {
///
/// Repeating valid commands should produce no changes and return no error.
impl AuthorityState {
/// Reports whether this node is running as a fullnode.
///
/// A node counts as a fullnode when its own `name` is not listed as an
/// authority in the currently loaded committee.
pub fn is_fullnode(&self) -> bool {
    let current_committee = self.committee.load();
    let is_committee_member = current_committee.authority_exists(&self.name);
    !is_committee_member
}

/// Get a broadcast receiver for updates
pub fn subscribe_batch(&self) -> BroadcastReceiver {
self.batch_channels.subscribe()
Expand Down Expand Up @@ -442,6 +446,12 @@ impl AuthorityState {
&self,
certificate: CertifiedTransaction,
) -> SuiResult<TransactionInfoResponse> {
if self.is_fullnode() {
return Err(SuiError::GenericStorageError(
"cannot execute cert without effects on fullnode".into(),
));
}

let digest = certificate.digest();
debug!(?digest, "handle_confirmation_transaction");

Expand Down
4 changes: 2 additions & 2 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ impl<S: Eq + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
}

/// Get all stored certificate digests
pub fn get_pending_certificates(
pub fn get_pending_digests(
&self,
) -> SuiResult<Vec<(InternalSequenceNumber, TransactionDigest)>> {
Ok(self.pending_execution.iter().collect())
Expand All @@ -420,7 +420,7 @@ impl<S: Eq + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {

// Empty the pending_execution table, and remove the certs from the certificates table.
pub fn remove_all_pending_certificates(&self) -> SuiResult {
let all_pending_tx = self.get_pending_certificates()?;
let all_pending_tx = self.get_pending_digests()?;
let mut batch = self.pending_execution.batch();
batch = batch.delete_batch(
&self.certificates,
Expand Down
34 changes: 28 additions & 6 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ use tokio::task::JoinHandle;
use tracing::info;

use crate::{
authority::AuthorityState, authority_aggregator::AuthorityAggregator,
authority::AuthorityState,
authority_aggregator::AuthorityAggregator,
authority_client::AuthorityAPI,
node_sync::{NodeSyncHandle, NodeSyncState},
};
use once_cell::sync::OnceCell;
use tokio::time::Instant;

pub mod gossip;
Expand Down Expand Up @@ -107,7 +110,9 @@ impl AuthorityHealth {
pub struct ActiveAuthority<A> {
// The local authority state
pub state: Arc<AuthorityState>,
pub node_sync_store: Arc<NodeSyncStore>,
pub node_sync_state: Arc<NodeSyncState<A>>,
node_sync_handle: OnceCell<NodeSyncHandle>,

pub follower_store: Arc<FollowerStore>,
// The network interfaces to other authorities
pub net: ArcSwap<AuthorityAggregator<A>>,
Expand All @@ -124,6 +129,14 @@ impl<A> ActiveAuthority<A> {
) -> SuiResult<Self> {
let committee = authority.clone_committee();

let net = Arc::new(net);

let node_sync_state = Arc::new(NodeSyncState::new(
authority.clone(),
net.clone(),
node_sync_store,
));

Ok(ActiveAuthority {
health: Arc::new(Mutex::new(
committee
Expand All @@ -132,9 +145,10 @@ impl<A> ActiveAuthority<A> {
.collect(),
)),
state: authority,
node_sync_store,
node_sync_state,
node_sync_handle: OnceCell::new(),
follower_store,
net: ArcSwap::from(Arc::new(net)),
net: ArcSwap::from(net),
})
}

Expand Down Expand Up @@ -206,7 +220,8 @@ impl<A> Clone for ActiveAuthority<A> {
fn clone(&self) -> Self {
ActiveAuthority {
state: self.state.clone(),
node_sync_store: self.node_sync_store.clone(),
node_sync_state: self.node_sync_state.clone(),
node_sync_handle: self.node_sync_handle.clone(),
follower_store: self.follower_store.clone(),
net: ArcSwap::from(self.net.load().clone()),
health: self.health.clone(),
Expand All @@ -218,6 +233,13 @@ impl<A> ActiveAuthority<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
/// Returns a clone of the node-sync handle stored in `self.node_sync_handle`,
/// creating it on first use.
///
/// The handle lives in a `OnceCell`; when the cell is still empty it is filled
/// with `NodeSyncHandle::new(..)` built from this authority's `node_sync_state`.
fn node_sync_handle(&self) -> NodeSyncHandle {
// Clone the Arc up front so the initializer closure below uses this local
// value rather than reaching back into `self`.
let node_sync_state = self.node_sync_state.clone();
self.node_sync_handle
// The closure is only invoked when the cell has not been initialized yet.
.get_or_init(|| NodeSyncHandle::new(node_sync_state))
// Hand the caller its own copy of the stored handle.
.clone()
}

pub async fn sync_to_latest_checkpoint(&self, metrics: &CheckpointMetrics) -> SuiResult {
self.sync_to_latest_checkpoint_with_config(metrics, Default::default())
.await
Expand Down Expand Up @@ -277,7 +299,7 @@ where
let target_num_tasks = committee.num_members();

tokio::task::spawn(async move {
node_sync_process(&self, target_num_tasks, self.node_sync_store.clone()).await;
node_sync_process(&self, target_num_tasks).await;
})
}

Expand Down
47 changes: 22 additions & 25 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@ use sui_types::{
};
use tokio::time::Instant;

use crate::epoch::reconfiguration::Reconfigurable;
use futures::stream::StreamExt;

use crate::{
authority::AuthorityState,
authority_aggregator::{AuthorityAggregator, ReduceOutput},
authority_client::AuthorityAPI,
checkpoints::CheckpointStore,
node_sync::NodeSyncState,
epoch::reconfiguration::Reconfigurable,
};

use sui_storage::node_sync_store::NodeSyncStore;
use sui_types::committee::{Committee, StakeUnit};
use tracing::{debug, error, info, warn};

Expand Down Expand Up @@ -649,13 +648,25 @@ where
let (past, contents) =
get_one_checkpoint_with_contents(net.clone(), seq, &available_authorities).await?;

sync_checkpoint_certs(
state.clone(),
active_authority.node_sync_store.clone(),
net.clone(),
&contents,
)
.await?;
let errors = active_authority
.node_sync_handle()
.sync_checkpoint(&contents)
.zip(futures::stream::iter(contents.iter()))
.filter_map(|(r, digests)| async move {
r.map_err(|e| {
info!(?digests, "failed to execute digest from checkpoint: {}", e);
e
})
.err()
})
.collect::<Vec<SuiError>>()
.await;

if !errors.is_empty() {
let error = "Failed to sync transactions in checkpoint".to_string();
error!(?seq, "{}", error);
return Err(SuiError::CheckpointingError { error });
}

checkpoint_db.lock().process_new_checkpoint_certificate(
&past,
Expand All @@ -669,20 +680,6 @@ where
Ok(())
}

/// Fetch and execute every certificate listed in the given checkpoint contents.
///
/// Builds a fresh `NodeSyncState` from the supplied authority state, network
/// aggregator and node-sync store, awaits its `sync_checkpoint` call on
/// `contents`, and returns that result unchanged.
async fn sync_checkpoint_certs<A>(
    state: Arc<AuthorityState>,
    node_sync_store: Arc<NodeSyncStore>,
    net: Arc<AuthorityAggregator<A>>,
    contents: &CheckpointContents,
) -> SuiResult
where
    A: AuthorityAPI + Send + Sync + 'static + Clone,
{
    // Keep the sync state bound to a local so it outlives the awaited call.
    let checkpoint_syncer = NodeSyncState::new(state, net, node_sync_store);
    let outcome = checkpoint_syncer.sync_checkpoint(contents).await;
    outcome
}

pub async fn get_one_checkpoint_with_contents<A>(
net: Arc<AuthorityAggregator<A>>,
sequence_number: CheckpointSequenceNumber,
Expand Down
66 changes: 17 additions & 49 deletions crates/sui-core/src/authority_active/execution_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
use std::sync::Arc;
use sui_types::{base_types::TransactionDigest, error::SuiResult, messages::CertifiedTransaction};
use tracing::{debug, info};
use typed_store::Map;

use crate::authority::AuthorityStore;
use crate::authority_client::AuthorityAPI;

use super::{gossip::LocalCertificateHandler, ActiveAuthority};
use futures::{stream, StreamExt};

use super::ActiveAuthority;

#[cfg(test)]
pub(crate) mod tests;
Expand Down Expand Up @@ -74,54 +75,21 @@ async fn execute_pending<A>(active_authority: &ActiveAuthority<A>) -> SuiResult<
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
let _committee = active_authority.state.committee.load().clone();
let net = active_authority.net.load().clone();

// Get the pending transactions
let pending_transactions = active_authority.state.database.get_pending_certificates()?;

// Get all the actual certificates mapping to these pending transactions
let certs = active_authority
.state
.database
.certificates
.multi_get(pending_transactions.iter().map(|(_, d)| *d))?;

// Zip seq, digest with certs. Note the cert must exist in the DB
let cert_seq: Vec<_> = pending_transactions
.iter()
.zip(certs.iter())
.map(|((i, d), c)| (i, d, c.as_ref().expect("certificate must exist")))
.collect();

let local_handler = LocalCertificateHandler {
state: active_authority.state.clone(),
};

// TODO: implement properly efficient execution for the block of transactions.
let mut executed = vec![];
for (i, d, c) in cert_seq {
// Only execute if not already executed.
if active_authority.state.database.effects_exists(d)? {
executed.push(*i);
continue;
}

debug!(digest=?d, "Pending execution for certificate.");

// Sync and Execute with local authority state
net.sync_certificate_to_authority_with_timeout_inner(
c.clone(),
active_authority.state.name,
&local_handler,
tokio::time::Duration::from_secs(10),
10,
)
.await?;

// Remove from the execution list
executed.push(*i);
}
let pending_transactions = active_authority.state.database.get_pending_digests()?;

let sync_handle = active_authority.node_sync_handle();

// Send them for execution
let executed = sync_handle
// map to extract digest
.handle_execution_request(pending_transactions.iter().map(|(_, digest)| *digest))
// zip results back together with seq
.zip(stream::iter(pending_transactions.iter()))
// filter out errors
.filter_map(|(result, (seq, _))| async move { result.ok().map(|_| seq) })
.collect()
.await;

// Now update the pending store.
active_authority
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async fn pending_exec_storage_notify() {
// get back the certificates
let certs_back = authority_state
.database
.get_pending_certificates()
.get_pending_digests()
.expect("DB should be there");
assert_eq!(num_certs, certs_back.len());
}
Expand Down Expand Up @@ -166,7 +166,7 @@ async fn pending_exec_full() {
.expect("Storage is ok");
let certs_back = authority_state
.database
.get_pending_certificates()
.get_pending_digests()
.expect("DB should be there");
assert_eq!(num_certs, certs_back.len());

Expand All @@ -177,7 +177,7 @@ async fn pending_exec_full() {
// get back the certificates
let certs_back = authority_state
.database
.get_pending_certificates()
.get_pending_digests()
.expect("DB should be there");
assert_eq!(0, certs_back.len());
}
14 changes: 4 additions & 10 deletions crates/sui-core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{
authority::AuthorityState,
authority_aggregator::{AuthorityAggregator, CertificateHandler},
authority_client::AuthorityAPI,
node_sync::NodeSyncDigestHandler,
safe_client::SafeClient,
};
use async_trait::async_trait;
Expand All @@ -20,7 +19,7 @@ use std::{
sync::Arc,
time::Duration,
};
use sui_storage::{follower_store::FollowerStore, node_sync_store::NodeSyncStore};
use sui_storage::follower_store::FollowerStore;
use sui_types::committee::StakeUnit;
use sui_types::{
base_types::{AuthorityName, ExecutionDigests},
Expand Down Expand Up @@ -76,19 +75,14 @@ where
.await;
}

pub async fn node_sync_process<A>(
active_authority: &ActiveAuthority<A>,
degree: usize,
node_sync_store: Arc<NodeSyncStore>,
) where
pub async fn node_sync_process<A>(active_authority: &ActiveAuthority<A>, degree: usize)
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
let state = active_authority.state.clone();
let aggregator = active_authority.net.load().clone();
follower_process(
active_authority,
degree,
NodeSyncDigestHandler::new(state, aggregator, node_sync_store),
active_authority.node_sync_handle(),
GossipType::Full,
)
.await;
Expand Down
9 changes: 4 additions & 5 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1625,7 +1625,8 @@ where

pub async fn handle_transaction_and_effects_info_request(
&self,
digests: &ExecutionDigests,
digest: &TransactionDigest,
effects_digest: Option<&TransactionEffectsDigest>,
// authorities known to have the effects we are requesting.
authorities: Option<&BTreeSet<AuthorityName>>,
timeout_total: Option<Duration>,
Expand All @@ -1636,7 +1637,7 @@ where
|authority, client| {
Box::pin(async move {
let resp = client
.handle_transaction_and_effects_info_request(digests)
.handle_transaction_and_effects_info_request(digest, effects_digest)
.await?;

match (resp.certified_transaction, resp.signed_effects) {
Expand All @@ -1647,9 +1648,7 @@ where
// cert and effects, so if they now say they don't, they're byzantine.
Err(SuiError::ByzantineAuthoritySuspicion { authority })
} else {
Err(SuiError::TransactionNotFound {
digest: digests.transaction,
})
Err(SuiError::TransactionNotFound { digest: *digest })
}
}
}
Expand Down
Loading

0 comments on commit 6e78098

Please sign in to comment.