Skip to content

Commit

Permalink
Soften txpool p2p reputation requirements (FuelLabs#1663)
Browse files Browse the repository at this point in the history
It is fairly common for gossipped transactions to fail due to transient
reasons such as UTXO's being spent before the transaction is included. A
transaction that was valid when broadcast could become invalid if its
inputs were spent in a newly confirmed block. Penalizing nodes for this
could unfairly punish nodes that are simply "behind" in their view of
the blockchain state.

In bitcoin, only incorrectly formatted transactions (i.e. invalid
signatures or corrupt data according to consensus rules) are considered
immediately punishable, and all transient (ie. state dependent) failures
are ignored but unpunished.
https://github.com/bitcoin/bitcoin/blob/6ff0aa089c01ff3e610ecb47814ed739d685a14c/src/net_processing.cpp#L1849

However, bitcoin does implement rate limiting on peers to prevent spam
of either valid or invalid transactions to work around this. That is not
currently implemented in this PR (but noted in this issue: FuelLabs#1677)

https://github.com/bitcoin/bitcoin/blob/6ff0aa089c01ff3e610ecb47814ed739d685a14c/src/net_processing.cpp#L3842C23-L3842C37
  • Loading branch information
Voxelot authored Feb 20, 2024
1 parent a838222 commit 610385f
Show file tree
Hide file tree
Showing 10 changed files with 324 additions and 174 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Description of the upcoming release here.

### Changed

- [#1663](https://github.com/FuelLabs/fuel-core/pull/1663): Reduce the punishment criteria for mempool gossipping.
- [#1658](https://github.com/FuelLabs/fuel-core/pull/1658): Removed `Receipts` table. Instead, receipts are part of the `TransactionStatuses` table.
- [#1640](https://github.com/FuelLabs/fuel-core/pull/1640): Upgrade to fuel-vm 0.45.0.
- [#1635](https://github.com/FuelLabs/fuel-core/pull/1635): Move updating of the owned messages and coins to off-chain worker.
Expand Down
7 changes: 6 additions & 1 deletion crates/fuel-core/src/service/adapters/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ impl TxPoolPort for TxPoolAdapter {
&self,
txs: Vec<Arc<Transaction>>,
) -> Vec<anyhow::Result<InsertionResult>> {
self.service.insert(txs).await
self.service
.insert(txs)
.await
.into_iter()
.map(|res| res.map_err(anyhow::Error::from))
.collect()
}

fn tx_update_subscribe(
Expand Down
165 changes: 82 additions & 83 deletions crates/services/txpool/src/containers/dependency.rs

Large diffs are not rendered by default.

21 changes: 11 additions & 10 deletions crates/services/txpool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,11 @@ where
Some(Ok(_)) => {
GossipsubMessageAcceptance::Accept
},
Some(Err(_)) => {
// Use similar p2p punishment rules as bitcoin
// https://github.com/bitcoin/bitcoin/blob/6ff0aa089c01ff3e610ecb47814ed739d685a14c/src/net_processing.cpp#L1856
Some(Err(Error::ConsensusValidity(_))) | Some(Err(Error::MintIsDisallowed)) => {
GossipsubMessageAcceptance::Reject
}
},
_ => GossipsubMessageAcceptance::Ignore
}
}
Expand All @@ -262,14 +264,13 @@ where
}
};

if acceptance != GossipsubMessageAcceptance::Ignore {
let message_info = GossipsubMessageInfo {
message_id,
peer_id,
};
// notify p2p layer about whether this tx was accepted
let message_info = GossipsubMessageInfo {
message_id,
peer_id,
};

let _ = self.shared.p2p.notify_gossip_transaction_validity(message_info, acceptance);
}
let _ = self.shared.p2p.notify_gossip_transaction_validity(message_info, acceptance);

should_continue = true;
} else {
Expand Down Expand Up @@ -355,7 +356,7 @@ where
pub async fn insert(
&self,
txs: Vec<Arc<Transaction>>,
) -> Vec<anyhow::Result<InsertionResult>> {
) -> Vec<Result<InsertionResult, Error>> {
// verify txs
let current_height = *self.current_height.lock();

Expand Down
12 changes: 9 additions & 3 deletions crates/services/txpool/src/service/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ impl MockP2P {
});
Box::pin(stream)
});
p2p.expect_broadcast_transaction()
.returning(move |_| Ok(()));

p2p
}
}
Expand Down Expand Up @@ -190,7 +189,14 @@ impl TestContextBuilder {
let config = self.config.unwrap_or_default();
let mock_db = self.mock_db;

let p2p = self.p2p.unwrap_or_else(|| MockP2P::new_with_txs(vec![]));
let mut p2p = self.p2p.unwrap_or_else(|| MockP2P::new_with_txs(vec![]));
// set default handlers for p2p methods after test is set up, so they will be last on the FIFO
// ordering of methods handlers: https://docs.rs/mockall/0.12.1/mockall/index.html#matching-multiple-calls
p2p.expect_notify_gossip_transaction_validity()
.returning(move |_, _| Ok(()));
p2p.expect_broadcast_transaction()
.returning(move |_| Ok(()));

let importer = self
.importer
.unwrap_or_else(|| MockImporter::with_blocks(vec![]));
Expand Down
139 changes: 136 additions & 3 deletions crates/services/txpool/src/service/tests_p2p.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
use super::*;
use crate::service::test_helpers::{
MockP2P,
TestContextBuilder,
use crate::{
service::test_helpers::{
MockP2P,
TestContextBuilder,
},
test_helpers::TEST_COIN_AMOUNT,
};
use fuel_core_services::Service;
use fuel_core_storage::rand::{
prelude::StdRng,
SeedableRng,
};
use fuel_core_types::fuel_tx::{
field::Inputs,
AssetId,
Transaction,
TransactionBuilder,
UniqueIdentifier,
};
use std::{
Expand Down Expand Up @@ -133,3 +143,126 @@ async fn test_insert_from_p2p_does_not_broadcast_to_p2p() {
"expected a timeout because no broadcast should have occurred"
)
}

#[tokio::test]
async fn test_gossipped_transaction_with_check_error_rejected() {
// verify that gossipped transactions which fail basic sanity checks are rejected (punished)

let mut ctx_builder = TestContextBuilder::new();
// add coin to builder db and generate a valid tx
let mut tx1 = ctx_builder.setup_script_tx(10);
// now intentionally muck up the tx such that it will return a CheckError,
// by duplicating an input
let script = tx1.as_script_mut().unwrap();
let input = script.inputs()[0].clone();
script.inputs_mut().push(input);
// setup p2p mock - with tx incoming from p2p
let txs = vec![tx1.clone()];
let mut p2p = MockP2P::new_with_txs(txs);
let (send, mut receive) = broadcast::channel::<()>(1);
p2p.expect_notify_gossip_transaction_validity()
.returning(move |_, validity| {
// Expect the transaction to be rejected
assert_eq!(validity, GossipsubMessageAcceptance::Reject);
// Notify test that the gossipsub acceptance was set
send.send(()).unwrap();
Ok(())
});
ctx_builder.with_p2p(p2p);

// build and start the txpool service
let ctx = ctx_builder.build();
let service = ctx.service();
service.start_and_await().await.unwrap();
// verify p2p was notified about the transaction validity
let gossip_validity_notified =
tokio::time::timeout(Duration::from_millis(100), receive.recv()).await;
assert!(
gossip_validity_notified.is_ok(),
"expected to receive gossip validity notification"
)
}

#[tokio::test]
async fn test_gossipped_mint_rejected() {
// verify that gossipped mint transactions are rejected (punished)
let tx1 = TransactionBuilder::mint(
0u32.into(),
0,
Default::default(),
Default::default(),
1,
AssetId::BASE,
)
.finalize_as_transaction();
// setup p2p mock - with tx incoming from p2p
let txs = vec![tx1.clone()];
let mut p2p = MockP2P::new_with_txs(txs);
let (send, mut receive) = broadcast::channel::<()>(1);
p2p.expect_notify_gossip_transaction_validity()
.returning(move |_, validity| {
// Expect the transaction to be rejected
assert_eq!(validity, GossipsubMessageAcceptance::Reject);
// Notify test that the gossipsub acceptance was set
send.send(()).unwrap();
Ok(())
});
// setup test context
let mut ctx_builder = TestContextBuilder::new();
ctx_builder.with_p2p(p2p);

// build and start the txpool service
let ctx = ctx_builder.build();
let service = ctx.service();
service.start_and_await().await.unwrap();
// verify p2p was notified about the transaction validity
let gossip_validity_notified =
tokio::time::timeout(Duration::from_millis(100), receive.recv()).await;
assert!(
gossip_validity_notified.is_ok(),
"expected to receive gossip validity notification"
)
}

#[tokio::test]
async fn test_gossipped_transaction_with_transient_error_ignored() {
// verify that gossipped transactions that fails stateful checks are ignored (but not punished)
let mut rng = StdRng::seed_from_u64(100);
let mut ctx_builder = TestContextBuilder::new();
// add coin to builder db and generate a valid tx
let mut tx1 = ctx_builder.setup_script_tx(10);
// now intentionally muck up the tx such that it will return a coin not found error
// by replacing the default coin with one that is not in the database
let script = tx1.as_script_mut().unwrap();
script.inputs_mut()[0] = crate::test_helpers::random_predicate(
&mut rng,
AssetId::BASE,
TEST_COIN_AMOUNT,
None,
);
// setup p2p mock - with tx incoming from p2p
let txs = vec![tx1.clone()];
let mut p2p = MockP2P::new_with_txs(txs);
let (send, mut receive) = broadcast::channel::<()>(1);
p2p.expect_notify_gossip_transaction_validity()
.returning(move |_, validity| {
// Expect the transaction to be rejected
assert_eq!(validity, GossipsubMessageAcceptance::Ignore);
// Notify test that the gossipsub acceptance was set
send.send(()).unwrap();
Ok(())
});
ctx_builder.with_p2p(p2p);

// build and start the txpool service
let ctx = ctx_builder.build();
let service = ctx.service();
service.start_and_await().await.unwrap();
// verify p2p was notified about the transaction validity
let gossip_validity_notified =
tokio::time::timeout(Duration::from_millis(100), receive.recv()).await;
assert!(
gossip_validity_notified.is_ok(),
"expected to receive gossip validity notification"
)
}
37 changes: 15 additions & 22 deletions crates/services/txpool/src/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ where
fn insert_single(
&mut self,
tx: Checked<Transaction>,
) -> anyhow::Result<InsertionResult> {
) -> Result<InsertionResult, Error> {
let view = self.database.latest_view();
self.insert_inner(tx, &view)
}
Expand All @@ -274,32 +274,29 @@ where
&mut self,
tx: Checked<Transaction>,
view: &View,
) -> anyhow::Result<InsertionResult> {
) -> Result<InsertionResult, Error> {
let tx: CheckedTransaction = tx.into();

let tx = Arc::new(match tx {
CheckedTransaction::Script(script) => PoolTransaction::Script(script),
CheckedTransaction::Create(create) => PoolTransaction::Create(create),
CheckedTransaction::Mint(_) => {
return Err(anyhow::anyhow!("Mint transactions is not supported"))
}
CheckedTransaction::Mint(_) => return Err(Error::MintIsDisallowed),
});

if !tx.is_computed() {
return Err(Error::NoMetadata.into())
return Err(Error::NoMetadata)
}

// verify max gas is less than block limit
if tx.max_gas() > self.config.chain_config.block_gas_limit {
return Err(Error::NotInsertedMaxGasLimit {
tx_gas: tx.max_gas(),
block_limit: self.config.chain_config.block_gas_limit,
}
.into())
})
}

