Skip to content

Commit

Permalink
import a local candidate when it is available
Browse files Browse the repository at this point in the history
  • Loading branch information
rphmeier committed Jan 13, 2018
1 parent 17f489f commit 4159bea
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 14 deletions.
12 changes: 2 additions & 10 deletions candidate-agreement/src/handle_incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use futures::prelude::*;
use futures::stream::{Fuse, FuturesUnordered};
use futures::sync::mpsc;

use table::{self, Statement, SignedStatement, Context as TableContext};
use table::{self, Statement, Context as TableContext};

use super::{Context, CheckedMessage, SharedTable, TypeResolve};

Expand Down Expand Up @@ -94,16 +94,8 @@ impl<C: Context, I> HandleIncoming<C, I> {
CheckResult::Unavailable => return, // no such statement and not provable.
};

let signature = self.table.context().sign_table_statement(&statement);

let statement = SignedStatement {
statement,
signature,
sender: self.local_id.clone(),
};

// TODO: trigger broadcast to peers immediately?
self.table.import_statement(statement, None);
self.table.sign_and_import(statement);
}

fn import_message(&mut self, origin: C::ValidatorId, message: CheckedMessage<C>) {
Expand Down
34 changes: 30 additions & 4 deletions candidate-agreement/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,20 @@ impl<C: Context> SharedTable<C> {
self.inner.lock().import_statement(&*self.context, statement, received_from)
}

/// Sign and import a local statement.
pub fn sign_and_import(
&self,
statement: table::Statement<C::ParachainCandidate, C::Digest>,
) -> Option<table::Summary<C::Digest, C::GroupId>> {
let signed_statement = table::SignedStatement {
signature: self.context.sign_table_statement(&statement),
sender: self.context.local_id(),
statement,
};

self.import_statement(signed_statement, None)
}

/// Import many statements at once.
///
/// Provide an iterator yielding pairs of (statement, received_from).
Expand Down Expand Up @@ -486,13 +500,15 @@ pub fn agree<
NetOut,
Recovery,
PropagateStatements,
LocalCandidate,
Err,
>(
params: AgreementParams<Context>,
net_in: NetIn,
net_out: NetOut,
recovery: Recovery,
propagate_statements: PropagateStatements,
local_candidate: LocalCandidate,
)
-> Box<Future<Item=<Context as TypeResolve>::BftCommitted,Error=Error>>
where
Expand All @@ -503,6 +519,7 @@ pub fn agree<
NetOut: Sink<SinkItem=OutgoingMessage<Context>> + 'static,
Recovery: MessageRecovery<Context> + 'static,
PropagateStatements: Stream<Item=Context::StatementBatch,Error=Err> + 'static,
LocalCandidate: Future<Item=Context::ParachainCandidate> + 'static
{
let (bft_in_in, bft_in_out) = mpsc::unbounded();
let (bft_out_in, bft_out_out) = mpsc::unbounded();
Expand Down Expand Up @@ -537,7 +554,6 @@ pub fn agree<
).map_err(|_| Error::IoTerminated)
};


let route_messages_out = {
let table = params.table.clone();
let periodic_table_statements = propagate_statements
Expand All @@ -554,6 +570,15 @@ pub fn agree<
net_out.sink_map_err(|_| Error::IoTerminated).send_all(complete_out_stream)
};

let import_local_candidate = {
let table = params.table.clone();
local_candidate
.map(table::Statement::Candidate)
.map(Some)
.or_else(|_| Ok(None))
.map(move |s| if let Some(s) = s { table.sign_and_import(s); })
};

let create_proposal_on_interval = {
let table = params.table;
params.timer.interval(params.form_proposal_interval)
Expand All @@ -562,11 +587,12 @@ pub fn agree<
};

// TODO: avoid having errors take down everything.
let future = agreement.join4(
let future = agreement.join5(
route_messages_in,
route_messages_out,
create_proposal_on_interval
).map(|(agreed, _, _, _)| agreed);
create_proposal_on_interval,
import_local_candidate,
).map(|(agreed, _, _, _, _)| agreed);

Box::new(future)
}

0 comments on commit 4159bea

Please sign in to comment.