Merge #20
20: feat(watcher): add new watcher agent r=tiagolobocastro a=tiagolobocastro

The watcher is currently part of the core agent and it processes
 get/create/delete calls.
On creation it makes use of the store to create a watch on the specified
 resource, notifying the requester every time the resource changes.
At the moment the only supported type of watch is the http callback. This
 is meant to allow the operator to watch for changes without polling.
Our own agents will also use this mechanism: the control loop, for example,
 might need to watch for changes so that it can reconcile, though it will
 likely not be notified via http (maybe via nats or even a same-binary
 callback).

Co-authored-by: Tiago Castro <[email protected]>
mayastor-bors and tiagolobocastro committed Apr 1, 2021
2 parents f6ae418 + c958fec commit 489908a
Showing 37 changed files with 1,786 additions and 299 deletions.
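
The commit message boils down to: a requester registers a watch on a resource kept in the persistent store, and the watcher notifies it over an http callback whenever that resource changes, so the operator never has to poll. A minimal, self-contained sketch of the callback side of that idea (the `WatchEvent` type, payload format and endpoint below are illustrative assumptions, not the watcher's actual wire format):

// Minimal sketch of the http-callback notification idea (illustrative only;
// `WatchEvent`, the payload and the URL are assumptions, not the agent's API).
use std::time::Duration;

/// A change event for a watched resource, as a callback receiver might see it.
#[derive(Debug)]
struct WatchEvent {
    resource: String,
}

/// PUT the event to the callback URL registered for the watch.
async fn notify(callback: &str, event: &WatchEvent) -> Result<(), reqwest::Error> {
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(2))
        .build()?;
    client
        .put(callback)
        .body(event.resource.clone())
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}

#[tokio::main]
async fn main() {
    let event = WatchEvent { resource: "volume/3a1f".to_string() };
    // The registered requester is told about the change without polling.
    if let Err(e) = notify("http://127.0.0.1:8080/watch", &event).await {
        eprintln!("callback failed: {}", e);
    }
}

In the agent itself the change events come from the store's watch on the resource, and the commit message leaves room for other transports (nats, same-binary callbacks) later on.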
241 changes: 234 additions & 7 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions composer/src/lib.rs
@@ -1198,6 +1198,12 @@ impl ComposeTest {
Ok(())
}

/// get container ip
pub fn container_ip(&self, name: &str) -> String {
let (_id, ip) = self.containers.get(name).unwrap();
ip.to_string()
}

/// stop all the containers part of the network
/// returns the last error, if any or Ok
pub async fn stop_network_containers(&self) -> Result<(), Error> {
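
The new `container_ip` getter exists so tests can build per-container endpoints instead of hard-coding `0.0.0.0` (the node test further down does exactly that). A reduced, runnable stand-in showing the intent (this `ComposeTest` is a toy mirror of the real one, with fabricated ids and ips):

// Toy mirror of ComposeTest (illustrative only): why a per-container ip
// getter is useful — grpc endpoints stop being hard-coded in tests.
use std::collections::HashMap;

struct ComposeTest {
    containers: HashMap<String, (String, String)>, // name -> (id, ip)
}

impl ComposeTest {
    /// get container ip (panics if the container is unknown, like the real getter)
    fn container_ip(&self, name: &str) -> String {
        let (_id, ip) = self.containers.get(name).unwrap();
        ip.to_string()
    }
}

fn main() {
    let mut containers = HashMap::new();
    containers.insert("mayastor".to_string(), ("abc123".to_string(), "10.1.0.3".to_string()));
    let test = ComposeTest { containers };
    // Build the grpc endpoint from the container's actual ip:
    let grpc_endpoint = format!("{}:10124", test.container_ip("mayastor"));
    println!("{}", grpc_endpoint);
}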
10 changes: 8 additions & 2 deletions control-plane/agents/Cargo.toml
@@ -24,7 +24,7 @@ tokio = { version = "0.2", features = ["full"] }
tonic = "0.1"
futures = "0.3.8"
serde_json = "1.0"
async-trait = "0.1.36"
async-trait = "=0.1.42"
dyn-clonable = "0.9.0"
smol = "1.0.0"
snafu = "0.6"
@@ -35,12 +35,18 @@ tracing = "0.1"
tracing-subscriber = "0.2"
tracing-futures = "0.2.4"
rpc = "0.1.0"
url = "2.2.0"
http = "0.2.3"
paste = "1.0.4"
store = { path = "../store" }
reqwest = "0.10.0"

[dev-dependencies]
composer = { path = "../../composer" }
ctrlp-tests = { path = "../../tests-mayastor" }
actix-rt = "1.1.1"
actix-web = { version = "3.2.0", features = ["rustls"] }
url = "2.2.0"
once_cell = "1.4.1"

[dependencies.serde]
features = ["derive"]
50 changes: 50 additions & 0 deletions control-plane/agents/common/src/errors.rs
@@ -7,6 +7,7 @@ use mbus_api::{
ResourceKind,
};
use snafu::{Error, Snafu};
use store::store::StoreError;
use tonic::Code;

/// Common error type for send/receive
@@ -85,6 +86,22 @@ pub enum SvcError {
InvalidArguments {},
#[snafu(display("Multiple nexuses not supported"))]
MultipleNexuses {},
#[snafu(display("Store returned an error: {}", source.to_string()))]
Store { source: store::store::StoreError },
#[snafu(display("Watch Config Not Found"))]
WatchNotFound {},
#[snafu(display("{} Resource to be watched does not exist", kind.to_string()))]
WatchResourceNotFound { kind: ResourceKind },
#[snafu(display("Watch Already Exists"))]
WatchAlreadyExists {},
}

impl From<StoreError> for SvcError {
fn from(source: StoreError) -> Self {
SvcError::Store {
source,
}
}
}

impl From<mbus_api::Error> for SvcError {
@@ -107,6 +124,7 @@ impl From<SvcError> for ReplyError {
fn from(error: SvcError) -> Self {
#[allow(deprecated)]
let desc: &String = &error.description().to_string();
let error_str = error.full_string();
match error {
SvcError::BusGetNode {
source, ..
@@ -185,6 +203,14 @@ impl From<SvcError> for ReplyError {
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::Store {
..
} => ReplyError {
kind: ReplyErrorKind::Internal,
resource: ResourceKind::Watch,
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::JsonRpc {
..
} => ReplyError {
Expand Down Expand Up @@ -233,6 +259,30 @@ impl From<SvcError> for ReplyError {
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::WatchResourceNotFound {
kind,
} => ReplyError {
kind: ReplyErrorKind::NotFound,
resource: kind,
source: desc.to_string(),
extra: error_str,
},
SvcError::WatchNotFound {
..
} => ReplyError {
kind: ReplyErrorKind::NotFound,
resource: ResourceKind::Watch,
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::WatchAlreadyExists {
..
} => ReplyError {
kind: ReplyErrorKind::AlreadyExists,
resource: ResourceKind::Watch,
source: desc.to_string(),
extra: error.full_string(),
},
SvcError::InvalidFilter {
..
} => ReplyError {
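
The `From<StoreError>` impl added above is what lets the new watch handlers bubble store failures up with `?` before they get mapped to a `ReplyError` with `ResourceKind::Watch`. A reduced, self-contained illustration of that conversion pattern (the types below are stand-ins, not the real store/agent types):

// Stand-in types (illustrative only) for the StoreError -> SvcError conversion.
#[derive(Debug)]
struct StoreError(String);

#[derive(Debug)]
enum SvcError {
    Store { source: StoreError },
}

impl From<StoreError> for SvcError {
    fn from(source: StoreError) -> Self {
        SvcError::Store { source }
    }
}

fn load(key: &str) -> Result<String, StoreError> {
    Err(StoreError(format!("key '{}' not found", key)))
}

fn get_watch(key: &str) -> Result<String, SvcError> {
    // `?` applies the From impl: StoreError -> SvcError::Store
    let value = load(key)?;
    Ok(value)
}

fn main() {
    println!("{:?}", get_watch("watch/volume-1"));
}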
39 changes: 27 additions & 12 deletions control-plane/agents/core/src/core/registry.rs
@@ -1,32 +1,47 @@
//! Registry containing all mayastor instances which register themselves via the
//! `Register` Message.
//! Said instances may also send `Deregister` to unregister themselves
//! during node/pod shutdown/restart. When this happens the node state is
//! set as `Unknown`. It's TBD how to detect when a node is really going
//! away for good.
//!
//! A mayastor instance sends `Register` every N seconds as sort of a keep
//! alive message.
//! A watchful watchdog is started for each node and it will change the
//! state of said node to `Offline` if it is not petted before its
//! `deadline`.
//!
//! Each instance also contains the known nexus, pools and replicas that live in
//! said instance.
use super::wrapper::NodeWrapper;
use crate::core::wrapper::InternalOps;
use mbus_api::v0::NodeId;
use std::{collections::HashMap, sync::Arc};
use store::{etcd::Etcd, store::Store};
use tokio::sync::{Mutex, RwLock};

/// Registry containing all mayastor instances which register themselves via the
/// `Register` Message.
/// Said instances may also send `Deregister` to unregister themselves during
/// node/pod shutdown/restart. When this happens the node state is set as
/// `Unknown`. It's TBD how to detect when a node is really going away for good.
///
/// A mayastor instance sends `Register` every N seconds as sort of a keep
/// alive message.
/// A watchful watchdog is started for each node and it will change the state
/// of said node to `Offline` if it is not petted before its `deadline`.
/// Registry containing all mayastor instances (aka nodes)
pub type Registry = RegistryInner<Etcd>;

/// Generic Registry Inner with a Store trait
#[derive(Clone, Debug)]
pub struct Registry {
pub struct RegistryInner<S: Store> {
pub(crate) nodes: Arc<RwLock<HashMap<NodeId, Arc<Mutex<NodeWrapper>>>>>,
/// period to refresh the cache
period: std::time::Duration,
pub(crate) store: Arc<Mutex<S>>,
}

impl Registry {
/// Create a new registry with the `period` to reload the cache
pub fn new(period: std::time::Duration) -> Self {
pub async fn new(period: std::time::Duration, store_url: String) -> Self {
let store = Etcd::new(&store_url)
.await
.expect("Should connect to the persistent store");
let registry = Self {
nodes: Default::default(),
period,
store: Arc::new(Mutex::new(store)),
};
registry.start();
registry
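
The registry change replaces the concrete struct with a generic `RegistryInner<S: Store>` plus the `Registry = RegistryInner<Etcd>` alias, so existing call sites keep compiling while the store backend stays swappable. A reduced, self-contained sketch of that shape (the `Store` trait and in-memory store below are stand-ins, not the real `store` crate's API):

// Illustrative stand-ins only: a minimal store trait, an in-memory backend,
// and a registry generic over the backend.
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;

/// Stand-in for the store trait the registry is generic over.
trait Store {
    fn put(&mut self, key: &str, value: &str);
    fn get(&self, key: &str) -> Option<String>;
}

/// A trivial in-memory store, e.g. for unit tests.
#[derive(Default)]
struct InMemory(HashMap<String, String>);

impl Store for InMemory {
    fn put(&mut self, key: &str, value: &str) {
        self.0.insert(key.to_string(), value.to_string());
    }
    fn get(&self, key: &str) -> Option<String> {
        self.0.get(key).cloned()
    }
}

/// Generic inner type, mirroring `RegistryInner<S: Store>` above.
struct RegistryInner<S: Store> {
    store: Arc<Mutex<S>>,
}

/// A test alias; the production alias pins the etcd-backed store instead.
type TestRegistry = RegistryInner<InMemory>;

#[tokio::main]
async fn main() {
    let registry: TestRegistry = RegistryInner {
        store: Arc::new(Mutex::new(InMemory::default())),
    };
    registry.store.lock().await.put("node/node-test-name", "Online");
    println!("{:?}", registry.store.lock().await.get("node/node-test-name"));
}

In the agent the alias pins etcd, and `Registry::new` now connects to it up front, panicking if the persistent store is unreachable.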
73 changes: 10 additions & 63 deletions control-plane/agents/core/src/node/mod.rs
@@ -34,73 +34,20 @@ pub(crate) fn configure(builder: Service) -> Service {
#[cfg(test)]
mod tests {
use super::*;
use composer::*;
use rpc::mayastor::Null;
use testlib::ClusterBuilder;

async fn bus_init() -> Result<(), Box<dyn std::error::Error>> {
tokio::time::timeout(std::time::Duration::from_secs(2), async {
mbus_api::message_bus_init("10.1.0.2".into()).await
})
.await?;
Ok(())
}
async fn wait_for_node() -> Result<(), Box<dyn std::error::Error>> {
let _ = GetNodes {}.request().await?;
Ok(())
}
fn init_tracing() {
if let Ok(filter) =
tracing_subscriber::EnvFilter::try_from_default_env()
{
tracing_subscriber::fmt().with_env_filter(filter).init();
} else {
tracing_subscriber::fmt().with_env_filter("info").init();
}
}
// to avoid waiting for timeouts
async fn orderly_start(
test: &ComposeTest,
) -> Result<(), Box<dyn std::error::Error>> {
test.start_containers(vec!["nats", "core"]).await?;

bus_init().await?;
wait_for_node().await?;

test.start("mayastor").await?;

let mut hdl = test.grpc_handle("mayastor").await?;
hdl.mayastor.list_nexus(Null {}).await?;
Ok(())
}

#[tokio::test]
#[actix_rt::test]
async fn node() {
init_tracing();
let maya_name = NodeId::from("node-test-name");
let test = Builder::new()
.name("node")
.add_container_bin(
"nats",
Binary::from_nix("nats-server").with_arg("-DV"),
)
.add_container_bin(
"core",
Binary::from_dbg("core")
.with_nats("-n")
.with_args(vec!["-d", "2sec"]),
)
.add_container_bin(
"mayastor",
Binary::from_nix("mayastor")
.with_nats("-n")
.with_args(vec!["-N", maya_name.as_str()]),
)
.autorun(false)
let cluster = ClusterBuilder::builder()
.with_rest(false)
.with_agents(vec!["core"])
.with_node_deadline("2s")
.build()
.await
.unwrap();

orderly_start(&test).await.unwrap();
let maya_name = cluster.node(0);
let grpc = format!("{}:10124", cluster.node_ip(0));

let nodes = GetNodes {}.request().await.unwrap();
tracing::info!("Nodes: {:?}", nodes);
@@ -109,7 +56,7 @@ mod tests {
nodes.0.first().unwrap(),
&Node {
id: maya_name.clone(),
grpc_endpoint: "0.0.0.0:10124".to_string(),
grpc_endpoint: grpc.clone(),
state: NodeState::Online,
}
);
@@ -121,7 +68,7 @@
nodes.0.first().unwrap(),
&Node {
id: maya_name.clone(),
grpc_endpoint: "0.0.0.0:10124".to_string(),
grpc_endpoint: grpc.clone(),
state: NodeState::Offline,
}
);