Skip to content

Commit

Permalink
[StateAccumulator] Exercise transaction edge cases in test (MystenLab…
Browse files Browse the repository at this point in the history
…s#9761)

## Description 

This exercises state accumulation against edge case transactions such as
those created from the use of `created_then_wrapped`,
`unwrapped_then_deleted`, as well as other combinations of wraps and
unwraps, by adding incremental checking of the live object set against
the result of accumulating single transaction effects. This path will be
exercised by any tests that hit
`authority_tests::send_and_confirm_transaction_()`. Notably, this
includes all tests in `move_interation_tests.rs`.

Included also is bug fixes uncovered from the above.

## Test Plan 

```
cargo simtest && cargo nextest run
```

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes

---------

Co-authored-by: Mark Logan <[email protected]>
  • Loading branch information
williampsmith and mystenmark authored Mar 24, 2023
1 parent d812bb2 commit a1314eb
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 20 deletions.
24 changes: 24 additions & 0 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,30 @@ impl AuthorityStore {
.flatten())
}

pub fn get_object_ref_prior_to_key(
&self,
object_id: &ObjectID,
version: VersionNumber,
) -> Result<Option<ObjectRef>, SuiError> {
let Some(prior_version) = version.one_before() else {
return Ok(None);
};
let mut iterator = self
.perpetual_tables
.objects
.iter()
.skip_prior_to(&ObjectKey(*object_id, prior_version))?;

if let Some((object_key, value)) = iterator.next() {
if object_key.0 == *object_id {
return Ok(Some(
self.perpetual_tables.object_reference(&object_key, value)?,
));
}
}
Ok(None)
}

