Skip to content

Commit

Permalink
Prevent potential deadlocks in safe mode environments when opnening dbs
Browse files Browse the repository at this point in the history
Signed-off-by: Victor Porof <[email protected]>
  • Loading branch information
victorporof committed Oct 9, 2020
1 parent 2d93f06 commit b51b5e0
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 25 deletions.
70 changes: 48 additions & 22 deletions src/backend/impl_safe/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
borrow::Cow,
collections::HashMap,
fs,
ops::DerefMut,
path::{
Path,
PathBuf,
Expand Down Expand Up @@ -115,32 +116,51 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
}
}

#[derive(Debug)]
pub(crate) struct EnvironmentDbs {
pub(crate) arena: DatabaseArena,
pub(crate) name_map: DatabaseNameMap,
}

#[derive(Debug)]
pub(crate) struct EnvironmentDbsRefMut<'a> {
pub(crate) arena: &'a mut DatabaseArena,
pub(crate) name_map: &'a mut DatabaseNameMap,
}

impl<'a> From<&'a mut EnvironmentDbs> for EnvironmentDbsRefMut<'a> {
fn from(dbs: &mut EnvironmentDbs) -> EnvironmentDbsRefMut {
EnvironmentDbsRefMut {
arena: &mut dbs.arena,
name_map: &mut dbs.name_map,
}
}
}

#[derive(Debug)]
pub struct EnvironmentImpl {
path: PathBuf,
max_dbs: usize,
arena: RwLock<DatabaseArena>,
dbs: RwLock<DatabaseNameMap>,
dbs: RwLock<EnvironmentDbs>,
ro_txns: Arc<()>,
rw_txns: Arc<()>,
}

impl EnvironmentImpl {
fn serialize(&self) -> Result<Vec<u8>, ErrorImpl> {
let arena = self.arena.read().map_err(|_| ErrorImpl::EnvPoisonError)?;
let dbs = self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError)?;
let data: HashMap<_, _> = dbs.iter().map(|(name, id)| (name, &arena[id.0])).collect();
let data: HashMap<_, _> = dbs.name_map.iter().map(|(name, id)| (name, &dbs.arena[id.0])).collect();
Ok(bincode::serialize(&data)?)
}

fn deserialize(bytes: &[u8]) -> Result<(DatabaseArena, DatabaseNameMap), ErrorImpl> {
let mut arena = DatabaseArena::new();
let mut dbs = HashMap::new();
let mut name_map = HashMap::new();
let data: HashMap<_, _> = bincode::deserialize(&bytes)?;
for (name, db) in data {
dbs.insert(name, DatabaseImpl(arena.alloc(db)));
name_map.insert(name, DatabaseImpl(arena.alloc(db)));
}
Ok((arena, dbs))
Ok((arena, name_map))
}
}

Expand All @@ -165,8 +185,10 @@ impl EnvironmentImpl {
Ok(EnvironmentImpl {
path: path.to_path_buf(),
max_dbs: max_dbs.unwrap_or(std::usize::MAX),
arena: RwLock::new(DatabaseArena::new()),
dbs: RwLock::new(HashMap::new()),
dbs: RwLock::new(EnvironmentDbs {
arena: DatabaseArena::new(),
name_map: HashMap::new(),
}),
ro_txns: Arc::new(()),
rw_txns: Arc::new(()),
})
Expand All @@ -180,9 +202,11 @@ impl EnvironmentImpl {
if fs::metadata(&path).is_err() {
return Ok(());
};
let (arena, dbs) = Self::deserialize(&fs::read(&path)?)?;
self.arena = RwLock::new(arena);
self.dbs = RwLock::new(dbs);
let (arena, name_map) = Self::deserialize(&fs::read(&path)?)?;
self.dbs = RwLock::new(EnvironmentDbs {
arena,
name_map,
});
Ok(())
}

Expand All @@ -195,12 +219,12 @@ impl EnvironmentImpl {
Ok(())
}

pub(crate) fn dbs(&self) -> Result<RwLockReadGuard<DatabaseArena>, ErrorImpl> {
self.arena.read().map_err(|_| ErrorImpl::EnvPoisonError)
pub(crate) fn dbs(&self) -> Result<RwLockReadGuard<EnvironmentDbs>, ErrorImpl> {
self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError)
}

pub(crate) fn dbs_mut(&self) -> Result<RwLockWriteGuard<DatabaseArena>, ErrorImpl> {
self.arena.write().map_err(|_| ErrorImpl::EnvPoisonError)
pub(crate) fn dbs_mut(&self) -> Result<RwLockWriteGuard<EnvironmentDbs>, ErrorImpl> {
self.dbs.write().map_err(|_| ErrorImpl::EnvPoisonError)
}
}

Expand All @@ -215,7 +239,7 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {

fn get_dbs(&self) -> Result<Vec<Option<String>>, Self::Error> {
let dbs = self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError)?;
Ok(dbs.keys().map(|key| key.to_owned()).collect())
Ok(dbs.name_map.keys().map(|key| key.to_owned()).collect())
}

fn open_db(&self, name: Option<&str>) -> Result<Self::Database, Self::Error> {
Expand All @@ -225,8 +249,8 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
// TOOD: don't reallocate `name`.
let key = name.map(String::from);
let dbs = self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError)?;
let id = dbs.get(&key).ok_or(ErrorImpl::DbNotFoundError)?;
Ok(*id)
let db = dbs.name_map.get(&key).ok_or(ErrorImpl::DbNotFoundError)?;
Ok(*db)
}

