From 839dc47942b8bcbd97de92570c09fc893b69e6da Mon Sep 17 00:00:00 2001 From: evalir Date: Thu, 19 Jun 2025 15:07:57 +0200 Subject: [PATCH 1/7] chore(tracing): improve bundle tracing, break when receiver is dropped --- src/tasks/cache/bundle.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index e07c1515..b703f6b0 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,5 +1,6 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; +use eyre::bail; use init4_bin_base::{ deps::tracing::{Instrument, debug, debug_span, error, trace, warn}, perms::SharedToken, @@ -47,7 +48,7 @@ impl BundlePoller { let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?; let Some(token) = self.token.read() else { warn!("No token available, skipping bundle fetch"); - return Ok(vec![]); + bail!("No token available, skipping bundle fetch"); }; self.client @@ -89,10 +90,12 @@ impl BundlePoller { .await .inspect_err(|err| debug!(%err, "Error fetching bundles")) { + let _guard = span.entered(); debug!(count = ?bundles.len(), "found bundles"); for bundle in bundles.into_iter() { if let Err(err) = outbound.send(bundle) { error!(err = ?err, "Failed to send bundle - channel is dropped"); + break; } } } From 3507dc4da18377a2aeb7ecd35b6d8ecd03174fc7 Mon Sep 17 00:00:00 2001 From: evalir Date: Fri, 20 Jun 2025 12:54:07 +0200 Subject: [PATCH 2/7] chore: don't warn and bail --- src/tasks/cache/bundle.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index b703f6b0..2d1c6648 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -47,7 +47,6 @@ impl BundlePoller { pub async fn check_bundle_cache(&mut self) -> eyre::Result> { let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?; let Some(token) = self.token.read() else { - warn!("No token available, skipping bundle fetch"); bail!("No token available, skipping bundle fetch"); }; From adf8803f45ce35bf4e12f10a96663761b6c8c063 Mon Sep 17 00:00:00 2001 From: evalir Date: Fri, 20 Jun 2025 13:00:17 +0200 Subject: [PATCH 3/7] chore: clippy --- src/tasks/cache/bundle.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 2d1c6648..9a8bf420 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -2,7 +2,7 @@ use crate::config::BuilderConfig; use eyre::bail; use init4_bin_base::{ - deps::tracing::{Instrument, debug, debug_span, error, trace, warn}, + deps::tracing::{Instrument, debug, debug_span, error, trace}, perms::SharedToken, }; use oauth2::TokenResponse; From 5424d73672aec7e1028c19de6fdd3405aa7cfd34 Mon Sep 17 00:00:00 2001 From: evalir Date: Fri, 20 Jun 2025 16:20:14 +0200 Subject: [PATCH 4/7] chore: improve bundle logging --- src/tasks/cache/bundle.rs | 24 ++++++++++++++++++++++++ src/tasks/cache/tx.rs | 2 ++ 2 files changed, 26 insertions(+) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 9a8bf420..0fcbc40b 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,5 +1,6 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; +use alloy::primitives::keccak256; use eyre::bail; use init4_bin_base::{ deps::tracing::{Instrument, debug, debug_span, error, trace}, @@ -92,6 +93,8 @@ impl BundlePoller { let _guard = span.entered(); debug!(count = ?bundles.len(), "found bundles"); for bundle in bundles.into_iter() { + Self::log_bundle(&bundle); + if let Err(err) = outbound.send(bundle) { error!(err = ?err, "Failed to send bundle - channel is dropped"); break; @@ -103,6 +106,27 @@ impl BundlePoller { } } + /// Utility to log a bundle and its contents. + fn log_bundle(bundle: &TxCacheBundle) { + let bundle_id = bundle.bundle().bundle.replacement_uuid.as_ref(); + trace!( + bundle_id = ?bundle_id, + "found bundle in tx cache" + ); + + for (i, tx) in bundle.bundle().txs().iter().enumerate() { + let hash = keccak256(tx); + trace!(bundle_id = ?bundle_id, tx = %hash, number = i, "bundle tx"); + } + + if let Some(ref fill) = bundle.bundle().host_fills { + trace!(bundle_id = ?bundle_id, fill_owner = %fill.permit.owner, fill_nonce = %fill.permit.permit.nonce, "bundle fill owner and nonce"); + for output in fill.outputs.iter() { + trace!(bundle_id = ?bundle_id, token = ?output.token, recipient = ?output.recipient, "bundle fill token and recipient") + } + } + } + /// Spawns a task that sends bundles it finds to its channel sender. pub fn spawn(self) -> (UnboundedReceiver, JoinHandle<()>) { let (outbound, inbound) = unbounded_channel(); diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index ef75327d..50ea788a 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -82,6 +82,8 @@ impl TxPoller { let _guard = span.entered(); debug!(count = ?transactions.len(), "found transactions"); for tx in transactions.into_iter() { + trace!(tx = %tx.hash(), "sending transaction to outbound channel"); + if outbound.send(tx).is_err() { // If there are no receivers, we can shut down trace!("No receivers left, shutting down"); From bc9a08b52b07b57a5144b252b0ff5a8f4d1624a1 Mon Sep 17 00:00:00 2001 From: evalir Date: Fri, 20 Jun 2025 16:27:20 +0200 Subject: [PATCH 5/7] chore: log on ingestion into cache --- src/tasks/cache/bundle.rs | 24 ------------------------ src/tasks/cache/task.rs | 26 +++++++++++++++++++++++++- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 0fcbc40b..9a8bf420 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,6 +1,5 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; -use alloy::primitives::keccak256; use eyre::bail; use init4_bin_base::{ deps::tracing::{Instrument, debug, debug_span, error, trace}, @@ -93,8 +92,6 @@ impl BundlePoller { let _guard = span.entered(); debug!(count = ?bundles.len(), "found bundles"); for bundle in bundles.into_iter() { - Self::log_bundle(&bundle); - if let Err(err) = outbound.send(bundle) { error!(err = ?err, "Failed to send bundle - channel is dropped"); break; @@ -106,27 +103,6 @@ impl BundlePoller { } } - /// Utility to log a bundle and its contents. - fn log_bundle(bundle: &TxCacheBundle) { - let bundle_id = bundle.bundle().bundle.replacement_uuid.as_ref(); - trace!( - bundle_id = ?bundle_id, - "found bundle in tx cache" - ); - - for (i, tx) in bundle.bundle().txs().iter().enumerate() { - let hash = keccak256(tx); - trace!(bundle_id = ?bundle_id, tx = %hash, number = i, "bundle tx"); - } - - if let Some(ref fill) = bundle.bundle().host_fills { - trace!(bundle_id = ?bundle_id, fill_owner = %fill.permit.owner, fill_nonce = %fill.permit.permit.nonce, "bundle fill owner and nonce"); - for output in fill.outputs.iter() { - trace!(bundle_id = ?bundle_id, token = ?output.token, recipient = ?output.recipient, "bundle fill token and recipient") - } - } - } - /// Spawns a task that sends bundles it finds to its channel sender. pub fn spawn(self) -> (UnboundedReceiver, JoinHandle<()>) { let (outbound, inbound) = unbounded_channel(); diff --git a/src/tasks/cache/task.rs b/src/tasks/cache/task.rs index c07e2ced..65c54a5c 100644 --- a/src/tasks/cache/task.rs +++ b/src/tasks/cache/task.rs @@ -1,5 +1,5 @@ use crate::tasks::env::SimEnv; -use alloy::consensus::TxEnvelope; +use alloy::{consensus::TxEnvelope, primitives::keccak256}; use init4_bin_base::deps::tracing::{debug, info}; use signet_sim::SimCache; use signet_tx_cache::types::TxCacheBundle; @@ -7,6 +7,7 @@ use tokio::{ sync::{mpsc, watch}, task::JoinHandle, }; +use tracing::trace; /// Cache task for the block builder. /// @@ -52,9 +53,11 @@ impl CacheTask { } } Some(bundle) = self.bundles.recv() => { + Self::log_bundle(&bundle); cache.add_item(bundle.bundle, basefee); } Some(txn) = self.txns.recv() => { + trace!(tx = %txn.hash(), "ingesting transaction into cache"); cache.add_item(txn, basefee); } } @@ -68,4 +71,25 @@ impl CacheTask { let fut = self.task_future(sim_cache); (c, tokio::spawn(fut)) } + + /// Utility to log a bundle and its contents. + fn log_bundle(bundle: &TxCacheBundle) { + let bundle_id = bundle.bundle().bundle.replacement_uuid.as_ref(); + trace!( + bundle_id = ?bundle_id, + "ingesting bundle into cache" + ); + + for (i, tx) in bundle.bundle().txs().iter().enumerate() { + let hash = keccak256(tx); + trace!(bundle_id = ?bundle_id, tx = %hash, number = i, "bundle tx"); + } + + if let Some(ref fill) = bundle.bundle().host_fills { + trace!(bundle_id = ?bundle_id, fill_owner = %fill.permit.owner, fill_nonce = %fill.permit.permit.nonce, "bundle fill owner and nonce"); + for output in fill.outputs.iter() { + trace!(bundle_id = ?bundle_id, token = ?output.token, recipient = ?output.recipient, "bundle fill token and recipient") + } + } + } } From 207efed4df7de33314feb88a6ff9100387cac927 Mon Sep 17 00:00:00 2001 From: evalir Date: Fri, 20 Jun 2025 17:27:19 +0200 Subject: [PATCH 6/7] chore: rm tracing changes --- src/tasks/cache/task.rs | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/src/tasks/cache/task.rs b/src/tasks/cache/task.rs index 65c54a5c..608fd99b 100644 --- a/src/tasks/cache/task.rs +++ b/src/tasks/cache/task.rs @@ -1,5 +1,5 @@ use crate::tasks::env::SimEnv; -use alloy::{consensus::TxEnvelope, primitives::keccak256}; +use alloy::consensus::TxEnvelope; use init4_bin_base::deps::tracing::{debug, info}; use signet_sim::SimCache; use signet_tx_cache::types::TxCacheBundle; @@ -53,7 +53,6 @@ impl CacheTask { } } Some(bundle) = self.bundles.recv() => { - Self::log_bundle(&bundle); cache.add_item(bundle.bundle, basefee); } Some(txn) = self.txns.recv() => { @@ -71,25 +70,4 @@ impl CacheTask { let fut = self.task_future(sim_cache); (c, tokio::spawn(fut)) } - - /// Utility to log a bundle and its contents. - fn log_bundle(bundle: &TxCacheBundle) { - let bundle_id = bundle.bundle().bundle.replacement_uuid.as_ref(); - trace!( - bundle_id = ?bundle_id, - "ingesting bundle into cache" - ); - - for (i, tx) in bundle.bundle().txs().iter().enumerate() { - let hash = keccak256(tx); - trace!(bundle_id = ?bundle_id, tx = %hash, number = i, "bundle tx"); - } - - if let Some(ref fill) = bundle.bundle().host_fills { - trace!(bundle_id = ?bundle_id, fill_owner = %fill.permit.owner, fill_nonce = %fill.permit.permit.nonce, "bundle fill owner and nonce"); - for output in fill.outputs.iter() { - trace!(bundle_id = ?bundle_id, token = ?output.token, recipient = ?output.recipient, "bundle fill token and recipient") - } - } - } } From f095489b74fdd71ddab55863fc7abf5b95a54fcc Mon Sep 17 00:00:00 2001 From: evalir Date: Fri, 20 Jun 2025 17:29:30 +0200 Subject: [PATCH 7/7] chore: rm tx traces --- src/tasks/cache/task.rs | 2 -- src/tasks/cache/tx.rs | 2 -- 2 files changed, 4 deletions(-) diff --git a/src/tasks/cache/task.rs b/src/tasks/cache/task.rs index 608fd99b..c07e2ced 100644 --- a/src/tasks/cache/task.rs +++ b/src/tasks/cache/task.rs @@ -7,7 +7,6 @@ use tokio::{ sync::{mpsc, watch}, task::JoinHandle, }; -use tracing::trace; /// Cache task for the block builder. /// @@ -56,7 +55,6 @@ impl CacheTask { cache.add_item(bundle.bundle, basefee); } Some(txn) = self.txns.recv() => { - trace!(tx = %txn.hash(), "ingesting transaction into cache"); cache.add_item(txn, basefee); } } diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index 50ea788a..ef75327d 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -82,8 +82,6 @@ impl TxPoller { let _guard = span.entered(); debug!(count = ?transactions.len(), "found transactions"); for tx in transactions.into_iter() { - trace!(tx = %tx.hash(), "sending transaction to outbound channel"); - if outbound.send(tx).is_err() { // If there are no receivers, we can shut down trace!("No receivers left, shutting down");