Skip to content

Relay submissions more metrics #672

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/rbuilder/benches/benchmarks/mev_boost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use rbuilder::mev_boost::{
use reth::primitives::SealedBlock;
use reth_chainspec::SEPOLIA;
use reth_primitives::kzg::Blob;
use ssz::Encode;
use std::{fs, path::PathBuf, sync::Arc};

fn mev_boost_serialize_submit_block(data: DenebSubmitBlockRequest) {
Expand Down
12 changes: 7 additions & 5 deletions crates/rbuilder/src/live_builder/block_output/relay_submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use crate::{
},
primitives::mev_boost::{MevBoostRelayBidSubmitter, MevBoostRelayID},
telemetry::{
add_relay_submit_time, add_subsidy_value, inc_conn_relay_errors,
inc_failed_block_simulations, inc_initiated_submissions, inc_other_relay_errors,
inc_relay_accepted_submissions, inc_subsidized_blocks, inc_too_many_req_relay_errors,
mark_submission_start_time,
add_relay_submission_stats, add_relay_submit_time, add_subsidy_value,
inc_conn_relay_errors, inc_failed_block_simulations, inc_initiated_submissions,
inc_other_relay_errors, inc_relay_accepted_submissions, inc_subsidized_blocks,
inc_too_many_req_relay_errors, mark_submission_start_time,
},
utils::{duration_ms, error_storage::store_error_event},
};
Expand Down Expand Up @@ -203,6 +203,7 @@ async fn run_submit_to_relays_job(
top_competitor_bid: block.trace.seen_competition_bid,
},
order_ids: executed_orders.map(|o| o.id()).collect(),
sealed_at: block.trace.orders_sealed_at,
};

