Skip to content

Commit

Permalink
Merged PR 859652: Edgelet watchdog iteration 1
Browse files Browse the repository at this point in the history
This PR includes changes to current watchdog functionality to do the following -
- Check if the edgeAgent container exists. If not, then it checks if the identity has SAS auth, and if not, it updates the identity. Then it creates the edgeAgent container
- Added a watchdog that runs periodically and checks the status of the edgeAgent. If it is not Running, then it starts the container.
- Added support for graceful shutdown of the watchdog.

Items remaining (in follow up PRs) -
- Backoff when restarting edgeAgent
- Handling the case if the edgeAgent container is completely removed (docker rm -f edgeAgent).
- Tests
  • Loading branch information
varunpuranik committed Jun 1, 2018
1 parent be6ef01 commit c7834cf
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 89 deletions.
10 changes: 6 additions & 4 deletions edgelet/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions edgelet/edgelet-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ serde_json = "1.0"
sha2 = "0.7.0"
log = "0.4"
url = "1.7"
tokio = "0.1"
tokio-timer = "0.2.3"

edgelet-utils = { path = "../edgelet-utils" }

Expand Down
16 changes: 14 additions & 2 deletions edgelet/edgelet-core/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright (c) Microsoft. All rights reserved.

use failure::{Backtrace, Context, Fail};
use std::fmt;
use std::fmt::Display;

use failure::{Backtrace, Context, Fail};
use tokio_timer;

use edgelet_utils::Error as UtilsError;

Expand Down Expand Up @@ -36,6 +36,10 @@ pub enum ErrorKind {
Activate,
#[fail(display = "Edge runtime module has not been created in IoT Hub")]
EdgeRuntimeNotFound,
#[fail(display = "Watchdog error")]
Watchdog,
#[fail(display = "Tokio timer error")]
TokioTimer,
}

impl Fail for Error {
Expand Down Expand Up @@ -85,3 +89,11 @@ impl From<UtilsError> for Error {
}
}
}

impl From<tokio_timer::Error> for Error {
fn from(error: tokio_timer::Error) -> Error {
Error {
inner: error.context(ErrorKind::TokioTimer),
}
}
}
2 changes: 2 additions & 0 deletions edgelet/edgelet-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ extern crate serde;
extern crate serde_derive;
extern crate serde_json;
extern crate sha2;
extern crate tokio;
extern crate tokio_timer;

#[macro_use]
extern crate edgelet_utils;
Expand Down
218 changes: 152 additions & 66 deletions edgelet/edgelet-core/src/watchdog.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
// Copyright (c) Microsoft. All rights reserved.

use error::{Error, ErrorKind};
use std::time::{Duration, Instant};

use futures::future::{self, Either, FutureResult};
use futures::Future;
use tokio::prelude::*;
use tokio::timer::Interval;

use error::{Error, ErrorKind};
use identity::{Identity, IdentityManager, IdentitySpec};
use module::{Module, ModuleRegistry, ModuleRuntime, ModuleSpec, ModuleStatus};

/// This variable holds the generation ID associated with the Edge Agent module.
const MODULE_GENERATIONID: &str = "IOTEDGE_MODULEGENERATIONID";

/// This is the frequency with which the watchdog checks for the status of the edge runtime module.
const WATCHDOG_FREQUENCY_SECS: u64 = 60;

