Skip to content

Commit

Permalink
Fix bugs in fee accumulation (#317)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wizdave97 authored Sep 28, 2024
1 parent f879358 commit ca02004
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 28 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions tesseract/evm/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ impl IsmpProvider for EvmClient {
let initial_height = self.client.get_block_number().await?.low_u64();
let client = self.clone();
let poll_interval = self.config.poll_interval.unwrap_or(10);
let challenge_period = self.query_challenge_period(counterparty_state_id).await?;

tokio::spawn(async move {
let mut latest_height = initial_height;
let state_machine = client.state_machine;
Expand Down Expand Up @@ -676,7 +676,7 @@ impl IsmpProvider for EvmClient {
let provider = Arc::new(client.clone());
// Yield if the challenge period elapses and the state commitment is not vetoed
tokio::select! {
_res = wait_for_challenge_period(provider, state_machine_update_time, challenge_period, counterparty_state_id.state_id) => {
_res = wait_for_challenge_period(provider, state_machine_update_time, counterparty_state_id) => {
match _res {
Ok(_) => {
if let Err(err) = tx.send(Ok(event.clone())) {
Expand Down
2 changes: 1 addition & 1 deletion tesseract/integration-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async fn relay_get_response_message(
let get_resp_event = chain_a_client
.client
.events()
.at(tx_res_hash.unwrap())
.at(tx_res_hash.unwrap().0)
.await?
.find_first::<GetResponse>()?
.ok_or(anyhow!("Failed to fetch event"))?;
Expand Down
6 changes: 6 additions & 0 deletions tesseract/messaging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,11 @@ async fn handle_update(
if let Some(sender) = fee_acc_sender {
// We should not store messages when they are delivered to hyperbridge
if chain_a.state_machine_id().state_id != coprocessor {
// Filter out receipts for transactions that originated from the coprocessor
let receipts = receipts
.into_iter()
.filter(|receipt| receipt.source() != coprocessor)
.collect::<Vec<_>>();
if !receipts.is_empty() {
// Store receipts in database before auto accumulation
tracing::trace!(target: "tesseract", "Persisting {} deliveries from {}->{} to the db", receipts.len(), chain_b.name(), chain_a.name());
Expand Down Expand Up @@ -442,6 +447,7 @@ async fn fee_accumulation<A: IsmpProvider + Clone + Clone + HyperbridgeClaim + '
let dest_height = match wait_for_state_machine_update(
dest.state_machine_id(),
hyperbridge.clone(),
dest.clone(),
delivery_height,
)
.await
Expand Down
24 changes: 13 additions & 11 deletions tesseract/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ impl TxReceipt {
TxReceipt::Response { height, .. } => *height,
}
}

pub fn source(&self) -> StateMachine {
match self {
TxReceipt::Request { query, .. } => query.source_chain,
TxReceipt::Response { query, .. } => query.source_chain,
}
}
}

/// A type that represents the location where state proof queries should be directed
Expand Down Expand Up @@ -462,13 +469,13 @@ impl NonceProvider {
pub async fn wait_for_challenge_period(
client: Arc<dyn IsmpProvider>,
last_consensus_update: Duration,
challenge_period: Duration,
counterparty_state_id: StateMachine,
counterparty_state_id: StateMachineId,
) -> anyhow::Result<()> {
let challenge_period = client.query_challenge_period(counterparty_state_id).await?;
if challenge_period != Duration::ZERO {
log::info!(
"Waiting for challenge period {challenge_period:?} for {} on {}",
counterparty_state_id,
counterparty_state_id.state_id,
client.name()
);
}
Expand All @@ -489,10 +496,12 @@ pub async fn wait_for_challenge_period(
pub async fn wait_for_state_machine_update(
state_id: StateMachineId,
hyperbridge: Arc<dyn IsmpProvider>,
counterparty: Arc<dyn IsmpProvider>,
height: u64,
) -> anyhow::Result<u64> {
let latest_height = hyperbridge.query_latest_height(state_id).await?.into();
if latest_height >= height {
observe_challenge_period(counterparty, hyperbridge, latest_height).await?;
return Ok(latest_height);
}

Expand All @@ -519,15 +528,8 @@ pub async fn observe_challenge_period(
hyperbridge: Arc<dyn IsmpProvider>,
height: u64,
) -> anyhow::Result<()> {
let challenge_period = hyperbridge.query_challenge_period(chain.state_machine_id()).await?;
let height = StateMachineHeight { id: chain.state_machine_id(), height };
let last_consensus_update = hyperbridge.query_state_machine_update_time(height).await?;
wait_for_challenge_period(
hyperbridge,
last_consensus_update,
challenge_period,
chain.state_machine_id().state_id,
)
.await?;
wait_for_challenge_period(hyperbridge, last_consensus_update, chain.state_machine_id()).await?;
Ok(())
}
2 changes: 1 addition & 1 deletion tesseract/relayer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tesseract"
version = "0.7.0"
version = "0.8.0"
edition = "2021"
description = "Chain agnostic relayer implementation for Hyperbridge"
authors = ["Polytope Labs <[email protected]>"]
Expand Down
2 changes: 2 additions & 0 deletions tesseract/relayer/src/fees.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl AccumulateFees {
let height = wait_for_state_machine_update(
dest.state_machine_id(),
Arc::new(hyperbridge.clone()),
dest.clone(),
height,
)
.await?;
Expand Down Expand Up @@ -162,6 +163,7 @@ impl AccumulateFees {
let height = wait_for_state_machine_update(
source.state_machine_id(),
Arc::new(hyperbridge.clone()),
source.clone(),
height,
)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion tesseract/substrate/src/calls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ where
let tx = Extrinsic::new("Relayer", "withdraw_fees", input_data.encode());
// Wait for finalization so we still get the correct block with the post request event even
// if a reorg happens
let hash = send_unsigned_extrinsic(&self.client, tx, true)
let (hash, _) = send_unsigned_extrinsic(&self.client, tx, true)
.await?
.ok_or_else(|| anyhow!("Transaction submission failed"))?;
let block_number = self
Expand Down
18 changes: 14 additions & 4 deletions tesseract/substrate/src/extrinsic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
//! Extrinsic utilities
use anyhow::Context;
use sp_core::H256;
use subxt::{
config::{extrinsic_params::BaseExtrinsicParamsBuilder, polkadot::PlainTip, ExtrinsicParams},
ext::sp_runtime::MultiSignature,
Expand All @@ -32,7 +33,7 @@ pub async fn send_unsigned_extrinsic<T: subxt::Config, Tx: TxPayload>(
client: &OnlineClient<T>,
payload: Tx,
wait_for_finalization: bool,
) -> Result<Option<T::Hash>, anyhow::Error>
) -> Result<Option<(T::Hash, Vec<H256>)>, anyhow::Error>
where
<T::ExtrinsicParams as ExtrinsicParams<T::Hash>>::OtherParams:
Default + Send + Sync + From<BaseExtrinsicParamsBuilder<T, PlainTip>>,
Expand Down Expand Up @@ -66,15 +67,24 @@ where
))?,
};

let hash = match extrinsic.wait_for_success().await {
let (hash, receipts) = match extrinsic.wait_for_success().await {
Ok(p) => {
log::info!("Successfully executed unsigned extrinsic {ext_hash:?}");
p.block_hash()
let mut receipts = p
.find::<subxt_utils::gargantua::api::ismp::events::PostRequestHandled>()
.filter_map(|ev| ev.ok().map(|e| e.0.commitment))
.collect::<Vec<_>>();
let temp_2 = p
.find::<subxt_utils::gargantua::api::ismp::events::PostResponseHandled>()
.filter_map(|ev| ev.ok().map(|e| e.0.commitment))
.collect::<Vec<_>>();
receipts.extend(temp_2);
(p.block_hash(), receipts)
},
Err(err) => Err(refine_subxt_error(err))
.context(format!("Error executing unsigned extrinsic {ext_hash:?}"))?,
};
Ok(Some(hash))
Ok(Some((hash, receipts)))
}

/// Dry run extrinsic
Expand Down
75 changes: 68 additions & 7 deletions tesseract/substrate/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use ismp::{
consensus::{ConsensusClientId, StateCommitment, StateMachineHeight, StateMachineId},
events::{Event, StateCommitmentVetoed},
host::StateMachine,
messaging::{CreateConsensusState, Message},
messaging::{hash_request, hash_response, CreateConsensusState, Message, ResponseMessage},
router::{Request, RequestResponse},
};
use pallet_ismp::{
child_trie::{
Expand Down Expand Up @@ -60,7 +61,7 @@ use subxt_utils::{
state_machine_update_time_storage_key,
};
use tesseract_primitives::{
wait_for_challenge_period, BoxStream, EstimateGasReturnParams, IsmpProvider, Query,
wait_for_challenge_period, BoxStream, EstimateGasReturnParams, Hasher, IsmpProvider, Query,
StateMachineUpdated, StateProofQueryType, TxReceipt,
};

Expand Down Expand Up @@ -509,7 +510,6 @@ where
(tx, recv)
};
let latest_height = client.query_finalized_height().await?;
let challenge_period = self.query_challenge_period(counterparty_state_id).await?;

if is_empty {
tokio::task::spawn(async move {
Expand Down Expand Up @@ -604,7 +604,7 @@ where

let provider = Arc::new(client.clone());
tokio::select! {
_res = wait_for_challenge_period(provider, state_machine_update_time, challenge_period, counterparty_state_id.state_id) => {
_res = wait_for_challenge_period(provider, state_machine_update_time, counterparty_state_id) => {
match _res {
Ok(_) => {
if let Err(err) = tx.send(Ok(event.clone())) {
Expand Down Expand Up @@ -649,7 +649,7 @@ where

async fn submit(&self, messages: Vec<Message>) -> Result<Vec<TxReceipt>, anyhow::Error> {
let mut futs = vec![];
for msg in messages {
for msg in messages.clone() {
let is_consensus_message = matches!(&msg, Message::Consensus(_));
let call = vec![msg].encode();
let extrinsic = Extrinsic::new("Ismp", "handle_unsigned", call);
Expand Down Expand Up @@ -678,11 +678,72 @@ where
futs.push(send_unsigned_extrinsic(&self.client, extrinsic, false))
}
}
futures::future::join_all(futs)
let results = futures::future::join_all(futs)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
Ok(Default::default())
let receipts = results
.into_iter()
.filter_map(|val| val.map(|(_, receipts)| receipts))
.flatten()
.collect::<Vec<_>>();

let mut results = vec![];
let height = {
let block = self
.client
.rpc()
.header(None)
.await?
.ok_or_else(|| anyhow!("Failed to get latest height"))?;
block.number().into()
};
for msg in messages {
match msg {
Message::Request(req_msg) =>
for post in req_msg.requests {
let req = Request::Post(post);
let commitment = hash_request::<Hasher>(&req);
if receipts.contains(&commitment) {
let tx_receipt = TxReceipt::Request {
query: Query {
source_chain: req.source_chain(),
dest_chain: req.dest_chain(),
nonce: req.nonce(),
commitment,
},
height,
};

results.push(tx_receipt);
}
},
Message::Response(ResponseMessage {
datagram: RequestResponse::Response(resp),
..
}) =>
for res in resp {
let commitment = hash_response::<Hasher>(&res);
let request_commitment = hash_request::<Hasher>(&res.request());
if receipts.contains(&commitment) {
let tx_receipt = TxReceipt::Response {
query: Query {
source_chain: res.source_chain(),
dest_chain: res.dest_chain(),
nonce: res.nonce(),
commitment,
},
request_commitment,
height,
};

results.push(tx_receipt);
}
},
_ => {},
}
}
Ok(results)
}

async fn query_challenge_period(&self, id: StateMachineId) -> Result<Duration, anyhow::Error> {
Expand Down

0 comments on commit ca02004

Please sign in to comment.