Skip to content

Commit

Permalink
Cleanup GatewayMetrics (MystenLabs#3194)
Browse files Browse the repository at this point in the history
* Remove unused gaetway_metrics from active_authority

* Add AuthAggMetrics

* Use AuthAggMetrics

* Cleanup ActiveAuthority constructor
  • Loading branch information
lxfind authored Jul 14, 2022
1 parent 5dede27 commit 7c04487
Show file tree
Hide file tree
Showing 18 changed files with 139 additions and 163 deletions.
31 changes: 6 additions & 25 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,7 @@
*/

use arc_swap::ArcSwap;
use std::{
collections::{BTreeMap, HashMap},
ops::Deref,
sync::Arc,
time::Duration,
};
use std::{collections::HashMap, ops::Deref, sync::Arc, time::Duration};
use sui_storage::{follower_store::FollowerStore, node_sync_store::NodeSyncStore};
use sui_types::{base_types::AuthorityName, error::SuiResult};
use tokio::sync::Mutex;
Expand All @@ -44,7 +39,7 @@ use tracing::error;

use crate::{
authority::AuthorityState, authority_aggregator::AuthorityAggregator,
authority_client::AuthorityAPI, gateway_state::GatewayMetrics,
authority_client::AuthorityAPI,
};
use tokio::time::Instant;

Expand Down Expand Up @@ -110,15 +105,13 @@ pub struct ActiveAuthority<A> {
pub net: ArcSwap<AuthorityAggregator<A>>,
// Network health
pub health: Arc<Mutex<HashMap<AuthorityName, AuthorityHealth>>>,
pub gateway_metrics: GatewayMetrics,
}

impl<A> ActiveAuthority<A> {
pub fn new(
authority: Arc<AuthorityState>,
follower_store: Arc<FollowerStore>,
authority_clients: BTreeMap<AuthorityName, A>,
gateway_metrics: GatewayMetrics,
net: AuthorityAggregator<A>,
) -> SuiResult<Self> {
let committee = authority.clone_committee();

Expand All @@ -131,28 +124,17 @@ impl<A> ActiveAuthority<A> {
)),
state: authority,
follower_store,
net: ArcSwap::from(Arc::new(AuthorityAggregator::new(
committee,
authority_clients,
gateway_metrics.clone(),
))),
gateway_metrics,
net: ArcSwap::from(Arc::new(net)),
})
}

pub fn new_with_ephemeral_follower_store(
authority: Arc<AuthorityState>,
authority_clients: BTreeMap<AuthorityName, A>,
gateway_metrics: GatewayMetrics,
net: AuthorityAggregator<A>,
) -> SuiResult<Self> {
let working_dir = tempfile::tempdir().unwrap();
let follower_store = Arc::new(FollowerStore::open(&working_dir).expect("cannot open db"));
Self::new(
authority,
follower_store,
authority_clients,
gateway_metrics,
)
Self::new(authority, follower_store, net)
}

/// Returns the amount of time we should wait to be able to contact at least
Expand Down Expand Up @@ -208,7 +190,6 @@ impl<A> Clone for ActiveAuthority<A> {
follower_store: self.follower_store.clone(),
net: ArcSwap::from(self.net.load().clone()),
health: self.health.clone(),
gateway_metrics: self.gateway_metrics.clone(),
}
}
}
Expand Down
21 changes: 8 additions & 13 deletions crates/sui-core/src/authority_active/checkpoint_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{
authority_active::{checkpoint_driver::CheckpointProcessControl, ActiveAuthority},
authority_client::LocalAuthorityClient,
checkpoints::checkpoint_tests::TestSetup,
gateway_state::GatewayMetrics,
safe_client::SafeClient,
};

