Skip to content

Commit

Permalink
client/authority-discovery: Throttle DHT requests (paritytech#7018)
Browse files Browse the repository at this point in the history
* client/authority-discovery: Throttle DHT requests

Instead of passing one DHT query for each authority down to the network
every query interval, only pass MAX_IN_FLIGHT_LOOKUPS at a given point
in time, triggering new ones when previous ones return.

* client/authority-discovery/worker/test: Fix wrong constant
  • Loading branch information
mxinden authored Sep 7, 2020
1 parent f745c7f commit ccd06f0
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 163 deletions.
6 changes: 2 additions & 4 deletions client/authority-discovery/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ pub enum Error {
HashingAuthorityId(libp2p::core::multiaddr::multihash::EncodeError),
/// Failed calling into the Substrate runtime.
CallingRuntime(sp_blockchain::Error),
/// From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure it
/// is actually an authority, we match the hash against the hash of the authority id of all other authorities. This
/// error is the result of the above failing.
MatchingHashedAuthorityIdWithAuthorityId,
/// Received a dht record with a key that does not match any in-flight awaited keys.
ReceivingUnexpectedRecord,
/// Failed to set the authority discovery peerset priority group in the peerset module.
SettingPeersetPriorityGroup(String),
/// Failed to encode a protobuf payload.
Expand Down
145 changes: 93 additions & 52 deletions client/authority-discovery/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use libp2p::{core::multiaddr, multihash::Multihash};
use log::{debug, error, log_enabled};
use prometheus_endpoint::{Counter, CounterVec, Gauge, Opts, U64, register};
use prost::Message;
use rand::{seq::SliceRandom, thread_rng};
use sc_client_api::blockchain::HeaderBackend;
use sc_network::{
config::MultiaddrWithPeerId,
Expand Down Expand Up @@ -70,6 +71,9 @@ const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities";
/// Maximum number of addresses cached per authority. Additional addresses are discarded.
const MAX_ADDRESSES_PER_AUTHORITY: usize = 10;

/// Maximum number of in-flight DHT lookups at any given point in time.
const MAX_IN_FLIGHT_LOOKUPS: usize = 8;

/// Role an authority discovery module can run as.
pub enum Role {
/// Actual authority as well as a reference to its key store.
Expand Down Expand Up @@ -137,12 +141,17 @@ where

/// Interval to be proactive, publishing own addresses.
publish_interval: Interval,
/// Interval on which to query for addresses of other authorities.
/// Interval at which to request addresses of authorities, refilling the pending lookups queue.
query_interval: Interval,
/// Interval on which to set the peerset priority group to a new random
/// set of addresses.
priority_group_set_interval: Interval,

/// Queue of throttled lookups pending to be passed to the network.
pending_lookups: Vec<AuthorityId>,
/// Set of in-flight lookups.
in_flight_lookups: HashMap<libp2p::kad::record::Key, AuthorityId>,

addr_cache: addr_cache::AddrCache,

metrics: Option<Metrics>,
Expand Down Expand Up @@ -183,8 +192,8 @@ where
Duration::from_secs(12 * 60 * 60),
);

// External addresses of other authorities can change at any given point in time. The
// interval on which to query for external addresses of other authorities is a trade off
// External addresses of remote authorities can change at any given point in time. The
// interval on which to trigger new queries for the current authorities is a trade off
// between efficiency and performance.
let query_interval_start = Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME;
let query_interval_duration = Duration::from_secs(10 * 60);
Expand All @@ -193,9 +202,9 @@ where
// Querying 500 [`AuthorityId`]s takes ~1m on the Kusama DHT (10th of August 2020) when
// comparing `authority_discovery_authority_addresses_requested_total` and
// `authority_discovery_dht_event_received`. With that in mind set the peerset priority
// group on the same interval as the [`query_interval`] above, just delayed by 2 minutes.
// group on the same interval as the [`query_interval`] above, just delayed by 5 minutes.
let priority_group_set_interval = interval_at(
query_interval_start + Duration::from_secs(2 * 60),
query_interval_start + Duration::from_secs(5 * 60),
query_interval_duration,
);

Expand Down Expand Up @@ -229,6 +238,8 @@ where
publish_interval,
query_interval,
priority_group_set_interval,
pending_lookups: Vec::new(),
in_flight_lookups: HashMap::new(),
addr_cache,
role,
metrics,
Expand Down Expand Up @@ -270,7 +281,9 @@ where

if let Some(metrics) = &self.metrics {
metrics.publish.inc();
metrics.amount_last_published.set(addresses.len() as u64);
metrics.amount_addresses_last_published.set(
addresses.len().try_into().unwrap_or(std::u64::MAX),
);
}

let mut serialized_addresses = vec![];
Expand Down Expand Up @@ -314,15 +327,9 @@ where
Ok(())
}

fn request_addresses_of_others(&mut self) -> Result<()> {
fn refill_pending_lookups_queue(&mut self) -> Result<()> {
let id = BlockId::hash(self.client.info().best_hash);

let authorities = self
.client
.runtime_api()
.authorities(&id)
.map_err(Error::CallingRuntime)?;

let local_keys = match &self.role {
Role::Authority(key_store) => {
key_store.read()
Expand All @@ -333,21 +340,52 @@ where
Role::Sentry => HashSet::new(),
};

for authority_id in authorities.iter() {
// Make sure we don't look up our own keys.
if !local_keys.contains(authority_id.as_ref()) {
if let Some(metrics) = &self.metrics {
metrics.request.inc();
}
let mut authorities = self
.client
.runtime_api()
.authorities(&id)
.map_err(Error::CallingRuntime)?
.into_iter()
.filter(|id| !local_keys.contains(id.as_ref()))
.collect();

self.network
.get_value(&hash_authority_id(authority_id.as_ref()));
}
self.addr_cache.retain_ids(&authorities);

authorities.shuffle(&mut thread_rng());
self.pending_lookups = authorities;
// Ignore all still in-flight lookups. Those that are still in-flight are likely stalled as
// query interval ticks are far enough apart for all lookups to succeed.
self.in_flight_lookups.clear();

if let Some(metrics) = &self.metrics {
metrics.requests_pending.set(
self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX),
);
}

Ok(())
}

fn start_new_lookups(&mut self) {
while self.in_flight_lookups.len() < MAX_IN_FLIGHT_LOOKUPS {
let authority_id = match self.pending_lookups.pop() {
Some(authority) => authority,
None => return,
};
let hash = hash_authority_id(authority_id.as_ref());
self.network
.get_value(&hash);
self.in_flight_lookups.insert(hash, authority_id);

if let Some(metrics) = &self.metrics {
metrics.requests.inc();
metrics.requests_pending.set(
self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX),
);
}
}
}

/// Handle incoming Dht events.
///
/// Returns either:
Expand Down Expand Up @@ -385,10 +423,17 @@ where
metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
}

debug!(
target: LOG_TARGET,
"Value for hash '{:?}' not found on Dht.", hash
)
if self.in_flight_lookups.remove(&hash).is_some() {
debug!(
target: LOG_TARGET,
"Value for hash '{:?}' not found on Dht.", hash
)
} else {
debug!(
target: LOG_TARGET,
"Received 'ValueNotFound' for unexpected hash '{:?}'.", hash
)
}
},
Some(DhtEvent::ValuePut(hash)) => {
if let Some(metrics) = &self.metrics {
Expand Down Expand Up @@ -434,23 +479,9 @@ where
}
})?.ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?;

