Skip to content

Commit

Permalink
now LSMTree::load() and close() work fine. add corresponding test. ne…
Browse files Browse the repository at this point in the history
…ed more unit tests.
  • Loading branch information
brucechin authored and Lianke Qin committed Mar 14, 2020
1 parent 3a18e9f commit 9673909
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 46 deletions.
134 changes: 95 additions & 39 deletions src/lsm.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
#![allow(unstable)]

use crate::buffer;
use crate::data_type::{EntryT, ValueT, TOMBSTONE};
use crate::data_type::{EntryT, ValueT, ENTRY_SIZE, TOMBSTONE};
use crate::level;
use crate::merge;
use crate::run;
use rand::Rng;
use std::io;

//use bit_vec::Iter;
//use rand::distributions::weighted::WeightedError::TooMany;
//use std::borrow::Borrow;
use std::collections::HashMap;
//use std::ptr::null;
//use std::sync::{Arc, Mutex};
use crate::data_type;
use std::cmp::max;
use std::fs::read_dir;
use std::path::{Path, PathBuf};
use std::{fs, str};

Expand Down Expand Up @@ -48,7 +52,7 @@ impl LSMTree {
/// ```
///
/// use lsm_kv::lsm;
/// let mut lsm = lsm::LSMTree::new(100, 5, 10, 0.5, 4, "hello".to_string());
/// let mut lsm = lsm::LSMTree::new(100, 5, 10, 0.5, 4, "doc_test".to_string());
/// lsm.put("hello", "world");
/// lsm.put("facebook", "google");
/// lsm.put("amazon", "linkedin");
Expand All @@ -57,6 +61,11 @@ impl LSMTree {
/// lsm.del("hello");
/// assert_eq!(lsm.get("hello"), None);
/// lsm.range("amazon", "facebook");
/// lsm.close();
/// let mut lsm2 = lsm::LSMTree::new(100, 5, 10, 0.5, 4, "doc_test".to_string());
/// lsm2.load();
/// assert_eq!(lsm2.get("hello"), None);
/// assert_eq!(lsm2.get("facebook"), Some("google".to_string()));
///
/// ```
//TODO after loading the read will fail due to bloom filter problem
Expand Down Expand Up @@ -98,6 +107,7 @@ impl LSMTree {

pub fn get_run(&mut self, mut run_id: usize) -> Option<&mut run::Run> {
for level in &mut self.levels {
//println!("level len : {}", level.runs.len());
if run_id < level.runs.len() {
//println!("get run {}", run_id);
return level.runs.get_mut(run_id);
Expand All @@ -108,6 +118,14 @@ impl LSMTree {
None
}

pub fn num_runs(&self) -> usize {
let mut res: usize = 0;
for level in self.levels.iter() {
res += level.runs.len();
}
res
}

//compact level i data to level i+1
fn merge_down(&mut self, current: usize) {
let mut merge_ctx: merge::MergeContextT = merge::MergeContextT::new();
Expand Down Expand Up @@ -246,13 +264,13 @@ impl LSMTree {
}
_ => {
//not found in buffer, start searching in vector<Level>
println!("key {} not found in buffer", str::from_utf8(&key).unwrap());
for current_run in 0..self.depth * self.levels.len() as u64 {
//println!("key {} not found in buffer", str::from_utf8(&key).unwrap());
for current_run in 0..self.num_runs() as u64 {
let current_val: ValueT;
println!(
"search in Run {}, latest_run is {}",
current_run, latest_run
);
// println!(
// "search in Run {}, latest_run is {}",
// current_run, latest_run
// );
//let mut run: run::Run;
if latest_run >= 0 || (self.get_run(current_run as usize).is_none()) {
// Stop search if we discovered a key in another run, or
Expand All @@ -264,11 +282,11 @@ impl LSMTree {
// Couldn't find the key in the current run, so we need
// to keep searching.
//search();
println!(
"key {} not found in Run {}",
str::from_utf8(&key).unwrap(),
current_run
);
// println!(
// "key {} not found in Run {}",
// str::from_utf8(&key).unwrap(),
// current_run
// );
} else {
// Update val if the run is more recent than the
// last, then stop searching since there's no need
Expand Down Expand Up @@ -346,34 +364,60 @@ impl LSMTree {
//visit every run and load into LSMTree vec<Level>
if level_dir.is_dir() {
let files = fs::read_dir(level_dir)?;
let mut entries: Vec<PathBuf> = files.filter(Result::is_ok)
.map(|e| e.unwrap().path())
.collect();
let mut entries: Vec<PathBuf> = files
.filter(Result::is_ok)
.map(|e| e.unwrap().path())
.collect();
//make sure we read from Run-0.text to Run-max_runs.txt
entries.sort();
for run_file in entries {
let run_file_entry = run_file;
println!("cur file path is {:?}", run_file_entry);
//println!("cur file path is {:?}", run_file_entry);
let mut cur_run = run::Run::from(
max_size as u64,
self.bf_bits_per_entry,
depth,
run_file_entry,
);
//TODO do we need to have extra operations to connect cur_run and run_file_entry???
//reconstruct the bloom filter for cur_run
//reconstruct the bloom filter for cur_run. information like max_key, fence_pointers could be stored for easier reconstructions
let mut counter = 0;
for key in cur_run.get_keys().iter() {
//println!("{} set", str::from_utf8(key).unwrap());
if (counter % (page_size::get() / ENTRY_SIZE) as u64 == 0) {
cur_run.fence_pointers.push(key.clone());
}
cur_run.bloom_filter.set(key);
counter += 1;
cur_run.max_key = max(key.clone(), cur_run.max_key.clone());
}
cur_run.size = counter;
self.levels[depth].runs.push_back(cur_run);
}
}
println!("cur level has {} Runs", self.levels[depth].runs.len());
//println!("cur level has {} Runs", self.levels[depth].runs.len());
}

Ok(())
}

pub fn clear(&mut self) {
//remove all files and clear all Runs in self.levels
if let Ok(dir) = read_dir(format!("/tmp/{}/", self.tree_name)) {
for entry in dir {
if let Ok(entry) = entry {
let path = entry.path();
if entry.path().is_dir() {
fs::remove_dir_all(path).expect("fail to remove dir");
} else {
fs::remove_file(path).expect("fail to remove file");
}
}
}
self.levels.clear();
self.buffer.empty();
}
}

pub fn close(&mut self) {
//save the buffer as a Run in level 0 even if it is not full.
self.merge_down(0);
Expand Down Expand Up @@ -403,31 +447,43 @@ impl LSMTree {
}

#[test]
fn test_lsm() {
println!("hello lsm test");
}

#[test]
fn test_merge() {
let mut lsm = LSMTree::new(8, 5, 8, 0.5, 4, "hello".to_string());
for i in 0..100 {
fn test_close_load() {
let test_size = 1000;
let mut lsm = LSMTree::new(8, 5, 8, 0.5, 4, "close_load_test".to_string());
for i in 0..test_size {
lsm.put(&i.to_string(), &i.to_string());
}
// for j in 0..100 {
// println!("{:?}", lsm.get(&j.to_string()));
// }
for j in 0..test_size {
assert_eq!(Some(j.to_string()), lsm.get(&j.to_string()));
}
lsm.close();
println!("close done");
let mut lsm2 = LSMTree::new(8, 5, 8, 0.5, 4, "hello".to_string());
let mut lsm2 = LSMTree::new(8, 5, 8, 0.5, 4, "close_load_test".to_string());
lsm2.load();
println!("load done");
for j in 0..200 {
println!("{:?}", lsm2.get(&j.to_string()));
for j in 0..test_size {
assert_eq!(Some(j.to_string()), lsm2.get(&j.to_string()));
}
}

#[test]
fn test_load() {
let mut lsm = LSMTree::new(10, 5, 8, 0.5, 4, "hello".to_string());
lsm.load();
fn test_range() {
let mut lsm = LSMTree::new(100, 5, 10, 0.5, 4, "hello".to_string());
lsm.put("hello", "world");
lsm.put("facebook", "google");
lsm.put("amazon", "linkedin");
assert_eq!(vec!["linkedin", "google"], lsm.range("amazon", "facebook"));
}

#[test]
fn test_clear() {
let test_size = 1000;
let mut lsm = LSMTree::new(8, 5, 8, 0.5, 4, "clear_test".to_string());
for i in 0..test_size {
lsm.put(&i.to_string(), &i.to_string());
}
lsm.clear();
for j in 0..test_size {
assert_eq!(None, lsm.get(&j.to_string()));
}
}
11 changes: 4 additions & 7 deletions src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,8 @@ impl Run {
}
}

pub fn from(max_size: u64,
bf_bits_per_entry: f32,
level: usize,
file_path : PathBuf,
) -> Run{
Run {
pub fn from(max_size: u64, bf_bits_per_entry: f32, level: usize, file_path: PathBuf) -> Run {
Run {
bloom_filter: bloomfilter::Bloom::new(
(bf_bits_per_entry * max_size as f32) as usize,
max_size as usize,
Expand All @@ -98,7 +94,7 @@ impl Run {
}

pub fn map_read(&mut self, len: usize, offset: usize) -> Vec<EntryT> {
assert!(self.mapping.is_none());
//assert!(self.mapping.is_none());

match OpenOptions::new()
.read(true)
Expand Down Expand Up @@ -183,6 +179,7 @@ impl Run {
return None;
}
let page_index: usize;

match self.fence_pointers.binary_search(key) {
Ok(find) => {
page_index = find;
Expand Down

0 comments on commit 9673909

Please sign in to comment.