Skip to content

Commit

Permalink
grandpa-rpc don't share subscription manager, only executor (parityte…
Browse files Browse the repository at this point in the history
…ch#7039)

* service builder: fix todo about jsonrpc Option workaround

* grandpa-rpc: only share executor instead of sub manager

* grandpa-rpc: fix compilation

* grandpa-rpc: rename to subscription_executor

* node/cli: remove another unused jsonrpc dependency

* grandpa: apply style fixes from code review

Co-authored-by: André Silva <[email protected]>
  • Loading branch information
octol and andresilva authored Sep 14, 2020
1 parent 0326939 commit 55d55f5
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 35 deletions.
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions bin/node/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ codec = { package = "parity-scale-codec", version = "1.3.4" }
serde = { version = "1.0.102", features = ["derive"] }
futures = { version = "0.3.1", features = ["compat"] }
hex-literal = "0.3.1"
jsonrpc-core = "14.2.0"
jsonrpc-pubsub = "14.2.0"
log = "0.4.8"
rand = "0.7.2"
structopt = { version = "0.3.8", optional = true }
Expand Down
6 changes: 3 additions & 3 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub fn new_partial(config: &Configuration) -> Result<sc_service::PartialComponen
(
impl Fn(
node_rpc::DenyUnsafe,
jsonrpc_pubsub::manager::SubscriptionManager
sc_rpc::SubscriptionTaskExecutor,
) -> node_rpc::IoHandler,
(
sc_consensus_babe::BabeBlockImport<Block, FullClient, FullGrandpaBlockImport>,
Expand Down Expand Up @@ -119,7 +119,7 @@ pub fn new_partial(config: &Configuration) -> Result<sc_service::PartialComponen
let select_chain = select_chain.clone();
let keystore = keystore.clone();

let rpc_extensions_builder = move |deny_unsafe, subscriptions| {
let rpc_extensions_builder = move |deny_unsafe, subscription_executor| {
let deps = node_rpc::FullDeps {
client: client.clone(),
pool: pool.clone(),
Expand All @@ -134,7 +134,7 @@ pub fn new_partial(config: &Configuration) -> Result<sc_service::PartialComponen
shared_voter_state: shared_voter_state.clone(),
shared_authority_set: shared_authority_set.clone(),
justification_stream: justification_stream.clone(),
subscriptions,
subscription_executor,
},
};

Expand Down
1 change: 0 additions & 1 deletion bin/node/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
jsonrpc-core = "14.2.0"
jsonrpc-pubsub = "14.2.0"
node-primitives = { version = "2.0.0-rc6", path = "../primitives" }
node-runtime = { version = "2.0.0-rc6", path = "../runtime" }
pallet-contracts-rpc = { version = "0.8.0-rc6", path = "../../../frame/contracts/rpc/" }
Expand Down
10 changes: 5 additions & 5 deletions bin/node/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

use std::sync::Arc;

use jsonrpc_pubsub::manager::SubscriptionManager;
use node_primitives::{Block, BlockNumber, AccountId, Index, Balance, Hash};
use sc_consensus_babe::{Config, Epoch};
use sc_consensus_babe_rpc::BabeRpcHandler;
Expand All @@ -46,6 +45,7 @@ use sp_block_builder::BlockBuilder;
use sp_blockchain::{Error as BlockChainError, HeaderMetadata, HeaderBackend};
use sp_consensus::SelectChain;
use sp_consensus_babe::BabeApi;
use sc_rpc::SubscriptionTaskExecutor;
use sp_transaction_pool::TransactionPool;

/// Light client extra dependencies.
Expand Down Expand Up @@ -78,8 +78,8 @@ pub struct GrandpaDeps {
pub shared_authority_set: SharedAuthoritySet<Hash, BlockNumber>,
/// Receives notifications about justification events from Grandpa.
pub justification_stream: GrandpaJustificationStream<Block>,
/// Subscription manager to keep track of pubsub subscribers.
pub subscriptions: SubscriptionManager,
/// Executor to drive the subscription manager in the Grandpa RPC handler.
pub subscription_executor: SubscriptionTaskExecutor,
}

/// Full client dependencies.
Expand Down Expand Up @@ -139,7 +139,7 @@ pub fn create_full<C, P, SC>(
shared_voter_state,
shared_authority_set,
justification_stream,
subscriptions,
subscription_executor,
} = grandpa;

io.extend_with(
Expand Down Expand Up @@ -172,7 +172,7 @@ pub fn create_full<C, P, SC>(
shared_authority_set,
shared_voter_state,
justification_stream,
subscriptions,
subscription_executor,
)
)
);
Expand Down
15 changes: 10 additions & 5 deletions client/finality-grandpa/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! RPC API for GRANDPA.
#![warn(missing_docs)]

use std::sync::Arc;
use futures::{FutureExt, TryFutureExt, TryStreamExt, StreamExt};
use log::warn;
use jsonrpc_derive::rpc;
Expand All @@ -27,6 +28,7 @@ use jsonrpc_core::futures::{
sink::Sink as Sink01,
stream::Stream as Stream01,
future::Future as Future01,
future::Executor as Executor01,
};

mod error;
Expand Down Expand Up @@ -92,12 +94,16 @@ pub struct GrandpaRpcHandler<AuthoritySet, VoterState, Block: BlockT> {

impl<AuthoritySet, VoterState, Block: BlockT> GrandpaRpcHandler<AuthoritySet, VoterState, Block> {
/// Creates a new GrandpaRpcHandler instance.
pub fn new(
pub fn new<E>(
authority_set: AuthoritySet,
voter_state: VoterState,
justification_stream: GrandpaJustificationStream<Block>,
manager: SubscriptionManager,
) -> Self {
executor: E,
) -> Self
where
E: Executor01<Box<dyn Future01<Item = (), Error = ()> + Send>> + Send + Sync + 'static,
{
let manager = SubscriptionManager::new(Arc::new(executor));
Self {
authority_set,
voter_state,
Expand Down Expand Up @@ -232,13 +238,12 @@ mod tests {
VoterState: ReportVoterState + Send + Sync + 'static,
{
let (justification_sender, justification_stream) = GrandpaJustificationStream::channel();
let manager = SubscriptionManager::new(Arc::new(sc_rpc::testing::TaskExecutor));

let handler = GrandpaRpcHandler::new(
TestAuthoritySet,
voter_state,
justification_stream,
manager,
sc_rpc::testing::TaskExecutor,
);

let mut io = jsonrpc_core::MetaIoHandler::default();
Expand Down
40 changes: 24 additions & 16 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use sp_runtime::traits::{
};
use sp_api::{ProvideRuntimeApi, CallApiAt};
use sc_executor::{NativeExecutor, NativeExecutionDispatch, RuntimeInfo};
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;
use wasm_timer::SystemTime;
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
use sp_transaction_pool::MaintainedTransactionPool;
Expand All @@ -73,17 +73,25 @@ pub trait RpcExtensionBuilder {

/// Returns an instance of the RPC extension for a particular `DenyUnsafe`
/// value, e.g. the RPC extension might not expose some unsafe methods.
fn build(&self, deny: sc_rpc::DenyUnsafe, subscriptions: SubscriptionManager) -> Self::Output;
fn build(
&self,
deny: sc_rpc::DenyUnsafe,
subscription_executor: sc_rpc::SubscriptionTaskExecutor,
) -> Self::Output;
}

impl<F, R> RpcExtensionBuilder for F where
F: Fn(sc_rpc::DenyUnsafe, SubscriptionManager) -> R,
F: Fn(sc_rpc::DenyUnsafe, sc_rpc::SubscriptionTaskExecutor) -> R,
R: sc_rpc::RpcExtension<sc_rpc::Metadata>,
{
type Output = R;

fn build(&self, deny: sc_rpc::DenyUnsafe, subscriptions: SubscriptionManager) -> Self::Output {
(*self)(deny, subscriptions)
fn build(
&self,
deny: sc_rpc::DenyUnsafe,
subscription_executor: sc_rpc::SubscriptionTaskExecutor,
) -> Self::Output {
(*self)(deny, subscription_executor)
}
}

Expand All @@ -97,7 +105,11 @@ impl<R> RpcExtensionBuilder for NoopRpcExtensionBuilder<R> where
{
type Output = R;

fn build(&self, _deny: sc_rpc::DenyUnsafe, _subscriptions: SubscriptionManager) -> Self::Output {
fn build(
&self,
_deny: sc_rpc::DenyUnsafe,
_subscription_executor: sc_rpc::SubscriptionTaskExecutor,
) -> Self::Output {
self.0.clone()
}
}
Expand Down Expand Up @@ -694,7 +706,7 @@ fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
};

let task_executor = sc_rpc::SubscriptionTaskExecutor::new(spawn_handle);
let subscriptions = SubscriptionManager::new(Arc::new(task_executor));
let subscriptions = SubscriptionManager::new(Arc::new(task_executor.clone()));

let (chain, state, child_state) = if let (Some(remote_blockchain), Some(on_demand)) =
(remote_blockchain, on_demand) {
Expand Down Expand Up @@ -723,20 +735,16 @@ fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
let author = sc_rpc::author::Author::new(
client,
transaction_pool,
subscriptions.clone(),
subscriptions,
keystore,
deny_unsafe,
);
let system = system::System::new(system_info, system_rpc_tx, deny_unsafe);

let maybe_offchain_rpc = offchain_storage
.map(|storage| {
let maybe_offchain_rpc = offchain_storage.map(|storage| {
let offchain = sc_rpc::offchain::Offchain::new(storage, deny_unsafe);
// FIXME: Use plain Option (don't collect into HashMap) when we upgrade to jsonrpc 14.1
// https://github.com/paritytech/jsonrpc/commit/20485387ed06a48f1a70bf4d609a7cde6cf0accf
let delegate = offchain::OffchainApi::to_delegate(offchain);
delegate.into_iter().collect::<HashMap<_, _>>()
}).unwrap_or_default();
offchain::OffchainApi::to_delegate(offchain)
});

sc_rpc_server::rpc_handler((
state::StateApi::to_delegate(state),
Expand All @@ -745,7 +753,7 @@ fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
maybe_offchain_rpc,
author::AuthorApi::to_delegate(author),
system::SystemApi::to_delegate(system),
rpc_extensions_builder.build(deny_unsafe, subscriptions),
rpc_extensions_builder.build(deny_unsafe, task_executor),
))
}

Expand Down

0 comments on commit 55d55f5

Please sign in to comment.