Skip to content

Commit 99d9bcc

Browse files
gabriele-0201rphmeier
authored andcommitted
shim: add wait flag
1 parent 7357955 commit 99d9bcc

File tree

7 files changed

+151
-98
lines changed

7 files changed

+151
-98
lines changed

sugondat/shim/src/cli.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,25 @@ pub mod query {
171171
Hash([u8; 32]),
172172
}
173173

174+
#[derive(Debug, Args)]
175+
pub struct BlockParams {
176+
/// The block containing the blob to query.
177+
///
178+
/// Possible values: ["best", number, hash]
179+
///
180+
/// "best" is the highest finalized block.
181+
///
182+
/// Hashes must be 32 bytes, hex-encoded, and prefixed with "0x".
183+
#[arg(value_name = "BLOCK_REF")]
184+
pub block_ref: BlockRef,
185+
186+
/// By default, if the block is not available (e.g. not yet produced), the shim will return immediately.
187+
///
188+
/// By specifying this flag, the shim will wait until the block is available.
189+
#[clap(long)]
190+
pub wait: bool,
191+
}
192+
174193
impl std::fmt::Display for BlockRef {
175194
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
176195
match *self {
@@ -221,22 +240,15 @@ pub mod query {
221240
pub mod blob {
222241
use clap::Args;
223242

224-
use super::{BlockRef, SugondatRpcParams};
243+
use super::{BlockParams, SugondatRpcParams};
225244

226245
#[derive(Debug, Args)]
227246
pub struct Params {
228247
#[clap(flatten)]
229248
pub rpc: SugondatRpcParams,
230249

231-
/// The block containing the blob to query.
232-
///
233-
/// Possible values: ["best", number, hash]
234-
///
235-
/// "best" is the highest finalized block.
236-
///
237-
/// Hashes must be 32 bytes, hex-encoded, and prefixed with "0x".
238-
#[arg(value_name = "BLOCK_REF")]
239-
pub block: BlockRef,
250+
#[clap(flatten)]
251+
pub block: BlockParams,
240252

241253
/// The index of the extrinsic (transaction) containing the blob.
242254
#[arg(value_name = "INDEX")]
@@ -254,22 +266,15 @@ pub mod query {
254266
255267
use clap::Args;
256268

257-
use super::{BlockRef, SugondatRpcParams};
269+
use super::{BlockParams, SugondatRpcParams};
258270

259271
#[derive(Debug, Args)]
260272
pub struct Params {
261273
#[clap(flatten)]
262274
pub rpc: SugondatRpcParams,
263275

264-
/// The block to query information about.
265-
///
266-
/// Possible values: ["best", number, hash]
267-
///
268-
/// "best" is the highest finalized block.
269-
///
270-
/// Hashes must be 32 bytes, hex-encoded, and prefixed with "0x".
271-
#[arg(default_value_t = BlockRef::Best, value_name = "BLOCK_REF")]
272-
pub block: BlockRef,
276+
#[clap(flatten)]
277+
pub block: BlockParams,
273278
}
274279
}
275280

sugondat/shim/src/cmd/query/blob.rs

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use super::connect_rpc;
2-
use crate::cli::query::{blob::Params, BlockRef};
1+
use super::{connect_rpc, get_block_at};
2+
use crate::cli::query::blob::Params;
33

44
use std::io::Write;
55

@@ -12,20 +12,7 @@ pub async fn run(params: Params) -> anyhow::Result<()> {
1212
} = params;
1313

1414
let client = connect_rpc(rpc).await?;
15-
16-
let maybe_hash = match block {
17-
BlockRef::Best => None,
18-
BlockRef::Hash(h) => Some(h),
19-
BlockRef::Number(n) => Some(
20-
client
21-
.block_hash(n)
22-
.await?
23-
.ok_or_else(|| anyhow::anyhow!("No block with number {}", n))?
24-
.0,
25-
),
26-
};
27-
28-
let block = client.get_block_at(maybe_hash).await?;
15+
let block = get_block_at(&client, block).await?;
2916

3017
let i = block
3118
.blobs

sugondat/shim/src/cmd/query/block.rs

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,11 @@
1-
use super::connect_rpc;
2-
use crate::cli::query::{block::Params, BlockRef};
1+
use super::{connect_rpc, get_block_at};
2+
use crate::cli::query::block::Params;
33

44
pub async fn run(params: Params) -> anyhow::Result<()> {
55
let Params { rpc, block } = params;
66

77
let client = connect_rpc(rpc).await?;
8-
9-
let maybe_hash = match block {
10-
BlockRef::Best => None,
11-
BlockRef::Hash(h) => Some(h),
12-
BlockRef::Number(n) => Some(
13-
client
14-
.block_hash(n)
15-
.await?
16-
.ok_or_else(|| anyhow::anyhow!("No block with number {}", n))?
17-
.0,
18-
),
19-
};
20-
21-
let block = client.get_block_at(maybe_hash).await?;
8+
let block = get_block_at(&client, block).await?;
229

2310
println!("Block: #{}", block.number);
2411
println!(" Hash: 0x{}", hex::encode(&block.hash[..]));

sugondat/shim/src/cmd/query/mod.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{
2-
cli::query::{Commands, Params},
2+
cli::query::{BlockParams, BlockRef, Commands, Params},
33
sugondat_rpc,
44
};
55

@@ -16,6 +16,30 @@ pub async fn run(params: Params) -> anyhow::Result<()> {
1616
Ok(())
1717
}
1818

19+
/// Given the BlockParams and the client to be used, try to fetch
20+
/// the corresponding block. It will wait until the block is avaiable if specified.
21+
pub async fn get_block_at(
22+
client: &sugondat_rpc::Client,
23+
block: BlockParams,
24+
) -> anyhow::Result<sugondat_rpc::Block> {
25+
let maybe_hash = match block.block_ref {
26+
BlockRef::Best => None,
27+
BlockRef::Hash(h) => Some(h),
28+
BlockRef::Number(n) => Some(match block.wait {
29+
true => client.wait_block_hash(n).await,
30+
false => client
31+
.block_hash(n)
32+
.await?
33+
.ok_or_else(|| anyhow::anyhow!("No block with number {}", n))?,
34+
}),
35+
};
36+
37+
match block.wait {
38+
true => client.wait_block_at(maybe_hash).await,
39+
false => client.get_block_at(maybe_hash).await,
40+
}
41+
}
42+
1943
async fn connect_rpc(
2044
conn_params: crate::cli::SugondatRpcParams,
2145
) -> anyhow::Result<sugondat_rpc::Client> {

sugondat/shim/src/dock/rollkit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl RollkitRPCServer for RollkitDock {
3434
);
3535
let namespace = parse_namespace(&namespace).map_err(|_| err::bad_namespace())?;
3636
let block_hash = self.client.wait_finalized_height(height).await;
37-
let block = self.client.get_block_at(Some(block_hash)).await.unwrap();
37+
let block = self.client.wait_block_at(Some(block_hash)).await.unwrap();
3838
let mut blobs = vec![];
3939
for blob in block.blobs {
4040
if blob.namespace == namespace {

sugondat/shim/src/dock/sovereign.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl SovereignRPCServer for SovereignDock {
3434
) -> Result<Block, ErrorObjectOwned> {
3535
info!("get_block({})", height);
3636
let block_hash = self.client.wait_finalized_height(height).await;
37-
let block = self.client.get_block_at(Some(block_hash)).await.unwrap();
37+
let block = self.client.wait_block_at(Some(block_hash)).await.unwrap();
3838
let proof = make_namespace_proof(&block, namespace);
3939
let blobs = block
4040
.blobs

sugondat/shim/src/sugondat_rpc/mod.rs

Lines changed: 93 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::{fmt, sync::Arc};
22

33
use crate::key::Keypair;
44
use anyhow::Context;
5-
use subxt::{config::Header as _, rpc_params, utils::H256};
5+
use subxt::{config::Header as _, error::BlockError, rpc_params, utils::H256};
66
use sugondat_nmt::Namespace;
77
use sugondat_subxt::{
88
sugondat::runtime_types::pallet_sugondat_blobs::namespace_param::UnvalidatedNamespace, Header,
@@ -63,6 +63,7 @@ impl Client {
6363
/// the block hash of the block at the given height.
6464
#[tracing::instrument(level = Level::DEBUG, skip(self))]
6565
pub async fn wait_finalized_height(&self, height: u64) -> [u8; 32] {
66+
tracing::info!("Waiting for block at height: {}", height);
6667
loop {
6768
let conn = self.connector.ensure_connected().await;
6869
match conn.finalized.wait_until_finalized(self, height).await {
@@ -79,7 +80,7 @@ impl Client {
7980
///
8081
/// If there is no block at the given height, returns `None`.
8182
#[tracing::instrument(level = Level::DEBUG, skip(self))]
82-
pub async fn block_hash(&self, height: u64) -> anyhow::Result<Option<H256>> {
83+
pub async fn block_hash(&self, height: u64) -> anyhow::Result<Option<[u8; 32]>> {
8384
loop {
8485
let conn = self.connector.ensure_connected().await;
8586
let block_hash: Option<H256> = match conn
@@ -102,43 +103,75 @@ impl Client {
102103
// at the given height.
103104
Ok(None)
104105
}
105-
Some(block_hash) => Ok(Some(block_hash)),
106+
Some(block_hash) => Ok(Some(block_hash.into())),
106107
};
107108
}
108109
}
109110

110-
/// Returns the header and the body of the block with the given hash, automatically retrying
111-
/// until it succeeds. `None` indicates the best block.
111+
/// Returns the block hash of the block at the given height,
112+
/// automatically retrying until it succeeds.
113+
/// `None` indicates the best block.
114+
pub async fn wait_block_hash(&self, height: u64) -> [u8; 32] {
115+
loop {
116+
match self.block_hash(height).await {
117+
Ok(Some(res)) => break res,
118+
Ok(None) => {
119+
tracing::info!("Block hash not available yet");
120+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
121+
}
122+
Err(e) => {
123+
// any other error is treated as a failure, resulting in the connection being reset
124+
tracing::error!(?e, "failed to query block hash");
125+
self.connector.reset().await;
126+
}
127+
}
128+
}
129+
}
130+
131+
/// Returns the header and body of the block with the given hash.
132+
/// If it is not available, it returns the reason of the failure.
133+
/// `None` indicates the best block.
112134
async fn get_header_and_extrinsics(
113135
&self,
114136
block_hash: Option<[u8; 32]>,
115-
) -> anyhow::Result<(Header, Vec<sugondat_subxt::ExtrinsicDetails>)> {
137+
) -> Result<(Header, Vec<sugondat_subxt::ExtrinsicDetails>), subxt::error::Error> {
116138
let block_hash = block_hash.map(H256::from);
139+
140+
let conn = self.connector.ensure_connected().await;
141+
let res = match block_hash {
142+
Some(h) => conn.subxt.blocks().at(h).await,
143+
None => conn.subxt.blocks().at_latest().await,
144+
}?;
145+
146+
let header = res.header();
147+
let body = res
148+
.extrinsics()
149+
.await?
150+
.iter()
151+
.collect::<Result<Vec<_>, _>>()?;
152+
Ok((header.clone(), body))
153+
}
154+
155+
/// Returns the header and the body of the block with the given hash,
156+
/// automatically retrying until it succeeds.
157+
/// `None` indicates the best block.
158+
async fn wait_header_and_extrinsics(
159+
&self,
160+
block_hash: Option<[u8; 32]>,
161+
) -> (Header, Vec<sugondat_subxt::ExtrinsicDetails>) {
117162
loop {
118-
let conn = self.connector.ensure_connected().await;
119-
let res = match block_hash {
120-
Some(h) => conn.subxt.blocks().at(h).await,
121-
None => conn.subxt.blocks().at_latest().await,
122-
};
123-
let err = match res {
124-
Ok(it) => {
125-
let header = it.header();
126-
let body = match it.extrinsics().await {
127-
Ok(it) => it,
128-
Err(err) => {
129-
tracing::error!(?err, "failed to query block");
130-
self.connector.reset().await;
131-
continue;
132-
}
133-
}
134-
.iter()
135-
.collect::<Result<Vec<_>, _>>()?;
136-
return Ok((header.clone(), body));
163+
match self.get_header_and_extrinsics(block_hash).await {
164+
Ok(res) => break res,
165+
Err(subxt::Error::Block(BlockError::NotFound(_hash))) => {
166+
tracing::info!("Block not available yet");
167+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
137168
}
138-
Err(err) => err,
139-
};
140-
tracing::error!(?err, "failed to query block");
141-
self.connector.reset().await;
169+
Err(e) => {
170+
// any other error is treated as a failure, resulting in the connection being reset
171+
tracing::error!(?e, "failed to query block");
172+
self.connector.reset().await;
173+
}
174+
}
142175
}
143176
}
144177

@@ -148,19 +181,16 @@ impl Client {
148181
/// `None` indicates that the best block should be used.
149182
#[tracing::instrument(level = Level::DEBUG, skip(self))]
150183
pub async fn get_block_at(&self, block_hash: Option<[u8; 32]>) -> anyhow::Result<Block> {
151-
let (header, extrinsics) = self.get_header_and_extrinsics(block_hash).await?;
152-
let tree_root = tree_root(&header).ok_or_else(err::no_tree_root)?;
153-
let timestamp = extract_timestamp(&extrinsics)?;
154-
let blobs = extract_blobs(extrinsics);
155-
tracing::debug!(?blobs, "found {} blobs in block", blobs.len());
156-
Ok(Block {
157-
number: header.number as u64,
158-
hash: header.hash().0,
159-
parent_hash: header.parent_hash.0,
160-
tree_root,
161-
timestamp,
162-
blobs,
163-
})
184+
Block::from_header_and_extrinsics(self.get_header_and_extrinsics(block_hash).await?)
185+
}
186+
187+
/// Returns the data of the block identified by the given block hash.
188+
/// It retries until the block is present
189+
///
190+
/// `None` indicates that the best block should be used.
191+
#[tracing::instrument(level = Level::DEBUG, skip(self))]
192+
pub async fn wait_block_at(&self, block_hash: Option<[u8; 32]>) -> anyhow::Result<Block> {
193+
Block::from_header_and_extrinsics(self.wait_header_and_extrinsics(block_hash).await)
164194
}
165195

166196
/// Submit a blob with the given namespace and signed with the given key. The block is submitted
@@ -319,7 +349,7 @@ impl FinalizedHeadWatcher {
319349
// TODO: throttle the retries. // TODO: return an error
320350
continue;
321351
};
322-
break Some(block_hash.0);
352+
break Some(block_hash);
323353
}
324354
}
325355
}
@@ -347,6 +377,26 @@ pub struct Block {
347377
pub blobs: Vec<Blob>,
348378
}
349379

380+
impl Block {
381+
fn from_header_and_extrinsics(
382+
value: (Header, Vec<sugondat_subxt::ExtrinsicDetails>),
383+
) -> anyhow::Result<Self> {
384+
let (header, extrinsics) = value;
385+
let tree_root = tree_root(&header).ok_or_else(err::no_tree_root)?;
386+
let timestamp = extract_timestamp(&extrinsics)?;
387+
let blobs = extract_blobs(extrinsics);
388+
tracing::debug!(?blobs, "found {} blobs in block", blobs.len());
389+
Ok(Block {
390+
number: header.number as u64,
391+
hash: header.hash().0,
392+
parent_hash: header.parent_hash.0,
393+
tree_root,
394+
timestamp,
395+
blobs,
396+
})
397+
}
398+
}
399+
350400
/// Represents a blob in a sugondat block.
351401
pub struct Blob {
352402
pub extrinsic_index: u32,

0 commit comments

Comments
 (0)