Skip to content

Commit

Permalink
Onboard quorum driver to fullnode 1/n (MystenLabs#3778)
Browse files Browse the repository at this point in the history
* commit

* only enable quorum drive on fullnode
  • Loading branch information
longbowlu authored Aug 5, 2022
1 parent 6c009b9 commit bfb41f2
Show file tree
Hide file tree
Showing 15 changed files with 422 additions and 111 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-benchmark/src/bin/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use sui_core::authority_aggregator::AuthorityAggregator;
use sui_core::gateway_state::GatewayState;
use sui_node::metrics;
use sui_node::SuiNode;
use sui_sdk::crypto::Keystore;
use sui_types::base_types::ObjectID;
use sui_types::base_types::SuiAddress;
use tokio::sync::OnceCell;
Expand Down
46 changes: 37 additions & 9 deletions crates/sui-core/src/authority_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,23 @@ use crate::authority::AuthorityState;
use async_trait::async_trait;
use futures::{stream::BoxStream, TryStreamExt};
use multiaddr::Multiaddr;
use mysten_network::config::Config;
use narwhal_crypto::traits::ToFromBytes;
use std::collections::BTreeMap;
use std::sync::Arc;

use sui_config::genesis::Genesis;
use sui_network::{api::ValidatorClient, tonic};
use sui_types::{error::SuiError, messages::*};

use sui_types::crypto::AuthorityPublicKeyBytes;
use sui_types::messages_checkpoint::{CheckpointRequest, CheckpointResponse};
use sui_types::sui_system_state::SuiSystemState;
use sui_types::{error::SuiError, messages::*};

#[cfg(test)]
use sui_types::{
base_types::ObjectID,
committee::Committee,
crypto::{AuthorityKeyPair, AuthorityPublicKeyBytes},
object::Object,
base_types::ObjectID, committee::Committee, crypto::AuthorityKeyPair, object::Object,
};

use crate::epoch::reconfiguration::Reconfigurable;
#[cfg(test)]
use sui_config::genesis::Genesis;
use sui_network::tonic::transport::Channel;

#[async_trait]
Expand Down Expand Up @@ -207,6 +206,35 @@ impl AuthorityAPI for NetworkAuthorityClient {
}
}

pub fn make_network_authority_client_sets_from_system_state(
sui_system_state: &SuiSystemState,
network_config: &Config,
) -> anyhow::Result<BTreeMap<AuthorityPublicKeyBytes, NetworkAuthorityClient>> {
let mut authority_clients = BTreeMap::new();
for validator in &sui_system_state.validators.active_validators {
let address = Multiaddr::try_from(validator.metadata.net_address.clone())?;
let channel = network_config.connect_lazy(&address)?;
let client = NetworkAuthorityClient::new(channel);
let name: &[u8] = &validator.metadata.name;
let public_key_bytes = AuthorityPublicKeyBytes::from_bytes(name)?;
authority_clients.insert(public_key_bytes, client);
}
Ok(authority_clients)
}

pub fn make_network_authority_client_sets_from_genesis(
genesis: &Genesis,
network_config: &Config,
) -> anyhow::Result<BTreeMap<AuthorityPublicKeyBytes, NetworkAuthorityClient>> {
let mut authority_clients = BTreeMap::new();
for validator in genesis.validator_set() {
let channel = network_config.connect_lazy(validator.network_address())?;
let client = NetworkAuthorityClient::new(channel);
authority_clients.insert(validator.public_key(), client);
}
Ok(authority_clients)
}

