Skip to content

Commit

Permalink
chore: wrap IO-intensive code with spawn_blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
sergey-melnychuk committed Sep 28, 2023
1 parent 8c03974 commit 992255b
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 35 deletions.
41 changes: 28 additions & 13 deletions crates/rpc/src/v04/method/trace_block_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use pathfinder_common::{BlockHash, TransactionHash};
use pathfinder_executor::CallError;
use pathfinder_storage::BlockId;
use serde::{Deserialize, Serialize};
use tokio::task::JoinError;

use crate::{context::RpcContext, executor::ExecutionStateError, map_gateway_transaction};

Expand Down Expand Up @@ -47,34 +48,48 @@ impl From<CallError> for TraceBlockTransactionsError {
}
}

impl From<JoinError> for TraceBlockTransactionsError {
fn from(e: JoinError) -> Self {
Self::Internal(anyhow::anyhow!("Join error: {e}"))
}
}

pub async fn trace_block_transactions(
context: RpcContext,
input: TraceBlockTransactionsInput,
) -> Result<TraceBlockTransactionsOutput, TraceBlockTransactionsError> {
let transactions: Vec<_> = {
let mut db = context.storage.connection()?;
let tx = db.transaction()?;
tokio::task::spawn_blocking(move || {
let tx = db.transaction()?;

let (transactions, _): (Vec<_>, Vec<_>) = tx
.transaction_data_for_block(BlockId::Hash(input.block_hash))?
.ok_or(TraceBlockTransactionsError::InvalidBlockHash)?
.into_iter()
.unzip();
let (transactions, _): (Vec<_>, Vec<_>) = tx
.transaction_data_for_block(BlockId::Hash(input.block_hash))?
.ok_or(TraceBlockTransactionsError::InvalidBlockHash)?
.into_iter()
.unzip();

let hashes = transactions.iter().map(|tx| tx.hash()).collect::<Vec<_>>();
let hashes = transactions.iter().map(|tx| tx.hash()).collect::<Vec<_>>();

let transactions = transactions
.into_iter()
.map(|transaction| map_gateway_transaction(transaction, &tx))
.collect::<anyhow::Result<Vec<_>, _>>()?;
let transactions = transactions
.into_iter()
.map(|transaction| map_gateway_transaction(transaction, &tx))
.collect::<anyhow::Result<Vec<_>, _>>()?;

hashes.into_iter().zip(transactions.into_iter()).collect()
Ok::<_, TraceBlockTransactionsError>(
hashes.into_iter().zip(transactions.into_iter()).collect(),
)
})
.await??
};

let execution_state =
crate::executor::execution_state(context, pathfinder_common::BlockId::Latest, None).await?;

let traces = pathfinder_executor::trace_all(execution_state, transactions)?;
let traces = tokio::task::spawn_blocking(move || {
pathfinder_executor::trace_all(execution_state, transactions)
})
.await??;

let result = traces
.into_iter()
Expand Down
58 changes: 36 additions & 22 deletions crates/rpc/src/v04/method/trace_transaction.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use pathfinder_common::{BlockId, TransactionHash};
use pathfinder_executor::{CallError, Transaction};
use serde::{Deserialize, Serialize};
use tokio::task::JoinError;

use crate::{context::RpcContext, executor::ExecutionStateError, map_gateway_transaction};

Expand Down Expand Up @@ -41,38 +42,51 @@ impl From<CallError> for TraceTransactionError {
}
}

impl From<JoinError> for TraceTransactionError {
fn from(e: JoinError) -> Self {
Self::Internal(anyhow::anyhow!("Join error: {e}"))
}
}

pub async fn trace_transaction(
context: RpcContext,
input: TraceTransactionInput,
) -> Result<TraceTransactionOutput, TraceTransactionError> {
let transactions: Vec<(TransactionHash, Transaction)> = {
let mut db = context.storage.connection()?;
let tx = db.transaction()?;

let block_hash = tx
.transaction_block_hash(input.transaction_hash)?
.ok_or(TraceTransactionError::InvalidTxnHash)?;

let (transactions, _): (Vec<_>, Vec<_>) = tx
.transaction_data_for_block(pathfinder_storage::BlockId::Hash(block_hash))?
.ok_or(TraceTransactionError::InvalidTxnHash)?
.into_iter()
.unzip();

let hashes = transactions.iter().map(|tx| tx.hash()).collect::<Vec<_>>();

let transactions = transactions
.into_iter()
.map(|transaction| map_gateway_transaction(transaction, &tx))
.collect::<anyhow::Result<Vec<_>, _>>()?;

hashes.into_iter().zip(transactions.into_iter()).collect()
tokio::task::spawn_blocking(move || {
let tx = db.transaction()?;

let block_hash = tx
.transaction_block_hash(input.transaction_hash)?
.ok_or(TraceTransactionError::InvalidTxnHash)?;

let (transactions, _): (Vec<_>, Vec<_>) = tx
.transaction_data_for_block(pathfinder_storage::BlockId::Hash(block_hash))?
.ok_or(TraceTransactionError::InvalidTxnHash)?
.into_iter()
.unzip();

let hashes = transactions.iter().map(|tx| tx.hash()).collect::<Vec<_>>();

let transactions = transactions
.into_iter()
.map(|transaction| map_gateway_transaction(transaction, &tx))
.collect::<anyhow::Result<Vec<_>, _>>()?;

Ok::<_, TraceTransactionError>(
hashes.into_iter().zip(transactions.into_iter()).collect(),
)
})
.await??
};

let execution_state = crate::executor::execution_state(context, BlockId::Latest, None).await?;

let trace =
pathfinder_executor::trace_one(execution_state, transactions, input.transaction_hash)?;
let trace = tokio::task::spawn_blocking(move || {
pathfinder_executor::trace_one(execution_state, transactions, input.transaction_hash)
})
.await??;

Ok(TraceTransactionOutput(trace.into()))
}

0 comments on commit 992255b

Please sign in to comment.