Skip to content

Commit

Permalink
Introduce ttl eviction for RecycleStore (solana-labs#15513)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun authored Feb 25, 2021
1 parent d8ba56e commit 21b4300
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 9 deletions.
22 changes: 21 additions & 1 deletion runtime/src/accounts_background_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
Arc, RwLock,
},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
time::{Duration, Instant},
};

const INTERVAL_MS: u64 = 100;
Expand All @@ -30,6 +30,13 @@ const SHRUNKEN_ACCOUNT_PER_INTERVAL: usize =
SHRUNKEN_ACCOUNT_PER_SEC / (1000 / INTERVAL_MS as usize);
const CLEAN_INTERVAL_BLOCKS: u64 = 100;

// This value is chosen to spread the dropping cost over 3 expiration checks
// RecycleStores are fully populated almost all of its lifetime. So, otherwise
// this would drop MAX_RECYCLE_STORES mmaps at once in the worst case...
// (Anyway, the dropping part is outside the AccountsDb::recycle_stores lock
// and dropped in this AccountsBackgroundServe, so this shouldn't matter much)
const RECYCLE_STORE_EXPIRATION_INTERVAL_SECS: u64 = crate::accounts_db::EXPIRATION_TTL_SECONDS / 3;

pub type SnapshotRequestSender = Sender<SnapshotRequest>;
pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
pub type DroppedSlotsSender = Sender<Slot>;
Expand Down Expand Up @@ -286,6 +293,7 @@ impl AccountsBackgroundService {
let mut last_cleaned_block_height = 0;
let mut removed_slots_count = 0;
let mut total_remove_slots_time = 0;
let mut last_expiration_check_time = Instant::now();
let t_background = Builder::new()
.name("solana-accounts-background".to_string())
.spawn(move || loop {
Expand All @@ -304,6 +312,8 @@ impl AccountsBackgroundService {
&mut total_remove_slots_time,
);

Self::expire_old_recycle_stores(&bank, &mut last_expiration_check_time);

// Check to see if there were any requests for snapshotting banks
// < the current root bank `bank` above.

Expand Down Expand Up @@ -397,6 +407,16 @@ impl AccountsBackgroundService {
*removed_slots_count = 0;
}
}

fn expire_old_recycle_stores(bank: &Bank, last_expiration_check_time: &mut Instant) {
let now = Instant::now();
if now.duration_since(*last_expiration_check_time).as_secs()
> RECYCLE_STORE_EXPIRATION_INTERVAL_SECS
{
bank.expire_old_recycle_stores();
*last_expiration_check_time = now;
}
}
}

#[cfg(test)]
Expand Down
140 changes: 132 additions & 8 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,27 +615,61 @@ pub struct StoreAccountsTiming {

#[derive(Debug, Default)]
struct RecycleStores {
entries: Vec<Arc<AccountStorageEntry>>,
entries: Vec<(Instant, Arc<AccountStorageEntry>)>,
total_bytes: u64,
}

// 30 min should be enough to be certain there won't be any prospective recycle uses for given
// store entry
// That's because it already processed ~2500 slots and ~25 passes of AccountsBackgroundService
pub const EXPIRATION_TTL_SECONDS: u64 = 1800;

impl RecycleStores {
fn add_entry(&mut self, new_entry: Arc<AccountStorageEntry>) {
self.total_bytes += new_entry.total_bytes();
self.entries.push(new_entry)
self.entries.push((Instant::now(), new_entry))
}

fn iter(&self) -> std::slice::Iter<Arc<AccountStorageEntry>> {
fn iter(&self) -> std::slice::Iter<(Instant, Arc<AccountStorageEntry>)> {
self.entries.iter()
}

fn add_entries(&mut self, new_entries: Vec<Arc<AccountStorageEntry>>) {
self.total_bytes += new_entries.iter().map(|e| e.total_bytes()).sum::<u64>();
self.entries.extend(new_entries);
let now = Instant::now();
for new_entry in new_entries {
self.entries.push((now, new_entry));
}
}

fn expire_old_entries(&mut self) -> Vec<Arc<AccountStorageEntry>> {
let mut expired = vec![];
let now = Instant::now();
let mut expired_bytes = 0;
self.entries.retain(|(recycled_time, entry)| {
if now.duration_since(*recycled_time).as_secs() > EXPIRATION_TTL_SECONDS {
if Arc::strong_count(entry) >= 2 {
warn!(
"Expiring still in-use recycled StorageEntry anyway...: id: {} slot: {}",
entry.append_vec_id(),
entry.slot(),
);
}
expired_bytes += entry.total_bytes();
expired.push(entry.clone());
false
} else {
true
}
});

self.total_bytes -= expired_bytes;

expired
}

fn remove_entry(&mut self, index: usize) -> Arc<AccountStorageEntry> {
let removed_entry = self.entries.swap_remove(index);
let (_added_time, removed_entry) = self.entries.swap_remove(index);
self.total_bytes -= removed_entry.total_bytes();
removed_entry
}
Expand Down Expand Up @@ -2260,7 +2294,7 @@ impl AccountsDb {
let mut min = std::u64::MAX;
let mut avail = 0;
let mut recycle_stores = self.recycle_stores.write().unwrap();
for (i, store) in recycle_stores.iter().enumerate() {
for (i, (_recycled_time, store)) in recycle_stores.iter().enumerate() {
if Arc::strong_count(store) == 1 {
max = std::cmp::max(store.accounts.capacity(), max);
min = std::cmp::min(store.accounts.capacity(), min);
Expand Down Expand Up @@ -2996,6 +3030,25 @@ impl AccountsDb {
self.accounts_cache.report_size();
}

pub fn expire_old_recycle_stores(&self) {
let mut recycle_stores_write_elapsed = Measure::start("recycle_stores_write_time");
let recycle_stores = self.recycle_stores.write().unwrap().expire_old_entries();
recycle_stores_write_elapsed.stop();

let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
drop(recycle_stores);
drop_storage_entries_elapsed.stop();

self.clean_accounts_stats
.purge_stats
.drop_storage_entries_elapsed
.fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
self.clean_accounts_stats
.purge_stats
.recycle_stores_write_elapsed
.fetch_add(recycle_stores_write_elapsed.as_us(), Ordering::Relaxed);
}

// `force_flush` flushes all the cached roots `<= requested_flush_root`. It also then
// flushes:
// 1) Any remaining roots if there are > MAX_CACHE_SLOTS remaining slots in the cache,
Expand Down Expand Up @@ -4495,15 +4548,16 @@ impl AccountsDb {
self.print_count_and_status(label);
info!("recycle_stores:");
let recycle_stores = self.recycle_stores.read().unwrap();
for entry in recycle_stores.iter() {
for (recycled_time, entry) in recycle_stores.iter() {
info!(
" slot: {} id: {} count_and_status: {:?} approx_store_count: {} len: {} capacity: {}",
" slot: {} id: {} count_and_status: {:?} approx_store_count: {} len: {} capacity: {} (recycled: {:?})",
entry.slot(),
entry.append_vec_id(),
*entry.count_and_status.read().unwrap(),
entry.approx_store_count.load(Ordering::Relaxed),
entry.accounts.len(),
entry.accounts.capacity(),
recycled_time,
);
}
}
Expand Down Expand Up @@ -8936,4 +8990,74 @@ pub mod tests {
assert!(slot_stores(&db, 0).is_empty());
assert!(!slot_stores(&db, 1).is_empty());
}

#[test]
fn test_recycle_stores_expiration() {
solana_logger::setup();

let dummy_path = Path::new("");
let dummy_slot = 12;
let dummy_size = 1000;

let dummy_id1 = 22;
let entry1 = Arc::new(AccountStorageEntry::new(
&dummy_path,
dummy_slot,
dummy_id1,
dummy_size,
));

let dummy_id2 = 44;
let entry2 = Arc::new(AccountStorageEntry::new(
&dummy_path,
dummy_slot,
dummy_id2,
dummy_size,
));

let mut recycle_stores = RecycleStores::default();
recycle_stores.add_entry(entry1);
recycle_stores.add_entry(entry2);
assert_eq!(recycle_stores.entry_count(), 2);

// no expiration for newly added entries
let expired = recycle_stores.expire_old_entries();
assert_eq!(
expired
.iter()
.map(|e| e.append_vec_id())
.collect::<Vec<_>>(),
Vec::<AppendVecId>::new()
);
assert_eq!(
recycle_stores
.iter()
.map(|(_, e)| e.append_vec_id())
.collect::<Vec<_>>(),
vec![dummy_id1, dummy_id2]
);
assert_eq!(recycle_stores.entry_count(), 2);
assert_eq!(recycle_stores.total_bytes(), dummy_size * 2);

// expiration for only too old entries
recycle_stores.entries[0].0 =
Instant::now() - Duration::from_secs(EXPIRATION_TTL_SECONDS + 1);
let expired = recycle_stores.expire_old_entries();
assert_eq!(
expired
.iter()
.map(|e| e.append_vec_id())
.collect::<Vec<_>>(),
vec![dummy_id1]
);
assert_eq!(
recycle_stores
.iter()
.map(|(_, e)| e.append_vec_id())
.collect::<Vec<_>>(),
vec![dummy_id2]
);
assert_eq!(recycle_stores.entry_count(), 1);
assert_eq!(recycle_stores.total_bytes(), dummy_size);
}
}
4 changes: 4 additions & 0 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3888,6 +3888,10 @@ impl Bank {
.flush_accounts_cache(false, Some(self.slot()))
}

pub fn expire_old_recycle_stores(&self) {
self.rc.accounts.accounts_db.expire_old_recycle_stores()
}

fn store_account_and_update_capitalization(&self, pubkey: &Pubkey, new_account: &Account) {
if let Some(old_account) = self.get_account(&pubkey) {
match new_account.lamports.cmp(&old_account.lamports) {
Expand Down

0 comments on commit 21b4300

Please sign in to comment.