Handle removing slots during account scans (solana-labs#17471)
carllin authored Jun 15, 2021
1 parent 471b341 commit ccc013e
Showing 15 changed files with 934 additions and 319 deletions.
42 changes: 26 additions & 16 deletions client/src/rpc_custom_error.rs
@@ -1,5 +1,5 @@
//! Implementation defined RPC server errors
use thiserror::Error;
use {
crate::rpc_response::RpcSimulateTransactionResult,
jsonrpc_core::{Error, ErrorCode},
@@ -17,35 +17,40 @@ pub const JSON_RPC_SERVER_ERROR_NO_SNAPSHOT: i64 = -32008;
pub const JSON_RPC_SERVER_ERROR_LONG_TERM_STORAGE_SLOT_SKIPPED: i64 = -32009;
pub const JSON_RPC_SERVER_ERROR_KEY_EXCLUDED_FROM_SECONDARY_INDEX: i64 = -32010;
pub const JSON_RPC_SERVER_ERROR_TRANSACTION_HISTORY_NOT_AVAILABLE: i64 = -32011;
pub const JSON_RPC_SCAN_ERROR: i64 = -32012;

#[derive(Error, Debug)]
pub enum RpcCustomError {
#[error("BlockCleanedUp")]
BlockCleanedUp {
slot: Slot,
first_available_block: Slot,
},
#[error("SendTransactionPreflightFailure")]
SendTransactionPreflightFailure {
message: String,
result: RpcSimulateTransactionResult,
},
#[error("TransactionSignatureVerificationFailure")]
TransactionSignatureVerificationFailure,
BlockNotAvailable {
slot: Slot,
},
NodeUnhealthy {
num_slots_behind: Option<Slot>,
},
#[error("BlockNotAvailable")]
BlockNotAvailable { slot: Slot },
#[error("NodeUnhealthy")]
NodeUnhealthy { num_slots_behind: Option<Slot> },
#[error("TransactionPrecompileVerificationFailure")]
TransactionPrecompileVerificationFailure(solana_sdk::transaction::TransactionError),
SlotSkipped {
slot: Slot,
},
#[error("SlotSkipped")]
SlotSkipped { slot: Slot },
#[error("NoSnapshot")]
NoSnapshot,
LongTermStorageSlotSkipped {
slot: Slot,
},
KeyExcludedFromSecondaryIndex {
index_key: String,
},
#[error("LongTermStorageSlotSkipped")]
LongTermStorageSlotSkipped { slot: Slot },
#[error("KeyExcludedFromSecondaryIndex")]
KeyExcludedFromSecondaryIndex { index_key: String },
#[error("TransactionHistoryNotAvailable")]
TransactionHistoryNotAvailable,
#[error("ScanError")]
ScanError { message: String },
}

#[derive(Debug, Serialize, Deserialize)]
@@ -141,6 +146,11 @@ impl From<RpcCustomError> for Error {
message: "Transaction history is not available from this node".to_string(),
data: None,
},
RpcCustomError::ScanError { message } => Self {
code: ErrorCode::ServerError(JSON_RPC_SCAN_ERROR),
message,
data: None,
},
}
}
}
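
The hunks above add a dedicated scan-error path: an account scan that races with slot removal is surfaced as RpcCustomError::ScanError and mapped to JSON-RPC server error code -32012. Below is a minimal, self-contained sketch of that mapping; JsonRpcError is a simplified stand-in for jsonrpc_core::Error, and the message text is illustrative only.

    // Minimal sketch of the new error path; JsonRpcError is a simplified stand-in
    // for jsonrpc_core::Error and the message text is illustrative.
    const JSON_RPC_SCAN_ERROR: i64 = -32012;

    #[derive(Debug)]
    enum RpcCustomError {
        ScanError { message: String },
    }

    #[derive(Debug)]
    struct JsonRpcError {
        code: i64,
        message: String,
    }

    impl From<RpcCustomError> for JsonRpcError {
        fn from(e: RpcCustomError) -> Self {
            match e {
                RpcCustomError::ScanError { message } => Self {
                    code: JSON_RPC_SCAN_ERROR,
                    message,
                },
            }
        }
    }

    fn main() {
        // A scan interrupted by slot removal becomes a structured server error.
        let err: JsonRpcError = RpcCustomError::ScanError {
            message: "slot was removed during account scan".to_string(),
        }
        .into();
        println!("{:?}", err);
    }
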
16 changes: 11 additions & 5 deletions core/src/repair_service.rs
@@ -17,7 +17,12 @@ use solana_ledger::{
};
use solana_measure::measure::Measure;
use solana_runtime::{bank::Bank, bank_forks::BankForks, contains::Contains};
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
use solana_sdk::{
clock::{BankId, Slot},
epoch_schedule::EpochSchedule,
pubkey::Pubkey,
timing::timestamp,
};
use std::{
collections::{HashMap, HashSet},
iter::Iterator,
@@ -559,7 +564,7 @@ impl RepairService

#[allow(dead_code)]
fn process_new_duplicate_slots(
new_duplicate_slots: &[Slot],
new_duplicate_slots: &[(Slot, BankId)],
duplicate_slot_repair_statuses: &mut HashMap<Slot, DuplicateSlotRepairStatus>,
cluster_slots: &ClusterSlots,
root_bank: &Bank,
@@ -568,7 +573,7 @@
duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
repair_validators: &Option<HashSet<Pubkey>>,
) {
for slot in new_duplicate_slots {
for (slot, bank_id) in new_duplicate_slots {
warn!(
"Cluster confirmed slot: {}, dumping our current version and repairing",
slot
@@ -577,7 +582,7 @@
root_bank.clear_slot_signatures(*slot);

// Clear the accounts for this slot
root_bank.remove_unrooted_slots(&[*slot]);
root_bank.remove_unrooted_slots(&[(*slot, *bank_id)]);

// Clear the slot-related data in blockstore. This will:
// 1) Clear old shreds allowing new ones to be inserted
@@ -1139,6 +1144,7 @@ mod test {
);
let bank0 = Arc::new(Bank::new(&genesis_config));
let bank9 = Bank::new_from_parent(&bank0, &Pubkey::default(), duplicate_slot);
let duplicate_bank_id = bank9.bank_id();
let old_balance = bank9.get_balance(&keypairs.node_keypair.pubkey());
bank9
.transfer(10_000, &mint_keypair, &keypairs.node_keypair.pubkey())
@@ -1156,7 +1162,7 @@
assert!(bank9.get_signature_status(&vote_tx.signatures[0]).is_some());

RepairService::process_new_duplicate_slots(
&[duplicate_slot],
&[(duplicate_slot, duplicate_bank_id)],
&mut duplicate_slot_repair_statuses,
&cluster_slots,
&bank9,
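
This file switches process_new_duplicate_slots from bare slots to (Slot, BankId) pairs so that remove_unrooted_slots purges exactly the bank the caller intends, rather than whatever bank currently occupies that slot. A hedged sketch of the new calling convention follows; Slot, BankId, and remove_unrooted_slots are simplified stand-ins for the solana-runtime types and Bank method.

    // Hedged sketch: duplicate slots are now paired with the BankId of the bank
    // being purged. All names are simplified stand-ins for the runtime types.
    type Slot = u64;
    type BankId = u64;

    fn remove_unrooted_slots(slots: &[(Slot, BankId)]) {
        // The real method clears account state for exactly these (slot, bank_id)
        // pairs, so a bank later re-created at the same slot is left untouched.
        for (slot, bank_id) in slots {
            println!("purging accounts for slot {} (bank id {})", slot, bank_id);
        }
    }

    fn main() {
        // Mirrors the loop in process_new_duplicate_slots above.
        let new_duplicate_slots: &[(Slot, BankId)] = &[(9, 1)];
        for (slot, bank_id) in new_duplicate_slots {
            remove_unrooted_slots(&[(*slot, *bank_id)]);
        }
    }
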
6 changes: 0 additions & 6 deletions core/src/replay_stage.rs
@@ -850,12 +850,6 @@ impl ReplayStage {
// Clear the duplicate banks from BankForks
{
let mut w_bank_forks = bank_forks.write().unwrap();
// Purging should have already been taken care of by logic
// in repair_service, so make sure drop implementation doesn't
// run
if let Some(b) = w_bank_forks.get(*d) {
b.skip_drop.store(true, Ordering::Relaxed)
}
w_bank_forks.remove(*d);
}
}
3 changes: 3 additions & 0 deletions ledger-tool/src/main.rs
@@ -2041,6 +2041,7 @@ fn main() {
if remove_stake_accounts {
for (address, mut account) in bank
.get_program_accounts(&solana_stake_program::id())
.unwrap()
.into_iter()
{
account.set_lamports(0);
@@ -2074,6 +2075,7 @@
// Delete existing vote accounts
for (address, mut account) in bank
.get_program_accounts(&solana_vote_program::id())
.unwrap()
.into_iter()
{
account.set_lamports(0);
@@ -2235,6 +2237,7 @@

let accounts: BTreeMap<_, _> = bank
.get_all_accounts_with_modified_slots()
.unwrap()
.into_iter()
.filter(|(pubkey, _account, _slot)| {
include_sysvars || !solana_sdk::sysvar::is_sysvar_id(pubkey)
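
The ledger-tool hunks only adapt to the bank accessors becoming fallible: get_program_accounts and get_all_accounts_with_modified_slots now return a Result, and ledger-tool simply unwraps because it scans a bank it fully controls. A small sketch of that pattern; all names below are simplified stand-ins, not the real signatures.

    // Sketch of a fallible account scan and the unwrap-on-a-trusted-bank pattern.
    #[derive(Debug)]
    struct ScanError(String);

    type Pubkey = u64;
    type Lamports = u64;

    fn get_program_accounts(_program_id: &Pubkey) -> Result<Vec<(Pubkey, Lamports)>, ScanError> {
        // The real bank method can fail if the scanned slot is removed mid-scan.
        Ok(vec![(1, 42), (2, 0)])
    }

    fn main() {
        // ledger-tool controls the bank it scans, so it unwraps the result.
        for (address, lamports) in get_program_accounts(&7).unwrap().into_iter() {
            println!("{}: {}", address, lamports);
        }
    }
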
90 changes: 56 additions & 34 deletions rpc/src/rpc.rs
@@ -92,6 +92,8 @@ use {
tokio::runtime::Runtime,
};

type RpcCustomResult<T> = std::result::Result<T, RpcCustomError>;

pub const MAX_REQUEST_PAYLOAD_SIZE: usize = 50 * (1 << 10); // 50kB
pub const PERFORMANCE_SAMPLES_LIMIT: usize = 720;

@@ -705,18 +707,23 @@ impl JsonRpcRequestProcessor {
fn get_largest_accounts(
&self,
config: Option<RpcLargestAccountsConfig>,
) -> RpcResponse<Vec<RpcAccountBalance>> {
) -> RpcCustomResult<RpcResponse<Vec<RpcAccountBalance>>> {
let config = config.unwrap_or_default();
let bank = self.bank(config.commitment);

if let Some((slot, accounts)) = self.get_cached_largest_accounts(&config.filter) {
Response {
Ok(Response {
context: RpcResponseContext { slot },
value: accounts,
}
})
} else {
let (addresses, address_filter) = if let Some(filter) = config.clone().filter {
let non_circulating_supply = calculate_non_circulating_supply(&bank);
let non_circulating_supply =
calculate_non_circulating_supply(&bank).map_err(|e| {
RpcCustomError::ScanError {
message: e.to_string(),
}
})?;
let addresses = non_circulating_supply.accounts.into_iter().collect();
let address_filter = match filter {
RpcLargestAccountsFilter::Circulating => AccountAddressFilter::Exclude,
@@ -728,6 +735,9 @@
};
let accounts = bank
.get_largest_accounts(NUM_LARGEST_ACCOUNTS, &addresses, address_filter)
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})?
.into_iter()
.map(|(address, lamports)| RpcAccountBalance {
address: address.to_string(),
@@ -736,15 +746,21 @@
.collect::<Vec<RpcAccountBalance>>();

self.set_cached_largest_accounts(&config.filter, bank.slot(), &accounts);
new_response(&bank, accounts)
Ok(new_response(&bank, accounts))
}
}

fn get_supply(&self, commitment: Option<CommitmentConfig>) -> RpcResponse<RpcSupply> {
fn get_supply(
&self,
commitment: Option<CommitmentConfig>,
) -> RpcCustomResult<RpcResponse<RpcSupply>> {
let bank = self.bank(commitment);
let non_circulating_supply = calculate_non_circulating_supply(&bank);
let non_circulating_supply =
calculate_non_circulating_supply(&bank).map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})?;
let total_supply = bank.capitalization();
new_response(
Ok(new_response(
&bank,
RpcSupply {
total: total_supply,
@@ -756,7 +772,7 @@
.map(|pubkey| pubkey.to_string())
.collect(),
},
)
))
}

fn get_vote_accounts(
@@ -1738,7 +1754,7 @@ impl JsonRpcRequestProcessor {
bank: &Arc<Bank>,
program_id: &Pubkey,
filters: Vec<RpcFilterType>,
) -> Result<Vec<(Pubkey, AccountSharedData)>> {
) -> RpcCustomResult<Vec<(Pubkey, AccountSharedData)>> {
let filter_closure = |account: &AccountSharedData| {
filters.iter().all(|filter_type| match filter_type {
RpcFilterType::DataSize(size) => account.data().len() as u64 == *size,
@@ -1753,21 +1769,26 @@
if !self.config.account_indexes.include_key(program_id) {
return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
index_key: program_id.to_string(),
}
.into());
});
}
Ok(
bank.get_filtered_indexed_accounts(&IndexKey::ProgramId(*program_id), |account| {
Ok(bank
.get_filtered_indexed_accounts(&IndexKey::ProgramId(*program_id), |account| {
// The program-id account index checks for Account owner on inclusion. However, due
// to the current AccountsDb implementation, an account may remain in storage as a
// zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later
// updates. We include the redundant filters here to avoid returning these
// accounts.
account.owner() == program_id && filter_closure(account)
}),
)
})
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})?)
} else {
Ok(bank.get_filtered_program_accounts(program_id, filter_closure))
Ok(bank
.get_filtered_program_accounts(program_id, filter_closure)
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})?)
}
}

@@ -1777,7 +1798,7 @@
bank: &Arc<Bank>,
owner_key: &Pubkey,
mut filters: Vec<RpcFilterType>,
) -> Result<Vec<(Pubkey, AccountSharedData)>> {
) -> RpcCustomResult<Vec<(Pubkey, AccountSharedData)>> {
// The by-owner accounts index checks for Token Account state and Owner address on
// inclusion. However, due to the current AccountsDb implementation, an account may remain
// in storage as a zero-lamport AccountSharedData::Default() after being wiped and reinitialized in
@@ -1802,19 +1823,19 @@
if !self.config.account_indexes.include_key(owner_key) {
return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
index_key: owner_key.to_string(),
}
.into());
});
}
Ok(bank.get_filtered_indexed_accounts(
&IndexKey::SplTokenOwner(*owner_key),
|account| {
Ok(bank
.get_filtered_indexed_accounts(&IndexKey::SplTokenOwner(*owner_key), |account| {
account.owner() == &spl_token_id_v2_0()
&& filters.iter().all(|filter_type| match filter_type {
RpcFilterType::DataSize(size) => account.data().len() as u64 == *size,
RpcFilterType::Memcmp(compare) => compare.bytes_match(&account.data()),
})
},
))
})
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})?)
} else {
self.get_filtered_program_accounts(bank, &spl_token_id_v2_0(), filters)
}
@@ -1826,7 +1847,7 @@
bank: &Arc<Bank>,
mint_key: &Pubkey,
mut filters: Vec<RpcFilterType>,
) -> Result<Vec<(Pubkey, AccountSharedData)>> {
) -> RpcCustomResult<Vec<(Pubkey, AccountSharedData)>> {
// The by-mint accounts index checks for Token Account state and Mint address on inclusion.
// However, due to the current AccountsDb implementation, an account may remain in storage
// as a zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later
@@ -1850,18 +1871,19 @@
if !self.config.account_indexes.include_key(mint_key) {
return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
index_key: mint_key.to_string(),
}
.into());
});
}
Ok(
bank.get_filtered_indexed_accounts(&IndexKey::SplTokenMint(*mint_key), |account| {
Ok(bank
.get_filtered_indexed_accounts(&IndexKey::SplTokenMint(*mint_key), |account| {
account.owner() == &spl_token_id_v2_0()
&& filters.iter().all(|filter_type| match filter_type {
RpcFilterType::DataSize(size) => account.data().len() as u64 == *size,
RpcFilterType::Memcmp(compare) => compare.bytes_match(&account.data()),
})
}),
)
})
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})?)
} else {
self.get_filtered_program_accounts(bank, &spl_token_id_v2_0(), filters)
}
@@ -2872,7 +2894,7 @@ pub mod rpc_full {
config: Option<RpcLargestAccountsConfig>,
) -> Result<RpcResponse<Vec<RpcAccountBalance>>> {
debug!("get_largest_accounts rpc request received");
Ok(meta.get_largest_accounts(config))
Ok(meta.get_largest_accounts(config)?)
}

fn get_supply(
@@ -2881,7 +2903,7 @@
commitment: Option<CommitmentConfig>,
) -> Result<RpcResponse<RpcSupply>> {
debug!("get_supply rpc request received");
Ok(meta.get_supply(commitment))
Ok(meta.get_supply(commitment)?)
}

fn request_airdrop(
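
The rpc.rs changes follow one pattern throughout: bank scan methods now return a Result, and any failure is converted into RpcCustomError::ScanError via map_err before being propagated with the ? operator through the new RpcCustomResult alias. A self-contained sketch of that pattern, under simplified stand-in types, follows.

    // Hedged sketch of the map_err + ? propagation pattern used in rpc.rs above.
    #[derive(Debug)]
    enum RpcCustomError {
        ScanError { message: String },
    }

    type RpcCustomResult<T> = Result<T, RpcCustomError>;

    #[derive(Debug)]
    struct ScanError(String);

    impl std::fmt::Display for ScanError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "{}", self.0)
        }
    }

    // Stand-in for a bank scan such as get_filtered_program_accounts.
    fn scan_accounts(fail: bool) -> Result<Vec<u64>, ScanError> {
        if fail {
            Err(ScanError("slot removed during scan".to_string()))
        } else {
            Ok(vec![1, 2, 3])
        }
    }

    fn get_filtered_program_accounts(fail: bool) -> RpcCustomResult<Vec<u64>> {
        let accounts = scan_accounts(fail).map_err(|e| RpcCustomError::ScanError {
            message: e.to_string(),
        })?;
        Ok(accounts)
    }

    fn main() {
        println!("{:?}", get_filtered_program_accounts(false));
        println!("{:?}", get_filtered_program_accounts(true));
    }
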
1 change: 1 addition & 0 deletions rpc/src/rpc_service.rs
@@ -246,6 +246,7 @@ fn process_rest(bank_forks: &Arc<RwLock<BankForks>>, path: &str) -> Option<Strin
let total_supply = bank.capitalization();
let non_circulating_supply =
solana_runtime::non_circulating_supply::calculate_non_circulating_supply(&bank)
.expect("Scan should not error on root banks")
.lamports;
Some(format!(
"{}",