Skip to content

Commit

Permalink
Make migration easier for consumers
Browse files Browse the repository at this point in the history
Signed-off-by: Victor Porof <[email protected]>
  • Loading branch information
victorporof committed Jul 23, 2020
1 parent d1fef9a commit bada6fe
Show file tree
Hide file tree
Showing 10 changed files with 342 additions and 23 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ lazy_static = "1.0"
lmdb-rkv = "0.14"
log = "0.4"
ordered-float = "1.0"
paste = "0.1"
serde = {version = "1.0", features = ["derive", "rc"]}
serde_derive = "1.0"
url = "2.0"
Expand Down
62 changes: 47 additions & 15 deletions src/backend/impl_lmdb/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@

use std::{
fs,
path::Path,
path::{
Path,
PathBuf,
},
};

use lmdb::Error as LmdbError;
Expand Down Expand Up @@ -103,18 +106,39 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
}
fs::create_dir_all(path).map_err(ErrorImpl::IoError)?;
}
self.builder.open(path).map(|env| EnvironmentImpl(env, self.envtype)).map_err(ErrorImpl::LmdbError)
self.builder
.open(path)
.map_err(ErrorImpl::LmdbError)
.and_then(|lmdbenv| EnvironmentImpl::new(path, lmdbenv, self.envtype))
}
}

#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum EnvironmentType {
pub enum EnvironmentType {
SingleDatabase,
MultipleNamedDatabases,
}

#[derive(Debug)]
pub struct EnvironmentImpl(lmdb::Environment, EnvironmentType);
pub struct EnvironmentImpl {
path: PathBuf,
lmdbenv: lmdb::Environment,
envtype: EnvironmentType,
}

impl EnvironmentImpl {
pub(crate) fn new(
path: &Path,
lmdbenv: lmdb::Environment,
envtype: EnvironmentType,
) -> Result<EnvironmentImpl, ErrorImpl> {
Ok(EnvironmentImpl {
path: path.to_path_buf(),
lmdbenv,
envtype,
})
}
}

impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
type Database = DatabaseImpl;
Expand All @@ -126,10 +150,10 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
type Stat = StatImpl;

fn get_dbs(&self) -> Result<Vec<Option<String>>, Self::Error> {
if self.1 == EnvironmentType::SingleDatabase {
if self.envtype == EnvironmentType::SingleDatabase {
return Ok(vec![None]);
}
let db = self.0.open_db(None).map(DatabaseImpl).map_err(ErrorImpl::LmdbError)?;
let db = self.lmdbenv.open_db(None).map(DatabaseImpl).map_err(ErrorImpl::LmdbError)?;
let reader = self.begin_ro_txn()?;
let cursor = reader.open_ro_cursor(&db)?;
let mut iter = cursor.into_iter();
Expand All @@ -143,35 +167,35 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
}

fn open_db(&self, name: Option<&str>) -> Result<Self::Database, Self::Error> {
self.0.open_db(name).map(DatabaseImpl).map_err(ErrorImpl::LmdbError)
self.lmdbenv.open_db(name).map(DatabaseImpl).map_err(ErrorImpl::LmdbError)
}

fn create_db(&self, name: Option<&str>, flags: Self::Flags) -> Result<Self::Database, Self::Error> {
self.0.create_db(name, flags.0).map(DatabaseImpl).map_err(ErrorImpl::LmdbError)
self.lmdbenv.create_db(name, flags.0).map(DatabaseImpl).map_err(ErrorImpl::LmdbError)
}

fn begin_ro_txn(&'e self) -> Result<Self::RoTransaction, Self::Error> {
self.0.begin_ro_txn().map(RoTransactionImpl).map_err(ErrorImpl::LmdbError)
self.lmdbenv.begin_ro_txn().map(RoTransactionImpl).map_err(ErrorImpl::LmdbError)
}

fn begin_rw_txn(&'e self) -> Result<Self::RwTransaction, Self::Error> {
self.0.begin_rw_txn().map(RwTransactionImpl).map_err(ErrorImpl::LmdbError)
self.lmdbenv.begin_rw_txn().map(RwTransactionImpl).map_err(ErrorImpl::LmdbError)
}

fn sync(&self, force: bool) -> Result<(), Self::Error> {
self.0.sync(force).map_err(ErrorImpl::LmdbError)
self.lmdbenv.sync(force).map_err(ErrorImpl::LmdbError)
}

fn stat(&self) -> Result<Self::Stat, Self::Error> {
self.0.stat().map(StatImpl).map_err(ErrorImpl::LmdbError)
self.lmdbenv.stat().map(StatImpl).map_err(ErrorImpl::LmdbError)
}

fn info(&self) -> Result<Self::Info, Self::Error> {
self.0.info().map(InfoImpl).map_err(ErrorImpl::LmdbError)
self.lmdbenv.info().map(InfoImpl).map_err(ErrorImpl::LmdbError)
}

fn freelist(&self) -> Result<usize, Self::Error> {
self.0.freelist().map_err(ErrorImpl::LmdbError)
self.lmdbenv.freelist().map_err(ErrorImpl::LmdbError)
}

fn load_ratio(&self) -> Result<Option<f32>, Self::Error> {
Expand All @@ -189,6 +213,14 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
}

fn set_map_size(&self, size: usize) -> Result<(), Self::Error> {
self.0.set_map_size(size).map_err(ErrorImpl::LmdbError)
self.lmdbenv.set_map_size(size).map_err(ErrorImpl::LmdbError)
}

fn get_files_on_disk(&self) -> Vec<PathBuf> {
let mut db_filename = self.path.clone();
let mut lock_filename = self.path.clone();
db_filename.push("data.mdb");
lock_filename.push("lock.mdb");
return vec![db_filename, lock_filename];
}
}
6 changes: 6 additions & 0 deletions src/backend/impl_safe/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,4 +286,10 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
warn!("`set_map_size({})` is ignored by this storage backend.", size);
Ok(())
}

