Skip to content

Commit

Permalink
Multi-version snapshot support (solana-labs#9980)
Browse files Browse the repository at this point in the history
* Multi-version snapshot support

* rustfmt

* Remove CLI options and runtime support for selection output snapshot version.
Address some clippy complaints.

* Muzzle clippy type complexity warning.

Despite clippy's suggestion, it is not currently possible to create type aliases
for traits and so everything within the 'Box<...>' cannot be type aliased.

This then leaves creating full blown traits, and either implementing
said traits by closure (somehow) or moving the closures into new structs
implementing said traits which seems a bit of a palaver.

Alternatively it is possible to define and use the type alias 'type ResultBox<T> = Result<Box<T>>'
which does seems rather pointless and not a great reduction in complexity but is enough to keep clippy happy.

In the end I simply went with squelching the clippy warning.

* Remove now unused Serialize/Deserialize trait implementations for AccountStorageEntry and AppendVec

* refactor versioned de/serialisers

* rename serde_utils to serde_snapshot

* move call to accounts_db.generate_index() back down to context_accountsdb_from_stream()

* update version 1.1.1 to 1.2.0
remove nested use of serialize_bytes

* cleanups

* Add back measurement of account storage entry serialization.
Remove construction of Vec and HashMap temporaries during serialization.

* consolidate serialisation test cases into serde_snapshot.
clean up leakage of implementation details in serde_snapshot.

* move short term / legacy snapshot code into child module

* add serialize_iter_as_tuple

* preliminary integration of following commit

commit 6d58b73
Author: Ryo Onodera <[email protected]>
Date:   Wed May 20 14:02:02 2020 +0900

    Confine snapshot 1.1 relic to versioned codepath

* refactored serde_snapshot, rustfmt
legacy accounts_db format now "owns" both leading u64s, legacy bank_rc format has none

* reduce type complexity (clippy)
  • Loading branch information
svenski123 authored May 22, 2020
1 parent 967320a commit b7a32f0
Show file tree
Hide file tree
Showing 14 changed files with 1,217 additions and 626 deletions.
5 changes: 1 addition & 4 deletions core/src/snapshot_packager_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ mod tests {
snapshot_package::AccountsPackage,
snapshot_utils::{self, SNAPSHOT_STATUS_CACHE_FILE_NAME},
};
use solana_runtime::{
accounts_db::AccountStorageEntry, bank::BankSlotDelta, bank::MAX_SNAPSHOT_DATA_FILE_SIZE,
};
use solana_runtime::{accounts_db::AccountStorageEntry, bank::BankSlotDelta};
use solana_sdk::hash::Hash;
use std::{
fs::{self, remove_dir_all, OpenOptions},
Expand Down Expand Up @@ -186,7 +184,6 @@ mod tests {
let dummy_slot_deltas: Vec<BankSlotDelta> = vec![];
snapshot_utils::serialize_snapshot_data_file(
&snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME),
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
serialize_into(stream, &dummy_slot_deltas)?;
Ok(())
Expand Down
1 change: 0 additions & 1 deletion core/tests/bank_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ mod tests {
&saved_snapshots_dir
.path()
.join(snapshot_utils::SNAPSHOT_STATUS_CACHE_FILE_NAME),
solana_runtime::bank::MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
serialize_into(stream, &dummy_slot_deltas)?;
Ok(())
Expand Down
219 changes: 142 additions & 77 deletions ledger/src/snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ use log::*;
use regex::Regex;
use solana_measure::measure::Measure;
use solana_runtime::{
accounts_db::{SnapshotStorage, SnapshotStorages},
bank::{
self, deserialize_from_snapshot, Bank, BankRcSerialize, BankSlotDelta,
MAX_SNAPSHOT_DATA_FILE_SIZE,
bank::{Bank, BankSlotDelta},
serde_snapshot::{
bankrc_from_stream, bankrc_to_stream, SerdeStyle, SnapshotStorage, SnapshotStorages,
},
};
use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey};
Expand All @@ -22,6 +21,8 @@ use std::{
io::{BufReader, BufWriter, Error as IOError, ErrorKind, Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
process::ExitStatus,
str::FromStr,
sync::Arc,
};
use tar::Archive;
use tempfile::TempDir;
Expand All @@ -32,7 +33,47 @@ pub const TAR_SNAPSHOTS_DIR: &str = "snapshots";
pub const TAR_ACCOUNTS_DIR: &str = "accounts";
pub const TAR_VERSION_FILE: &str = "version";

pub const SNAPSHOT_VERSION: &str = "1.1.0";
const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
const VERSION_STRING_V1_1_0: &str = "1.1.0";
const VERSION_STRING_V1_2_0: &str = "1.2.0";
const OUTPUT_SNAPSHOT_VERSION: SnapshotVersion = SnapshotVersion::V1_2_0;

#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum SnapshotVersion {
V1_1_0,
V1_2_0,
}

impl From<SnapshotVersion> for &'static str {
fn from(snapshot_version: SnapshotVersion) -> &'static str {
match snapshot_version {
SnapshotVersion::V1_1_0 => VERSION_STRING_V1_1_0,
SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
}
}
}

impl FromStr for SnapshotVersion {
type Err = &'static str;

fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
match version_string {
VERSION_STRING_V1_1_0 => Ok(SnapshotVersion::V1_1_0),
VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
_ => Err("unsupported snapshot version"),
}
}
}

impl SnapshotVersion {
pub fn as_str(self) -> &'static str {
<&str as From<Self>>::from(self)
}

fn maybe_from_string(version_string: &str) -> Option<SnapshotVersion> {
version_string.parse::<Self>().ok()
}
}

