Skip to content

Commit

Permalink
[mqtt] upstream bridge event handlers (Azure#3742)
Browse files Browse the repository at this point in the history
`Pump` suppose to support several activities running in parallel: 
* Ingress - a reading publication from store and publishing using MQTT client
* Egress - a receiving publication from the MQTT client and put it in the opposite pump store
* Messages Processor - reacting on messages coming to the `Pump`

Pump messages can be different for different pumps (local/remote, upstream/east-west). 
`PumpMessageHandler` addressing this variety of options.

This PR includes `Connectivity` message functionality
  • Loading branch information
dmolokanov authored Oct 21, 2020
1 parent ca33b33 commit 8ea9582
Show file tree
Hide file tree
Showing 30 changed files with 1,460 additions and 767 deletions.
1 change: 1 addition & 0 deletions mqtt/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mqtt/mqtt-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ regex = "1"
serde = { version = "1.0", features = ["derive", "rc"] }
serde_bytes = "0.11"
serde_derive = "1.0"
serde_json = "1.0"
serial_test = "0.4"
thiserror = "1.0"
tokio = { version = "0.2", features = ["sync", "rt-util"] }
Expand Down
120 changes: 76 additions & 44 deletions mqtt/mqtt-bridge/src/bridge.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use futures_util::{future::select, future::Either, pin_mut};
use mqtt3::ShutdownError;
use tokio::sync::{mpsc::error::SendError, oneshot, oneshot::Sender};
use tokio::{select, sync::oneshot::Sender};
use tracing::{debug, error, info, info_span};
use tracing_futures::Instrument;

use crate::{
client::ClientError,
persist::PersistError,
pump::{PumpMessage, PumpPair},
rpc::RpcError,
settings::ConnectionSettings,
client::{ClientError, MqttClientConfig},
persist::{PersistError, PublicationStore, StreamWakeableState, WakingMemoryStore},
pump::{Builder, Pump},
settings::{ConnectionSettings, Credentials},
upstream::{
ConnectivityError, LocalUpstreamHandler, LocalUpstreamPumpEventHandler,
RemoteUpstreamHandler, RemoteUpstreamPumpEventHandler, RpcError,
},
};

#[derive(Debug)]
Expand All @@ -33,73 +35,100 @@ impl BridgeShutdownHandle {
}

/// Bridge implementation that connects to local broker and remote broker and handles messages flow
pub struct Bridge {
pumps: PumpPair,
pub struct Bridge<S> {
local_pump: Pump<S, LocalUpstreamHandler<S>, LocalUpstreamPumpEventHandler>,
remote_pump: Pump<S, RemoteUpstreamHandler<S>, RemoteUpstreamPumpEventHandler>,
connection_settings: ConnectionSettings,
}

impl Bridge {
pub async fn new(
impl Bridge<WakingMemoryStore> {
pub async fn new_upstream(
system_address: String,
device_id: String,
connection_settings: ConnectionSettings,
settings: ConnectionSettings,
) -> Result<Self, BridgeError> {
debug!("creating bridge {}...", connection_settings.name());

let mut pumps = PumpPair::new(&connection_settings, &system_address, &device_id)?;

pumps
.local_pump
const BATCH_SIZE: usize = 10;

debug!("creating bridge...");

let (mut local_pump, mut remote_pump) = Builder::default()
.with_local(|pump| {
pump.with_config(MqttClientConfig::new(
&system_address,
settings.keep_alive(),
settings.clean_session(),
Credentials::Anonymous(format!(
"{}/$edgeHub/$bridge/{}",
device_id,
settings.name()
)),
))
.with_rules(settings.forwards());
})
.with_remote(|pump| {
pump.with_config(MqttClientConfig::new(
settings.address(),
settings.keep_alive(),
settings.clean_session(),
settings.credentials().clone(),
))
.with_rules(settings.subscriptions());
})
.with_store(|| PublicationStore::new_memory(BATCH_SIZE))
.build()?;

// TODO move subscriptions into run method
local_pump
.subscribe()
.instrument(info_span!("pump", name = "local"))
.await?;

pumps
.remote_pump
remote_pump
.subscribe()
.instrument(info_span!("pump", name = "remote"))
.await?;

debug!("created {} bridge...", connection_settings.name());
debug!("created bridge...");

Ok(Bridge {
pumps,
connection_settings,
local_pump,
remote_pump,
connection_settings: settings,
})
}
}

pub async fn run(mut self) -> Result<(), BridgeError> {
info!("starting {} bridge...", self.connection_settings.name());

let (local_shutdown, local_shutdown_listener) = oneshot::channel::<()>();
let (remote_shutdown, remote_shutdown_listener) = oneshot::channel::<()>();
let shutdown_handle = BridgeShutdownHandle {
local_shutdown,
remote_shutdown,
};
impl<S> Bridge<S>
where
S: StreamWakeableState + Send,
{
pub async fn run(self) -> Result<(), BridgeError> {
info!("Starting {} bridge...", self.connection_settings.name());

let local_pump = self
.pumps
.local_pump
.run(local_shutdown_listener)
.run()
.instrument(info_span!("pump", name = "local"));

let remote_pump = self
.pumps
.remote_pump
.run(remote_shutdown_listener)
.run()
.instrument(info_span!("pump", name = "remote"));
pin_mut!(local_pump, remote_pump);

debug!(
"starting pumps for {} bridge...",
self.connection_settings.name()
);
match select(local_pump, remote_pump).await {
Either::Left(_) => {
shutdown_handle.shutdown().await?;

select! {
_ = local_pump => {
// TODO shutdown remote pump
// shutdown_handle.shutdown().await?;
}
Either::Right(_) => {
shutdown_handle.shutdown().await?;

_ = remote_pump => {
// TODO shutdown local pump
// shutdown_handle.shutdown().await?;
}
}

Expand All @@ -123,12 +152,15 @@ pub enum BridgeError {
#[error("failed to load settings.")]
LoadingSettings(#[from] config::ConfigError),

#[error("failed to get send pump message.")]
SenderToPump(#[from] SendError<PumpMessage>),
#[error("Failed to get send pump message.")]
SendToPump,

#[error("failed to execute RPC command")]
Rpc(#[from] RpcError),

#[error("failed to execute connectivity event")]
Connectivity(#[from] ConnectivityError),

#[error("failed to signal bridge shutdown.")]
ShutdownBridge(()),

Expand Down
70 changes: 39 additions & 31 deletions mqtt/mqtt-bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ where
T: TokenSource + Clone + Send + Sync + 'static,
{
pub fn new(
address: String,
address: impl Into<String>,
token_source: Option<T>,
trust_bundle_source: Option<TrustBundleSource>,
) -> Self {
Self {
address,
address: address.into(),
token_source,
trust_bundle_source,
}
Expand Down Expand Up @@ -184,11 +184,31 @@ impl BridgeIoSource {
}
}

pub struct MqttClientConfig {
addr: String,
keep_alive: Duration,
clean_session: bool,
credentials: Credentials,
}

impl MqttClientConfig {
pub fn new(
addr: impl Into<String>,
keep_alive: Duration,
clean_session: bool,
credentials: Credentials,
) -> Self {
Self {
addr: addr.into(),
keep_alive,
clean_session,
credentials,
}
}
}

/// This is a wrapper over mqtt3 client
pub struct MqttClient<H>
where
H: EventHandler,
{
pub struct MqttClient<H> {
client_id: Option<String>,
username: Option<String>,
io_source: BridgeIoSource,
Expand All @@ -198,44 +218,32 @@ where
}

impl<H: EventHandler> MqttClient<H> {
pub fn tcp(
address: &str,
keep_alive: Duration,
clean_session: bool,
event_handler: H,
connection_credentials: &Credentials,
) -> Self {
let token_source = Self::token_source(&connection_credentials);
let tcp_connection = TcpConnection::new(address.to_owned(), token_source, None);
pub fn tcp(config: MqttClientConfig, event_handler: H) -> Self {
let token_source = Self::token_source(&config.credentials);
let tcp_connection = TcpConnection::new(config.addr, token_source, None);
let io_source = BridgeIoSource::Tcp(tcp_connection);

Self::new(
keep_alive,
clean_session,
config.keep_alive,
config.clean_session,
event_handler,
connection_credentials,
&config.credentials,
io_source,
)
}

pub fn tls(
address: &str,
keep_alive: Duration,
clean_session: bool,
event_handler: H,
connection_credentials: &Credentials,
) -> Self {
let trust_bundle = Some(TrustBundleSource::new(connection_credentials.clone()));
pub fn tls(config: MqttClientConfig, event_handler: H) -> Self {
let trust_bundle = Some(TrustBundleSource::new(config.credentials.clone()));

let token_source = Self::token_source(&connection_credentials);
let tcp_connection = TcpConnection::new(address.to_owned(), token_source, trust_bundle);
let token_source = Self::token_source(&config.credentials);
let tcp_connection = TcpConnection::new(config.addr, token_source, trust_bundle);
let io_source = BridgeIoSource::Tls(tcp_connection);

Self::new(
keep_alive,
clean_session,
config.keep_alive,
config.clean_session,
event_handler,
connection_credentials,
&config.credentials,
io_source,
)
}
Expand Down
3 changes: 2 additions & 1 deletion mqtt/mqtt-bridge/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ impl BridgeController {

let upstream_bridge = async move {
let bridge =
Bridge::new(system_address, device_id, upstream_settings.clone()).await;
Bridge::new_upstream(system_address, device_id, upstream_settings.clone())
.await;

match bridge {
Ok(bridge) => {
Expand Down
5 changes: 2 additions & 3 deletions mqtt/mqtt-bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@

mod bridge;
pub mod client;
mod connectivity;
pub mod controller;
mod messages;
mod persist;
mod pump;
mod rpc;
pub mod pump;
pub mod settings;
mod token_source;
pub mod upstream;

pub use crate::controller::{
BridgeController, BridgeControllerHandle, BridgeControllerUpdate, Error,
Expand Down
Loading

0 comments on commit 8ea9582

Please sign in to comment.