Skip to content

Commit

Permalink
Revert "feat(storage): use mdbx_txn_reset to time out transactions … (
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin authored Mar 1, 2024
1 parent 2009784 commit d32a8ef
Show file tree
Hide file tree
Showing 11 changed files with 337 additions and 292 deletions.
5 changes: 0 additions & 5 deletions crates/node-core/src/metrics/prometheus_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,6 @@ where
describe_gauge!("db.table_pages", "The number of database pages for a table");
describe_gauge!("db.table_entries", "The number of entries for a table");
describe_gauge!("db.freelist", "The number of pages on the freelist");
describe_gauge!(
"db.timed_out_not_aborted_transactions",
"Number of timed out transactions that were not aborted by the user yet"
);

describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
describe_gauge!(
Expand Down
16 changes: 4 additions & 12 deletions crates/storage/db/src/implementation/mdbx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,17 @@ impl Database for DatabaseEnv {
type TXMut = tx::Tx<RW>;

fn tx(&self) -> Result<Self::TX, DatabaseError> {
Tx::new_with_metrics(
Ok(Tx::new_with_metrics(
self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
self.metrics.as_ref().cloned(),
)
.map_err(|e| DatabaseError::InitTx(e.into()))
))
}

fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
Tx::new_with_metrics(
Ok(Tx::new_with_metrics(
self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
self.metrics.as_ref().cloned(),
)
.map_err(|e| DatabaseError::InitTx(e.into()))
))
}
}

Expand Down Expand Up @@ -204,12 +202,6 @@ impl DatabaseMetrics for DatabaseEnv {
metrics.push(("db.freelist", freelist as f64, vec![]));
}

metrics.push((
"db.timed_out_not_aborted_transactions",
self.timed_out_not_aborted_transactions() as f64,
vec![],
));