pub fn multi_get_object_by_key(
&self,
object_keys: &[ObjectKey],
Expand Down
96 changes: 77 additions & 19 deletions crates/sui-core/src/state_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use sui_types::storage::ObjectKey;
use tracing::debug;
use typed_store::Map;

use std::collections::HashSet;
use std::sync::Arc;

use fastcrypto::hash::MultisetHash;
Expand Down Expand Up @@ -50,7 +51,7 @@ impl StateAccumulator {
Self { authority_store }
}

/// Accumulates the effects of a single checkpoint.
/// Accumulates the effects of a single checkpoint and persists the accumulator.
/// This function is idempotent.
pub fn accumulate_checkpoint(
&self,
Expand All @@ -63,6 +64,20 @@ impl StateAccumulator {
return Ok(acc);
}

let acc = self.accumulate_effects(effects);

epoch_store.insert_state_hash_for_checkpoint(&checkpoint_seq_num, &acc)?;
debug!("Accumulated checkpoint {}", checkpoint_seq_num);

epoch_store
.checkpoint_state_notify_read
.notify(&checkpoint_seq_num, &acc);

Ok(acc)
}

/// Accumulates given effects and returns the accumulator without side effects.
pub fn accumulate_effects(&self, effects: Vec<TransactionEffects>) -> Accumulator {
let mut acc = Accumulator::default();

// process insertions to the set
Expand Down Expand Up @@ -98,45 +113,87 @@ impl StateAccumulator {
.collect::<Vec<Vec<u8>>>(),
);

// get all modified_at_versions for the fx
let modified_at_version_keys: Vec<_> = effects
let all_unwrapped = effects
.iter()
.flat_map(|fx| {
fx.modified_at_versions()
fx.unwrapped()
.iter()
.map(|(oref, _owner)| (oref.0, oref.1))
.collect::<Vec<(ObjectID, SequenceNumber)>>()
})
.chain(effects.iter().flat_map(|fx| {
fx.unwrapped_then_deleted()
.iter()
.map(|(id, seq_num)| ObjectKey(*id, *seq_num))
.collect::<Vec<ObjectKey>>()
.map(|oref| (oref.0, oref.1))
.collect::<Vec<(ObjectID, SequenceNumber)>>()
}))
.collect::<Vec<(ObjectID, SequenceNumber)>>();

let unwrapped_ids: HashSet<ObjectID> =
all_unwrapped.iter().map(|(id, _)| id).cloned().collect();

// Collect keys from modified_at_versions to remove from the accumulator.
// Filter all unwrapped objects (from unwrapped or unwrapped_then_deleted effects)
// as these were inserted into the accumulator as a WrappedObject. Will handle these
// separately.
let modified_at_version_keys: Vec<ObjectKey> = effects
.iter()
.flat_map(|fx| fx.modified_at_versions())
.filter_map(|(id, seq_num)| {
if unwrapped_ids.contains(id) {
None
} else {
Some(ObjectKey(*id, *seq_num))
}
})
.collect();

let modified_at_digests: Vec<_> = self
.authority_store
.multi_get_object_by_key(&modified_at_version_keys)
.multi_get_object_by_key(&modified_at_version_keys.clone())
.expect("Failed to get modified_at_versions object from object table")
.into_iter()
.map(|obj| {
obj.expect(
"Object from modified_at_versions effects does not exist in objects table",
)
.zip(modified_at_version_keys)
.map(|(obj, key)| {
obj.unwrap_or_else(|| panic!("Object for key {:?} from modified_at_versions effects does not exist in objects table", key))
.compute_object_reference()
.2
})
.collect();

acc.remove_all(modified_at_digests);

epoch_store.insert_state_hash_for_checkpoint(&checkpoint_seq_num, &acc)?;
debug!("Accumulated checkpoint {}", checkpoint_seq_num);
// Process unwrapped and unwrapped_then_deleted effects, which need to be
// removed as WrappedObject using the last sequence number it was tombstoned
// against. Since this happened in a past transaction, and the child object may
// have been modified since (and hence its sequence number incremented), we
// seek the version prior to the unwrapped version from the objects table directly
let wrapped_objects_to_remove: Vec<WrappedObject> = all_unwrapped
.iter()
.map(|(id, seq_num)| {
let objref = self
.authority_store
.get_object_ref_prior_to_key(id, *seq_num)
.expect("read cannot fail")
.expect("wrapped tombstone must precede unwrapped object");

epoch_store
.checkpoint_state_notify_read
.notify(&checkpoint_seq_num, &acc);
assert!(objref.2.is_wrapped(), "{:?}", objref);

Ok(acc)
WrappedObject::new(objref.0, objref.1)
})
.collect();

acc.remove_all(
wrapped_objects_to_remove
.iter()
.map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
.collect::<Vec<Vec<u8>>>(),
);

acc
}

/// Unions all checkpoint accumulators at the end of the epoch to generate the
/// root state hash and saves it. This function is idempotent. Can be called on
/// root state hash and persists it to db. This function is idempotent. Can be called on
/// non-consecutive epochs, e.g. to accumulate epoch 3 after having last
/// accumulated epoch 1.
pub async fn accumulate_epoch(
Expand Down Expand Up @@ -220,6 +277,7 @@ impl StateAccumulator {
Ok(root_state_hash)
}

/// Returns the result of accumulatng the live object set, without side effects
pub fn accumulate_live_object_set(&self) -> Accumulator {
let mut acc = Accumulator::default();
for oref in self.authority_store.iter_live_object_set() {
Expand Down
13 changes: 13 additions & 0 deletions crates/sui-core/src/unit_tests/authority_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::task::{Context, Poll};
use std::{convert::TryInto, env};

use bcs;
use fastcrypto::hash::MultisetHash;
use futures::{stream::FuturesUnordered, StreamExt};
use move_binary_format::access::ModuleAccess;
use move_binary_format::{
Expand Down Expand Up @@ -62,6 +63,7 @@ use sui_types::{SUI_CLOCK_OBJECT_ID, SUI_CLOCK_OBJECT_SHARED_VERSION};
use crate::authority::move_integration_tests::build_and_publish_test_package_with_upgrade_cap;
use crate::consensus_handler::SequencedConsensusTransaction;
use crate::epoch::epoch_metrics::EpochMetrics;
use crate::state_accumulator::StateAccumulator;
use crate::{
authority_client::{AuthorityAPI, NetworkAuthorityClient},
authority_server::AuthorityServer,
Expand Down Expand Up @@ -1533,7 +1535,18 @@ pub async fn send_and_confirm_transaction_(

// Submit the confirmation. *Now* execution actually happens, and it should fail when we try to look up our dummy module.
// we unfortunately don't get a very descriptive error message, but we can at least see that something went wrong inside the VM
//
// We also check the incremental effects of the transaction on the live object set against StateAccumulator
// for testing and regression detection
let state_acc = StateAccumulator::new(authority.database.clone());
let mut state = state_acc.accumulate_live_object_set();
let result = authority.try_execute_for_test(&certificate).await?;
let state_after = state_acc.accumulate_live_object_set();
let effects_acc = state_acc.accumulate_effects(vec![result.inner().data().clone()]);
state.union(&effects_acc);

assert_eq!(state_after.digest(), state.digest());

if let Some(fullnode) = fullnode {
fullnode.try_execute_for_test(&certificate).await?;
}
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-core/src/unit_tests/move_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ async fn test_publish_duplicate_modules() {
#[tokio::test]
#[cfg_attr(msim, ignore)]
async fn test_object_wrapping_unwrapping() {
telemetry_subscribers::init_for_testing();
let (sender, sender_key): (_, AccountKeyPair) = get_key_pair();
let gas = ObjectID::random();
let authority = init_state_with_ids(vec![(sender, gas)]).await;
Expand Down Expand Up @@ -783,11 +784,11 @@ async fn test_create_then_delete_parent_child_wrap_separate() {
)
.await
.unwrap();

assert!(effects.status().is_ok());
let child = effects.created()[0].0;

// Add the child to the parent.
println!("add_child_wrapped");
let effects = call_move(
&authority,
&gas,
Expand Down
10 changes: 10 additions & 0 deletions crates/sui-types/src/base_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ mod base_types_tests;
)]
pub struct SequenceNumber(u64);

impl SequenceNumber {
pub fn one_before(&self) -> Option<SequenceNumber> {
if self.0 == 0 {
None
} else {
Some(SequenceNumber(self.0 - 1))
}
}
}

pub type TxSequenceNumber = u64;

impl fmt::Display for SequenceNumber {
Expand Down
4 changes: 4 additions & 0 deletions crates/sui-types/src/digests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,10 @@ impl ObjectDigest {
*self == Self::OBJECT_DIGEST_DELETED
}

pub fn is_wrapped(&self) -> bool {
*self == Self::OBJECT_DIGEST_WRAPPED
}

pub fn base58_encode(&self) -> String {
Base58::encode(self.0)
}
Expand Down

0 comments on commit a1314eb

Please sign in to comment.