diff --git a/bin/builder.rs b/bin/builder.rs index b399b52..8f71603 100644 --- a/bin/builder.rs +++ b/bin/builder.rs @@ -45,7 +45,6 @@ async fn main() -> eyre::Result<()> { config: config.clone(), constants, outbound_tx_channel: tx_channel, - host_provider: host_provider.clone(), }; // Set up tx submission diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs deleted file mode 100644 index c427b5a..0000000 --- a/src/tasks/submit.rs +++ /dev/null @@ -1,583 +0,0 @@ -use crate::{ - config::{HostProvider, ZenithInstance}, - quincey::Quincey, - utils::{self, extract_signature_components}, -}; -use alloy::{ - consensus::{Header, SimpleCoder, constants::GWEI_TO_WEI}, - eips::{BlockId, BlockNumberOrTag}, - network::{TransactionBuilder, TransactionBuilder4844}, - primitives::{Bytes, FixedBytes, TxHash, U256}, - providers::{Provider as _, SendableTx, WalletProvider}, - rpc::{json_rpc::ErrorPayload, types::eth::TransactionRequest}, - sol_types::{SolCall, SolError}, - transports::TransportError, -}; -use eyre::bail; -use init4_bin_base::deps::{ - metrics::{counter, histogram}, - tracing::{Instrument, debug, debug_span, error, info, instrument, trace, warn}, -}; -use signet_constants::SignetSystemConstants; -use signet_sim::BuiltBlock; -use signet_types::{SignRequest, SignResponse}; -use signet_zenith::{ - BundleHelper::{self, BlockHeader, FillPermit2, submitCall}, - Zenith::{self, IncorrectHostBlock}, -}; -use std::time::Instant; -use tokio::{ - sync::mpsc::{self}, - task::JoinHandle, -}; - -use crate::tasks::block::sim::SimResult; - -macro_rules! spawn_provider_send { - ($provider:expr, $tx:expr) => { - { - let p = $provider.clone(); - let t = $tx.clone(); - tokio::spawn(async move { - p.send_tx_envelope(t).await.inspect_err(|e| { - warn!(%e, "error in transaction broadcast") - }) - }) - } - }; -} - -/// Represents the kind of revert that can occur during simulation. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum SimRevertKind { - /// Incorrect host block error - IncorrectHostBlock, - /// Bad signature error - BadSignature, - /// One rollup block per host block error - OneRollupBlockPerHostBlock, - /// Unknown error - Unknown, -} - -impl From> for SimRevertKind { - fn from(data: Option) -> Self { - let Some(data) = data else { - return Self::Unknown; - }; - - if data.starts_with(&IncorrectHostBlock::SELECTOR) { - Self::IncorrectHostBlock - } else if data.starts_with(&Zenith::BadSignature::SELECTOR) { - Self::BadSignature - } else if data.starts_with(&Zenith::OneRollupBlockPerHostBlock::SELECTOR) { - Self::OneRollupBlockPerHostBlock - } else { - Self::Unknown - } - } -} - -#[derive(Debug, Clone)] -/// Represents an error that occurs during simulation of a transaction. -pub struct SimErrorResp { - /// The error payload containing the error code and message. - pub err: ErrorPayload, - /// The kind of revert that occurred (or unknown if not recognized). - kind: SimRevertKind, -} - -impl core::fmt::Display for SimErrorResp { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - write!( - f, - "SimErrorResp {{ code: {}, message: {}, kind: {:?} }}", - self.code(), - self.message(), - self.kind - ) - } -} - -impl From for SimErrorResp { - fn from(err: ErrorPayload) -> Self { - Self::new(err) - } -} - -impl SimErrorResp { - /// Creates a new `SimRevertError` with the specified kind and error - /// payload. - pub fn new(err: ErrorPayload) -> Self { - let kind = err.as_revert_data().into(); - Self { err, kind } - } - - /// True if the error is an incorrect host block. - pub fn is_incorrect_host_block(&self) -> bool { - self.as_revert_data() - .map(|b| b.starts_with(&IncorrectHostBlock::SELECTOR)) - .unwrap_or_default() - } - - /// Attempts to decode the error payload as an [`IncorrectHostBlock`]. - pub fn as_incorrect_host_block(&self) -> Option { - self.as_revert_data().and_then(|data| IncorrectHostBlock::abi_decode(&data).ok()) - } - - /// True if the error is a [`Zenith::BadSignature`]. - pub fn is_bad_signature(&self) -> bool { - self.as_revert_data() - .map(|b| b.starts_with(&Zenith::BadSignature::SELECTOR)) - .unwrap_or_default() - } - - /// Attempts to decode the error payload as a [`Zenith::BadSignature`]. - pub fn as_bad_signature(&self) -> Option { - self.as_revert_data().and_then(|data| Zenith::BadSignature::abi_decode(&data).ok()) - } - - /// True if the error is a [`Zenith::OneRollupBlockPerHostBlock`]. - pub fn is_one_rollup_block_per_host_block(&self) -> bool { - self.as_revert_data() - .map(|b| b.starts_with(&Zenith::OneRollupBlockPerHostBlock::SELECTOR)) - .unwrap_or_default() - } - - /// Attempts to decode the error payload as a - /// [`Zenith::OneRollupBlockPerHostBlock`]. - pub fn as_one_rollup_block_per_host_block(&self) -> Option { - self.as_revert_data() - .and_then(|data| Zenith::OneRollupBlockPerHostBlock::abi_decode(&data).ok()) - } - - /// True if the error is an unknown revert. - pub fn is_unknown(&self) -> bool { - !self.is_incorrect_host_block() - && !self.is_bad_signature() - && !self.is_one_rollup_block_per_host_block() - } - - /// Returns the revert data if available. - pub fn as_revert_data(&self) -> Option { - self.err.as_revert_data() - } - - /// Returns the JSON-RPC error code. - pub const fn code(&self) -> i64 { - self.err.code - } - - /// Returns the error message. - pub fn message(&self) -> &str { - &self.err.message - } -} - -/// Control flow for transaction submission. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum ControlFlow { - /// Retry - Retry, - /// Skip - Skip, - /// Succesfully submitted - Done, -} - -/// Submits sidecars in ethereum txns to mainnet ethereum -#[derive(Debug)] -pub struct SubmitTask { - /// Zenith - pub zenith: ZenithInstance, - /// Quincey - pub quincey: Quincey, - /// Constants - pub constants: SignetSystemConstants, - /// Config - pub config: crate::config::BuilderConfig, - /// Channel over which to send pending transactions - pub outbound_tx_channel: mpsc::UnboundedSender, - /// Host provider for sending transactions and fetching block & header info - pub host_provider: HostProvider, -} - -impl SubmitTask { - /// Get the provider from the zenith instance - const fn provider(&self) -> &HostProvider { - self.zenith.provider() - } - - /// Constructs the signing request from the in-progress block passed to it and assigns the - /// correct height, chain ID, gas limit, and rollup reward address. - fn construct_sig_request(&self, contents: &BuiltBlock) -> SignRequest { - let host_block_number = - self.constants.rollup_block_to_host_block_num(contents.block_number()); - - SignRequest { - host_block_number: U256::from(host_block_number), - host_chain_id: U256::from(self.config.host_chain_id), - ru_chain_id: U256::from(self.config.ru_chain_id), - gas_limit: U256::from(self.config.rollup_block_gas_limit), - ru_reward_address: self.config.builder_rewards_address, - contents: *contents.contents_hash(), - } - } - - /// Encodes the sidecar and then builds the 4844 blob transaction from the provided header and signature values. - fn build_blob_tx( - &self, - fills: Vec, - header: BundleHelper::BlockHeader, - v: u8, - r: FixedBytes<32>, - s: FixedBytes<32>, - block: &BuiltBlock, - ) -> eyre::Result { - let data = submitCall { fills, header, v, r, s }.abi_encode(); - - let sidecar = block.encode_blob::().build()?; - - Ok(TransactionRequest::default().with_blob_sidecar(sidecar).with_input(data)) - } - - /// Prepares and sends the EIP-4844 transaction with a sidecar encoded with a rollup block to the network. - async fn submit_transaction( - &self, - retry_count: usize, - resp: &SignResponse, - block: &BuiltBlock, - ) -> eyre::Result { - let tx = self.prepare_tx(retry_count, resp, block).await?; - - self.send_transaction(resp, tx).await - } - - /// Prepares the transaction by extracting the signature components, creating the transaction - /// request, and simulating the transaction with a call to the host provider. - async fn prepare_tx( - &self, - retry_count: usize, - resp: &SignResponse, - block: &BuiltBlock, - ) -> Result { - // Get the latest host block header for gas estimation - let host_header = self.latest_host_header().await?; - - // Create the transaction request with the signature values - let tx: TransactionRequest = - self.new_tx_request(retry_count, resp, block, host_header).await?; - - // Simulate the transaction with a call to the host provider and report any errors - if let Err(err) = self.sim_with_call(&tx).await { - warn!(%err, "error in transaction simulation"); - } - - Ok(tx) - } - - /// Gets the host header from the host provider by fetching the latest block. - async fn latest_host_header(&self) -> eyre::Result
{ - let previous = self - .host_provider - .get_block(BlockId::Number(BlockNumberOrTag::Latest)) - .into_future() - .await?; - debug!(?previous, "got host block for hash"); - - match previous { - Some(block) => Ok(block.header.inner), - None => Err(eyre::eyre!("host block not found")), - } - } - - /// Simulates the transaction with a call to the host provider to check for reverts. - async fn sim_with_call(&self, tx: &TransactionRequest) -> eyre::Result<()> { - match self.provider().call(tx.clone()).block(BlockNumberOrTag::Pending.into()).await { - Err(TransportError::ErrorResp(e)) => { - let e = SimErrorResp::from(e); - bail!(e) - } - Err(e) => bail!(e), - _ => Ok(()), - } - } - - /// Creates a transaction request for the blob with the given header and signature values. - async fn new_tx_request( - &self, - retry_count: usize, - resp: &SignResponse, - block: &BuiltBlock, - host_header: Header, - ) -> Result { - // manually retrieve nonce - let nonce = - self.provider().get_transaction_count(self.provider().default_signer_address()).await?; - debug!(nonce, "assigned nonce"); - - // Extract the signature components from the response - let (v, r, s) = extract_signature_components(&resp.sig); - - let (max_fee_per_gas, max_priority_fee_per_gas, max_fee_per_blob_gas) = - calculate_gas(retry_count, host_header); - - // Build the block header - let header: BlockHeader = BlockHeader { - hostBlockNumber: resp.req.host_block_number, - rollupChainId: U256::from(self.config.ru_chain_id), - gasLimit: resp.req.gas_limit, - rewardAddress: resp.req.ru_reward_address, - blockDataHash: *block.contents_hash(), - }; - debug!(?header.hostBlockNumber, "built rollup block header"); - - // Extract fills from the built block - let fills = utils::convert_fills(block); - debug!(fill_count = fills.len(), "extracted fills from rollup block"); - - // Create a blob transaction with the blob header and signature values and return it - let tx = self - .build_blob_tx(fills, header, v, r, s, block)? - .with_to(self.config.builder_helper_address) - .with_max_fee_per_gas(max_fee_per_gas) - .with_max_priority_fee_per_gas(max_priority_fee_per_gas) - .with_max_fee_per_blob_gas(max_fee_per_blob_gas) - .with_nonce(nonce); - - Ok(tx) - } - - /// Fills the transaction request with the provider and sends it to the network - /// and any additionally configured broadcast providers. - async fn send_transaction( - &self, - resp: &SignResponse, - tx: TransactionRequest, - ) -> Result { - // assign the nonce and fill the rest of the values - let SendableTx::Envelope(tx) = self.provider().fill(tx).await? else { - bail!("failed to fill transaction") - }; - debug!(tx_hash = ?tx.hash(), host_block_number = %resp.req.host_block_number, "sending transaction to network"); - - // send the tx via the primary host_provider - let fut = spawn_provider_send!(self.provider(), &tx); - - // spawn send_tx futures on retry attempts for all additional broadcast host_providers - for host_provider in self.config.connect_additional_broadcast() { - spawn_provider_send!(&host_provider, &tx); - } - - // send the in-progress transaction over the outbound_tx_channel - if self.outbound_tx_channel.send(*tx.tx_hash()).is_err() { - error!("receipts task gone"); - } - - if let Err(e) = fut.await? { - // Detect and handle transaction underprice errors - if matches!(e, TransportError::ErrorResp(ref err) if err.code == -32603) { - debug!(tx_hash = ?tx.hash(), "underpriced transaction error - retrying tx with gas bump"); - return Ok(ControlFlow::Retry); - } - - // Unknown error, log and skip - error!(error = %e, "Primary tx broadcast failed"); - return Ok(ControlFlow::Skip); - } - - info!( - tx_hash = %tx.tx_hash(), - ru_chain_id = %resp.req.ru_chain_id, - gas_limit = %resp.req.gas_limit, - "dispatched to network" - ); - - Ok(ControlFlow::Done) - } - - /// Handles the inbound block by constructing a signature request and submitting the transaction. - #[instrument(skip_all)] - async fn handle_inbound( - &self, - retry_count: usize, - block: &BuiltBlock, - ) -> eyre::Result { - info!(retry_count, txns = block.tx_count(), "handling inbound block"); - let sig_request = self.construct_sig_request(block); - - debug!( - host_block_number = %sig_request.host_block_number, - ru_chain_id = %sig_request.ru_chain_id, - tx_count = block.tx_count(), - "constructed signature request for host block" - ); - - let signed = self.quincey.get_signature(&sig_request).await?; - - self.submit_transaction(retry_count, &signed, block).await - } - - /// Handles the retry logic for the inbound block. - async fn retrying_handle_inbound( - &self, - block: &BuiltBlock, - retry_limit: usize, - ) -> eyre::Result { - let mut retries = 0; - let building_start_time = Instant::now(); - - let (current_slot, start, end) = self.calculate_slot_window(); - debug!(current_slot, start, end, "calculating target slot window"); - - // Retry loop - let result = loop { - let span = debug_span!("SubmitTask::retrying_handle_inbound", retries); - - let inbound_result = - match self.handle_inbound(retries, block).instrument(span.clone()).await { - Ok(control_flow) => control_flow, - Err(err) => { - // Delay until next slot if we get a 403 error - if err.to_string().contains("403 Forbidden") { - let (slot_number, _, _) = self.calculate_slot_window(); - debug!(slot_number, "403 detected - skipping slot"); - return Ok(ControlFlow::Skip); - } else { - // Otherwise, log error and retry - error!(error = %err, "error handling inbound block"); - } - - ControlFlow::Retry - } - }; - - let guard = span.entered(); - - match inbound_result { - ControlFlow::Retry => { - retries += 1; - if retries > retry_limit { - counter!("builder.building_too_many_retries").increment(1); - debug!("retries exceeded - skipping block"); - return Ok(ControlFlow::Skip); - } - drop(guard); - debug!(retries, start, end, "retrying block"); - continue; - } - ControlFlow::Skip => { - counter!("builder.skipped_blocks").increment(1); - debug!(retries, "skipping block"); - break inbound_result; - } - ControlFlow::Done => { - counter!("builder.submitted_successful_blocks").increment(1); - debug!(retries, "successfully submitted block"); - break inbound_result; - } - } - }; - - // This is reached when `Done` or `Skip` is returned - let elapsed = building_start_time.elapsed().as_millis() as f64; - histogram!("builder.block_build_time").record(elapsed); - info!( - ?result, - tx_count = block.tx_count(), - block_number = block.block_number(), - build_time = ?elapsed, - "finished block building" - ); - Ok(result) - } - - /// Calculates and returns the slot number and its start and end timestamps for the current instant. - fn calculate_slot_window(&self) -> (u64, u64, u64) { - let now_ts = utils::now(); - let current_slot = self.config.slot_calculator.calculate_slot(now_ts); - let (start, end) = self.config.slot_calculator.calculate_slot_window(current_slot); - (current_slot, start, end) - } - - /// Task future for the submit task. This function runs the main loop of the task. - async fn task_future(self, mut inbound: mpsc::UnboundedReceiver) { - loop { - // Wait to receive a new block - let Some(sim_result) = inbound.recv().await else { - debug!("upstream task gone - exiting submit task"); - break; - }; - debug!(block_number = sim_result.block.block_number(), "submit channel received block"); - - // Don't submit empty blocks - if sim_result.block.is_empty() { - debug!( - block_number = sim_result.block.block_number(), - "received empty block - skipping" - ); - continue; - } - - if let Err(e) = self.retrying_handle_inbound(&sim_result.block, 3).await { - error!(error = %e, "error handling inbound block"); - continue; - } - } - } - - /// Spawns the in progress block building task - pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { - let (sender, inbound) = mpsc::unbounded_channel::(); - let handle = tokio::spawn(self.task_future(inbound)); - (sender, handle) - } -} - -/// Calculates gas parameters based on the block environment and retry count. -fn calculate_gas(retry_count: usize, prev_header: Header) -> (u128, u128, u128) { - let fallback_blob_basefee = 500; - let fallback_basefee = 7; - - let base_fee_per_gas = match prev_header.base_fee_per_gas { - Some(basefee) => basefee, - None => fallback_basefee, - }; - - let parent_blob_basefee = prev_header.excess_blob_gas.unwrap_or(0) as u128; - let blob_basefee = if parent_blob_basefee > 0 { - // Use the parent blob base fee if available - parent_blob_basefee - } else { - // Fallback to a default value if no blob base fee is set - fallback_blob_basefee - }; - - bump_gas_from_retries(retry_count, base_fee_per_gas, blob_basefee) -} - -/// Bumps the gas parameters based on the retry count, base fee, and blob base fee. -pub fn bump_gas_from_retries( - retry_count: usize, - basefee: u64, - blob_basefee: u128, -) -> (u128, u128, u128) { - const PRIORITY_FEE_BASE: u64 = 2 * GWEI_TO_WEI; - const BASE_MULTIPLIER: u128 = 2; - const BLOB_MULTIPLIER: u128 = 2; - - // Increase priority fee by 20% per retry - let priority_fee = - PRIORITY_FEE_BASE * (12u64.pow(retry_count as u32) / 10u64.pow(retry_count as u32)); - - // Max fee includes basefee + priority + headroom (double basefee, etc.) - let max_fee_per_gas = (basefee as u128) * BASE_MULTIPLIER + (priority_fee as u128); - let max_fee_per_blob_gas = blob_basefee * BLOB_MULTIPLIER * (retry_count as u128 + 1); - - trace!( - retry_count, - max_fee_per_gas, priority_fee, max_fee_per_blob_gas, "calculated bumped gas parameters" - ); - - (max_fee_per_gas, priority_fee as u128, max_fee_per_blob_gas) -} diff --git a/src/tasks/submit/mod.rs b/src/tasks/submit/mod.rs new file mode 100644 index 0000000..1aa0d75 --- /dev/null +++ b/src/tasks/submit/mod.rs @@ -0,0 +1,8 @@ +mod prep; +pub use prep::{Bumpable, SubmitPrep}; + +mod sim_err; +pub use sim_err::{SimErrorResp, SimRevertKind}; + +mod task; +pub use task::{ControlFlow, SubmitTask}; diff --git a/src/tasks/submit/prep.rs b/src/tasks/submit/prep.rs new file mode 100644 index 0000000..c3579b9 --- /dev/null +++ b/src/tasks/submit/prep.rs @@ -0,0 +1,190 @@ +use crate::{ + config::{BuilderConfig, HostProvider}, + quincey::Quincey, + utils, +}; +use alloy::{ + consensus::{Header, SimpleCoder}, + network::{TransactionBuilder, TransactionBuilder4844}, + primitives::{B256, U256}, + providers::{Provider, WalletProvider}, + rpc::types::TransactionRequest, + sol_types::SolCall, +}; +use init4_bin_base::deps::tracing::debug; +use signet_constants::SignetSystemConstants; +use signet_sim::BuiltBlock; +use signet_types::{SignRequest, SignResponse}; +use signet_zenith::BundleHelper; +use std::sync::OnceLock; + +/// Preparation logic for transactions issued to the host chain by the +/// [`SubmitTask`]. +/// +/// [`SubmitTask`]: crate::tasks::submit::SubmitTask +#[derive(Debug, Clone)] +pub struct SubmitPrep<'a> { + // The block we are preparing a transaction for + block: &'a BuiltBlock, + + // Info we need to prepare the transaction + provider: HostProvider, + quincey: Quincey, + config: BuilderConfig, + constants: SignetSystemConstants, + + // Memoized quincey request and response + sig_request: OnceLock, + quincey_resp: OnceLock, +} + +impl<'a> SubmitPrep<'a> { + /// Create a new `SubmitPrep` instance. + pub fn new( + block: &'a BuiltBlock, + provider: HostProvider, + quincey: Quincey, + config: BuilderConfig, + constants: SignetSystemConstants, + ) -> Self { + Self { + block, + sig_request: Default::default(), + quincey_resp: Default::default(), + provider, + quincey, + config, + constants, + } + } + + /// Construct a quincey signature request for the block. + fn sig_request(&self) -> &SignRequest { + self.sig_request.get_or_init(|| { + let host_block_number = + self.constants.rollup_block_to_host_block_num(self.block.block_number()); + + SignRequest { + host_block_number: U256::from(host_block_number), + host_chain_id: U256::from(self.config.host_chain_id), + ru_chain_id: U256::from(self.config.ru_chain_id), + gas_limit: U256::from(self.config.rollup_block_gas_limit), + ru_reward_address: self.config.builder_rewards_address, + contents: *self.block.contents_hash(), + } + }) + } + + /// Get the quincey signature response for the block. + async fn quincey_resp(&self) -> eyre::Result<&SignResponse> { + if let Some(resp) = self.quincey_resp.get() { + return Ok(resp); + } + + let sig = self.quincey.get_signature(self.sig_request()).await?; + Ok(self.quincey_resp.get_or_init(|| sig)) + } + + /// Get the signature components from the response. + async fn quincey_signature(&self) -> eyre::Result<(u8, B256, B256)> { + self.quincey_resp().await.map(|resp| &resp.sig).map(utils::extract_signature_components) + } + + /// Converts the fills in the block to a vector of `FillPermit2`. + fn fills(&self) -> Vec { + utils::convert_fills(self.block) + } + + /// Encodes the sidecar and then builds the 4844 blob transaction from the provided header and signature values. + async fn build_blob_tx(&self) -> eyre::Result { + let (v, r, s) = self.quincey_signature().await?; + + // Build the block header + let header = BundleHelper::BlockHeader { + hostBlockNumber: self.sig_request().host_block_number, + rollupChainId: U256::from(self.constants.ru_chain_id()), + gasLimit: self.sig_request().gas_limit, + rewardAddress: self.sig_request().ru_reward_address, + blockDataHash: *self.block.contents_hash(), + }; + debug!(?header.hostBlockNumber, "built rollup block header"); + + let data = BundleHelper::submitCall { fills: self.fills(), header, v, r, s }.abi_encode(); + + let sidecar = self.block.encode_blob::().build()?; + + Ok(TransactionRequest::default().with_blob_sidecar(sidecar).with_input(data)) + } + + async fn new_tx_request(&self) -> eyre::Result { + let nonce = + self.provider.get_transaction_count(self.provider.default_signer_address()).await?; + + debug!(nonce, "assigned nonce"); + + // Create a blob transaction with the blob header and signature values and return it + let tx = self + .build_blob_tx() + .await? + .with_to(self.config.builder_helper_address) + .with_nonce(nonce); + + Ok(tx) + } + + /// Prepares a transaction for submission to the host chain. + pub async fn prep_transaction(self, prev_host: &Header) -> eyre::Result { + let req = self.new_tx_request().await?; + Ok(Bumpable::new(req, prev_host)) + } +} + +/// A fee-bumpable transaction request for the host chain. +#[derive(Debug, Clone)] +pub struct Bumpable { + req: TransactionRequest, + bumps: usize, +} + +impl Bumpable { + /// Instantiate a new `Bumpable` transaction request. + pub fn new(mut req: TransactionRequest, prev_host: &Header) -> Self { + crate::utils::populate_initial_gas(&mut req, prev_host); + Self { req, bumps: 0 } + } + + /// Get a reference to the inner transaction request. + pub const fn req(&self) -> &TransactionRequest { + &self.req + } + + /// Get the current bump count. + pub const fn bump_count(&self) -> usize { + self.bumps + } + + /// Bump the fees for the transaction request. + pub fn bump(&mut self) { + self.bumps += 1; + + // Bump max_priority fee per gas by 20% + let mpfpg = self.req.max_priority_fee_per_gas.as_mut().expect("set on construction"); + let bump = *mpfpg / 5; + *mpfpg += bump; + + // Increase max_fee_per_gas by the same amount as we increased mpfpg + let mfpg = self.req.max_fee_per_gas.as_mut().expect("set on construction"); + *mfpg += bump; + + // Do not bump max_fee_per_blob_gas, as we require confirmation in a + // specific block, and the blob base fee in the block is known. + + debug!(new_mpfpg = mpfpg, new_mfpg = mfpg, "Bumped fees",); + } + + /// Bump the fees, and return a copy of the transaction request. + pub fn bumped(&mut self) -> TransactionRequest { + self.bump(); + self.req.clone() + } +} diff --git a/src/tasks/submit/sim_err.rs b/src/tasks/submit/sim_err.rs new file mode 100644 index 0000000..678301b --- /dev/null +++ b/src/tasks/submit/sim_err.rs @@ -0,0 +1,129 @@ +use alloy::{primitives::Bytes, rpc::json_rpc::ErrorPayload, sol_types::SolError}; +use signet_zenith::Zenith::{self, IncorrectHostBlock}; + +/// Represents the kind of revert that can occur during simulation. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SimRevertKind { + /// Incorrect host block error + IncorrectHostBlock, + /// Bad signature error + BadSignature, + /// One rollup block per host block error + OneRollupBlockPerHostBlock, + /// Unknown error + Unknown, +} + +impl From> for SimRevertKind { + fn from(data: Option) -> Self { + let Some(data) = data else { + return Self::Unknown; + }; + + if data.starts_with(&IncorrectHostBlock::SELECTOR) { + Self::IncorrectHostBlock + } else if data.starts_with(&Zenith::BadSignature::SELECTOR) { + Self::BadSignature + } else if data.starts_with(&Zenith::OneRollupBlockPerHostBlock::SELECTOR) { + Self::OneRollupBlockPerHostBlock + } else { + Self::Unknown + } + } +} + +#[derive(Debug, Clone)] +/// Represents an error that occurs during simulation of a transaction. +pub struct SimErrorResp { + /// The error payload containing the error code and message. + pub err: ErrorPayload, + /// The kind of revert that occurred (or unknown if not recognized). + kind: SimRevertKind, +} + +impl core::fmt::Display for SimErrorResp { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!( + f, + "SimErrorResp {{ code: {}, message: {}, kind: {:?} }}", + self.code(), + self.message(), + self.kind + ) + } +} + +impl From for SimErrorResp { + fn from(err: ErrorPayload) -> Self { + Self::new(err) + } +} + +impl SimErrorResp { + /// Creates a new `SimRevertError` with the specified kind and error + /// payload. + pub fn new(err: ErrorPayload) -> Self { + let kind = err.as_revert_data().into(); + Self { err, kind } + } + + /// True if the error is an incorrect host block. + pub fn is_incorrect_host_block(&self) -> bool { + self.as_revert_data() + .map(|b| b.starts_with(&IncorrectHostBlock::SELECTOR)) + .unwrap_or_default() + } + + /// Attempts to decode the error payload as an [`IncorrectHostBlock`]. + pub fn as_incorrect_host_block(&self) -> Option { + self.as_revert_data().and_then(|data| IncorrectHostBlock::abi_decode(&data).ok()) + } + + /// True if the error is a [`Zenith::BadSignature`]. + pub fn is_bad_signature(&self) -> bool { + self.as_revert_data() + .map(|b| b.starts_with(&Zenith::BadSignature::SELECTOR)) + .unwrap_or_default() + } + + /// Attempts to decode the error payload as a [`Zenith::BadSignature`]. + pub fn as_bad_signature(&self) -> Option { + self.as_revert_data().and_then(|data| Zenith::BadSignature::abi_decode(&data).ok()) + } + + /// True if the error is a [`Zenith::OneRollupBlockPerHostBlock`]. + pub fn is_one_rollup_block_per_host_block(&self) -> bool { + self.as_revert_data() + .map(|b| b.starts_with(&Zenith::OneRollupBlockPerHostBlock::SELECTOR)) + .unwrap_or_default() + } + + /// Attempts to decode the error payload as a + /// [`Zenith::OneRollupBlockPerHostBlock`]. + pub fn as_one_rollup_block_per_host_block(&self) -> Option { + self.as_revert_data() + .and_then(|data| Zenith::OneRollupBlockPerHostBlock::abi_decode(&data).ok()) + } + + /// True if the error is an unknown revert. + pub fn is_unknown(&self) -> bool { + !self.is_incorrect_host_block() + && !self.is_bad_signature() + && !self.is_one_rollup_block_per_host_block() + } + + /// Returns the revert data if available. + pub fn as_revert_data(&self) -> Option { + self.err.as_revert_data() + } + + /// Returns the JSON-RPC error code. + pub const fn code(&self) -> i64 { + self.err.code + } + + /// Returns the error message. + pub fn message(&self) -> &str { + &self.err.message + } +} diff --git a/src/tasks/submit/task.rs b/src/tasks/submit/task.rs new file mode 100644 index 0000000..a93b013 --- /dev/null +++ b/src/tasks/submit/task.rs @@ -0,0 +1,298 @@ +use crate::{ + config::{HostProvider, ZenithInstance}, + quincey::Quincey, + tasks::{ + block::sim::SimResult, + submit::{Bumpable, SimErrorResp, SubmitPrep}, + }, + utils, +}; +use alloy::{ + eips::BlockNumberOrTag, + primitives::TxHash, + providers::{Provider as _, SendableTx}, + rpc::types::eth::TransactionRequest, + transports::TransportError, +}; +use eyre::bail; +use init4_bin_base::deps::{ + metrics::{counter, histogram}, + tracing::{Instrument, debug, debug_span, error, info, warn}, +}; +use signet_constants::SignetSystemConstants; +use std::time::Instant; +use tokio::{sync::mpsc, task::JoinHandle}; + +macro_rules! spawn_provider_send { + ($provider:expr, $tx:expr) => { + { + let p = $provider.clone(); + let t = $tx.clone(); + tokio::spawn(async move { + p.send_tx_envelope(t).await.inspect_err(|e| { + warn!(%e, "error in transaction broadcast") + }) + }) + } + }; +} + +/// Control flow for transaction submission. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum ControlFlow { + /// Retry + Retry, + /// Skip + Skip, + /// Succesfully submitted + Done, +} + +/// Submits sidecars in ethereum txns to mainnet ethereum +#[derive(Debug)] +pub struct SubmitTask { + /// Zenith + pub zenith: ZenithInstance, + /// Quincey + pub quincey: Quincey, + /// Constants + pub constants: SignetSystemConstants, + /// Config + pub config: crate::config::BuilderConfig, + /// Channel over which to send pending transactions + pub outbound_tx_channel: mpsc::UnboundedSender, +} + +impl SubmitTask { + /// Get the provider from the zenith instance + const fn provider(&self) -> &HostProvider { + self.zenith.provider() + } + + /// Simulates the transaction with a call to the host provider to check for reverts. + async fn sim_with_call(&self, tx: &TransactionRequest) -> eyre::Result<()> { + match self.provider().call(tx.clone()).block(BlockNumberOrTag::Pending.into()).await { + Err(TransportError::ErrorResp(e)) => { + let e = SimErrorResp::from(e); + bail!(e) + } + Err(e) => bail!(e), + _ => Ok(()), + } + } + + /// Fills the transaction request with the provider and sends it to the network + /// and any additionally configured broadcast providers. + async fn send_transaction(&self, tx: TransactionRequest) -> Result { + // assign the nonce and fill the rest of the values + let SendableTx::Envelope(tx) = self.provider().fill(tx).await? else { + bail!("failed to fill transaction") + }; + debug!(tx_hash = ?tx.hash(), "sending transaction to network"); + + // send the tx via the primary host_provider + let fut = spawn_provider_send!(self.provider(), &tx); + + // spawn send_tx futures on retry attempts for all additional broadcast host_providers + for host_provider in self.config.connect_additional_broadcast() { + spawn_provider_send!(&host_provider, &tx); + } + + // send the in-progress transaction over the outbound_tx_channel + if self.outbound_tx_channel.send(*tx.tx_hash()).is_err() { + error!("receipts task gone"); + } + + if let Err(error) = fut.await? { + // Detect and handle transaction underprice errors + if matches!(error, TransportError::ErrorResp(ref err) if err.code == -32603) { + debug!(tx_hash = ?tx.hash(), "underpriced transaction error - retrying tx with gas bump"); + return Ok(ControlFlow::Retry); + } + + // Unknown error, log and skip + error!(%error, "Primary tx broadcast failed"); + return Ok(ControlFlow::Skip); + } + + info!( + tx_hash = %tx.tx_hash(), + "dispatched to network" + ); + + Ok(ControlFlow::Done) + } + + /// Handles the retry logic for the inbound block. + async fn retrying_send( + &self, + mut bumpable: Bumpable, + retry_limit: usize, + ) -> eyre::Result { + let submitting_start_time = Instant::now(); + + let (current_slot, start, end) = self.calculate_slot_window(); + debug!(current_slot, start, end, "calculating target slot window"); + + let mut req = bumpable.req().clone(); + + // Retry loop + let result = loop { + let span = debug_span!( + "SubmitTask::retrying_send", + retries = bumpable.bump_count(), + nonce = bumpable.req().nonce, + ); + + let inbound_result = match self.send_transaction(req).instrument(span.clone()).await { + Ok(control_flow) => control_flow, + Err(error) => { + // Log error and retry + error!(%error, "error handling inbound block"); + ControlFlow::Retry + } + }; + + let guard = span.entered(); + + match inbound_result { + ControlFlow::Retry => { + // bump the req + req = bumpable.bumped(); + if bumpable.bump_count() > retry_limit { + counter!("builder.building_too_many_retries").increment(1); + debug!("retries exceeded - skipping block"); + return Ok(ControlFlow::Skip); + } + drop(guard); + debug!(retries = bumpable.bump_count(), start, end, "retrying block"); + continue; + } + ControlFlow::Skip => { + counter!("builder.skipped_blocks").increment(1); + debug!(retries = bumpable.bump_count(), "skipping block"); + break inbound_result; + } + ControlFlow::Done => { + counter!("builder.submitted_successful_blocks").increment(1); + debug!(retries = bumpable.bump_count(), "successfully submitted block"); + break inbound_result; + } + } + }; + + // This is reached when `Done` or `Skip` is returned + let elapsed = submitting_start_time.elapsed().as_millis() as f64; + histogram!("builder.submit_timer").record(elapsed); + info!( + ?result, + build_time = ?elapsed, + "finished block submitting" + ); + Ok(result) + } + + /// Calculates and returns the slot number and its start and end timestamps for the current instant. + fn calculate_slot_window(&self) -> (u64, u64, u64) { + let now_ts = utils::now(); + let current_slot = self.config.slot_calculator.calculate_slot(now_ts); + let (start, end) = self.config.slot_calculator.calculate_slot_window(current_slot); + (current_slot, start, end) + } + + /// Task future for the submit task. This function runs the main loop of the task. + async fn task_future(self, mut inbound: mpsc::UnboundedReceiver) { + loop { + // Wait to receive a new block + let Some(sim_result) = inbound.recv().await else { + debug!("upstream task gone - exiting submit task"); + break; + }; + let ru_block_number = sim_result.block.block_number(); + let host_block_number = self.constants.rollup_block_to_host_block_num(ru_block_number); + + let span = debug_span!( + "SubmitTask::loop", + ru_block_number, + host_block_number, + block_tx_count = sim_result.block.tx_count(), + ); + let guard = span.enter(); + + debug!(ru_block_number, "submit channel received block"); + + // Don't submit empty blocks + if sim_result.block.is_empty() { + debug!(ru_block_number, "received empty block - skipping"); + continue; + } + + // drop guard before await + drop(guard); + + let Ok(Some(prev_host)) = self + .provider() + .get_block_by_number(host_block_number.into()) + .into_future() + .instrument(span.clone()) + .await + else { + let _guard = span.enter(); + warn!(ru_block_number, host_block_number, "failed to get previous host block"); + continue; + }; + + // Prep the span we'll use for the transaction submission + let submission_span = debug_span!( + parent: span, + "SubmitTask::tx_submission", + tx_count = sim_result.block.tx_count(), + host_block_number, + ru_block_number, + ); + + // Prepare the transaction request for submission + let prep = SubmitPrep::new( + &sim_result.block, + self.provider().clone(), + self.quincey.clone(), + self.config.clone(), + self.constants, + ); + let bumpable = match prep + .prep_transaction(&prev_host.header) + .instrument(submission_span.clone()) + .await + { + Ok(bumpable) => bumpable, + Err(error) => { + error!(%error, "failed to prepare transaction for submission"); + continue; + } + }; + + // Simulate the transaction to check for reverts + if let Err(error) = + self.sim_with_call(bumpable.req()).instrument(submission_span.clone()).await + { + error!(%error, "simulation failed for transaction"); + continue; + }; + + // Now send the transaction + if let Err(error) = + self.retrying_send(bumpable, 3).instrument(submission_span.clone()).await + { + error!(%error, "error dispatching block to host chain"); + continue; + } + } + } + + /// Spawns the in progress block building task + pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { + let (sender, inbound) = mpsc::unbounded_channel::(); + let handle = tokio::spawn(self.task_future(inbound)); + (sender, handle) + } +} diff --git a/src/utils.rs b/src/utils.rs index 211da62..53a8414 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,4 +1,9 @@ -use alloy::primitives::{B256, Signature}; +use alloy::{ + consensus::{Header, constants::GWEI_TO_WEI}, + eips::{eip1559::BaseFeeParams, eip7840::BlobParams}, + primitives::{B256, Signature}, + rpc::types::TransactionRequest, +}; use signet_sim::BuiltBlock; use signet_zenith::BundleHelper::FillPermit2; use std::time::UNIX_EPOCH; @@ -27,6 +32,23 @@ pub fn extract_signature_components(sig: &Signature) -> (u8, B256, B256) { (v, r, s) } +/// Populates the initial gas parameters for a transaction request based on the +/// previous block header. +pub fn populate_initial_gas(req: &mut TransactionRequest, prev_header: &Header) { + const STARTING_MPFPG: u128 = 2 * GWEI_TO_WEI as u128; + + let base_fee_per_gas = prev_header + .next_block_base_fee(BaseFeeParams::ethereum()) + .expect("signet deployed after 1559 active") as u128; + let blob_basefee = prev_header + .next_block_blob_fee(BlobParams::prague()) + .expect("signet deployed after 4844 active"); + + req.max_priority_fee_per_gas = Some(STARTING_MPFPG); + req.max_fee_per_gas = Some((base_fee_per_gas * 1025 / 1024) + STARTING_MPFPG); + req.max_fee_per_blob_gas = Some(blob_basefee); +} + #[cfg(test)] mod tests { use super::*;