#[derive(Clone, Copy, Default)]
pub struct LocalAuthorityClientFaultConfig {
pub fail_before_handle_transaction: bool,
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-json-rpc-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1444,6 +1444,12 @@ pub struct SuiObjectInfo {
pub previous_transaction: TransactionDigest,
}

impl SuiObjectInfo {
pub fn to_object_ref(&self) -> ObjectRef {
(self.object_id, self.version, self.digest)
}
}

impl From<ObjectInfo> for SuiObjectInfo {
fn from(info: ObjectInfo) -> Self {
Self {
Expand Down
1 change: 1 addition & 0 deletions crates/sui-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ sui-network = { path = "../sui-network" }
sui-json-rpc = { path = "../sui-json-rpc" }
sui-telemetry = { path = "../sui-telemetry" }
sui-types = { path = "../sui-types" }
sui-quorum-driver = { path = "../sui-quorum-driver" }

telemetry-subscribers = { git = "https://github.com/MystenLabs/mysten-infra", rev = "d965a5a795dcdb4d1c7964acf556bc249fdc58aa" }
mysten-network = { git = "https://github.com/MystenLabs/mysten-infra", rev = "d965a5a795dcdb4d1c7964acf556bc249fdc58aa" }
Expand Down
98 changes: 52 additions & 46 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,34 @@

use anyhow::Result;
use futures::TryFutureExt;
use multiaddr::Multiaddr;
use parking_lot::Mutex;
use prometheus::Registry;
use std::option::Option::None;
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use tracing::info;

use std::{sync::Arc, time::Duration};
use sui_config::NodeConfig;
use sui_core::authority_active::checkpoint_driver::CheckpointMetrics;
use sui_core::authority_aggregator::{AuthAggMetrics, AuthorityAggregator};
use sui_core::authority_server::ValidatorService;
use sui_core::{
authority::{AuthorityState, AuthorityStore},
authority_active::ActiveAuthority,
authority_client::NetworkAuthorityClient,
authority_client::{
make_network_authority_client_sets_from_genesis,
make_network_authority_client_sets_from_system_state, NetworkAuthorityClient,
},
checkpoints::CheckpointStore,
};
use sui_json_rpc::bcs_api::BcsApiImpl;
use sui_network::api::ValidatorServer;
use sui_quorum_driver::{QuorumDriver, QuorumDriverHandler};
use sui_storage::{
event_store::{EventStoreType, SqlEventStore},
follower_store::FollowerStore,
node_sync_store::NodeSyncStore,
IndexStore,
};
use sui_types::crypto::ToFromBytes;
use sui_types::messages::{CertifiedTransaction, CertifiedTransactionEffects};
use tracing::info;

use sui_json_rpc::event_api::EventReadApiImpl;
use sui_json_rpc::event_api::EventStreamingApiImpl;
Expand All @@ -37,7 +39,7 @@ use sui_json_rpc::read_api::FullNodeApi;
use sui_json_rpc::read_api::ReadApi;
use sui_json_rpc::ws_server::WsServerHandle;
use sui_json_rpc::JsonRpcServerBuilder;
use sui_types::crypto::{AuthorityPublicKeyBytes, KeypairTraits};
use sui_types::crypto::KeypairTraits;
use typed_store::traits::DBMapTableUtil;

pub mod admin;
Expand All @@ -54,6 +56,7 @@ pub struct SuiNode {
_checkpoint_process_handle: Option<tokio::task::JoinHandle<()>>,
state: Arc<AuthorityState>,
active: Option<Arc<ActiveAuthority<NetworkAuthorityClient>>>,
quorum_driver_handler: Option<QuorumDriverHandler<NetworkAuthorityClient>>,
}

impl SuiNode {
Expand Down Expand Up @@ -123,53 +126,39 @@ impl SuiNode {
.await,
);

let mut net_config = mysten_network::config::Config::new();
net_config.connect_timeout = Some(Duration::from_secs(5));
net_config.request_timeout = Some(Duration::from_secs(5));
net_config.http2_keepalive_interval = Some(Duration::from_secs(5));

let sui_system_state = state.get_sui_system_state_object().await?;

let authority_clients = if config.enable_reconfig && sui_system_state.epoch > 0 {
make_network_authority_client_sets_from_system_state(&sui_system_state, &net_config)
} else {
make_network_authority_client_sets_from_genesis(genesis, &net_config)
}?;
let net = AuthorityAggregator::new(
state.clone_committee(),
authority_clients,
AuthAggMetrics::new(&prometheus_registry),
);

// TODO: maybe have a config enum that takes care of this for us.
let is_validator = config.consensus_config().is_some();
let is_node = !is_validator;
let is_full_node = !is_validator;

let should_start_follower = is_node || config.enable_gossip;
let quorum_driver_handler = if is_full_node {
Some(QuorumDriverHandler::new(net.clone()))
} else {
None
};
let should_start_follower = is_full_node || config.enable_gossip;

let mut active = None;

let (gossip_handle, execute_driver_handle, checkpoint_process_handle) =
if should_start_follower {
let mut net_config = mysten_network::config::Config::new();
net_config.connect_timeout = Some(Duration::from_secs(5));
net_config.request_timeout = Some(Duration::from_secs(5));
net_config.http2_keepalive_interval = Some(Duration::from_secs(5));

let mut authority_clients = BTreeMap::new();

let sui_system_state = state.get_sui_system_state_object().await?;

if config.enable_reconfig && sui_system_state.epoch > 0 {
// Create NetworkAuthorityClient with this epoch's network information
let epoch_validators = &sui_system_state.validators.active_validators;

for validator in epoch_validators {
let address = Multiaddr::try_from(validator.metadata.net_address.clone())?;
let channel = net_config.connect_lazy(&address)?;
let client = NetworkAuthorityClient::new(channel);
let name: &[u8] = &validator.metadata.name;
let public_key_bytes = AuthorityPublicKeyBytes::from_bytes(name)?;
authority_clients.insert(public_key_bytes, client);
}
} else {
// Create NetworkAuthorityClient with the genesis set
for validator in genesis.validator_set() {
let channel = net_config
.connect_lazy(validator.network_address())
.unwrap();
let client = NetworkAuthorityClient::new(channel);
authority_clients.insert(validator.public_key(), client);
}
}
let net = AuthorityAggregator::new(
state.clone_committee(),
authority_clients,
AuthAggMetrics::new(&prometheus_registry),
);

let pending_store = Arc::new(NodeSyncStore::open_tables_read_write(
config.db_path().join("node_sync_db"),
None,
Expand Down Expand Up @@ -270,6 +259,7 @@ impl SuiNode {
_post_processing_subsystem_handle: post_processing_subsystem_handle,
state,
active,
quorum_driver_handler,
};

info!("SuiNode started!");
Expand All @@ -285,6 +275,22 @@ impl SuiNode {
self.active.clone()
}

pub fn quorum_driver(&self) -> Option<Arc<QuorumDriver<NetworkAuthorityClient>>> {
self.quorum_driver_handler
.as_ref()
.map(|qdh| qdh.clone_quorum_driver())
}

pub fn subscribe_to_quorum_driver_effects(
&self,
) -> Result<tokio::sync::broadcast::Receiver<(CertifiedTransaction, CertifiedTransactionEffects)>>
{
self.quorum_driver_handler
.as_ref()
.map(|qdh| qdh.subscribe())
.ok_or_else(|| anyhow::anyhow!("Quorum Driver is not enabled in this node."))
}

//TODO watch/wait on all the components
pub async fn wait(self) -> Result<()> {
self.grpc_server.await??;
Expand Down
21 changes: 15 additions & 6 deletions crates/sui-quorum-driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::sync::Arc;

use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinHandle;
use tracing::log::{error, warn};
use tracing::Instrument;
use tracing::{debug, error, warn};

use sui_core::authority_aggregator::AuthorityAggregator;
use sui_core::authority_client::AuthorityAPI;
Expand Down Expand Up @@ -191,27 +191,36 @@ where
if let Some(task) = task_receiver.recv().await {
match task {
QuorumTask::ProcessTransaction(transaction) => {
let tx_digest = *transaction.digest();
// TODO: We entered here because callers do not want to wait for a
// transaction to finish execution. When this failed, we do not have a
// way to notify the caller. In the future, we may want to maintain
// some data structure for callers to come back and query the status
// of a transaction latter.
// of a transaction later.
match quorum_driver.process_transaction(transaction).await {
Ok(cert) => {
debug!(?tx_digest, "Transaction processing succeeded");
if let Err(err) = quorum_driver.process_certificate(cert).await {
warn!("Certificate processing failed: {:?}", err);
warn!(?tx_digest, "Certificate processing failed: {:?}", err);
}
debug!(?tx_digest, "Certificate processing succeeded");
}
Err(err) => {
warn!("Transaction processing failed: {:?}", err);
warn!(?tx_digest, "Transaction processing failed: {:?}", err);
}
}
}
QuorumTask::ProcessCertificate(certificate) => {
let tx_digest = *certificate.digest();
// TODO: Similar to ProcessTransaction, we may want to allow callers to
// query the status.
if let Err(err) = quorum_driver.process_certificate(certificate).await {
warn!("Certificate processing failed: {:?}", err);
match quorum_driver.process_certificate(certificate).await {
Err(err) => {
warn!("Certificate processing failed: {:?}", err);
}
Ok(_) => {
debug!(?tx_digest, "Certificate processing succeeded");
}
}
}
QuorumTask::UpdateCommittee(new_validators) => {
Expand Down
1 change: 1 addition & 0 deletions crates/sui-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ serde_json = "1.0.80"
futures-core = "0.3.21"
futures = "0.3.21"
signature = "1.5.0"
rand = "0.7.3"

sui-json-rpc = { path = "../sui-json-rpc" }
sui-json-rpc-types= { path = "../sui-json-rpc-types" }
Expand Down
Loading

0 comments on commit bfb41f2

Please sign in to comment.