Skip to content

Commit

Permalink
refactor: split flat storage into multiple files and rename some stru…
Browse files Browse the repository at this point in the history
…cts (near#8664)

Part of near#8577

This includes 2 parts:
* split single-file `near_store::flat_state` module into multiple files under `flat` directory
* rename some structs to better reflect the responsibilities

Please note that this does not introduce any behaviour or API changes, it simply moves the code around.
  • Loading branch information
pugachAG authored Mar 3, 2023
1 parent 89d17a9 commit 725ddac
Show file tree
Hide file tree
Showing 27 changed files with 1,916 additions and 1,829 deletions.
35 changes: 16 additions & 19 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ use near_primitives::views::{
LightClientBlockView, SignedTransactionView,
};
#[cfg(feature = "protocol_feature_flat_state")]
use near_store::flat_state::{store_helper, FlatStateDelta};
use near_store::flat_state::{FlatStorageCreationStatus, FlatStorageError};
use near_store::flat::{store_helper, FlatStateDelta};
use near_store::flat::{FlatStorageCreationStatus, FlatStorageError};
#[cfg(feature = "protocol_feature_flat_state")]
use near_store::{flat_state, StorageError};
use near_store::StorageError;
use near_store::{DBCol, ShardTries, StoreUpdate, WrappedTrieChanges};
use once_cell::sync::OnceCell;
use rand::seq::SliceRandom;
Expand Down Expand Up @@ -628,12 +628,10 @@ impl Chain {
store_update.save_final_head(&header_head)?;

// Set the root block of flat state to be the genesis block. Later, when we
// init FlatStorageStates, we will read the from this column in storage, so it
// init FlatStorages, we will read the from this column in storage, so it
// must be set here.
let tmp_store_update = runtime_adapter.set_flat_storage_state_for_genesis(
genesis.hash(),
genesis.header().epoch_id(),
)?;
let tmp_store_update = runtime_adapter
.set_flat_storage_for_genesis(genesis.hash(), genesis.header().epoch_id())?;
store_update.merge(tmp_store_update);

info!(target: "chain", "Init: saved genesis: #{} {} / {:?}", block_head.height, block_head.last_block_hash, state_roots);
Expand Down Expand Up @@ -2101,15 +2099,13 @@ impl Chain {
block: &Block,
shard_id: ShardId,
) -> Result<(), Error> {
if let Some(flat_storage_state) =
self.runtime_adapter.get_flat_storage_state_for_shard(shard_id)
{
if let Some(flat_storage) = self.runtime_adapter.get_flat_storage_for_shard(shard_id) {
let mut new_flat_head = *block.header().last_final_block();
if new_flat_head == CryptoHash::default() {
new_flat_head = *self.genesis.hash();
}
// Try to update flat head.
flat_storage_state.update_flat_head(&new_flat_head).unwrap_or_else(|err| {
flat_storage.update_flat_head(&new_flat_head).unwrap_or_else(|err| {
match &err {
FlatStorageError::BlockNotSupported(_) => {
// It's possible that new head is not a child of current flat head, e.g. when we have a
Expand Down Expand Up @@ -2192,8 +2188,8 @@ impl Chain {

// Update flat storage head to be the last final block. Note that this update happens
// in a separate db transaction from the update from block processing. This is intentional
// because flat_storage_state need to be locked during the update of flat head, otherwise
// flat_storage_state is in an inconsistent state that could be accessed by the other
// because flat_storage need to be locked during the update of flat head, otherwise
// flat_storage is in an inconsistent state that could be accessed by the other
// apply chunks processes. This means, the flat head is not always the same as
// the last final block on chain, which is OK, because in the flat storage implementation
// we don't assume that.
Expand Down Expand Up @@ -3140,7 +3136,7 @@ impl Chain {
) -> Result<(), Error> {
// Before working with state parts, remove existing flat storage data.
let epoch_id = self.get_block_header(&sync_hash)?.epoch_id().clone();
self.runtime_adapter.remove_flat_storage_state_for_shard(shard_id, &epoch_id)?;
self.runtime_adapter.remove_flat_storage_for_shard(shard_id, &epoch_id)?;

let shard_state_header = self.get_state_header(shard_id, sync_hash)?;
let state_root = shard_state_header.chunk_prev_state_root();
Expand Down Expand Up @@ -3173,7 +3169,7 @@ impl Chain {
let block_hash = chunk.prev_block();

// Flat storage must not exist at this point because leftover keys corrupt its state.
assert!(self.runtime_adapter.get_flat_storage_state_for_shard(shard_id).is_none());
assert!(self.runtime_adapter.get_flat_storage_for_shard(shard_id).is_none());

// We synced shard state on top of _previous_ block for chunk in shard state header and applied state parts to
// flat storage. Now we can set flat head to hash of this block and create flat storage.
Expand All @@ -3194,7 +3190,7 @@ impl Chain {
if *block_hash != CryptoHash::default() {
let block_height = self.get_block_header(block_hash)?.height();

self.runtime_adapter.create_flat_storage_state_for_shard(
self.runtime_adapter.create_flat_storage_for_shard(
shard_id,
block_height,
self.store(),
Expand Down Expand Up @@ -4917,10 +4913,11 @@ impl<'a> ChainUpdate<'a> {
let delta = FlatStateDelta::from_state_changes(&trie_changes.state_changes());

if let Some(chain_flat_storage) =
self.runtime_adapter.get_flat_storage_state_for_shard(shard_id)
self.runtime_adapter.get_flat_storage_for_shard(shard_id)
{
// If flat storage exists, we add a block to it.
let block_info = flat_state::BlockInfo { hash: block_hash, height, prev_hash };
let block_info =
near_store::flat::BlockInfo { hash: block_hash, height, prev_hash };
let store_update = chain_flat_storage
.add_block(&block_hash, delta, block_info)
.map_err(|e| StorageError::from(e))?;
Expand Down
15 changes: 7 additions & 8 deletions chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ use near_primitives::shard_layout::ShardUId;
use near_primitives::state::ValueRef;
use near_primitives::state_part::PartId;
use near_primitives::types::{AccountId, BlockHeight, ShardId, StateRoot};
use near_store::flat_state::FlatStorageCreationStatus;
use near_store::flat::FlatStorageCreationStatus;
#[cfg(feature = "protocol_feature_flat_state")]
use near_store::flat_state::{store_helper, FetchingStateStatus};
#[cfg(feature = "protocol_feature_flat_state")]
use near_store::flat_state::{NUM_PARTS_IN_ONE_STEP, STATE_PART_MEMORY_LIMIT};
use near_store::flat::{
store_helper, FetchingStateStatus, FlatStateDelta, NUM_PARTS_IN_ONE_STEP,
STATE_PART_MEMORY_LIMIT,
};
use near_store::migrations::BatchedStoreUpdate;
#[cfg(feature = "protocol_feature_flat_state")]
use near_store::DBCol;
#[cfg(feature = "protocol_feature_flat_state")]
use near_store::FlatStateDelta;
use near_store::{Store, FLAT_STORAGE_HEAD_HEIGHT};
use near_store::{Trie, TrieDBStorage, TrieTraversalItem};
use std::collections::HashMap;
Expand Down Expand Up @@ -390,7 +389,7 @@ impl FlatStorageShardCreator {
);
store_helper::set_flat_head(&mut store_update, shard_id, &flat_head);
store_update.commit()?;
self.runtime_adapter.create_flat_storage_state_for_shard(
self.runtime_adapter.create_flat_storage_for_shard(
shard_id,
chain_store.head().unwrap().height,
chain_store,
Expand Down Expand Up @@ -440,7 +439,7 @@ impl FlatStorageCreator {
let status = runtime_adapter.get_flat_storage_creation_status(shard_id);
match status {
FlatStorageCreationStatus::Ready => {
runtime_adapter.create_flat_storage_state_for_shard(
runtime_adapter.create_flat_storage_for_shard(
shard_id,
chain_head.height,
chain_store,
Expand Down
2 changes: 1 addition & 1 deletion chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::chunks_store::ReadOnlyChunksStore;
use crate::types::{Block, BlockHeader, LatestKnown};
use crate::{byzantine_assert, RuntimeWithEpochManagerAdapter};
use near_store::db::StoreStatistics;
use near_store::flat_state::{BlockInfo, ChainAccessForFlatStorage};
use near_store::flat::{BlockInfo, ChainAccessForFlatStorage};
use std::sync::Arc;

/// lru cache size
Expand Down
11 changes: 5 additions & 6 deletions chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ use crate::{BlockHeader, RuntimeWithEpochManagerAdapter};

use near_primitives::epoch_manager::ShardConfig;

use near_store::flat_state::ChainAccessForFlatStorage;
use near_store::flat_state::{FlatStorageCreationStatus, FlatStorageState};
use near_store::flat::{ChainAccessForFlatStorage, FlatStorage, FlatStorageCreationStatus};

use super::ValidatorSchedule;

Expand Down Expand Up @@ -830,15 +829,15 @@ impl RuntimeAdapter for KeyValueRuntime {
))
}

fn get_flat_storage_state_for_shard(&self, _shard_id: ShardId) -> Option<FlatStorageState> {
fn get_flat_storage_for_shard(&self, _shard_id: ShardId) -> Option<FlatStorage> {
None
}

fn get_flat_storage_creation_status(&self, _shard_id: ShardId) -> FlatStorageCreationStatus {
FlatStorageCreationStatus::DontCreate
}

fn create_flat_storage_state_for_shard(
fn create_flat_storage_for_shard(
&self,
shard_id: ShardId,
_latest_block_height: BlockHeight,
Expand All @@ -847,15 +846,15 @@ impl RuntimeAdapter for KeyValueRuntime {
panic!("Flat storage state can't be created for shard {shard_id} because KeyValueRuntime doesn't support this");
}

fn remove_flat_storage_state_for_shard(
fn remove_flat_storage_for_shard(
&self,
_shard_id: ShardId,
_epoch_id: &EpochId,
) -> Result<(), Error> {
Ok(())
}

fn set_flat_storage_state_for_genesis(
fn set_flat_storage_for_genesis(
&self,
_genesis_block: &CryptoHash,
_genesis_epoch_id: &EpochId,
Expand Down
11 changes: 5 additions & 6 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ use near_primitives::version::{
MIN_PROTOCOL_VERSION_NEP_92_FIX,
};
use near_primitives::views::{QueryRequest, QueryResponse};
use near_store::flat_state::ChainAccessForFlatStorage;
use near_store::flat_state::{FlatStorageCreationStatus, FlatStorageState};
use near_store::flat::{ChainAccessForFlatStorage, FlatStorage, FlatStorageCreationStatus};
use near_store::{PartialStorage, ShardTries, Store, StoreUpdate, Trie, WrappedTrieChanges};

pub use near_epoch_manager::EpochManagerAdapter;
Expand Down Expand Up @@ -298,15 +297,15 @@ pub trait RuntimeAdapter: Send + Sync {
state_root: StateRoot,
) -> Result<Trie, Error>;

fn get_flat_storage_state_for_shard(&self, shard_id: ShardId) -> Option<FlatStorageState>;
fn get_flat_storage_for_shard(&self, shard_id: ShardId) -> Option<FlatStorage>;

/// Gets status of flat storage state background creation.
fn get_flat_storage_creation_status(&self, shard_id: ShardId) -> FlatStorageCreationStatus;

/// Creates flat storage state for given shard, assuming that all flat storage data
/// is already stored in DB.
/// TODO (#7327): consider returning flat storage creation errors here
fn create_flat_storage_state_for_shard(
fn create_flat_storage_for_shard(
&self,
shard_id: ShardId,
latest_block_height: BlockHeight,
Expand All @@ -315,13 +314,13 @@ pub trait RuntimeAdapter: Send + Sync {

/// Removes flat storage state for shard, if it exists.
/// Used to clear old flat storage data from disk and memory before syncing to newer state.
fn remove_flat_storage_state_for_shard(
fn remove_flat_storage_for_shard(
&self,
shard_id: ShardId,
epoch_id: &EpochId,
) -> Result<(), Error>;

fn set_flat_storage_state_for_genesis(
fn set_flat_storage_for_genesis(
&self,
genesis_block: &CryptoHash,
genesis_epoch_id: &EpochId,
Expand Down
46 changes: 46 additions & 0 deletions core/store/src/flat/chunk_view.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use near_primitives::hash::CryptoHash;
use near_primitives::state::ValueRef;

use crate::Store;

use super::FlatStorage;

/// Struct for getting value references from the flat storage, corresponding
/// to some block defined in `blocks_to_head`.
///
/// The main interface is the `get_ref` method, which is called in `Trie::get`
/// and `Trie::get_ref` because they are the same for each shard and they are
/// requested only once during applying chunk.
// TODO (#7327): lock flat state when `get_ref` is called or head is being updated. Otherwise, `apply_chunks` and
// `postprocess_block` parallel execution may corrupt the state.
#[derive(Clone)]
pub struct FlatStorageChunkView {
/// Used to access flat state stored at the head of flat storage.
/// It should store all trie keys and values/value refs for the state on top of
/// flat_storage.head, except for delayed receipt keys.
#[allow(unused)]
store: Store,
/// The block for which key-value pairs of its state will be retrieved. The flat state
/// will reflect the state AFTER the block is applied.
block_hash: CryptoHash,
/// Stores the state of the flat storage, for example, where the head is at and which
/// blocks' state are stored in flat storage.
#[allow(unused)]
flat_storage: FlatStorage,
}

impl FlatStorageChunkView {
pub fn new(store: Store, block_hash: CryptoHash, flat_storage: FlatStorage) -> Self {
Self { store, block_hash, flat_storage }
}
/// Returns value reference using raw trie key, taken from the state
/// corresponding to `FlatStorageChunkView::block_hash`.
///
/// To avoid duplication, we don't store values themselves in flat state,
/// they are stored in `DBCol::State`. Also the separation is done so we
/// could charge users for the value length before loading the value.
// TODO (#7327): consider inlining small values, so we could use only one db access.
pub fn get_ref(&self, key: &[u8]) -> Result<Option<ValueRef>, crate::StorageError> {
self.flat_storage.get_ref(&self.block_hash, key)
}
}
Loading

0 comments on commit 725ddac

Please sign in to comment.