Skip to content

Commit

Permalink
Document queue APIs; ensure user key/value sizes are checked; fix bug…
Browse files Browse the repository at this point in the history
… in MAX_VALUE_SIZE
  • Loading branch information
tomerfiliba committed Sep 12, 2024
1 parent 12e9db0 commit 7b85ba7
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub(crate) const MAX_TOTAL_VALUE_SIZE: usize = 0xffff; // 16 bits
pub(crate) const NAMESPACING_RESERVED_SIZE: usize = 0xff;
pub(crate) const VALUE_RESERVED_SIZE: usize = 0xff;
pub const MAX_KEY_SIZE: usize = MAX_TOTAL_KEY_SIZE - NAMESPACING_RESERVED_SIZE;
pub const MAX_VALUE_SIZE: usize = MAX_TOTAL_KEY_SIZE - VALUE_RESERVED_SIZE;
pub const MAX_VALUE_SIZE: usize = MAX_TOTAL_VALUE_SIZE - VALUE_RESERVED_SIZE;

const _: () = assert!(MAX_KEY_SIZE <= u16::MAX as usize);
const _: () = assert!(MAX_VALUE_SIZE <= u16::MAX as usize);
41 changes: 39 additions & 2 deletions src/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ struct Queue {
num_items: u64,
}

impl Queue {
fn span(&self) -> u64 {
self.tail_idx - self.head_idx
}
fn is_empty(&self) -> bool {
self.head_idx == self.tail_idx
}
}

