Skip to content

Commit

Permalink
[State Sync] Update tests for dedicated commit handler.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Jan 13, 2024
1 parent 989b743 commit d20fab9
Show file tree
Hide file tree
Showing 12 changed files with 258 additions and 144 deletions.
2 changes: 1 addition & 1 deletion api/openapi-spec-generator/src/fake_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::sync::Arc;

// This is necessary for building the API with how the code is structured currently.
pub fn get_fake_context() -> Context {
let mempool = MockSharedMempool::new();
let mempool = MockSharedMempool::new_with_runtime();
Context::new(
ChainId::test(),
Arc::new(MockDbReaderWriter),
Expand Down
2 changes: 1 addition & 1 deletion api/test-context/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ impl TestContext {

self.mempool
.mempool_notifier
.notify_new_commit(txns, timestamp, 1000)
.notify_new_commit(txns, timestamp)
.await
.unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion crates/aptos-rest-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ impl Client {
}

pub async fn wait_for_version(&self, version: u64) -> Result<State> {
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120);
const DEFAULT_DELAY: Duration = Duration::from_millis(500);

let start = std::time::Instant::now();
Expand Down
23 changes: 16 additions & 7 deletions mempool/src/tests/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};
use tokio::runtime::{Handle, Runtime};
use tokio::runtime::Handle;

/// Mock of a running instance of shared mempool.
pub struct MockSharedMempool {
_runtime: Option<Runtime>,
_runtime: Option<Handle>,
_handle: Option<Handle>,
pub ac_client: MempoolClientSender,
pub mempool: Arc<Mutex<CoreMempool>>,
Expand All @@ -59,15 +59,14 @@ impl MockSharedMempool {
/// Returns the runtime on which the shared mempool is running
/// and the channel through which shared mempool receives client events.
pub fn new() -> Self {
let runtime = aptos_runtimes::spawn_named_runtime("shared-mem".into(), None);
let _entered_runtime = runtime.enter();
// Create the shared mempool
let (ac_client, mempool, quorum_store_sender, mempool_notifier) = Self::start(
runtime.handle(),
&Handle::current(),
&DbReaderWriter::new(MockDbReaderWriter),
MockVMValidator,
);
Self {
_runtime: Some(runtime),
_runtime: Some(Handle::current()),
_handle: None,
ac_client,
mempool,
Expand All @@ -76,6 +75,16 @@ impl MockSharedMempool {
}
}

/// Creates a mock shared mempool and runtime
pub fn new_with_runtime() -> Self {
// Create a runtime
let runtime = aptos_runtimes::spawn_named_runtime("shared-mem".into(), None);
let _entered_runtime = runtime.enter();

// Create and return the shared mempool
Self::new()
}

/// Creates a mock of a running instance of shared mempool inside a tokio runtime;
/// Holds a runtime handle instead.
pub fn new_in_runtime<V: TransactionValidation + 'static>(
Expand Down Expand Up @@ -121,7 +130,7 @@ impl MockSharedMempool {
let (ac_client, client_events) = mpsc::channel(1_024);
let (quorum_store_sender, quorum_store_receiver) = mpsc::channel(1_024);
let (mempool_notifier, mempool_listener) =
aptos_mempool_notifications::new_mempool_notifier_listener_pair();
aptos_mempool_notifications::new_mempool_notifier_listener_pair(100);
let (reconfig_sender, reconfig_events) = aptos_channel::new(QueueStyle::LIFO, 1, None);
let reconfig_event_subscriber = ReconfigNotificationListener {
notification_receiver: reconfig_events,
Expand Down
2 changes: 1 addition & 1 deletion mempool/src/tests/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ fn start_node_mempool(
let (_ac_endpoint_sender, ac_endpoint_receiver) = mpsc::channel(1_024);
let (_quorum_store_sender, quorum_store_receiver) = mpsc::channel(1_024);
let (_mempool_notifier, mempool_listener) =
aptos_mempool_notifications::new_mempool_notifier_listener_pair();
aptos_mempool_notifications::new_mempool_notifier_listener_pair(100);
let (reconfig_sender, reconfig_events) = aptos_channel::new(QueueStyle::LIFO, 1, None);
let reconfig_event_subscriber = ReconfigNotificationListener {
notification_receiver: reconfig_events,
Expand Down
59 changes: 34 additions & 25 deletions mempool/src/tests/shared_mempool_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ use crate::{
use aptos_consensus_types::common::RejectedTransactionSummary;
use aptos_mempool_notifications::MempoolNotificationSender;
use aptos_types::{transaction::Transaction, vm_status::DiscardedVMStatus};
use futures::{channel::oneshot, executor::block_on, sink::SinkExt};
use futures::{channel::oneshot, sink::SinkExt};
use tokio::time::timeout;

#[test]
fn test_consensus_events_rejected_txns() {
#[tokio::test]
async fn test_consensus_events_rejected_txns() {
let smp = MockSharedMempool::new();

// Add txns 1, 2, 3
Expand Down Expand Up @@ -42,10 +43,8 @@ fn test_consensus_events_rejected_txns() {
let (callback, callback_rcv) = oneshot::channel();
let req = QuorumStoreRequest::RejectNotification(transactions, callback);
let mut consensus_sender = smp.consensus_to_mempool_sender.clone();
block_on(async {
assert!(consensus_sender.send(req).await.is_ok());
assert!(callback_rcv.await.is_ok());
});
assert!(consensus_sender.send(req).await.is_ok());
assert!(callback_rcv.await.is_ok());

let pool = smp.mempool.lock();
// TODO: make less brittle to broadcast buckets changes
Expand All @@ -54,12 +53,9 @@ fn test_consensus_events_rejected_txns() {
assert_eq!(timeline.first().unwrap(), &kept_txn);
}

#[test]
fn test_mempool_notify_committed_txns() {
// Create runtime for the mempool notifier and listener
let runtime = aptos_runtimes::spawn_named_runtime("shared-mem".into(), None);
let _enter = runtime.enter();

#[allow(clippy::await_holding_lock)] // This appears to be a false positive!
#[tokio::test(flavor = "multi_thread")]
async fn test_mempool_notify_committed_txns() {
// Create a new mempool notifier, listener and shared mempool
let smp = MockSharedMempool::new();

Expand All @@ -81,18 +77,31 @@ fn test_mempool_notify_committed_txns() {
assert!(batch_add_signed_txn(&mut pool, txns).is_ok());
}

// Notify mempool of the new commit
let committed_txns = vec![Transaction::UserTransaction(committed_txn)];
block_on(async {
assert!(smp
.mempool_notifier
.notify_new_commit(committed_txns, 1, 1000)
.await
.is_ok());
});
assert!(smp
.mempool_notifier
.notify_new_commit(committed_txns, 1)
.await
.is_ok());

let pool = smp.mempool.lock();
// TODO: make less brittle to broadcast buckets changes
let (timeline, _) = pool.read_timeline(&vec![0; 10].into(), 10);
assert_eq!(timeline.len(), 1);
assert_eq!(timeline.first().unwrap(), &kept_txn);
// Wait until mempool handles the commit notification
let wait_for_commit = async {
let pool = smp.mempool.lock();
// TODO: make less brittle to broadcast buckets changes
let (timeline, _) = pool.read_timeline(&vec![0; 10].into(), 10);
if timeline.len() == 10 && timeline.first().unwrap() == &kept_txn {
return; // Mempool handled the commit notification
}
drop(pool);

// Sleep for a while
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
};
if let Err(elasped) = timeout(std::time::Duration::from_secs(5), wait_for_commit).await {
panic!(
"Mempool did not receive the commit notification! {:?}",
elasped
);
}
}
2 changes: 1 addition & 1 deletion mempool/src/tests/test_framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ fn setup_mempool(
let (ac_endpoint_sender, ac_endpoint_receiver) = mpsc_channel();
let (quorum_store_sender, quorum_store_receiver) = mpsc_channel();
let (mempool_notifier, mempool_listener) =
aptos_mempool_notifications::new_mempool_notifier_listener_pair();
aptos_mempool_notifications::new_mempool_notifier_listener_pair(100);

let mempool = Arc::new(Mutex::new(CoreMempool::new(&config)));
let vm_validator = Arc::new(RwLock::new(MockVMValidator));
Expand Down
2 changes: 1 addition & 1 deletion state-sync/state-sync-driver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ pub static STORAGE_SYNCHRONIZER_LATENCIES: Lazy<HistogramVec> = Lazy::new(|| {
"aptos_state_sync_storage_synchronizer_latencies",
"Counters related to the storage synchronizer latencies",
&["label"],
exponential_buckets(/*start=*/ 1e-3, /*factor=*/ 2.0, /*count=*/ 30).unwrap(),
exponential_buckets(/*start=*/ 1e-3, /*factor=*/ 2.0, /*count=*/ 20).unwrap(),
)
.unwrap()
});
Expand Down
2 changes: 1 addition & 1 deletion state-sync/state-sync-driver/src/tests/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ async fn create_driver_for_tests(
let (consensus_notifier, consensus_listener) =
aptos_consensus_notifications::new_consensus_notifier_listener_pair(5000);
let (mempool_notifier, mempool_listener) =
aptos_mempool_notifications::new_mempool_notifier_listener_pair();
aptos_mempool_notifications::new_mempool_notifier_listener_pair(100);

// Create the storage service notifier and listener
let (storage_service_notifier, storage_service_listener) =
Expand Down
2 changes: 1 addition & 1 deletion state-sync/state-sync-driver/src/tests/driver_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn test_new_initialized_configs() {
bootstrap_genesis::<AptosVM>(&db_rw, get_genesis_txn(&node_config).unwrap()).unwrap();

// Create mempool and consensus notifiers
let (mempool_notifier, _) = new_mempool_notifier_listener_pair();
let (mempool_notifier, _) = new_mempool_notifier_listener_pair(100);
let (_, consensus_listener) = new_consensus_notifier_listener_pair(0);

// Create the event subscription service and a reconfig subscriber
Expand Down
Loading

0 comments on commit d20fab9

Please sign in to comment.