Skip to content

Commit

Permalink
log cleaning. closes spacejam#79. closes spacejam#48. (spacejam#102)
Browse files Browse the repository at this point in the history
* add quickcheck to the log, fix a few bugs

* clean up replaced log entries on disk using fallocate

* fix issue where non-linux systems were not zeroing out log entries completely. added regression test.

* make clippy happy
  • Loading branch information
spacejam authored Sep 3, 2017
1 parent ab06a10 commit 0f10f12
Show file tree
Hide file tree
Showing 21 changed files with 677 additions and 430 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ target
Cargo.lock
*swp
*swo
rsdb.tmp.*
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rsdb"
version = "0.10.0"
version = "0.11.0"
authors = ["Tyler Neely <[email protected]>"]
description = "a flash-sympathetic persistent lock-free B+ tree, pagecache, and log"
license = "Apache-2.0"
Expand All @@ -11,9 +11,9 @@ documentation = "https://docs.rs/rsdb/"
readme = "README.md"

[features]
default = ["libc"]
default = []
bench = ["clap", "num_cpus", "log", "rand", "rayon", "zstd", "env_logger"]
stress = ["docopt", "chan-signal", "rayon", "rand"]
stress = ["docopt", "chan-signal", "rayon", "rand", "zstd"]

[[bin]]
name = "bench"
Expand All @@ -24,14 +24,14 @@ name = "stress"
required-features = ["stress"]

[dependencies]
libc = "0.2"
crossbeam = "0.3"
coco = "0.2"
bincode = "0.8"
serde = "1.0"
serde_derive = "1.0"
lazy_static = "0.2"
log = {version = "0.3", optional = true}
libc = {version = "0.2", optional = true}
rayon = {version = "0.8", optional = true}
env_logger = {version = "0.4", optional = true}
num_cpus = {version = "1.6", optional = true}
Expand Down
13 changes: 6 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# RSDB
# rsdb

