Skip to content

Commit

Permalink
feat: batch transactions import (FuelLabs#1349)
Browse files Browse the repository at this point in the history
Related issues:
- Closes FuelLabs#1315
- Closes FuelLabs#1089

This PR introduces a structure for holding data collections called
`Batch`. A Batch encapsulates a collection of homogeneous data returned
by a P2P request, retaining only successfully received items. A Batch
may contain fewer items than originally requested, in which case an
error has occurred.

For transactions, we now use batches to request and return transaction
data from peers. This means a reduced number of database trips and more
succinct (less chatty) p2p communication.

---------

Co-authored-by: xgreenx <[email protected]>
  • Loading branch information
Brandon Vrooman and xgreenx authored Oct 2, 2023
1 parent d2ef5de commit 5d1a9a3
Show file tree
Hide file tree
Showing 22 changed files with 1,086 additions and 665 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Description of the upcoming release here.

### Changed

- [#1349](https://github.com/FuelLabs/fuel-core/pull/1349): Updated peer-to-peer transactions API to support multiple blocks in a single request, and updated block synchronization to request multiple blocks based on the configured range of headers.
- [#1380](https://github.com/FuelLabs/fuel-core/pull/1380): Add preliminary, hard-coded config values for heartbeat peer reputation, removing `todo`.
- [#1377](https://github.com/FuelLabs/fuel-core/pull/1377): Remove `DiscoveryEvent` and use `KademliaEvent` directly in `DiscoveryBehavior`.
- [#1366](https://github.com/FuelLabs/fuel-core/pull/1366): Improve caching during docker builds in CI by replacing gha
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ anyhow = "1.0"
async-trait = "0.1"
cynic = { version = "2.2.1", features = ["http-reqwest"] }
clap = "4.1"
derive_more = { version = "0.99" }
hyper = { version = "0.14.26" }
rand = "0.8"
parking_lot = "0.12"
Expand Down
2 changes: 1 addition & 1 deletion crates/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ description = "Tx client and schema specification."
[dependencies]
anyhow = { workspace = true }
cynic = { workspace = true }
derive_more = { version = "0.99" }
derive_more = { workspace = true }
eventsource-client = { version = "0.10.2", optional = true }
fuel-core-types = { workspace = true, features = ["serde"] }
futures = { workspace = true, optional = true }
Expand Down
23 changes: 16 additions & 7 deletions crates/fuel-core/src/database/sealed_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use fuel_core_types::{
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::Transactions,
};
use std::ops::Range;

Expand Down Expand Up @@ -127,12 +127,21 @@ impl Database {
}
}

pub fn get_transactions_on_block(
pub fn get_transactions_on_blocks(
&self,
block_id: &BlockId,
) -> StorageResult<Option<Vec<Transaction>>> {
Ok(self
.get_sealed_block_by_id(block_id)?
.map(|Sealed { entity: block, .. }| block.into_inner().1))
block_height_range: Range<u32>,
) -> StorageResult<Option<Vec<Transactions>>> {
let transactions = block_height_range
.into_iter()
.map(BlockHeight::from)
.map(|block_height| {
let transactions = self
.get_sealed_block_by_height(&block_height)?
.map(|Sealed { entity: block, .. }| block.into_inner().1)
.map(Transactions);
Ok(transactions)
})
.collect::<StorageResult<_>>()?;
Ok(transactions)
}
}
9 changes: 4 additions & 5 deletions crates/fuel-core/src/service/adapters/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ use fuel_core_services::stream::BoxStream;
use fuel_core_storage::Result as StorageResult;
use fuel_core_types::{
blockchain::{
primitives::BlockId,
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::Transactions,
};
use std::ops::Range;

Expand Down Expand Up @@ -41,9 +40,9 @@ impl P2pDb for Database {

fn get_transactions(
&self,
block_id: &BlockId,
) -> StorageResult<Option<Vec<Transaction>>> {
self.get_transactions_on_block(block_id)
block_height_range: Range<u32>,
) -> StorageResult<Option<Vec<Transactions>>> {
self.get_transactions_on_blocks(block_height_range)
}
}

Expand Down
43 changes: 19 additions & 24 deletions crates/fuel-core/src/service/adapters/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,10 @@ use fuel_core_sync::ports::{
};
use fuel_core_types::{
blockchain::{
primitives::{
BlockId,
DaBlockHeight,
},
primitives::DaBlockHeight,
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::{
peer_reputation::{
Expand All @@ -28,6 +24,7 @@ use fuel_core_types::{
},
PeerId,
SourcePeer,
Transactions,
},
};
use std::ops::Range;
Expand All @@ -50,43 +47,41 @@ impl PeerToPeerPort for P2PAdapter {

async fn get_sealed_block_headers(
&self,
block_range_height: Range<u32>,
block_height_range: Range<u32>,
) -> anyhow::Result<SourcePeer<Option<Vec<SealedBlockHeader>>>> {
if let Some(service) = &self.service {
let (peer_id, headers) =
service.get_sealed_block_headers(block_range_height).await?;
let sourced_headers = SourcePeer {
peer_id: peer_id.into(),
data: headers,
};
Ok(sourced_headers)
let result = if let Some(service) = &self.service {
service.get_sealed_block_headers(block_height_range).await
} else {
Err(anyhow::anyhow!("No P2P service available"))
};
match result {
Ok((peer_id, headers)) => {
let peer_id: PeerId = peer_id.into();
let headers = peer_id.bind(headers);
Ok(headers)
}
Err(err) => Err(err),
}
}

async fn get_transactions(
&self,
block: SourcePeer<BlockId>,
) -> anyhow::Result<Option<Vec<Transaction>>> {
range: SourcePeer<Range<u32>>,
) -> anyhow::Result<Option<Vec<Transactions>>> {
let SourcePeer {
peer_id,
data: block,
} = block;
data: range,
} = range;
if let Some(service) = &self.service {
service
.get_transactions_from_peer(peer_id.into(), block)
.get_transactions_from_peer(peer_id.into(), range)
.await
} else {
Err(anyhow::anyhow!("No P2P service available"))
}
}

async fn report_peer(
&self,
peer: PeerId,
report: PeerReportReason,
) -> anyhow::Result<()> {
fn report_peer(&self, peer: PeerId, report: PeerReportReason) -> anyhow::Result<()> {
if let Some(service) = &self.service {
let service_name = "Sync";
let new_report = self.process_report(report);
Expand Down
5 changes: 2 additions & 3 deletions crates/services/p2p/src/codecs/postcard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,12 @@ impl ProtocolName for MessageExchangePostcardProtocol {

#[cfg(test)]
mod tests {
use fuel_core_types::blockchain::primitives::BlockId;

use super::*;

#[test]
fn test_request_size_fits() {
let m = RequestMessage::Transactions(BlockId::default());
let arbitrary_range = 2..6;
let m = RequestMessage::Transactions(arbitrary_range);
assert!(postcard::to_stdvec(&m).unwrap().len() <= MAX_REQUEST_SIZE);
}
}
23 changes: 14 additions & 9 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,15 +696,17 @@ mod tests {
BlockHeader,
PartialBlockHeader,
},
primitives::BlockId,
SealedBlock,
SealedBlockHeader,
},
fuel_tx::{
Transaction,
TransactionBuilder,
},
services::p2p::GossipsubMessageAcceptance,
services::p2p::{
GossipsubMessageAcceptance,
Transactions,
},
};
use futures::{
future::join_all,
Expand Down Expand Up @@ -1557,7 +1559,7 @@ mod tests {
tokio::select! {
message_sent = rx_test_end.recv() => {
// we received a signal to end the test
assert!(message_sent.unwrap(), "Receuved incorrect or missing missing messsage");
assert!(message_sent.unwrap(), "Received incorrect or missing message");
break;
}
node_a_event = node_a.next_event() => {
Expand Down Expand Up @@ -1604,7 +1606,7 @@ mod tests {
}
});
}
RequestMessage::Transactions(_) => {
RequestMessage::Transactions(_range) => {
let (tx_orchestrator, rx_orchestrator) = oneshot::channel();
assert!(node_a.send_request_msg(None, request_msg.clone(), ResponseChannelItem::Transactions(tx_orchestrator)).is_ok());
let tx_test_end = tx_test_end.clone();
Expand All @@ -1613,7 +1615,8 @@ mod tests {
let response_message = rx_orchestrator.await;

if let Ok(Some(transactions)) = response_message {
let _ = tx_test_end.send(transactions.len() == 5).await;
let check = transactions.len() == 1 && transactions[0].0.len() == 5;
let _ = tx_test_end.send(check).await;
} else {
tracing::error!("Orchestrator failed to receive a message: {:?}", response_message);
let _ = tx_test_end.send(false).await;
Expand Down Expand Up @@ -1647,7 +1650,8 @@ mod tests {
let _ = node_b.send_response_msg(*request_id, OutboundResponse::SealedHeaders(Some(sealed_headers)));
}
RequestMessage::Transactions(_) => {
let transactions = (0..5).map(|_| Transaction::default_test_tx()).collect();
let txs = (0..5).map(|_| Transaction::default_test_tx()).collect();
let transactions = vec![Transactions(txs)];
let _ = node_b.send_response_msg(*request_id, OutboundResponse::Transactions(Some(Arc::new(transactions))));
}
}
Expand All @@ -1662,8 +1666,8 @@ mod tests {
#[tokio::test]
#[instrument]
async fn request_response_works_with_transactions() {
request_response_works_with(RequestMessage::Transactions(BlockId::default()))
.await
let arbitrary_range = 2..6;
request_response_works_with(RequestMessage::Transactions(arbitrary_range)).await
}

#[tokio::test]
Expand All @@ -1675,7 +1679,8 @@ mod tests {
#[tokio::test]
#[instrument]
async fn request_response_works_with_sealed_headers_range_inclusive() {
request_response_works_with(RequestMessage::SealedHeaders(2..6)).await
let arbitrary_range = 2..6;
request_response_works_with(RequestMessage::SealedHeaders(arbitrary_range)).await
}

#[tokio::test]
Expand Down
7 changes: 3 additions & 4 deletions crates/services/p2p/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use fuel_core_services::stream::BoxStream;
use fuel_core_storage::Result as StorageResult;
use fuel_core_types::{
blockchain::{
primitives::BlockId,
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::Transactions,
};
use std::ops::Range;

Expand All @@ -29,8 +28,8 @@ pub trait P2pDb: Send + Sync {

fn get_transactions(
&self,
block_id: &BlockId,
) -> StorageResult<Option<Vec<Transaction>>>;
block_height_range: Range<u32>,
) -> StorageResult<Option<Vec<Transactions>>>;
}

pub trait BlockHeightImporter: Send + Sync {
Expand Down
18 changes: 6 additions & 12 deletions crates/services/p2p/src/request_response/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,17 @@ use std::{

use fuel_core_types::{
blockchain::{
primitives::BlockId,
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::Transactions,
};
use libp2p::PeerId;
use serde::{
Deserialize,
Serialize,
};
use serde_with::{
serde_as,
FromInto,
};
use thiserror::Error;
use tokio::sync::oneshot;

Expand All @@ -34,33 +29,32 @@ pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::<RequestMessage>(
// This `OutboundResponse` gets prepared to be sent over the wire in `NetworkResponse` format.
// The Peer that requested the message receives the response over the wire in `NetworkResponse` format.
// It then unpacks it into `ResponseMessage`.
// `ResponseChannelItem` is used to forward the data within `ResponseMessage` to the receving channel.
// `ResponseChannelItem` is used to forward the data within `ResponseMessage` to the receiving channel.
// Client Peer: `RequestMessage` (send request)
// Server Peer: `RequestMessage` (receive request) -> `OutboundResponse` -> `NetworkResponse` (send response)
// Client Peer: `NetworkResponse` (receive response) -> `ResponseMessage(data)` -> `ResponseChannelItem(channel, data)` (handle response)

#[serde_as]
#[derive(Serialize, Deserialize, Eq, PartialEq, Debug, Clone)]
pub enum RequestMessage {
Block(BlockHeight),
SealedHeaders(Range<u32>),
Transactions(#[serde_as(as = "FromInto<[u8; 32]>")] BlockId),
Transactions(Range<u32>),
}

/// Final Response Message that p2p service sends to the Orchestrator
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ResponseMessage {
SealedBlock(Box<Option<SealedBlock>>),
SealedHeaders(Option<Vec<SealedBlockHeader>>),
Transactions(Option<Vec<Transaction>>),
Transactions(Option<Vec<Transactions>>),
}

/// Holds oneshot channels for specific responses
#[derive(Debug)]
pub enum ResponseChannelItem {
Block(oneshot::Sender<Option<SealedBlock>>),
SealedHeaders(oneshot::Sender<(PeerId, Option<Vec<SealedBlockHeader>>)>),
Transactions(oneshot::Sender<Option<Vec<Transaction>>>),
Transactions(oneshot::Sender<Option<Vec<Transactions>>>),
}

/// Response that is sent over the wire
Expand All @@ -78,7 +72,7 @@ pub enum NetworkResponse {
pub enum OutboundResponse {
Block(Option<Arc<SealedBlock>>),
SealedHeaders(Option<Vec<SealedBlockHeader>>),
Transactions(Option<Arc<Vec<Transaction>>>),
Transactions(Option<Arc<Vec<Transactions>>>),
}

#[derive(Debug, Error)]
Expand Down
Loading

0 comments on commit 5d1a9a3

Please sign in to comment.