Skip to content

Commit

Permalink
Add ability to have skewed shared counter traffic (MystenLabs#5991)
Browse files Browse the repository at this point in the history
  • Loading branch information
sadhansood authored Nov 15, 2022
1 parent 335ff5d commit 4e7f350
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 53 deletions.
27 changes: 24 additions & 3 deletions crates/sui-benchmark/src/bin/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ pub enum RunSpec {
// transaction in the benchmark workload
#[clap(long, default_value = "0")]
shared_counter: u32,
// 100 for max hotness i.e all requests target
// just the same shared counter, 0 for no hotness
// i.e. all requests target a different shared
// counter. The way total number of counters to
// create is computed roughly as:
// total_shared_counters = max(1, qps * (1.0 - hotness/100.0))
#[clap(long, default_value = "50")]
shared_counter_hotness_factor: u32,
// relative weight of transfer object
// transactions in the benchmark workload
#[clap(long, default_value = "1")]
Expand Down Expand Up @@ -404,8 +412,11 @@ async fn main() -> Result<()> {
stat_collection_interval,
shared_counter,
transfer_object,
shared_counter_hotness_factor,
..
} => {
let shared_counter_ratio = 1.0
- (std::cmp::min(shared_counter_hotness_factor as u32, 100) as f32 / 100.0);
let workloads = if !opts.disjoint_mode {
let mut combination_workload = make_combination_workload(
target_qps,
Expand All @@ -418,7 +429,12 @@ async fn main() -> Result<()> {
shared_counter,
transfer_object,
);
combination_workload.workload.init(arc_agg.clone()).await;
let max_ops = target_qps * in_flight_ratio;
let num_shared_counters = (max_ops as f32 * shared_counter_ratio) as u64;
combination_workload
.workload
.init(num_shared_counters, arc_agg.clone())
.await;
vec![combination_workload]
} else {
let mut workloads = vec![];
Expand All @@ -428,6 +444,8 @@ async fn main() -> Result<()> {
let shared_counter_num_workers =
(shared_counter_weight * num_workers as f32).ceil() as u64;
let shared_counter_max_ops = (shared_counter_qps * in_flight_ratio) as u64;
let num_shared_counters =
(shared_counter_max_ops as f32 * shared_counter_ratio) as u64;
if let Some(mut shared_counter_workload) = make_shared_counter_workload(
shared_counter_qps,
shared_counter_num_workers,
Expand All @@ -436,7 +454,10 @@ async fn main() -> Result<()> {
owner,
keypair.clone(),
) {
shared_counter_workload.workload.init(arc_agg.clone()).await;
shared_counter_workload
.workload
.init(num_shared_counters, arc_agg.clone())
.await;
workloads.push(shared_counter_workload);
}
let transfer_object_weight = 1.0 - shared_counter_weight;
Expand All @@ -456,7 +477,7 @@ async fn main() -> Result<()> {
) {
transfer_object_workload
.workload
.init(arc_agg.clone())
.init(num_shared_counters, arc_agg.clone())
.await;
workloads.push(transfer_object_workload);
}
Expand Down
4 changes: 3 additions & 1 deletion crates/sui-benchmark/src/workloads/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub fn make_combination_workload(
primary_gas_account_owner,
primary_gas_account_keypair.clone(),
None,
vec![],
);
workloads
.entry(WorkloadType::SharedCounter)
Expand Down Expand Up @@ -71,7 +72,8 @@ pub fn make_shared_counter_workload(
if target_qps == 0 || max_in_flight_ops == 0 || num_workers == 0 {
None
} else {
let workload = SharedCounterWorkload::new_boxed(primary_gas_id, owner, keypair, None);
let workload =
SharedCounterWorkload::new_boxed(primary_gas_id, owner, keypair, None, vec![]);
Some(WorkloadInfo {
target_qps,
num_workers,
Expand Down
120 changes: 82 additions & 38 deletions crates/sui-benchmark/src/workloads/shared_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ use crate::{
};
use async_trait::async_trait;
use futures::future::join_all;
use rand::seq::SliceRandom;
use std::{path::PathBuf, sync::Arc};
use sui_types::{
base_types::{ObjectID, ObjectRef, SequenceNumber, SuiAddress},
base_types::{ObjectDigest, ObjectID, ObjectRef, SequenceNumber, SuiAddress},
crypto::{get_key_pair, AccountKeyPair},
messages::VerifiedTransaction,
object::Owner,
Expand All @@ -37,7 +38,7 @@ impl Payload for SharedCounterTestPayload {
counter_initial_shared_version: self.counter_initial_shared_version,
gas: (new_gas, self.gas.1),
sender: self.sender,
keypair: self.keypair.clone(),
keypair: self.keypair,
})
}
fn make_transaction(&self) -> VerifiedTransaction {
Expand All @@ -63,6 +64,7 @@ pub struct SharedCounterWorkload {
pub test_gas_owner: SuiAddress,
pub test_gas_keypair: Arc<AccountKeyPair>,
pub basics_package_ref: Option<ObjectRef>,
pub counters: Vec<(ObjectID, SequenceNumber, ObjectDigest)>,
}

impl SharedCounterWorkload {
Expand All @@ -71,12 +73,14 @@ impl SharedCounterWorkload {
owner: SuiAddress,
keypair: Arc<AccountKeyPair>,
basics_package_ref: Option<ObjectRef>,
counters: Vec<(ObjectID, SequenceNumber, ObjectDigest)>,
) -> Box<dyn Workload<dyn Payload>> {
Box::<dyn Workload<dyn Payload>>::from(Box::new(SharedCounterWorkload {
test_gas: gas,
test_gas_owner: owner,
test_gas_keypair: keypair,
basics_package_ref,
counters,
}))
}
}
Expand All @@ -96,16 +100,16 @@ pub async fn publish_basics_package(

#[async_trait]
impl Workload<dyn Payload> for SharedCounterWorkload {
async fn init(&mut self, proxy: Arc<dyn ValidatorProxy + Sync + Send>) {
async fn init(&mut self, num_counters: u64, proxy: Arc<dyn ValidatorProxy + Sync + Send>) {
if self.basics_package_ref.is_some() {
return;
}
// publish basics package
let primary_gas = proxy.get_object(self.test_gas).await.unwrap();
let primary_gas_ref = primary_gas.compute_object_reference();
let mut primary_gas_ref = primary_gas.compute_object_reference();
let mut publish_module_gas_ref = None;
let (address, keypair) = get_key_pair();
if let Some((_updated, minted)) = transfer_sui_for_testing(
if let Some((updated, minted)) = transfer_sui_for_testing(
(primary_gas_ref, Owner::AddressOwner(self.test_gas_owner)),
&self.test_gas_keypair,
MAX_GAS_FOR_TESTING,
Expand All @@ -115,19 +119,66 @@ impl Workload<dyn Payload> for SharedCounterWorkload {
.await
{
publish_module_gas_ref = Some((address, keypair, minted));
primary_gas_ref = updated;
}
// Publish basics package
eprintln!("Publishing basics package");
let publish_module_gas = publish_module_gas_ref.unwrap();
self.basics_package_ref = Some(
publish_basics_package(
publish_module_gas.2,
proxy,
proxy.clone(),
publish_module_gas.0,
&publish_module_gas.1,
)
.await,
)
);
if !self.counters.is_empty() {
// We already initialized the workload with some counters
return;
}
// create counters
let num_counters = std::cmp::max(num_counters as usize, 1);
eprintln!(
"Creating {:?} shared counters, this may take a while..",
num_counters
);
// Make as many gas objects as the number of actual unique counters
// This gas is used for creating the counters
let mut counters_gas = vec![];
for _ in 0..num_counters {
let (address, keypair) = get_key_pair();
if let Some((updated, minted)) = transfer_sui_for_testing(
(primary_gas_ref, Owner::AddressOwner(self.test_gas_owner)),
&self.test_gas_keypair,
MAX_GAS_FOR_TESTING,
address,
proxy.clone(),
)
.await
{
primary_gas_ref = updated;
counters_gas.push((address, keypair, minted));
}
}
let mut futures = vec![];
for (sender, keypair, gas) in counters_gas.iter() {
let transaction = make_counter_create_transaction(
*gas,
self.basics_package_ref.unwrap(),
*sender,
keypair,
);
let proxy_ref = proxy.clone();
futures.push(async move {
if let Ok((_, effects)) = proxy_ref.execute_transaction(transaction.into()).await {
effects.created()[0].0
} else {
panic!("Failed to create shared counter!");
}
});
}
self.counters = join_all(futures).await;
}
async fn make_test_payloads(
&self,
Expand All @@ -137,10 +188,10 @@ impl Workload<dyn Payload> for SharedCounterWorkload {
// Read latest test gas object
let primary_gas = proxy.get_object(self.test_gas).await.unwrap();
let mut primary_gas_ref = primary_gas.compute_object_reference();
// Make as many gas objects as the number of counters
// Make as many gas objects as the number of payloads
let mut counters_gas = vec![];
for _ in 0..count {
let (address, keypair) = get_key_pair();
let (address, keypair) = get_key_pair::<AccountKeyPair>();
if let Some((updated, minted)) = transfer_sui_for_testing(
(primary_gas_ref, Owner::AddressOwner(self.test_gas_owner)),
&self.test_gas_keypair,
Expand All @@ -151,39 +202,32 @@ impl Workload<dyn Payload> for SharedCounterWorkload {
.await
{
primary_gas_ref = updated;
counters_gas.push((address, keypair, minted));
counters_gas.push((address, Arc::new(keypair), minted));
}
}
let proxy_ref = &proxy;
// create counters using gas objects we created above
eprintln!("Creating shared counters, this may take a while..");
let futures = counters_gas
.into_iter()
.map(|(sender, keypair, gas)| async move {
let transaction = make_counter_create_transaction(
gas,
self.basics_package_ref.unwrap(),
sender,
&keypair,
);
if let Ok((_, effects)) = proxy_ref.execute_transaction(transaction.into()).await {
let counter_ref = effects.created()[0].0;
Box::new(SharedCounterTestPayload {
package_ref: self.basics_package_ref.unwrap(),
counter_id: counter_ref.0,
counter_initial_shared_version: counter_ref.1,
gas: effects.gas_object(),
sender,
keypair: Arc::new(keypair),
})
} else {
panic!("Failed to create shared counter!");
}
});
join_all(futures)
.await
eprintln!("Creating shared txn payloads, hang tight..");
let mut shared_payloads = vec![];
for i in 0..count {
let (sender, keypair, gas) = &counters_gas[i as usize];
// pick a random counter from the pool
let counter_ref = self
.counters
.choose(&mut rand::thread_rng())
.expect("Failed to get a random counter from the pool");
shared_payloads.push(Box::new(SharedCounterTestPayload {
package_ref: self.basics_package_ref.unwrap(),
counter_id: counter_ref.0,
counter_initial_shared_version: counter_ref.1,
gas: (*gas, Owner::AddressOwner(*sender)),
sender: *sender,
keypair: keypair.clone(),
}));
}
let payloads: Vec<Box<dyn Payload>> = shared_payloads
.into_iter()
.map(|b| Box::<dyn Payload>::from(b))
.collect()
.collect();
payloads
}
}
8 changes: 6 additions & 2 deletions crates/sui-benchmark/src/workloads/transfer_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,11 @@ impl TransferObjectWorkload {

#[async_trait]
impl Workload<dyn Payload> for TransferObjectWorkload {
async fn init(&mut self, _proxy: Arc<dyn ValidatorProxy + Sync + Send>) {
async fn init(
&mut self,
_num_shared_counters: u64,
_proxy: Arc<dyn ValidatorProxy + Sync + Send>,
) {
return;
}
async fn make_test_payloads(
Expand Down Expand Up @@ -148,7 +152,7 @@ impl Workload<dyn Payload> for TransferObjectWorkload {
}
transfer_gas.push(account_transfer_gas);
}
eprintln!("Creating objects to transfer..");
eprintln!("Creating transfer object txns, almost done..");
// create transfer objects with 1 SUI value each
let mut transfer_objects: Vec<Gas> = vec![];
for _i in 0..count {
Expand Down
14 changes: 11 additions & 3 deletions crates/sui-benchmark/src/workloads/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,11 @@ impl fmt::Display for WorkloadType {

#[async_trait]
pub trait Workload<T: Payload + ?Sized>: Send + Sync {
async fn init(&mut self, proxy: Arc<dyn ValidatorProxy + Sync + Send>);
async fn init(
&mut self,
num_shared_counters: u64,
proxy: Arc<dyn ValidatorProxy + Sync + Send>,
);
async fn make_test_payloads(
&self,
count: u64,
Expand All @@ -153,9 +157,13 @@ pub struct CombinationWorkload {

#[async_trait]
impl Workload<dyn Payload> for CombinationWorkload {
async fn init(&mut self, proxy: Arc<dyn ValidatorProxy + Sync + Send>) {
async fn init(
&mut self,
num_shared_counters: u64,
proxy: Arc<dyn ValidatorProxy + Sync + Send>,
) {
for (_, (_, workload)) in self.workloads.iter_mut() {
workload.init(proxy.clone()).await;
workload.init(num_shared_counters, proxy.clone()).await;
}
}
async fn make_test_payloads(
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-benchmark/tests/simtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ mod test {
);

for w in workloads.iter_mut() {
w.workload.init(proxy.clone()).await;
w.workload.init(5, proxy.clone()).await;
}

let driver = BenchDriver::new(5);
Expand Down
15 changes: 13 additions & 2 deletions crates/sui-core/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use std::time::Duration;
use sui_types::{
base_types::{dbg_addr, ObjectID, TransactionDigest},
batch::UpdateItem,
crypto::{get_key_pair, AccountKeyPair, Signature},
crypto::{
get_key_pair, AccountKeyPair, AuthoritySignInfo, AuthoritySignature, Signable, Signature,
},
messages::{
BatchInfoRequest, BatchInfoResponseItem, Transaction, TransactionData, VerifiedTransaction,
},
Expand All @@ -20,7 +22,6 @@ use sui_types::{
use futures::StreamExt;
use sui_types::base_types::{random_object_ref, AuthorityName, ExecutionDigests};
use sui_types::committee::Committee;
use sui_types::crypto::{AuthoritySignInfo, AuthoritySignature};
use sui_types::gas::GasCostSummary;
use sui_types::messages::{CertifiedTransaction, ExecutionStatus, TransactionEffects};
use sui_types::object::Owner;
Expand Down Expand Up @@ -150,6 +151,16 @@ pub fn to_sender_signed_transaction(
VerifiedTransaction::new_unchecked(Transaction::from_data(data, signature))
}

pub fn to_sender_signed_transaction_arc(
data: TransactionData,
signer: &Arc<fastcrypto::ed25519::Ed25519KeyPair>,
) -> VerifiedTransaction {
let mut message = Vec::new();
data.write(&mut message);
let signature: Signature = signer.sign(&message);
VerifiedTransaction::new_unchecked(Transaction::from_data(data, signature))
}

pub fn dummy_transaction_effects(tx: &Transaction) -> TransactionEffects {
TransactionEffects {
status: ExecutionStatus::Success,
Expand Down
Loading

0 comments on commit 4e7f350

Please sign in to comment.