Skip to content

Commit

Permalink
Proper signer Pub-Sub for pending requests. (openethereum#5594)
Browse files Browse the repository at this point in the history
* Signer subscription.

* Fixing RPC tests.

* Improve notification performance.
  • Loading branch information
tomusdrw authored and gavofyork committed May 17, 2017
1 parent da8be07 commit 240704f
Show file tree
Hide file tree
Showing 12 changed files with 187 additions and 127 deletions.
5 changes: 0 additions & 5 deletions js/src/api/transport/ws/ws.js
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,6 @@ export default class Ws extends JsonRpcBase {
}

_onMessage = (event) => {
// Event sent by Signer Broadcaster
if (event.data === 'new_message') {
return false;
}

try {
const result = JSON.parse(event.data);
const { method, params, json, resolve, reject } = this._messages[result.id];
Expand Down
5 changes: 3 additions & 2 deletions parity/rpc_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl FullDependencies {
handler.extend_with(PersonalClient::new(&self.secret_store, dispatcher.clone(), self.geth_compatibility).to_delegate());
},
Api::Signer => {
handler.extend_with(SignerClient::new(&self.secret_store, dispatcher.clone(), &self.signer_service).to_delegate());
handler.extend_with(SignerClient::new(&self.secret_store, dispatcher.clone(), &self.signer_service, self.remote.clone()).to_delegate());
},
Api::Parity => {
let signer = match self.signer_service.is_enabled() {
Expand Down Expand Up @@ -352,6 +352,7 @@ pub struct LightDependencies {
pub dapps_port: Option<u16>,
pub fetch: FetchClient,
pub geth_compatibility: bool,
pub remote: parity_reactor::Remote,
}

impl Dependencies for LightDependencies {
Expand Down Expand Up @@ -415,7 +416,7 @@ impl Dependencies for LightDependencies {
},
Api::Signer => {
let secret_store = Some(self.secret_store.clone());
handler.extend_with(SignerClient::new(&secret_store, dispatcher.clone(), &self.signer_service).to_delegate());
handler.extend_with(SignerClient::new(&secret_store, dispatcher.clone(), &self.signer_service, self.remote.clone()).to_delegate());
},
Api::Parity => {
let signer = match self.signer_service.is_enabled() {
Expand Down
1 change: 1 addition & 0 deletions parity/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
},
fetch: fetch,
geth_compatibility: cmd.geth_compatibility,
remote: event_loop.remote(),
});

let dependencies = rpc::Dependencies {
Expand Down
2 changes: 2 additions & 0 deletions rpc/src/v1/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ mod poll_filter;
mod requests;
mod signer;
mod signing_queue;
mod subscribers;
mod subscription_manager;

pub use self::dispatch::{Dispatcher, FullDispatcher};
Expand All @@ -47,4 +48,5 @@ pub use self::signing_queue::{
QUEUE_LIMIT as SIGNING_QUEUE_LIMIT,
};
pub use self::signer::SignerService;
pub use self::subscribers::Subscribers;
pub use self::subscription_manager::GenericPollManager;
77 changes: 18 additions & 59 deletions rpc/src/v1/helpers/signing_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use std::mem;
use std::cell::RefCell;
use std::sync::{mpsc, Arc};
use std::sync::Arc;
use std::collections::BTreeMap;
use jsonrpc_core;
use util::{Mutex, RwLock, U256, Address};
Expand All @@ -27,7 +27,6 @@ use v1::types::{ConfirmationResponse, H160 as RpcH160, Origin, DappId as RpcDapp
/// Result that can be returned from JSON RPC.
pub type RpcResult = Result<ConfirmationResponse, jsonrpc_core::Error>;


/// Type of default account
pub enum DefaultAccount {
/// Default account is known
Expand All @@ -49,7 +48,7 @@ impl From<RpcH160> for DefaultAccount {
}

/// Possible events happening in the queue that can be listened to.
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub enum QueueEvent {
/// Receiver should stop work upon receiving `Finish` message.
Finish,
Expand All @@ -61,15 +60,6 @@ pub enum QueueEvent {
RequestConfirmed(U256),
}

/// Defines possible errors returned from queue receiving method.
#[derive(Debug, PartialEq)]
pub enum QueueError {
/// Returned when method has been already used (no receiver available).
AlreadyUsed,
/// Returned when receiver encounters an error.
ReceiverError(mpsc::RecvError),
}

/// Defines possible errors when inserting to queue
#[derive(Debug, PartialEq)]
pub enum QueueAddError {
Expand Down Expand Up @@ -184,59 +174,31 @@ impl ConfirmationPromise {


/// Queue for all unconfirmed requests.
#[derive(Default)]
pub struct ConfirmationsQueue {
id: Mutex<U256>,
queue: RwLock<BTreeMap<U256, ConfirmationToken>>,
sender: Mutex<mpsc::Sender<QueueEvent>>,
receiver: Mutex<Option<mpsc::Receiver<QueueEvent>>>,
}

impl Default for ConfirmationsQueue {
fn default() -> Self {
let (send, recv) = mpsc::channel();

ConfirmationsQueue {
id: Mutex::new(U256::from(0)),
queue: RwLock::new(BTreeMap::new()),
sender: Mutex::new(send),
receiver: Mutex::new(Some(recv)),
}
}
on_event: RwLock<Vec<Box<Fn(QueueEvent) -> () + Send + Sync>>>,
}

impl ConfirmationsQueue {

/// Blocks the thread and starts listening for notifications regarding all actions in the queue.
/// For each event, `listener` callback will be invoked.
/// This method can be used only once (only single consumer of events can exist).
pub fn start_listening<F>(&self, listener: F) -> Result<(), QueueError>
where F: Fn(QueueEvent) -> () {
let recv = self.receiver.lock().take();
if let None = recv {
return Err(QueueError::AlreadyUsed);
}
let recv = recv.expect("Check for none is done earlier.");

loop {
let message = recv.recv().map_err(|e| QueueError::ReceiverError(e))?;
if let QueueEvent::Finish = message {
return Ok(());
}

listener(message);
}
/// Adds a queue listener. For each event, `listener` callback will be invoked.
pub fn on_event<F: Fn(QueueEvent) -> () + Send + Sync + 'static>(&self, listener: F) {
self.on_event.write().push(Box::new(listener));
}

/// Notifies consumer that the communcation is over.
/// No more events will be sent after this function is invoked.
pub fn finish(&self) {
self.notify(QueueEvent::Finish);
self.on_event.write().clear();
}

/// Notifies receiver about the event happening in this queue.
fn notify(&self, message: QueueEvent) {
// We don't really care about the result
let _ = self.sender.lock().send(message);
for listener in &*self.on_event.read() {
listener(message.clone())
}
}

/// Removes requests from this queue and notifies `ConfirmationPromise` holders about the result.
Expand Down Expand Up @@ -384,26 +346,23 @@ mod test {
#[test]
fn should_receive_notification() {
// given
let received = Arc::new(Mutex::new(None));
let received = Arc::new(Mutex::new(vec![]));
let queue = Arc::new(ConfirmationsQueue::default());
let request = request();

// when
let q = queue.clone();
let r = received.clone();
let handle = thread::spawn(move || {
q.start_listening(move |notification| {
let mut v = r.lock();
*v = Some(notification);
}).expect("Should be closed nicely.")
queue.on_event(move |notification| {
r.lock().push(notification);
});
queue.add_request(request, Default::default()).unwrap();
queue.finish();

// then
handle.join().expect("Thread should finish nicely");
let r = received.lock().take();
assert_eq!(r, Some(QueueEvent::NewRequest(U256::from(1))));
let r = received.lock();
assert_eq!(r[0], QueueEvent::NewRequest(U256::from(1)));
assert_eq!(r[1], QueueEvent::Finish);
assert_eq!(r.len(), 2);
}

#[test]
Expand Down
86 changes: 86 additions & 0 deletions rpc/src/v1/helpers/subscribers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

//! A map of subscribers.
use std::ops;
use std::collections::HashMap;
use jsonrpc_macros::pubsub::{Subscriber, Sink, SubscriptionId};

#[derive(Clone, Debug)]
pub struct Subscribers<T> {
next_id: u64,
subscriptions: HashMap<u64, T>,
}

impl<T> Default for Subscribers<T> {
fn default() -> Self {
Subscribers {
next_id: 0,
subscriptions: HashMap::new(),
}
}
}

impl<T> Subscribers<T> {
fn next_id(&mut self) -> u64 {
self.next_id += 1;
self.next_id
}

/// Insert new subscription and return assigned id.
pub fn insert(&mut self, val: T) -> SubscriptionId {
let id = self.next_id();
debug!(target: "pubsub", "Adding subscription id={}", id);
self.subscriptions.insert(id, val);
SubscriptionId::Number(id)
}

/// Removes subscription with given id and returns it (if any).
pub fn remove(&mut self, id: &SubscriptionId) -> Option<T> {
trace!(target: "pubsub", "Removing subscription id={:?}", id);
match *id {
SubscriptionId::Number(id) => {
self.subscriptions.remove(&id)
},
_ => None,
}
}
}

impl <T> Subscribers<Sink<T>> {
/// Assigns id and adds a subscriber to the list.
pub fn push(&mut self, sub: Subscriber<T>) {
let id = self.next_id();
match sub.assign_id(SubscriptionId::Number(id)) {
Ok(sink) => {
debug!(target: "pubsub", "Adding subscription id={:?}", id);
self.subscriptions.insert(id, sink);
},
Err(_) => {
self.next_id -= 1;
},
}
}
}

impl<T> ops::Deref for Subscribers<T> {
type Target = HashMap<u64, T>;

fn deref(&self) -> &Self::Target {
&self.subscriptions
}
}
30 changes: 12 additions & 18 deletions rpc/src/v1/helpers/subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
//! Generic poll manager for Pub-Sub.
use std::sync::Arc;
use std::collections::HashMap;
use util::Mutex;

use jsonrpc_core::futures::future::{self, Either};
use jsonrpc_core::futures::sync::mpsc;
use jsonrpc_core::futures::{Sink, Future, BoxFuture};
use jsonrpc_core::{self as core, MetaIoHandler};
use jsonrpc_pubsub::SubscriptionId;

use v1::helpers::Subscribers;
use v1::metadata::Metadata;

#[derive(Debug)]
Expand All @@ -40,52 +41,44 @@ struct Subscription {
/// TODO [ToDr] Depending on the method decide on poll interval.
/// For most of the methods it will be enough to poll on new block instead of time-interval.
pub struct GenericPollManager<S: core::Middleware<Metadata>> {
next_id: usize,
poll_subscriptions: HashMap<usize, Subscription>,
subscribers: Subscribers<Subscription>,
rpc: MetaIoHandler<Metadata, S>,
}

impl<S: core::Middleware<Metadata>> GenericPollManager<S> {
/// Creates new poll manager
pub fn new(rpc: MetaIoHandler<Metadata, S>) -> Self {
GenericPollManager {
next_id: 1,
poll_subscriptions: Default::default(),
subscribers: Default::default(),
rpc: rpc,
}
}

/// Subscribes to update from polling given method.
pub fn subscribe(&mut self, metadata: Metadata, method: String, params: core::Params)
-> (usize, mpsc::Receiver<Result<core::Value, core::Error>>)
-> (SubscriptionId, mpsc::Receiver<Result<core::Value, core::Error>>)
{
let id = self.next_id;
self.next_id += 1;

let (sink, stream) = mpsc::channel(1);

let subscription = Subscription {
metadata: metadata,
method: method,
params: params,
sink: sink,
last_result: Default::default(),
};

debug!(target: "pubsub", "Adding subscription id={:?}, {:?}", id, subscription);
self.poll_subscriptions.insert(id, subscription);
let id = self.subscribers.insert(subscription);
(id, stream)
}

pub fn unsubscribe(&mut self, id: usize) -> bool {
pub fn unsubscribe(&mut self, id: &SubscriptionId) -> bool {
debug!(target: "pubsub", "Removing subscription: {:?}", id);
self.poll_subscriptions.remove(&id).is_some()
self.subscribers.remove(id).is_some()
}

pub fn tick(&self) -> BoxFuture<(), ()> {
let mut futures = Vec::new();
// poll all subscriptions
for (id, subscription) in self.poll_subscriptions.iter() {
for (id, subscription) in self.subscribers.iter() {
let call = core::MethodCall {
jsonrpc: Some(core::Version::V2),
id: core::Id::Num(*id as u64),
Expand Down Expand Up @@ -130,6 +123,7 @@ mod tests {

use jsonrpc_core::{MetaIoHandler, NoopMiddleware, Value, Params};
use jsonrpc_core::futures::{Future, Stream};
use jsonrpc_pubsub::SubscriptionId;
use http::tokio_core::reactor;

use super::GenericPollManager;
Expand All @@ -154,7 +148,7 @@ mod tests {
let mut el = reactor::Core::new().unwrap();
let mut poll_manager = poll_manager();
let (id, rx) = poll_manager.subscribe(Default::default(), "hello".into(), Params::None);
assert_eq!(id, 1);
assert_eq!(id, SubscriptionId::Number(1));

// then
poll_manager.tick().wait().unwrap();
Expand All @@ -169,7 +163,7 @@ mod tests {
// and no more notifications
poll_manager.tick().wait().unwrap();
// we need to unsubscribe otherwise the future will never finish.
poll_manager.unsubscribe(1);
poll_manager.unsubscribe(&id);
assert_eq!(el.run(rx.into_future()).unwrap().0, None);
}
}
Loading

0 comments on commit 240704f

Please sign in to comment.