Skip to content

Commit

Permalink
[0.30 cherrypick][perf improvement] - use multi get in get balance an…
Browse files Browse the repository at this point in the history
…d get all balances (MystenLabs#10485) (MystenLabs#10517)
  • Loading branch information
patrickkuo authored Apr 7, 2023
1 parent de21275 commit 18c0ea9
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 61 deletions.
129 changes: 72 additions & 57 deletions crates/sui-json-rpc/src/coin_api.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::anyhow;
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use jsonrpsee::core::RpcResult;
use jsonrpsee::RpcModule;
Expand All @@ -18,11 +18,12 @@ use sui_open_rpc::Module;
use sui_types::balance::Supply;
use sui_types::base_types::{MoveObjectType, ObjectID, ObjectRef, ObjectType, SuiAddress};
use sui_types::coin::{Coin, CoinMetadata, TreasuryCap};
use sui_types::error::SuiError;
use sui_types::error::{SuiError, UserInputError};
use sui_types::gas_coin::GAS;
use sui_types::messages::TransactionEffectsAPI;
use sui_types::object::{Object, Owner};
use sui_types::parse_sui_struct_tag;
use sui_types::storage::ObjectKey;

use crate::api::{cap_page_limit, CoinReadApiServer};
use crate::error::Error;
Expand All @@ -37,45 +38,52 @@ impl CoinReadApi {
Self { state }
}

fn get_object(&self, object_id: &ObjectID) -> Result<Object, Error> {
Ok(self
.state
.get_object_read(object_id)?
.into_object()
.map_err(SuiError::from)?)
}

fn get_coin(&self, coin_id: &ObjectID) -> Result<SuiCoin, Error> {
let o = self.get_object(coin_id)?;
if let Some(move_object) = o.data.try_as_move() {
let (balance, locked_until_epoch) = if move_object.type_().is_coin() {
let coin: Coin = bcs::from_bytes(move_object.contents())?;
(coin.balance.value(), None)
} else {
return Err(Error::SuiError(SuiError::ObjectDeserializationError {
error: format!("{:?} is not a supported coin type", move_object.type_()),
}));
};

Ok(SuiCoin {
coin_type: move_object
.type_()
.type_params()
.first()
.unwrap()
.to_string(),
coin_object_id: o.id(),
version: o.version(),
digest: o.digest(),
balance,
locked_until_epoch,
previous_transaction: o.previous_transaction,
fn multi_get_coin(&self, coins: &[ObjectKey]) -> Result<Vec<Result<SuiCoin, Error>>, Error> {
let o = self.state.database.multi_get_object_by_key(coins)?;

Ok(o.into_iter()
.zip(coins)
.map(|(o, ObjectKey(id, version))| {
let o = o.ok_or(UserInputError::ObjectNotFound {
object_id: *id,
version: Some(*version),
})?;

if let Some(move_object) = o.data.try_as_move() {
let (balance, locked_until_epoch) = if move_object.type_().is_coin() {
let coin: Coin = bcs::from_bytes(move_object.contents())?;
(coin.balance.value(), None)
} else {
return Err(Error::SuiError(SuiError::ObjectDeserializationError {
error: format!(
"{:?} is not a supported coin type",
move_object.type_()
),
}));
};

Ok(SuiCoin {
coin_type: move_object
.type_()
.type_params()
.first()
.unwrap()
.to_string(),
coin_object_id: o.id(),
version: o.version(),
digest: o.digest(),
balance,
locked_until_epoch,
previous_transaction: o.previous_transaction,
})
} else {
Err(Error::UnexpectedError(format!(
"Provided object : [{}] is not a Move object.",
o.id()
)))
}
})
} else {
Err(Error::UnexpectedError(format!(
"Provided object : [{coin_id}] is not a Move object."
)))
}
.collect())
}

async fn get_coins_internal(
Expand All @@ -90,20 +98,21 @@ impl CoinReadApi {
let limit = cap_page_limit(limit);
let mut coins = self
.get_owner_coin_iterator(owner, &coin_type)?
.skip_while(|o| matches!(&cursor, Some(cursor) if cursor != o))
.skip_while(|o| matches!(&cursor, Some(cursor) if cursor != &o.0))
// skip an extra b/c the cursor is exclusive
.skip(usize::from(cursor.is_some()))
.take(limit + 1)
.collect::<Vec<_>>();

let has_next_page = coins.len() > limit;
coins.truncate(limit);
let next_cursor = coins.last().cloned().map_or(cursor, Some);
let next_cursor = coins.last().cloned().map_or(cursor, |(id, _, _)| Some(id));

let data = self
.multi_get_coin(&coins.into_iter().map(ObjectKey::from).collect::<Vec<_>>())?
.into_iter()
.collect::<Result<Vec<_>, _>>()?;

let mut data = vec![];
for coin in coins {
data.push(self.get_coin(&coin)?)
}
Ok(CoinPage {
data,
next_cursor,
Expand All @@ -115,20 +124,24 @@ impl CoinReadApi {
&'a self,
owner: SuiAddress,
coin_type: &'a Option<StructTag>,
) -> Result<impl Iterator<Item = ObjectID> + '_, Error> {
) -> Result<impl Iterator<Item = ObjectRef> + '_, Error> {
Ok(self
.state
.get_owner_objects_iterator(owner, None, None)?
.filter(move |o| matches!(&o.type_, ObjectType::Struct(type_) if is_coin_type(type_, coin_type)))
.map(|info|info.object_id))
.map(|info|(info.object_id, info.version, info.digest)))
}

async fn find_package_object(
&self,
package_id: &ObjectID,
object_struct_tag: StructTag,
) -> Result<Object, Error> {
let publish_txn_digest = self.get_object(package_id)?.previous_transaction;
let publish_txn_digest = self
.state
.get_object_read(package_id)?
.into_object()?
.previous_transaction;
let (_, effect) = self
.state
.get_executed_transaction_and_effects(publish_txn_digest)
Expand All @@ -152,7 +165,7 @@ impl CoinReadApi {
))
}
.await?;
self.get_object(&object_id)
Ok(self.state.get_object_read(&object_id)?.into_object()?)
}
}

Expand Down Expand Up @@ -206,9 +219,10 @@ impl CoinReadApiServer for CoinReadApi {
let mut total_balance = 0u128;
let mut locked_balance = HashMap::new();
let mut coin_object_count = 0;
let coins = coins.map(ObjectKey::from).collect::<Vec<_>>();

for coin in coins {
let coin = self.get_coin(&coin)?;
for coin in self.multi_get_coin(&coins)? {
let coin = coin?;
if let Some(lock) = coin.locked_until_epoch {
*locked_balance.entry(lock).or_default() += coin.balance as u128
} else {
Expand All @@ -226,12 +240,14 @@ impl CoinReadApiServer for CoinReadApi {
}

fn get_all_balances(&self, owner: SuiAddress) -> RpcResult<Vec<Balance>> {
// TODO: Add index to improve performance?
let coins = self.get_owner_coin_iterator(owner, &None)?;
let coins = self
.get_owner_coin_iterator(owner, &None)?
.map(ObjectKey::from)
.collect::<Vec<_>>();
let mut balances: HashMap<String, Balance> = HashMap::new();

for coin in coins {
let coin = self.get_coin(&coin)?;
for coin in self.multi_get_coin(&coins)? {
let coin = coin?;
let balance = balances.entry(coin.coin_type.clone()).or_insert(Balance {
coin_type: coin.coin_type,
coin_object_count: 0,
Expand All @@ -245,7 +261,6 @@ impl CoinReadApiServer for CoinReadApi {
}
balance.coin_object_count += 1;
}

Ok(balances.into_values().collect())
}

Expand Down
5 changes: 1 addition & 4 deletions crates/typed-store/src/rocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1816,12 +1816,9 @@ pub fn base_db_options() -> DBOptions {
opt.set_max_open_files((limit / 8) as i32);
}

let row_cache = rocksdb::Cache::new_lru_cache(300_000).expect("Cache is ok");
opt.set_row_cache(&row_cache);

// The table cache is locked for updates and this determines the number
// of shards, ie 2^6. Increase in case of lock contentions.
opt.set_table_cache_num_shard_bits(6);
opt.set_table_cache_num_shard_bits(10);

opt.set_min_level_to_compress(2);
opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
Expand Down

0 comments on commit 18c0ea9

Please sign in to comment.