Skip to content

Commit

Permalink
Optimise memory usage while loading genesis (near#4269)
Browse files Browse the repository at this point in the history
1. Adding option of streaming records to Genesis
2. Applying genesis records in chunks (not only in case of streaming)

* opt out of records loading while loading config
* create GenesisValidator to allow validation while streaming records
* adding streaming records from genesis and records files
* adding records file path and streaming to Genesis
* stream records from file in json_hash if records field is empty
* stream records from file in genesis validation
* encapsulate records iteration vs streaming logic
* apply genesis state in chunks
* refactor process_records usage
* switch to streaming genesis in neard run
* remove unused import
* increase chunk size
* punctuation
* refactor genesis streaming
* refactor ref
* remove pub from GenesisValidator
* Borrow<StateRecord> -> &StateRecord
* process_records -> for_each_record
* move genesis processing to separate file & genesis chunk -> genesis batch
* make GenesisStateApplier stateless
* pass error from File::open
* make RecordsProcessor visitor universal
* fix imports
* move imports to tests
* new_as_is -> new_with_path
* minor fixes: imports and visibility
  • Loading branch information
posvyatokum authored May 7, 2021
1 parent a0177cd commit 884c6d1
Show file tree
Hide file tree
Showing 11 changed files with 591 additions and 280 deletions.
116 changes: 109 additions & 7 deletions core/chain-configs/src/genesis_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
//! contains `RuntimeConfig`, but we keep it here for now until we figure
//! out the better place.
use std::fs::File;
use std::io::BufReader;
use std::io::{BufReader, Read};
use std::marker::PhantomData;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::{fmt, io};

use chrono::{DateTime, Utc};
use num_rational::Rational;
use serde::{Deserialize, Serialize};
use serde::de::{self, DeserializeSeed, IgnoredAny, MapAccess, SeqAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Serializer;
use smart_default::SmartDefault;

Expand Down Expand Up @@ -180,6 +182,12 @@ pub struct Genesis {
#[serde(flatten)]
pub config: GenesisConfig,
pub records: GenesisRecords,
/// Genesis object may not contain records.
/// In this case records can be found in records_file.
/// The idea is that all records consume too much memory,
/// so they should be processed in streaming fashion with for_each_record.
#[serde(skip)]
pub records_file: PathBuf,
/// Using zero-size PhantomData is a Rust pattern preventing a structure being constructed
/// without calling `new` method, which has some initialization routine.
#[serde(skip)]
Expand Down Expand Up @@ -266,6 +274,74 @@ impl GenesisRecords {
}
}

/// Visitor for records.
/// Reads records one by one and passes them to sink.
/// If full genesis file is passed, reads records from "records" field and
/// IGNORES OTHER FIELDS.
struct RecordsProcessor<F> {
sink: F,
}

impl<'de, F: FnMut(StateRecord)> Visitor<'de> for RecordsProcessor<&'_ mut F> {
type Value = ();

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str(
"either:\
1. array of StateRecord\
2. map with records field which is array of StateRecord",
)
}

fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
while let Some(record) = seq.next_element::<StateRecord>()? {
(self.sink)(record)
}
Ok(())
}

fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: MapAccess<'de>,
{
while let Some(key) = map.next_key::<String>()? {
match key.as_str() {
"records" => {
map.next_value_seed(self)?;
return Ok(());
}
_ => {
map.next_value::<IgnoredAny>()?;
}
}
}
Err(de::Error::custom("missing field: records"))
}
}

impl<'de, F: FnMut(StateRecord)> DeserializeSeed<'de> for RecordsProcessor<&'_ mut F> {
type Value = ();

fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_seq(self)
}
}

fn stream_records_from_file(
reader: impl Read,
mut callback: impl FnMut(StateRecord),
) -> serde_json::Result<()> {
let mut deserializer = serde_json::Deserializer::from_reader(reader);
let records_processor = RecordsProcessor { sink: &mut callback };
deserializer.deserialize_any(records_processor)
}