Expand All @@ -30,13 +29,12 @@ async fn checkpoint_active_flow_happy_path() {

// Start active part of authority.
for inner_state in authorities.clone() {
let clients = aggregator.clone_inner_clients();
let inner_agg = aggregator.clone();
let _active_handle = tokio::task::spawn(async move {
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
GatewayMetrics::new_for_tests(),
inner_agg,
)
.unwrap(),
);
Expand Down Expand Up @@ -108,13 +106,12 @@ async fn checkpoint_active_flow_crash_client_with_gossip() {

// Start active part of authority.
for inner_state in authorities.clone() {
let clients = aggregator.clone_inner_clients();
let inner_agg = aggregator.clone();
let _active_handle = tokio::task::spawn(async move {
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
GatewayMetrics::new_for_tests(),
inner_agg,
)
.unwrap(),
);
Expand Down Expand Up @@ -202,13 +199,12 @@ async fn checkpoint_active_flow_crash_client_no_gossip() {

// Start active part of authority.
for inner_state in authorities.clone() {
let clients = aggregator.clone_inner_clients();
let inner_agg = aggregator.clone();
let _active_handle = tokio::task::spawn(async move {
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
GatewayMetrics::new_for_tests(),
inner_agg,
)
.unwrap(),
);
Expand Down Expand Up @@ -296,13 +292,12 @@ async fn test_empty_checkpoint() {

// Start active part of authority.
for inner_state in authorities.clone() {
let clients = aggregator.clone_inner_clients();
let inner_agg = aggregator.clone();
let _active_handle = tokio::task::spawn(async move {
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
GatewayMetrics::new_for_tests(),
inner_agg,
)
.unwrap(),
);
Expand Down
11 changes: 4 additions & 7 deletions crates/sui-core/src/authority_active/execution_driver/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::gateway_state::GatewayMetrics;
use crate::{authority_active::ActiveAuthority, checkpoints::checkpoint_tests::TestSetup};

use std::sync::Arc;
Expand All @@ -28,13 +27,12 @@ async fn pending_exec_storage_notify() {

// Start active part of authority.
for inner_state in authorities.clone() {
let clients = aggregator.clone_inner_clients();
let inner_agg = aggregator.clone();
let _active_handle = tokio::task::spawn(async move {
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
GatewayMetrics::new_for_tests(),
inner_agg,
)
.unwrap(),
);
Expand Down Expand Up @@ -112,13 +110,12 @@ async fn pending_exec_full() {

// Start active part of authority.
for inner_state in authorities.clone() {
let clients = aggregator.clone_inner_clients();
let inner_agg = aggregator.clone();
let _active_handle = tokio::task::spawn(async move {
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
GatewayMetrics::new_for_tests(),
inner_agg,
)
.unwrap(),
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use crate::authority::AuthorityState;
use crate::authority::AuthorityStore;
use crate::authority_aggregator::authority_aggregator_tests::*;
use crate::authority_aggregator::{AuthAggMetrics, AuthorityAggregator};
use crate::authority_client::{AuthorityAPI, BatchInfoResponseItemStream};
use crate::safe_client::SafeClient;
use async_trait::async_trait;
Expand Down Expand Up @@ -196,7 +197,7 @@ impl AuthorityAPI for ConfigurableBatchActionClient {
pub async fn init_configurable_authorities(
authority_action: Vec<BatchAction>,
) -> (
BTreeMap<AuthorityName, ConfigurableBatchActionClient>,
AuthorityAggregator<ConfigurableBatchActionClient>,
Vec<Arc<AuthorityState>>,
Vec<ExecutionDigests>,
) {
Expand Down Expand Up @@ -312,5 +313,10 @@ pub async fn init_configurable_authorities(
.into_iter()
.map(|(name, client)| (name, client.authority_client().clone()))
.collect();
(authority_clients, states, executed_digests)
let net = AuthorityAggregator::new(
committee,
authority_clients,
AuthAggMetrics::new_for_tests(),
);
(net, states, executed_digests)
}
34 changes: 12 additions & 22 deletions crates/sui-core/src/authority_active/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use crate::authority_active::gossip::configurable_batch_action_client::{
init_configurable_authorities, BatchAction, ConfigurableBatchActionClient,
};
use crate::authority_active::MAX_RETRY_DELAY_MS;
use crate::gateway_state::GatewayMetrics;
use std::collections::BTreeMap;
use std::time::Duration;
use tokio::task::JoinHandle;

Expand All @@ -19,14 +17,13 @@ pub async fn test_gossip_plain() {
BatchAction::EmitUpdateItem(),
];

let (clients, states, digests) = init_configurable_authorities(action_sequence).await;
let (net, states, digests) = init_configurable_authorities(action_sequence).await;

let _active_authorities = start_gossip_process(states.clone(), clients.clone()).await;
let _active_authorities = start_gossip_process(states.clone(), net.clone()).await;
tokio::time::sleep(Duration::from_secs(20)).await;

// Expected outcome of gossip: each digest's tx signature and cert is now on every authority.
let clients_final: Vec<_> = clients.values().collect();
for client in clients_final.iter() {
for client in net.clone_inner_clients().values() {
for digest in &digests {
let result1 = client
.handle_transaction_info_request(TransactionInfoRequest {
Expand All @@ -46,14 +43,13 @@ pub async fn test_gossip_plain() {
pub async fn test_gossip_error() {
let action_sequence = vec![BatchAction::EmitError(), BatchAction::EmitUpdateItem()];

let (clients, states, digests) = init_configurable_authorities(action_sequence).await;
let (net, states, digests) = init_configurable_authorities(action_sequence).await;

let _active_authorities = start_gossip_process(states.clone(), clients.clone()).await;
let _active_authorities = start_gossip_process(states.clone(), net.clone()).await;
// failure back-offs were set from the errors
tokio::time::sleep(Duration::from_millis(MAX_RETRY_DELAY_MS)).await;

let clients_final: Vec<_> = clients.values().collect();
for client in clients_final.iter() {
for client in net.clone_inner_clients().values() {
for digest in &digests {
let result1 = client
.handle_transaction_info_request(TransactionInfoRequest {
Expand All @@ -72,7 +68,7 @@ pub async fn test_gossip_error() {
#[tokio::test(flavor = "current_thread", start_paused = true)]
pub async fn test_gossip_after_revert() {
let action_sequence = vec![BatchAction::EmitUpdateItem(), BatchAction::EmitUpdateItem()];
let (clients, states, digests) = init_configurable_authorities(action_sequence).await;
let (net, states, digests) = init_configurable_authorities(action_sequence).await;

tokio::time::sleep(Duration::from_secs(20)).await;
// 3 (quorum) of the validators have executed 2 transactions, and 1 has none.
Expand Down Expand Up @@ -103,11 +99,10 @@ pub async fn test_gossip_after_revert() {
}
}

let _active_authorities = start_gossip_process(states.clone(), clients.clone()).await;
let _active_authorities = start_gossip_process(states.clone(), net.clone()).await;
tokio::time::sleep(Duration::from_secs(20)).await;

let clients_final: Vec<_> = clients.values().collect();
for client in clients_final.iter() {
for client in net.clone_inner_clients().values() {
let result = client
.handle_transaction_info_request(TransactionInfoRequest {
transaction_digest: digests[0].transaction,
Expand Down Expand Up @@ -155,22 +150,17 @@ pub async fn test_gossip_after_revert() {

async fn start_gossip_process(
states: Vec<Arc<AuthorityState>>,
clients: BTreeMap<AuthorityName, ConfigurableBatchActionClient>,
net: AuthorityAggregator<ConfigurableBatchActionClient>,
) -> Vec<JoinHandle<()>> {
let mut active_authorities = Vec::new();

// Start active processes.
for state in states {
let inner_clients = clients.clone();
let inner_net = net.clone();

let handle = tokio::task::spawn(async move {
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(
state,
inner_clients,
GatewayMetrics::new_for_tests(),
)
.unwrap(),
ActiveAuthority::new_with_ephemeral_follower_store(state, inner_net).unwrap(),
);
active_state.spawn_gossip_process(3).await;
});
Expand Down
Loading

0 comments on commit 7c04487

Please sign in to comment.