Skip to content

Commit

Permalink
pending transactions: replace mutex with optimistic tx (MystenLabs#6971)
Browse files Browse the repository at this point in the history
  • Loading branch information
phoenix-o authored Dec 22, 2022
1 parent 9b024c3 commit 5cf072c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 15 deletions.
27 changes: 14 additions & 13 deletions crates/sui-storage/src/write_path_pending_tx_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
//! 1. At one time, a transaction is only processed once.
//! 2. When Fullnode crashes and restarts, the pending transaction will be loaded and retried.
use crate::mutex_table::MutexTable;
use crate::write_ahead_log::{MUTEX_TABLE_SHARD_SIZE, MUTEX_TABLE_SIZE};
use std::path::PathBuf;
use sui_types::base_types::TransactionDigest;
use sui_types::error::{SuiError, SuiResult};
Expand All @@ -25,16 +23,14 @@ struct WritePathPendingTransactionTable {

pub struct WritePathPendingTransactionLog {
pending_transactions: WritePathPendingTransactionTable,
mutex_table: MutexTable<TransactionDigest>,
}

impl WritePathPendingTransactionLog {
pub fn new(path: PathBuf) -> Self {
let pending_transactions =
WritePathPendingTransactionTable::open_tables_read_write(path, None, None);
WritePathPendingTransactionTable::open_tables_transactional(path, None, None);
Self {
pending_transactions,
mutex_table: MutexTable::new(MUTEX_TABLE_SIZE, MUTEX_TABLE_SHARD_SIZE),
}
}

Expand All @@ -48,15 +44,20 @@ impl WritePathPendingTransactionLog {
tx: &VerifiedTransaction,
) -> SuiResult<IsFirstRecord> {
let tx_digest = tx.digest();
let _guard = self.mutex_table.acquire_lock(*tx_digest).await;
if self.pending_transactions.logs.contains_key(tx_digest)? {
Ok(false)
} else {
self.pending_transactions
.logs
.insert(tx_digest, tx.serializable_ref())?;
Ok(true)
let transaction = self.pending_transactions.logs.transaction()?;
if transaction
.get(&self.pending_transactions.logs, tx_digest)?
.is_some()
{
return Ok(false);
}
let result = transaction
.insert_batch(
&self.pending_transactions.logs,
[(tx_digest, tx.serializable_ref())],
)?
.commit();
Ok(result.is_ok())
}

// This function does not need to be behind a lock because:
Expand Down
4 changes: 2 additions & 2 deletions crates/typed-store/src/rocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -851,13 +851,13 @@ impl<'a> DBTransaction<'a> {
pub fn get_for_update<K: Serialize, V: DeserializeOwned>(
&self,
db: &DBMap<K, V>,
key: K,
key: &K,
) -> Result<Option<V>, TypedStoreError> {
if !Arc::ptr_eq(&db.rocksdb, &self.rocksdb) {
return Err(TypedStoreError::CrossDBBatch);
}
let k_buf = be_fix_int_ser(key.borrow())?;
match self.transaction.get_for_update(k_buf, true)? {
match self.transaction.get_for_update_cf(&db.cf(), k_buf, true)? {
Some(data) => Ok(Some(bincode::deserialize(&data)?)),
None => Ok(None),
}
Expand Down

0 comments on commit 5cf072c

Please sign in to comment.