Skip to content

Commit

Permalink
Implement limits on the size of transactions in ChunkStateWitness (ne…
Browse files Browse the repository at this point in the history
…ar#11406)

Fixes: near#11103

This PR adds 3 new limitations to control total size of transactions
included in ChunkStateWitness
1) Reduce `max_transaction_size` from 4MiB to 1.5MiB. Transactions
larger than 1.5MiB will be rejected.
2) Limit size of transactions included in the last two chunks to 2MiB.
`ChunkStateWitness` contains transactions from both the current and the
previous chunk, so we have to limit the sum of transactions from both of
those chunks.
3) Limit size of storage proof generated during transaction validation
to 500kiB (soft limit).

In total that limits the size transaction related fields to 2.5MiB.

About 1):
Having 4MiB transactions is troublesome, because it means that we have
to allow at least 4MiB of transactions to be included in
`ChunkStateWitness`. Having so much space taken up by the transactions
could cause problems with witness size. See
near#11379 for more information.

About 2):
`ChunkStateWitness` contains both transactions from the previous chunk
(`transactions`) and the new chunk (`new_transactions`). This is
annoying because it halves the space that we can reserve for
transactions. To make sure that the size stays reasonable we limit the
sum of both those fields to 2MiB. On current mainnet traffic the sum of
these fields stays under 400kiB, so 2MiB should be more than enough.
This limit has to be slightly higher than the limit for a single
transaction, so we can't make it 1MiB, it has to be at least 1.5MiB.

About 3):
On mainnet traffic the size of transactions storage proof is under
500kiB on most chunks, so adding this limit shouldn't affect the
throughput. I assume that every transactions generates a limited amount
of storage proof during validation, so we can have a soft limit for the
total size of storage proof. Implementing a hard limit would be
difficult because it's hard to predict how much storage proof will be
generated by validating a transaction.

Transactions are validated by running `prepare_transactions` on the
validator, so there's no need for separate validation code.
  • Loading branch information
jancionear authored May 30, 2024
1 parent 722a56b commit 8d3edac
Show file tree
Hide file tree
Showing 78 changed files with 1,105 additions and 287 deletions.
187 changes: 116 additions & 71 deletions chain/chain/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ impl RuntimeAdapter for NightshadeRuntime {
time_limit: Option<Duration>,
) -> Result<PreparedTransactions, Error> {
let start_time = std::time::Instant::now();
let PrepareTransactionsChunkContext { shard_id, gas_limit } = chunk;
let PrepareTransactionsChunkContext { shard_id, gas_limit, .. } = chunk;

let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&prev_block.block_hash)?;
let protocol_version = self.epoch_manager.get_epoch_protocol_version(&epoch_id)?;
Expand Down Expand Up @@ -777,23 +777,19 @@ impl RuntimeAdapter for NightshadeRuntime {
usize::MAX
};

// In general, we limit the number of transactions via send_fees.
// However, as a second line of defense, we want to limit the byte size
// of transaction as well. Rather than introducing a separate config for
// the limit, we compute it heuristically from the gas limit and the
// cost of roundtripping a byte of data through disk. For today's value
// of parameters, this corresponds to about 13megs worth of
// transactions.
let size_limit = transactions_gas_limit
/ (runtime_config.wasm_config.ext_costs.gas_cost(ExtCosts::storage_write_value_byte)
+ runtime_config.wasm_config.ext_costs.gas_cost(ExtCosts::storage_read_value_byte));
let size_limit: u64 = calculate_transactions_size_limit(
protocol_version,
&runtime_config,
chunk.last_chunk_transactions_size,
transactions_gas_limit,
);
// for metrics only
let mut rejected_due_to_congestion = 0;
let mut rejected_invalid_tx = 0;
let mut rejected_invalid_for_chain = 0;

