diff --git a/Cargo.lock b/Cargo.lock index 54f2e29044b63..058714be00b63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2899,16 +2899,18 @@ name = "diemdb-benchmark" version = "0.1.0" dependencies = [ "anyhow", - "byteorder", "diem-config", "diem-crypto", + "diem-genesis-tool", "diem-jellyfish-merkle", "diem-types", + "diem-vm", "diem-workspace-hack", "diemdb", + "executor", + "executor-benchmark", "indicatif", "itertools 0.10.1", - "rand 0.8.3", "storage-interface", "structopt 0.3.21", ] @@ -7140,6 +7142,7 @@ name = "scratchpad-benchmark" version = "0.1.0" dependencies = [ "anyhow", + "byteorder", "diem-config", "diem-crypto", "diem-genesis-tool", diff --git a/execution/executor-benchmark/benches/executor_benchmark.rs b/execution/executor-benchmark/benches/executor_benchmark.rs index 9c15f19cb4063..a96dbbbd9c73b 100644 --- a/execution/executor-benchmark/benches/executor_benchmark.rs +++ b/execution/executor-benchmark/benches/executor_benchmark.rs @@ -3,7 +3,8 @@ use criterion::{criterion_group, criterion_main, measurement::Measurement, BatchSize, Criterion}; use executor_benchmark::{ - create_storage_service_and_executor, TransactionExecutor, TransactionGenerator, + create_storage_service_and_executor, transaction_executor::TransactionExecutor, + transaction_generator::TransactionGenerator, }; use std::sync::Arc; @@ -27,7 +28,7 @@ fn executor_benchmark(c: &mut Criterion) { let mut generator = TransactionGenerator::new(genesis_key, NUM_ACCOUNTS); let (commit_tx, _commit_rx) = std::sync::mpsc::channel(); - let mut executor = TransactionExecutor::new(executor, parent_block_id, commit_tx); + let mut executor = TransactionExecutor::new(executor, parent_block_id, Some(commit_tx)); let txns = generator.gen_account_creations(SMALL_BLOCK_SIZE); for txn_block in txns { executor.execute_block(txn_block); diff --git a/execution/executor-benchmark/src/lib.rs b/execution/executor-benchmark/src/lib.rs index 05a277bdd3553..aa22a2da7494c 100644 --- a/execution/executor-benchmark/src/lib.rs +++ b/execution/executor-benchmark/src/lib.rs @@ -1,420 +1,34 @@ // Copyright (c) The Diem Core Contributors // SPDX-License-Identifier: Apache-2.0 +pub mod transaction_committer; +pub mod transaction_executor; +pub mod transaction_generator; + +use crate::{ + transaction_committer::TransactionCommitter, transaction_executor::TransactionExecutor, + transaction_generator::TransactionGenerator, +}; use diem_config::{ config::{NodeConfig, RocksdbConfig}, utils::get_genesis_txn, }; -use diem_crypto::{ - ed25519::{Ed25519PrivateKey, Ed25519PublicKey}, - hash::HashValue, - PrivateKey, SigningKey, Uniform, -}; use diem_logger::prelude::*; -use diem_transaction_builder::stdlib::{ - encode_create_parent_vasp_account_script, encode_peer_to_peer_with_metadata_script, -}; -use diem_types::{ - account_address::AccountAddress, - account_config::{ - testnet_dd_account_address, treasury_compliance_account_address, xus_tag, AccountResource, - XUS_NAME, - }, - block_info::BlockInfo, - chain_id::ChainId, - ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, - protocol_spec::DpnProto, - transaction::{ - authenticator::AuthenticationKey, RawTransaction, Script, SignedTransaction, Transaction, - Version, - }, -}; +use diem_types::protocol_spec::DpnProto; use diem_vm::DiemVM; -use diemdb::{metrics::DIEM_STORAGE_API_LATENCY_SECONDS, DiemDB}; +use diemdb::DiemDB; use executor::{ db_bootstrapper::{generate_waypoint, maybe_bootstrap}, - metrics::{ - DIEM_EXECUTOR_COMMIT_BLOCKS_SECONDS, DIEM_EXECUTOR_EXECUTE_BLOCK_SECONDS, - DIEM_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS, - }, Executor, }; -use executor_types::BlockExecutor; -use rand::{rngs::StdRng, SeedableRng}; use std::{ - collections::BTreeMap, - convert::TryFrom, path::PathBuf, sync::{mpsc, Arc}, - time::{Duration, Instant}, }; use storage_client::StorageClient; use storage_interface::{default_protocol::DbReaderWriter, DbReader}; use storage_service::start_storage_service_with_db; -fn report_block( - version: Version, - global_start_time: Instant, - execution_start_time: Instant, - execution_time: Duration, - commit_time: Duration, - block_size: usize, -) { - info!( - "Version: {}. latency: {} ms, execute time: {} ms. commit time: {} ms. TPS: {:.0}. Accumulative TPS: {:.0}", - version, - Instant::now().duration_since(execution_start_time).as_millis(), - execution_time.as_millis(), - commit_time.as_millis(), - block_size as f64 / (std::cmp::max(execution_time, commit_time)).as_secs_f64(), - version as f64 / global_start_time.elapsed().as_secs_f64(), - ); - info!( - "Accumulative total: VM time: {:.0} secs, executor time: {:.0} secs, commit time: {:.0} secs, DB commit time: {:.0} secs", - DIEM_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS.get_sample_sum(), - DIEM_EXECUTOR_EXECUTE_BLOCK_SECONDS.get_sample_sum() - DIEM_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS.get_sample_sum(), - DIEM_EXECUTOR_COMMIT_BLOCKS_SECONDS.get_sample_sum(), - DIEM_STORAGE_API_LATENCY_SECONDS.get_metric_with_label_values(&["save_transactions", "Ok"]).expect("must exist.").get_sample_sum(), - ); - const NANOS_PER_SEC: f64 = 1_000_000_000.0; - info!( - "Accumulative per transaction: VM time: {:.0} ns, executor time: {:.0} ns, commit time: {:.0} ns, DB commit time: {:.0} ns", - DIEM_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS.get_sample_sum() * NANOS_PER_SEC - / version as f64, - (DIEM_EXECUTOR_EXECUTE_BLOCK_SECONDS.get_sample_sum() - DIEM_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS.get_sample_sum()) * NANOS_PER_SEC - / version as f64, - DIEM_EXECUTOR_COMMIT_BLOCKS_SECONDS.get_sample_sum() * NANOS_PER_SEC - / version as f64, - DIEM_STORAGE_API_LATENCY_SECONDS.get_metric_with_label_values(&["save_transactions", "Ok"]).expect("must exist.").get_sample_sum() * NANOS_PER_SEC - / version as f64, - ); -} - -struct AccountData { - private_key: Ed25519PrivateKey, - public_key: Ed25519PublicKey, - address: AccountAddress, - sequence_number: u64, -} - -impl AccountData { - pub fn auth_key_prefix(&self) -> Vec { - AuthenticationKey::ed25519(&self.public_key) - .prefix() - .to_vec() - } -} - -pub struct TransactionGenerator { - /// The current state of the accounts. The main purpose is to keep track of the sequence number - /// so generated transactions are guaranteed to be successfully executed. - accounts: Vec, - - /// Used to mint accounts. - genesis_key: Ed25519PrivateKey, - - /// For deterministic transaction generation. - rng: StdRng, - - /// Each generated block of transactions are sent to this channel. Using `SyncSender` to make - /// sure if execution is slow to consume the transactions, we do not run out of memory. - block_sender: Option>>, -} - -impl TransactionGenerator { - pub fn new(genesis_key: Ed25519PrivateKey, num_accounts: usize) -> Self { - Self::new_impl(genesis_key, num_accounts, None) - } - - pub fn new_with_sender( - genesis_key: Ed25519PrivateKey, - num_accounts: usize, - block_sender: mpsc::SyncSender>, - ) -> Self { - Self::new_impl(genesis_key, num_accounts, Some(block_sender)) - } - - fn new_impl( - genesis_key: Ed25519PrivateKey, - num_accounts: usize, - block_sender: Option>>, - ) -> Self { - let seed = [1u8; 32]; - let mut rng = StdRng::from_seed(seed); - - let mut accounts = Vec::with_capacity(num_accounts); - for _i in 0..num_accounts { - let private_key = Ed25519PrivateKey::generate(&mut rng); - let public_key = private_key.public_key(); - let address = diem_types::account_address::from_public_key(&public_key); - let account = AccountData { - private_key, - public_key, - address, - sequence_number: 0, - }; - accounts.push(account); - } - - Self { - accounts, - genesis_key, - rng, - block_sender, - } - } - - pub fn run( - &mut self, - init_account_balance: u64, - block_size: usize, - num_transfer_blocks: usize, - ) { - assert!(self.block_sender.is_some()); - - self.gen_account_creations(block_size); - self.gen_mint_transactions(init_account_balance, block_size); - self.gen_transfer_transactions(block_size, num_transfer_blocks); - } - - pub fn gen_account_creations(&self, block_size: usize) -> Vec> { - let tc_account = treasury_compliance_account_address(); - let mut txn_block = vec![]; - - for (i, block) in self.accounts.chunks(block_size).enumerate() { - let mut transactions = Vec::with_capacity(block_size); - for (j, account) in block.iter().enumerate() { - let txn = create_transaction( - tc_account, - (i * block_size + j) as u64, - &self.genesis_key, - self.genesis_key.public_key(), - encode_create_parent_vasp_account_script( - xus_tag(), - 0, - account.address, - account.auth_key_prefix(), - vec![], - false, /* add all currencies */ - ), - ); - transactions.push(txn); - } - if let Some(sender) = &self.block_sender { - sender.send(transactions).unwrap(); - } else { - txn_block.push(transactions); - } - } - txn_block - } - - /// Generates transactions that allocate `init_account_balance` to every account. - pub fn gen_mint_transactions( - &self, - init_account_balance: u64, - block_size: usize, - ) -> Vec> { - let testnet_dd_account = testnet_dd_account_address(); - let mut txn_block = vec![]; - - for (i, block) in self.accounts.chunks(block_size).enumerate() { - let mut transactions = Vec::with_capacity(block_size); - for (j, account) in block.iter().enumerate() { - let txn = create_transaction( - testnet_dd_account, - (i * block_size + j) as u64, - &self.genesis_key, - self.genesis_key.public_key(), - encode_peer_to_peer_with_metadata_script( - xus_tag(), - account.address, - init_account_balance, - vec![], - vec![], - ), - ); - transactions.push(txn); - } - if let Some(sender) = &self.block_sender { - sender.send(transactions).unwrap(); - } else { - txn_block.push(transactions); - } - } - txn_block - } - - /// Generates transactions for random pairs of accounts. - pub fn gen_transfer_transactions( - &mut self, - block_size: usize, - num_blocks: usize, - ) -> Vec> { - let mut txn_block = vec![]; - for _i in 0..num_blocks { - let mut transactions = Vec::with_capacity(block_size); - for _j in 0..block_size { - let indices = rand::seq::index::sample(&mut self.rng, self.accounts.len(), 2); - let sender_idx = indices.index(0); - let receiver_idx = indices.index(1); - - let sender = &self.accounts[sender_idx]; - let receiver = &self.accounts[receiver_idx]; - let txn = create_transaction( - sender.address, - sender.sequence_number, - &sender.private_key, - sender.public_key.clone(), - encode_peer_to_peer_with_metadata_script( - xus_tag(), - receiver.address, - 1, /* amount */ - vec![], - vec![], - ), - ); - transactions.push(txn); - - self.accounts[sender_idx].sequence_number += 1; - } - if let Some(sender) = &self.block_sender { - sender.send(transactions).unwrap(); - } else { - txn_block.push(transactions); - } - } - txn_block - } - - /// Verifies the sequence numbers in storage match what we have locally. - fn verify_sequence_number(&self, db: &dyn DbReader) { - for account in &self.accounts { - let address = account.address; - let blob = db - .get_latest_account_state(address) - .expect("Failed to query storage.") - .expect("Account must exist."); - let account_resource = AccountResource::try_from(&blob).unwrap(); - assert_eq!(account_resource.sequence_number(), account.sequence_number); - } - } - - /// Drops the sender to notify the receiving end of the channel. - fn drop_sender(&mut self) { - self.block_sender.take().unwrap(); - } -} - -pub struct TransactionCommitter { - executor: Arc>, - block_receiver: mpsc::Receiver<(HashValue, HashValue, Instant, Instant, Duration, usize)>, -} - -impl TransactionCommitter { - fn new( - executor: Arc>, - block_receiver: mpsc::Receiver<(HashValue, HashValue, Instant, Instant, Duration, usize)>, - ) -> Self { - Self { - executor, - block_receiver, - } - } - - fn run(&mut self) { - let mut version = 0; - while let Ok(( - block_id, - root_hash, - global_start_time, - execution_start_time, - execution_time, - num_txns, - )) = self.block_receiver.recv() - { - version += num_txns as u64; - let commit_start = std::time::Instant::now(); - let block_info = BlockInfo::new( - 1, /* epoch */ - 0, /* round, doesn't matter */ - block_id, /* id, doesn't matter */ - root_hash, version, 0, /* timestamp_usecs, doesn't matter */ - None, /* next_epoch_state */ - ); - let ledger_info = LedgerInfo::new( - block_info, - HashValue::zero(), /* consensus_data_hash, doesn't matter */ - ); - let ledger_info_with_sigs = - LedgerInfoWithSignatures::new(ledger_info, BTreeMap::new() /* signatures */); - - self.executor - .commit_blocks(vec![block_id], ledger_info_with_sigs) - .unwrap(); - - report_block( - version, - global_start_time, - execution_start_time, - execution_time, - Instant::now().duration_since(commit_start), - num_txns, - ); - } - } -} - -pub struct TransactionExecutor { - executor: Arc>, - parent_block_id: HashValue, - start_time: Instant, - version: u64, - commit_sender: mpsc::Sender<(HashValue, HashValue, Instant, Instant, Duration, usize)>, -} - -impl TransactionExecutor { - pub fn new( - executor: Arc>, - parent_block_id: HashValue, - commit_sender: mpsc::Sender<(HashValue, HashValue, Instant, Instant, Duration, usize)>, - ) -> Self { - Self { - executor, - parent_block_id, - version: 0, - start_time: Instant::now(), - commit_sender, - } - } - - pub fn execute_block(&mut self, transactions: Vec) { - let num_txns = transactions.len(); - self.version += num_txns as u64; - - let execution_start = std::time::Instant::now(); - - let block_id = HashValue::random(); - let output = self - .executor - .execute_block((block_id, transactions), self.parent_block_id) - .unwrap(); - - self.parent_block_id = block_id; - - self.commit_sender - .send(( - block_id, - output.root_hash(), - self.start_time, - execution_start, - Instant::now().duration_since(execution_start), - num_txns, - )) - .unwrap(); - } -} - pub fn create_storage_service_and_executor( config: &NodeConfig, ) -> (Arc>, Executor) { @@ -466,14 +80,16 @@ pub fn run_benchmark( .spawn(move || { let mut generator = TransactionGenerator::new_with_sender(genesis_key, num_accounts, block_sender); - generator.run(init_account_balance, block_size, num_transfer_blocks); + generator.run_mint(init_account_balance, block_size); + generator.run_transfer(block_size, num_transfer_blocks); generator }) .expect("Failed to spawn transaction generator thread."); let exe_thread = std::thread::Builder::new() .name("txn_executor".to_string()) .spawn(move || { - let mut exe = TransactionExecutor::new(executor_1, parent_block_id, commit_sender); + let mut exe = + TransactionExecutor::new(executor_1, parent_block_id, Some(commit_sender)); while let Ok(transactions) = block_receiver.recv() { info!("Received block of size {:?} to execute", transactions.len()); exe.execute_block(transactions); @@ -499,32 +115,6 @@ pub fn run_benchmark( generator.verify_sequence_number(db.as_ref()); } -fn create_transaction( - sender: AccountAddress, - sequence_number: u64, - private_key: &Ed25519PrivateKey, - public_key: Ed25519PublicKey, - program: Script, -) -> Transaction { - let now = diem_infallible::duration_since_epoch(); - let expiration_time = now.as_secs() + 3600; - - let raw_txn = RawTransaction::new_script( - sender, - sequence_number, - program, - 1_000_000, /* max_gas_amount */ - 0, /* gas_unit_price */ - XUS_NAME.to_owned(), /* gas_currency_code */ - expiration_time, - ChainId::test(), - ); - - let signature = private_key.sign(&raw_txn); - let signed_txn = SignedTransaction::new(raw_txn, public_key, signature); - Transaction::UserTransaction(signed_txn) -} - #[cfg(test)] mod tests { #[test] diff --git a/execution/executor-benchmark/src/transaction_committer.rs b/execution/executor-benchmark/src/transaction_committer.rs new file mode 100644 index 0000000000000..745b969456d81 --- /dev/null +++ b/execution/executor-benchmark/src/transaction_committer.rs @@ -0,0 +1,129 @@ +// Copyright (c) The Diem Core Contributors +// SPDX-License-Identifier: Apache-2.0 + +use diem_crypto::hash::HashValue; +use diem_logger::prelude::*; +use diem_types::{ + block_info::BlockInfo, + ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, + protocol_spec::DpnProto, + transaction::Version, +}; +use diem_vm::DiemVM; +use diemdb::metrics::DIEM_STORAGE_API_LATENCY_SECONDS; +use executor::{ + metrics::{ + DIEM_EXECUTOR_COMMIT_BLOCKS_SECONDS, DIEM_EXECUTOR_EXECUTE_BLOCK_SECONDS, + DIEM_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS, + }, + Executor, +}; +use executor_types::BlockExecutor; +use std::{ + collections::BTreeMap, + sync::{mpsc, Arc}, + time::{Duration, Instant}, +}; + +pub(crate) fn gen_li_with_sigs( + block_id: HashValue, + root_hash: HashValue, + version: Version, +) -> LedgerInfoWithSignatures { + let block_info = BlockInfo::new( + 1, /* epoch */ + 0, /* round, doesn't matter */ + block_id, /* id, doesn't matter */ + root_hash, version, 0, /* timestamp_usecs, doesn't matter */ + None, /* next_epoch_state */ + ); + let ledger_info = LedgerInfo::new( + block_info, + HashValue::zero(), /* consensus_data_hash, doesn't matter */ + ); + LedgerInfoWithSignatures::new(ledger_info, BTreeMap::new() /* signatures */) +} + +pub struct TransactionCommitter { + executor: Arc>, + block_receiver: mpsc::Receiver<(HashValue, HashValue, Instant, Instant, Duration, usize)>, +} + +impl TransactionCommitter { + pub fn new( + executor: Arc>, + block_receiver: mpsc::Receiver<(HashValue, HashValue, Instant, Instant, Duration, usize)>, + ) -> Self { + Self { + executor, + block_receiver, + } + } + + pub fn run(&mut self) { + let mut version = 0; + while let Ok(( + block_id, + root_hash, + global_start_time, + execution_start_time, + execution_time, + num_txns, + )) = self.block_receiver.recv() + { + version += num_txns as u64; + let commit_start = std::time::Instant::now(); + let ledger_info_with_sigs = gen_li_with_sigs(block_id, root_hash, version); + self.executor + .commit_blocks(vec![block_id], ledger_info_with_sigs) + .unwrap(); + + report_block( + version, + global_start_time, + execution_start_time, + execution_time, + Instant::now().duration_since(commit_start), + num_txns, + ); + } + } +} + +fn report_block( + version: Version, + global_start_time: Instant, + execution_start_time: Instant, + execution_time: Duration, + commit_time: Duration, + block_size: usize, +) { + info!( + "Version: {}. latency: {} ms, execute time: {} ms. commit time: {} ms. TPS: {:.0}. Accumulative TPS: {:.0}", + version, + Instant::now().duration_since(execution_start_time).as_millis(), + execution_time.as_millis(), + commit_time.as_millis(), + block_size as f64 / (std::cmp::max(execution_time, commit_time)).as_secs_f64(), + version as f64 / global_start_time.elapsed().as_secs_f64(), + ); + info!( + "Accumulative total: VM time: {:.0} secs, executor time: {:.0} secs, commit time: {:.0} secs, DB commit time: {:.0} secs", + DIEM_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS.get_sample_sum(), + DIEM_EXECUTOR_EXECUTE_BLOCK_SECONDS.get_sample_sum() - DIEM_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS.get_sample_sum(), + DIEM_EXECUTOR_COMMIT_BLOCKS_SECONDS.get_sample_sum(), + DIEM_STORAGE_API_LATENCY_SECONDS.get_metric_with_label_values(&["save_transactions", "Ok"]).expect("must exist.").get_sample_sum(), + ); + const NANOS_PER_SEC: f64 = 1_000_000_000.0; + info!( + "Accumulative per transaction: VM time: {:.0} ns, executor time: {:.0} ns, commit time: {:.0} ns, DB commit time: {:.0} ns", + DIEM_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS.get_sample_sum() * NANOS_PER_SEC + / version as f64, + (DIEM_EXECUTOR_EXECUTE_BLOCK_SECONDS.get_sample_sum() - DIEM_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS.get_sample_sum()) * NANOS_PER_SEC + / version as f64, + DIEM_EXECUTOR_COMMIT_BLOCKS_SECONDS.get_sample_sum() * NANOS_PER_SEC + / version as f64, + DIEM_STORAGE_API_LATENCY_SECONDS.get_metric_with_label_values(&["save_transactions", "Ok"]).expect("must exist.").get_sample_sum() * NANOS_PER_SEC + / version as f64, + ); +} diff --git a/execution/executor-benchmark/src/transaction_executor.rs b/execution/executor-benchmark/src/transaction_executor.rs new file mode 100644 index 0000000000000..ce989fce821e7 --- /dev/null +++ b/execution/executor-benchmark/src/transaction_executor.rs @@ -0,0 +1,79 @@ +// Copyright (c) The Diem Core Contributors +// SPDX-License-Identifier: Apache-2.0 + +use diem_crypto::hash::HashValue; +use diem_types::{ + protocol_spec::DpnProto, + transaction::{Transaction, Version}, +}; +use diem_vm::DiemVM; +use executor::Executor; +use executor_types::BlockExecutor; +use std::{ + sync::{mpsc, Arc}, + time::{Duration, Instant}, +}; + +pub struct TransactionExecutor { + executor: Arc>, + parent_block_id: HashValue, + start_time: Instant, + version: Version, + // If commit_sender is `None`, we will commit all the execution result immediately in this struct. + commit_sender: Option>, +} + +impl TransactionExecutor { + pub fn new( + executor: Arc>, + parent_block_id: HashValue, + commit_sender: Option< + mpsc::Sender<(HashValue, HashValue, Instant, Instant, Duration, usize)>, + >, + ) -> Self { + Self { + executor, + parent_block_id, + version: 0, + start_time: Instant::now(), + commit_sender, + } + } + + pub fn execute_block(&mut self, transactions: Vec) { + let num_txns = transactions.len(); + self.version += num_txns as Version; + + let execution_start = std::time::Instant::now(); + + let block_id = HashValue::random(); + let output = self + .executor + .execute_block((block_id, transactions), self.parent_block_id) + .unwrap(); + + self.parent_block_id = block_id; + + if let Some(ref commit_sender) = self.commit_sender { + commit_sender + .send(( + block_id, + output.root_hash(), + self.start_time, + execution_start, + Instant::now().duration_since(execution_start), + num_txns, + )) + .unwrap(); + } else { + let ledger_info_with_sigs = super::transaction_committer::gen_li_with_sigs( + block_id, + output.root_hash(), + self.version, + ); + self.executor + .commit_blocks(vec![block_id], ledger_info_with_sigs) + .unwrap(); + } + } +} diff --git a/execution/executor-benchmark/src/transaction_generator.rs b/execution/executor-benchmark/src/transaction_generator.rs new file mode 100644 index 0000000000000..239735100b19a --- /dev/null +++ b/execution/executor-benchmark/src/transaction_generator.rs @@ -0,0 +1,279 @@ +// Copyright (c) The Diem Core Contributors +// SPDX-License-Identifier: Apache-2.0 + +use diem_crypto::{ + ed25519::{Ed25519PrivateKey, Ed25519PublicKey}, + PrivateKey, SigningKey, Uniform, +}; +use diem_transaction_builder::stdlib::{ + encode_create_parent_vasp_account_script, encode_peer_to_peer_with_metadata_script, +}; +use diem_types::{ + account_address::AccountAddress, + account_config::{ + testnet_dd_account_address, treasury_compliance_account_address, xus_tag, AccountResource, + XUS_NAME, + }, + chain_id::ChainId, + protocol_spec::DpnProto, + transaction::{ + authenticator::AuthenticationKey, RawTransaction, Script, SignedTransaction, Transaction, + Version, + }, +}; +use rand::{rngs::StdRng, SeedableRng}; +use std::{convert::TryFrom, sync::mpsc}; +use storage_interface::DbReader; + +struct AccountData { + private_key: Ed25519PrivateKey, + public_key: Ed25519PublicKey, + address: AccountAddress, + sequence_number: u64, +} + +impl AccountData { + pub fn auth_key_prefix(&self) -> Vec { + AuthenticationKey::ed25519(&self.public_key) + .prefix() + .to_vec() + } +} + +pub struct TransactionGenerator { + /// The current state of the accounts. The main purpose is to keep track of the sequence number + /// so generated transactions are guaranteed to be successfully executed. + accounts: Vec, + + /// Used to mint accounts. + genesis_key: Ed25519PrivateKey, + + /// Record the number of txns generated. + version: Version, + + /// For deterministic transaction generation. + rng: StdRng, + + /// Each generated block of transactions are sent to this channel. Using `SyncSender` to make + /// sure if execution is slow to consume the transactions, we do not run out of memory. + block_sender: Option>>, +} + +impl TransactionGenerator { + pub fn new(genesis_key: Ed25519PrivateKey, num_accounts: usize) -> Self { + Self::new_impl(genesis_key, num_accounts, None) + } + + pub fn new_with_sender( + genesis_key: Ed25519PrivateKey, + num_accounts: usize, + block_sender: mpsc::SyncSender>, + ) -> Self { + Self::new_impl(genesis_key, num_accounts, Some(block_sender)) + } + + fn new_impl( + genesis_key: Ed25519PrivateKey, + num_accounts: usize, + block_sender: Option>>, + ) -> Self { + let seed = [1u8; 32]; + let mut rng = StdRng::from_seed(seed); + + let mut accounts = Vec::with_capacity(num_accounts); + for _i in 0..num_accounts { + let private_key = Ed25519PrivateKey::generate(&mut rng); + let public_key = private_key.public_key(); + let address = diem_types::account_address::from_public_key(&public_key); + let account = AccountData { + private_key, + public_key, + address, + sequence_number: 0, + }; + accounts.push(account); + } + + Self { + accounts, + genesis_key, + version: 0, + rng, + block_sender, + } + } + + pub fn version(&self) -> Version { + self.version + } + + pub fn run_mint(&mut self, init_account_balance: u64, block_size: usize) { + assert!(self.block_sender.is_some()); + self.gen_account_creations(block_size); + self.gen_mint_transactions(init_account_balance, block_size); + } + + pub fn run_transfer(&mut self, block_size: usize, num_transfer_blocks: usize) { + assert!(self.block_sender.is_some()); + self.gen_transfer_transactions(block_size, num_transfer_blocks); + } + + pub fn gen_account_creations(&mut self, block_size: usize) -> Vec> { + let tc_account = treasury_compliance_account_address(); + let mut txn_block = vec![]; + + for (i, block) in self.accounts.chunks(block_size).enumerate() { + let mut transactions = Vec::with_capacity(block_size); + for (j, account) in block.iter().enumerate() { + let txn = create_transaction( + tc_account, + (i * block_size + j) as u64, + &self.genesis_key, + self.genesis_key.public_key(), + encode_create_parent_vasp_account_script( + xus_tag(), + 0, + account.address, + account.auth_key_prefix(), + vec![], + false, /* add all currencies */ + ), + ); + transactions.push(txn); + } + self.version += transactions.len() as Version; + if let Some(sender) = &self.block_sender { + sender.send(transactions).unwrap(); + } else { + txn_block.push(transactions); + } + } + txn_block + } + + /// Generates transactions that allocate `init_account_balance` to every account. + pub fn gen_mint_transactions( + &mut self, + init_account_balance: u64, + block_size: usize, + ) -> Vec> { + let testnet_dd_account = testnet_dd_account_address(); + let mut txn_block = vec![]; + + for (i, block) in self.accounts.chunks(block_size).enumerate() { + let mut transactions = Vec::with_capacity(block_size); + for (j, account) in block.iter().enumerate() { + let txn = create_transaction( + testnet_dd_account, + (i * block_size + j) as u64, + &self.genesis_key, + self.genesis_key.public_key(), + encode_peer_to_peer_with_metadata_script( + xus_tag(), + account.address, + init_account_balance, + vec![], + vec![], + ), + ); + transactions.push(txn); + } + self.version += transactions.len() as Version; + + if let Some(sender) = &self.block_sender { + sender.send(transactions).unwrap(); + } else { + txn_block.push(transactions); + } + } + txn_block + } + + /// Generates transactions for random pairs of accounts. + pub fn gen_transfer_transactions( + &mut self, + block_size: usize, + num_blocks: usize, + ) -> Vec> { + let mut txn_block = vec![]; + for _i in 0..num_blocks { + let mut transactions = Vec::with_capacity(block_size); + for _j in 0..block_size { + let indices = rand::seq::index::sample(&mut self.rng, self.accounts.len(), 2); + let sender_idx = indices.index(0); + let receiver_idx = indices.index(1); + + let sender = &self.accounts[sender_idx]; + let receiver = &self.accounts[receiver_idx]; + let txn = create_transaction( + sender.address, + sender.sequence_number, + &sender.private_key, + sender.public_key.clone(), + encode_peer_to_peer_with_metadata_script( + xus_tag(), + receiver.address, + 1, /* amount */ + vec![], + vec![], + ), + ); + transactions.push(txn); + + self.accounts[sender_idx].sequence_number += 1; + } + self.version += transactions.len() as Version; + + if let Some(sender) = &self.block_sender { + sender.send(transactions).unwrap(); + } else { + txn_block.push(transactions); + } + } + txn_block + } + + /// Verifies the sequence numbers in storage match what we have locally. + pub fn verify_sequence_number(&self, db: &dyn DbReader) { + for account in &self.accounts { + let address = account.address; + let blob = db + .get_latest_account_state(address) + .expect("Failed to query storage.") + .expect("Account must exist."); + let account_resource = AccountResource::try_from(&blob).unwrap(); + assert_eq!(account_resource.sequence_number(), account.sequence_number); + } + } + + /// Drops the sender to notify the receiving end of the channel. + pub fn drop_sender(&mut self) { + self.block_sender.take().unwrap(); + } +} + +fn create_transaction( + sender: AccountAddress, + sequence_number: u64, + private_key: &Ed25519PrivateKey, + public_key: Ed25519PublicKey, + program: Script, +) -> Transaction { + let now = diem_infallible::duration_since_epoch(); + let expiration_time = now.as_secs() + 3600; + + let raw_txn = RawTransaction::new_script( + sender, + sequence_number, + program, + 1_000_000, /* max_gas_amount */ + 0, /* gas_unit_price */ + XUS_NAME.to_owned(), /* gas_currency_code */ + expiration_time, + ChainId::test(), + ); + + let signature = private_key.sign(&raw_txn); + let signed_txn = SignedTransaction::new(raw_txn, public_key, signature); + Transaction::UserTransaction(signed_txn) +} diff --git a/storage/diemdb-benchmark/Cargo.toml b/storage/diemdb-benchmark/Cargo.toml index df3115b65efe4..ad6b9f39d10ab 100644 --- a/storage/diemdb-benchmark/Cargo.toml +++ b/storage/diemdb-benchmark/Cargo.toml @@ -11,17 +11,19 @@ edition = "2018" [dependencies] anyhow = "1.0.38" -byteorder = "1.4.3" indicatif = "0.15.0" itertools = { version = "0.10.0", default-features = false } -rand = "0.8.3" structopt = "0.3.21" +executor = { path = "../../execution/executor" } +executor-benchmark = { path = "../../execution/executor-benchmark" } diemdb = { path = "../diemdb" } diem-crypto = { path = "../../crypto/crypto" } +diem-genesis-tool = {path = "../../config/management/genesis", features = ["testing"] } diem-jellyfish-merkle = { path = "../jellyfish-merkle" } diem-config = { path = "../../config" } diem-types = { path = "../../types" } +diem-vm= { path = "../../language/diem-vm" } diem-workspace-hack = { path = "../../common/workspace-hack" } storage-interface = { path = "../storage-interface" } diff --git a/storage/diemdb-benchmark/src/lib.rs b/storage/diemdb-benchmark/src/lib.rs index c2d8188af3bcc..644ba9eb87d36 100644 --- a/storage/diemdb-benchmark/src/lib.rs +++ b/storage/diemdb-benchmark/src/lib.rs @@ -1,82 +1,34 @@ // Copyright (c) The Diem Core Contributors // SPDX-License-Identifier: Apache-2.0 -use byteorder::{BigEndian, WriteBytesExt}; -use diem_config::config::RocksdbConfig; -use diem_crypto::hash::HashValue; +use diem_config::{config::RocksdbConfig, utils::get_genesis_txn}; use diem_jellyfish_merkle::metrics::{ DIEM_JELLYFISH_INTERNAL_ENCODED_BYTES, DIEM_JELLYFISH_LEAF_ENCODED_BYTES, DIEM_JELLYFISH_STORAGE_READS, }; -use diem_types::{ - account_address::AccountAddress, - account_state_blob::AccountStateBlob, - block_info::BlockInfo, - ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, - transaction::{ChangeSet, Transaction, TransactionToCommit, WriteSetPayload}, - vm_status::KeptVMStatus, - write_set::WriteSetMut, -}; +use diem_vm::DiemVM; use diemdb::{ metrics::DIEM_STORAGE_ROCKSDB_PROPERTIES, schema::JELLYFISH_MERKLE_NODE_CF_NAME, DiemDB, }; +use executor::{ + db_bootstrapper::{generate_waypoint, maybe_bootstrap}, + Executor, +}; +use executor_benchmark::{ + transaction_executor::TransactionExecutor, transaction_generator::TransactionGenerator, +}; use indicatif::{ProgressBar, ProgressStyle}; -use itertools::Itertools; -use rand::Rng; use std::{ - collections::{BTreeMap, HashMap}, fs, path::PathBuf, + sync::{mpsc, Arc}, }; -use storage_interface::{DbReader, DbWriter}; - -pub fn gen_account_from_index(account_index: u64) -> AccountAddress { - let mut array = [0u8; AccountAddress::LENGTH]; - array - .as_mut() - .write_u64::(account_index) - .expect("Unable to write u64 to array"); - AccountAddress::new(array) -} - -pub fn gen_random_blob(size: usize, rng: &mut R) -> AccountStateBlob { - let mut v = vec![0u8; size]; - rng.fill(v.as_mut_slice()); - AccountStateBlob::from(v) -} - -fn gen_txn_to_commit( - max_accounts: u64, - blob_size: usize, - rng: &mut R, -) -> TransactionToCommit { - let txn = Transaction::GenesisTransaction(WriteSetPayload::Direct(ChangeSet::new( - WriteSetMut::new(vec![]) - .freeze() - .expect("freeze cannot fail"), - vec![], - ))); - let account1 = gen_account_from_index(rng.gen_range(0..max_accounts)); - let account2 = gen_account_from_index(rng.gen_range(0..max_accounts)); - let mut states = HashMap::new(); - let blob1 = gen_random_blob(blob_size, rng); - let blob2 = gen_random_blob(blob_size, rng); - states.insert(account1, blob1); - states.insert(account2, blob2); - TransactionToCommit::new( - txn, - states, - None, - vec![], /* events */ - 0, /* gas_used */ - KeptVMStatus::Executed, - ) -} +use storage_interface::DbReaderWriter; pub fn run_benchmark( num_accounts: usize, - total_version: u64, - blob_size: usize, + init_account_balance: u64, + block_size: usize, db_dir: PathBuf, prune_window: Option, ) { @@ -86,55 +38,64 @@ pub fn run_benchmark( // create if not exists fs::create_dir_all(db_dir.clone()).unwrap(); - let db = DiemDB::open( - &db_dir, - false, /* readonly */ - prune_window, /* pruner */ - RocksdbConfig::default(), - ) - .expect("DB should open."); + let (config, genesis_key) = diem_genesis_tool::test_config(); + // Create executor. + let (db, db_rw) = DbReaderWriter::wrap( + DiemDB::open( + &db_dir, + false, /* readonly */ + prune_window, /* pruner */ + RocksdbConfig::default(), + ) + .expect("DB should open."), + ); + let waypoint = generate_waypoint::(&db_rw, get_genesis_txn(&config).unwrap()).unwrap(); + maybe_bootstrap::(&db_rw, get_genesis_txn(&config).unwrap(), waypoint).unwrap(); + let executor = Arc::new(Executor::new(db_rw)); + let genesis_block_id = executor.committed_block_id(); - let mut rng = ::rand::thread_rng(); - let mut version = 0; + let (block_sender, block_receiver) = mpsc::sync_channel(50 /* bound */); // Set a progressing bar - let bar = ProgressBar::new(total_version); + let bar = Arc::new(ProgressBar::new(0)); bar.set_style( ProgressStyle::default_bar() .template("[{elapsed}] {bar:100.cyan/blue} {pos:>7}/{len:7} {msg}"), ); + let gen_thread_bar = Arc::clone(&bar); + let exe_thread_bar = Arc::clone(&bar); - for chunk in &(0..total_version).chunks(1000 /* split by 1000 */) { - let txns_to_commit = chunk - .map(|_| gen_txn_to_commit(num_accounts as u64, blob_size, &mut rng)) - .collect::>(); - let version_bump = txns_to_commit.len() as u64; - db.save_transactions( - &txns_to_commit, - version, - None, /* ledger_info_with_sigs */ - ) - .expect("commit cannot fail"); - version = version.checked_add(version_bump).expect("Cannot overflow"); - bar.inc(version_bump); - } - let accu_root_hash = db.get_accumulator_root_hash(total_version - 1).unwrap(); - // Last txn - let li = LedgerInfo::new( - BlockInfo::new( - /* current_epoch = */ 0, - /* round = */ 0, - /* block_id */ HashValue::random_with_rng(&mut rng), - accu_root_hash, - total_version - 1, - /* timestamp = */ 0, - None, - ), - HashValue::random_with_rng(&mut rng), - ); - let li_with_sigs = LedgerInfoWithSignatures::new(li, BTreeMap::new()); - db.save_transactions(&[], total_version, Some(&li_with_sigs)) - .unwrap(); + // Spawn two threads to run transaction generator and executor separately. + let gen_thread = std::thread::Builder::new() + .name("txn_generator".to_string()) + .spawn(move || { + println!("Generating transactions..."); + let mut generator = + TransactionGenerator::new_with_sender(genesis_key, num_accounts, block_sender); + generator.run_mint(init_account_balance, block_size); + gen_thread_bar.set_length(generator.version()); + generator + }) + .expect("Failed to spawn transaction generator thread."); + let exe_thread = std::thread::Builder::new() + .name("txn_executor".to_string()) + .spawn(move || { + let mut exe = TransactionExecutor::new(executor, genesis_block_id, None); + while let Ok(transactions) = block_receiver.recv() { + let version_bump = transactions.len() as u64; + exe.execute_block(transactions); + exe_thread_bar.inc(version_bump); + } + }) + .expect("Failed to spawn transaction executor thread."); + + // Wait for generator to finish. + let mut generator = gen_thread.join().unwrap(); + generator.drop_sender(); + // Wait until all transactions are committed. + exe_thread.join().unwrap(); + // Do a sanity check on the sequence number to make sure all transactions are committed. + generator.verify_sequence_number(db.as_ref()); bar.finish(); db.update_rocksdb_properties().unwrap(); @@ -151,8 +112,9 @@ pub fn run_benchmark( let leaf_bytes = DIEM_JELLYFISH_LEAF_ENCODED_BYTES.get(); let internal_bytes = DIEM_JELLYFISH_INTERNAL_ENCODED_BYTES.get(); println!( - "created a DiemDB til version {}, where {} accounts with avg blob size {} bytes exist.", - total_version, num_accounts, blob_size + "created a DiemDB til version {}, where {} accounts exist.", + bar.length(), + num_accounts, ); println!("DB dir: {}", db_dir.as_path().display()); println!("Jellyfish Merkle physical size: {}", db_size); diff --git a/storage/diemdb-benchmark/src/main.rs b/storage/diemdb-benchmark/src/main.rs index e2731297df129..957c9a42c901b 100644 --- a/storage/diemdb-benchmark/src/main.rs +++ b/storage/diemdb-benchmark/src/main.rs @@ -6,14 +6,14 @@ use structopt::StructOpt; #[derive(Debug, StructOpt)] struct Opt { - #[structopt(short, default_value = "1000000")] + #[structopt(long, default_value = "1000000")] num_accounts: usize, - #[structopt(short, default_value = "1000000")] - version: u64, + #[structopt(long, default_value = "1000000")] + init_account_balance: u64, - #[structopt(short, default_value = "40")] - blob_size: usize, + #[structopt(long, default_value = "500")] + block_size: usize, #[structopt(long, parse(from_os_str))] db_dir: PathBuf, @@ -27,8 +27,8 @@ fn main() { diemdb_benchmark::run_benchmark( opt.num_accounts, - opt.version, - opt.blob_size, + opt.init_account_balance, + opt.block_size, opt.db_dir, opt.prune_window, ); diff --git a/storage/scratchpad-benchmark/Cargo.toml b/storage/scratchpad-benchmark/Cargo.toml index cf3bb28266d19..d79c212ffc0bd 100644 --- a/storage/scratchpad-benchmark/Cargo.toml +++ b/storage/scratchpad-benchmark/Cargo.toml @@ -11,6 +11,7 @@ edition = "2018" [dependencies] anyhow = "1.0.38" +byteorder = "1.4.3" itertools = { version = "0.10.0", default-features = false } rand = "0.8.3" rayon = "1.5.0" diff --git a/storage/scratchpad-benchmark/src/lib.rs b/storage/scratchpad-benchmark/src/lib.rs index 8026f213929ec..9c1742b8b398f 100644 --- a/storage/scratchpad-benchmark/src/lib.rs +++ b/storage/scratchpad-benchmark/src/lib.rs @@ -2,10 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; +use byteorder::{BigEndian, WriteBytesExt}; use diem_config::config::RocksdbConfig; -use diem_types::{account_address::HashAccountAddress, account_state_blob::AccountStateBlob}; +use diem_types::{ + account_address::{AccountAddress, HashAccountAddress}, + account_state_blob::AccountStateBlob, +}; use diemdb::DiemDB; -use diemdb_benchmark::{gen_account_from_index, gen_random_blob}; use executor_types::ProofReader; use rand::Rng; use std::{collections::HashMap, path::PathBuf}; @@ -13,6 +16,21 @@ use storage_interface::DbReader; type SparseMerkleTree = scratchpad::SparseMerkleTree; +fn gen_account_from_index(account_index: u64) -> AccountAddress { + let mut array = [0u8; AccountAddress::LENGTH]; + array + .as_mut() + .write_u64::(account_index) + .expect("Unable to write u64 to array"); + AccountAddress::new(array) +} + +fn gen_random_blob(size: usize, rng: &mut R) -> AccountStateBlob { + let mut v = vec![0u8; size]; + rng.fill(v.as_mut_slice()); + AccountStateBlob::from(v) +} + pub fn run_benchmark(num_updates: usize, max_accounts: u64, blob_size: usize, db_dir: PathBuf) { let db = DiemDB::open( &db_dir,