Skip to content

Commit

Permalink
Merge pull request #200 from mozilla/migrator
Browse files Browse the repository at this point in the history
Implement a simple migrator between multiple backends
  • Loading branch information
Victor Porof authored Jul 23, 2020
2 parents 42c5c2c + ceda0d5 commit a6a616a
Show file tree
Hide file tree
Showing 25 changed files with 632 additions and 196 deletions.
2 changes: 1 addition & 1 deletion examples/simple-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn getput<'w, 's>(store: MultiStore, writer: &'w mut Writer, ids: &'s mut Vec<St
// this is a multi-valued database, so get returns an iterator
let mut iter = store.get(writer, k).unwrap();
while let Some(Ok((_key, val))) = iter.next() {
if let Value::Str(s) = val.unwrap() {
if let Value::Str(s) = val {
ids.push(s.to_owned());
} else {
panic!("didn't get a string back!");
Expand Down
3 changes: 3 additions & 0 deletions src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ pub use common::*;
pub use traits::*;

pub use impl_lmdb::{
ArchMigrateError as LmdbArchMigrateError,
ArchMigrateResult as LmdbArchMigrateResult,
ArchMigrator as LmdbArchMigrator,
DatabaseFlagsImpl as LmdbDatabaseFlags,
DatabaseImpl as LmdbDatabase,
EnvironmentBuilderImpl as Lmdb,
Expand Down
7 changes: 7 additions & 0 deletions src/backend/impl_lmdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

mod arch_migrator;
mod arch_migrator_error;
mod cursor;
mod database;
mod environment;
Expand All @@ -18,6 +20,11 @@ mod iter;
mod stat;
mod transaction;

pub use arch_migrator::{
MigrateError as ArchMigrateError,
MigrateResult as ArchMigrateResult,
Migrator as ArchMigrator,
};
pub use cursor::{
RoCursorImpl,
RwCursorImpl,
Expand Down
4 changes: 2 additions & 2 deletions src/migrate.rs → src/backend/impl_lmdb/arch_migrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
//! new environment in a temporary directory:
//!
//! ```
//! use rkv::migrate::Migrator;
//! use rkv::migrator::LmdbArchMigrator as Migrator;
//! use std::path::Path;
//! use tempfile::tempdir;
//! let mut migrator = Migrator::new(Path::new("tests/envs/ref_env_32")).unwrap();
Expand Down Expand Up @@ -89,7 +89,7 @@ use lmdb::{
WriteFlags,
};

pub use crate::error::MigrateError;
pub use super::arch_migrator_error::MigrateError;

const PAGESIZE: u16 = 4096;

Expand Down
107 changes: 107 additions & 0 deletions src/backend/impl_lmdb/arch_migrator_error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

use std::{
io,
num,
str,
};

use failure::Fail;

#[derive(Debug, Fail)]
pub enum MigrateError {
#[fail(display = "database not found: {:?}", _0)]
DatabaseNotFound(String),

#[fail(display = "{}", _0)]
FromString(String),

#[fail(display = "couldn't determine bit depth")]
IndeterminateBitDepth,

#[fail(display = "I/O error: {:?}", _0)]
IoError(io::Error),

#[fail(display = "invalid DatabaseFlags bits")]
InvalidDatabaseBits,

#[fail(display = "invalid data version")]
InvalidDataVersion,

#[fail(display = "invalid magic number")]
InvalidMagicNum,

#[fail(display = "invalid NodeFlags bits")]
InvalidNodeBits,

#[fail(display = "invalid PageFlags bits")]
InvalidPageBits,

#[fail(display = "invalid page number")]
InvalidPageNum,

#[fail(display = "lmdb backend error: {}", _0)]
LmdbError(lmdb::Error),

#[fail(display = "string conversion error")]
StringConversionError,

#[fail(display = "TryFromInt error: {:?}", _0)]
TryFromIntError(num::TryFromIntError),

#[fail(display = "unexpected Page variant")]
UnexpectedPageVariant,

#[fail(display = "unexpected PageHeader variant")]
UnexpectedPageHeaderVariant,

#[fail(display = "unsupported PageHeader variant")]
UnsupportedPageHeaderVariant,

#[fail(display = "UTF8 error: {:?}", _0)]
Utf8Error(str::Utf8Error),
}

impl From<io::Error> for MigrateError {
fn from(e: io::Error) -> MigrateError {
MigrateError::IoError(e)
}
}

impl From<str::Utf8Error> for MigrateError {
fn from(e: str::Utf8Error) -> MigrateError {
MigrateError::Utf8Error(e)
}
}

impl From<num::TryFromIntError> for MigrateError {
fn from(e: num::TryFromIntError) -> MigrateError {
MigrateError::TryFromIntError(e)
}
}

impl From<&str> for MigrateError {
fn from(e: &str) -> MigrateError {
MigrateError::FromString(e.to_string())
}
}

impl From<String> for MigrateError {
fn from(e: String) -> MigrateError {
MigrateError::FromString(e)
}
}

impl From<lmdb::Error> for MigrateError {
fn from(e: lmdb::Error) -> MigrateError {
MigrateError::LmdbError(e)
}
}
45 changes: 43 additions & 2 deletions src/backend/impl_lmdb/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@ use crate::backend::traits::{
BackendEnvironment,
BackendEnvironmentBuilder,
BackendInfo,
BackendIter,
BackendRoCursor,
BackendRoCursorTransaction,
BackendStat,
};

#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct EnvironmentBuilderImpl {
builder: lmdb::EnvironmentBuilder,
envtype: EnvironmentType,
make_dir: bool,
check_env_exists: bool,
}

impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
Expand All @@ -46,7 +51,9 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
fn new() -> EnvironmentBuilderImpl {
EnvironmentBuilderImpl {
builder: lmdb::Environment::new(),
envtype: EnvironmentType::SingleDatabase,
make_dir: false,
check_env_exists: false,
}
}

Expand All @@ -65,6 +72,9 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {

fn set_max_dbs(&mut self, max_dbs: u32) -> &mut Self {
self.builder.set_max_dbs(max_dbs);
if max_dbs > 0 {
self.envtype = EnvironmentType::MultipleNamedDatabases
}
self
}

Expand All @@ -78,19 +88,33 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
self
}

fn set_check_if_env_exists(&mut self, check_env_exists: bool) -> &mut Self {
self.check_env_exists = check_env_exists;
self
}

fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error> {
if self.check_env_exists && !path.join("data.mdb").exists() {
return Err(ErrorImpl::EnvironmentDoesNotExistError(path.into()));
}
if !path.is_dir() {
if !self.make_dir {
return Err(ErrorImpl::DirectoryDoesNotExistError(path.into()));
}
fs::create_dir_all(path).map_err(ErrorImpl::IoError)?;
}
self.builder.open(path).map(EnvironmentImpl).map_err(ErrorImpl::LmdbError)
self.builder.open(path).map(|env| EnvironmentImpl(env, self.envtype)).map_err(ErrorImpl::LmdbError)
}
}

#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum EnvironmentType {
SingleDatabase,
MultipleNamedDatabases,
}

#[derive(Debug)]
pub struct EnvironmentImpl(lmdb::Environment);
pub struct EnvironmentImpl(lmdb::Environment, EnvironmentType);

impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
type Database = DatabaseImpl;
Expand All @@ -101,6 +125,23 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
type RwTransaction = RwTransactionImpl<'e>;
type Stat = StatImpl;

fn get_dbs(&self) -> Result<Vec<Option<String>>, Self::Error> {
if self.1 == EnvironmentType::SingleDatabase {
return Ok(vec![None]);
}
let db = self.0.open_db(None).map(DatabaseImpl).map_err(ErrorImpl::LmdbError)?;
let reader = self.begin_ro_txn()?;
let cursor = reader.open_ro_cursor(&db)?;
let mut iter = cursor.into_iter();
let mut store = vec![];
while let Some(result) = iter.next() {
let (key, _) = result?;
let name = String::from_utf8(key.to_owned()).map_err(|_| ErrorImpl::LmdbError(lmdb::Error::Corrupted))?;
store.push(Some(name));
}
Ok(store)
}

fn open_db(&self, name: Option<&str>) -> Result<Self::Database, Self::Error> {
self.0.open_db(name).map(DatabaseImpl).map_err(ErrorImpl::LmdbError)
}
Expand Down
3 changes: 3 additions & 0 deletions src/backend/impl_lmdb/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
pub enum ErrorImpl {
LmdbError(lmdb::Error),
DirectoryDoesNotExistError(PathBuf),
EnvironmentDoesNotExistError(PathBuf),
IoError(io::Error),
}

Expand All @@ -33,6 +34,7 @@ impl fmt::Display for ErrorImpl {
match self {
ErrorImpl::LmdbError(e) => e.fmt(fmt),
ErrorImpl::DirectoryDoesNotExistError(_) => write!(fmt, "DirectoryDoesNotExistError"),
ErrorImpl::EnvironmentDoesNotExistError(_) => write!(fmt, "EnvironmentDoesNotExistError"),
ErrorImpl::IoError(e) => e.fmt(fmt),
}
}
Expand All @@ -50,6 +52,7 @@ impl Into<StoreError> for ErrorImpl {
ErrorImpl::LmdbError(lmdb::Error::ReadersFull) => StoreError::ReadersFull,
ErrorImpl::LmdbError(error) => StoreError::LmdbError(error),
ErrorImpl::DirectoryDoesNotExistError(path) => StoreError::DirectoryDoesNotExistError(path),
ErrorImpl::EnvironmentDoesNotExistError(path) => StoreError::EnvironmentDoesNotExistError(path),
ErrorImpl::IoError(error) => StoreError::IoError(error),
}
}
Expand Down
15 changes: 15 additions & 0 deletions src/backend/impl_safe/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub struct EnvironmentBuilderImpl {
max_dbs: Option<usize>,
map_size: Option<usize>,
make_dir: bool,
check_env_exists: bool,
}

impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
Expand All @@ -69,6 +70,7 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
max_dbs: None,
map_size: None,
make_dir: false,
check_env_exists: false,
}
}

