Skip to content

Commit

Permalink
[consensus] Rate limit block streaming (subscriber) for receiver & se…
Browse files Browse the repository at this point in the history
…nder (MystenLabs#18104)

Co-authored-by: MW Tian <[email protected]>
  • Loading branch information
arun-koshy and mwtian authored Jun 7, 2024
1 parent a2a09ff commit 0eb0145
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 22 deletions.
7 changes: 7 additions & 0 deletions consensus/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ pub(crate) struct NodeMetrics {
pub(crate) invalid_blocks: IntCounterVec,
pub(crate) rejected_blocks: IntCounterVec,
pub(crate) rejected_future_blocks: IntCounterVec,
pub(crate) subscribed_blocks: IntCounterVec,
pub(crate) verified_blocks: IntCounterVec,
pub(crate) committed_leaders_total: IntCounterVec,
pub(crate) last_committed_authority_round: IntGaugeVec,
Expand Down Expand Up @@ -324,6 +325,12 @@ impl NodeMetrics {
&["authority"],
registry,
).unwrap(),
subscribed_blocks: register_int_counter_vec_with_registry!(
"subscribed_blocks",
"Number of blocks received from each peer before verification",
&["authority"],
registry,
).unwrap(),
verified_blocks: register_int_counter_vec_with_registry!(
"verified_blocks",
"Number of blocks received from each peer that are verified",
Expand Down
43 changes: 21 additions & 22 deletions consensus/core/src/network/tonic_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,19 @@ impl NetworkClient for TonicClient {
let response = client.subscribe_blocks(request).await.map_err(|e| {
ConsensusError::NetworkRequest(format!("subscribe_blocks failed: {e:?}"))
})?;
let stream = response
.into_inner()
.filter_map(move |b| async move {
match b {
Ok(response) => Some(response.block),
Err(e) => {
debug!("Network error received from {}: {e:?}", peer);
None
}
let stream = response.into_inner().filter_map(move |b| async move {
match b {
Ok(response) => Some(response.block),
Err(e) => {
debug!("Network error received from {}: {e:?}", peer);
None
}
})
.boxed();
Ok(stream)
}
});
let rate_limited_stream =
tokio_stream::StreamExt::throttle(stream, self.context.parameters.min_round_delay / 2)
.boxed();
Ok(rate_limited_stream)
}

async fn fetch_blocks(
Expand Down Expand Up @@ -354,16 +354,13 @@ impl ChannelPool {

/// Proxies Tonic requests to NetworkService with actual handler implementation.
struct TonicServiceProxy<S: NetworkService> {
_context: Arc<Context>,
context: Arc<Context>,
service: Arc<S>,
}

impl<S: NetworkService> TonicServiceProxy<S> {
fn new(context: Arc<Context>, service: Arc<S>) -> Self {
Self {
_context: context,
service,
}
Self { context, service }
}
}

Expand Down Expand Up @@ -402,8 +399,8 @@ impl<S: NetworkService> ConsensusService for TonicServiceProxy<S> {
else {
return Err(tonic::Status::internal("PeerInfo not found"));
};
let mut reuqest_stream = request.into_inner();
let first_request = match reuqest_stream.next().await {
let mut request_stream = request.into_inner();
let first_request = match request_stream.next().await {
Some(Ok(r)) => r,
Some(Err(e)) => {
debug!(
Expand All @@ -421,9 +418,11 @@ impl<S: NetworkService> ConsensusService for TonicServiceProxy<S> {
.handle_subscribe_blocks(peer_index, first_request.last_received_round)
.await
.map_err(|e| tonic::Status::internal(format!("{e:?}")))?
.map(|block| Ok(SubscribeBlocksResponse { block }))
.boxed();
Ok(Response::new(stream))
.map(|block| Ok(SubscribeBlocksResponse { block }));
let rate_limited_stream =
tokio_stream::StreamExt::throttle(stream, self.context.parameters.min_round_delay / 2)
.boxed();
Ok(Response::new(rate_limited_stream))
}

type FetchBlocksStream = Iter<std::vec::IntoIter<Result<FetchBlocksResponse, tonic::Status>>>;
Expand Down
6 changes: 6 additions & 0 deletions consensus/core/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
'stream: loop {
match blocks.next().await {
Some(block) => {
context
.metrics
.node_metrics
.subscribed_blocks
.with_label_values(&[&peer_hostname])
.inc();
let result = authority_service
.handle_send_block(peer, block.clone())
.await;
Expand Down

0 comments on commit 0eb0145

Please sign in to comment.