#[derive(PartialEq, Ord, Eq, Debug)]
pub struct SlotSnapshotPaths {
Expand Down Expand Up @@ -192,7 +233,7 @@ pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<()
// Write version file
{
let mut f = std::fs::File::create(staging_version_file)?;
f.write_all(&SNAPSHOT_VERSION.to_string().into_bytes())?;
f.write_all(OUTPUT_SNAPSHOT_VERSION.as_str().as_bytes())?;
}

let (compression_option, file_ext) = get_compression_ext(&snapshot_package.compression);
Expand Down Expand Up @@ -289,13 +330,35 @@ where
}
}

pub fn serialize_snapshot_data_file<F>(
pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
where
F: FnOnce(&mut BufWriter<File>) -> Result<()>,
{
serialize_snapshot_data_file_capped::<F>(
data_file_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
serializer,
)
}

pub fn deserialize_snapshot_data_file<F, T>(data_file_path: &Path, deserializer: F) -> Result<T>
where
F: FnOnce(&mut BufReader<File>) -> Result<T>,
{
deserialize_snapshot_data_file_capped::<F, T>(
data_file_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
deserializer,
)
}

fn serialize_snapshot_data_file_capped<F>(
data_file_path: &Path,
maximum_file_size: u64,
mut serializer: F,
serializer: F,
) -> Result<u64>
where
F: FnMut(&mut BufWriter<File>) -> Result<()>,
F: FnOnce(&mut BufWriter<File>) -> Result<()>,
{
let data_file = File::create(data_file_path)?;
let mut data_file_stream = BufWriter::new(data_file);
Expand All @@ -313,13 +376,13 @@ where
Ok(consumed_size)
}

pub fn deserialize_snapshot_data_file<F, T>(
fn deserialize_snapshot_data_file_capped<F, T>(
data_file_path: &Path,
maximum_file_size: u64,
mut deserializer: F,
deserializer: F,
) -> Result<T>
where
F: FnMut(&mut BufReader<File>) -> Result<T>,
F: FnOnce(&mut BufReader<File>) -> Result<T>,
{
let file_size = fs::metadata(&data_file_path)?.len();

Expand Down Expand Up @@ -367,21 +430,18 @@ pub fn add_snapshot<P: AsRef<Path>>(
);

let mut bank_serialize = Measure::start("bank-serialize-ms");
let consumed_size = serialize_snapshot_data_file(
&snapshot_bank_file_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
serialize_into(stream.by_ref(), &*bank)?;
serialize_into(
stream.by_ref(),
&BankRcSerialize {
bank_rc: &bank.rc,
snapshot_storages,
},
)?;
Ok(())
},
)?;
let bank_snapshot_serializer = move |stream: &mut BufWriter<File>| -> Result<()> {
serialize_into(stream.by_ref(), bank)?;
bankrc_to_stream(
SerdeStyle::NEWER,
stream.by_ref(),
&bank.rc,
snapshot_storages,
)?;
Ok(())
};
let consumed_size =
serialize_snapshot_data_file(&snapshot_bank_file_path, bank_snapshot_serializer)?;
bank_serialize.stop();

// Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
Expand Down Expand Up @@ -414,14 +474,10 @@ pub fn serialize_status_cache(
snapshot_links.path().join(SNAPSHOT_STATUS_CACHE_FILE_NAME);

let mut status_cache_serialize = Measure::start("status_cache_serialize-ms");
let consumed_size = serialize_snapshot_data_file(
&snapshot_status_cache_file_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
serialize_into(stream, slot_deltas)?;
Ok(())
},
)?;
let consumed_size = serialize_snapshot_data_file(&snapshot_status_cache_file_path, |stream| {
serialize_into(stream, slot_deltas)?;
Ok(())
})?;
status_cache_serialize.stop();

// Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
Expand Down Expand Up @@ -624,6 +680,13 @@ where
{
info!("snapshot version: {}", snapshot_version);

let snapshot_version_enum =
SnapshotVersion::maybe_from_string(snapshot_version).ok_or_else(|| {
get_io_error(&format!(
"unsupported snapshot version: {}",
snapshot_version
))
})?;
let mut snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir);
if snapshot_paths.len() > 1 {
return Err(get_io_error("invalid snapshot format"));
Expand All @@ -633,45 +696,47 @@ where
.ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?;

info!("Loading bank from {:?}", &root_paths.snapshot_file_path);
let bank = deserialize_snapshot_data_file(
&root_paths.snapshot_file_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
let mut bank: Bank = match snapshot_version {
SNAPSHOT_VERSION => deserialize_from_snapshot(stream.by_ref())?,
_ => {
return Err(get_io_error(&format!(
"unsupported snapshot version: {}",
snapshot_version
)));
}
};
bank.operating_mode = Some(genesis_config.operating_mode);
info!("Rebuilding accounts...");
let rc = bank::BankRc::from_stream(
let bank = deserialize_snapshot_data_file(&root_paths.snapshot_file_path, |mut stream| {
let mut bank: Bank = bincode::config()
.limit(MAX_SNAPSHOT_DATA_FILE_SIZE)
.deserialize_from(&mut stream)?;

info!("Rebuilding accounts...");

let mut bankrc = match snapshot_version_enum {
SnapshotVersion::V1_1_0 => bankrc_from_stream(
SerdeStyle::OLDER,
account_paths,
bank.slot(),
&bank.ancestors,
frozen_account_pubkeys,
stream.by_ref(),
&mut stream,
&append_vecs_path,
)?;
bank.set_bank_rc(rc, bank::StatusCacheRc::default());
bank.finish_init();
Ok(bank)
},
)?;
),
SnapshotVersion::V1_2_0 => bankrc_from_stream(
SerdeStyle::NEWER,
account_paths,
bank.slot(),
&mut stream,
&append_vecs_path,
),
}?;
Arc::get_mut(&mut Arc::get_mut(&mut bankrc.accounts).unwrap().accounts_db)
.unwrap()
.freeze_accounts(&bank.ancestors, frozen_account_pubkeys);

bank.rc = bankrc;
bank.operating_mode = Some(genesis_config.operating_mode);
bank.finish_init();
Ok(bank)
})?;

let status_cache_path = unpacked_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME);
let slot_deltas = deserialize_snapshot_data_file(
&status_cache_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
info!("Rebuilding status cache...");
let slot_deltas: Vec<BankSlotDelta> = deserialize_from_snapshot(stream)?;
Ok(slot_deltas)
},
)?;
let slot_deltas = deserialize_snapshot_data_file(&status_cache_path, |stream| {
info!("Rebuilding status cache...");
let slot_deltas: Vec<BankSlotDelta> = bincode::config()
.limit(MAX_SNAPSHOT_DATA_FILE_SIZE)
.deserialize_from(stream)?;
Ok(slot_deltas)
})?;

