Skip to content

Commit

Permalink
Split fetch_verified into p2p and rpc functions.
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Nov 23, 2023
1 parent 0de40d5 commit 9bb2450
Showing 1 changed file with 74 additions and 43 deletions.
117 changes: 74 additions & 43 deletions src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,14 @@ struct DHTWithRPCFallbackClient {

type Commitments = [[u8; config::COMMITMENT_SIZE]];

#[async_trait]
impl Client for DHTWithRPCFallbackClient {
async fn fetch_verified(
impl DHTWithRPCFallbackClient {
async fn fetch_verified_from_dht(
&self,
block_number: u32,
block_hash: H256,
dimensions: Dimensions,
commitments: &Commitments,
positions: &[Position],
) -> Result<(Vec<Cell>, Vec<Position>, FetchStats)> {
) -> Result<(Vec<Cell>, Vec<Position>)> {
let (mut dht_fetched, mut unfetched) = self
.p2p_client
.fetch_cells_from_dht(block_number, positions)
Expand All @@ -99,57 +97,90 @@ impl Client for DHTWithRPCFallbackClient {
dht_fetched.retain(|cell| verified.contains(&cell.position));
unfetched.append(&mut unverified);

let (rpc_fetched, unfetched, dht_put_stats) = if self.disable_rpc {
(vec![], unfetched, None)
} else {
let mut fetched = self
.rpc_client
.request_kate_proof(block_hash, &unfetched)
.await?;
Ok((dht_fetched, unfetched))
}

let (verified, unverified) = proof::verify(
block_number,
dimensions,
&fetched,
commitments,
self.pp.clone(),
)
.context("Failed to verify fetched cells")?;
async fn fetch_verified_from_rpc(
&self,
block_number: u32,
block_hash: H256,
dimensions: Dimensions,
commitments: &Commitments,
positions: &[Position],
) -> Result<(Vec<Cell>, Vec<Position>)> {
let mut fetched = self
.rpc_client
.request_kate_proof(block_hash, positions)
.await?;

info!(
block_number,
cells_total = unfetched.len(),
cells_fetched = fetched.len(),
cells_verified = verified.len(),
"Cells fetched from RPC"
);
let (verified, unverified) = proof::verify(
block_number,
dimensions,
&fetched,
commitments,
self.pp.clone(),
)
.context("Failed to verify fetched cells")?;

info!(
block_number,
cells_total = positions.len(),
cells_fetched = fetched.len(),
cells_verified = verified.len(),
"Cells fetched from RPC"
);

fetched.retain(|cell| verified.contains(&cell.position));
fetched.retain(|cell| verified.contains(&cell.position));
Ok((fetched, unverified))
}
}

let begin = Instant::now();
#[async_trait]
impl Client for DHTWithRPCFallbackClient {
async fn fetch_verified(
&self,
block_number: u32,
block_hash: H256,
dimensions: Dimensions,
commitments: &Commitments,
positions: &[Position],
) -> Result<(Vec<Cell>, Vec<Position>, FetchStats)> {
let (dht_fetched, unfetched) = self
.fetch_verified_from_dht(block_number, dimensions, commitments, positions)
.await?;

let dht_put_success_rate = self
.p2p_client
.insert_cells_into_dht(block_number, fetched.clone())
.await;
if self.disable_rpc {
let stats = FetchStats::new(positions.len(), dht_fetched.len(), 0, None);
return Ok((dht_fetched, unfetched, stats));
};

info!(
let (rpc_fetched, unfetched) = self
.fetch_verified_from_rpc(
block_number,
"DHT PUT operation success rate: {dht_put_success_rate}"
);

(
fetched,
unverified,
Some((dht_put_success_rate as f64, begin.elapsed())),
block_hash,
dimensions,
commitments,
&unfetched,
)
};
.await?;

let begin = Instant::now();

let dht_put_success_rate = self
.p2p_client
.insert_cells_into_dht(block_number, rpc_fetched.clone())
.await;

info!(
block_number,
"DHT PUT operation success rate: {dht_put_success_rate}"
);

let stats = FetchStats::new(
positions.len(),
dht_fetched.len(),
rpc_fetched.len(),
dht_put_stats,
Some((dht_put_success_rate as f64, begin.elapsed())),
);

let mut fetched = vec![];
Expand Down

0 comments on commit 9bb2450

Please sign in to comment.