Skip to content

Commit

Permalink
Add prometheus metrics in benchmarking framework (MystenLabs#3578)
Browse files Browse the repository at this point in the history
  • Loading branch information
sadhansood authored Aug 2, 2022
1 parent c549fea commit 9aff02f
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 7 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions crates/sui-benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ sui-core = { path = "../sui-core" }
sui-config = { path = "../sui-config" }
sui-types = { path = "../sui-types" }
sui-sdk = { path = "../sui-sdk" }
sui-quorum-driver = { path = "../sui-quorum-driver" }
sui-node = { path = "../sui-node" }
sui-json-rpc-types = { path = "../sui-json-rpc-types" }
sui-gateway = { path = "../sui-gateway" }

move-core-types = { git = "https://github.com/move-language/move", rev = "79071528524f08b12e9abb84c1094d8e976aa17a", features = ["address20"] }
narwhal-node = { git = "https://github.com/MystenLabs/narwhal", rev = "259a37b487570763575e6b28f8b8057b16b3e916", package = "node" }
sui-quorum-driver = { path = "../sui-quorum-driver" }
sui-node = { path = "../sui-node" }
workspace-hack = { path = "../workspace-hack"}
test-utils = { path = "../test-utils" }

Expand Down
86 changes: 81 additions & 5 deletions crates/sui-benchmark/src/bin/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,23 @@ use futures::future::try_join_all;
use futures::future::BoxFuture;
use futures::FutureExt;
use futures::{stream::FuturesUnordered, StreamExt};
use prometheus::register_gauge_with_registry;
use prometheus::register_histogram_with_registry;
use prometheus::register_int_counter_with_registry;
use prometheus::Gauge;
use prometheus::Histogram;
use prometheus::IntCounter;
use prometheus::Registry;
use sui_benchmark::workloads::workload::get_latest;
use sui_benchmark::workloads::workload::WorkloadType;
use sui_config::gateway::GatewayConfig;
use sui_config::Config;
use sui_config::PersistedConfig;
use sui_core::authority_aggregator::AuthAggMetrics;
use sui_core::authority_aggregator::AuthorityAggregator;
use sui_core::gateway_state::GatewayState;
use sui_node::metrics;
use sui_node::SuiNode;
use sui_types::base_types::ObjectID;
use sui_types::base_types::SuiAddress;
use tokio::sync::OnceCell;
Expand All @@ -27,11 +39,7 @@ use sui_benchmark::workloads::transfer_object::TransferObjectWorkload;
use sui_benchmark::workloads::workload::CombinationWorkload;
use sui_benchmark::workloads::workload::Payload;
use sui_benchmark::workloads::workload::Workload;
use sui_config::gateway::GatewayConfig;
use sui_core::authority_aggregator::AuthorityAggregator;
use sui_core::authority_client::NetworkAuthorityClient;
use sui_core::gateway_state::GatewayState;
use sui_node::SuiNode;
use sui_quorum_driver::QuorumDriverHandler;
use sui_sdk::crypto::SuiKeystore;
use sui_types::crypto::EncodeDecodeBase64;
Expand Down Expand Up @@ -101,6 +109,10 @@ struct Opts {
// Default workload is 100% transfer object
#[clap(subcommand)]
workload_spec: OptWorkloadSpec,
#[clap(long, default_value = "9091", global = true)]
pub server_metric_port: u16,
#[clap(long, default_value = "8081", global = true)]
pub client_metric_port: u16,
}

#[derive(Debug, Clone, Parser, Eq, PartialEq, EnumString)]
Expand Down Expand Up @@ -129,6 +141,51 @@ pub enum OptWorkloadSpec {
},
}

/// Prometheus metrics published by the stress benchmark client.
///
/// A single instance is built by `BenchMetrics::new` against a registry and
/// then shared with the benchmark workers behind an `Arc`.
pub struct BenchMetrics {
/// Count of transactions that completed successfully; incremented when a
/// worker receives a successful response.
pub num_success: IntCounter,
/// Count of transaction errors; incremented when a worker pops a
/// transaction off its retry queue to resubmit it.
pub num_error: IntCounter,
/// Count of transactions submitted to sui; incremented for both fresh
/// submissions and retries.
pub num_submitted: IntCounter,
/// Number of transactions currently in flight; raised on a fresh submission
/// and lowered when a successful response arrives.
pub num_in_flight: Gauge,
/// Histogram of response latency in seconds, observed on each successful
/// response.
pub latency_s: Histogram,
}