// Add new transactions to the result until some limit is hit or the transactions run out.
loop {
'add_txs_loop: while let Some(transaction_group_iter) = transaction_groups.next() {
if total_gas_burnt >= transactions_gas_limit {
result.limited_by = Some(PrepareTransactionsLimit::Gas);
break;
Expand All @@ -820,71 +816,90 @@ impl RuntimeAdapter for NightshadeRuntime {
}
}

if let Some(iter) = transaction_groups.next() {
while let Some(tx) = iter.next() {
num_checked_transactions += 1;

if ProtocolFeature::CongestionControl.enabled(protocol_version) {
let receiving_shard = EpochManagerAdapter::account_id_to_shard_id(
self.epoch_manager.as_ref(),
tx.transaction.receiver_id(),
&epoch_id,
)?;
if let Some(congestion_info) =
prev_block.congestion_info.get(&receiving_shard)
{
let congestion_control = CongestionControl::new(
runtime_config.congestion_control_config,
congestion_info.congestion_info,
congestion_info.missed_chunks_count,
);
if !congestion_control.shard_accepts_transactions() {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), "discarding transaction due to congestion");
rejected_due_to_congestion += 1;
continue;
}
if checked_feature!("stable", WitnessTransactionLimits, protocol_version)
&& state_update.trie.recorded_storage_size()
> runtime_config
.witness_config
.new_transactions_validation_state_size_soft_limit
{
result.limited_by = Some(PrepareTransactionsLimit::StorageProofSize);
break;
}

// Take a single transaction from this transaction group
while let Some(tx_peek) = transaction_group_iter.peek_next() {
// Stop adding transactions if the size limit would be exceeded
if checked_feature!("stable", WitnessTransactionLimits, protocol_version)
&& total_size.saturating_add(tx_peek.get_size()) > size_limit as u64
{
result.limited_by = Some(PrepareTransactionsLimit::Size);
break 'add_txs_loop;
}

// Take the transaction out of the pool
let tx = transaction_group_iter
.next()
.expect("peek_next() returned Some, so next() should return Some as well");
num_checked_transactions += 1;

if ProtocolFeature::CongestionControl.enabled(protocol_version) {
let receiving_shard = EpochManagerAdapter::account_id_to_shard_id(
self.epoch_manager.as_ref(),
tx.transaction.receiver_id(),
&epoch_id,
)?;
if let Some(congestion_info) = prev_block.congestion_info.get(&receiving_shard)
{
let congestion_control = CongestionControl::new(
runtime_config.congestion_control_config,
congestion_info.congestion_info,
congestion_info.missed_chunks_count,
);
if !congestion_control.shard_accepts_transactions() {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), "discarding transaction due to congestion");
rejected_due_to_congestion += 1;
continue;
}
}
}

// Verifying the transaction is on the same chain and hasn't expired yet.
if !chain_validate(&tx) {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), "discarding transaction that failed chain validation");
rejected_invalid_for_chain += 1;
continue;
}
// Verifying the transaction is on the same chain and hasn't expired yet.
if !chain_validate(&tx) {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), "discarding transaction that failed chain validation");
rejected_invalid_for_chain += 1;
continue;
}

// Verifying the validity of the transaction based on the current state.
match verify_and_charge_transaction(
runtime_config,
&mut state_update,
prev_block.next_gas_price,
&tx,
false,
Some(next_block_height),
protocol_version,
) {
Ok(verification_result) => {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), "including transaction that passed validation");
state_update.commit(StateChangeCause::NotWritableToDisk);
total_gas_burnt += verification_result.gas_burnt;
total_size += tx.get_size();
result.transactions.push(tx);
break;
}
Err(RuntimeError::InvalidTxError(err)) => {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), ?err, "discarding transaction that is invalid");
rejected_invalid_tx += 1;
state_update.rollback();
}
Err(RuntimeError::StorageError(err)) => {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), ?err, "discarding transaction due to storage error");
return Err(Error::StorageError(err));
}
Err(err) => unreachable!("Unexpected RuntimeError error {:?}", err),
// Verifying the validity of the transaction based on the current state.
match verify_and_charge_transaction(
runtime_config,
&mut state_update,
prev_block.next_gas_price,
&tx,
false,
Some(next_block_height),
protocol_version,
) {
Ok(verification_result) => {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), "including transaction that passed validation");
state_update.commit(StateChangeCause::NotWritableToDisk);
total_gas_burnt += verification_result.gas_burnt;
total_size += tx.get_size();
result.transactions.push(tx);
// Take one transaction from this group, no more.
break;
}
Err(RuntimeError::InvalidTxError(err)) => {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), ?err, "discarding transaction that is invalid");
rejected_invalid_tx += 1;
state_update.rollback();
}
Err(RuntimeError::StorageError(err)) => {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), ?err, "discarding transaction due to storage error");
return Err(Error::StorageError(err));
}
Err(err) => unreachable!("Unexpected RuntimeError error {:?}", err),
}
} else {
break;
}
}
debug!(target: "runtime", "Transaction filtering results {} valid out of {} pulled from the pool", result.transactions.len(), num_checked_transactions);
Expand Down Expand Up @@ -1358,6 +1373,36 @@ fn chunk_tx_gas_limit(
}
}