[![Build Status](https://travis-ci.org/spacejam/rsdb.svg?branch=master)](https://travis-ci.org/spacejam/rsdb)
[![crates.io](http://meritbadge.herokuapp.com/rsdb)](https://crates.io/crates/rsdb)
Expand All @@ -18,7 +18,7 @@ The `Tree` has a C API, so you can use this from any mainstream language.
extern crate rsdb;

let tree = rsdb::Config::default()
.path(Some("rsdb.state".to_owned()))
.path(Some(path))
.tree();

// set and get
Expand All @@ -41,13 +41,12 @@ tree.del(&k);

* Quite young, there are lots of fuzz tests but don't bet a billion
dollar business on it yet!
* Log cleaning is not yet implemented, so if you write the same
key over and over, you will run out of disk space eventually.
This is going to be implemented in the next week!
* The C API is likely to change rapidly
* Has not yet received much attention for performance tuning,
it has an extremely high theoretical performance but there
is a bit of tuning to get there. This will be happening soon!
is a bit of tuning to get there. Currently only around 200k
operations per second with certain contrived workloads. This
will be improving soon!

# Contribution Welcome!

Expand All @@ -62,7 +61,7 @@ tree.del(&k);
- [x] zstd compression
- [x] configurable cache size
- [x] C API
- [ ] log cleaning
- [x] log cleaning
- [ ] merge operator support
- [ ] higher-level interface with multi-key transaction and snapshot support
- [ ] formal verification of lock-free algorithms via symbolic execution
Expand Down
70 changes: 32 additions & 38 deletions src/bin/stress.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,16 @@
extern crate rsdb;
extern crate rand;

#[cfg(feature = "stress")]
#[macro_use]
extern crate serde_derive;

#[cfg(feature = "stress")]
extern crate docopt;

#[cfg(feature = "stress")]
extern crate chan_signal;
extern crate rand;
extern crate rsdb;

use std::thread;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

#[cfg(feature = "stress")]
use chan_signal::Signal;

#[cfg(feature = "stress")]
use docopt::Docopt;

use rand::{Rng, thread_rng};

const USAGE: &'static str = "
Expand All @@ -32,22 +22,32 @@ Options:
--duration=<s> Seconds to run for [default: 10].
";

#[cfg_attr(feature = "docopt", derive(Deserialize))]
#[derive(Deserialize)]
struct Args {
flag_threads: usize,
flag_burn_in: bool,
flag_duration: u64,
}

fn report(shutdown: Arc<AtomicBool>, total: Arc<AtomicUsize>) {
let mut last = 0;
while !shutdown.load(Ordering::Relaxed) {
thread::sleep(std::time::Duration::from_secs(1));
let total = total.load(Ordering::Acquire);

println!("did {} ops", total - last);

last = total;
}
}

fn run(tree: Arc<rsdb::Tree>, shutdown: Arc<AtomicBool>, total: Arc<AtomicUsize>) {
let mut rng = thread_rng();
let mut byte = || vec![rng.gen::<u8>()];
let mut rng = thread_rng();

let mut ops = 0;

while !shutdown.load(Ordering::Relaxed) {
ops += 1;
total.fetch_add(1, Ordering::Release);
let choice = rng.gen_range(0, 5);

match choice {
Expand All @@ -72,19 +72,15 @@ fn run(tree: Arc<rsdb::Tree>, shutdown: Arc<AtomicBool>, total: Arc<AtomicUsize>
}

}

total.fetch_add(ops, Ordering::Release);
}

fn main() {
#[cfg(feature = "stress")]
let signal = chan_signal::notify(&[Signal::INT, Signal::TERM]);

let args: Args = Docopt::new(USAGE)
.and_then(|d| d.argv(std::env::args().into_iter()).deserialize())
.unwrap_or_else(|e| e.exit());

#[cfg(feature = "stress")]
let signal = chan_signal::notify(&[Signal::INT, Signal::TERM]);

let total = Arc::new(AtomicUsize::new(0));
let shutdown = Arc::new(AtomicBool::new(false));

Expand All @@ -106,31 +102,29 @@ fn main() {

let now = std::time::Instant::now();

#[cfg(feature = "stress")]
let n_threads = args.flag_threads;

#[cfg(not(feature = "stress"))]
let n_threads = 4;

for _ in 0..n_threads {
for i in 0..n_threads + 1 {
let tree = tree.clone();
let shutdown = shutdown.clone();
let total = total.clone();
let t = thread::spawn(move || run(tree, shutdown, total));
threads.push(t);
}

#[cfg(feature = "stress")]
{
if args.flag_burn_in {
signal.recv();
println!("got shutdown signal, cleaning up...");
let t = if i == 0 {
thread::spawn(move || report(shutdown, total))
} else {
thread::sleep(std::time::Duration::from_secs(args.flag_duration));
}
thread::spawn(move || run(tree, shutdown, total))
};

threads.push(t);
}

#[cfg(not(feature = "stress"))] thread::sleep(std::time::Duration::from_secs(10));
if args.flag_burn_in {
println!("waiting on signal");
signal.recv();
println!("got shutdown signal, cleaning up...");
} else {
thread::sleep(std::time::Duration::from_secs(args.flag_duration));
}

shutdown.store(true, Ordering::SeqCst);

Expand Down
35 changes: 13 additions & 22 deletions src/c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,10 @@ pub unsafe extern "C" fn rsdb_set(
let k = slice::from_raw_parts(key, keylen).to_vec();
let v = slice::from_raw_parts(val, vallen).to_vec();
(*db).set(k.clone(), v.clone());
mem::forget(k);
mem::forget(v);
}

/// Get the value of a key.
/// Caller is responsible for freeing the returned value with rsdb_free_buf if it's non-null.
/// Caller is responsible for freeing the returned value with `rsdb_free_buf` if it's non-null.
#[no_mangle]
pub unsafe extern "C" fn rsdb_get(
db: *mut Tree,
Expand All @@ -132,7 +130,6 @@ pub unsafe extern "C" fn rsdb_get(
) -> *mut c_char {
let k = slice::from_raw_parts(key as *const u8, keylen);
let res = (*db).get(k);
mem::forget(k);
match res {
None => ptr::null_mut(),
Some(v) => leak_buf(v, vallen),
Expand All @@ -144,14 +141,13 @@ pub unsafe extern "C" fn rsdb_get(
pub unsafe extern "C" fn rsdb_del(db: *mut Tree, key: *const c_char, keylen: size_t) {
let k = slice::from_raw_parts(key as *const u8, keylen);
(*db).del(k);
mem::forget(k);
}

/// Compare and swap.
/// Returns 1 if successful, 0 if unsuccessful.
/// Otherwise sets actual_val and actual_vallen to the current value,
/// which must be freed using rsdb_free_buf by the caller if non-null.
/// actual_val will be null and actual_vallen 0 if the current value is not set.
/// Otherwise sets `actual_val` and `actual_vallen` to the current value,
/// which must be freed using `rsdb_free_buf` by the caller if non-null.
/// `actual_val` will be null and `actual_vallen` 0 if the current value is not set.
#[no_mangle]
pub unsafe extern "C" fn rsdb_cas(
db: *mut Tree,
Expand All @@ -170,40 +166,37 @@ pub unsafe extern "C" fn rsdb_cas(
None
} else {
let old_slice = slice::from_raw_parts(old_val as *const u8, old_vallen);
let copy = old_slice.clone().to_vec();
mem::forget(old_slice);
let copy = old_slice.to_vec();
Some(copy)
};

let new = if new_vallen == 0 {
None
} else {
let new_slice = slice::from_raw_parts(new_val as *const u8, new_vallen);
let copy = new_slice.clone().to_vec();
mem::forget(new_slice);
let copy = new_slice.to_vec();
Some(copy)
};

let res = (*db).cas(k.clone(), old, new);
mem::forget(k);

match res {
Ok(()) => {
return 1;
1
}
Err(None) => {
*actual_vallen = 0;
return 0;
0
}
Err(Some(v)) => {
*actual_val = leak_buf(v, actual_vallen) as *const u8;
return 0;
0
}
}
}

/// Iterate from a starting key.
/// Caller is responsible for freeing the returned iterator with rsdb_free_iter.
/// Caller is responsible for freeing the returned iterator with `rsdb_free_iter`.
#[no_mangle]
pub unsafe extern "C" fn rsdb_scan<'a>(
db: *mut Tree,
Expand All @@ -216,7 +209,7 @@ pub unsafe extern "C" fn rsdb_scan<'a>(
}

/// Get they next kv pair from an iterator.
/// Caller is responsible for freeing the key and value with rsdb_free_buf.
/// Caller is responsible for freeing the key and value with `rsdb_free_buf`.
/// Returns 0 when exhausted.
#[no_mangle]
pub unsafe extern "C" fn rsdb_iter_next(
Expand All @@ -230,10 +223,8 @@ pub unsafe extern "C" fn rsdb_iter_next(
Some((k, v)) => {
*key = leak_buf(k, keylen);
*val = leak_buf(v, vallen);
return 1;
}
None => {
return 0;
1
}
None => 0,
}
}
Loading

0 comments on commit 0f10f12

Please sign in to comment.