[backport] Lower bft timeout (paritytech#484)
* v0.2.3

* Lower bft timeout

* force BFT delay in consensus service, not in proposer logic (paritytech#477)

* move forced delay to consensus service

* fiddle with logging
arkpar authored Aug 2, 2018
1 parent 1089a29 commit 2f6a4d9
Showing 2 changed files with 35 additions and 39 deletions.
27 changes: 1 addition & 26 deletions consensus/src/lib.rs
@@ -258,8 +258,6 @@ impl<C, N, P> bft::Environment<Block> for ProposerFactory<C, N, P>
) -> Result<(Self::Proposer, Self::Input, Self::Output), Error> {
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};

const DELAY_UNTIL: Duration = Duration::from_millis(5000);

let parent_hash = parent_header.hash().into();

let id = BlockId::hash(parent_hash);
@@ -290,9 +288,6 @@ impl<C, N, P> bft::Environment<Block> for ProposerFactory<C, N, P>
self.parachain_empty_duration.clone(),
);

debug!(target: "bft", "Initialising consensus proposer. Refusing to evaluate for {:?} from now.",
DELAY_UNTIL);

let validation_para = match local_duty.validation {
Chain::Relay => None,
Chain::Parachain(id) => Some(id),
@@ -315,7 +310,6 @@ impl<C, N, P> bft::Environment<Block> for ProposerFactory<C, N, P>
client: self.client.clone(),
dynamic_inclusion,
local_key: sign_with,
minimum_delay: now + DELAY_UNTIL,
parent_hash,
parent_id: id,
parent_number: parent_header.number,
@@ -370,7 +364,6 @@ pub struct Proposer<C: PolkadotApi> {
client: Arc<C>,
dynamic_inclusion: DynamicInclusion,
local_key: Arc<ed25519::Pair>,
minimum_delay: Instant,
parent_hash: Hash,
parent_id: BlockId,
parent_number: BlockNumber,
@@ -401,17 +394,10 @@ impl<C> bft::Proposer<Block> for Proposer<C>
initial_included,
).unwrap_or_else(|| now + Duration::from_millis(1));

let minimum_delay = if self.minimum_delay > now + ATTEMPT_PROPOSE_EVERY {
Some(Delay::new(self.minimum_delay))
} else {
None
};

let timing = ProposalTiming {
attempt_propose: Interval::new(now + ATTEMPT_PROPOSE_EVERY, ATTEMPT_PROPOSE_EVERY),
enough_candidates: Delay::new(enough_candidates),
dynamic_inclusion: self.dynamic_inclusion.clone(),
minimum_delay,
last_included: initial_included,
};

@@ -484,11 +470,7 @@ impl<C> bft::Proposer<Block> for Proposer<C>
// delay casting vote until able according to minimum block time,
// timestamp delay, and count delay.
// construct a future from the maximum of the two durations.
let max_delay = [timestamp_delay, count_delay, Some(self.minimum_delay)]
.iter()
.cloned()
.max()
.expect("iterator not empty; thus max returns `Some`; qed");
let max_delay = ::std::cmp::max(timestamp_delay, count_delay);

let temporary_delay = match max_delay {
Some(duration) => future::Either::A(
@@ -610,7 +592,6 @@ struct ProposalTiming {
attempt_propose: Interval,
dynamic_inclusion: DynamicInclusion,
enough_candidates: Delay,
minimum_delay: Option<Delay>,
last_included: usize,
}

@@ -627,12 +608,6 @@ impl ProposalTiming {
x.expect("timer still alive; intervals never end; qed");
}

if let Some(ref mut min) = self.minimum_delay {
try_ready!(min.poll().map_err(ErrorKind::Timer));
}

self.minimum_delay = None; // after this point, the future must have completed.

if included == self.last_included {
return self.enough_candidates.poll().map_err(ErrorKind::Timer);
}
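A note on the simplified max_delay line above (illustrative only; the exact types of timestamp_delay and count_delay are not visible in this excerpt, Option<Instant> is assumed here): Rust orders Option values with None less than any Some, so std::cmp::max over two optional deadlines returns the later deadline when both are set, whichever one is set when only one is, and None only when neither applies. A minimal sketch:

use std::cmp::max;
use std::time::{Duration, Instant};

fn main() {
    let now = Instant::now();
    // Two optional deadlines, standing in for the proposer's timestamp and count delays.
    let timestamp_delay: Option<Instant> = Some(now + Duration::from_secs(3));
    let count_delay: Option<Instant> = None;

    // None < Some(_), so `max` never discards a deadline that is actually set.
    assert_eq!(max(timestamp_delay, count_delay), timestamp_delay);
    assert_eq!(max(None::<Instant>, None::<Instant>), None);
}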
47 changes: 34 additions & 13 deletions consensus/src/service.rs
@@ -38,7 +38,7 @@ use transaction_pool::TransactionPool;
use tokio::executor::current_thread::TaskExecutor as LocalThreadHandle;
use tokio::runtime::TaskExecutor as ThreadPoolHandle;
use tokio::runtime::current_thread::Runtime as LocalRuntime;
use tokio::timer::Interval;
use tokio::timer::{Delay, Interval};

use super::{Network, Collators, ProposerFactory};
use error;
@@ -49,23 +49,44 @@ const TIMER_INTERVAL_MS: u64 = 500;
// spin up an instance of BFT agreement on the current thread's executor.
// panics if there is no current thread executor.
fn start_bft<F, C>(
header: &Header,
bft_service: &BftService<Block, F, C>,
header: Header,
bft_service: Arc<BftService<Block, F, C>>,
) where
F: bft::Environment<Block> + 'static,
C: bft::BlockImport<Block> + bft::Authorities<Block> + 'static,
F::Error: ::std::fmt::Debug,
<F::Proposer as bft::Proposer<Block>>::Error: ::std::fmt::Display + Into<error::Error>,
<F as bft::Environment<Block>>::Error: ::std::fmt::Display
{
const DELAY_UNTIL: Duration = Duration::from_millis(5000);

let mut handle = LocalThreadHandle::current();
match bft_service.build_upon(&header) {
Ok(Some(bft)) => if let Err(e) = handle.spawn_local(Box::new(bft)) {
debug!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e);
},
Ok(None) => {},
Err(e) => warn!(target: "bft", "BFT agreement error: {}", e),
}
let work = Delay::new(Instant::now() + DELAY_UNTIL)
.then(move |res| {
if let Err(e) = res {
warn!(target: "bft", "Failed to force delay of consensus: {:?}", e);
}

match bft_service.build_upon(&header) {
Ok(maybe_bft_work) => {
if maybe_bft_work.is_some() {
debug!(target: "bft", "Starting agreement. After forced delay for {:?}",
DELAY_UNTIL);
}

maybe_bft_work
}
Err(e) => {
warn!(target: "bft", "BFT agreement error: {}", e);
None
}
}
})
.map(|_| ());

if let Err(e) = handle.spawn_local(Box::new(work)) {
debug!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e);
}
}

/// Consensus service. Starts working when created.
@@ -113,7 +134,7 @@ impl Service {

client.import_notification_stream().for_each(move |notification| {
if notification.is_new_best {
start_bft(&notification.header, &*bft_service);
start_bft(notification.header, bft_service.clone());
}
Ok(())
})
@@ -139,9 +160,9 @@
interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
if let Ok(best_block) = c.best_block_header() {
let hash = best_block.hash();
if hash == prev_best {
if hash == prev_best && s.live_agreement() != Some(hash) {
debug!("Starting consensus round after a timeout");
start_bft(&best_block, &*s);
start_bft(best_block, s.clone());
}
prev_best = hash;
}
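The new start_bft builds on the tokio 0.1 / futures 0.1 timer API already imported in this file. Below is a minimal, self-contained sketch of the same pattern (an illustration under those assumptions, not the Polkadot code): a Delay future is chained with then, so the closure runs once the deadline passes even if the timer reports an error, and only then would the BFT work be built. In the service the resulting future is spawned with spawn_local on the current-thread executor; the sketch instead drives it with block_on so it runs on its own.

extern crate futures;
extern crate tokio;

use std::time::{Duration, Instant};

use futures::Future;
use tokio::runtime::current_thread::Runtime;
use tokio::timer::Delay;

fn main() {
    // Stand-in for the 5-second forced delay; shortened so the sketch finishes quickly.
    let forced_delay = Duration::from_millis(500);

    let work = Delay::new(Instant::now() + forced_delay).then(move |res| {
        if let Err(e) = res {
            // As in the commit, a timer error is logged and otherwise ignored.
            eprintln!("failed to force delay of consensus: {:?}", e);
        }
        // The real code calls bft_service.build_upon(&header) at this point and
        // spawns the returned agreement future if one was produced.
        println!("starting agreement after forced delay of {:?}", forced_delay);
        Ok::<(), ()>(())
    });

    let mut runtime = Runtime::new().expect("failed to create local runtime");
    runtime.block_on(work).expect("delayed work failed");
}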
