From 980cd32f1489461606605ac34e988abdeaf96f08 Mon Sep 17 00:00:00 2001 From: gabriele-0201 Date: Wed, 10 Jan 2024 19:45:53 +0100 Subject: [PATCH] shim: add --no-retry flag --- sugondat/shim/src/cli.rs | 3 + sugondat/shim/src/cmd/query/mod.rs | 2 +- sugondat/shim/src/cmd/serve.rs | 6 +- sugondat/shim/src/sugondat_rpc/conn.rs | 58 ++++++++++ sugondat/shim/src/sugondat_rpc/mod.rs | 144 +++++++++++++------------ 5 files changed, 141 insertions(+), 72 deletions(-) diff --git a/sugondat/shim/src/cli.rs b/sugondat/shim/src/cli.rs index 89776241..360ca02e 100644 --- a/sugondat/shim/src/cli.rs +++ b/sugondat/shim/src/cli.rs @@ -87,6 +87,9 @@ pub struct SugondatRpcParams { /// The address of the sugondat-node to connect to. #[clap(long, default_value = "ws://localhost:9988", env = ENV_SUGONDAT_NODE_URL)] pub node_url: String, + + #[clap(long, default_value = "false", default_missing_value = "true")] + pub no_retry: bool, } impl DockParams { diff --git a/sugondat/shim/src/cmd/query/mod.rs b/sugondat/shim/src/cmd/query/mod.rs index d17b2afc..7b8980e8 100644 --- a/sugondat/shim/src/cmd/query/mod.rs +++ b/sugondat/shim/src/cmd/query/mod.rs @@ -19,5 +19,5 @@ pub async fn run(params: Params) -> anyhow::Result<()> { async fn connect_rpc( conn_params: crate::cli::SugondatRpcParams, ) -> anyhow::Result { - sugondat_rpc::Client::new(conn_params.node_url).await + sugondat_rpc::Client::new(conn_params.node_url, conn_params.no_retry).await } diff --git a/sugondat/shim/src/cmd/serve.rs b/sugondat/shim/src/cmd/serve.rs index 83dfb639..8f4e6452 100644 --- a/sugondat/shim/src/cmd/serve.rs +++ b/sugondat/shim/src/cmd/serve.rs @@ -16,7 +16,7 @@ Pass --submit-dev-alice or --submit-private-key=<..> to fix." ); } let server = Server::builder().build(listen_on).await?; - let client = connect_client(¶ms.rpc.node_url).await?; + let client = connect_client(¶ms.rpc.node_url, params.rpc.no_retry).await?; let methods = dock::init(dock::Config { // TODO: whenever there are more docks, the logic of checking if any at least one is enabled // and other similar stuff should be in CLI. @@ -30,7 +30,7 @@ Pass --submit-dev-alice or --submit-private-key=<..> to fix." Ok(()) } -async fn connect_client(url: &str) -> anyhow::Result { - let client = Client::new(url.to_string()).await?; +async fn connect_client(url: &str, no_retry: bool) -> anyhow::Result { + let client = Client::new(url.to_string(), no_retry).await?; Ok(client) } diff --git a/sugondat/shim/src/sugondat_rpc/conn.rs b/sugondat/shim/src/sugondat_rpc/conn.rs index db0951cb..cfae1a83 100644 --- a/sugondat/shim/src/sugondat_rpc/conn.rs +++ b/sugondat/shim/src/sugondat_rpc/conn.rs @@ -9,6 +9,64 @@ use subxt::backend::rpc::RpcClient; use sugondat_subxt::sugondat::is_codegen_valid_for; use tokio::sync::{oneshot, Mutex}; +#[derive(Clone)] +pub enum ConnectionType { + Persistent(Arc), + // TODO: thid Arc is useless, could be removed but + // a method like `connect_raw` should be added to Conn + Single(Arc), +} + +impl ConnectionType { + pub async fn new(rpc_url: String, no_retry: bool) -> anyhow::Result { + tracing::info!("connecting to sugondat node: {}", rpc_url); + + if no_retry { + // TODO: here conn_id id required but with no_retry flag no + // more than one connection will be attempted + let conn = Conn::connect(0, &rpc_url).await.map_err(|e| { + tracing::error!("failed to connect to sugondat node: {}\n", e); + e + })?; + return Ok(ConnectionType::Single(conn)); + } + + let rpc_url = Arc::new(rpc_url); + let connector = Arc::new(Connector::new(rpc_url)); + connector.ensure_connected().await; + + Ok(ConnectionType::Persistent(connector)) + } + + // Execute the given closure, if the connection type + // is Persistent then the connectio will be reset and + // the closure re executed + // + // tracing should be handled by the caller of this function + pub async fn exec>>( + &self, + action: impl Fn(Arc) -> Fut, + ) -> T { + match self { + ConnectionType::Persistent(connector) => loop { + let conn = connector.ensure_connected().await; + match action(conn).await { + Ok(res) => break res, + Err(_e) => { + // Reset the connection and retry + connector.reset().await; + } + }; + }, + ConnectionType::Single(conn) => action(conn.clone()).await.unwrap_or_else(|e| { + tracing::error!("connection to sugondat node interrupted: {}\n", e); + // TODO: is correct to panic here ? + panic!() + }), + } + } +} + // Contains the RPC client structures that are assumed to be connected. pub struct Conn { /// Connection id. For diagnostics purposes only. diff --git a/sugondat/shim/src/sugondat_rpc/mod.rs b/sugondat/shim/src/sugondat_rpc/mod.rs index 08f4c7eb..aac6d66f 100644 --- a/sugondat/shim/src/sugondat_rpc/mod.rs +++ b/sugondat/shim/src/sugondat_rpc/mod.rs @@ -10,6 +10,8 @@ use sugondat_subxt::{ use tokio::sync::watch; use tracing::Level; +use self::conn::ConnectionType; + // NOTE: we specifically avoid prolifiration of subxt types around the codebase. To that end, we // avoid returning H256 and instead return [u8; 32] directly. @@ -28,7 +30,7 @@ mod conn; /// This is a thin wrapper that can be cloned cheaply. #[derive(Clone)] pub struct Client { - connector: Arc, + connection: conn::ConnectionType, } impl Client { @@ -38,70 +40,66 @@ impl Client { /// The RPC URL must be a valid URL pointing to a sugondat node. If it's not a malformed URL, /// returns an error. #[tracing::instrument(level = Level::DEBUG)] - pub async fn new(rpc_url: String) -> anyhow::Result { + pub async fn new(rpc_url: String, no_retry: bool) -> anyhow::Result { anyhow::ensure!( url::Url::parse(&rpc_url).is_ok(), "invalid RPC URL: {}", rpc_url ); - tracing::info!("connecting to sugondat node: {}", rpc_url); - let rpc_url = Arc::new(rpc_url); - let me = Self { - connector: Arc::new(conn::Connector::new(rpc_url)), - }; - me.connector.ensure_connected().await; - Ok(me) + Ok(Self { + connection: conn::ConnectionType::new(rpc_url, no_retry).await?, + }) } /// Blocks until the sugondat node has finalized a block at the given height. Returns /// the block hash of the block at the given height. #[tracing::instrument(level = Level::DEBUG, skip(self))] pub async fn wait_finalized_height(&self, height: u64) -> [u8; 32] { - loop { - let conn = self.connector.ensure_connected().await; - match conn.finalized.wait_until_finalized(self, height).await { - Some(block_hash) => return block_hash, - None => { - // The watcher task has terminated. Reset the connection and retry. - self.connector.reset().await; - } - } - } + // TODO: add tracingg::error + self.connection + .exec(|conn| async move { + conn.finalized + .wait_until_finalized(self, height) + .await + .ok_or(anyhow::anyhow!("failed to wait for last finalized block")) + }) + .await } /// Returns the block hash of the block at the given height. /// /// If there is no block at the given height, returns `None`. #[tracing::instrument(level = Level::DEBUG, skip(self))] + // TODO: why here the return type is Result