Skip to content

Commit

Permalink
Turn on checkpoint (MystenLabs#3207)
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind authored Jul 14, 2022
1 parent 8b1425a commit 9a05397
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 125 deletions.
16 changes: 7 additions & 9 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use sui_storage::{follower_store::FollowerStore, node_sync_store::NodeSyncStore}
use sui_types::{base_types::AuthorityName, error::SuiResult};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::error;

use crate::{
authority::AuthorityState, authority_aggregator::AuthorityAggregator,
Expand Down Expand Up @@ -199,7 +198,10 @@ impl<A> ActiveAuthority<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
pub async fn spawn_checkpoint_process(self: Arc<Self>, metrics: CheckpointMetrics) {
pub async fn spawn_checkpoint_process(
self: Arc<Self>,
metrics: CheckpointMetrics,
) -> JoinHandle<()> {
self.spawn_checkpoint_process_with_config(CheckpointProcessControl::default(), metrics)
.await
}
Expand All @@ -209,15 +211,11 @@ where
self: Arc<Self>,
checkpoint_process_control: CheckpointProcessControl,
metrics: CheckpointMetrics,
) {
) -> JoinHandle<()> {
// Spawn task to take care of checkpointing
let _checkpoint_join = tokio::task::spawn(async move {
tokio::task::spawn(async move {
checkpoint_process(&self, &checkpoint_process_control, metrics).await;
});

if let Err(err) = _checkpoint_join.await {
error!("Join checkpoint task end error: {:?}", err);
}
})
}

/// Spawn gossip process
Expand Down
22 changes: 10 additions & 12 deletions crates/sui-core/src/authority_active/checkpoint_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,16 @@ async fn checkpoint_active_flow_happy_path() {
// Start active part of authority.
for inner_state in authorities.clone() {
let inner_agg = aggregator.clone();
let _active_handle = tokio::task::spawn(async move {
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
inner_agg,
)
.unwrap(),
);
active_state
.spawn_checkpoint_process(CheckpointMetrics::new_for_tests())
.await
});
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
inner_agg,
)
.unwrap(),
);
let _active_handle = active_state
.spawn_checkpoint_process(CheckpointMetrics::new_for_tests())
.await;
}

let sender_aggregator = aggregator.clone();
Expand Down
22 changes: 10 additions & 12 deletions crates/sui-core/src/authority_active/execution_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,16 @@ async fn pending_exec_storage_notify() {
// Start active part of authority.
for inner_state in authorities.clone() {
let inner_agg = aggregator.clone();
let _active_handle = tokio::task::spawn(async move {
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
inner_agg,
)
.unwrap(),
);
active_state
.spawn_checkpoint_process(CheckpointMetrics::new_for_tests())
.await
});
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
inner_agg,
)
.unwrap(),
);
let _active_handle = active_state
.spawn_checkpoint_process(CheckpointMetrics::new_for_tests())
.await;
}

let sender_aggregator = aggregator.clone();
Expand Down
138 changes: 80 additions & 58 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration};
use tracing::info;

