Skip to content

Commit

Permalink
fix(anvil): guard evm execute access (foundry-rs#2026)
Browse files Browse the repository at this point in the history
* fix(anvil): guard executor access

* perf: opt lock management
  • Loading branch information
mattsse authored Jun 20, 2022
1 parent d60cb8b commit 8f0ede2
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 91 deletions.
22 changes: 10 additions & 12 deletions anvil/src/eth/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ impl EthApi {
)?
.or_zero_fees();

let (exit, out, gas, _) = self.backend.call(request, fees, block_number)?;
let (exit, out, gas, _) = self.backend.call(request, fees, block_number).await?;

trace!(target = "node", "Call status {:?}, gas {}", exit, gas);

Expand Down Expand Up @@ -695,7 +695,7 @@ impl EthApi {

// ensure tx succeeds
let (exit, out, _, mut state) =
self.backend.call(request.clone(), FeeDetails::zero(), block_number)?;
self.backend.call(request.clone(), FeeDetails::zero(), block_number).await?;

ensure_return_ok(exit, &out)?;

Expand Down Expand Up @@ -1124,7 +1124,7 @@ impl EthApi {

// mine all the blocks
for _ in 0..blocks.as_u64() {
self.mine_one();
self.mine_one().await;

if let Some(interval) = interval {
tokio::time::sleep(Duration::from_secs(interval)).await;
Expand Down Expand Up @@ -1328,12 +1328,9 @@ impl EthApi {
}
}

// lock the miner
let _miner = self.miner.mode_write();

// mine all the blocks
for _ in 0..blocks_to_mine {
self.mine_one();
self.mine_one().await;
}

Ok("0x0".to_string())
Expand Down Expand Up @@ -1571,7 +1568,8 @@ impl EthApi {
call_to_estimate.gas = Some(gas_limit);

// execute the call without writing to db
let (exit, _, gas, _) = self.backend.call(call_to_estimate, fees.clone(), block_number)?;
let (exit, _, gas, _) =
self.backend.call(call_to_estimate, fees.clone(), block_number).await?;
match exit {
Return::Return | Return::Continue | Return::SelfDestruct | Return::Stop => {
// succeeded
Expand All @@ -1586,7 +1584,7 @@ impl EthApi {
return if request.gas.is_some() || request.gas_price.is_some() {
request.gas = Some(self.backend.gas_limit());
let (exit, out, _, _) =
self.backend.call(request.clone(), fees, block_number)?;
self.backend.call(request.clone(), fees, block_number).await?;
match exit {
return_ok!() => {
// transaction succeeded by manually increasing the gas limit to highest
Expand Down Expand Up @@ -1626,7 +1624,7 @@ impl EthApi {
while (highest_gas_limit - lowest_gas_limit) > U256::one() {
request.gas = Some(mid_gas_limit);
let (exit, _, _gas, _) =
self.backend.call(request.clone(), fees.clone(), block_number)?;
self.backend.call(request.clone(), fees.clone(), block_number).await?;
match exit {
return_ok!() => {
highest_gas_limit = mid_gas_limit;
Expand Down Expand Up @@ -1707,9 +1705,9 @@ impl EthApi {
}

/// Mines exactly one block
pub fn mine_one(&self) {
pub async fn mine_one(&self) {
let transactions = self.pool.ready_transactions().collect::<Vec<_>>();
let outcome = self.backend.mine_block(transactions);
let outcome = self.backend.mine_block(transactions).await;

trace!(target: "node", "mined block {}", outcome.block_number);
self.pool.on_mined_block(outcome);
Expand Down
57 changes: 57 additions & 0 deletions anvil/src/eth/backend/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,66 @@ use foundry_evm::{
revm::{BlockEnv, CfgEnv, Env, Return, SpecId, TransactOut},
trace::node::CallTraceNode,
};
use parking_lot::RwLock;
use std::sync::Arc;
use tracing::{trace, warn};

/// A lock that's used to guard access to evm execution, depending on the mode of the evm
///
/// The necessity for this type is that when in fork, transacting on the evm can take quite a bit of
/// time if the requested state needs to be fetched from the remote endpoint first. This can cause
/// block production to stall, because this is a blocking operation. This type is used to guard
/// access to evm execution.
///
/// This is only necessary in fork mode, so if `is_fork` is `false` `read` and `write` will be a
/// noop.
#[derive(Debug, Clone)]
pub(crate) struct EvmExecutorLock {
executor_lock: Arc<tokio::sync::RwLock<()>>,
is_fork: Arc<RwLock<bool>>,
}

// === impl EvmExecutorLock ===

impl EvmExecutorLock {
pub fn new(is_fork: bool) -> Self {
Self {
executor_lock: Arc::new(tokio::sync::RwLock::new(())),
is_fork: Arc::new(RwLock::new(is_fork)),
}
}

/// Sets the fork status
#[allow(unused)]
pub fn set_fork(&self, is_fork: bool) {
*self.is_fork.write() = is_fork
}

pub fn is_fork(&self) -> bool {
*self.is_fork.read()
}

/// Locks this RwLock with shared read access, causing the current task to yield until the lock
/// has been acquired.
pub async fn read(&self) -> EvmExecutorReadGuard<'_> {
let guard = if self.is_fork() { Some(self.executor_lock.read().await) } else { None };
EvmExecutorReadGuard(guard)
}

/// Locks this RwLock with exclusive write access, causing the current task to yield until the
/// lock has been acquired.
pub async fn write(&self) -> EvmExecutorWriteGuard<'_> {
let guard = if self.is_fork() { Some(self.executor_lock.write().await) } else { None };
EvmExecutorWriteGuard(guard)
}
}

#[derive(Debug)]
pub(crate) struct EvmExecutorReadGuard<'a>(Option<tokio::sync::RwLockReadGuard<'a, ()>>);

#[derive(Debug)]
pub(crate) struct EvmExecutorWriteGuard<'a>(Option<tokio::sync::RwLockWriteGuard<'a, ()>>);

/// Represents an executed transaction (transacted on the DB)
pub struct ExecutedTransaction {
transaction: Arc<PoolTransaction>,
Expand Down
156 changes: 90 additions & 66 deletions anvil/src/eth/backend/mem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
cheats,
cheats::CheatsManager,
db::Db,
executor::{ExecutedTransactions, TransactionExecutor},
executor::{EvmExecutorLock, ExecutedTransactions, TransactionExecutor},
fork::ClientFork,
genesis::GenesisConfig,
notifications::{NewBlockNotification, NewBlockNotifications},
Expand Down Expand Up @@ -90,6 +90,8 @@ pub struct Backend {
new_block_listeners: Arc<Mutex<Vec<UnboundedSender<NewBlockNotification>>>>,
/// keeps track of active snapshots at a specific block
active_snapshots: Arc<Mutex<HashMap<U256, (u64, H256)>>>,
/// A lock used to sync evm executor access
executor_lock: EvmExecutorLock,
}

impl Backend {
Expand All @@ -107,6 +109,7 @@ impl Backend {
fees,
genesis: Default::default(),
active_snapshots: Arc::new(Mutex::new(Default::default())),
executor_lock: EvmExecutorLock::new(false),
}
}

Expand Down Expand Up @@ -138,6 +141,7 @@ impl Backend {
blockchain,
states: Arc::new(RwLock::new(Default::default())),
env,
executor_lock: EvmExecutorLock::new(fork.is_some()),
fork,
time: Default::default(),
cheats: Default::default(),
Expand Down Expand Up @@ -427,83 +431,100 @@ impl Backend {
///
/// this will execute all transaction in the order they come in and return all the markers they
/// provide.
pub fn mine_block(&self, pool_transactions: Vec<Arc<PoolTransaction>>) -> MinedBlockOutcome {
tokio::task::block_in_place(|| self.do_mine_block(pool_transactions))
pub async fn mine_block(
&self,
pool_transactions: Vec<Arc<PoolTransaction>>,
) -> MinedBlockOutcome {
self.do_mine_block(pool_transactions).await
}

fn do_mine_block(&self, pool_transactions: Vec<Arc<PoolTransaction>>) -> MinedBlockOutcome {
async fn do_mine_block(
&self,
pool_transactions: Vec<Arc<PoolTransaction>>,
) -> MinedBlockOutcome {
trace!(target: "backend", "creating new block with {} transactions", pool_transactions.len());

let current_base_fee = self.base_fee();
// acquire all locks
let mut env = self.env.write();
let mut db = self.db.write();
let mut storage = self.blockchain.storage.write();

// store current state
self.states.write().insert(storage.best_hash, db.current_state());

// increase block number for this block
env.block.number = env.block.number.saturating_add(U256::one());
env.block.basefee = current_base_fee;
env.block.timestamp = self.time.next_timestamp().into();

let executor = TransactionExecutor {
db: &mut *db,
validator: self,
pending: pool_transactions.into_iter(),
block_env: env.block.clone(),
cfg_env: env.cfg.clone(),
parent_hash: storage.best_hash,
gas_used: U256::zero(),
};
let (outcome, header, block_hash) = {
let _lock = self.executor_lock.write().await;

let current_base_fee = self.base_fee();
// acquire all locks
let mut env = self.env.write();
let mut db = self.db.write();
let mut storage = self.blockchain.storage.write();

// store current state
self.states.write().insert(storage.best_hash, db.current_state());

// increase block number for this block
env.block.number = env.block.number.saturating_add(U256::one());
env.block.basefee = current_base_fee;
env.block.timestamp = self.time.next_timestamp().into();

let executor = TransactionExecutor {
db: &mut *db,
validator: self,
pending: pool_transactions.into_iter(),
block_env: env.block.clone(),
cfg_env: env.cfg.clone(),
parent_hash: storage.best_hash,
gas_used: U256::zero(),
};

// create the new block with the current timestamp
let ExecutedTransactions { block, included, invalid } = executor.execute();
let BlockInfo { block, transactions, receipts } = block;
// create the new block with the current timestamp
let ExecutedTransactions { block, included, invalid } = executor.execute();
let BlockInfo { block, transactions, receipts } = block;

let header = block.header.clone();
let header = block.header.clone();

let block_hash = block.header.hash();
let block_number: U64 = env.block.number.as_u64().into();

trace!(
target: "backend",
"Mined block {} with {} tx {:?}",
block_number,
transactions.len(),
transactions.iter().map(|tx| tx.transaction_hash).collect::<Vec<_>>()
);
let block_hash = block.header.hash();
let block_number: U64 = env.block.number.as_u64().into();

// update block metadata
storage.best_number = block_number;
storage.best_hash = block_hash;

storage.blocks.insert(block_hash, block);
storage.hashes.insert(block_number, block_hash);
trace!(
target: "backend",
"Mined block {} with {} tx {:?}",
block_number,
transactions.len(),
transactions.iter().map(|tx| tx.transaction_hash).collect::<Vec<_>>()
);

node_info!("");
// insert all transactions
for (info, receipt) in transactions.into_iter().zip(receipts) {
// log some tx info
{
node_info!(" Transaction: {:?}", info.transaction_hash);
if let Some(ref contract) = info.contract_address {
node_info!(" Contract created: {:?}", contract);
// update block metadata
storage.best_number = block_number;
storage.best_hash = block_hash;

storage.blocks.insert(block_hash, block);
storage.hashes.insert(block_number, block_hash);

node_info!("");
// insert all transactions
for (info, receipt) in transactions.into_iter().zip(receipts) {
// log some tx info
{
node_info!(" Transaction: {:?}", info.transaction_hash);
if let Some(ref contract) = info.contract_address {
node_info!(" Contract created: {:?}", contract);
}
node_info!(" Gas used: {}", receipt.gas_used());
}
node_info!(" Gas used: {}", receipt.gas_used());

let mined_tx = MinedTransaction {
info,
receipt,
block_hash,
block_number: block_number.as_u64(),
};
storage.transactions.insert(mined_tx.info.transaction_hash, mined_tx);
}
let timestamp = utc_from_secs(header.timestamp);

let mined_tx =
MinedTransaction { info, receipt, block_hash, block_number: block_number.as_u64() };
storage.transactions.insert(mined_tx.info.transaction_hash, mined_tx);
}
let timestamp = utc_from_secs(header.timestamp);
node_info!(" Block Number: {}", block_number);
node_info!(" Block Hash: {:?}", block_hash);
node_info!(" Block Time: {:?}\n", timestamp.to_rfc2822());

node_info!(" Block Number: {}", block_number);
node_info!(" Block Hash: {:?}", block_hash);
node_info!(" Block Time: {:?}\n", timestamp.to_rfc2822());
let outcome = MinedBlockOutcome { block_number, included, invalid };

(outcome, header, block_hash)
};
let next_block_base_fee = self.fees.get_next_block_base_fee_per_gas(
header.gas_used,
header.gas_limit,
Expand All @@ -516,21 +537,24 @@ impl Backend {
// update next base fee
self.fees.set_base_fee(next_block_base_fee.into());

MinedBlockOutcome { block_number, included, invalid }
outcome
}

/// Executes the `CallRequest` without writing to the DB
///
/// # Errors
///
/// Returns an error if the `block_number` is greater than the current height
pub fn call(
pub async fn call(
&self,
request: CallRequest,
fee_details: FeeDetails,
block_number: Option<BlockNumber>,
) -> Result<(Return, TransactOut, u64, State), BlockchainError> {
trace!(target: "backend", "calling from [{:?}] fees={:?}", request.from, fee_details);

let _lock = self.executor_lock.read().await;

let CallRequest { from, to, gas, value, data, nonce, access_list, .. } = request;

let FeeDetails { gas_price, max_fee_per_gas, max_priority_fee_per_gas } = fee_details;
Expand Down
10 changes: 6 additions & 4 deletions anvil/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,12 @@ impl Stream for BlockProducer {
if !pin.queued.is_empty() {
if let Some(backend) = pin.idle_backend.take() {
let transactions = pin.queued.pop_front().expect("not empty; qed");
pin.block_mining = Some(Box::pin(futures::future::ready((
backend.mine_block(transactions),
backend,
))));
pin.block_mining = Some(Box::pin(async move {
trace!(target: "miner", "creating new block");
let block = backend.mine_block(transactions).await;
trace!(target: "miner", "created new block: {}", block.block_number);
(block, backend)
}));
}
}

Expand Down
Loading

0 comments on commit 8f0ede2

Please sign in to comment.