fn create_db(&self, name: Option<&str>, flags: Self::Flags) -> Result<Self::Database, Self::Error> {
Expand All @@ -236,11 +260,13 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
// TOOD: don't reallocate `name`.
let key = name.map(String::from);
let mut dbs = self.dbs.write().map_err(|_| ErrorImpl::EnvPoisonError)?;
let mut arena = self.arena.write().map_err(|_| ErrorImpl::EnvPoisonError)?;
if dbs.keys().filter_map(|k| k.as_ref()).count() >= self.max_dbs && name != None {
if dbs.name_map.keys().filter_map(|k| k.as_ref()).count() >= self.max_dbs && name != None {
return Err(ErrorImpl::DbsFull);
}
let id = dbs.entry(key).or_insert_with(|| DatabaseImpl(arena.alloc(Database::new(Some(flags), None))));
let parts = EnvironmentDbsRefMut::from(dbs.deref_mut());
let arena = parts.arena;
let name_map = parts.name_map;
let id = name_map.entry(key).or_insert_with(|| DatabaseImpl(arena.alloc(Database::new(Some(flags), None))));
Ok(*id)
}

Expand Down
6 changes: 3 additions & 3 deletions src/backend/impl_safe/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct RoTransactionImpl<'t> {

impl<'t> RoTransactionImpl<'t> {
pub(crate) fn new(env: &'t EnvironmentImpl, idx: Arc<()>) -> Result<RoTransactionImpl<'t>, ErrorImpl> {
let snapshots = env.dbs()?.iter().map(|(id, db)| (DatabaseImpl(id), db.snapshot())).collect();
let snapshots = env.dbs()?.arena.iter().map(|(id, db)| (DatabaseImpl(id), db.snapshot())).collect();
Ok(RoTransactionImpl {
env,
snapshots,
Expand Down Expand Up @@ -78,7 +78,7 @@ pub struct RwTransactionImpl<'t> {

impl<'t> RwTransactionImpl<'t> {
pub(crate) fn new(env: &'t EnvironmentImpl, idx: Arc<()>) -> Result<RwTransactionImpl<'t>, ErrorImpl> {
let snapshots = env.dbs()?.iter().map(|(id, db)| (DatabaseImpl(id), db.snapshot())).collect();
let snapshots = env.dbs()?.arena.iter().map(|(id, db)| (DatabaseImpl(id), db.snapshot())).collect();
Ok(RwTransactionImpl {
env,
snapshots,
Expand Down Expand Up @@ -144,7 +144,7 @@ impl<'t> BackendRwTransaction for RwTransactionImpl<'t> {
let mut dbs = self.env.dbs_mut()?;

for (id, snapshot) in self.snapshots {
let db = dbs.get_mut(id.0).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
let db = dbs.arena.get_mut(id.0).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
db.replace(snapshot);
}

Expand Down
2 changes: 2 additions & 0 deletions tests/env-migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ fn test_easy_migrator_from_manager_failed_migration_1() {
let created_dst_arc_1 = dst_manager.get_or_create(root.path(), Rkv::new::<SafeMode>).unwrap();
let dst_env_1 = created_dst_arc_1.read().unwrap();
populate_store!(&dst_env_1);
dst_env_1.sync(true).expect("synced");
}

// Attempt to migrate again in a new env. This should *NOT* fail with DestinationNotEmpty.
Expand All @@ -453,6 +454,7 @@ fn test_easy_migrator_from_manager_failed_migration_2() {
let created_dst_arc_1 = dst_manager.get_or_create(root.path(), Rkv::new::<Lmdb>).unwrap();
let dst_env_1 = created_dst_arc_1.read().unwrap();
populate_store!(&dst_env_1);
dst_env_1.sync(true).expect("synced");
}

// Attempt to migrate again in a new env. This should *NOT* fail with DestinationNotEmpty.
Expand Down

0 comments on commit b51b5e0

Please sign in to comment.