let submission_span = info_span!(
Expand Down Expand Up @@ -402,10 +403,11 @@ async fn submit_bid_to_the_relay(
};
let submit_time = submit_start.elapsed();
match relay_result {
Ok(()) => {
Ok(stats) => {
trace!("Block submitted to the relay successfully");
add_relay_submit_time(relay.id(), submit_time);
inc_relay_accepted_submissions(relay.id(), optimistic);
add_relay_submission_stats(relay.id(), stats);
}
Err(SubmitBlockErr::PayloadDelivered | SubmitBlockErr::PastSlot) => {
trace!("Block already delivered by the relay, cancelling");
Expand Down
67 changes: 38 additions & 29 deletions crates/rbuilder/src/live_builder/order_input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ use jsonrpsee::RpcModule;
use parking_lot::Mutex;
use std::{net::Ipv4Addr, path::PathBuf, sync::Arc, time::Duration};
use std::{path::Path, time::Instant};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio::{
sync::mpsc,
task::{spawn_blocking, JoinHandle},
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};

Expand Down Expand Up @@ -202,7 +205,7 @@ pub async fn start_orderpool_jobs<P>(
header_receiver: mpsc::Receiver<Header>,
) -> eyre::Result<(JoinHandle<()>, OrderPoolSubscriber)>
where
P: StateProviderFactory + 'static,
P: StateProviderFactory + Clone + 'static,
{
if config.ignore_cancellable_orders {
warn!("ignore_cancellable_orders is set to true, some order input is ignored");
Expand Down Expand Up @@ -332,7 +335,7 @@ async fn spawn_clean_orderpool_job<P>(
global_cancellation: CancellationToken,
) -> eyre::Result<JoinHandle<()>>
where
P: StateProviderFactory + 'static,
P: StateProviderFactory + Clone + 'static,
{
let mut header_receiver: mpsc::Receiver<Header> = header_receiver;

Expand All @@ -343,32 +346,38 @@ where
tokio::select! {
header = header_receiver.recv() => {
if let Some(header) = header {
let current_block = header.number;
set_current_block(current_block);
let state = match provider_factory.latest() {
Ok(state) => state,
Err(err) => {
error!("Failed to get latest state: {}", err);
// @Metric error count
continue;
}
};

let mut orderpool = orderpool.lock();
let start = Instant::now();

orderpool.head_updated(current_block, &state);

let update_time = start.elapsed();
let (tx_count, bundle_count) = orderpool.content_count();
set_ordepool_count(tx_count, bundle_count);
debug!(
current_block,
tx_count,
bundle_count,
update_time_ms = update_time.as_millis(),
"Cleaned orderpool",
);
let provider_factory = provider_factory.clone();
let orderpool = orderpool.clone();
let res = spawn_blocking(move || {
let current_block = header.number;
set_current_block(current_block);
let state = match provider_factory.latest() {
Ok(state) => state,
Err(err) => {
error!(?err, "Failed to get latest state");
return;
}
};

let mut orderpool = orderpool.lock();
let start = Instant::now();

orderpool.head_updated(current_block, &state);

let update_time = start.elapsed();
let (tx_count, bundle_count) = orderpool.content_count();
set_ordepool_count(tx_count, bundle_count);
debug!(
current_block,
tx_count,
bundle_count,
update_time_ms = update_time.as_millis(),
"Cleaned orderpool",
);
}).await;
if let Err(err) = res {
error!(?err, "Clean orderpool error");
}
} else {
info!("Clean orderpool job: channel ended");
if !global_cancellation.is_cancelled(){
Expand Down
93 changes: 73 additions & 20 deletions crates/rbuilder/src/mev_boost/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ pub mod rpc;
pub mod sign_payload;
pub mod submission;

use crate::utils::{offset_datetime_to_timestamp_us, timestamp_now_us};

use super::utils::u256decimal_serde_helper;

use alloy_primitives::{Address, BlockHash, Bytes, U256};
Expand All @@ -18,8 +20,13 @@ use reqwest::{
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use ssz::Encode;
use std::{io::Write, str::FromStr};
use std::{
io::Write,
str::FromStr,
time::{Duration, Instant},
};
use submission::{SubmitBlockRequestNoBlobs, SubmitBlockRequestWithMetadata};
use tokio::task::spawn_blocking;
use url::Url;

pub use error::*;
Expand All @@ -28,6 +35,8 @@ pub use sign_payload::*;
const TOTAL_PAYMENT_HEADER: &str = "Total-Payment";
const BUNDLE_HASHES_HEADER: &str = "Bundle-Hashes";
const TOP_BID_HEADER: &str = "Top-Bid";
const SUBMIT_START_TIME_US: &str = "Submit-Start-Time-Us";
const BLOCK_SEAL_TIME_US: &str = "Block-Seal-Time-Us";
const BLOXROUTE_SHARE_HEADER: &str = "share";
const BLOXROUTE_BUILDER_VALUE_HEADER: &str = "builder-value";

Expand Down Expand Up @@ -357,6 +366,22 @@ impl std::fmt::Debug for SubmitBlockErr {
}
}

#[derive(Debug, Clone, Default)]
pub struct RelaySubmitStats {
    /// time spent between the start of submission and sending the request
pub send_preparation_time: Duration,
    /// time spent compressing the payload using gzip; it's counted in send_preparation_time
pub send_compression_time: Duration,
/// time spent writing request
pub send_write_request_time: Duration,
/// time spent reading response
pub send_read_request_time: Duration,
/// size in bytes of original payload (before compression if present)
pub original_payload_size: usize,
/// size in bytes of sent payload (after compression if present)
pub sent_payload_size: usize,
}

// Data API
impl RelayClient {
async fn get_one_delivered_payload(
Expand Down Expand Up @@ -508,7 +533,9 @@ impl RelayClient {
gzip: bool,
fake_relay: bool,
cancellations: bool,
stats: &mut RelaySubmitStats,
) -> Result<Response, SubmitBlockErr> {
let preparation_start = Instant::now();
let url = {
let mut url = self.url.clone();
url.set_path("/relay/v1/builder/blocks");
Expand Down Expand Up @@ -544,20 +571,28 @@ impl RelayClient {
self.add_auth_headers(&mut headers)
.map_err(|_| SubmitBlockErr::InvalidHeader)?;

stats.original_payload_size = body_data.len();
let compression_start = Instant::now();
// GZIP
if gzip {
headers.insert(
CONTENT_ENCODING,
HeaderValue::from_static(GZIP_CONTENT_ENCODING),
);
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder
.write_all(&body_data)
.map_err(|e| SubmitBlockErr::RPCSerializationError(e.to_string()))?;
body_data = encoder
.finish()
.map_err(|e| SubmitBlockErr::RPCSerializationError(e.to_string()))?;
body_data = spawn_blocking(move || {
Copy link
Preview

Copilot AI Aug 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spawn_blocking call moves the entire body_data Vec to the blocking thread, which could be inefficient for large payloads. Consider using a streaming approach or at least moving only the encoder setup to the blocking thread while keeping the data processing async.

Copilot uses AI. Check for mistakes.

let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
encoder
.write_all(&body_data)
.map_err(|e| SubmitBlockErr::RPCSerializationError(e.to_string()))?;
encoder
.finish()
.map_err(|e| SubmitBlockErr::RPCSerializationError(e.to_string()))
})
.await
.map_err(|e| SubmitBlockErr::RPCSerializationError(e.to_string()))??;
Copy link
Preview

Copilot AI Aug 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The double question mark operator (??) is used to handle both the spawn_blocking error and the compression error. This creates nested error handling that could be clearer if split into separate statements or using a more explicit error handling approach.

Suggested change
.map_err(|e| SubmitBlockErr::RPCSerializationError(e.to_string()))??;
.map_err(|e| SubmitBlockErr::RPCSerializationError(e.to_string()))?;
body_data = join_result?;

Copilot uses AI. Check for mistakes.

}
stats.sent_payload_size = body_data.len();
stats.send_compression_time = compression_start.elapsed();

// Set bloxroute specific headers.
if self.is_bloxroute {
Expand Down Expand Up @@ -613,12 +648,24 @@ impl RelayClient {
};
builder = builder.header(BUNDLE_HASHES_HEADER, bundle_ids);
}

builder = builder
.header(SUBMIT_START_TIME_US, timestamp_now_us().to_string())
.header(
BLOCK_SEAL_TIME_US,
offset_datetime_to_timestamp_us(submission_with_metadata.metadata.sealed_at),
);
}

Ok(builder
stats.send_preparation_time = preparation_start.elapsed();
let send_start = Instant::now();
let response = builder
.send()
.await
.map_err(|e| RelayError::RequestError(e.into()))?)
.map_err(|e| RelayError::RequestError(e.into()))?;
stats.send_write_request_time = send_start.elapsed();

Ok(response)
}

/// Submits the block (call_relay_submit_block) and processes some special errors.
Expand All @@ -629,30 +676,34 @@ impl RelayClient {
gzip: bool,
fake_relay: bool,
cancellations: bool,
) -> Result<(), SubmitBlockErr> {
) -> Result<RelaySubmitStats, SubmitBlockErr> {
let mut stats = RelaySubmitStats::default();
let resp = self
.call_relay_submit_block(data, ssz, gzip, fake_relay, cancellations)
.call_relay_submit_block(data, ssz, gzip, fake_relay, cancellations, &mut stats)
.await?;

let read_start = Instant::now();
let status = resp.status();

// always read full body to reuse TCP connection
let body_result = resp.bytes().await;
stats.send_read_request_time = read_start.elapsed();

if status == StatusCode::TOO_MANY_REQUESTS {
return Err(RelayError::TooManyRequests.into());
}
if status == StatusCode::GATEWAY_TIMEOUT {
return Err(RelayError::ConnectionError.into());
}

let data = resp
.bytes()
.await
.map_err(|e| RelayError::RequestError(e.into()))?;
let data = body_result.map_err(|e| RelayError::RequestError(e.into()))?;

if status == StatusCode::OK && data.as_ref() == b"" {
return Ok(());
return Ok(stats);
}

match serde_json::from_slice::<RelayResponse<()>>(&data) {
Ok(RelayResponse::Ok(_)) => Ok(()),
Ok(RelayResponse::Ok(_)) => Ok(stats),
Ok(RelayResponse::Error(error)) => {
let msg = error.message.as_str();
match msg {
Expand Down Expand Up @@ -690,11 +741,11 @@ impl RelayClient {
// bloxroute returns empty response in this format which we handle here because its not valid
// jsonrpc response
if data.as_ref() == b"{}\n" {
return Ok(());
return Ok(stats);
}
let data_string = String::from_utf8_lossy(&data).to_string();
if is_ignorable_relay_error(status, &data_string) {
Ok(())
Ok(stats)
} else {
Err(RelayError::UnknownRelayError(status, data_string).into())
}
Expand Down Expand Up @@ -725,6 +776,7 @@ impl RelayClient {
#[cfg(test)]
mod tests {
use submission::{BidMetadata, BidValueMetadata};
use time::OffsetDateTime;

use super::{rpc::TestDataGenerator, *};
use crate::mev_boost::{
Expand Down Expand Up @@ -884,6 +936,7 @@ mod tests {
top_competitor_bid: None,
},
order_ids: vec![],
sealed_at: OffsetDateTime::now_utc(),
},
};
relay
Expand Down
2 changes: 2 additions & 0 deletions crates/rbuilder/src/mev_boost/submission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use alloy_rpc_types_engine::{BlobsBundleV1, ExecutionPayloadV2, ExecutionPayload
use derive_more::Deref;
use serde::{Deserialize, Serialize};
use ssz::DecodeError;
use time::OffsetDateTime;

use super::adjustment::BidAdjustmentData;
use crate::primitives::OrderId;
Expand Down Expand Up @@ -367,6 +368,7 @@ impl ssz::Decode for CapellaSubmitBlockRequest {
pub struct BidMetadata {
pub value: BidValueMetadata,
pub order_ids: Vec<OrderId>,
pub sealed_at: OffsetDateTime,
}

#[derive(Clone, Copy, Default, Debug)]
Expand Down
6 changes: 3 additions & 3 deletions crates/rbuilder/src/primitives/mev_boost.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::mev_boost::{
submission::SubmitBlockRequestWithMetadata, RelayClient, RelayError, SubmitBlockErr,
ValidatorSlotData,
submission::SubmitBlockRequestWithMetadata, RelayClient, RelayError, RelaySubmitStats,
SubmitBlockErr, ValidatorSlotData,
};
use alloy_primitives::{utils::parse_ether, Address, U256};
use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
Expand Down Expand Up @@ -192,7 +192,7 @@ impl MevBoostRelayBidSubmitter {
pub async fn submit_block(
&self,
data: &SubmitBlockRequestWithMetadata,
) -> Result<(), SubmitBlockErr> {
) -> Result<RelaySubmitStats, SubmitBlockErr> {
self.client
.submit_block(
data,
Expand Down
Loading
Loading