metrics
}
}
Expand Down
24 changes: 11 additions & 13 deletions crates/storage/db/src/implementation/mdbx/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,14 @@ impl<K: TransactionKind> Tx<K> {
pub fn new_with_metrics(
inner: Transaction<K>,
env_metrics: Option<Arc<DatabaseEnvMetrics>>,
) -> reth_libmdbx::Result<Self> {
let metrics_handler = env_metrics
.map(|env_metrics| {
let handler = MetricsHandler::<K>::new(inner.id()?, env_metrics);
handler.env_metrics.record_opened_transaction(handler.transaction_mode());
handler.log_transaction_opened();
Ok(handler)
})
.transpose()?;
Ok(Self::new_inner(inner, metrics_handler))
) -> Self {
let metrics_handler = env_metrics.map(|env_metrics| {
let handler = MetricsHandler::<K>::new(inner.id(), env_metrics);
handler.env_metrics.record_opened_transaction(handler.transaction_mode());
handler.log_transaction_opened();
handler
});
Self::new_inner(inner, metrics_handler)
}

#[inline]
Expand All @@ -78,8 +76,8 @@ impl<K: TransactionKind> Tx<K> {
}

/// Gets this transaction ID.
pub fn id(&self) -> reth_libmdbx::Result<u64> {
self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
pub fn id(&self) -> u64 {
self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| handler.txn_id)
}

/// Gets a table database handle if it exists, otherwise creates it.
Expand Down Expand Up @@ -439,7 +437,7 @@ mod tests {

assert_eq!(
tx.get::<tables::Transactions>(0).err(),
Some(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionTimeout.into()))
Some(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionAborted.into()))
); // Transaction is timeout-ed
assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
// Backtrace is recorded
Expand Down
26 changes: 12 additions & 14 deletions crates/storage/libmdbx-rs/benches/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,29 +78,27 @@ fn bench_get_seq_raw(c: &mut Criterion) {
let (_dir, env) = setup_bench_db(n);

let dbi = env.begin_ro_txn().unwrap().open_db(None).unwrap().dbi();
let txn = env.begin_ro_txn().unwrap();
let _txn = env.begin_ro_txn().unwrap();
let txn = _txn.txn();

let mut key = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let mut data = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let mut cursor: *mut MDBX_cursor = ptr::null_mut();

c.bench_function("bench_get_seq_raw", |b| {
b.iter(|| unsafe {
txn.txn_execute(|txn| {
mdbx_cursor_open(txn, dbi, &mut cursor);
let mut i = 0;
let mut count = 0u32;
mdbx_cursor_open(txn, dbi, &mut cursor);
let mut i = 0;
let mut count = 0u32;

while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 {
i += key.iov_len + data.iov_len;
count += 1;
}
while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 {
i += key.iov_len + data.iov_len;
count += 1;
}

black_box(i);
assert_eq!(count, n);
mdbx_cursor_close(cursor);
})
.unwrap();
black_box(i);
assert_eq!(count, n);
mdbx_cursor_close(cursor);
})
});
}
Expand Down
5 changes: 2 additions & 3 deletions crates/storage/libmdbx-rs/benches/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn bench_get_rand_raw(c: &mut Criterion) {

c.bench_function("bench_get_rand_raw", |b| {
b.iter(|| unsafe {
txn.txn_execute(|txn| {
txn.with_raw_tx_ptr(|txn| {
let mut i: size_t = 0;
for key in &keys {
key_val.iov_len = key.len() as size_t;
Expand All @@ -57,8 +57,7 @@ fn bench_get_rand_raw(c: &mut Criterion) {
i += key_val.iov_len;
}
black_box(i);
})
.unwrap();
});
})
});
}
Expand Down
79 changes: 36 additions & 43 deletions crates/storage/libmdbx-rs/src/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
error::{mdbx_result, Error, Result},
error::{mdbx_result, mdbx_result_with_tx_kind, Error, Result},
flags::*,
mdbx_try_optional,
transaction::{TransactionKind, RW},
Expand Down Expand Up @@ -30,26 +30,26 @@ where
pub(crate) fn new(txn: Transaction<K>, dbi: ffi::MDBX_dbi) -> Result<Self> {
let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut();
unsafe {
txn.txn_execute(|txn_ptr| {
mdbx_result(ffi::mdbx_cursor_open(txn_ptr, dbi, &mut cursor))
})??;
mdbx_result_with_tx_kind::<K>(
txn.txn_execute(|txn| ffi::mdbx_cursor_open(txn, dbi, &mut cursor)),
txn.txn(),
txn.env().txn_manager(),
)?;
}
Ok(Self { txn, cursor })
}

fn new_at_position(other: &Self) -> Result<Self> {
unsafe {
other.txn.txn_execute(|_| {
let cursor = ffi::mdbx_cursor_create(ptr::null_mut());
let cursor = ffi::mdbx_cursor_create(ptr::null_mut());

let res = ffi::mdbx_cursor_copy(other.cursor(), cursor);
let res = ffi::mdbx_cursor_copy(other.cursor(), cursor);

let s = Self { txn: other.txn.clone(), cursor };
let s = Self { txn: other.txn.clone(), cursor };

mdbx_result(res)?;
mdbx_result_with_tx_kind::<K>(res, s.txn.txn(), s.txn.env().txn_manager())?;

Ok(s)
})?
Ok(s)
}
}

Expand Down Expand Up @@ -95,12 +95,11 @@ where
let key_ptr = key_val.iov_base;
let data_ptr = data_val.iov_base;
self.txn.txn_execute(|txn| {
let v = mdbx_result(ffi::mdbx_cursor_get(
self.cursor,
&mut key_val,
&mut data_val,
op,
))?;
let v = mdbx_result_with_tx_kind::<K>(
ffi::mdbx_cursor_get(self.cursor, &mut key_val, &mut data_val, op),
txn,
self.txn.env().txn_manager(),
)?;
assert_ne!(data_ptr, data_val.iov_base);
let key_out = {
// MDBX wrote in new key
Expand All @@ -112,7 +111,7 @@ where
};
let data_out = Value::decode_val::<K>(txn, data_val)?;
Ok((key_out, data_out, v))
})?
})
}
}

Expand Down Expand Up @@ -445,7 +444,7 @@ impl Cursor<RW> {
mdbx_result(unsafe {
self.txn.txn_execute(|_| {
ffi::mdbx_cursor_put(self.cursor, &key_val, &mut data_val, flags.bits())
})?
})
})?;

