Skip to content

Commit

Permalink
Revert "remove connected disconnected state only (paritytech#3868)" (p…
Browse files Browse the repository at this point in the history
…aritytech#3896)

This reverts commit 7bc3526.
  • Loading branch information
drahnr authored Sep 20, 2021
1 parent 3e92a34 commit eeb6cc6
Show file tree
Hide file tree
Showing 17 changed files with 302 additions and 547 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions doc/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ struct BehaveMaleficient;
impl OverseerGen for BehaveMaleficient {
fn generate<'a, Spawner, RuntimeClient>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandler), Error>
where
Expand Down Expand Up @@ -214,7 +213,7 @@ impl OverseerGen for BehaveMaleficient {
),
);

Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector)
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner)
.map_err(|e| e.into())

// A builder pattern will simplify this further
Expand Down
5 changes: 2 additions & 3 deletions node/malus/src/variant-a.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use polkadot_cli::{
use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
use polkadot_node_subsystem::{
messages::{AllMessages, CandidateValidationMessage},
overseer::{self, OverseerConnector, OverseerHandle},
overseer::{self, OverseerHandle},
FromOverseer,
};

Expand Down Expand Up @@ -86,7 +86,6 @@ struct BehaveMaleficient;
impl OverseerGen for BehaveMaleficient {
fn generate<'a, Spawner, RuntimeClient>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error>
where
Expand Down Expand Up @@ -114,7 +113,7 @@ impl OverseerGen for BehaveMaleficient {
},
);

Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector)
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner)
.map_err(|e| e.into())
}
}
Expand Down
14 changes: 3 additions & 11 deletions node/overseer/examples/minimal-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ use polkadot_node_subsystem_types::messages::{
use polkadot_overseer::{
self as overseer,
gen::{FromOverseer, SpawnedSubsystem},
AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerConnector,
OverseerSignal, SubsystemError,
AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerSignal, SubsystemError,
};
use polkadot_primitives::v1::Hash;

Expand Down Expand Up @@ -174,15 +173,8 @@ fn main() {
.replace_candidate_validation(|_| Subsystem2)
.replace_candidate_backing(|orig| orig);

let (overseer, _handle) = Overseer::new(
vec![],
all_subsystems,
None,
AlwaysSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let (overseer, _handle) =
Overseer::new(vec![], all_subsystems, None, AlwaysSupportsParachains, spawner).unwrap();
let overseer_fut = overseer.run().fuse();
let timer_stream = timer_stream;

Expand Down
2 changes: 1 addition & 1 deletion node/overseer/overseer-gen/proc-macro/src/impl_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
&mut self.handle
}
/// Obtain access to the overseer handle.
pub fn as_handle(&self) -> &#handle {
pub fn as_handle(&mut self) -> &#handle {
&self.handle
}
}
Expand Down
54 changes: 0 additions & 54 deletions node/overseer/src/dummy.rs

This file was deleted.

