Skip to content

Commit

Permalink
spawn active processes individually
Browse files Browse the repository at this point in the history
  • Loading branch information
lanvidr committed Jun 10, 2022
1 parent 8f9d95a commit e6b9ca7
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 29 deletions.
30 changes: 16 additions & 14 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use std::{
use sui_storage::follower_store::FollowerStore;
use sui_types::{base_types::AuthorityName, error::SuiResult};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::error;

use crate::{
Expand Down Expand Up @@ -203,25 +204,17 @@ impl<A> ActiveAuthority<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
pub async fn spawn_all_active_processes(self) {
self.spawn_active_processes(true, Some(CheckpointProcessControl::default()))
pub async fn spawn_checkpoint_process(self) {
self._spawn_checkpoint_process(Some(CheckpointProcessControl::default()))
.await
}

/// Spawn all active tasks.
pub async fn spawn_active_processes(
pub async fn _spawn_checkpoint_process(
self,
gossip: bool,
checkpoint_process_control: Option<CheckpointProcessControl>,
) {
let active = Arc::new(self);
// Spawn a task to take care of gossip
let gossip_locals = active.clone();
let _gossip_join = tokio::task::spawn(async move {
if gossip {
gossip_process(&gossip_locals, 4).await;
}
});

// Spawn task to take care of checkpointing
let checkpoint_locals = active; // .clone();
Expand All @@ -231,11 +224,20 @@ where
}
});

if let Err(err) = _gossip_join.await {
error!("Join gossip task end error: {:?}", err);
}
if let Err(err) = _checkpoint_join.await {
error!("Join checkpoint task end error: {:?}", err);
}
}

/// Spawn gossip process
pub async fn spawn_gossip_process(self, degree: usize) -> JoinHandle<()> {
let active = Arc::new(self);

let gossip_locals = active;
let gossip_join = tokio::task::spawn(async move {
gossip_process(&gossip_locals, degree).await;
});

gossip_join
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn checkpoint_active_flow_happy_path() {
clients,
)
.unwrap();
active_state.spawn_all_active_processes().await
active_state.spawn_checkpoint_process().await
});
}

Expand Down Expand Up @@ -110,7 +110,7 @@ async fn checkpoint_active_flow_crash_client_with_gossip() {
.unwrap();
// Spin the gossip service.
active_state
.spawn_active_processes(true, Some(CheckpointProcessControl::default()))
._spawn_checkpoint_process(Some(CheckpointProcessControl::default()))
.await;
});
}
Expand Down Expand Up @@ -198,7 +198,7 @@ async fn checkpoint_active_flow_crash_client_no_gossip() {
.unwrap();
// Spin the gossip service.
active_state
.spawn_active_processes(false, Some(CheckpointProcessControl::default()))
._spawn_checkpoint_process(Some(CheckpointProcessControl::default()))
.await;
});
}
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-core/src/authority_active/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub async fn test_gossip_plain() {
let active_state =
ActiveAuthority::new_with_ephemeral_follower_store(inner_state, inner_clients)
.unwrap();
active_state.spawn_all_active_processes().await;
active_state.spawn_gossip_process(3).await;
});

active_authorities.push(handle);
Expand Down Expand Up @@ -70,7 +70,7 @@ pub async fn test_gossip_error() {
let active_state =
ActiveAuthority::new_with_ephemeral_follower_store(inner_state, inner_clients)
.unwrap();
active_state.spawn_all_active_processes().await;
active_state.spawn_gossip_process(3).await;
});
active_authorities.push(handle);
}
Expand Down
13 changes: 5 additions & 8 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use futures::TryFutureExt;
use parking_lot::Mutex;
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use sui_config::NodeConfig;
use sui_core::authority_active::gossip::gossip_process;
use sui_core::authority_server::ValidatorService;
use sui_core::{
authority::{AuthorityState, AuthorityStore},
Expand Down Expand Up @@ -96,17 +95,15 @@ impl SuiNode {
let active_authority =
ActiveAuthority::new(state.clone(), follower_store, authority_clients)?;

let degree = active_authority.state.committee.load().voting_rights.len();
// Start following validators
Some(tokio::task::spawn(async move {
gossip_process(
&active_authority,
let handle = active_authority
.spawn_gossip_process(
// listen to all authorities (note that gossip_process caps this to total minus 1.)
active_authority.state.committee.load().voting_rights.len(),
// start receiving the earliest TXes the validator has.
//Some(0),
degree,
)
.await;
}))
Some(handle)
};

let batch_subsystem_handle = {
Expand Down
4 changes: 2 additions & 2 deletions crates/sui/tests/checkpoints_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ async fn end_to_end() {
..CheckpointProcessControl::default()
};
active_state
.spawn_active_processes(true, Some(checkpoint_process_control))
._spawn_checkpoint_process(Some(checkpoint_process_control))
.await
});
}
Expand Down Expand Up @@ -230,7 +230,7 @@ async fn checkpoint_with_shared_objects() {
..CheckpointProcessControl::default()
};
active_state
.spawn_active_processes(true, Some(checkpoint_process_control))
._spawn_checkpoint_process(Some(checkpoint_process_control))
.await
});
}
Expand Down

0 comments on commit e6b9ca7

Please sign in to comment.