Skip to content

Commit

Permalink
Implement a basic migrator that works across backends
Browse files Browse the repository at this point in the history
Signed-off-by: Victor Porof <[email protected]>
  • Loading branch information
victorporof committed Jul 23, 2020
1 parent 0fd756d commit ceda0d5
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/backend/impl_lmdb/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct EnvironmentBuilderImpl {
builder: lmdb::EnvironmentBuilder,
envtype: EnvironmentType,
make_dir: bool,
check_env_exists: bool,
}

impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
Expand All @@ -52,6 +53,7 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
builder: lmdb::Environment::new(),
envtype: EnvironmentType::SingleDatabase,
make_dir: false,
check_env_exists: false,
}
}

Expand Down Expand Up @@ -86,7 +88,15 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
self
}

fn set_check_if_env_exists(&mut self, check_env_exists: bool) -> &mut Self {
self.check_env_exists = check_env_exists;
self
}

fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error> {
if self.check_env_exists && !path.join("data.mdb").exists() {
return Err(ErrorImpl::EnvironmentDoesNotExistError(path.into()));
}
if !path.is_dir() {
if !self.make_dir {
return Err(ErrorImpl::DirectoryDoesNotExistError(path.into()));
Expand Down
3 changes: 3 additions & 0 deletions src/backend/impl_lmdb/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
pub enum ErrorImpl {
LmdbError(lmdb::Error),
DirectoryDoesNotExistError(PathBuf),
EnvironmentDoesNotExistError(PathBuf),
IoError(io::Error),
}

Expand All @@ -33,6 +34,7 @@ impl fmt::Display for ErrorImpl {
match self {
ErrorImpl::LmdbError(e) => e.fmt(fmt),
ErrorImpl::DirectoryDoesNotExistError(_) => write!(fmt, "DirectoryDoesNotExistError"),
ErrorImpl::EnvironmentDoesNotExistError(_) => write!(fmt, "EnvironmentDoesNotExistError"),
ErrorImpl::IoError(e) => e.fmt(fmt),
}
}
Expand All @@ -50,6 +52,7 @@ impl Into<StoreError> for ErrorImpl {
ErrorImpl::LmdbError(lmdb::Error::ReadersFull) => StoreError::ReadersFull,
ErrorImpl::LmdbError(error) => StoreError::LmdbError(error),
ErrorImpl::DirectoryDoesNotExistError(path) => StoreError::DirectoryDoesNotExistError(path),
ErrorImpl::EnvironmentDoesNotExistError(path) => StoreError::EnvironmentDoesNotExistError(path),
ErrorImpl::IoError(error) => StoreError::IoError(error),
}
}
Expand Down
10 changes: 10 additions & 0 deletions src/backend/impl_safe/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub struct EnvironmentBuilderImpl {
max_dbs: Option<usize>,
map_size: Option<usize>,
make_dir: bool,
check_env_exists: bool,
}

impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
Expand All @@ -69,6 +70,7 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
max_dbs: None,
map_size: None,
make_dir: false,
check_env_exists: false,
}
}

Expand Down Expand Up @@ -100,7 +102,15 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
self
}

fn set_check_if_env_exists(&mut self, check_env_exists: bool) -> &mut Self {
self.check_env_exists = check_env_exists;
self
}

fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error> {
if self.check_env_exists && !path.join(DEFAULT_DB_FILENAME).exists() {
return Err(ErrorImpl::EnvironmentDoesNotExistError(path.into()));
}
if !path.is_dir() {
if !self.make_dir {
return Err(ErrorImpl::DirectoryDoesNotExistError(path.into()));
Expand Down
3 changes: 3 additions & 0 deletions src/backend/impl_safe/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub enum ErrorImpl {
DbNotFoundError,
DbIsForeignError,
DirectoryDoesNotExistError(PathBuf),
EnvironmentDoesNotExistError(PathBuf),
IoError(io::Error),
BincodeError(BincodeError),
}
Expand All @@ -46,6 +47,7 @@ impl fmt::Display for ErrorImpl {
ErrorImpl::DbNotFoundError => write!(fmt, "DbNotFoundError (safe mode)"),
ErrorImpl::DbIsForeignError => write!(fmt, "DbIsForeignError (safe mode)"),
ErrorImpl::DirectoryDoesNotExistError(_) => write!(fmt, "DirectoryDoesNotExistError (safe mode)"),
ErrorImpl::EnvironmentDoesNotExistError(_) => write!(fmt, "EnvironmentDoesNotExistError (safe mode)"),
ErrorImpl::IoError(e) => e.fmt(fmt),
ErrorImpl::BincodeError(e) => e.fmt(fmt),
}
Expand All @@ -63,6 +65,7 @@ impl Into<StoreError> for ErrorImpl {
ErrorImpl::BincodeError(_) => StoreError::FileInvalid,
ErrorImpl::DbsFull => StoreError::DbsFull,
ErrorImpl::DirectoryDoesNotExistError(path) => StoreError::DirectoryDoesNotExistError(path),
ErrorImpl::EnvironmentDoesNotExistError(path) => StoreError::EnvironmentDoesNotExistError(path),
ErrorImpl::IoError(error) => StoreError::IoError(error),
_ => StoreError::SafeModeError(self),
}
Expand Down
2 changes: 2 additions & 0 deletions src/backend/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub trait BackendEnvironmentBuilder<'b>: Debug + Eq + PartialEq + Copy + Clone {

fn set_make_dir_if_needed(&mut self, make_dir: bool) -> &mut Self;

fn set_check_if_env_exists(&mut self, check_env: bool) -> &mut Self;

fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error>;
}

Expand Down
21 changes: 21 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ pub enum StoreError {
#[fail(display = "directory does not exist or not a directory: {:?}", _0)]
DirectoryDoesNotExistError(PathBuf),

#[fail(display = "environment does not exist in directory: {:?}", _0)]
EnvironmentDoesNotExistError(PathBuf),

#[fail(display = "data error: {:?}", _0)]
DataError(DataError),

Expand Down Expand Up @@ -120,3 +123,21 @@ impl From<io::Error> for StoreError {
StoreError::IoError(e)
}
}

#[derive(Debug, Fail)]
pub enum MigrateError {
#[fail(display = "store error: {}", _0)]
StoreError(StoreError),

#[fail(display = "source is empty")]
SourceEmpty,

#[fail(display = "destination is not empty")]
DestinationNotEmpty,
}

impl From<StoreError> for MigrateError {
fn from(e: StoreError) -> MigrateError {
MigrateError::StoreError(e)
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ pub use backend::{
pub use env::Rkv;
pub use error::{
DataError,
MigrateError,
StoreError,
};
pub use manager::Manager;
Expand Down
76 changes: 76 additions & 0 deletions src/migrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,84 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

//! A simple utility for migrating data from one RVK environment to another. Notably, this
//! tool can migrate data from an enviroment created with a different backend than the
//! current RKV consumer (e.g from Lmdb to SafeMode).
//!
//! The utility doesn't support migrating between 32-bit and 64-bit LMDB environments yet,
//! see `arch_migrator` if this is needed. However, this utility is ultimately intended to
//! handle all possible migrations.
//!
//! The destination environment should be empty of data, otherwise an error is returned.
//!
//! The tool currently has these limitations:
//!
//! 1. It doesn't support migration from environments created with
//! `EnvironmentFlags::NO_SUB_DIR`. To migrate such an environment, create a temporary
//! directory, copy the environment's data files in the temporary directory, then
//! migrate the temporary directory as the source environment.
//! 2. It doesn't support migration from databases created with DatabaseFlags::DUP_SORT`
//! (with or without `DatabaseFlags::DUP_FIXED`) nor with `DatabaseFlags::INTEGER_KEY`.
//! This effectively means that migration is limited to `SingleStore`s.
//! 3. It doesn't allow for existing data in the destination environment, which means that
//! it cannot overwrite nor append data.
use crate::{
backend::{
LmdbEnvironment,
SafeModeEnvironment,
},
error::MigrateError,
Rkv,
StoreOptions,
};

pub use crate::backend::{
LmdbArchMigrateError,
LmdbArchMigrateResult,
LmdbArchMigrator,
};

// FIXME: should parametrize this instead.
macro_rules! fn_migrator {
($name:tt, $src:ty, $dst:ty) => {
/// Migrate all data in all of databases from the source environment to the destination
/// environment. This includes all key/value pairs in the main database that aren't
/// metadata about subdatabases and all key/value pairs in all subdatabases.
///
/// Other backend-specific metadata such as map size or maximum databases left intact on
/// the given environments.
///
/// The destination environment should be empty of data, otherwise an error is returned.
pub fn $name(src_env: &Rkv<$src>, dst_env: &Rkv<$dst>) -> Result<(), MigrateError> {
let src_dbs = src_env.get_dbs().unwrap();
if src_dbs.is_empty() {
return Err(MigrateError::SourceEmpty);
}
let dst_dbs = dst_env.get_dbs().unwrap();
if !dst_dbs.is_empty() {
return Err(MigrateError::DestinationNotEmpty);
}
for name in src_dbs {
let src_store = src_env.open_single(name.as_deref(), StoreOptions::default())?;
let dst_store = dst_env.open_single(name.as_deref(), StoreOptions::create())?;
let reader = src_env.read()?;
let mut writer = dst_env.write()?;
let mut iter = src_store.iter_start(&reader)?;
while let Some(Ok((key, value))) = iter.next() {
dst_store.put(&mut writer, key, &value).expect("wrote");
}
writer.commit()?;
}
Ok(())
}
};
}

pub struct Migrator;

impl Migrator {
fn_migrator!(migrate_lmdb_to_safe_mode, LmdbEnvironment, SafeModeEnvironment);

fn_migrator!(migrate_safe_mode_to_lmdb, SafeModeEnvironment, LmdbEnvironment);
}
138 changes: 138 additions & 0 deletions tests/env-migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

use std::fs;

use tempfile::Builder;

use rkv::{
backend::{
BackendEnvironmentBuilder,
Lmdb,
SafeMode,
},
migrator::Migrator,
Rkv,
StoreOptions,
Value,
};

macro_rules! populate_store {
($env:expr) => {
let store = $env.open_single("store", StoreOptions::create()).expect("opened");
let mut writer = $env.write().expect("writer");
store.put(&mut writer, "foo", &Value::I64(1234)).expect("wrote");
store.put(&mut writer, "bar", &Value::Bool(true)).expect("wrote");
store.put(&mut writer, "baz", &Value::Str("héllo, yöu")).expect("wrote");
writer.commit().expect("committed");
};
}

#[test]
#[should_panic(expected = "new succeeded: EnvironmentDoesNotExistError")]
fn test_migrator_lmdb_to_safe_0() {
let mut builder = Lmdb::new();
builder.set_check_if_env_exists(true);

let root = Builder::new().prefix("test_migrate_lmdb_to_safe").tempdir().expect("tempdir");
let _ = Rkv::from_builder::<Lmdb>(root.path(), builder).expect("new succeeded");
}

#[test]
#[should_panic(expected = "migrated: SourceEmpty")]
fn test_migrator_lmdb_to_safe_1() {
let root = Builder::new().prefix("test_migrate_lmdb_to_safe").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");

let src_env = Rkv::new::<Lmdb>(root.path()).expect("new succeeded");
let dst_env = Rkv::new::<SafeMode>(root.path()).expect("new succeeded");
Migrator::migrate_lmdb_to_safe_mode(&src_env, &dst_env).expect("migrated");
}

#[test]
#[should_panic(expected = "migrated: DestinationNotEmpty")]
fn test_migrator_lmdb_to_safe_2() {
let root = Builder::new().prefix("test_migrate_lmdb_to_safe").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");

let src_env = Rkv::new::<Lmdb>(root.path()).expect("new succeeded");
populate_store!(&src_env);
let dst_env = Rkv::new::<SafeMode>(root.path()).expect("new succeeded");
populate_store!(&dst_env);
Migrator::migrate_lmdb_to_safe_mode(&src_env, &dst_env).expect("migrated");
}

#[test]
fn test_migrator_lmdb_to_safe_3() {
let root = Builder::new().prefix("test_migrate_lmdb_to_safe").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");

let src_env = Rkv::new::<Lmdb>(root.path()).expect("new succeeded");
populate_store!(&src_env);
let dst_env = Rkv::new::<SafeMode>(root.path()).expect("new succeeded");
Migrator::migrate_lmdb_to_safe_mode(&src_env, &dst_env).expect("migrated");

let store = dst_env.open_single("store", StoreOptions::default()).expect("opened");
let reader = dst_env.read().expect("reader");
assert_eq!(store.get(&reader, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(store.get(&reader, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(store.get(&reader, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
}

#[test]
#[should_panic(expected = "new succeeded: EnvironmentDoesNotExistError")]
fn test_migrator_safe_to_lmdb_0() {
let mut builder = SafeMode::new();
builder.set_check_if_env_exists(true);

let root = Builder::new().prefix("test_migrate_safe_to_lmdb").tempdir().expect("tempdir");
let _ = Rkv::from_builder::<SafeMode>(root.path(), builder).expect("new succeeded");
}

#[test]
#[should_panic(expected = "migrated: SourceEmpty")]
fn test_migrator_safe_to_lmdb_1() {
let root = Builder::new().prefix("test_migrate_safe_to_lmdb").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");

let src_env = Rkv::new::<SafeMode>(root.path()).expect("new succeeded");
let dst_env = Rkv::new::<Lmdb>(root.path()).expect("new succeeded");
Migrator::migrate_safe_mode_to_lmdb(&src_env, &dst_env).expect("migrated");
}

#[test]
#[should_panic(expected = "migrated: DestinationNotEmpty")]
fn test_migrator_safe_to_lmdb_2() {
let root = Builder::new().prefix("test_migrate_safe_to_lmdb").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");

let src_env = Rkv::new::<SafeMode>(root.path()).expect("new succeeded");
populate_store!(&src_env);
let dst_env = Rkv::new::<Lmdb>(root.path()).expect("new succeeded");
populate_store!(&dst_env);
Migrator::migrate_safe_mode_to_lmdb(&src_env, &dst_env).expect("migrated");
}

#[test]
fn test_migrator_safe_to_lmdb_3() {
let root = Builder::new().prefix("test_migrate_safe_to_lmdb").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");

let src_env = Rkv::new::<SafeMode>(root.path()).expect("new succeeded");
populate_store!(&src_env);
let dst_env = Rkv::new::<Lmdb>(root.path()).expect("new succeeded");
Migrator::migrate_safe_mode_to_lmdb(&src_env, &dst_env).expect("migrated");

let store = dst_env.open_single("store", StoreOptions::default()).expect("opened");
let reader = dst_env.read().expect("reader");
assert_eq!(store.get(&reader, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(store.get(&reader, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(store.get(&reader, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
}

0 comments on commit ceda0d5

Please sign in to comment.