Skip to content

Commit

Permalink
Add API to merge small shards
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerfiliba committed Sep 9, 2024
1 parent b4db947 commit 592c8e5
Show file tree
Hide file tree
Showing 4 changed files with 327 additions and 20 deletions.
127 changes: 119 additions & 8 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl ShardRouter {
if shards.is_empty() {
shards = Self::create_initial_shards(&config, &stats, &threadpool)?;
}
let root = Self::treeify(shards, stats.clone(), threadpool.clone());
let root = Self::treeify(shards, &stats, &threadpool);
Ok(Self {
span: root.span(),
config,
Expand Down Expand Up @@ -115,7 +115,10 @@ impl ShardRouter {
if !filetype.is_file() {
continue;
}
if filename.starts_with("bottom_") || filename.starts_with("top_") {
if filename.starts_with("bottom_")
|| filename.starts_with("top_")
|| filename.starts_with("merge_")
{
std::fs::remove_file(entry.path())?;
continue;
} else if !filename.starts_with("shard_") {
Expand Down Expand Up @@ -161,14 +164,18 @@ impl ShardRouter {
Ok(shards)
}

/// Computes the span step (width) of each shard for a given item count:
/// one shard per `EXPECTED_CAPACITY` items, rounded down to a power of two
/// so shard spans stay aligned on power-of-two boundaries.
fn calc_step(num_items: usize) -> u32 {
    let step = (Self::END_OF_SHARDS as f64)
        / (num_items as f64 / Shard::EXPECTED_CAPACITY as f64).max(1.0);
    // Guard against `step` truncating to 0 for extremely large `num_items`
    // (more than END_OF_SHARDS * EXPECTED_CAPACITY items), where
    // `0u32.ilog2()` would panic; clamp to 1 so the minimum step is 1.
    1 << (step as u32).max(1).ilog2()
}

fn create_initial_shards(
config: &Arc<InternalConfig>,
stats: &Arc<InternalStats>,
threadpool: &Arc<CompactionThreadPool>,
) -> Result<Vec<Shard>> {
let step = (Self::END_OF_SHARDS as f64)
/ (config.expected_number_of_keys as f64 / Shard::EXPECTED_CAPACITY as f64).max(1.0);
let step = 1 << (step as u32).ilog2();
let step = Self::calc_step(config.expected_number_of_keys);

let mut shards = vec![];
let mut start = 0;
Expand Down Expand Up @@ -207,8 +214,8 @@ impl ShardRouter {

fn treeify(
shards: Vec<Shard>,
stats: Arc<InternalStats>,
threadpool: Arc<CompactionThreadPool>,
stats: &Arc<InternalStats>,
threadpool: &Arc<CompactionThreadPool>,
) -> ShardNode {
// algorithm: first find the smallest span, and let that be our base unit, say it's 1K. then go over
// 0..64K in 1K increments and pair up every consecutive pairs whose size is 1K. we count on the spans to be
Expand Down Expand Up @@ -309,7 +316,7 @@ impl ShardRouter {
}

let shards = Self::create_initial_shards(&self.config, &self.stats, &self.threadpool)?;
*guard = Self::treeify(shards, self.stats.clone(), self.threadpool.clone());
*guard = Self::treeify(shards, &self.stats, &self.threadpool);

Ok(())
}
Expand Down Expand Up @@ -382,4 +389,108 @@ impl ShardRouter {
}
}
}

/// Recursively merges pairs of under-filled leaf shards beneath `bottom` and
/// `top`, returning a new single-leaf router covering the combined span when
/// a merge happened, or `None` when nothing was (or could be) merged.
///
/// `shards_to_remove` is a merge budget: it is decremented once per
/// successful `Shard::merge` and all merging stops when it hits zero, so the
/// tree never shrinks below the shard count computed by the caller.
fn _merge(
    &self,
    bottom: &ShardRouter,
    top: &ShardRouter,
    max_fill: usize,
    shards_to_remove: &mut u32,
) -> Result<Option<ShardRouter>> {
    // Budget exhausted -- stop merging entirely.
    if *shards_to_remove == 0 {
        return Ok(None);
    }

    // Write-lock both child nodes for the duration of this merge step.
    // NOTE(review): the recursive calls below (e.g. `self._merge(bottom, ...)`
    // in the Leaf/Vertex arm) re-acquire `bottom.node.write()` while
    // `bottom_guard` is still held in this frame -- if this RwLock is not
    // re-entrant for writers, that is a self-deadlock; confirm.
    let bottom_guard = bottom.node.write();
    let top_guard = top.node.write();

    match (&*bottom_guard, &*top_guard) {
        // Two adjacent leaves: merge them only if both are "small" (at most
        // `max_fill` live items each).
        (ShardNode::Leaf(b), ShardNode::Leaf(t)) => {
            if b.get_stats()?.num_items() > max_fill {
                return Ok(None);
            }
            if t.get_stats()?.num_items() > max_fill {
                return Ok(None);
            }
            if let Some(sh) = Shard::merge(b, t)? {
                // One shard eliminated -- consume budget.
                *shards_to_remove = *shards_to_remove - 1;
                let span = sh.span.clone();
                // Wrap the merged shard in a fresh single-leaf router over
                // the combined span, sharing this router's config/stats/pool.
                Ok(Some(ShardRouter {
                    config: self.config.clone(),
                    node: RwLock::new(ShardNode::Leaf(sh)),
                    span,
                    stats: self.stats.clone(),
                    threadpool: self.threadpool.clone(),
                }))
            } else {
                // Shard::merge bailed out (e.g. a hash row would overflow);
                // keep both shards as-is.
                Ok(None)
            }
        }
        // Leaf + vertex: first try to collapse the vertex into one leaf,
        // then retry merging the original leaf against the collapsed result.
        (ShardNode::Leaf(_), ShardNode::Vertex(b, t)) => {
            if let Some(merged_top) = self._merge(&b, &t, max_fill, shards_to_remove)? {
                self._merge(bottom, &merged_top, max_fill, shards_to_remove)
            } else {
                Ok(None)
            }
        }
        // Mirror case: collapse the bottom vertex first, then retry against
        // the top leaf.
        (ShardNode::Vertex(b, t), ShardNode::Leaf(_)) => {
            if let Some(merged_bottom) = self._merge(&b, &t, max_fill, shards_to_remove)? {
                self._merge(&merged_bottom, top, max_fill, shards_to_remove)
            } else {
                Ok(None)
            }
        }
        // Two vertices: collapse each side independently, then merge across
        // the two sides using whatever collapsed.
        (ShardNode::Vertex(b1, t1), ShardNode::Vertex(b2, t2)) => {
            let m1 = self._merge(b1, t1, max_fill, shards_to_remove)?;
            let m2 = self._merge(b2, t2, max_fill, shards_to_remove)?;
            match (m1, m2) {
                (Some(m1), Some(m2)) => self._merge(&m1, &m2, max_fill, shards_to_remove),
                (Some(m1), None) => self._merge(&m1, top, max_fill, shards_to_remove),
                (None, Some(m2)) => self._merge(bottom, &m2, max_fill, shards_to_remove),
                (None, None) => Ok(None),
            }
        }
    }
}

/// Walks the shard tree merging under-filled leaf shards, then reloads the
/// shard files from disk and re-treeifies the router.
///
/// `max_fill_level` must be in the open interval (0, 0.5); a shard is a
/// merge candidate only if it holds at most
/// `Shard::EXPECTED_CAPACITY * max_fill_level` live items.
///
/// Returns `Ok(true)` iff the number of shards actually changed.
pub(crate) fn merge_small_shards(&self, max_fill_level: f32) -> Result<bool> {
    ensure!(max_fill_level > 0.0 && max_fill_level < 0.5);
    // Absolute item-count threshold below which a shard counts as "small".
    let max_fill = (Shard::EXPECTED_CAPACITY as f32 * max_fill_level) as usize;

    // Single pass over all shards: total live items + current shard count.
    let mut num_items = 0usize;
    let mut starting_num_shards = 0u32;
    for count in self.call_on_all_shards(|sh| Ok(sh.get_stats()?.num_items()))? {
        starting_num_shards += 1;
        num_items += count;
    }

    // Target shard count for the current item count, never dropping below
    // what `expected_number_of_keys` calls for.
    let needed_shards = Self::END_OF_SHARDS
        / Self::calc_step(num_items.max(self.config.expected_number_of_keys));

    if starting_num_shards <= needed_shards {
        // Already at (or below) the target -- nothing to merge.
        return Ok(false);
    }
    // Merge budget passed down to `_merge`.
    let mut shards_to_remove = starting_num_shards - needed_shards;

    {
        // Hold the root write lock across both the merge and the rebuild so
        // no other operation observes the intermediate state.
        let mut guard = self.node.write();

        // A lone leaf cannot be merged further. The `Option<ShardRouter>`
        // produced here is intentionally discarded: the tree is rebuilt from
        // the on-disk files below regardless of what was merged in memory.
        match &*guard {
            ShardNode::Leaf(_) => None,
            ShardNode::Vertex(bottom, top) => {
                self._merge(&bottom, &top, max_fill, &mut shards_to_remove)?
            }
        };

        // Re-read the shard files (merged ones included) and rebuild the tree.
        *guard = Self::treeify(
            Self::load(&self.config, &self.stats, &self.threadpool)?,
            &self.stats,
            &self.threadpool,
        );
    }

    // Recount shards after the rebuild to report whether anything changed.
    let new_num_shards: u32 = self.call_on_all_shards(|_| Ok(1))?.iter().sum();

    Ok(new_num_shards != starting_num_shards)
}
}
108 changes: 101 additions & 7 deletions src/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,20 @@ impl CompactionThreadPool {
}
}

/// A snapshot of a single shard's header counters, as read by
/// `Shard::get_stats`.
#[derive(Debug, Clone)]
pub(crate) struct ShardStats {
    pub write_offset: usize, // bytes written to the shard's data region so far
    pub wasted_bytes: usize, // bytes held by overwritten/removed entries
    pub num_inserts: usize,  // cumulative insert count
    pub num_removals: usize, // cumulative removal count
}

impl ShardStats {
    /// Number of live items in the shard (inserts minus removals).
    ///
    /// Uses a saturating subtraction: the two counters are loaded as separate
    /// relaxed atomic reads, so a concurrent removal can momentarily make
    /// `num_removals` exceed `num_inserts`. A plain `-` would panic in debug
    /// builds and wrap to a huge bogus count in release builds.
    pub(crate) fn num_items(&self) -> usize {
        self.num_inserts.saturating_sub(self.num_removals)
    }
}

pub(crate) struct Shard {
pub(crate) span: Range<u32>,
pub(crate) config: Arc<InternalConfig>,
Expand Down Expand Up @@ -617,6 +631,86 @@ impl Shard {
Ok((bottom, top))
}

/// Merges two adjacent shards (`bottom` spans directly below `top`) into a
/// single new shard covering `bottom.span.start..top.span.end`.
///
/// Returns `Ok(None)` when the merge is impossible because some hash row of
/// the combined shard would overflow `ROW_WIDTH`; in that case the temporary
/// file is removed and both source shards are left untouched on disk.
pub(crate) fn merge(bottom: &Shard, top: &Shard) -> Result<Option<Shard>> {
    // Lock both source shards' files for the entire merge.
    let bottom_files = bottom.files.write();
    let top_files = top.files.write();

    // Build the combined shard in a "merge_XXXX-XXXX" temp file: a crash
    // mid-merge leaves only a junk file (cleaned up on load, which removes
    // "merge_" prefixed files), never a half-written "shard_" file.
    let tmp_filename = bottom.config.dir_path.join(format!(
        "merge_{:04x}-{:04x}",
        bottom.span.start, top.span.end
    ));
    let mmap_file = MmapFile::create(&tmp_filename, &bottom.config)?;

    // The combined shard inherits bottom's config (and thus its hash seed).
    let combined = Shard::new(
        bottom.span.start..top.span.end,
        mmap_file,
        bottom.config.clone(),
        bottom.stats.clone(),
        bottom.threadpool.clone(),
    )?;
    let combined_files = combined.files.write();

    // Copy every live entry row by row. Both sources hash with the same
    // seed, so each key's row index is identical in the combined shard --
    // entries from bottom and top pack into the same row, bottom first.
    for row_idx in 0..NUM_ROWS {
        let mut target_col = 0;
        for files in [&bottom_files, &top_files] {
            let src_row = &files.0.header().rows.0[row_idx];
            for (src_col, &sig) in src_row.signatures.iter().enumerate() {
                if sig == INVALID_SIG {
                    // Empty slot in the source row.
                    continue;
                }
                let (k, v) = files
                    .0
                    .read_kv(&combined.stats, src_row.offsets_and_sizes[src_col])?;
                let ph = PartedHash::new(&combined.config.hash_seed, &k);
                // Same seed => the key must land in the same row.
                assert_eq!(row_idx, ph.row_selector());

                let target_row = combined_files.0.row_mut(ph.row_selector());
                if target_col >= ROW_WIDTH {
                    // too many items fall in this row, we can't merge
                    std::fs::remove_file(tmp_filename)?;
                    return Ok(None);
                }
                assert_eq!(
                    target_row.signatures[target_col], INVALID_SIG,
                    "row={} target_col={} sig={}",
                    row_idx, target_col, target_row.signatures[target_col]
                );
                target_row.offsets_and_sizes[target_col] =
                    combined_files.0.write_kv(&combined.stats, &k, &v)?;
                // Make the KV payload and offset visible before the signature
                // is published, since a slot counts as live once its
                // signature is != INVALID_SIG.
                std::sync::atomic::fence(Ordering::SeqCst);
                target_row.signatures[target_col] = ph.signature();
                combined_files
                    .0
                    .header()
                    .num_inserts
                    .fetch_add(1, Ordering::Relaxed);
                target_col += 1;
            }
        }
    }

    // Promote the temp file to a real "shard_" file, then delete the two
    // source files.
    // NOTE(review): a crash between the rename and the removals leaves
    // overlapping shard_ files covering the same span -- confirm `load`
    // tolerates/repairs that state.
    let dst_filename = combined.config.dir_path.join(format!(
        "shard_{:04x}-{:04x}",
        combined.span.start, combined.span.end
    ));
    let bottom_filename = combined.config.dir_path.join(format!(
        "shard_{:04x}-{:04x}",
        bottom.span.start, bottom.span.end
    ));
    let top_filename = combined
        .config
        .dir_path
        .join(format!("shard_{:04x}-{:04x}", top.span.start, top.span.end));

    std::fs::rename(tmp_filename, dst_filename)?;
    std::fs::remove_file(bottom_filename)?;
    std::fs::remove_file(top_filename)?;

    // Release the combined shard's file lock before handing it to the caller.
    drop(combined_files);

    Ok(Some(combined))
}

fn operate_on_row<T>(
&self,
row_idx: usize,
Expand Down Expand Up @@ -979,16 +1073,16 @@ impl Shard {
})
}

pub(crate) fn get_stats(&self) -> Result<(u64, u64, u64, u64)> {
pub(crate) fn get_stats(&self) -> Result<ShardStats> {
self.wait_for_compaction()?;
let files_guard = self.files.read();
let hdr = files_guard.0.header();
Ok((
hdr.write_offset.load(Ordering::Relaxed),
hdr.wasted_bytes.load(Ordering::Relaxed),
hdr.num_inserts.load(Ordering::Relaxed),
hdr.num_removals.load(Ordering::Relaxed),
))
Ok(ShardStats {
write_offset: hdr.write_offset.load(Ordering::Relaxed) as usize,
wasted_bytes: hdr.wasted_bytes.load(Ordering::Relaxed) as usize,
num_inserts: hdr.num_inserts.load(Ordering::Relaxed) as usize,
num_removals: hdr.num_removals.load(Ordering::Relaxed) as usize,
})
}
}

Expand Down
25 changes: 20 additions & 5 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,15 +475,30 @@ impl CandyStore {
let mut stats = Stats::default();
self.stats.fill_stats(&mut stats);

for (occ, wasted, inserts, removals) in shard_stats {
for stats2 in shard_stats {
stats.num_shards += 1;
stats.occupied_bytes += occ as usize;
stats.wasted_bytes += wasted as usize;
stats.num_inserts += inserts as usize;
stats.num_removals += removals as usize;
stats.occupied_bytes += stats2.write_offset;
stats.wasted_bytes += stats2.wasted_bytes;
stats.num_inserts += stats2.num_inserts;
stats.num_removals += stats2.num_removals;
}
stats
}

/// Merges small shards (shards with a used capacity below `max_fill_level`).
/// `max_fill_level` must be strictly between 0 and 0.5; 0.25 is a reasonable
/// choice.
///
/// Note 1: this is an expensive operation that takes a global lock on the store (no other
/// operations can take place while merging is in progress). Only use it if you expect the
/// number of items to be at half or less of what it was (i.e., after a peak period).
///
/// Note 2: merging will stop once we reach the number of shards required for
/// [Config::expected_number_of_keys], if configured.
///
/// Returns true if any shards were merged, false otherwise.
pub fn merge_small_shards(&self, max_fill_level: f32) -> Result<bool> {
    // Delegate to the shard router, which owns the shard tree.
    self.root.merge_small_shards(max_fill_level)
}
}

// impl Drop for CandyStore {
Expand Down
Loading

0 comments on commit 592c8e5

Please sign in to comment.