Skip to content

Commit

Permalink
[checkpoints] Introduce checkpoint timestamps (MystenLabs#7482)
Browse files Browse the repository at this point in the history
This PR adds `CheckpointSummary::timestamp_ms` field and populates it
from the time of narwhal commit
  • Loading branch information
andll authored Jan 19, 2023
1 parent 0aa6292 commit a21e19d
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 52 deletions.
1 change: 1 addition & 0 deletions crates/sui-config/src/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@ fn create_genesis_checkpoint(
previous_digest: None,
epoch_rolling_gas_cost_summary: Default::default(),
next_epoch_committee: None,
timestamp_ms: 0, /*todo - put a proper timestamp*/
};

(checkpoint, contents)
Expand Down
27 changes: 18 additions & 9 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ use typed_store::traits::{TableSummary, TypedStoreDebug};

use crate::authority::authority_notify_read::NotifyRead;
use crate::authority::{CertTxGuard, MAX_TX_RECOVERY_RETRY};
use crate::checkpoints::{CheckpointCommitHeight, CheckpointServiceNotify, EpochStats};
use crate::checkpoints::{
CheckpointCommitHeight, CheckpointServiceNotify, EpochStats, PendingCheckpoint,
PendingCheckpointInfo,
};
use crate::consensus_handler::{
SequencedConsensusTransaction, VerifiedSequencedConsensusTransaction,
};
Expand Down Expand Up @@ -197,7 +200,7 @@ pub struct AuthorityEpochTables {
/// the sequence number of checkpoint does not match height here.
///
/// The boolean value indicates whether this is the last checkpoint of the epoch.
pending_checkpoints: DBMap<CheckpointCommitHeight, (Vec<TransactionDigest>, bool)>,
pending_checkpoints: DBMap<CheckpointCommitHeight, PendingCheckpoint>,

/// Checkpoint builder maintains internal list of transactions it included in checkpoints here
builder_digest_to_checkpoint: DBMap<TransactionDigest, CheckpointSequenceNumber>,
Expand Down Expand Up @@ -1371,7 +1374,15 @@ impl AuthorityPerEpochStore {
Some(CmpOrdering::Greater) => false,
None => false,
};
checkpoint_service.notify_checkpoint(self, index, roots, final_checkpoint)?;
let checkpoint = PendingCheckpoint {
roots,
details: PendingCheckpointInfo {
timestamp_ms: committed_dag.leader.metadata.created_at,
last_of_epoch: final_checkpoint,
commit_height: index,
},
};
checkpoint_service.notify_checkpoint(self, checkpoint)?;
if final_checkpoint {
info!(epoch=?self.epoch(), "Received 2f+1 EndOfPublish messages, notifying last checkpoint");
self.record_end_of_message_quorum_time_metric();
Expand All @@ -1380,25 +1391,23 @@ impl AuthorityPerEpochStore {
self.record_checkpoint_boundary(round)
}

pub fn get_pending_checkpoints(
&self,
) -> Vec<(CheckpointCommitHeight, (Vec<TransactionDigest>, bool))> {
pub fn get_pending_checkpoints(&self) -> Vec<(CheckpointCommitHeight, PendingCheckpoint)> {
self.tables.pending_checkpoints.iter().collect()
}

pub fn get_pending_checkpoint(
&self,
index: &CheckpointCommitHeight,
) -> Result<Option<(Vec<TransactionDigest>, bool)>, TypedStoreError> {
) -> Result<Option<PendingCheckpoint>, TypedStoreError> {
self.tables.pending_checkpoints.get(index)
}

pub fn insert_pending_checkpoint(
&self,
index: &CheckpointCommitHeight,
transactions: &(Vec<TransactionDigest>, bool),
checkpoint: &PendingCheckpoint,
) -> Result<(), TypedStoreError> {
self.tables.pending_checkpoints.insert(index, transactions)
self.tables.pending_checkpoints.insert(index, checkpoint)
}

pub fn process_pending_checkpoint(
Expand Down
5 changes: 4 additions & 1 deletion crates/sui-core/src/checkpoints/checkpoint_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ impl<T: SubmitToConsensus + ReconfigurationInitiator> CheckpointOutput
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult {
let checkpoint_seq = summary.sequence_number;
debug!("Sending checkpoint signature at sequence {checkpoint_seq} to consensus");
debug!(
"Sending checkpoint signature at sequence {checkpoint_seq} to consensus, timestamp {}",
summary.timestamp_ms
);
LogCheckpointOutput
.checkpoint_created(summary, contents, epoch_store)
.await?;
Expand Down
107 changes: 66 additions & 41 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use sui_types::gas::GasCostSummary;
use sui_types::messages::{TransactionEffects, VerifiedSignedTransactionEffects};
use sui_types::messages_checkpoint::{
CertifiedCheckpointSummary, CheckpointContents, CheckpointContentsDigest, CheckpointDigest,
CheckpointSequenceNumber, CheckpointSignatureMessage, CheckpointSummary, VerifiedCheckpoint,
CheckpointSequenceNumber, CheckpointSignatureMessage, CheckpointSummary, CheckpointTimestamp,
VerifiedCheckpoint,
};
use tokio::sync::{mpsc, watch, Notify};
use tokio::time::Instant;
Expand All @@ -53,6 +54,19 @@ pub struct EpochStats {
pub total_gas_reward: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointInfo {
pub timestamp_ms: CheckpointTimestamp,
pub last_of_epoch: bool,
pub commit_height: CheckpointCommitHeight,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpoint {
pub roots: Vec<TransactionDigest>,
pub details: PendingCheckpointInfo,
}

#[derive(DBMapUtils)]
pub struct CheckpointStore {
/// Maps checkpoint contents digest to checkpoint contents
Expand Down Expand Up @@ -399,15 +413,10 @@ impl CheckpointBuilder {
Ok(false) => (),
};
let mut last_processed_height: Option<u64> = None;
for (height, (roots, last_checkpoint_of_epoch)) in
self.epoch_store.get_pending_checkpoints()
{
for (height, pending) in self.epoch_store.get_pending_checkpoints() {
last_processed_height = Some(height);
debug!("Making checkpoint at commit height {height}");
if let Err(e) = self
.make_checkpoint(height, roots, last_checkpoint_of_epoch)
.await
{
if let Err(e) = self.make_checkpoint(height, pending).await {
error!("Error while making checkpoint, will retry in 1s: {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
self.metrics.checkpoint_errors.inc();
Expand All @@ -429,23 +438,20 @@ impl CheckpointBuilder {
async fn make_checkpoint(
&self,
height: CheckpointCommitHeight,
roots: Vec<TransactionDigest>,
last_checkpoint_of_epoch: bool,
pending: PendingCheckpoint,
) -> anyhow::Result<()> {
self.metrics
.checkpoint_roots_count
.inc_by(roots.len() as u64);
.inc_by(pending.roots.len() as u64);
let roots = self
.effects_store
.notify_read_effects(roots)
.notify_read_effects(pending.roots)
.in_monitored_scope("CheckpointNotifyRead")
.await?;
let _scope = monitored_scope("CheckpointBuilder");
let unsorted = self.complete_checkpoint_effects(roots)?;
let sorted = CasualOrder::casual_sort(unsorted);
let new_checkpoint = self
.create_checkpoints(sorted, last_checkpoint_of_epoch)
.await?;
let new_checkpoint = self.create_checkpoints(sorted, pending.details).await?;
self.write_checkpoints(height, new_checkpoint).await?;
Ok(())
}
Expand Down Expand Up @@ -488,7 +494,7 @@ impl CheckpointBuilder {
async fn create_checkpoints(
&self,
all_effects: Vec<TransactionEffects>,
last_pending_of_epoch: bool,
details: PendingCheckpointInfo,
) -> anyhow::Result<Vec<(CheckpointSummary, CheckpointContents)>> {
let total = all_effects.len();
let mut last_checkpoint = self.epoch_store.last_built_checkpoint_summary()?;
Expand Down Expand Up @@ -539,7 +545,7 @@ impl CheckpointBuilder {
self.epoch_store
.record_epoch_first_checkpoint_creation_time_metric();
}
let last_checkpoint_of_epoch = last_pending_of_epoch && index == chunks_count - 1;
let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
let digests_without_epoch_augment: Vec<_> =
effects.iter().map(|e| e.transaction_digest).collect();
debug!("Waiting for checkpoint user signatures for certificates {:?} to appear in consensus", digests_without_epoch_augment);
Expand Down Expand Up @@ -576,6 +582,13 @@ impl CheckpointBuilder {
.as_ref()
.map(|(_, c)| c.sequence_number + 1)
.unwrap_or_default();
let timestamp_ms = details.timestamp_ms;
if let Some((_, last_checkpoint)) = &last_checkpoint {
if last_checkpoint.timestamp_ms > timestamp_ms {
error!("Unexpected decrease of checkpoint timestamp, sequence: {}, previous: {}, current: {}",
sequence_number, last_checkpoint.timestamp_ms, timestamp_ms);
}
}
let summary = CheckpointSummary::new(
epoch,
sequence_number,
Expand All @@ -594,6 +607,7 @@ impl CheckpointBuilder {
} else {
None
},
timestamp_ms,
);
if last_checkpoint_of_epoch {
info!(
Expand Down Expand Up @@ -904,9 +918,7 @@ pub trait CheckpointServiceNotify {
fn notify_checkpoint(
&self,
epoch_store: &AuthorityPerEpochStore,
index: CheckpointCommitHeight,
roots: Vec<TransactionDigest>,
last_checkpoint_of_epoch: bool,
checkpoint: PendingCheckpoint,
) -> SuiResult;
}

Expand Down Expand Up @@ -1039,26 +1051,28 @@ impl CheckpointServiceNotify for CheckpointService {
fn notify_checkpoint(
&self,
epoch_store: &AuthorityPerEpochStore,
index: CheckpointCommitHeight,
roots: Vec<TransactionDigest>,
last_checkpoint_of_epoch: bool,
checkpoint: PendingCheckpoint,
) -> SuiResult {
if let Some(pending) = epoch_store.get_pending_checkpoint(&index)? {
if pending.0 != roots {
panic!("Received checkpoint at index {} that contradicts previously stored checkpoint. Old digests: {:?}, new digests: {:?}", index, pending, roots);
if let Some(pending) = epoch_store.get_pending_checkpoint(&checkpoint.height())? {
if pending.roots != checkpoint.roots {
panic!("Received checkpoint at index {} that contradicts previously stored checkpoint. Old digests: {:?}, new digests: {:?}", checkpoint.height(), pending.roots, checkpoint.roots);
}
debug!(
"Ignoring duplicate checkpoint notification at height {}",
index
checkpoint.height()
);
return Ok(());
}
debug!(
"Transaction roots for pending checkpoint at height {}: {:?}",
index, roots
checkpoint.height(),
checkpoint.roots
);
epoch_store.insert_pending_checkpoint(&checkpoint.height(), &checkpoint)?;
debug!(
"Notifying builder about checkpoint at {}",
checkpoint.height()
);
epoch_store.insert_pending_checkpoint(&index, &(roots, last_checkpoint_of_epoch))?;
debug!("Notifying builder about checkpoint at {index}");
self.notify_builder.notify_one();
Ok(())
}
Expand All @@ -1076,13 +1090,7 @@ impl CheckpointServiceNotify for CheckpointServiceNoop {
Ok(())
}

fn notify_checkpoint(
&self,
_: &AuthorityPerEpochStore,
_: CheckpointCommitHeight,
_: Vec<TransactionDigest>,
_: bool,
) -> SuiResult {
fn notify_checkpoint(&self, _: &AuthorityPerEpochStore, _: PendingCheckpoint) -> SuiResult {
Ok(())
}
}
Expand Down Expand Up @@ -1132,6 +1140,12 @@ impl CheckpointTailer {
}
}

impl PendingCheckpoint {
pub fn height(&self) -> CheckpointCommitHeight {
self.details.commit_height
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -1219,17 +1233,17 @@ mod tests {
);
let mut tailer = checkpoint_service.subscribe_checkpoints(0);
checkpoint_service
.notify_checkpoint(&epoch_store, 0, vec![d(4)], false)
.notify_checkpoint(&epoch_store, p(0, vec![4]))
.unwrap();
// Verify that sending same digests at same height is noop
checkpoint_service
.notify_checkpoint(&epoch_store, 0, vec![d(4)], false)
.notify_checkpoint(&epoch_store, p(0, vec![4]))
.unwrap();
checkpoint_service
.notify_checkpoint(&epoch_store, 1, vec![d(1), d(3)], false)
.notify_checkpoint(&epoch_store, p(1, vec![1, 3]))
.unwrap();
checkpoint_service
.notify_checkpoint(&epoch_store, 2, vec![d(10), d(11), d(12), d(13)], false)
.notify_checkpoint(&epoch_store, p(2, vec![10, 11, 12, 13]))
.unwrap();

let (c1c, c1s) = result.recv().await.unwrap();
Expand Down Expand Up @@ -1347,6 +1361,17 @@ mod tests {
}
}

fn p(i: u64, t: Vec<u8>) -> PendingCheckpoint {
PendingCheckpoint {
roots: t.into_iter().map(d).collect(),
details: PendingCheckpointInfo {
timestamp_ms: 0,
last_of_epoch: false,
commit_height: i,
},
}
}

fn d(i: u8) -> TransactionDigest {
let mut bytes: [u8; 32] = Default::default();
bytes[0] = i;
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-network/src/state_sync/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl CommitteeFixture {
previous_digest: None,
epoch_rolling_gas_cost_summary: Default::default(),
next_epoch_committee: None,
timestamp_ms: 0,
};

self.create_certified_checkpoint(checkpoint)
Expand Down Expand Up @@ -117,6 +118,7 @@ impl CommitteeFixture {
previous_digest: Some(prev.summary.digest()),
epoch_rolling_gas_cost_summary: Default::default(),
next_epoch_committee: None,
timestamp_ms: 0,
};

let checkpoint = self.create_certified_checkpoint(summary);
Expand Down Expand Up @@ -159,6 +161,7 @@ impl CommitteeFixture {
previous_digest: Some(previous_checkpoint.summary.digest()),
epoch_rolling_gas_cost_summary: Default::default(),
next_epoch_committee: Some(next_epoch_committee),
timestamp_ms: 0,
};

let checkpoint = self.create_certified_checkpoint(summary);
Expand Down
9 changes: 8 additions & 1 deletion crates/sui-open-rpc/spec/openrpc.json
Original file line number Diff line number Diff line change
Expand Up @@ -3019,7 +3019,8 @@
"epoch",
"epoch_rolling_gas_cost_summary",
"network_total_transactions",
"sequence_number"
"sequence_number",
"timestamp_ms"
],
"properties": {
"content_digest": {
Expand Down Expand Up @@ -3080,6 +3081,12 @@
"type": "integer",
"format": "uint64",
"minimum": 0.0
},
"timestamp_ms": {
"description": "Timestamp of the checkpoint - number of milliseconds from the Unix epoch Checkpoint timestamps are monotonic, but not strongly monotonic - subsequent checkpoints can have same timestamp if they originate from the same underlining consensus commit",
"type": "integer",
"format": "uint64",
"minimum": 0.0
}
}
},
Expand Down
Loading

0 comments on commit a21e19d

Please sign in to comment.