pub struct Watchdog<M, I> {
runtime: M,
id_mgr: I,
Expand All @@ -30,66 +38,162 @@ where
// Start the edge runtime module (EdgeAgent). This also updates the identity of the module (module_id)
// to make sure it is configured for the right authentication type (sas token)
// spec.name = edgeAgent / module_id = $edgeAgent
pub fn start(
&mut self,
pub fn run_until<F>(
self,
spec: ModuleSpec<<M::Module as Module>::Config>,
module_id: &str,
) -> impl Future<Item = (), Error = Error> {
let (runtime, runtime_copy) = (self.runtime.clone(), self.runtime.clone());
shutdown_signal: F,
) -> impl Future<Item = (), Error = Error>
where
F: Future<Item = (), Error = ()> + 'static,
{
let runtime = self.runtime.clone();
let name = spec.name().to_string();

// Update identity of EdgeAgent to use the Auth mechanism supported by this Edgelet (Sas tokens)
let mut id_mgr_copy = self.id_mgr.clone();
let mut id_mgr = self.id_mgr.clone();
let module_id = module_id.to_string();
//TODO: remove edgeHub identity update when agent can update identities
update_identity(&mut self.id_mgr, "$edgeHub")
.and_then(move |_| update_identity(&mut id_mgr_copy, &module_id))
.and_then(move |id| runtime.list().map(|m| (id, m)).map_err(|e| e.into()))
.and_then(move |(id, m)| {
m.iter()
.filter_map(|m| if m.name() == name { Some(m) } else { None })
.nth(0)
.map(|m| Either::A(start_runtime(runtime_copy.clone(), m)))
.unwrap_or_else(|| {
// add the generation ID for edge agent as an environment variable
let mut env = spec.env().clone();
env.insert(
MODULE_GENERATIONID.to_string(),
id.generation_id().to_string(),
);

Either::B(create_and_start(runtime_copy, spec.with_env(env)))
})
.map(|_| info!("Edge runtime started."))

// Check if the Edge runtime module exists, and if not create it
let watchdog = get_edge_runtime_mod(&runtime, name.clone())
.and_then(move |m| get_or_update_identity(&mut id_mgr, &module_id, m.is_none()))
.and_then(|id| {
// add the generation ID for edge agent as an environment variable
let mut env = spec.env().clone();
env.insert(
MODULE_GENERATIONID.to_string(),
id.generation_id().to_string(),
);
start_watchdog(runtime, spec.with_env(env))
});

// Swallow any errors from shutdown_signal
let shutdown_signal = shutdown_signal.then(|_| Ok(()));

// Wait for the watchdog or shutdown futures to complete
// Since the watchdog never completes, this will wait for the
// shutdown signal.
shutdown_signal
.select(watchdog)
.then(move |result| match result {
Ok(((), _)) => future::ok(()),
Err((e, _)) => future::err(e),
})
}
}

// Update the edge agent identity to use the right authentication mechanism.
fn update_identity<I>(
// Start watchdog on a timer for 1 minute
pub fn start_watchdog<M>(
runtime: M,
spec: ModuleSpec<<M::Module as Module>::Config>,
) -> impl Future<Item = (), Error = Error>
where
M: 'static + ModuleRuntime + Clone,
<M::Module as Module>::Config: Clone,
M::Error: Into<Error>,
<M::Module as Module>::Error: Into<Error>,
{
info!("Starting watchdog...");
Interval::new(Instant::now(), Duration::from_secs(WATCHDOG_FREQUENCY_SECS))
.map_err(Error::from)
.for_each(move |_| {
info!("Checking edge runtime status");
check_runtime(runtime.clone(), spec.clone()).or_else(|e| {
warn!(
"Error in watchdog when checking for edge runtime status: {}",
e
);
future::ok(())
})
})
}

// Check if the edge runtime module is running, and if not, start it.
fn check_runtime<M>(
runtime: M,
spec: ModuleSpec<<M::Module as Module>::Config>,
) -> impl Future<Item = (), Error = Error>
where
M: 'static + ModuleRuntime + Clone,
<M::Module as Module>::Config: Clone,
M::Error: Into<Error>,
<M::Module as Module>::Error: Into<Error>,
{
let module = spec.name().to_string();
get_edge_runtime_mod(&runtime, module.clone())
.and_then(|m| m.map(|m| m.runtime_state().map_err(|e| e.into())))
.and_then(move |state| {
state
.map(|state| {
let res = match *state.status() {
ModuleStatus::Running => {
info!("Edge runtime is running.");
future::Either::A(future::ok(()))
}
_ => {
info!(
"Edge runtime status is {}, starting module now...",
*state.status()
);
future::Either::B(runtime.start(&module).map_err(|e| e.into()))
}
};
Either::A(res)
})
.unwrap_or_else(|| Either::B(create_and_start(runtime, spec)))
})
.map(|_| ())
}

// Gets the edge runtime module, if it exists.
fn get_edge_runtime_mod<M>(
runtime: &M,
name: String,
) -> impl Future<Item = Option<M::Module>, Error = Error>
where
M: 'static + ModuleRuntime + Clone,
<M::Module as Module>::Config: Clone,
M::Error: Into<Error>,
<M::Module as Module>::Error: Into<Error>,
{
runtime
.list()
.map(move |m| {
m.into_iter()
.filter_map(move |m| if m.name() == name { Some(m) } else { None })
.nth(0)
})
.map_err(|e| e.into())
}

// Gets the identity for the module.
// If the update flag is specified, then also updates the identity of the module.
fn get_or_update_identity<I>(
id_mgr: &mut I,
module_id: &str,
update: bool,
) -> impl Future<Item = I::Identity, Error = Error>
where
I: 'static + IdentityManager + Clone,
I::Error: Into<Error>,
{
info!("Updating identity for {}", module_id);

let mut id_mgr_copy = id_mgr.clone();
id_mgr
.get(IdentitySpec::new(module_id))
.map_err(|e| e.into())
.and_then(move |identity| {
identity
.map(|module| {
let res = id_mgr_copy
.update(
IdentitySpec::new(module.module_id())
.with_generation_id(module.generation_id().to_string()),
)
.map_err(|e| e.into());

let res = if update {
info!("Updating identity for module {}", module.module_id());
let res = id_mgr_copy
.update(
IdentitySpec::new(module.module_id())
.with_generation_id(module.generation_id().to_string()),
)
.map_err(|e| e.into());
Either::A(res)
} else {
Either::B(future::ok(module))
};
Either::A(res)
})
.unwrap_or_else(|| {
Expand Down Expand Up @@ -122,30 +226,6 @@ where
.map_err(|e| e.into())
}

// Check edge agent state and start container if not running.
fn start_runtime<M>(
runtime: M,
module: &<M as ModuleRuntime>::Module,
) -> impl Future<Item = (), Error = Error>
where
M: 'static + ModuleRuntime + Clone,
<M::Module as Module>::Config: Clone,
M::Error: Into<Error>,
<M::Module as Module>::Error: Into<Error>,
{
let module_name = module.name().to_string();
module
.runtime_state()
.map_err(|e| e.into())
.and_then(move |state| match *state.status() {
ModuleStatus::Running => future::Either::A(future::ok(())),
_ => {
info!("Starting edge runtime module {}", module_name);
future::Either::B(runtime.start(&module_name).map_err(|e| e.into()))
}
})
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -344,7 +424,9 @@ mod tests {
let mut manager = TestIdentityManager::new(vec![]).with_fail_get(true);
assert_eq!(
true,
update_identity(&mut manager, "$edgeAgent").wait().is_err()
get_or_update_identity(&mut manager, "$edgeAgent", false)
.wait()
.is_err()
);
}

Expand All @@ -359,7 +441,9 @@ mod tests {

assert_eq!(
true,
update_identity(&mut manager, "$edgeAgent").wait().is_err()
get_or_update_identity(&mut manager, "$edgeAgent", true)
.wait()
.is_err()
);
assert_eq!(true, manager.state.borrow().update_called);
}
Expand All @@ -375,7 +459,9 @@ mod tests {

assert_eq!(
false,
update_identity(&mut manager, "$edgeAgent").wait().is_err()
get_or_update_identity(&mut manager, "$edgeAgent", true)
.wait()
.is_err()
);
assert_eq!(true, manager.state.borrow().update_called);
assert_eq!(
Expand Down
Loading

0 comments on commit c7834cf

Please sign in to comment.