Add pruning of expired records at the given maintenance interval.
aterentic-ethernal committed Feb 28, 2024
1 parent f97b514 commit ed094bb
Showing 4 changed files with 50 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/bin/avail-light.rs
@@ -373,6 +373,7 @@ async fn run(shutdown: Controller<String>) -> Result<()> {
        block_confidence_treshold: cfg.confidence,
        replication_factor: cfg.replication_factor,
        query_timeout: cfg.query_timeout,
        pruning_interval: cfg.store_pruning_interval,
    };

    tokio::task::spawn(shutdown.with_cancel(avail_light::maintenance::run(
11 changes: 10 additions & 1 deletion src/maintenance.rs
@@ -1,7 +1,7 @@
use color_eyre::{eyre::WrapErr, Result};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::info;
use tracing::{error, info};

use crate::{
    network::p2p::Client as P2pClient,
@@ -15,6 +15,7 @@ pub struct StaticConfigParams {
    pub block_confidence_treshold: f64,
    pub replication_factor: u16,
    pub query_timeout: u32,
    pub pruning_interval: u32,
}

pub async fn process_block(
@@ -23,6 +24,14 @@ pub async fn process_block(
    static_config_params: StaticConfigParams,
    metrics: &Arc<impl Metrics>,
) -> Result<()> {
    if block_number % static_config_params.pruning_interval == 0 {
        info!(block_number, "Pruning...");
        match p2p_client.prune_expired_records().await {
            Ok(pruned) => info!(block_number, pruned, "Pruning finished"),
            Err(error) => error!(block_number, "Pruning failed: {error:#}"),
        }
    }

    p2p_client
        .shrink_kademlia_map()
        .await
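The trigger condition added above is a simple modulo check: pruning runs whenever the processed block number is a multiple of `pruning_interval`, so with the default of 180 it fires at blocks 180, 360, 540, and so on (and at block 0). A minimal illustrative sketch of that semantics, not part of the commit:

```rust
/// Illustrative only (not in the commit): mirrors the check in `process_block`.
/// Assumes a non-zero interval; `block_number % 0` would panic.
fn should_prune(block_number: u32, pruning_interval: u32) -> bool {
    block_number % pruning_interval == 0
}

fn main() {
    assert!(should_prune(360, 180)); // every 180th block with the default interval
    assert!(!should_prune(181, 180)); // all other blocks skip pruning
}
```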
35 changes: 35 additions & 0 deletions src/network/p2p/client.rs
@@ -81,6 +81,31 @@ impl BlockStat {
    }
}

struct PruneExpiredRecords {
    now: Instant,
    response_sender: Option<oneshot::Sender<Result<usize>>>,
}

impl Command for PruneExpiredRecords {
    fn run(&mut self, mut entries: EventLoopEntries) -> Result<(), Report> {
        let store = entries.behavior_mut().kademlia.store_mut();

        let before = store.records_iter().count();
        store.retain(|_, record| !record.is_expired(self.now));
        let after = store.records_iter().count();

        self.response_sender
            .take()
            .unwrap()
            .send(Ok(before - after))
            .expect("PruneExpiredRecords receiver dropped");

        Ok(())
    }

    fn abort(&mut self, _: Report) {}
}

struct StartListening {
    addr: Multiaddr,
    response_sender: Option<oneshot::Sender<Result<()>>>,
@@ -578,6 +603,16 @@ impl Client {
        .await
    }

    pub async fn prune_expired_records(&self) -> Result<usize> {
        self.execute_sync(|response_sender| {
            Box::new(PruneExpiredRecords {
                now: Instant::now(),
                response_sender: Some(response_sender),
            })
        })
        .await
    }

    // Since callers ignore DHT errors, debug logs are used to observe DHT behavior.
    // The return type assumes the cell is not found when an error is present.
    async fn fetch_cell_from_dht(&self, block_number: u32, position: Position) -> Option<Cell> {
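A hedged usage sketch of the new public method: a caller awaits `prune_expired_records` on the P2P `Client` and receives the number of records removed. The helper below and its error handling are hypothetical; in this commit the only caller is `maintenance::process_block`.

```rust
// Hypothetical helper, assuming `Client` is reachable at this path as the
// diff's file layout suggests; in the commit the call lives in process_block.
async fn prune_once(p2p_client: &avail_light::network::p2p::Client) -> color_eyre::Result<()> {
    let pruned = p2p_client.prune_expired_records().await?;
    tracing::info!(pruned, "expired Kademlia records removed");
    Ok(())
}
```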
5 changes: 4 additions & 1 deletion src/types.rs
@@ -404,8 +404,10 @@ pub struct RuntimeConfig {
    pub per_connection_event_buffer_size: usize,
    pub dial_concurrency_factor: u8,
    /// Sets the timeout for a single Kademlia query. (default: 60s).
    pub query_timeout: u32,
    /// Sets the Kademlia record store pruning interval in blocks (default: 180).
    pub store_pruning_interval: u32,
    /// Sets the allowed level of parallelism for iterative Kademlia queries. (default: 3).
    pub query_parallelism: u16,
    /// Sets the Kademlia caching strategy to use for successful lookups. (default: 1).
    /// If set to 0, caching is disabled.
Expand Down Expand Up @@ -780,6 +782,7 @@ impl Default for RuntimeConfig {
            task_command_buffer_size: 32,
            per_connection_event_buffer_size: 7,
            dial_concurrency_factor: 8,
            store_pruning_interval: 180,
            query_timeout: 10,
            query_parallelism: 3,
            caching_max_peers: 1,
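The new `store_pruning_interval` defaults to 180 blocks via the `Default` impl above. A small hedged sketch of overriding it programmatically with struct-update syntax, assuming `RuntimeConfig` is exported from `avail_light::types` as the file path suggests:

```rust
use avail_light::types::RuntimeConfig;

fn main() {
    // Hypothetical override: prune half as often as the 180-block default.
    let cfg = RuntimeConfig {
        store_pruning_interval: 360,
        ..Default::default()
    };
    assert_eq!(cfg.store_pruning_interval, 360);
}
```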
