Skip to content

Commit

Permalink
[wormhole-attester] Increase accuracy and improve logging (pyth-netwo…
Browse files Browse the repository at this point in the history
…rk#653)

* Tiltfile: TILT_DOCKER_REGISTRY def behavior, re-add namespace_create

* wormhole_attester/client v5.0.0: accuracy and logging opimisations

* [BREAKING CHANGE] min_interval_secs switches to milliseconds under
  min_interval_ms
* attestation jobs no longer use preflight checks - this includes a
custom variant of send_and_confirm_transaction(), see util.rs for details
* attestation error logging no longer pretty-prints the error
  structs ({:#?} became {:?})
  • Loading branch information
drozdziak1 authored Mar 3, 2023
1 parent 53c9654 commit bbc140f
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 23 deletions.
5 changes: 4 additions & 1 deletion Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ namespace = os.environ.get("TILT_NAMESPACE", "development")
load("ext://namespace", "namespace_create", "namespace_inject")
load("ext://secret", "secret_yaml_generic")

default_registry(image_registry, single_name="development")
namespace_create(namespace)

if image_registry:
default_registry(image_registry, single_name="development")

allow_k8s_contexts(k8s_context())

Expand Down
6 changes: 3 additions & 3 deletions third_party/pyth/p2w_autoattest.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@
min_rpc_interval_ms: 0 # RIP RPC
max_batch_jobs: 1000 # Where we're going there's no oomkiller
default_attestation_conditions:
min_interval_secs: 10
min_interval_ms: 10000
symbol_groups:
- group_name: fast_interval_rate_limited
conditions:
min_interval_secs: 1
min_interval_ms: 1000
rate_limit_interval_secs: 2
symbols:
"""
Expand All @@ -144,7 +144,7 @@
cfg_yaml += f"""
- group_name: longer_interval_sensitive_changes
conditions:
min_interval_secs: 3
min_interval_ms: 3000
price_changed_bps: 300
symbols:
"""
Expand Down
2 changes: 1 addition & 1 deletion wormhole_attester/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion wormhole_attester/client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-wormhole-attester-client"
version = "4.1.0"
version = "5.0.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
20 changes: 10 additions & 10 deletions wormhole_attester/client/src/attestation_cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,8 @@ pub const fn default_min_rpc_interval_ms() -> u64 {
150
}

pub const fn default_min_interval_secs() -> u64 {
60
pub const fn default_min_interval_ms() -> u64 {
60_000
}

pub const fn default_rate_limit_interval_secs() -> u32 {
Expand All @@ -335,8 +335,8 @@ pub struct AttestationConditions {
/// Lower bound on attestation rate. Attestation is triggered
/// unconditionally whenever the specified interval elapses since
/// last attestation.
#[serde(default = "default_min_interval_secs")]
pub min_interval_secs: u64,
#[serde(default = "default_min_interval_ms")]
pub min_interval_ms: u64,

/// Upper bound on attestation rate. Attesting the same batch
/// before this many seconds pass fails the tx. This limit is
Expand Down Expand Up @@ -370,7 +370,7 @@ impl AttestationConditions {
// Bug trap for new fields that also need to be included in
// the returned expression
let AttestationConditions {
min_interval_secs: _min_interval_secs,
min_interval_ms: _min_interval_ms,
max_batch_jobs: _max_batch_jobs,
price_changed_bps,
publish_time_min_delta_secs,
Expand All @@ -384,7 +384,7 @@ impl AttestationConditions {
impl Default for AttestationConditions {
fn default() -> Self {
Self {
min_interval_secs: default_min_interval_secs(),
min_interval_ms: default_min_interval_ms(),
max_batch_jobs: default_max_batch_jobs(),
price_changed_bps: None,
publish_time_min_delta_secs: None,
Expand Down Expand Up @@ -471,7 +471,7 @@ mod tests {
let fastbois = SymbolGroupConfig {
group_name: "fast bois".to_owned(),
conditions: Some(AttestationConditions {
min_interval_secs: 5,
min_interval_ms: 5,
..Default::default()
}),
symbols: vec![
Expand All @@ -489,7 +489,7 @@ mod tests {
let slowbois = SymbolGroupConfig {
group_name: "slow bois".to_owned(),
conditions: Some(AttestationConditions {
min_interval_secs: 200,
min_interval_ms: 200,
..Default::default()
}),
symbols: vec![
Expand Down Expand Up @@ -541,7 +541,7 @@ mod tests {
let eth_dup_price_key = Pubkey::new_unique();

let attestation_conditions_1 = AttestationConditions {
min_interval_secs: 5,
min_interval_ms: 5,
..Default::default()
};

Expand Down Expand Up @@ -584,7 +584,7 @@ mod tests {
};

let default_attestation_conditions = AttestationConditions {
min_interval_secs: 1,
min_interval_ms: 1,
..Default::default()
};

Expand Down
4 changes: 2 additions & 2 deletions wormhole_attester/client/src/batch_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ impl<'a> BatchState {

// min interval
if self.last_job_finished_at.elapsed()
> Duration::from_secs(self.conditions.min_interval_secs)
> Duration::from_millis(self.conditions.min_interval_ms)
{
ret = Some(format!(
"minimum interval of {}s elapsed since last state change",
self.conditions.min_interval_secs
self.conditions.min_interval_ms
));
}

Expand Down
10 changes: 8 additions & 2 deletions wormhole_attester/client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use {
pyth_wormhole_attester::error::AttesterCustomError,
pyth_wormhole_attester_client::util::send_and_confirm_transaction_with_config,
solana_client::rpc_config::RpcSendTransactionConfig,
solana_program::instruction::InstructionError,
solana_sdk::transaction::TransactionError,
};
Expand Down Expand Up @@ -683,7 +685,11 @@ async fn attestation_job(args: AttestationJobArgs) -> Result<(), ErrBoxSend> {

let tx_processing_start_time = Instant::now();

let sig = match rpc.send_and_confirm_transaction(&tx).await {
let sig = match send_and_confirm_transaction_with_config(&rpc, &tx, RpcSendTransactionConfig {
// Decreases probability of rate limit race conditions
skip_preflight: true,
..Default::default()
}).await {
Ok(s) => Ok(s),
Err(e) => match e.get_transaction_error() {
Some(TransactionError::InstructionError(_idx, InstructionError::Custom(code)))
Expand Down Expand Up @@ -750,7 +756,7 @@ async fn attestation_job(args: AttestationJobArgs) -> Result<(), ErrBoxSend> {
.or_else(move |e| async move {
// log any errors coming from the job
warn!(
"Batch {}/{}, group {:?} ERR: {:#?}",
"Batch {}/{}, group {:?} ERR: {:?}",
batch_no4err_msg, batch_count4err_msg, group_name4err_msg, e
);

Expand Down
91 changes: 88 additions & 3 deletions wormhole_attester/client/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@ use {
trace,
},
prometheus::TextEncoder,
solana_client::{
client_error::Result as SolClientResult,
nonblocking::rpc_client::RpcClient,
rpc_config::RpcSendTransactionConfig,
rpc_request::RpcError,
},
solana_sdk::{
commitment_config::CommitmentConfig,
signature::Signature,
transaction::{
uses_durable_nonce,
Transaction,
},
},
std::{
net::SocketAddr,
ops::{
Expand All @@ -17,9 +31,12 @@ use {
Instant,
},
},
tokio::sync::{
Mutex,
MutexGuard,
tokio::{
sync::{
Mutex,
MutexGuard,
},
time::sleep,
},
warp::{
reply,
Expand Down Expand Up @@ -179,3 +196,71 @@ pub async fn start_metrics_server(addr: impl Into<SocketAddr> + 'static) {
.bind(addr)
.await;
}

/// WARNING: Copied verbatim from v1.10.31, be careful when bumping
/// solana crate versions!
///
/// TODO(2023-03-02): Use an upstream method when
/// it's available.
///
/// This method is almost identical to
/// RpcClient::send_and_confirm_transaction(). The only difference is
/// that we let the user specify the config and replace
/// send_transaction() inside with
/// send_transaction_with_config(). This variant is currently missing
/// from solana_client.
pub async fn send_and_confirm_transaction_with_config(
client: &RpcClient,
transaction: &Transaction,
config: RpcSendTransactionConfig,
) -> SolClientResult<Signature> {
const SEND_RETRIES: usize = 1;
const GET_STATUS_RETRIES: usize = usize::MAX;

'sending: for _ in 0..SEND_RETRIES {
let signature = client
.send_transaction_with_config(transaction, config)
.await?;

let recent_blockhash = if uses_durable_nonce(transaction).is_some() {
let (recent_blockhash, ..) = client
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.await?;
recent_blockhash
} else {
transaction.message.recent_blockhash
};

for status_retry in 0..GET_STATUS_RETRIES {
match client.get_signature_status(&signature).await? {
Some(Ok(_)) => return Ok(signature),
Some(Err(e)) => return Err(e.into()),
None => {
if !client
.is_blockhash_valid(&recent_blockhash, CommitmentConfig::processed())
.await?
{
// Block hash is not found by some reason
break 'sending;
} else if cfg!(not(test))
// Ignore sleep at last step.
&& status_retry < GET_STATUS_RETRIES
{
// Retry twice a second
sleep(Duration::from_millis(500)).await;

continue;
}
}
}
}
}

Err(RpcError::ForUser(
"unable to confirm transaction. \
This can happen in situations such as transaction expiration \
and insufficient fee-payer funds"
.to_string(),
)
.into())
}

0 comments on commit bbc140f

Please sign in to comment.