Skip to content

Commit

Permalink
TxManager::enqueue does not need to return Result (MystenLabs#16450)
Browse files Browse the repository at this point in the history
## Description 

To be consistent with all the other calls inside the function, where we
always use expect.
It's also a bad practice to ignore the return value of some calls.
Changing all of them to expect.
The core of this PR is in TransactionManager. All changes in other
functions are due to the function signature change.

## Test Plan 

How did you test the new or updated feature?

---
If your changes are not user-facing and do not break anything, you can
skip the following section. Otherwise, please briefly describe what has
changed under the Release Notes section.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
lxfind authored Feb 29, 2024
1 parent d54ac2f commit 730caa6
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 155 deletions.
8 changes: 4 additions & 4 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ impl AuthorityState {
let expected_effects_digest = effects.digest();

self.transaction_manager
.enqueue(vec![transaction.clone()], epoch_store)?;
.enqueue(vec![transaction.clone()], epoch_store);

let observed_effects = self
.execution_cache
Expand Down Expand Up @@ -1061,7 +1061,7 @@ impl AuthorityState {
// Shared object transactions need to be sequenced by Narwhal before enqueueing
// for execution, done in AuthorityPerEpochStore::handle_consensus_transaction().
// For owned object transactions, they can be enqueued for execution immediately.
self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store)?;
self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
}

let effects = self.notify_read_effects(certificate).await?;
Expand Down Expand Up @@ -1224,7 +1224,7 @@ impl AuthorityState {
let digest = *certificate.digest();

fail_point_if!("correlated-crash-process-certificate", || {
if sui_simulator::random::deterministic_probabilty_once(&digest, 0.01) {
if sui_simulator::random::deterministic_probability_once(&digest, 0.01) {
sui_simulator::task::kill_current_node(None);
}
});
Expand Down Expand Up @@ -2689,7 +2689,7 @@ impl AuthorityState {
&self,
certs: Vec<VerifiedCertificate>,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<()> {
) {
self.transaction_manager
.enqueue_certificates(certs, epoch_store)
}
Expand Down
23 changes: 9 additions & 14 deletions crates/sui-core/src/authority/authority_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,10 @@ pub async fn enqueue_all_and_execute_all(
authority: &AuthorityState,
certificates: Vec<VerifiedCertificate>,
) -> Result<Vec<TransactionEffects>, SuiError> {
authority
.enqueue_certificates_for_execution(
certificates.clone(),
&authority.epoch_store_for_testing(),
)
.unwrap();
authority.enqueue_certificates_for_execution(
certificates.clone(),
&authority.epoch_store_for_testing(),
);
let mut output = Vec::new();
for cert in certificates {
let effects = authority.notify_read_effects(&cert).await?;
Expand All @@ -353,12 +351,10 @@ pub async fn execute_sequenced_certificate_to_effects(
authority: &AuthorityState,
certificate: VerifiedCertificate,
) -> Result<(TransactionEffects, Option<ExecutionError>), SuiError> {
authority
.enqueue_certificates_for_execution(
vec![certificate.clone()],
&authority.epoch_store_for_testing(),
)
.unwrap();
authority.enqueue_certificates_for_execution(
vec![certificate.clone()],
&authority.epoch_store_for_testing(),
);

let (result, execution_error_opt) = authority.try_execute_for_test(&certificate).await?;
let effects = result.inner().data().clone();
Expand All @@ -383,8 +379,7 @@ pub async fn send_consensus(authority: &AuthorityState, cert: &VerifiedCertifica

authority
.transaction_manager()
.enqueue(certs, &authority.epoch_store_for_testing())
.unwrap();
.enqueue(certs, &authority.epoch_store_for_testing());
}

pub async fn send_consensus_no_execution(authority: &AuthorityState, cert: &VerifiedCertificate) {
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ impl ValidatorService {
// even when we are not returning effects to user
if !certificate.contains_shared_object() {
self.state
.enqueue_certificates_for_execution(vec![certificate.clone()], &epoch_store)?;
.enqueue_certificates_for_execution(vec![certificate.clone()], &epoch_store);
}
return Ok(None);
}
Expand Down
13 changes: 5 additions & 8 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,12 +458,10 @@ impl CheckpointExecutor {
.expect("Acquiring shared locks for change_epoch tx cannot fail");
}

self.tx_manager
.enqueue_with_expected_effects_digest(
vec![(change_epoch_tx.clone(), execution_digests.effects)],
&epoch_store,
)
.expect("Enqueueing change_epoch tx cannot fail");
self.tx_manager.enqueue_with_expected_effects_digest(
vec![(change_epoch_tx.clone(), execution_digests.effects)],
&epoch_store,
);
handle_execution_effects(
&self.state,
vec![execution_digests],
Expand Down Expand Up @@ -1050,8 +1048,7 @@ async fn execute_transactions(
}

let exec_start = Instant::now();
transaction_manager
.enqueue_with_expected_effects_digest(executable_txns.clone(), &epoch_store)?;
transaction_manager.enqueue_with_expected_effects_digest(executable_txns.clone(), &epoch_store);

handle_execution_effects(
state,
Expand Down
6 changes: 2 additions & 4 deletions crates/sui-core/src/consensus_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {

fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
let key = [commit_sub_dag_index, self.epoch_store.epoch()];
if sui_simulator::random::deterministic_probabilty(&key, 0.01) {
if sui_simulator::random::deterministic_probability(&key, 0.01) {
sui_simulator::task::kill_current_node(None);
}
});
Expand Down Expand Up @@ -511,9 +511,7 @@ impl AsyncTransactionScheduler {
) {
while let Some(transactions) = recv.recv().await {
let _guard = monitored_scope("ConsensusHandler::enqueue");
transaction_manager
.enqueue(transactions, &epoch_store)
.expect("transaction_manager::enqueue should not fail");
transaction_manager.enqueue(transactions, &epoch_store);
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions crates/sui-core/src/test_authority_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ impl LocalAuthorityClient {
.verify_cert(certificate)
.await?;
//let certificate = certificate.verify(epoch_store.committee())?;
state
.enqueue_certificates_for_execution(vec![certificate.clone()], &epoch_store)?;
state.enqueue_certificates_for_execution(vec![certificate.clone()], &epoch_store);
let effects = state.notify_read_effects(&certificate).await?;
state.sign_effects(effects, &epoch_store)?
}
Expand Down
44 changes: 28 additions & 16 deletions crates/sui-core/src/transaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub struct PendingCertificate {
// When executing from checkpoint, the certified effects digest is provided, so that forks can
// be detected prior to committing the transaction.
pub expected_effects_digest: Option<TransactionEffectsDigest>,
// The input object this certifiate is waiting for to become available in order to be executed.
// The input object this certificate is waiting for to become available in order to be executed.
pub waiting_input_objects: BTreeSet<InputKey>,
// Stores stats about this transaction.
pub stats: PendingCertificateStats,
Expand Down Expand Up @@ -354,9 +354,7 @@ impl TransactionManager {
inner: RwLock::new(Inner::new(epoch_store.epoch(), metrics)),
tx_ready_certificates,
};
transaction_manager
.enqueue(epoch_store.all_pending_execution().unwrap(), epoch_store)
.expect("Initialize TransactionManager with pending certificates failed.");
transaction_manager.enqueue(epoch_store.all_pending_execution().unwrap(), epoch_store);
transaction_manager
}

Expand All @@ -370,7 +368,7 @@ impl TransactionManager {
&self,
certs: Vec<VerifiedCertificate>,
epoch_store: &AuthorityPerEpochStore,
) -> SuiResult<()> {
) {
let executable_txns = certs
.into_iter()
.map(VerifiedExecutableTransaction::new_from_certificate)
Expand All @@ -383,7 +381,7 @@ impl TransactionManager {
&self,
certs: Vec<VerifiedExecutableTransaction>,
epoch_store: &AuthorityPerEpochStore,
) -> SuiResult<()> {
) {
let certs = certs.into_iter().map(|cert| (cert, None)).collect();
self.enqueue_impl(certs, epoch_store)
}
Expand All @@ -393,7 +391,7 @@ impl TransactionManager {
&self,
certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
epoch_store: &AuthorityPerEpochStore,
) -> SuiResult<()> {
) {
let certs = certs
.into_iter()
.map(|(cert, fx)| (cert, Some(fx)))
Expand All @@ -408,7 +406,7 @@ impl TransactionManager {
Option<TransactionEffectsDigest>,
)>,
epoch_store: &AuthorityPerEpochStore,
) -> SuiResult<()> {
) {
// filter out already executed certs
let certs: Vec<_> = certs
.into_iter()
Expand All @@ -418,10 +416,16 @@ impl TransactionManager {
if self
.cache_read
.is_tx_already_executed(&digest)
.expect("Failed to check if tx is already executed")
.unwrap_or_else(|err| {
panic!("Failed to check if tx is already executed: {:?}", err)
})
{
// also ensure the transaction will not be retried after restart.
let _ = epoch_store.remove_pending_execution(&digest);
epoch_store
.remove_pending_execution(&digest)
.unwrap_or_else(|err| {
panic!("remove_pending_execution should not fail: {:?}", err)
});
self.metrics
.transaction_manager_num_enqueued_certificates
.with_label_values(&["already_executed"])
Expand Down Expand Up @@ -497,7 +501,7 @@ impl TransactionManager {
receiving_objects,
epoch_store.epoch(),
)
.expect("Checking object existence cannot fail!")
.unwrap_or_else(|err| panic!("Checking object existence cannot fail: {:?}", err))
.into_iter()
.zip(input_object_cache_misses);

Expand Down Expand Up @@ -565,7 +569,11 @@ impl TransactionManager {
inner.epoch, pending_cert.certificate
);
// also ensure the transaction will not be retried after restart.
let _ = epoch_store.remove_pending_execution(&digest);
epoch_store
.remove_pending_execution(&digest)
.unwrap_or_else(|err| {
panic!("remove_pending_execution should not fail: {:?}", err)
});
continue;
}

Expand All @@ -586,9 +594,15 @@ impl TransactionManager {
continue;
}
// skip already executed txes
if self.cache_read.is_tx_already_executed(&digest)? {
let is_tx_already_executed = self
.cache_read
.is_tx_already_executed(&digest)
.expect("Check if tx is already executed should not fail");
if is_tx_already_executed {
// also ensure the transaction will not be retried after restart.
let _ = epoch_store.remove_pending_execution(&digest);
epoch_store
.remove_pending_execution(&digest)
.expect("remove_pending_execution should not fail");
self.metrics
.transaction_manager_num_enqueued_certificates
.with_label_values(&["already_executed"])
Expand Down Expand Up @@ -652,8 +666,6 @@ impl TransactionManager {
.set(inner.pending_certificates.len() as i64);

inner.maybe_reserve_capacity();

Ok(())
}

#[cfg(test)]
Expand Down
24 changes: 10 additions & 14 deletions crates/sui-core/src/unit_tests/execution_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,20 +423,16 @@ async fn test_execution_with_dependencies() {

// Enqueue certs out of dependency order for executions.
for cert in executed_shared_certs.iter().rev() {
authorities[3]
.enqueue_certificates_for_execution(
vec![cert.clone()],
&authorities[3].epoch_store_for_testing(),
)
.unwrap();
authorities[3].enqueue_certificates_for_execution(
vec![cert.clone()],
&authorities[3].epoch_store_for_testing(),
);
}
for cert in executed_owned_certs.iter().rev() {
authorities[3]
.enqueue_certificates_for_execution(
vec![cert.clone()],
&authorities[3].epoch_store_for_testing(),
)
.unwrap();
authorities[3].enqueue_certificates_for_execution(
vec![cert.clone()],
&authorities[3].epoch_store_for_testing(),
);
}

// All certs should get executed eventually.
Expand Down Expand Up @@ -761,7 +757,7 @@ async fn test_authority_txn_signing_pushback() {
// Manually make the authority into overload state and reject 100% of traffic.
authority_state.overload_info.set_overload(100);

// First, create a transaction to tranfer `gas_object1` to `recipient1`.
// First, create a transaction to transfer `gas_object1` to `recipient1`.
let rgp = authority_state.reference_gas_price_for_testing().unwrap();
let tx = make_transfer_object_transaction(
gas_object1.compute_object_reference(),
Expand Down Expand Up @@ -888,7 +884,7 @@ async fn test_authority_txn_execution_pushback() {
// Manually make the authority into overload state and reject 100% of traffic.
authority_state.overload_info.set_overload(100);

// Create a transaction to tranfer `gas_object1` to `recipient`.
// Create a transaction to transfer `gas_object1` to `recipient`.
let rgp = authority_state.reference_gas_price_for_testing().unwrap();
let tx = make_transfer_object_transaction(
gas_object1.compute_object_reference(),
Expand Down
Loading

0 comments on commit 730caa6

Please sign in to comment.