if self.by_hash.contains_key(&tx.id()) {
return Err(Error::NotInsertedTxKnown.into())
return Err(Error::NotInsertedTxKnown)
}

let mut max_limit_hit = false;
Expand All @@ -309,7 +306,7 @@ where
// limit is hit, check if we can push out lowest priced tx
let lowest_price = self.by_gas_price.lowest_value().unwrap_or_default();
if lowest_price >= tx.price() {
return Err(Error::NotInsertedLimitHit.into())
return Err(Error::NotInsertedLimitHit)
}
}
if self.config.metrics {
Expand Down Expand Up @@ -361,7 +358,7 @@ where
&mut self,
tx_status_sender: &TxStatusChange,
txs: Vec<Checked<Transaction>>,
) -> Vec<anyhow::Result<InsertionResult>> {
) -> Vec<Result<InsertionResult, Error>> {
// Check if that data is okay (witness match input/output, and if recovered signatures ara valid).
// should be done before transaction comes to txpool, or before it enters RwLocked region.
let mut res = Vec::new();
Expand Down Expand Up @@ -402,7 +399,7 @@ pub async fn check_transactions(
txs: &[Arc<Transaction>],
current_height: BlockHeight,
config: &Config,
) -> Vec<anyhow::Result<Checked<Transaction>>> {
) -> Vec<Result<Checked<Transaction>, Error>> {
let mut checked_txs = Vec::with_capacity(txs.len());

for tx in txs.iter() {
Expand All @@ -417,9 +414,9 @@ pub async fn check_single_tx(
tx: Transaction,
current_height: BlockHeight,
config: &Config,
) -> anyhow::Result<Checked<Transaction>> {
) -> Result<Checked<Transaction>, Error> {
if tx.is_mint() {
return Err(Error::NotSupportedTransactionType.into())
return Err(Error::NotSupportedTransactionType)
}

verify_tx_min_gas_price(&tx, config)?;
Expand All @@ -428,24 +425,20 @@ pub async fn check_single_tx(
let consensus_params = &config.chain_config.consensus_parameters;

let tx = tx
.into_checked_basic(current_height, consensus_params)
.map_err(|e| anyhow::anyhow!("{e:?}"))?
.check_signatures(&consensus_params.chain_id)
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
.into_checked_basic(current_height, consensus_params)?
.check_signatures(&consensus_params.chain_id)?;

let tx = tx
.check_predicates_async::<TokioWithRayon>(&CheckPredicateParams::from(
consensus_params,
))
.await
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
.await?;

debug_assert!(tx.checks().contains(Checks::all()));

tx
} else {
tx.into_checked_basic(current_height, &config.chain_config.consensus_parameters)
.map_err(|e| anyhow::anyhow!("{e:?}"))?
tx.into_checked_basic(current_height, &config.chain_config.consensus_parameters)?
};

Ok(tx)
Expand Down
Loading

0 comments on commit 610385f

Please sign in to comment.