Skip to content

Commit

Permalink
try-runtime::follow-chain - execute all blocks (#12048)
Browse files Browse the repository at this point in the history
* extract subscription

* FinalizedHeaders

* Fool of a Took

* testability

* tests

* review comments

* clippy
  • Loading branch information
pmikolajczyk41 authored Sep 5, 2022
1 parent 78bff0e commit 1a15071
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions utils/frame/try-runtime/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ sp-runtime = { version = "6.0.0", path = "../../../../primitives/runtime" }
sp-state-machine = { version = "0.12.0", path = "../../../../primitives/state-machine" }
sp-version = { version = "5.0.0", path = "../../../../primitives/version" }
frame-try-runtime = { path = "../../../../frame/try-runtime" }

[dev-dependencies]
tokio = "1.17.0"
264 changes: 235 additions & 29 deletions utils/frame/try-runtime/cli/src/commands/follow_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@ use crate::{
state_machine_call_with_proof, SharedParams, LOG_TARGET,
};
use jsonrpsee::{
core::client::{Subscription, SubscriptionClientT},
core::{
async_trait,
client::{Client, Subscription, SubscriptionClientT},
},
ws_client::WsClientBuilder,
};
use parity_scale_codec::{Decode, Encode};
use remote_externalities::{rpc_api, Builder, Mode, OnlineConfig};
use sc_executor::NativeExecutionDispatch;
use sc_service::Configuration;
use serde::de::DeserializeOwned;
use sp_core::H256;
use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
use std::{fmt::Debug, str::FromStr};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use std::{collections::VecDeque, fmt::Debug, marker::PhantomData, str::FromStr};

const SUB: &str = "chain_subscribeFinalizedHeads";
const UN_SUB: &str = "chain_unsubscribeFinalizedHeads";
Expand Down Expand Up @@ -58,51 +62,182 @@ pub struct FollowChainCmd {
try_state: frame_try_runtime::TryStateSelect,
}

/// Start listening for with `SUB` at `url`.
///
/// Returns a pair `(client, subscription)` - `subscription` alone will be useless, because it
/// relies on the related alive `client`.
async fn start_subscribing<Header: DeserializeOwned>(
url: &str,
) -> sc_cli::Result<(Client, Subscription<Header>)> {
let client = WsClientBuilder::default()
.connection_timeout(std::time::Duration::new(20, 0))
.max_notifs_per_subscription(1024)
.max_request_body_size(u32::MAX)
.build(url)
.await
.map_err(|e| sc_cli::Error::Application(e.into()))?;

log::info!(target: LOG_TARGET, "subscribing to {:?} / {:?}", SUB, UN_SUB);

let sub = client
.subscribe(SUB, None, UN_SUB)
.await
.map_err(|e| sc_cli::Error::Application(e.into()))?;
Ok((client, sub))
}

/// Abstraction over RPC calling for headers.
#[async_trait]
trait HeaderProvider<Block: BlockT>
where
Block::Header: HeaderT,
{
/// Awaits for the header of the block with hash `hash`.
async fn get_header(&mut self, hash: Block::Hash) -> Block::Header;
}

struct RpcHeaderProvider<Block: BlockT> {
uri: String,
_phantom: PhantomData<Block>,
}

#[async_trait]
impl<Block: BlockT> HeaderProvider<Block> for RpcHeaderProvider<Block>
where
Block::Header: DeserializeOwned,
{
async fn get_header(&mut self, hash: Block::Hash) -> Block::Header {
rpc_api::get_header::<Block, _>(&self.uri, hash).await.unwrap()
}
}

/// Abstraction over RPC subscription for finalized headers.
#[async_trait]
trait HeaderSubscription<Block: BlockT>
where
Block::Header: HeaderT,
{
/// Await for the next finalized header from the subscription.
///
/// Returns `None` if either the subscription has been closed or there was an error when reading
/// an object from the client.
async fn next_header(&mut self) -> Option<Block::Header>;
}

#[async_trait]
impl<Block: BlockT> HeaderSubscription<Block> for Subscription<Block::Header>
where
Block::Header: DeserializeOwned,
{
async fn next_header(&mut self) -> Option<Block::Header> {
match self.next().await {
Some(Ok(header)) => Some(header),
None => {
log::warn!("subscription closed");
None
},
Some(Err(why)) => {
log::warn!("subscription returned error: {:?}. Probably decoding has failed.", why);
None
},
}
}
}

/// Stream of all finalized headers.
///
/// Returned headers are guaranteed to be ordered. There are no missing headers (even if some of
/// them lack justification).
struct FinalizedHeaders<Block: BlockT, HP: HeaderProvider<Block>, HS: HeaderSubscription<Block>> {
header_provider: HP,
subscription: HS,
fetched_headers: VecDeque<Block::Header>,
last_returned: Option<<Block::Header as HeaderT>::Hash>,
}

impl<Block: BlockT, HP: HeaderProvider<Block>, HS: HeaderSubscription<Block>>
FinalizedHeaders<Block, HP, HS>
where
<Block as BlockT>::Header: DeserializeOwned,
{
pub fn new(header_provider: HP, subscription: HS) -> Self {
Self {
header_provider,
subscription,
fetched_headers: VecDeque::new(),
last_returned: None,
}
}

/// Reads next finalized header from the subscription. If some headers (without justification)
/// have been skipped, fetches them as well. Returns number of headers that have been fetched.
///
/// All fetched headers are stored in `self.fetched_headers`.
async fn fetch(&mut self) -> usize {
let last_finalized = match self.subscription.next_header().await {
Some(header) => header,
None => return 0,
};

self.fetched_headers.push_front(last_finalized.clone());

let mut last_finalized_parent = *last_finalized.parent_hash();
let last_returned = self.last_returned.unwrap_or(last_finalized_parent);

while last_finalized_parent != last_returned {
let parent_header = self.header_provider.get_header(last_finalized_parent).await;
self.fetched_headers.push_front(parent_header.clone());
last_finalized_parent = *parent_header.parent_hash();
}

self.fetched_headers.len()
}

/// Get the next finalized header.
pub async fn next(&mut self) -> Option<Block::Header> {
if self.fetched_headers.is_empty() {
self.fetch().await;
}

if let Some(header) = self.fetched_headers.pop_front() {
self.last_returned = Some(header.hash());
Some(header)
} else {
None
}
}
}

pub(crate) async fn follow_chain<Block, ExecDispatch>(
shared: SharedParams,
command: FollowChainCmd,
config: Configuration,
) -> sc_cli::Result<()>
where
Block: BlockT<Hash = H256> + serde::de::DeserializeOwned,
Block: BlockT<Hash = H256> + DeserializeOwned,
Block::Hash: FromStr,
Block::Header: serde::de::DeserializeOwned,
Block::Header: DeserializeOwned,
<Block::Hash as FromStr>::Err: Debug,
NumberFor<Block>: FromStr,
<NumberFor<Block> as FromStr>::Err: Debug,
ExecDispatch: NativeExecutionDispatch + 'static,
{
let mut maybe_state_ext = None;

let client = WsClientBuilder::default()
.connection_timeout(std::time::Duration::new(20, 0))
.max_notifs_per_subscription(1024)
.max_request_body_size(u32::MAX)
.build(&command.uri)
.await
.unwrap();

log::info!(target: LOG_TARGET, "subscribing to {:?} / {:?}", SUB, UN_SUB);
let mut subscription: Subscription<Block::Header> =
client.subscribe(SUB, None, UN_SUB).await.unwrap();
let (_client, subscription) = start_subscribing::<Block::Header>(&command.uri).await?;

let (code_key, code) = extract_code(&config.chain_spec)?;
let executor = build_executor::<ExecDispatch>(&shared, &config);
let execution = shared.execution;

loop {
let header = match subscription.next().await {
Some(Ok(header)) => header,
None => {
log::warn!("subscription closed");
break
},
Some(Err(why)) => {
log::warn!("subscription returned error: {:?}. Probably decoding has failed.", why);
continue
},
};
let header_provider: RpcHeaderProvider<Block> =
RpcHeaderProvider { uri: command.uri.clone(), _phantom: PhantomData {} };
let mut finalized_headers: FinalizedHeaders<
Block,
RpcHeaderProvider<Block>,
Subscription<Block::Header>,
> = FinalizedHeaders::new(header_provider, subscription);

while let Some(header) = finalized_headers.next().await {
let hash = header.hash();
let number = header.number();

Expand Down Expand Up @@ -193,3 +328,74 @@ where
log::error!(target: LOG_TARGET, "ws subscription must have terminated.");
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use sp_runtime::testing::{Block as TBlock, ExtrinsicWrapper, Header};

type Block = TBlock<ExtrinsicWrapper<()>>;
type BlockNumber = u64;
type Hash = H256;

struct MockHeaderProvider(pub VecDeque<BlockNumber>);

fn headers() -> Vec<Header> {
let mut headers = vec![Header::new_from_number(0)];
for n in 1..11 {
headers.push(Header {
parent_hash: headers.last().unwrap().hash(),
..Header::new_from_number(n)
})
}
headers
}

#[async_trait]
impl HeaderProvider<Block> for MockHeaderProvider {
async fn get_header(&mut self, _hash: Hash) -> Header {
let height = self.0.pop_front().unwrap();
headers()[height as usize].clone()
}
}

struct MockHeaderSubscription(pub VecDeque<BlockNumber>);

#[async_trait]
impl HeaderSubscription<Block> for MockHeaderSubscription {
async fn next_header(&mut self) -> Option<Header> {
self.0.pop_front().map(|h| headers()[h as usize].clone())
}
}

#[tokio::test]
async fn finalized_headers_works_when_every_block_comes_from_subscription() {
let heights = vec![4, 5, 6, 7];

let provider = MockHeaderProvider(vec![].into());
let subscription = MockHeaderSubscription(heights.clone().into());
let mut headers = FinalizedHeaders::new(provider, subscription);

for h in heights {
assert_eq!(h, headers.next().await.unwrap().number);
}
assert_eq!(None, headers.next().await);
}

#[tokio::test]
async fn finalized_headers_come_from_subscription_and_provider_if_in_need() {
let all_heights = 3..11;
let heights_in_subscription = vec![3, 4, 6, 10];
// Consecutive headers will be requested in the reversed order.
let heights_not_in_subscription = vec![5, 9, 8, 7];

let provider = MockHeaderProvider(heights_not_in_subscription.into());
let subscription = MockHeaderSubscription(heights_in_subscription.into());
let mut headers = FinalizedHeaders::new(provider, subscription);

for h in all_heights {
assert_eq!(h, headers.next().await.unwrap().number);
}
assert_eq!(None, headers.next().await);
}
}

0 comments on commit 1a15071

Please sign in to comment.