Skip to content

Commit

Permalink
Separate sync process and active process (MystenLabs#2748)
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind authored Jun 29, 2022
1 parent e8d7ca2 commit 71b914d
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 113 deletions.
181 changes: 116 additions & 65 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use crate::{
authority_client::AuthorityAPI,
checkpoints::{proposal::CheckpointProposal, CheckpointStore},
};
use sui_types::committee::StakeUnit;
use sui_types::committee::{Committee, StakeUnit};
use sui_types::error::SuiResult;
use tracing::{debug, info, warn};

#[cfg(test)]
Expand All @@ -40,6 +41,11 @@ pub struct CheckpointProcessControl {
/// main loop.
pub delay_on_quorum_failure: Duration,

/// The delay before we retry the process, when there is a local error
/// that prevented us from making progress, e.g. failed to create
/// a new proposal, or not ready to set a new checkpoint due to unexecuted transactions.
pub delay_on_local_failure: Duration,

/// The time between full iterations of the checkpointing
/// logic loop.
pub long_pause_between_checkpoints: Duration,
Expand All @@ -65,6 +71,7 @@ impl Default for CheckpointProcessControl {
fn default() -> CheckpointProcessControl {
CheckpointProcessControl {
delay_on_quorum_failure: Duration::from_secs(10),
delay_on_local_failure: Duration::from_secs(3),
long_pause_between_checkpoints: Duration::from_secs(60),
timeout_until_quorum: Duration::from_secs(60),
extra_time_after_quorum: Duration::from_millis(200),
Expand Down Expand Up @@ -101,22 +108,25 @@ pub async fn checkpoint_process<A>(
continue;
}
// (1) Get the latest summaries and proposals
// TODO: This may not work if we are many epochs behind: we won't be able to download
// from the current network. We will need to consolidate sync implementation.
let state_of_world = get_latest_proposal_and_checkpoint_from_all(
net.clone(),
timing.extra_time_after_quorum,
timing.timeout_until_quorum,
)
.await;

if let Err(err) = state_of_world {
warn!("Cannot get a quorum of checkpoint information: {:?}", err);
// Sleep for 10 sec to allow the network to set itself up or the partition
// to go away.
tokio::time::sleep(timing.delay_on_quorum_failure).await;
continue;
}

let (checkpoint, proposals) = state_of_world.expect("Just checked that we are not Err");
let (checkpoint, proposals) = match state_of_world {
Ok(s) => s,
Err(err) => {
warn!("Cannot get a quorum of checkpoint information: {:?}", err);
// Sleep for delay_on_quorum_failure to allow the network to set itself
// up or the partition to go away.
tokio::time::sleep(timing.delay_on_quorum_failure).await;
continue;
}
};

// (2) Sync to the latest checkpoint, this might take some time.
// Its ok nothing else goes on in terms of the active checkpoint logic
Expand All @@ -126,7 +136,8 @@ pub async fn checkpoint_process<A>(
// Check if there are more historic checkpoints to catch up with
let next_checkpoint = state_checkpoints.lock().next_checkpoint();
if next_checkpoint < checkpoint.summary.sequence_number {
// TODO log error
// TODO: The sync process doesn't really work today because we don't yet have a
// mechanism to ensure that all past transactions will be executed.
if let Err(err) = sync_to_checkpoint(
active_authority.state.name,
net.clone(),
Expand All @@ -139,40 +150,25 @@ pub async fn checkpoint_process<A>(
// if there was an error we pause to wait for network to come up
tokio::time::sleep(timing.delay_on_quorum_failure).await;
}

// The above process can take some time, and the latest checkpoint may have
// already changed. Restart process to be sure.
continue;
}

// The checkpoint we received is equal to what is expected or greater.
// In either case try to upgrade the signed checkpoint to a certified one
// if possible
let result = {
state_checkpoints.lock().process_checkpoint_certificate(
&checkpoint,
&None,
committee,
)
}; // unlock

if let Err(err) = result {
warn!("Cannot process checkpoint: {err:?}");
drop(err);

// One of the errors may be due to the fact that we do not have
// the full contents of the checkpoint. So we try to download it.
// TODO: clean up the errors to get here only when the error is
// "No checkpoint set at this sequence."
if let Ok(contents) =
get_checkpoint_contents(active_authority.state.name, net.clone(), &checkpoint)
.await
{
// Retry with contents
let _ = state_checkpoints.lock().process_checkpoint_certificate(
&checkpoint,
&Some(contents),
committee,
);
}
// sync_to_checkpoint only syncs to the checkpoint before the latest checkpoint.
// The latest checkpoint requires special handling (refer to the comments there).
if let Err(err) = update_latest_checkpoint(
active_authority.state.name,
&net,
&state_checkpoints,
&checkpoint,
committee,
)
.await
{
warn!("Failed to update latest checkpoint: {:?}", err);
tokio::time::sleep(timing.delay_on_local_failure).await;
continue;
}
}

Expand All @@ -184,31 +180,48 @@ pub async fn checkpoint_process<A>(
.map(|(auth, _)| committee.weight(auth))
.sum();

// TODO: What is _start_checkpoint_making for?
let _start_checkpoint_making = weight > committee.quorum_threshold();

let proposal = state_checkpoints
.lock()
.new_proposal(committee.epoch)
.clone();
if let Ok(my_proposal) = proposal {
diff_proposals(
active_authority,
state_checkpoints.clone(),
&my_proposal,
proposals,
timing.consensus_delay_estimate,
)
.await;
let proposal = state_checkpoints.lock().new_proposal(committee.epoch);
match proposal {
Ok(my_proposal) => {
diff_proposals(
active_authority,
state_checkpoints.clone(),
&my_proposal,
proposals,
timing.consensus_delay_estimate,
)
.await;
}
Err(err) => {
warn!("Failure to make a new proposal: {:?}", err);
tokio::time::sleep(timing.delay_on_local_failure).await;
continue;
}
}

if let Err(err) = state_checkpoints
let success = state_checkpoints
.lock()
.attempt_to_construct_checkpoint(committee)
{
warn!("Error attempting to construct checkpoint: {:?}", err);
.attempt_to_construct_checkpoint(committee);

match success {
Err(err) => {
warn!("Error attempting to construct checkpoint: {:?}", err);
tokio::time::sleep(timing.delay_on_local_failure).await;
continue;
}
Ok(false) => {
// TODO: attempt_to_construct_checkpoint should just return Err for the false case.
warn!("Did not construct checkpoint");
tokio::time::sleep(timing.delay_on_local_failure).await;
continue;
}
Ok(true) => (),
}

// (5) Wait for a long long time.
// (4) Wait for a long long time.
let name = state_checkpoints.lock().name;
let next_checkpoint = state_checkpoints.lock().next_checkpoint();

Expand Down Expand Up @@ -396,6 +409,48 @@ where
Ok((highest_certificate_cert, proposals))
}

/// The latest certified checkpoint can either be a checkpoint downloaded from another validator,
/// or constructed locally using a quorum of signed checkpoints. In the latter case, we won't be
/// able to download it from anywhere, but only need contents to make sure we can update it.
/// Such content can either be obtained locally if there was already a signed checkpoint, or
/// downloaded from other validators if not available.
async fn update_latest_checkpoint<A>(
self_name: AuthorityName,
net: &Arc<AuthorityAggregator<A>>,
state_checkpoints: &Arc<Mutex<CheckpointStore>>,
checkpoint: &CertifiedCheckpointSummary,
committee: &Committee,
) -> SuiResult
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
let result = {
state_checkpoints
.lock()
.process_checkpoint_certificate(checkpoint, &None, committee)
}; // unlock

if let Err(err) = result {
warn!("Cannot process checkpoint: {err:?}");
drop(err);

// One of the errors may be due to the fact that we do not have
// the full contents of the checkpoint. So we try to download it.
// TODO: clean up the errors to get here only when the error is
// "No checkpoint set at this sequence."
if let Ok(contents) = get_checkpoint_contents(self_name, net.clone(), checkpoint).await {
// Retry with contents
state_checkpoints.lock().process_checkpoint_certificate(
checkpoint,
&Some(contents),
committee,
)?;
}
}

Ok(())
}

/// Download all checkpoints that are not known to us
pub async fn sync_to_checkpoint<A>(
name: AuthorityName,
Expand Down Expand Up @@ -433,11 +488,7 @@ where
}

let full_sync_start = latest_checkpoint
.map(|chk| match chk {
AuthenticatedCheckpoint::Signed(signed) => signed.summary.sequence_number + 1,
AuthenticatedCheckpoint::Certified(cert) => cert.summary.sequence_number + 1,
AuthenticatedCheckpoint::None => unreachable!(),
})
.map(|chk| chk.summary().sequence_number + 1)
.unwrap_or(0);

for seq in full_sync_start..latest_known_checkpoint.summary.sequence_number {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async fn checkpoint_active_flow_crash_client_with_gossip() {
println!("Start active execution process.");
active_state.clone().spawn_execute_process().await;

// Spin the gossip service.
// Spin the checkpoint service.
active_state
.spawn_checkpoint_process_with_config(Some(CheckpointProcessControl::default()))
.await;
Expand Down Expand Up @@ -273,7 +273,7 @@ async fn checkpoint_active_flow_crash_client_no_gossip() {
.next_checkpoint();
assert!(
next_checkpoint_sequence > 1,
"Expected {} > 2",
"Expected {} > 1",
next_checkpoint_sequence
);
value_set.insert(next_checkpoint_sequence);
Expand Down
49 changes: 18 additions & 31 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,17 +362,28 @@ impl CheckpointStore {
})
}

pub fn sign_new_checkpoint(
&mut self,
summary: CheckpointSummary,
contents: &CheckpointContents,
) -> SuiResult {
let checkpoint = AuthenticatedCheckpoint::Signed(
SignedCheckpointSummary::new_from_summary(summary, self.name, &*self.secret),
);
self.handle_internal_set_checkpoint(&checkpoint, contents)
}

/// Call this function internally to update the latest checkpoint.
/// Internally it is called with an unsigned checkpoint, and results
/// in the checkpoint being signed, stored and the contents
/// registered as processed or unprocessed.
pub fn handle_internal_set_checkpoint(
&mut self,
epoch: EpochId,
checkpoint: CheckpointSummary,
checkpoint: &AuthenticatedCheckpoint,
contents: &CheckpointContents,
) -> Result<(), SuiError> {
let checkpoint_sequence_number = *checkpoint.sequence_number();
let summary = checkpoint.summary();
let checkpoint_sequence_number = *summary.sequence_number();

// Process checkpoints once but allow idempotent processing
if self.checkpoints.get(&checkpoint_sequence_number)?.is_some() {
Expand All @@ -392,6 +403,7 @@ impl CheckpointStore {

// Ensure we have processed all transactions contained in this checkpoint.
if !self.all_checkpoint_transactions_executed(contents)? {
// TODO: We need to schedule all unexecuted transactions for execution.
return Err(SuiError::from(
"Checkpoint contains unexecuted transactions.",
));
Expand All @@ -405,19 +417,14 @@ impl CheckpointStore {
// contents as such a list instead of a set.
// Probably we need access to the effects to do the above.

// Sign the new checkpoint
let signed_checkpoint = AuthenticatedCheckpoint::Signed(
SignedCheckpointSummary::new_from_summary(checkpoint, self.name, &*self.secret),
);

// Make a DB batch
let batch = self.checkpoints.batch();

// Last store the actual checkpoints.
let batch = batch
.insert_batch(
&self.checkpoints,
[(&checkpoint_sequence_number, &signed_checkpoint)],
[(&checkpoint_sequence_number, checkpoint)],
)?
// Drop the fragments for the previous checkpoint
.delete_batch(
Expand All @@ -437,9 +444,6 @@ impl CheckpointStore {
let transactions: Vec<_> = contents.transactions.iter().cloned().collect();
self.update_new_checkpoint_inner(checkpoint_sequence_number, &transactions, batch)?;

// Try to set a fresh proposal, and ignore errors if this fails.
let _ = self.new_proposal(epoch);

Ok(())
}

Expand Down Expand Up @@ -632,7 +636,7 @@ impl CheckpointStore {
&contents,
previous_digest,
);
self.handle_internal_set_checkpoint(committee.epoch, summary, &contents)
self.sign_new_checkpoint(summary, &contents)
.map_err(FragmentInternalError::Error)?;

return Ok(true);
Expand Down Expand Up @@ -773,26 +777,9 @@ impl CheckpointStore {
// Check and process contents
checkpoint.verify_with_transactions(committee, contents)?;
self.handle_internal_set_checkpoint(
committee.epoch,
checkpoint.summary.clone(),
contents,
)?;
// Then insert it
self.checkpoints.insert(
checkpoint.summary.sequence_number(),
&AuthenticatedCheckpoint::Certified(checkpoint.clone()),
contents,
)?;

// Now that we have the new checkpoint we try to move forward the checkpoint creation
// process. We try to use fragments in the sequence to create past checkpoints.
loop {
let construct = self.attempt_to_construct_checkpoint(committee);
// Exit if checkpoint construction leads to an error or returns false
// (ie no new checkpoint is created.)
if construct.is_err() || !construct.unwrap() {
break;
}
}
} else {
return Err(SuiError::from("No checkpoint set at this sequence."));
}
Expand Down
Loading

0 comments on commit 71b914d

Please sign in to comment.