let authorities = {
let block_id = BlockId::hash(self.client.info().best_hash);
// From the Dht we only get the hashed authority id. In order to retrieve the actual
// authority id and to ensure it is actually an authority, we match the hash against the
// hash of the authority id of all other authorities.
let authorities = self.client.runtime_api().authorities(&block_id)?;
self.addr_cache.retain_ids(&authorities);
authorities
.into_iter()
.map(|id| (hash_authority_id(id.as_ref()), id))
.collect::<HashMap<_, _>>()
};

// Check if the event origins from an authority in the current or next authority set.
let authority_id: &AuthorityId = authorities
.get(&remote_key)
.ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?;
let authority_id: AuthorityId = self.in_flight_lookups
.remove(&remote_key)
.ok_or(Error::ReceivingUnexpectedRecord)?;

let local_peer_id = self.network.local_peer_id();

Expand All @@ -463,7 +494,7 @@ where
let signature = AuthoritySignature::decode(&mut &signature[..])
.map_err(Error::EncodingDecodingScale)?;

if !AuthorityPair::verify(&signature, &addresses, authority_id) {
if !AuthorityPair::verify(&signature, &addresses, &authority_id) {
return Err(Error::VerifyingDhtPayload);
}

Expand Down Expand Up @@ -503,7 +534,7 @@ where
.collect();

if !remote_addresses.is_empty() {
self.addr_cache.insert(authority_id.clone(), remote_addresses);
self.addr_cache.insert(authority_id, remote_addresses);
if let Some(metrics) = &self.metrics {
metrics.known_authorities_count.set(
self.addr_cache.num_ids().try_into().unwrap_or(std::u64::MAX)
Expand Down Expand Up @@ -610,15 +641,15 @@ where
}
}

// Request addresses of authorities.
// Request addresses of authorities, refilling the pending lookups queue.
if let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {
// Register waker of underlying task for next interval.
while let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {}

if let Err(e) = self.request_addresses_of_others() {
if let Err(e) = self.refill_pending_lookups_queue() {
error!(
target: LOG_TARGET,
"Failed to request addresses of authorities: {:?}", e,
"Failed to refill pending lookups queue: {:?}", e,
);
}
}
Expand Down Expand Up @@ -652,6 +683,8 @@ where
}
}

self.start_new_lookups();

Poll::Pending
}
}
Expand Down Expand Up @@ -712,8 +745,9 @@ fn interval_at(start: Instant, duration: Duration) -> Interval {
#[derive(Clone)]
pub(crate) struct Metrics {
publish: Counter<U64>,
amount_last_published: Gauge<U64>,
request: Counter<U64>,
amount_addresses_last_published: Gauge<U64>,
requests: Counter<U64>,
requests_pending: Gauge<U64>,
dht_event_received: CounterVec<U64>,
handle_value_found_event_failure: Counter<U64>,
known_authorities_count: Gauge<U64>,
Expand All @@ -730,22 +764,29 @@ impl Metrics {
)?,
registry,
)?,
amount_last_published: register(
amount_addresses_last_published: register(
Gauge::new(
"authority_discovery_amount_external_addresses_last_published",
"Number of external addresses published when authority discovery last \
published addresses."
)?,
registry,
)?,
request: register(
requests: register(
Counter::new(
"authority_discovery_authority_addresses_requested_total",
"Number of times authority discovery has requested external addresses of a \
single authority."
)?,
registry,
)?,
requests_pending: register(
Gauge::new(
"authority_discovery_authority_address_requests_pending",
"Number of pending authority address requests."
)?,
registry,
)?,
dht_event_received: register(
CounterVec::new(
Opts::new(
Expand Down
Loading

0 comments on commit ccd06f0

Please sign in to comment.