fn get_files_on_disk(&self) -> Vec<PathBuf> {
let mut db_filename = self.path.clone();
db_filename.push(DEFAULT_DB_FILENAME);
return vec![db_filename];
}
}
7 changes: 6 additions & 1 deletion src/backend/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ use std::{
Debug,
Display,
},
path::Path,
path::{
Path,
PathBuf,
},
};

use crate::{
Expand Down Expand Up @@ -125,6 +128,8 @@ pub trait BackendEnvironment<'e>: Debug {
fn load_ratio(&self) -> Result<Option<f32>, Self::Error>;

fn set_map_size(&self, size: usize) -> Result<(), Self::Error>;

fn get_files_on_disk(&self) -> Vec<PathBuf>;
}

pub trait BackendRoTransaction: Debug {
Expand Down
15 changes: 15 additions & 0 deletions src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// specific language governing permissions and limitations under the License.

use std::{
fs,
os::raw::c_uint,
path::{
Path,
Expand Down Expand Up @@ -308,4 +309,18 @@ where
pub fn set_map_size(&self, size: usize) -> Result<(), StoreError> {
self.env.set_map_size(size).map_err(Into::into)
}

/// Closes this environment and deletes all its files from disk. Doesn't delete the
/// folder used when opening the environment.
pub fn close_and_delete(self) -> Result<(), StoreError> {
let files = self.env.get_files_on_disk();
self.sync(true)?;
drop(self);

for file in files {
fs::remove_file(file)?;
}

Ok(())
}
}
19 changes: 19 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
io,
path::PathBuf,
str,
sync,
thread,
thread::ThreadId,
};
Expand Down Expand Up @@ -56,6 +57,9 @@ impl From<Box<bincode::ErrorKind>> for DataError {

#[derive(Debug, Fail)]
pub enum StoreError {
#[fail(display = "manager poisoned")]
ManagerPoisonError,

#[fail(display = "database corrupted")]
DatabaseCorrupted,

Expand Down Expand Up @@ -124,11 +128,20 @@ impl From<io::Error> for StoreError {
}
}

impl<T> From<sync::PoisonError<T>> for StoreError {
fn from(_: sync::PoisonError<T>) -> StoreError {
StoreError::ManagerPoisonError
}
}

#[derive(Debug, Fail)]
pub enum MigrateError {
#[fail(display = "store error: {}", _0)]
StoreError(StoreError),

#[fail(display = "manager poisoned")]
ManagerPoisonError,

#[fail(display = "source is empty")]
SourceEmpty,

Expand All @@ -141,3 +154,9 @@ impl From<StoreError> for MigrateError {
MigrateError::StoreError(e)
}
}

impl<T> From<sync::PoisonError<T>> for MigrateError {
fn from(_: sync::PoisonError<T>) -> MigrateError {
MigrateError::ManagerPoisonError
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ pub use error::{
StoreError,
};
pub use manager::Manager;
pub use migrator::Migrator;
pub use readwrite::{
Readable,
Reader,
Expand Down
45 changes: 44 additions & 1 deletion src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use lazy_static::lazy_static;

use crate::{
backend::{
BackendEnvironment,
BackendEnvironmentBuilder,
LmdbEnvironment,
SafeModeEnvironment,
},
Expand All @@ -51,7 +53,10 @@ pub struct Manager<E> {
environments: BTreeMap<PathBuf, SharedRkv<E>>,
}

impl<E> Manager<E> {
impl<'e, E> Manager<E>
where
E: BackendEnvironment<'e>,
{
fn new() -> Manager<E> {
Manager {
environments: Default::default(),
Expand Down Expand Up @@ -98,6 +103,44 @@ impl<E> Manager<E> {
},
})
}

/// Return a new Rkv environment from the builder, or create it by calling `f`.
pub fn get_or_create_from_builder<'p, F, P, B>(&mut self, path: P, builder: B, f: F) -> Result<SharedRkv<E>>
where
F: FnOnce(&Path, B) -> Result<Rkv<E>>,
P: Into<&'p Path>,
B: BackendEnvironmentBuilder<'e, Environment = E>,
{
let canonical = canonicalize_path(path)?;
Ok(match self.environments.entry(canonical) {
Entry::Occupied(e) => e.get().clone(),
Entry::Vacant(e) => {
let k = Arc::new(RwLock::new(f(e.key().as_path(), builder)?));
e.insert(k).clone()
},
})
}

/// Tries to close the specified environment and delete all its files from disk.
/// Doesn't delete the folder used when opening the environment.
/// This will only work if there's no other users of this environment.
pub fn try_close_and_delete<'p, P>(&mut self, path: P) -> Result<()>
where
P: Into<&'p Path>,
{
let canonical = canonicalize_path(path)?;
match self.environments.entry(canonical) {
Entry::Vacant(_) => {}, // noop
Entry::Occupied(e) => {
if Arc::strong_count(e.get()) == 1 {
if let Ok(env) = Arc::try_unwrap(e.remove()) {
env.into_inner()?.close_and_delete()?;
}
}
},
}
Ok(())
}
}

impl Manager<LmdbEnvironment> {
Expand Down
Loading

0 comments on commit bada6fe

Please sign in to comment.