impl BenchMetrics {
fn new(registry: &Registry) -> Self {
BenchMetrics {
num_success: register_int_counter_with_registry!(
"num_success",
"Total number of transaction success",
registry,
)
.unwrap(),
num_error: register_int_counter_with_registry!(
"num_error",
"Total number of transaction errors",
registry,
)
.unwrap(),
num_submitted: register_int_counter_with_registry!(
"num_submitted",
"Total number of transaction submitted to sui",
registry,
)
.unwrap(),
num_in_flight: register_gauge_with_registry!(
"num_in_flight",
"Total number of transaction in flight",
registry,
)
.unwrap(),
latency_s: register_histogram_with_registry!(
"latency_s",
"Total time in seconds to return a response",
registry,
)
.unwrap(),
}
}
}

struct Stats {
pub id: usize,
pub num_success: u64,
Expand Down Expand Up @@ -162,6 +219,7 @@ async fn run(
num_requests_per_worker: u64,
opts: Opts,
barrier: Arc<Barrier>,
metrics: Arc<BenchMetrics>,
) {
let mut tasks = Vec::new();
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
Expand All @@ -175,6 +233,7 @@ async fn run(
.await;
let tx_cloned = tx.clone();
let cloned_barrier = barrier.clone();
let metrics_cloned = metrics.clone();
// Make a per-worker quorum driver; otherwise they all share the same task.
let quorum_driver_handler = QuorumDriverHandler::new(clients.clone());
let qd = quorum_driver_handler.clone_quorum_driver();
Expand Down Expand Up @@ -230,6 +289,8 @@ async fn run(
if let Some(b) = retry_queue.pop_front() {
num_submitted += 1;
num_error += 1;
metrics_cloned.num_submitted.inc();
metrics_cloned.num_error.inc();
let res = qd
.execute_transaction(ExecuteTransactionRequest {
transaction: b.0.clone(),
Expand Down Expand Up @@ -268,6 +329,8 @@ async fn run(
} else {
num_in_flight += 1;
num_submitted += 1;
metrics_cloned.num_in_flight.inc();
metrics_cloned.num_submitted.inc();
let payload = free_pool.pop().unwrap();
let tx = payload.make_transaction();
let start = Instant::now();
Expand Down Expand Up @@ -309,8 +372,11 @@ async fn run(
NextOp::Response(Some((start, payload))) => {
free_pool.push(payload);
let latency = start.elapsed();
metrics_cloned.latency_s.observe(latency.as_secs_f64());
num_success += 1;
num_in_flight -= 1;
metrics_cloned.num_success.inc();
metrics_cloned.num_in_flight.dec();
if latency > max_latency {
max_latency = latency;
}
Expand Down Expand Up @@ -442,9 +508,12 @@ async fn main() -> Result<()> {
eprintln!("Configuring local benchmark..");
let configs = {
let mut configs = test_and_configure_authority_configs(opts.committee_size as usize);
let mut metric_port = opts.server_metric_port;
configs.validator_configs.iter_mut().for_each(|config| {
let parameters = &mut config.consensus_config.as_mut().unwrap().narwhal_config;
parameters.batch_size = 12800;
config.metrics_address = format!("127.0.0.1:{}", metric_port).parse().unwrap();
metric_port += 1;
});
Arc::new(configs)
};
Expand Down Expand Up @@ -556,17 +625,24 @@ async fn main() -> Result<()> {
client_runtime.block_on(async move {
let committee = GatewayState::make_committee(&gateway_config).unwrap();
let authority_clients = GatewayState::make_authority_clients(&gateway_config);
let metrics = AuthAggMetrics::new(&prometheus::Registry::new());
let registry: Registry = metrics::start_prometheus_server(
format!("127.0.0.1:{}", opts.client_metric_port)
.parse()
.unwrap(),
);
let metrics = AuthAggMetrics::new(&registry);
let aggregator = AuthorityAggregator::new(committee, authority_clients, metrics);
let mut workload = make_workload(primary_gas_id, owner, keypair, &opts);
workload.init(&aggregator).await;
let barrier = Arc::new(Barrier::new(opts.num_workers as usize));
let metrics = Arc::new(BenchMetrics::new(&registry));
run(
aggregator,
workload,
max_in_flight_ops as u64 / opts.num_workers,
opts,
barrier,
metrics,
)
.await
});
Expand Down

0 comments on commit 9aff02f

Please sign in to comment.