pub struct GenesisJsonHasher {
digest: sha2::Sha256,
}
Expand All @@ -287,9 +363,9 @@ impl GenesisJsonHasher {

pub fn process_genesis(&mut self, genesis: &Genesis) {
self.process_config(&genesis.config);
for record in genesis.records.as_ref() {
self.process_record(record)
}
genesis.for_each_record(|record: &StateRecord| {
self.process_record(record);
});
}

pub fn finalize(self) -> CryptoHash {
Expand All @@ -299,11 +375,16 @@ impl GenesisJsonHasher {

impl Genesis {
pub fn new(config: GenesisConfig, records: GenesisRecords) -> Self {
let mut genesis = Self { config, records, phantom: PhantomData };
let mut genesis =
Self { config, records, records_file: PathBuf::new(), phantom: PhantomData };
genesis.config.total_supply = get_initial_supply(&genesis.records.as_ref());
genesis
}

pub fn new_with_path(config: GenesisConfig, records_file: PathBuf) -> Self {
Self { config, records: GenesisRecords(vec![]), records_file, phantom: PhantomData }
}

/// Reads Genesis from a single file.
pub fn from_file<P: AsRef<Path>>(path: P) -> Self {
let reader = BufReader::new(File::open(path).expect("Could not open genesis config file."));
Expand Down Expand Up @@ -337,6 +418,27 @@ impl Genesis {
hasher.process_genesis(self);
hasher.finalize()
}

fn stream_records_with_callback(&self, callback: impl FnMut(StateRecord)) -> io::Result<()> {
let reader = BufReader::new(File::open(&self.records_file)?);
stream_records_from_file(reader, callback).map_err(io::Error::from)
}

/// If records vector is empty processes records stream from records_file.
/// May panic if records_file is removed or is in wrong format.
pub fn for_each_record(&self, mut callback: impl FnMut(&StateRecord)) {
if self.records.as_ref().is_empty() {
let callback_move = |record: StateRecord| {
callback(&record);
};
self.stream_records_with_callback(callback_move)
.expect("error while streaming records");
} else {
for record in self.records.as_ref() {
callback(record);
}
}
}
}

pub fn get_initial_supply(records: &[StateRecord]) -> Balance {
Expand Down
13 changes: 13 additions & 0 deletions core/primitives/src/state_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,16 @@ fn to_printable(blob: &[u8]) -> String {
}
}
}

pub fn state_record_to_account_id(state_record: &StateRecord) -> &AccountId {
match state_record {
StateRecord::Account { account_id, .. }
| StateRecord::AccessKey { account_id, .. }
| StateRecord::Contract { account_id, .. }
| StateRecord::ReceivedData { account_id, .. }
| StateRecord::Data { account_id, .. } => account_id,
StateRecord::PostponedReceipt(receipt) | StateRecord::DelayedReceipt(receipt) => {
&receipt.receiver_id
}
}
}
30 changes: 25 additions & 5 deletions neard/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1046,12 +1046,13 @@ pub fn download_genesis(url: &String, path: &PathBuf) {
});
}

pub fn load_config(dir: &Path) -> NearConfig {
pub fn load_config_without_genesis_records(dir: &Path) -> NearConfig {
let config = Config::from_file(&dir.join(CONFIG_FILENAME));
let genesis = if let Some(ref genesis_records_file) = config.genesis_records_file {
Genesis::from_files(&dir.join(&config.genesis_file), &dir.join(genesis_records_file))
let genesis_config = GenesisConfig::from_file(&dir.join(&config.genesis_file));
let genesis_records_file = if let Some(genesis_records_file) = &config.genesis_records_file {
dir.join(genesis_records_file)
} else {
Genesis::from_file(&dir.join(&config.genesis_file))
dir.join(&config.genesis_file)
};
let validator_signer = if dir.join(&config.validator_key_file).exists() {
let signer =
Expand All @@ -1062,7 +1063,26 @@ pub fn load_config(dir: &Path) -> NearConfig {
None
};
let network_signer = InMemorySigner::from_file(&dir.join(&config.node_key_file));
NearConfig::new(config, genesis, (&network_signer).into(), validator_signer)
NearConfig::new(
config,
Genesis::new_with_path(genesis_config, genesis_records_file),
(&network_signer).into(),
validator_signer,
)
}

pub fn load_config(dir: &Path) -> NearConfig {
let mut near_config = load_config_without_genesis_records(dir);
near_config.genesis =
if let Some(ref genesis_records_file) = near_config.config.genesis_records_file {
Genesis::from_files(
&dir.join(&near_config.config.genesis_file),
&dir.join(genesis_records_file),
)
} else {
Genesis::from_file(&dir.join(&near_config.config.genesis_file))
};
near_config
}

pub fn load_test_config(seed: &str, port: u16, genesis: Genesis) -> NearConfig {
Expand Down
Loading

0 comments on commit 884c6d1

Please sign in to comment.