Skip to content

Commit

Permalink
[Storage] Serialize JMT updates in multiple threads since it is CPU heavy. (aptos-labs#2115)
Browse files Browse the repository at this point in the history
  • Loading branch information
grao1991 authored Jul 25, 2022
1 parent ecd0af4 commit 890ed24
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 38 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions consensus/src/consensusdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ impl ConsensusDB {
}

pub fn save_highest_2chain_timeout_certificate(&self, tc: Vec<u8>) -> Result<(), DbError> {
let mut batch = SchemaBatch::new();
let batch = SchemaBatch::new();
batch.put::<SingleEntrySchema>(&SingleEntryKey::Highest2ChainTimeoutCert, &tc)?;
self.commit(batch)?;
Ok(())
}

pub fn save_vote(&self, last_vote: Vec<u8>) -> Result<(), DbError> {
let mut batch = SchemaBatch::new();
let batch = SchemaBatch::new();
batch.put::<SingleEntrySchema>(&SingleEntryKey::LastVote, &last_vote)?;
self.commit(batch)
}
Expand All @@ -103,7 +103,7 @@ impl ConsensusDB {
if block_data.is_empty() && qc_data.is_empty() {
return Err(anyhow::anyhow!("Consensus block and qc data is empty!").into());
}
let mut batch = SchemaBatch::new();
let batch = SchemaBatch::new();
block_data
.iter()
.try_for_each(|block| batch.put::<BlockSchema>(&block.id(), block))?;
Expand All @@ -120,7 +120,7 @@ impl ConsensusDB {
if block_ids.is_empty() {
return Err(anyhow::anyhow!("Consensus block ids is empty!").into());
}
let mut batch = SchemaBatch::new();
let batch = SchemaBatch::new();
block_ids.iter().try_for_each(|hash| {
batch.delete::<BlockSchema>(hash)?;
batch.delete::<QCSchema>(hash)
Expand All @@ -143,7 +143,7 @@ impl ConsensusDB {
}

pub fn delete_highest_2chain_timeout_certificate(&self) -> Result<(), DbError> {
let mut batch = SchemaBatch::new();
let batch = SchemaBatch::new();
batch.delete::<SingleEntrySchema>(&SingleEntryKey::Highest2ChainTimeoutCert)?;
self.commit(batch)
}
Expand All @@ -155,7 +155,7 @@ impl ConsensusDB {
}

pub fn delete_last_vote_msg(&self) -> Result<(), DbError> {
let mut batch = SchemaBatch::new();
let batch = SchemaBatch::new();
batch.delete::<SingleEntrySchema>(&SingleEntryKey::LastVote)?;
self.commit(batch)?;
Ok(())
Expand Down
1 change: 1 addition & 0 deletions storage/aptosdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ num-traits = "0.2.15"
once_cell = "1.10.0"
proptest = { version = "1.0.0", optional = true }
proptest-derive = { version = "0.3.0", optional = true }
rayon = "1.5.2"
serde = "1.0.137"
thiserror = "1.0.31"

Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/backup/restore_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub fn confirm_or_save_frozen_subtrees(
num_leaves: LeafCount,
frozen_subtrees: &[HashValue],
) -> Result<()> {
let mut cs = ChangeSet::new();
let cs = ChangeSet::new();
let positions: Vec<_> = FrozenSubTreeIterator::new(num_leaves).collect();

ensure!(
Expand Down
8 changes: 5 additions & 3 deletions storage/aptosdb/src/pruner/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ use aptos_jellyfish_merkle::StaleNodeIndex;
use aptos_logger::error;
use aptos_types::transaction::{AtomicVersion, Version};
use schemadb::{ReadOptions, SchemaBatch, DB};
use std::sync::atomic::AtomicBool;
use std::sync::{atomic::Ordering, Arc};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};

#[cfg(test)]
mod test;
Expand Down Expand Up @@ -126,7 +128,7 @@ impl StateStorePruner {
.start_timer();
let new_min_readable_version =
indices.last().expect("Should exist.").stale_since_version;
let mut batch = SchemaBatch::new();
let batch = SchemaBatch::new();
// Delete stale nodes.
indices.into_iter().try_for_each(|index| {
batch.delete::<JellyfishMerkleNodeSchema>(&index.node_key)?;
Expand Down
26 changes: 16 additions & 10 deletions storage/aptosdb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

//! This file defines state store APIs that are related account state Merkle tree.
use crate::jellyfish_merkle_node::JellyfishMerkleNodeSchema;
use anyhow::{anyhow, ensure, format_err, Result};
use aptos_crypto::{
hash::{CryptoHash, SPARSE_MERKLE_PLACEHOLDER_HASH},
Expand All @@ -25,6 +26,7 @@ use aptos_types::{
transaction::Version,
};
use executor_types::in_memory_state_calculator::InMemoryStateCalculator;
use rayon::prelude::*;
use schemadb::{ReadOptions, SchemaBatch, DB};
use std::{collections::HashMap, sync::Arc};
use storage_interface::{
Expand All @@ -35,7 +37,7 @@ use storage_interface::{
use crate::{
change_set::ChangeSet,
schema::{stale_node_index::StaleNodeIndexSchema, state_value::StateValueSchema},
state_merkle_db::{add_node_batch, StateMerkleDb},
state_merkle_db::StateMerkleDb,
AptosDbError, LedgerStore, TransactionStore, OTHER_TIMERS_SECONDS,
};

Expand Down Expand Up @@ -342,25 +344,29 @@ impl StateStore {
.batch_put_value_set(value_set, node_hashes, base_version, version)
}?;

let mut batch = SchemaBatch::new();
let batch = SchemaBatch::new();
{
let _timer = OTHER_TIMERS_SECONDS
.with_label_values(&["serialize_jmt_commit"])
.start_timer();

add_node_batch(
&mut batch,
tree_update_batch
.node_batch
.iter()
.flatten()
.map(|(k, v)| (k, v)),
)?;
tree_update_batch
.node_batch
.iter()
.flatten()
.collect::<Vec<_>>()
.par_iter()
.with_min_len(128)
.map(|(node_key, node)| batch.put::<JellyfishMerkleNodeSchema>(node_key, node))
.collect::<Result<Vec<_>>>()?;

tree_update_batch
.stale_node_index_batch
.iter()
.flatten()
.collect::<Vec<_>>()
.par_iter()
.with_min_len(128)
.map(|row| batch.put::<StaleNodeIndexSchema>(row, &()))
.collect::<Result<Vec<()>>>()?;
}
Expand Down
1 change: 1 addition & 0 deletions storage/schemadb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ anyhow = "1.0.57"
once_cell = "1.10.0"
proptest = { version = "1.0.0", optional = true }

aptos-infallible = { path = "../../crates/aptos-infallible" }
aptos-logger = { path = "../../crates/aptos-logger" }
aptos-metrics-core = { path = "../../crates/aptos-metrics-core" }

Expand Down
36 changes: 23 additions & 13 deletions storage/schemadb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::{
schema::{KeyCodec, Schema, SeekKeyCodec, ValueCodec},
};
use anyhow::{format_err, Result};
use aptos_infallible::Mutex;
use aptos_logger::prelude::*;
use std::{collections::HashMap, iter::Iterator, marker::PhantomData, path::Path};

Expand All @@ -48,9 +49,17 @@ enum WriteOp {

/// `SchemaBatch` holds a collection of updates that can be applied to a DB atomically. The updates
/// will be applied in the order in which they are added to the `SchemaBatch`.
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct SchemaBatch {
rows: HashMap<ColumnFamilyName, Vec<WriteOp>>,
rows: Mutex<HashMap<ColumnFamilyName, Vec<WriteOp>>>,
}

impl Default for SchemaBatch {
fn default() -> Self {
Self {
rows: Mutex::new(HashMap::new()),
}
}
}

impl SchemaBatch {
Expand All @@ -60,13 +69,14 @@ impl SchemaBatch {
}

/// Adds an insert/update operation to the batch.
pub fn put<S: Schema>(&mut self, key: &S::Key, value: &S::Value) -> Result<()> {
pub fn put<S: Schema>(&self, key: &S::Key, value: &S::Value) -> Result<()> {
let _timer = APTOS_SCHEMADB_BATCH_PUT_LATENCY_SECONDS
.with_label_values(&["unknown"])
.start_timer();
let key = <S::Key as KeyCodec<S>>::encode_key(key)?;
let value = <S::Value as ValueCodec<S>>::encode_value(value)?;
self.rows
.lock()
.entry(S::COLUMN_FAMILY_NAME)
.or_insert_with(Vec::new)
.push(WriteOp::Value { key, value });
Expand All @@ -75,9 +85,10 @@ impl SchemaBatch {
}

/// Adds a delete operation to the batch.
pub fn delete<S: Schema>(&mut self, key: &S::Key) -> Result<()> {
pub fn delete<S: Schema>(&self, key: &S::Key) -> Result<()> {
let key = <S::Key as KeyCodec<S>>::encode_key(key)?;
self.rows
.lock()
.entry(S::COLUMN_FAMILY_NAME)
.or_insert_with(Vec::new)
.push(WriteOp::Deletion { key });
Expand All @@ -86,25 +97,23 @@ impl SchemaBatch {
}

/// Adds a delete range operation that delete a range [start, end)
pub fn delete_range<S: Schema>(&mut self, begin: &S::Key, end: &S::Key) -> Result<()> {
pub fn delete_range<S: Schema>(&self, begin: &S::Key, end: &S::Key) -> Result<()> {
let begin = <S::Key as KeyCodec<S>>::encode_key(begin)?;
let end = <S::Key as KeyCodec<S>>::encode_key(end)?;
self.rows
.lock()
.entry(S::COLUMN_FAMILY_NAME)
.or_insert_with(Vec::new)
.push(WriteOp::DeletionRange { begin, end });
Ok(())
}

/// Adds a delete range operation that delete a range [start, end] including end
pub fn delete_range_inclusive<S: Schema>(
&mut self,
begin: &S::Key,
end: &S::Key,
) -> Result<()> {
pub fn delete_range_inclusive<S: Schema>(&self, begin: &S::Key, end: &S::Key) -> Result<()> {
let begin = <S::Key as KeyCodec<S>>::encode_key(begin)?;
let end = <S::Key as KeyCodec<S>>::encode_key(end)?;
self.rows
.lock()
.entry(S::COLUMN_FAMILY_NAME)
.or_insert_with(Vec::new)
.push(WriteOp::DeletionRangeInclusive { begin, end });
Expand Down Expand Up @@ -305,7 +314,7 @@ impl DB {
pub fn put<S: Schema>(&self, key: &S::Key, value: &S::Value) -> Result<()> {
// Not necessary to use a batch, but we'd like a central place to bump counters.
// Used in tests only anyway.
let mut batch = SchemaBatch::new();
let batch = SchemaBatch::new();
batch.put::<S>(key, value)?;
self.write_schemas(batch)
}
Expand Down Expand Up @@ -355,9 +364,10 @@ impl DB {
let _timer = APTOS_SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS
.with_label_values(&[self.name])
.start_timer();
let rows_locked = batch.rows.lock();

let mut db_batch = rocksdb::WriteBatch::default();
for (cf_name, rows) in &batch.rows {
for (cf_name, rows) in rows_locked.iter() {
let cf_handle = self.get_cf_handle(cf_name)?;
for write_op in rows {
match write_op {
Expand All @@ -378,7 +388,7 @@ impl DB {
self.inner.write_opt(db_batch, &default_write_options())?;

// Bump counters only after DB write succeeds.
for (cf_name, rows) in &batch.rows {
for (cf_name, rows) in rows_locked.iter() {
for write_op in rows {
match write_op {
WriteOp::Value { key, value } => {
Expand Down
10 changes: 5 additions & 5 deletions storage/schemadb/tests/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ fn test_schema_put_get() {

fn test_schemabatch_delete_range_util(begin: u32, end: u32, is_inclusive: bool) {
let db = TestDB::new();
let mut db_batch = SchemaBatch::new();
let db_batch = SchemaBatch::new();
for i in 0..100u32 {
db_batch
.put::<TestSchema1>(&TestField(i), &TestField(i))
Expand Down Expand Up @@ -275,7 +275,7 @@ fn gen_expected_values(values: &[(u32, u32)]) -> Vec<(TestField, TestField)> {
fn test_single_schema_batch() {
let db = TestDB::new();

let mut db_batch = SchemaBatch::new();
let db_batch = SchemaBatch::new();
db_batch
.put::<TestSchema1>(&TestField(0), &TestField(0))
.unwrap();
Expand Down Expand Up @@ -313,7 +313,7 @@ fn test_single_schema_batch() {
fn test_two_schema_batches() {
let db = TestDB::new();

let mut db_batch1 = SchemaBatch::new();
let db_batch1 = SchemaBatch::new();
db_batch1
.put::<TestSchema1>(&TestField(0), &TestField(0))
.unwrap();
Expand All @@ -331,7 +331,7 @@ fn test_two_schema_batches() {
gen_expected_values(&[(0, 0), (1, 1)]),
);

let mut db_batch2 = SchemaBatch::new();
let db_batch2 = SchemaBatch::new();
db_batch2.delete::<TestSchema2>(&TestField(3)).unwrap();
db_batch2
.put::<TestSchema2>(&TestField(3), &TestField(3))
Expand Down Expand Up @@ -411,7 +411,7 @@ fn test_report_size() {
let db = TestDB::new();

for i in 0..1000 {
let mut db_batch = SchemaBatch::new();
let db_batch = SchemaBatch::new();
db_batch
.put::<TestSchema1>(&TestField(i), &TestField(i))
.unwrap();
Expand Down

0 comments on commit 890ed24

Please sign in to comment.