enum QueuePos {
Head,
Tail,
Expand Down Expand Up @@ -141,13 +150,16 @@ impl CandyStore {
Ok(item_idx as usize)
}

/// Pushed a new element at the front (head) of the queue, returning the element's index in the queue
pub fn push_to_queue_head<B1: AsRef<[u8]> + ?Sized, B2: AsRef<[u8]> + ?Sized>(
&self,
queue_key: &B1,
val: &B2,
) -> Result<usize> {
self._push_to_queue(queue_key.as_ref(), val.as_ref(), QueuePos::Head)
}

/// Pushed a new element at the end (tail) of the queue, returning the element's index in the queue
pub fn push_to_queue_tail<B1: AsRef<[u8]> + ?Sized, B2: AsRef<[u8]> + ?Sized>(
&self,
queue_key: &B1,
Expand Down Expand Up @@ -191,7 +203,7 @@ impl CandyStore {
}
}

if queue.head_idx == queue.tail_idx {
if queue.is_empty() {
self.remove_raw(&full_queue_key)?;
} else {
self.set_raw(&full_queue_key, &queue_bytes)?;
Expand All @@ -200,19 +212,28 @@ impl CandyStore {
Ok(val)
}

/// Removes and returns the head element of the queue, or None if the queue is empty
pub fn pop_queue_head<B: AsRef<[u8]> + ?Sized>(
&self,
queue_key: &B,
) -> Result<Option<Vec<u8>>> {
self._pop_queue(queue_key.as_ref(), QueuePos::Head)
}

/// Removes and returns the tail element of the queue, or None if the queue is empty
pub fn pop_queue_tail<B: AsRef<[u8]> + ?Sized>(
&self,
queue_key: &B,
) -> Result<Option<Vec<u8>>> {
self._pop_queue(queue_key.as_ref(), QueuePos::Tail)
}

/// Removes an element by index from the queue, returning the value it had or None if it did not exist (as well
/// as if the queue itself does not exist).
///
/// This will leave a "hole" in the queue, which means we will skip over it in future iterations, but this could
/// lead to inefficienies as if you keep only the head and tail elements of a long queue, while removing elements
/// from the middle.
pub fn remove_from_queue<B: AsRef<[u8]> + ?Sized>(
&self,
queue_key: &B,
Expand All @@ -236,7 +257,7 @@ impl CandyStore {
queue.tail_idx -= 1;
}
queue.num_items -= 1;
if queue.head_idx == queue.tail_idx {
if queue.is_empty() {
self.remove_raw(&full_queue_key)?;
} else {
self.set_raw(&full_queue_key, &queue_bytes)?;
Expand All @@ -246,6 +267,7 @@ impl CandyStore {
Ok(Some(val))
}

/// Discards the queue (dropping all elements in contains). Returns true if it had existed before, false otherwise
pub fn discard_queue<B: AsRef<[u8]> + ?Sized>(&self, queue_key: &B) -> Result<bool> {
let queue_key = queue_key.as_ref();
let (queue_ph, full_queue_key) = self.make_queue_key(queue_key);
Expand Down Expand Up @@ -275,6 +297,14 @@ impl CandyStore {
}
}

/// Extends the queue with elements from the given iterator. The queue will be created if it did not exist before,
/// and elements are pushed at the tail-end of the queue. This is more efficient than calling
/// [Self::push_to_queue_tail] in a loop
///
/// Note: this is not an atomic (crash-safe) operation: if your program crashes while extending the queue, it
/// is possible that only some of the elements will have been appended.
///
/// Returns the indices of the elements added (a range)
pub fn extend_queue<'a, B: AsRef<[u8]> + ?Sized>(
&self,
queue_key: &B,
Expand Down Expand Up @@ -314,6 +344,7 @@ impl CandyStore {
Ok(indices)
}

/// Returns (without removing) the head element of the queue, or None if the queue is empty
pub fn peek_queue_head<B: AsRef<[u8]> + ?Sized>(
&self,
queue_key: &B,
Expand All @@ -325,6 +356,7 @@ impl CandyStore {
Ok(Some(v))
}

/// Returns (without removing) the tail element of the queue, or None if the queue is empty
pub fn peek_queue_tail<B: AsRef<[u8]> + ?Sized>(
&self,
queue_key: &B,
Expand All @@ -336,6 +368,8 @@ impl CandyStore {
Ok(Some(v))
}

/// Returns a forward iterator (head to tail) over the elements of the queue. If the queue does not exist,
/// this is an empty iterator.
pub fn iter_queue<'a, B: AsRef<[u8]> + ?Sized>(&'a self, queue_key: &B) -> QueueIterator<'a> {
QueueIterator {
store: &self,
Expand All @@ -346,6 +380,8 @@ impl CandyStore {
}
}

/// Returns a backward iterator (tail to head) over the elements of the queue. If the queue does not exist,
/// this is an empty iterator.
pub fn iter_queue_backwards<'a, B: AsRef<[u8]> + ?Sized>(
&'a self,
queue_key: &B,
Expand All @@ -359,6 +395,7 @@ impl CandyStore {
}
}

/// Returns a the length of the given queue (number of elements in the queue) or 0 if the queue does not exist
pub fn queue_len<B: AsRef<[u8]> + ?Sized>(&self, queue_key: &B) -> Result<usize> {
let Some(queue) = self.fetch_queue(queue_key.as_ref())? else {
return Ok(0);
Expand Down
34 changes: 25 additions & 9 deletions src/store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{anyhow, bail};
use anyhow::{anyhow, bail, ensure};
use fslock::LockFile;
use parking_lot::Mutex;
use std::{
Expand All @@ -10,7 +10,7 @@ use crate::{
hashing::{HashSeed, PartedHash},
router::ShardRouter,
shard::{CompactionThreadPool, InsertMode, InsertStatus, KVPair},
Stats, MAX_TOTAL_VALUE_SIZE,
Stats, MAX_KEY_SIZE, MAX_TOTAL_VALUE_SIZE,
};
use crate::{
shard::{NUM_ROWS, ROW_WIDTH},
Expand Down Expand Up @@ -266,6 +266,16 @@ impl CandyStore {
Ok(())
}

pub(crate) fn ensure_sizes(key: &[u8], val: &[u8]) -> Result<()> {
ensure!(key.len() <= MAX_KEY_SIZE, CandyError::KeyTooLong(key.len()));
ensure!(
val.len() <= MAX_VALUE_SIZE,
CandyError::ValueTooLong(val.len())
);

Ok(())
}

pub(crate) fn make_user_key(&self, mut key: Vec<u8>) -> Vec<u8> {
key.extend_from_slice(USER_NAMESPACE);
key
Expand Down Expand Up @@ -329,12 +339,15 @@ impl CandyStore {
) -> Result<InsertStatus> {
let ph = PartedHash::new(&self.config.hash_seed, full_key);

if full_key.len() > MAX_TOTAL_KEY_SIZE as usize {
return Err(anyhow!(CandyError::KeyTooLong(full_key.len())));
}
if val.len() > MAX_TOTAL_VALUE_SIZE as usize {
return Err(anyhow!(CandyError::ValueTooLong(val.len())));
}
ensure!(
full_key.len() <= MAX_TOTAL_KEY_SIZE,
CandyError::KeyTooLong(full_key.len())
);
ensure!(
val.len() <= MAX_TOTAL_VALUE_SIZE,
CandyError::ValueTooLong(val.len())
);

if full_key.len() + val.len() > self.config.max_shard_size as usize {
return Err(anyhow!(CandyError::EntryCannotFitInShard(
full_key.len() + val.len(),
Expand Down Expand Up @@ -373,6 +386,7 @@ impl CandyStore {

/// Same as [Self::set], but the key passed owned to this function
pub fn owned_set(&self, key: Vec<u8>, val: &[u8]) -> Result<SetStatus> {
Self::ensure_sizes(&key, &val)?;
self.set_raw(&self.make_user_key(key), val)
}

Expand Down Expand Up @@ -416,7 +430,8 @@ impl CandyStore {
val: &[u8],
expected_val: Option<&[u8]>,
) -> Result<ReplaceStatus> {
self.replace_raw(&self.make_user_key(key), val.as_ref(), expected_val)
Self::ensure_sizes(&key, &val)?;
self.replace_raw(&self.make_user_key(key), val, expected_val)
}

pub(crate) fn get_or_create_raw(
Expand Down Expand Up @@ -453,6 +468,7 @@ impl CandyStore {
key: Vec<u8>,
default_val: Vec<u8>,
) -> Result<GetOrCreateStatus> {
Self::ensure_sizes(&key, &default_val)?;
self.get_or_create_raw(&self.make_user_key(key), default_val)
}

Expand Down
4 changes: 2 additions & 2 deletions tests/test_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod common;

use std::collections::HashSet;

use candystore::{CandyStore, Config, Result};
use candystore::{CandyStore, Config, Result, MAX_VALUE_SIZE};

use crate::common::{run_in_tempdir, LONG_VAL};

Expand Down Expand Up @@ -127,7 +127,7 @@ fn test_histogram() -> Result<()> {
db.set("k4", &vec![b'b'; 5000])?;
db.set("k4", &vec![b'b'; 4500])?;
db.set("k5", &vec![b'b'; 50000])?;
db.set("kkkkkkkkkkkkkkk", &vec![b'b'; 0xffff])?;
db.set("kkkkkkkkkkkkkkk", &vec![b'b'; MAX_VALUE_SIZE])?;

let stats = db.stats();
assert_eq!(stats.entries_under_128, 2);
Expand Down

0 comments on commit 7b85ba7

Please sign in to comment.