fn calculate_transactions_size_limit(
protocol_version: ProtocolVersion,
runtime_config: &RuntimeConfig,
last_chunk_transactions_size: usize,
transactions_gas_limit: Gas,
) -> u64 {
if checked_feature!("stable", WitnessTransactionLimits, protocol_version) {
// Sum of transactions in the previous and current chunks should not exceed the limit.
// Witness keeps transactions from both previous and current chunk, so we have to limit the sum of both.
runtime_config
.witness_config
.combined_transactions_size_limit
.saturating_sub(last_chunk_transactions_size)
.try_into()
.expect("Can't convert usize to u64!")
} else {
// In general, we limit the number of transactions via send_fees.
// However, as a second line of defense, we want to limit the byte size
// of transaction as well. Rather than introducing a separate config for
// the limit, we compute it heuristically from the gas limit and the
// cost of roundtripping a byte of data through disk. For today's value
// of parameters, this corresponds to about 13megs worth of
// transactions.
let roundtripping_cost =
runtime_config.wasm_config.ext_costs.gas_cost(ExtCosts::storage_write_value_byte)
+ runtime_config.wasm_config.ext_costs.gas_cost(ExtCosts::storage_read_value_byte);
transactions_gas_limit / roundtripping_cost
}
}

impl node_runtime::adapter::ViewRuntimeAdapter for NightshadeRuntime {
fn view_account(
&self,
Expand Down
1 change: 1 addition & 0 deletions chain/chain/src/runtime/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1643,6 +1643,7 @@ fn prepare_transactions(
PrepareTransactionsChunkContext {
shard_id,
gas_limit: env.runtime.genesis_config.gas_limit,
last_chunk_transactions_size: 0,
},
PrepareTransactionsBlockContext {
next_gas_price: env.runtime.genesis_config.min_gas_price,
Expand Down
11 changes: 4 additions & 7 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use near_primitives::merkle::{merklize, MerklePath};
use near_primitives::receipt::{PromiseYieldTimeout, Receipt};
use near_primitives::sandbox::state_patch::SandboxStatePatch;
use near_primitives::shard_layout::{ShardLayout, ShardUId};
use near_primitives::sharding::ShardChunkHeader;
use near_primitives::state_part::PartId;
use near_primitives::transaction::{ExecutionOutcomeWithId, SignedTransaction};
use near_primitives::types::validator_stake::{ValidatorStake, ValidatorStakeIter};
Expand Down Expand Up @@ -343,6 +342,7 @@ pub enum PrepareTransactionsLimit {
Size,
Time,
ReceiptCount,
StorageProofSize,
}

pub struct PrepareTransactionsBlockContext {
Expand All @@ -366,12 +366,9 @@ impl From<&Block> for PrepareTransactionsBlockContext {
pub struct PrepareTransactionsChunkContext {
pub shard_id: ShardId,
pub gas_limit: Gas,
}

impl From<&ShardChunkHeader> for PrepareTransactionsChunkContext {
fn from(header: &ShardChunkHeader) -> Self {
Self { shard_id: header.shard_id(), gas_limit: header.gas_limit() }
}
/// Size of transactions added in the last existing chunk.
/// Used to calculate the allowed size of transactions in a newly produced chunk.
pub last_chunk_transactions_size: usize,
}

/// Bridge between the chain and the runtime.
Expand Down
33 changes: 29 additions & 4 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ use near_primitives::sharding::{
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::chunk_extra::ChunkExtra;
use near_primitives::types::{AccountId, ApprovalStake, BlockHeight, EpochId, NumBlocks, ShardId};
use near_primitives::unwrap_or_return;
use near_primitives::utils::MaybeValidated;
use near_primitives::validator_signer::ValidatorSigner;
use near_primitives::version::PROTOCOL_VERSION;
use near_primitives::views::{CatchupStatusView, DroppedReason};
use near_primitives::{checked_feature, unwrap_or_return};
use near_store::ShardUId;
use reed_solomon_erasure::galois_8::ReedSolomon;
use std::cmp::max;
Expand Down Expand Up @@ -902,8 +902,17 @@ impl Client {
.get_chunk_extra(&prev_block_hash, &shard_uid)
.map_err(|err| Error::ChunkProducer(format!("No chunk extra available: {}", err)))?;

let prev_shard_id = self.epoch_manager.get_prev_shard_id(prev_block.hash(), shard_id)?;
let last_chunk_header =
prev_block.chunks().get(prev_shard_id as usize).cloned().ok_or_else(|| {
Error::ChunkProducer(format!(
"No last chunk in prev_block_hash {:?}, prev_shard_id: {}",
prev_block_hash, prev_shard_id
))
})?;
let last_chunk = self.chain.get_chunk(&last_chunk_header.chunk_hash())?;
let prepared_transactions =
self.prepare_transactions(shard_uid, prev_block, chunk_extra.as_ref())?;
self.prepare_transactions(shard_uid, prev_block, &last_chunk, chunk_extra.as_ref())?;
#[cfg(feature = "test_features")]
let prepared_transactions = Self::maybe_insert_invalid_transaction(
prepared_transactions,
Expand Down Expand Up @@ -1036,11 +1045,11 @@ impl Client {
&mut self,
shard_uid: ShardUId,
prev_block: &Block,
last_chunk: &ShardChunk,
chunk_extra: &ChunkExtra,
) -> Result<PreparedTransactions, Error> {
let Self { chain, sharded_tx_pool, runtime_adapter: runtime, .. } = self;
let shard_id = shard_uid.shard_id as ShardId;

let prepared_transactions = if let Some(mut iter) =
sharded_tx_pool.get_pool_iterator(shard_uid)
{
Expand All @@ -1050,9 +1059,25 @@ impl Client {
source: StorageDataSource::Db,
state_patch: Default::default(),
};
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&prev_block.hash())?;
let protocol_version = self.epoch_manager.get_epoch_protocol_version(&epoch_id)?;
let last_chunk_transactions_size =
if checked_feature!("stable", WitnessTransactionLimits, protocol_version) {
borsh::to_vec(last_chunk.transactions())
.map_err(|e| {
Error::ChunkProducer(format!("Failed to serialize transactions: {e}"))
})?
.len()
} else {
0
};
runtime.prepare_transactions(
storage_config,
PrepareTransactionsChunkContext { shard_id, gas_limit: chunk_extra.gas_limit() },
PrepareTransactionsChunkContext {
shard_id,
gas_limit: chunk_extra.gas_limit(),
last_chunk_transactions_size,
},
prev_block.into(),
&mut iter,
&mut chain.transaction_validity_check(prev_block.header().clone()),
Expand Down
14 changes: 10 additions & 4 deletions chain/client/src/stateless_validation/chunk_validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use near_chain::chain::{
};
use near_chain::sharding::shuffle_receipt_proofs;
use near_chain::types::{
ApplyChunkBlockContext, ApplyChunkResult, PreparedTransactions, RuntimeAdapter,
RuntimeStorageConfig, StorageDataSource,
ApplyChunkBlockContext, ApplyChunkResult, PrepareTransactionsChunkContext,
PreparedTransactions, RuntimeAdapter, RuntimeStorageConfig, StorageDataSource,
};
use near_chain::validate::{
validate_chunk_with_chunk_extra, validate_chunk_with_chunk_extra_and_receipts_root,
Expand Down Expand Up @@ -240,12 +240,17 @@ pub(crate) fn validate_prepared_transactions(
chunk_header: &ShardChunkHeader,
storage_config: RuntimeStorageConfig,
transactions: &[SignedTransaction],
last_chunk_transactions: &[SignedTransaction],
) -> Result<PreparedTransactions, Error> {
let parent_block = chain.chain_store().get_block(chunk_header.prev_block_hash())?;

let last_chunk_transactions_size = borsh::to_vec(last_chunk_transactions)?.len();
runtime_adapter.prepare_transactions(
storage_config,
chunk_header.into(),
PrepareTransactionsChunkContext {
shard_id: chunk_header.shard_id(),
gas_limit: chunk_header.gas_limit(),
last_chunk_transactions_size,
},
(&parent_block).into(),
&mut TransactionGroupIteratorWrapper::new(transactions),
&mut chain.transaction_validity_check(parent_block.header().clone()),
Expand Down Expand Up @@ -345,6 +350,7 @@ pub(crate) fn pre_validate_chunk_state_witness(
&state_witness.chunk_header,
transactions_validation_storage_config,
&new_transactions,
&state_witness.transactions,
) {
Ok(result) => {
if result.transactions.len() != new_transactions.len() {
Expand Down
2 changes: 2 additions & 0 deletions chain/client/src/stateless_validation/shadow_validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl Client {
let shard_id = chunk.shard_id();
let chunk_hash = chunk.chunk_hash();
let chunk_header = chunk.cloned_header();
let last_chunk = self.chain.get_chunk(&prev_chunk_header.chunk_hash())?;

let transactions_validation_storage_config = RuntimeStorageConfig {
state_root: chunk_header.prev_state_root(),
Expand All @@ -69,6 +70,7 @@ impl Client {
&chunk_header,
transactions_validation_storage_config,
chunk.transactions(),
last_chunk.transactions(),
) else {
return Err(Error::Other(
"Could not produce storage proof for new transactions".to_owned(),
Expand Down
Loading

0 comments on commit 8d3edac

Please sign in to comment.