use sui_config::NodeConfig;
use sui_core::authority_active::checkpoint_driver::CheckpointMetrics;
use sui_core::authority_aggregator::{AuthAggMetrics, AuthorityAggregator};
use sui_core::authority_server::ValidatorService;
use sui_core::{
Expand Down Expand Up @@ -41,11 +42,13 @@ pub mod metrics;

pub struct SuiNode {
grpc_server: tokio::task::JoinHandle<Result<()>>,
_json_rpc_service: Option<jsonrpsee::http_server::HttpServerHandle>,
_ws_subscription_service: Option<jsonrpsee::ws_server::WsServerHandle>,
_json_rpc_service: Option<HttpServerHandle>,
_ws_subscription_service: Option<WsServerHandle>,
_batch_subsystem_handle: tokio::task::JoinHandle<Result<()>>,
_post_processing_subsystem_handle: Option<tokio::task::JoinHandle<Result<()>>>,
_gossip_handle: Option<tokio::task::JoinHandle<()>>,
_execute_driver_handle: Option<tokio::task::JoinHandle<()>>,
_checkpoint_process_handle: Option<tokio::task::JoinHandle<()>>,
state: Arc<AuthorityState>,
}

Expand Down Expand Up @@ -122,65 +125,82 @@ impl SuiNode {

let should_start_follower = is_node || config.enable_gossip;

let gossip_handle = if should_start_follower {
let mut net_config = mysten_network::config::Config::new();
net_config.connect_timeout = Some(Duration::from_secs(5));
net_config.request_timeout = Some(Duration::from_secs(5));
net_config.http2_keepalive_interval = Some(Duration::from_secs(5));

let mut authority_clients = BTreeMap::new();

let sui_system_state = state.get_sui_system_state_object().await?;

if config.enable_reconfig && sui_system_state.epoch > 0 {
// Create NetworkAuthorityClient with this epoch's network information
let epoch_validators = &sui_system_state.validators.active_validators;

for validator in epoch_validators {
let net_addr: &[u8] = &validator.metadata.net_address.clone();
let str_addr = std::str::from_utf8(net_addr)?;
let address: Multiaddr = str_addr.parse()?;
//let address = Multiaddr::try_from(net_addr)?;
let channel = net_config.connect_lazy(&address)?;
let client = NetworkAuthorityClient::new(channel);
let name: &[u8] = &validator.metadata.name;
let public_key_bytes = PublicKeyBytes::try_from(name)?;
authority_clients.insert(public_key_bytes, client);
let (gossip_handle, execute_driver_handle, checkpoint_process_handle) =
if should_start_follower {
let mut net_config = mysten_network::config::Config::new();
net_config.connect_timeout = Some(Duration::from_secs(5));
net_config.request_timeout = Some(Duration::from_secs(5));
net_config.http2_keepalive_interval = Some(Duration::from_secs(5));

let mut authority_clients = BTreeMap::new();

let sui_system_state = state.get_sui_system_state_object().await?;

if config.enable_reconfig && sui_system_state.epoch > 0 {
// Create NetworkAuthorityClient with this epoch's network information
let epoch_validators = &sui_system_state.validators.active_validators;

for validator in epoch_validators {
let net_addr: &[u8] = &validator.metadata.net_address.clone();
let str_addr = std::str::from_utf8(net_addr)?;
let address: Multiaddr = str_addr.parse()?;
//let address = Multiaddr::try_from(net_addr)?;
let channel = net_config.connect_lazy(&address)?;
let client = NetworkAuthorityClient::new(channel);
let name: &[u8] = &validator.metadata.name;
let public_key_bytes = PublicKeyBytes::try_from(name)?;
authority_clients.insert(public_key_bytes, client);
}
} else {
// Create NetworkAuthorityClient with the genesis set
for validator in genesis.validator_set() {
let channel = net_config
.connect_lazy(validator.network_address())
.unwrap();
let client = NetworkAuthorityClient::new(channel);
authority_clients.insert(validator.public_key(), client);
}
}
} else {
// Create NetworkAuthorityClient with the genesis set
for validator in genesis.validator_set() {
let channel = net_config
.connect_lazy(validator.network_address())
.unwrap();
let client = NetworkAuthorityClient::new(channel);
authority_clients.insert(validator.public_key(), client);
let net = AuthorityAggregator::new(
state.clone_committee(),
authority_clients,
AuthAggMetrics::new(&prometheus_registry),
);

let active_authority =
Arc::new(ActiveAuthority::new(state.clone(), follower_store, net)?);

if is_validator {
// TODO: get degree from config file.
let degree = 4;
(
Some(active_authority.clone().spawn_gossip_process(degree).await),
Some(active_authority.clone().spawn_execute_process().await),
Some(
active_authority
.spawn_checkpoint_process(CheckpointMetrics::new(
&prometheus_registry,
))
.await,
),
)
} else {
let pending_store =
Arc::new(NodeSyncStore::open(config.db_path().join("node_sync_db"))?);

(
Some(
active_authority
.spawn_node_sync_process(pending_store)
.await,
),
None,
None,
)
}
}
let net = AuthorityAggregator::new(
state.clone_committee(),
authority_clients,
AuthAggMetrics::new(&prometheus_registry),
);

let active_authority =
Arc::new(ActiveAuthority::new(state.clone(), follower_store, net)?);

Some(if is_validator {
// TODO: get degree from config file.
let degree = 4;
active_authority.spawn_gossip_process(degree).await
} else {
let pending_store =
Arc::new(NodeSyncStore::open(config.db_path().join("node_sync_db"))?);

active_authority
.spawn_node_sync_process(pending_store)
.await
})
} else {
None
};
(None, None, None)
};

let batch_subsystem_handle = {
// Start batch system so that this node can be followed
Expand Down Expand Up @@ -234,6 +254,8 @@ impl SuiNode {
_json_rpc_service: json_rpc_service,
_ws_subscription_service: ws_subscription_service,
_gossip_handle: gossip_handle,
_execute_driver_handle: execute_driver_handle,
_checkpoint_process_handle: checkpoint_process_handle,
_batch_subsystem_handle: batch_subsystem_handle,
_post_processing_subsystem_handle: post_processing_subsystem_handle,
state,
Expand Down
62 changes: 28 additions & 34 deletions crates/sui/tests/checkpoints_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,21 +141,18 @@ async fn end_to_end() {
for authority in &handles {
let state = authority.state().clone();
let inner_agg = aggregator.clone();
let _active_authority_handle = tokio::spawn(async move {
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(state, inner_agg).unwrap(),
);
let checkpoint_process_control = CheckpointProcessControl {
long_pause_between_checkpoints: Duration::from_millis(10),
..CheckpointProcessControl::default()
};
active_state
.spawn_checkpoint_process_with_config(
checkpoint_process_control,
CheckpointMetrics::new_for_tests(),
)
.await
});
let active_state =
Arc::new(ActiveAuthority::new_with_ephemeral_follower_store(state, inner_agg).unwrap());
let checkpoint_process_control = CheckpointProcessControl {
long_pause_between_checkpoints: Duration::from_millis(10),
..CheckpointProcessControl::default()
};
let _active_authority_handle = active_state
.spawn_checkpoint_process_with_config(
checkpoint_process_control,
CheckpointMetrics::new_for_tests(),
)
.await;
}

// Send the transactions for execution.
Expand Down Expand Up @@ -231,25 +228,22 @@ async fn checkpoint_with_shared_objects() {
for authority in &handles {
let state = authority.state().clone();
let inner_agg = aggregator.clone();
let _active_authority_handle = tokio::spawn(async move {
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(state, inner_agg).unwrap(),
);
let checkpoint_process_control = CheckpointProcessControl {
long_pause_between_checkpoints: Duration::from_millis(10),
..CheckpointProcessControl::default()
};

println!("Start active execution process.");
active_state.clone().spawn_execute_process().await;

active_state
.spawn_checkpoint_process_with_config(
checkpoint_process_control,
CheckpointMetrics::new_for_tests(),
)
.await
});
let active_state =
Arc::new(ActiveAuthority::new_with_ephemeral_follower_store(state, inner_agg).unwrap());
let checkpoint_process_control = CheckpointProcessControl {
long_pause_between_checkpoints: Duration::from_millis(10),
..CheckpointProcessControl::default()
};

println!("Start active execution process.");
active_state.clone().spawn_execute_process().await;

let _active_authority_handle = active_state
.spawn_checkpoint_process_with_config(
checkpoint_process_control,
CheckpointMetrics::new_for_tests(),
)
.await;
}

// Publish the move package to all authorities and get the new package ref.
Expand Down

0 comments on commit 9a05397

Please sign in to comment.