Skip to content

Commit

Permalink
BABE slot and epoch event notifications (paritytech#6563)
Browse files Browse the repository at this point in the history
* BabeWorker -> BabeSlotWorker

* SlotWorker::notify_slot: similar to claim_slot, but called no matter authoring

* Wrap the future with a new struct BabeWorker

* Add type definition slot_notification_sinks

* Function slot_notification_streams for the receiver side

* Get a handle of slot_notification_sinks in BabeSlotWorker

* Implement notify_slot

* Switch to use bounded mpsc

* Do not drop the sink when channel is full

Only skip sending the message and emit a warning, because it is recoverable.

* Fix future type bounds

* Add must_use and sink type alias
  • Loading branch information
sorpaas authored Jul 30, 2020
1 parent ffe4db9 commit bff302d
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 10 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions client/consensus/babe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ sp-consensus-vrf = { version = "0.8.0-rc5", path = "../../../primitives/consensu
sc-consensus-uncles = { version = "0.8.0-rc5", path = "../uncles" }
sc-consensus-slots = { version = "0.8.0-rc5", path = "../slots" }
sp-runtime = { version = "2.0.0-rc5", path = "../../../primitives/runtime" }
sp-utils = { version = "2.0.0-rc5", path = "../../../primitives/utils" }
fork-tree = { version = "2.0.0-rc5", path = "../../../utils/fork-tree" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-rc5"}
futures = "0.3.4"
Expand All @@ -48,6 +49,7 @@ rand = "0.7.2"
merlin = "2.0"
pdqselect = "0.1.0"
derive_more = "0.99.2"
retain_mut = "0.1.1"

[dev-dependencies]
sp-keyring = { version = "2.0.0-rc5", path = "../../../primitives/keyring" }
Expand Down
87 changes: 77 additions & 10 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ use sc_client_api::{
BlockchainEvents, ProvideUncles,
};
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use futures::channel::mpsc::{channel, Sender, Receiver};
use retain_mut::RetainMut;

use futures::prelude::*;
use log::{debug, info, log, trace, warn};
Expand Down Expand Up @@ -370,31 +372,34 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, Error>(BabeParams {
babe_link,
can_author_with,
}: BabeParams<B, C, E, I, SO, SC, CAW>) -> Result<
impl futures::Future<Output=()>,
BabeWorker<B>,
sp_consensus::Error,
> where
B: BlockT,
C: ProvideRuntimeApi<B> + ProvideCache<B> + ProvideUncles<B> + BlockchainEvents<B>
+ HeaderBackend<B> + HeaderMetadata<B, Error = ClientError> + Send + Sync + 'static,
C::Api: BabeApi<B>,
SC: SelectChain<B> + 'static,
E: Environment<B, Error = Error> + Send + Sync,
E: Environment<B, Error = Error> + Send + Sync + 'static,
E::Proposer: Proposer<B, Error = Error, Transaction = sp_api::TransactionFor<C, B>>,
I: BlockImport<B, Error = ConsensusError, Transaction = sp_api::TransactionFor<C, B>> + Send
+ Sync + 'static,
Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
SO: SyncOracle + Send + Sync + Clone,
CAW: CanAuthorWith<B> + Send,
SO: SyncOracle + Send + Sync + Clone + 'static,
CAW: CanAuthorWith<B> + Send + 'static,
{
let config = babe_link.config;
let worker = BabeWorker {
let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));

let worker = BabeSlotWorker {
client: client.clone(),
block_import: Arc::new(Mutex::new(block_import)),
env,
sync_oracle: sync_oracle.clone(),
force_authoring,
keystore,
epoch_changes: babe_link.epoch_changes.clone(),
slot_notification_sinks: slot_notification_sinks.clone(),
config: config.clone(),
};

Expand All @@ -406,29 +411,69 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, Error>(BabeParams {
)?;

info!(target: "babe", "👶 Starting BABE Authorship worker");
Ok(sc_consensus_slots::start_slot_worker(
let inner = sc_consensus_slots::start_slot_worker(
config.0,
select_chain,
worker,
sync_oracle,
inherent_data_providers,
babe_link.time_source,
can_author_with,
))
);
Ok(BabeWorker {
inner: Box::pin(inner),
slot_notification_sinks,
})
}

/// Worker for Babe which implements `Future<Output=()>`. This must be polled.
#[must_use]
pub struct BabeWorker<B: BlockT> {
inner: Pin<Box<dyn futures::Future<Output=()> + Send + 'static>>,
slot_notification_sinks: Arc<Mutex<Vec<Sender<(u64, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)>>>>,
}

impl<B: BlockT> BabeWorker<B> {
/// Return an event stream of notifications for when new slot happens, and the corresponding
/// epoch descriptor.
pub fn slot_notification_stream(
&self
) -> Receiver<(u64, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)> {
const CHANNEL_BUFFER_SIZE: usize = 1024;

let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
self.slot_notification_sinks.lock().push(sink);
stream
}
}

impl<B: BlockT> futures::Future for BabeWorker<B> {
type Output = ();

fn poll(
mut self: Pin<&mut Self>,
cx: &mut futures::task::Context
) -> futures::task::Poll<Self::Output> {
self.inner.as_mut().poll(cx)
}
}

struct BabeWorker<B: BlockT, C, E, I, SO> {
/// Slot notification sinks.
type SlotNotificationSinks<B> = Arc<Mutex<Vec<Sender<(u64, ViableEpochDescriptor<<B as BlockT>::Hash, NumberFor<B>, Epoch>)>>>>;

struct BabeSlotWorker<B: BlockT, C, E, I, SO> {
client: Arc<C>,
block_import: Arc<Mutex<I>>,
env: E,
sync_oracle: SO,
force_authoring: bool,
keystore: KeyStorePtr,
epoch_changes: SharedEpochChanges<B, Epoch>,
slot_notification_sinks: SlotNotificationSinks<B>,
config: Config,
}

impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWorker<B, C, E, I, SO> where
impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeSlotWorker<B, C, E, I, SO> where
B: BlockT,
C: ProvideRuntimeApi<B> +
ProvideCache<B> +
Expand Down Expand Up @@ -502,6 +547,28 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
s
}

fn notify_slot(
&self,
_parent_header: &B::Header,
slot_number: SlotNumber,
epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
) {
self.slot_notification_sinks.lock()
.retain_mut(|sink| {
match sink.try_send((slot_number, epoch_descriptor.clone())) {
Ok(()) => true,
Err(e) => {
if e.is_full() {
warn!(target: "babe", "Trying to notify a slot but the channel is full");
true
} else {
false
}
},
}
});
}

fn pre_digest_data(
&self,
_slot_number: u64,
Expand Down Expand Up @@ -599,7 +666,7 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
}
}

impl<B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<B, C, E, I, SO> where
impl<B, C, E, I, Error, SO> SlotWorker<B> for BabeSlotWorker<B, C, E, I, SO> where
B: BlockT,
C: ProvideRuntimeApi<B> +
ProvideCache<B> +
Expand Down
11 changes: 11 additions & 0 deletions client/consensus/slots/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ pub trait SimpleSlotWorker<B: BlockT> {
epoch_data: &Self::EpochData,
) -> Option<Self::Claim>;

/// Notifies the given slot. Similar to `claim_slot`, but will be called no matter whether we
/// need to author blocks or not.
fn notify_slot(
&self,
_header: &B::Header,
_slot_number: u64,
_epoch_data: &Self::EpochData,
) { }

/// Return the pre digest data to include in a block authored with the given claim.
fn pre_digest_data(
&self,
Expand Down Expand Up @@ -191,6 +200,8 @@ pub trait SimpleSlotWorker<B: BlockT> {
}
};

self.notify_slot(&chain_head, slot_number, &epoch_data);

let authorities_len = self.authorities_len(&epoch_data);

if !self.force_authoring() &&
Expand Down

0 comments on commit bff302d

Please sign in to comment.