Skip to content

Commit

Permalink
fill batch statements from table
Browse files Browse the repository at this point in the history
  • Loading branch information
rphmeier committed Jan 12, 2018
1 parent e6c0d09 commit e2dd5a4
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 15 deletions.
82 changes: 67 additions & 15 deletions candidate-agreement/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ mod handle_incoming;
mod round_robin;
mod table;

#[cfg(test)]
pub mod tests;

/// Context necessary for agreement.
pub trait Context: Send + Clone {
/// A validator ID
Expand All @@ -78,6 +81,12 @@ pub trait Context: Send + Clone {
/// data is checked.
type CheckAvailability: IntoFuture<Item=bool>;

/// The statement batch type.
type StatementBatch: StatementBatch<
Self::ValidatorId,
table::SignedStatement<Self::ParachainCandidate, Self::Digest, Self::ValidatorId, Self::Signature>,
>;

/// Get the digest of a candidate.
fn candidate_digest(candidate: &Self::ParachainCandidate) -> Self::Digest;

Expand Down Expand Up @@ -128,12 +137,14 @@ pub trait Context: Send + Clone {
pub trait TypeResolve {
type SignedTableStatement;
type BftCommunication;
type BftCommitted;
type Misbehavior;
}

impl<C: Context> TypeResolve for C {
type SignedTableStatement = table::SignedStatement<C::ParachainCandidate, C::Digest, C::ValidatorId, C::Signature>;
type BftCommunication = bft::Communication<C::Proposal, C::Digest, C::ValidatorId, C::Signature>;
type BftCommitted = bft::Committed<C::Proposal,C::Digest,C::Signature>;
type Misbehavior = table::Misbehavior<C::ParachainCandidate, C::Digest, C::ValidatorId, C::Signature>;
}

Expand Down Expand Up @@ -318,6 +329,11 @@ impl<C: Context> SharedTable<C> {
self.inner.lock().table.get_misbehavior().clone()
}

/// Fill a statement batch.
pub fn fill_batch(&self, batch: &mut C::StatementBatch) {
self.inner.lock().table.fill_batch(batch);
}

// Get a handle to the table context.
fn context(&self) -> &TableContext<C> {
&*self.context
Expand Down Expand Up @@ -425,8 +441,6 @@ pub struct AgreementParams<C: Context> {
pub message_buffer_size: usize,
/// Interval to attempt forming proposals over.
pub form_proposal_interval: Duration,
/// Interval to create table statement packets over.
pub table_broadcast_interval: Duration,
}

/// Recovery for messages
Expand All @@ -435,6 +449,19 @@ pub trait MessageRecovery<C: Context> {
fn check_message(&self, UncheckedMessage) -> Option<CheckedMessage<C>>;
}

/// A batch of statements to send out.
pub trait StatementBatch<V, T> {
/// Get the target authorities of these statements.
fn targets(&self) -> &[V];

/// Push a statement onto the batch. Returns false when the batch is full.
///
/// This is meant to do work like incrementally serializing the statements
/// into a vector of bytes while making sure the length is below a certain
/// amount.
fn push(&mut self, statement: T) -> bool;
}

/// Recovered and fully checked messages.
pub enum CheckedMessage<C: Context> {
/// Messages meant for the BFT agreement logic.
Expand All @@ -443,19 +470,42 @@ pub enum CheckedMessage<C: Context> {
Table(Vec<<C as TypeResolve>::SignedTableStatement>),
}

/// Outgoing messages to the network.
pub enum OutgoingMessage<C: Context> {
/// Messages meant for BFT agreement peers.
Bft(<C as TypeResolve>::BftCommunication),
/// Batches of table statements.
Table(C::StatementBatch),
}

/// Create an agreement future, and I/O streams.
pub fn agree<C, I, O, R, E>(params: AgreementParams<C>, net_in: I, net_out: O, recovery: R)
-> Box<Future<Item=bft::Committed<C::Proposal,C::Digest,C::Signature>,Error=Error>>
// TODO: kill 'static bounds and use impl Future.
pub fn agree<
Context,
NetIn,
NetOut,
Recovery,
PropagateStatements,
Err,
>(
params: AgreementParams<Context>,
net_in: NetIn,
net_out: NetOut,
recovery: Recovery,
propagate_statements: PropagateStatements,
)
-> Box<Future<Item=<Context as TypeResolve>::BftCommitted,Error=Error>>
where
C: Context + 'static,
C::CheckCandidate: IntoFuture<Error=E>,
C::CheckAvailability: IntoFuture<Error=E>,
I: Stream<Item=(C::ValidatorId, Vec<UncheckedMessage>),Error=E> + 'static,
O: Sink<SinkItem=CheckedMessage<C>> + 'static,
R: MessageRecovery<C> + 'static,
Context: ::Context + 'static,
Context::CheckCandidate: IntoFuture<Error=Err>,
Context::CheckAvailability: IntoFuture<Error=Err>,
NetIn: Stream<Item=(Context::ValidatorId, Vec<UncheckedMessage>),Error=Err> + 'static,
NetOut: Sink<SinkItem=OutgoingMessage<Context>> + 'static,
Recovery: MessageRecovery<Context> + 'static,
PropagateStatements: Stream<Item=Context::StatementBatch,Error=Err> + 'static,
{
let (bft_in_in, bft_in_out) = mpsc::unbounded();
let (bft_out_in, bft_out_out) = mpsc::unbounded::<bft::ContextCommunication<BftContext<C>>>();
let (bft_out_in, bft_out_out) = mpsc::unbounded();

let agreement = {
let bft_context = BftContext {
Expand Down Expand Up @@ -489,14 +539,16 @@ pub fn agree<C, I, O, R, E>(params: AgreementParams<C>, net_in: I, net_out: O, r


let route_messages_out = {
let periodic_table_statements = params.timer.interval(params.table_broadcast_interval)
.map_err(|_| Error::FaultyTimer)
.map(|()| unimplemented!()); // create table statements to send. but to _who_ and how many?
let table = params.table.clone();
let periodic_table_statements = propagate_statements
.or_else(|_| ::futures::future::empty()) // halt the stream instead of error.
.map(move |mut batch| { table.fill_batch(&mut batch); batch })
.map(OutgoingMessage::Table);

let complete_out_stream = bft_out_out
.map_err(|_| Error::IoTerminated)
.map(|bft::ContextCommunication(x)| x)
.map(CheckedMessage::Bft)
.map(OutgoingMessage::Bft)
.select(periodic_table_statements);

net_out.sink_map_err(|_| Error::IoTerminated).send_all(complete_out_stream)
Expand Down
125 changes: 125 additions & 0 deletions candidate-agreement/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use std::collections::hash_map::{HashMap, Entry};
use std::hash::Hash;
use std::fmt::Debug;

use super::StatementBatch;

/// Context for the statement table.
pub trait Context {
/// A validator ID
Expand Down Expand Up @@ -238,6 +240,15 @@ struct ValidatorData<C: Context> {
known_statements: HashSet<StatementTrace<C::ValidatorId, C::Digest>>,
}

impl<C: Context> Default for ValidatorData<C> {
fn default() -> Self {
ValidatorData {
proposal: None,
known_statements: HashSet::default(),
}
}
}

/// Stores votes
pub struct Table<C: Context> {
validator_data: HashMap<C::ValidatorId, ValidatorData<C>>,
Expand Down Expand Up @@ -369,6 +380,120 @@ impl<C: Context> Table<C> {
&self.detected_misbehavior
}

/// Fill a statement batch and note messages seen by the targets.
pub fn fill_batch<B>(&mut self, batch: &mut B)
where B: StatementBatch<
C::ValidatorId,
SignedStatement<C::Candidate, C::Digest, C::ValidatorId, C::Signature>,
>
{
// naively iterate all statements so far, taking any that
// at least one of the targets has not seen.

// workaround for the fact that it's inconvenient to borrow multiple
// entries out of a hashmap mutably -- we just move them out and
// replace them when we're done.
struct SwappedTargetData<'a, C: 'a + Context> {
validator_data: &'a mut HashMap<C::ValidatorId, ValidatorData<C>>,
target_data: Vec<(C::ValidatorId, ValidatorData<C>)>,
}

impl<'a, C: 'a + Context> Drop for SwappedTargetData<'a, C> {
fn drop(&mut self) {
for (id, data) in self.target_data.drain(..) {
self.validator_data.insert(id, data);
}
}
}

// pre-fetch authority data for all the targets.
let mut target_data = {
let validator_data = &mut self.validator_data;
let mut target_data = Vec::with_capacity(batch.targets().len());
for target in batch.targets() {
let active_data = match validator_data.get_mut(target) {
None => Default::default(),
Some(x) => ::std::mem::replace(x, Default::default()),
};

target_data.push((target.clone(), active_data));
}

SwappedTargetData {
validator_data,
target_data
}
};

let target_data = &mut target_data.target_data;

macro_rules! attempt_send {
($trace:expr, sender=$sender:expr, sig=$sig:expr, statement=$statement:expr) => {{
let trace = $trace;
let can_send = target_data.iter()
.any(|t| t.1.known_statements.contains(&trace));

if can_send {
let statement = SignedStatement {
statement: $statement,
signature: $sig,
sender: $sender,
};

if batch.push(statement) {
for target in target_data.iter_mut() {
target.1.known_statements.insert(trace.clone());
}
} else {
return;
}
}
}}
}

// reconstruct statements for anything whose trace passes the filter.
for (digest, candidate) in self.candidate_votes.iter() {
for (sender, vote) in candidate.validity_votes.iter() {
match *vote {
ValidityVote::Issued(ref sig) => {
attempt_send!(
StatementTrace::Candidate(sender.clone()),
sender = sender.clone(),
sig = sig.clone(),
statement = Statement::Candidate(candidate.candidate.clone())
)
}
ValidityVote::Valid(ref sig) => {
attempt_send!(
StatementTrace::Valid(sender.clone(), digest.clone()),
sender = sender.clone(),
sig = sig.clone(),
statement = Statement::Valid(digest.clone())
)
}
ValidityVote::Invalid(ref sig) => {
attempt_send!(
StatementTrace::Invalid(sender.clone(), digest.clone()),
sender = sender.clone(),
sig = sig.clone(),
statement = Statement::Invalid(digest.clone())
)
}
}
};


for (sender, sig) in candidate.availability_votes.iter() {
attempt_send!(
StatementTrace::Available(sender.clone(), digest.clone()),
sender = sender.clone(),
sig = sig.clone(),
statement = Statement::Available(digest.clone())
)
}
}
}

fn note_trace_seen(&mut self, trace: StatementTrace<C::ValidatorId, C::Digest>, known_by: C::ValidatorId) {
self.validator_data.entry(known_by).or_insert_with(|| ValidatorData {
proposal: None,
Expand Down
22 changes: 22 additions & 0 deletions candidate-agreement/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Tests and test helpers for the candidate agreement.
const VALIDITY_CHECK_DELAY_MS: isize = 400;
const AVAILABILITY_CHECK_DELAY_MS: isize = 200;

use super::*;

0 comments on commit e2dd5a4

Please sign in to comment.