Configure Bigtable's timeout, enabling by default (solana-labs#14657)
* Configure bigtable's timeout when read-only

* Review comments

* Apply nits (thanks!)

Co-authored-by: Michael Vines <[email protected]>

* Timeout in the streamed decoding as well

Co-authored-by: Michael Vines <[email protected]>
ryoqun and mvines authored Jan 19, 2021
1 parent 8a604de commit dcaa025
Showing 6 changed files with 74 additions and 24 deletions.
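
For orientation, a hedged sketch of how a caller is expected to use the new two-argument LedgerStorage::new introduced in this commit; the 30-second value mirrors the new CLI default below, and the surrounding function is illustrative only, not part of the diff:

    use std::time::Duration;

    // Open read-only BigTable ledger storage with a 30-second timeout.
    // Passing `None` keeps the previous unbounded behavior.
    async fn open_storage() -> Result<(), Box<dyn std::error::Error>> {
        let bigtable =
            solana_storage_bigtable::LedgerStorage::new(true, Some(Duration::from_secs(30))).await?;
        if let Some(block) = bigtable.get_first_available_block().await? {
            println!("first available block: {}", block);
        }
        Ok(())
    }
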
9 changes: 8 additions & 1 deletion core/src/rpc.rs
@@ -81,6 +81,7 @@ use std::{
mpsc::{channel, Receiver, Sender},
Arc, Mutex, RwLock,
},
time::Duration,
};
use tokio::runtime;

@@ -115,6 +116,7 @@ pub struct JsonRpcConfig {
pub max_multiple_accounts: Option<usize>,
pub account_indexes: HashSet<AccountIndex>,
pub rpc_threads: usize,
pub rpc_bigtable_timeout: Option<Duration>,
}

#[derive(Clone)]
@@ -738,7 +740,12 @@ impl JsonRpcRequestProcessor {
bigtable_blocks.retain(|&slot| slot <= end_slot);
bigtable_blocks
})
.unwrap_or_else(|_| vec![]));
.map_err(|_| {
Error::invalid_params(
"BigTable query failed (maybe timeout due to too large range?)"
.to_string(),
)
})?);
}
}

1 change: 1 addition & 0 deletions core/src/rpc_service.rs
@@ -297,6 +297,7 @@ impl JsonRpcService {
runtime
.block_on(solana_storage_bigtable::LedgerStorage::new(
!config.enable_bigtable_ledger_upload,
config.rpc_bigtable_timeout,
))
.map(|bigtable_ledger_storage| {
info!("BigTable ledger storage initialized");
12 changes: 6 additions & 6 deletions ledger-tool/src/bigtable.rs
@@ -21,7 +21,7 @@ async fn upload(
ending_slot: Option<Slot>,
allow_missing_metadata: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None)
.await
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;

@@ -37,7 +37,7 @@
}

async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(true).await?;
let bigtable = solana_storage_bigtable::LedgerStorage::new(true, None).await?;
match bigtable.get_first_available_block().await? {
Some(block) => println!("{}", block),
None => println!("No blocks available"),
@@ -47,7 +47,7 @@ async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
}

async fn block(slot: Slot) -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None)
.await
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;

@@ -75,7 +75,7 @@ async fn block(slot: Slot) -> Result<(), Box<dyn std::error::Error>> {
}

async fn blocks(starting_slot: Slot, limit: usize) -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None)
.await
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;

@@ -87,7 +87,7 @@
}

async fn confirm(signature: &Signature, verbose: bool) -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None)
.await
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;

@@ -127,7 +127,7 @@ pub async fn transaction_history(
show_transactions: bool,
query_chunk_size: usize,
) -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(true).await?;
let bigtable = solana_storage_bigtable::LedgerStorage::new(true, None).await?;

let mut loaded_block: Option<(Slot, ConfirmedBlock)> = None;
while limit > 0 {
59 changes: 44 additions & 15 deletions storage-bigtable/src/bigtable.rs
@@ -6,6 +6,7 @@ use crate::{
root_ca_certificate,
};
use log::*;
use std::time::{Duration, Instant};
use thiserror::Error;
use tonic::{metadata::MetadataValue, transport::ClientTlsConfig, Request};

@@ -68,6 +69,9 @@ pub enum Error {

#[error("RPC error: {0}")]
RpcError(tonic::Status),

#[error("Timeout error")]
TimeoutError,
}

impl std::convert::From<std::io::Error> for Error {
@@ -95,6 +99,7 @@ pub struct BigTableConnection {
access_token: Option<AccessToken>,
channel: tonic::transport::Channel,
table_prefix: String,
timeout: Option<Duration>,
}

impl BigTableConnection {
@@ -106,7 +111,11 @@ impl BigTableConnection {
///
/// The BIGTABLE_EMULATOR_HOST environment variable is also respected.
///
pub async fn new(instance_name: &str, read_only: bool) -> Result<Self> {
pub async fn new(
instance_name: &str,
read_only: bool,
timeout: Option<Duration>,
) -> Result<Self> {
match std::env::var("BIGTABLE_EMULATOR_HOST") {
Ok(endpoint) => {
info!("Connecting to bigtable emulator at {}", endpoint);
@@ -117,6 +126,7 @@
.map_err(|err| Error::InvalidUri(endpoint, err.to_string()))?
.connect_lazy()?,
table_prefix: format!("projects/emulator/instances/{}/tables/", instance_name),
timeout,
})
}

@@ -135,20 +145,29 @@
instance_name
);

let endpoint = {
let endpoint =
tonic::transport::Channel::from_static("https://bigtable.googleapis.com")
.tls_config(
ClientTlsConfig::new()
.ca_certificate(
root_ca_certificate::load().map_err(Error::CertificateError)?,
)
.domain_name("bigtable.googleapis.com"),
)?;

if let Some(timeout) = timeout {
endpoint.timeout(timeout)
} else {
endpoint
}
};

Ok(Self {
access_token: Some(access_token),
channel: tonic::transport::Channel::from_static(
"https://bigtable.googleapis.com",
)
.tls_config(
ClientTlsConfig::new()
.ca_certificate(
root_ca_certificate::load().map_err(Error::CertificateError)?,
)
.domain_name("bigtable.googleapis.com"),
)?
.connect_lazy()?,
channel: endpoint.connect_lazy()?,
table_prefix,
timeout,
})
}
}
@@ -183,6 +202,7 @@ impl BigTableConnection {
access_token: self.access_token.clone(),
client,
table_prefix: self.table_prefix.clone(),
timeout: self.timeout,
}
}

Expand Down Expand Up @@ -225,10 +245,12 @@ pub struct BigTable {
access_token: Option<AccessToken>,
client: bigtable_client::BigtableClient<tonic::transport::Channel>,
table_prefix: String,
timeout: Option<Duration>,
}

impl BigTable {
async fn decode_read_rows_response(
&self,
mut rrr: tonic::codec::Streaming<ReadRowsResponse>,
) -> Result<Vec<(RowKey, RowData)>> {
let mut rows: Vec<(RowKey, RowData)> = vec![];
@@ -240,7 +262,14 @@ impl BigTable {
let mut cell_timestamp = 0;
let mut cell_value = vec![];
let mut cell_version_ok = true;
let started = Instant::now();

while let Some(res) = rrr.message().await? {
if let Some(timeout) = self.timeout {
if Instant::now().duration_since(started) > timeout {
return Err(Error::TimeoutError);
}
}
for (i, mut chunk) in res.chunks.into_iter().enumerate() {
// The comments for `read_rows_response::CellChunk` provide essential details for
// understanding how the below decoding works...
@@ -361,7 +390,7 @@ impl BigTable {
.await?
.into_inner();

let rows = Self::decode_read_rows_response(response).await?;
let rows = self.decode_read_rows_response(response).await?;
Ok(rows.into_iter().map(|r| r.0).collect())
}

@@ -410,7 +439,7 @@ impl BigTable {
.await?
.into_inner();

Self::decode_read_rows_response(response).await
self.decode_read_rows_response(response).await
}

/// Get latest data from a single row of `table`, if that row exists. Returns an error if that
@@ -443,7 +472,7 @@ impl BigTable {
.await?
.into_inner();

let rows = Self::decode_read_rows_response(response).await?;
let rows = self.decode_read_rows_response(response).await?;
rows.into_iter()
.next()
.map(|r| r.1)
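
The streamed-decoding change above applies a simple wall-clock deadline while draining the ReadRowsResponse stream, on top of the per-request timeout configured on the tonic endpoint. A minimal, self-contained sketch of that deadline pattern, using generic names rather than the types above:

    use std::time::{Duration, Instant};

    // Abort a long-running loop once elapsed time exceeds an optional timeout;
    // the real code returns Error::TimeoutError instead of a &str.
    fn drain_with_deadline(
        items: impl Iterator<Item = u64>,
        timeout: Option<Duration>,
    ) -> Result<Vec<u64>, &'static str> {
        let started = Instant::now();
        let mut out = Vec::new();
        for item in items {
            if let Some(timeout) = timeout {
                if Instant::now().duration_since(started) > timeout {
                    return Err("timeout");
                }
            }
            out.push(item);
        }
        Ok(out)
    }
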
5 changes: 3 additions & 2 deletions storage-bigtable/src/lib.rs
@@ -279,8 +279,9 @@ pub struct LedgerStorage {
}

impl LedgerStorage {
pub async fn new(read_only: bool) -> Result<Self> {
let connection = bigtable::BigTableConnection::new("solana-ledger", read_only).await?;
pub async fn new(read_only: bool, timeout: Option<std::time::Duration>) -> Result<Self> {
let connection =
bigtable::BigTableConnection::new("solana-ledger", read_only, timeout).await?;
Ok(Self { connection })
}

12 changes: 12 additions & 0 deletions validator/src/main.rs
@@ -1287,6 +1287,15 @@ pub fn main() {
.default_value(&default_rpc_threads)
.help("Number of threads to use for servicing RPC requests"),
)
.arg(
Arg::with_name("rpc_bigtable_timeout")
.long("rpc-bigtable-timeout")
.value_name("SECONDS")
.validator(is_parsable::<u64>)
.takes_value(true)
.default_value("30")
.help("Number of seconds before timing out RPC requests backed by BigTable"),
)
.arg(
Arg::with_name("rpc_pubsub_enable_vote_subscription")
.long("rpc-pubsub-enable-vote-subscription")
@@ -1561,6 +1570,9 @@
u64
),
rpc_threads: value_t_or_exit!(matches, "rpc_threads", usize),
rpc_bigtable_timeout: value_t!(matches, "rpc_bigtable_timeout", u64)
.ok()
.map(Duration::from_secs),
account_indexes: account_indexes.clone(),
},
rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
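
A note on the "enabling by default" part of the title: because the argument above has default_value("30") and the parsed seconds are converted with Duration::from_secs, rpc_bigtable_timeout normally ends up as Some(Duration::from_secs(30)) unless the operator passes a different number; nothing in this diff appears to offer a CLI value that disables the timeout. A trivial hedged sketch of that conversion, with the seconds value assumed for illustration:

    use std::time::Duration;

    fn main() {
        // Value parsed from --rpc-bigtable-timeout; 30 is the default shown above.
        let seconds: u64 = 30;
        let rpc_bigtable_timeout = Some(Duration::from_secs(seconds));
        assert_eq!(rpc_bigtable_timeout, Some(Duration::from_secs(30)));
    }
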