87 changes: 68 additions & 19 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ use std::{

use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
use lru::LruCache;
use parking_lot::RwLock;

use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
use polkadot_primitives::v1::{Block, BlockId, BlockNumber, Hash, ParachainHost};
Expand All @@ -90,17 +91,12 @@ pub use polkadot_node_subsystem_types::{
jaeger, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, OverseerSignal,
};

/// Test helper supplements.
pub mod dummy;
pub use self::dummy::DummySubsystem;

// TODO legacy, to be deleted, left for easier integration
// TODO https://github.com/paritytech/polkadot/issues/3427
mod subsystems;
pub use self::subsystems::AllSubsystems;
pub use self::subsystems::{AllSubsystems, DummySubsystem};

/// Metrics re-exports of `polkadot-metrics`.
pub mod metrics;
mod metrics;
use self::metrics::Metrics;

use polkadot_node_metrics::{
Expand All @@ -119,7 +115,7 @@ pub use polkadot_overseer_gen::{

/// Store 2 days worth of blocks, not accounting for forks,
/// in the LRU cache. Assumes a 6-second block time.
pub const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6;
const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6;

#[cfg(test)]
mod tests;
Expand All @@ -145,12 +141,18 @@ where
///
/// [`Overseer`]: struct.Overseer.html
#[derive(Clone)]
pub struct Handle(pub OverseerHandle);
pub enum Handle {
/// Used only at initialization to break the cyclic dependency.
// TODO: refactor in https://github.com/paritytech/polkadot/issues/3427
Disconnected(Arc<RwLock<Option<OverseerHandle>>>),
/// A handle to the overseer.
Connected(OverseerHandle),
}

impl Handle {
/// Create a new [`Handle`].
pub fn new(raw: OverseerHandle) -> Self {
Self(raw)
/// Create a new disconnected [`Handle`].
pub fn new_disconnected() -> Self {
Self::Disconnected(Arc::new(RwLock::new(None)))
}

/// Inform the `Overseer` that that some block was imported.
Expand Down Expand Up @@ -199,8 +201,58 @@ impl Handle {

/// Most basic operation, to stop a server.
async fn send_and_log_error(&mut self, event: Event) {
if self.0.send(event).await.is_err() {
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
self.try_connect();
if let Self::Connected(ref mut handle) = self {
if handle.send(event).await.is_err() {
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
}
} else {
tracing::warn!(target: LOG_TARGET, "Using a disconnected Handle to send to Overseer");
}
}

/// Whether the handle is disconnected.
pub fn is_disconnected(&self) -> bool {
match self {
Self::Disconnected(ref x) => x.read().is_none(),
_ => false,
}
}

/// Connect this handle and all disconnected clones of it to the overseer.
pub fn connect_to_overseer(&mut self, handle: OverseerHandle) {
match self {
Self::Disconnected(ref mut x) => {
let mut maybe_handle = x.write();
if maybe_handle.is_none() {
tracing::info!(target: LOG_TARGET, "🖇️ Connecting all Handles to Overseer");
*maybe_handle = Some(handle);
} else {
tracing::warn!(
target: LOG_TARGET,
"Attempting to connect a clone of a connected Handle",
);
}
},
_ => {
tracing::warn!(
target: LOG_TARGET,
"Attempting to connect an already connected Handle",
);
},
}
}

/// Try upgrading from `Self::Disconnected` to `Self::Connected` state
/// after calling `connect_to_overseer` on `self` or a clone of `self`.
fn try_connect(&mut self) {
if let Self::Disconnected(ref mut x) = self {
let guard = x.write();
if let Some(ref h) = *guard {
let handle = h.clone();
drop(guard);
*self = Self::Connected(handle);
}
}
}
}
Expand Down Expand Up @@ -438,13 +490,12 @@ where
/// # use polkadot_primitives::v1::Hash;
/// # use polkadot_overseer::{
/// # self as overseer,
/// # Overseer,
/// # OverseerSignal,
/// # OverseerConnector,
/// # SubsystemSender as _,
/// # AllMessages,
/// # AllSubsystems,
/// # HeadSupportsParachains,
/// # Overseer,
/// # SubsystemError,
/// # gen::{
/// # SubsystemContext,
Expand Down Expand Up @@ -498,7 +549,6 @@ where
/// None,
/// AlwaysSupportsParachains,
/// spawner,
/// OverseerConnector::default(),
/// ).unwrap();
///
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
Expand Down Expand Up @@ -565,7 +615,6 @@ where
prometheus_registry: Option<&prometheus::Registry>,
supports_parachains: SupportsParachains,
s: S,
connector: OverseerConnector,
) -> SubsystemResult<(Self, OverseerHandle)>
where
CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>, SubsystemError> + Send,
Expand Down Expand Up @@ -630,7 +679,7 @@ where
.supports_parachains(supports_parachains)
.metrics(metrics.clone())
.spawner(s)
.build_with_connector(connector)?;
.build()?;

// spawn the metrics metronome task
{
Expand Down
4 changes: 2 additions & 2 deletions node/overseer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! Prometheus metrics related to the overseer and its channels.
use super::*;
pub use polkadot_node_metrics::metrics::{self, prometheus, Metrics as MetricsTrait};
use polkadot_node_metrics::metrics::{self, prometheus};

use parity_util_mem::MemoryAllocationSnapshot;

Expand Down Expand Up @@ -110,7 +110,7 @@ impl Metrics {
}
}

impl MetricsTrait for Metrics {
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
activated_heads_total: prometheus::register(
Expand Down
42 changes: 40 additions & 2 deletions node/overseer/src/subsystems.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,47 @@
//! In the future, everything should be set up using the generated
//! overseer builder pattern instead.
use crate::dummy::DummySubsystem;
use crate::{AllMessages, OverseerSignal};
use polkadot_node_subsystem_types::errors::SubsystemError;
use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen;
use polkadot_overseer_gen::MapSubsystem;
use polkadot_overseer_gen::{
FromOverseer, MapSubsystem, SpawnedSubsystem, Subsystem, SubsystemContext,
};

/// A dummy subsystem that implements [`Subsystem`] for all
/// types of messages. Used for tests or as a placeholder.
#[derive(Clone, Copy, Debug)]
pub struct DummySubsystem;

impl<Context> Subsystem<Context, SubsystemError> for DummySubsystem
where
Context: SubsystemContext<
Signal = OverseerSignal,
Error = SubsystemError,
AllMessages = AllMessages,
>,
{
fn start(self, mut ctx: Context) -> SpawnedSubsystem<SubsystemError> {
let future = Box::pin(async move {
loop {
match ctx.recv().await {
Err(_) => return Ok(()),
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(overseer_msg) => {
tracing::debug!(
target: "dummy-subsystem",
"Discarding a message sent from overseer {:?}",
overseer_msg
);
continue
},
}
}
});

SpawnedSubsystem { name: "dummy-subsystem", future }
}
}

/// This struct is passed as an argument to create a new instance of an [`Overseer`].
///
Expand Down
Loading

0 comments on commit eeb6cc6

Please sign in to comment.