Expand Down Expand Up @@ -100,7 +102,15 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
self
}

fn set_check_if_env_exists(&mut self, check_env_exists: bool) -> &mut Self {
self.check_env_exists = check_env_exists;
self
}

fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error> {
if self.check_env_exists && !path.join(DEFAULT_DB_FILENAME).exists() {
return Err(ErrorImpl::EnvironmentDoesNotExistError(path.into()));
}
if !path.is_dir() {
if !self.make_dir {
return Err(ErrorImpl::DirectoryDoesNotExistError(path.into()));
Expand Down Expand Up @@ -211,6 +221,11 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
type RwTransaction = RwTransactionImpl<'e>;
type Stat = StatImpl;

fn get_dbs(&self) -> Result<Vec<Option<String>>, Self::Error> {
let dbs = self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError)?;
Ok(dbs.keys().map(|key| key.to_owned()).collect())
}

fn open_db(&self, name: Option<&str>) -> Result<Self::Database, Self::Error> {
if Arc::strong_count(&self.ro_txns) > 1 {
return Err(ErrorImpl::DbsIllegalOpen);
Expand Down
3 changes: 3 additions & 0 deletions src/backend/impl_safe/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub enum ErrorImpl {
DbNotFoundError,
DbIsForeignError,
DirectoryDoesNotExistError(PathBuf),
EnvironmentDoesNotExistError(PathBuf),
IoError(io::Error),
BincodeError(BincodeError),
}
Expand All @@ -46,6 +47,7 @@ impl fmt::Display for ErrorImpl {
ErrorImpl::DbNotFoundError => write!(fmt, "DbNotFoundError (safe mode)"),
ErrorImpl::DbIsForeignError => write!(fmt, "DbIsForeignError (safe mode)"),
ErrorImpl::DirectoryDoesNotExistError(_) => write!(fmt, "DirectoryDoesNotExistError (safe mode)"),
ErrorImpl::EnvironmentDoesNotExistError(_) => write!(fmt, "EnvironmentDoesNotExistError (safe mode)"),
ErrorImpl::IoError(e) => e.fmt(fmt),
ErrorImpl::BincodeError(e) => e.fmt(fmt),
}
Expand All @@ -63,6 +65,7 @@ impl Into<StoreError> for ErrorImpl {
ErrorImpl::BincodeError(_) => StoreError::FileInvalid,
ErrorImpl::DbsFull => StoreError::DbsFull,
ErrorImpl::DirectoryDoesNotExistError(path) => StoreError::DirectoryDoesNotExistError(path),
ErrorImpl::EnvironmentDoesNotExistError(path) => StoreError::EnvironmentDoesNotExistError(path),
ErrorImpl::IoError(error) => StoreError::IoError(error),
_ => StoreError::SafeModeError(self),
}
Expand Down
4 changes: 4 additions & 0 deletions src/backend/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub trait BackendEnvironmentBuilder<'b>: Debug + Eq + PartialEq + Copy + Clone {

fn set_make_dir_if_needed(&mut self, make_dir: bool) -> &mut Self;

fn set_check_if_env_exists(&mut self, check_env: bool) -> &mut Self;

fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error>;
}

Expand All @@ -102,6 +104,8 @@ pub trait BackendEnvironment<'e>: Debug {
type RoTransaction: BackendRoCursorTransaction<'e, Database = Self::Database>;
type RwTransaction: BackendRwCursorTransaction<'e, Database = Self::Database>;

fn get_dbs(&self) -> Result<Vec<Option<String>>, Self::Error>;

fn open_db(&self, name: Option<&str>) -> Result<Self::Database, Self::Error>;

fn create_db(&self, name: Option<&str>, flags: Self::Flags) -> Result<Self::Database, Self::Error>;
Expand Down
Loading

0 comments on commit a6a616a

Please sign in to comment.