Skip to content

Commit

Permalink
Extrinsics PubSub (paritytech#349)
Browse files Browse the repository at this point in the history
* Extrinsic subscriptions.

* Handle RPC errors better.

* Add tests for extrinsics and unignored others.

* Handle client errors.

* Fix compilation.
  • Loading branch information
tomusdrw authored and gavofyork committed Jul 17, 2018
1 parent 3dbaa54 commit c55765e
Show file tree
Hide file tree
Showing 19 changed files with 278 additions and 71 deletions.
16 changes: 9 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion demo/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ impl extrinsic_pool::api::ExtrinsicPool<UncheckedExtrinsic, BlockId, Hash> for D
Err("unimplemented".into())
}

fn submit_and_watch(&self, _block: BlockId, _: UncheckedExtrinsic)
-> Result<extrinsic_pool::watcher::Watcher<Hash>, Self::Error>
{
Err("unimplemented".into())
}

fn light_status(&self) -> extrinsic_pool::txpool::LightStatus {
unreachable!()
}
Expand Down Expand Up @@ -169,7 +175,7 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
let _rpc_servers = {
let handler = || {
let chain = rpc::apis::chain::Chain::new(client.clone(), runtime.executor());
let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool));
let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool), runtime.executor());
rpc::rpc_handler::<Block, _, _, _, _>(client.clone(), chain, author, DummySystem)
};
let http_address = "127.0.0.1:9933".parse().unwrap();
Expand Down
6 changes: 3 additions & 3 deletions polkadot/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
info!("Starting collator");
// TODO [rob]: collation node implementation
// This isn't a thing. Different parachains will have their own collator executables and
// maybe link to libpolkadot to get a light-client.
// maybe link to libpolkadot to get a light-client.
service::Roles::LIGHT
} else if matches.is_present("light") {
info!("Starting (light)");
Expand Down Expand Up @@ -478,9 +478,9 @@ fn run_until_exit<C, W>(
let ws_address = parse_address("127.0.0.1:9944", "ws-port", matches)?;

let handler = || {
let client = (&service as &substrate_service::Service<C>).client();
let client = substrate_service::Service::client(&service);
let chain = rpc::apis::chain::Chain::new(client.clone(), executor.clone());
let author = rpc::apis::author::Author::new(client.clone(), service.extrinsic_pool());
let author = rpc::apis::author::Author::new(client.clone(), service.extrinsic_pool(), executor.clone());
rpc::rpc_handler::<service::ComponentBlock<C>, _, _, _, _>(
client,
chain,
Expand Down
25 changes: 21 additions & 4 deletions polkadot/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,13 @@ use std::{
};

use codec::{Decode, Encode};
use extrinsic_pool::{Pool, Listener, txpool::{self, Readiness, scoring::{Change, Choice}}};
use extrinsic_pool::api::{ExtrinsicPool, EventStream};
use extrinsic_pool::{
api::{ExtrinsicPool, EventStream},
txpool::{self, Readiness, scoring::{Change, Choice}},
watcher::Watcher,
Pool,
Listener,
};
use polkadot_api::PolkadotApi;
use primitives::{AccountId, BlockId, Hash, Index, UncheckedExtrinsic as FutureProofUncheckedExtrinsic};
use runtime::{Address, UncheckedExtrinsic};
Expand Down Expand Up @@ -385,15 +390,15 @@ impl<A> Deref for TransactionPool<A> {
}
}

// TODO: more general transaction pool, which can handle more kinds of vec-encoded transactions,
// even when runtime is out of date.
impl<A> ExtrinsicPool<FutureProofUncheckedExtrinsic, BlockId, Hash> for TransactionPool<A> where
A: Send + Sync + 'static,
A: PolkadotApi,
{
type Error = Error;

fn submit(&self, block: BlockId, xts: Vec<FutureProofUncheckedExtrinsic>) -> Result<Vec<Hash>> {
// TODO: more general transaction pool, which can handle more kinds of vec-encoded transactions,
// even when runtime is out of date.
xts.into_iter()
.map(|xt| xt.encode())
.map(|encoded| {
Expand All @@ -404,6 +409,18 @@ impl<A> ExtrinsicPool<FutureProofUncheckedExtrinsic, BlockId, Hash> for Transact
.collect()
}

fn submit_and_watch(&self, block: BlockId, xt: FutureProofUncheckedExtrinsic) -> Result<Watcher<Hash>> {
let encoded = xt.encode();
let decoded = UncheckedExtrinsic::decode(&mut &encoded[..]).ok_or(ErrorKind::InvalidExtrinsicFormat)?;

let verifier = Verifier {
api: &*self.api,
at_block: block,
};

self.inner.submit_and_watch(verifier, decoded)
}

fn light_status(&self) -> LightStatus {
self.inner.light_status()
}
Expand Down
2 changes: 2 additions & 0 deletions substrate/extrinsic-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ version = "0.1.0"
authors = ["Parity Technologies <[email protected]>"]

[dependencies]
serde = "1.0"
serde_derive = "1.0"
error-chain = "0.12"
futures = "0.1"
log = "0.3"
Expand Down
5 changes: 5 additions & 0 deletions substrate/extrinsic-pool/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
use txpool;
use futures::sync::mpsc;

use watcher::Watcher;

/// Extrinsic pool error.
pub trait Error: ::std::error::Error + Send + Sized {
/// Try to extract original `txpool::Error`
Expand All @@ -44,6 +46,9 @@ pub trait ExtrinsicPool<Ex, BlockId, Hash>: Send + Sync + 'static {
/// Submit a collection of extrinsics to the pool.
fn submit(&self, block: BlockId, xt: Vec<Ex>) -> Result<Vec<Hash>, Self::Error>;

/// Submit an extrinsic to the pool and start watching it's progress.
fn submit_and_watch(&self, block: BlockId, xt: Ex) -> Result<Watcher<Hash>, Self::Error>;

/// Returns light status of the pool.
fn light_status(&self) -> txpool::LightStatus;

Expand Down
6 changes: 4 additions & 2 deletions substrate/extrinsic-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@
extern crate futures;
extern crate parking_lot;
extern crate serde;

#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;

pub extern crate transaction_pool as txpool;

pub mod api;
pub mod watcher;

mod listener;
mod pool;
mod watcher;

pub use self::listener::Listener;
pub use self::pool::Pool;
pub use self::watcher::Watcher;
24 changes: 21 additions & 3 deletions substrate/extrinsic-pool/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use futures::sync::mpsc;
//! Extrinsics status updates.
use futures::{
Stream,
sync::mpsc,
};

/// Possible extrinsic status events
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Status<H> {
/// Extrinsic has been finalised in block with given hash.
Finalised(H),
Expand All @@ -37,8 +43,19 @@ pub struct Watcher<H> {
receiver: mpsc::UnboundedReceiver<Status<H>>,
}

impl<H> Watcher<H> {
/// Pipe the notifications to given sink.
///
/// Make sure to drive the future to completion.
pub fn into_stream(self) -> impl Stream<Item=Status<H>, Error=()> {
// we can safely ignore the error here, `UnboundedReceiver` never fails.
self.receiver.map_err(|_| ())
}
}

/// Sender part of the watcher. Exposed only for testing purposes.
#[derive(Debug, Default)]
pub(crate) struct Sender<H> {
pub struct Sender<H> {
receivers: Vec<mpsc::UnboundedSender<Status<H>>>,
finalised: bool,
}
Expand Down Expand Up @@ -74,6 +91,7 @@ impl<H: Clone> Sender<H> {
self.send(Status::Broadcast(peers))
}


/// Returns true if the are no more listeners for this extrinsic or it was finalised.
pub fn is_done(&self) -> bool {
self.finalised || self.receivers.is_empty()
Expand Down
2 changes: 1 addition & 1 deletion substrate/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub fn rpc_handler<Block: BlockT, S, C, A, Y>(
Block: 'static,
S: apis::state::StateApi<Block::Hash>,
C: apis::chain::ChainApi<Block::Hash, Block::Header, Metadata=Metadata>,
A: apis::author::AuthorApi<Block::Hash, Block::Extrinsic>,
A: apis::author::AuthorApi<Block::Hash, Block::Extrinsic, Metadata=Metadata>,
Y: apis::system::SystemApi,
{
let mut io = pubsub::PubSubHandler::default();
Expand Down
32 changes: 22 additions & 10 deletions substrate/rpc/src/author/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,29 @@

//! Authoring RPC module errors.
use client;
use extrinsic_pool::txpool;
use rpc;

use errors;


error_chain! {
links {
Pool(txpool::Error, txpool::ErrorKind) #[doc = "Pool error"];
Client(client::error::Error, client::error::ErrorKind) #[doc = "Client error"];
}
errors {
/// Incorrect transaction format.
BadFormat {
description("bad format"),
display("Invalid transaction format"),
}
/// Not implemented yet
Unimplemented {
description("not yet implemented"),
display("Method Not Implemented"),
}
/// Incorrect extrinsic format.
BadFormat {
description("bad format"),
display("Invalid extrinsic format"),
}
/// Verification error
Verification(e: Box<::std::error::Error + Send>) {
description("extrinsic verification error"),
Expand All @@ -42,16 +47,23 @@ error_chain! {
}
}

const ERROR: i64 = 1000;

impl From<Error> for rpc::Error {
fn from(e: Error) -> Self {
match e {
Error(ErrorKind::Unimplemented, _) => rpc::Error {
code: rpc::ErrorCode::ServerError(-1),
message: "Not implemented yet".into(),
Error(ErrorKind::Unimplemented, _) => errors::unimplemented(),
Error(ErrorKind::BadFormat, _) => rpc::Error {
code: rpc::ErrorCode::ServerError(ERROR + 1),
message: "Extrinsic has invalid format.".into(),
data: None,
},
// TODO [ToDr] Unwrap Pool errors.
_ => rpc::Error::internal_error(),
Error(ErrorKind::Verification(e), _) => rpc::Error {
code: rpc::ErrorCode::ServerError(ERROR + 2),
message: e.description().into(),
data: Some(format!("{:?}", e).into()),
},
e => errors::internal(e),
}
}
}
Loading

0 comments on commit c55765e

Please sign in to comment.