Ok(())
Expand All @@ -459,7 +458,7 @@ impl Cursor<RW> {
/// current key, if the database was opened with [DatabaseFlags::DUP_SORT].
pub fn del(&mut self, flags: WriteFlags) -> Result<()> {
mdbx_result(unsafe {
self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))?
self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))
})?;

Ok(())
Expand All @@ -471,7 +470,7 @@ where
K: TransactionKind,
{
fn clone(&self) -> Self {
Self::new_at_position(self).unwrap()
self.txn.txn_execute(|_| Self::new_at_position(self).unwrap())
}
}

Expand All @@ -489,7 +488,7 @@ where
K: TransactionKind,
{
fn drop(&mut self) {
let _ = self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) });
self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) })
}
}

Expand Down Expand Up @@ -565,7 +564,7 @@ where
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let op = mem::replace(op, *next_op);
unsafe {
let result = cursor.txn.txn_execute(|txn| {
cursor.txn.txn_execute(|txn| {
match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) {
ffi::MDBX_SUCCESS => {
let key = match Key::decode_val::<K>(txn, key) {
Expand All @@ -584,11 +583,7 @@ where
ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None,
error => Some(Err(Error::from_err_code(error))),
}
});
match result {
Ok(result) => result,
Err(err) => Some(Err(err)),
}
})
}
}
Self::Err(err) => err.take().map(Err),
Expand Down Expand Up @@ -660,7 +655,7 @@ where
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let op = mem::replace(op, *next_op);
unsafe {
let result = cursor.txn.txn_execute(|txn| {
cursor.txn.txn_execute(|txn| {
match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) {
ffi::MDBX_SUCCESS => {
let key = match Key::decode_val::<K>(txn, key) {
Expand All @@ -679,11 +674,7 @@ where
ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None,
error => Some(Err(Error::from_err_code(error))),
}
});
match result {
Ok(result) => result,
Err(err) => Some(Err(err)),
}
})
}
}
Iter::Err(err) => err.take().map(Err),
Expand Down Expand Up @@ -761,15 +752,17 @@ where
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let op = mem::replace(op, ffi::MDBX_NEXT_NODUP);

let err_code =
unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) };

(err_code == ffi::MDBX_SUCCESS).then(|| {
IntoIter::new(
Cursor::new_at_position(&**cursor).unwrap(),
ffi::MDBX_GET_CURRENT,
ffi::MDBX_NEXT_DUP,
)
cursor.txn.txn_execute(|_| {
let err_code =
unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) };

(err_code == ffi::MDBX_SUCCESS).then(|| {
IntoIter::new(
Cursor::new_at_position(&**cursor).unwrap(),
ffi::MDBX_GET_CURRENT,
ffi::MDBX_NEXT_DUP,
)
})
})
}
IterDup::Err(err) => err.take().map(|e| IntoIter::Err(Some(e))),
Expand Down
10 changes: 7 additions & 3 deletions crates/storage/libmdbx-rs/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
error::{mdbx_result, Result},
error::{mdbx_result_with_tx_kind, Result},
transaction::TransactionKind,
Environment, Transaction,
};
Expand Down Expand Up @@ -31,8 +31,12 @@ impl Database {
let name_ptr = if let Some(c_name) = &c_name { c_name.as_ptr() } else { ptr::null() };
let mut dbi: ffi::MDBX_dbi = 0;
txn.txn_execute(|txn_ptr| {
mdbx_result(unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) })
})??;
mdbx_result_with_tx_kind::<K>(
unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) },
txn_ptr,
txn.env().txn_manager(),
)
})?;
Ok(Self::new_from_ptr(dbi, txn.env().clone()))
}

Expand Down
6 changes: 0 additions & 6 deletions crates/storage/libmdbx-rs/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,6 @@ impl Environment {
&self.inner.txn_manager
}

/// Returns the number of timed out transactions that were not aborted by the user yet.
#[cfg(feature = "read-tx-timeouts")]
pub fn timed_out_not_aborted_transactions(&self) -> usize {
self.inner.txn_manager.timed_out_not_aborted_read_transactions().unwrap_or(0)
}

/// Create a read-only transaction for use with the environment.
#[inline]
pub fn begin_ro_txn(&self) -> Result<Transaction<RO>> {
Expand Down
Loading

0 comments on commit d32a8ef

Please sign in to comment.