bank.src.append(&slot_deltas);

Expand Down Expand Up @@ -726,7 +791,7 @@ mod tests {
fn test_serialize_snapshot_data_file_under_limit() {
let temp_dir = tempfile::TempDir::new().unwrap();
let expected_consumed_size = size_of::<u32>() as u64;
let consumed_size = serialize_snapshot_data_file(
let consumed_size = serialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size,
|stream| {
Expand All @@ -742,7 +807,7 @@ mod tests {
fn test_serialize_snapshot_data_file_over_limit() {
let temp_dir = tempfile::TempDir::new().unwrap();
let expected_consumed_size = size_of::<u32>() as u64;
let result = serialize_snapshot_data_file(
let result = serialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size - 1,
|stream| {
Expand All @@ -759,7 +824,7 @@ mod tests {
let expected_consumed_size = size_of::<u32>() as u64;

let temp_dir = tempfile::TempDir::new().unwrap();
serialize_snapshot_data_file(
serialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size,
|stream| {
Expand All @@ -769,7 +834,7 @@ mod tests {
)
.unwrap();

let actual_data = deserialize_snapshot_data_file(
let actual_data = deserialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size,
|stream| Ok(deserialize_from::<_, u32>(stream)?),
Expand All @@ -784,7 +849,7 @@ mod tests {
let expected_consumed_size = size_of::<u32>() as u64;

let temp_dir = tempfile::TempDir::new().unwrap();
serialize_snapshot_data_file(
serialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size,
|stream| {
Expand All @@ -794,7 +859,7 @@ mod tests {
)
.unwrap();

let result = deserialize_snapshot_data_file(
let result = deserialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size - 1,
|stream| Ok(deserialize_from::<_, u32>(stream)?),
Expand All @@ -808,7 +873,7 @@ mod tests {
let expected_consumed_size = size_of::<u32>() as u64;

let temp_dir = tempfile::TempDir::new().unwrap();
serialize_snapshot_data_file(
serialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size * 2,
|stream| {
Expand All @@ -819,7 +884,7 @@ mod tests {
)
.unwrap();

let result = deserialize_snapshot_data_file(
let result = deserialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size * 2,
|stream| Ok(deserialize_from::<_, u32>(stream)?),
Expand Down
7 changes: 6 additions & 1 deletion programs/bpf/tests/programs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,12 @@ mod bpf {

assert!(bank_client
.send_message(
&[&mint_keypair, &argument_keypair, &invoked_argument_keypair, &from_keypair],
&[
&mint_keypair,
&argument_keypair,
&invoked_argument_keypair,
&from_keypair
],
message,
)
.is_ok());
Expand Down
Loading

0 comments on commit b7a32f0

Please sign in to comment.