Skip to content

Commit

Permalink
Consolidate env vars for EdgeHub and Broker (Azure#4393)
Browse files Browse the repository at this point in the history
There are few issues with env vars that we have and below are the improvements I suggest. 	
- Not consistent naming - I suggest to land on PascalCase, since it's already most popular format for our existing vars. Please note this is pure documentation update. Our configuration implementation will be case insensitive.
- We use `:` in many settings where it is not well-supported on all platforms. Specifically it is hard to use on Linux - I suggest to move to `__` as a separator. This is a supported separator for dotnet core config provider and is back-compatible with `:`. So this, again, can be just documentation change, since both `:` and `__` will continue working in dotnet. For rust (broker code) we will also support both `:` and `__`.
- Some variables in broker needs to be changed from snake_case to PascalCase, while we have time before release. Those vars are:
  - `MqttBroker__MaxQueuedMessages`
  - `MqttBroker__MaxQueuedBytes`
  - `MqttBroker__MaxInflightMessages`
  - `MqttBroker__WhenFull`

Some other changes:
- Added support for `StorageFolder` env for Broker, so EH and Broker will store their state to the same folder.
- Updated `EnvironmentVariables.md`
  • Loading branch information
vadim-kovalyov authored Feb 12, 2021
1 parent afdc76c commit 6167323
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 77 deletions.
38 changes: 21 additions & 17 deletions doc/EnvironmentVariables.md

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions mqtt/mqtt-broker/src/settings/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,21 +160,21 @@ impl Default for RetainedMessagesConfig {

#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct SessionPersistenceConfig {
path: PathBuf,
folder_path: PathBuf,
#[serde(with = "humantime_serde")]
time_interval: Duration,
}

impl SessionPersistenceConfig {
pub fn new(path: PathBuf, time_interval: Duration) -> Self {
pub fn new(folder_path: PathBuf, time_interval: Duration) -> Self {
Self {
path,
folder_path,
time_interval,
}
}

pub fn file_path(&self) -> PathBuf {
self.path.clone()
pub fn folder_path(&self) -> PathBuf {
self.folder_path.clone()
}

pub fn time_interval(&self) -> Duration {
Expand Down
2 changes: 1 addition & 1 deletion mqtt/mqtt-edgehub/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"when_full": "drop_new"
},
"persistence": {
"path": "/tmp/mqttd/",
"folder_path": "/tmp/mqttd/",
"time_interval": "5m"
}
}
Expand Down
103 changes: 52 additions & 51 deletions mqtt/mqtt-edgehub/src/settings.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::{
collections::HashMap,
env,
path::{Path, PathBuf},
};

use config::{Config, ConfigError, File, FileFormat};
use config::{Config, ConfigError, Environment, File, FileFormat, Source, Value};
use lazy_static::lazy_static;
use serde::Deserialize;

Expand All @@ -27,33 +26,59 @@ lazy_static! {
};
}

/// `BrokerEnvironment` is our custom implementation of `config::Source`
/// that can handle existing `EdgeHub` env settings and convert them
/// into broker config structure.
#[derive(Debug, Clone)]
pub struct BrokerEnvironment;

impl config::Source for BrokerEnvironment {
fn clone_into_box(&self) -> Box<dyn config::Source + Send + Sync> {
impl Source for BrokerEnvironment {
fn clone_into_box(&self) -> Box<dyn Source + Send + Sync> {
Box::new((*self).clone())
}

// Currently, BrokerEnvironment allows only the following four environment variables to be set externally.
// Otherwise, all values must come from the default.json file
fn collect(&self) -> Result<HashMap<String, config::Value>, ConfigError> {
// Currently, BrokerEnvironment allows only the env variables explicitly
// defined in the method below.
//
// We use intermediate instance of `Config` to enumerate all env vars
// and then manually map them to our internal config structure.
// This is done for two reasons:
// - our broker config structure does not match legacy EdgeHub env vars,
// - `Config` does a bunch of useful things - takes care of
// evn vars casing, prefixing, separators...
//
// NOTE: if adding new env vars - don't forget to use lowercase
// and update `check_env_var_name_override` test.
fn collect(&self) -> Result<HashMap<String, Value>, ConfigError> {
let mut host_env = Config::new();
// regular env vars
host_env.merge(Environment::new())?;
// broker specific vars
host_env.merge(Environment::with_prefix("MqttBroker_").separator(":"))?;
host_env.merge(Environment::with_prefix("MqttBroker_").separator("__"))?;

let mut result: HashMap<String, config::Value> = HashMap::new();
if let Ok(val) = env::var("mqttBroker__max_queued_messages") {
result.insert("broker.session.max_queued_messages".into(), val.into());
}

if let Ok(val) = env::var("mqttBroker__max_queued_bytes") {
result.insert("broker.session.max_queued_size".into(), val.into());
// session
if let Ok(val) = host_env.get::<Value>("maxinflightmessages") {
result.insert("broker.session.max_inflight_messages".into(), val);
}

if let Ok(val) = env::var("mqttBroker__max_inflight_messages") {
result.insert("broker.session.max_inflight_messages".into(), val.into());
if let Ok(val) = host_env.get::<Value>("maxqueuedmessages") {
result.insert("broker.session.max_queued_messages".into(), val);
}
if let Ok(val) = host_env.get::<Value>("maxqueuedbytes") {
result.insert("broker.session.max_queued_size".into(), val);
}
if let Ok(val) = host_env.get::<Value>("whenfull") {
result.insert("broker.session.when_full".into(), val);
}

if let Ok(val) = env::var("mqttBroker__when_full") {
result.insert("broker.session.when_full".into(), val.into());
// persistance
if let Ok(val) = host_env.get::<Value>("storagefolder") {
result.insert("broker.persistence.folder_path".into(), val.clone());
result.insert("bridge.persistence.folder_path".into(), val);
}

Ok(result)
}
}
Expand Down Expand Up @@ -246,14 +271,16 @@ mod tests {
use super::{AuthConfig, ListenerConfig, Settings, TcpTransportConfig, TlsTransportConfig};

const DAYS: u64 = 24 * 60 * 60;
const MINS: u64 = 60;

#[test]
#[serial(env_settings)]
fn check_env_var_name_override() {
let _max_inflight_messages = env::set_var("mqttBroker__max_inflight_messages", "17");
let _max_queued_messages = env::set_var("mqttBroker__max_queued_messages", "1001");
let _max_queued_bytes = env::set_var("mqttBroker__max_queued_bytes", "1");
let _when_full = env::set_var("mqttBroker__when_full", "drop_old");
let _max_inflight_messages = env::set_var("MqttBroker__MaxInflightMessages", "17");
let _max_queued_messages = env::set_var("MqttBroker__MaxQueuedMessages", "1001");
let _max_queued_bytes = env::set_var("MqttBroker__MaxQueuedBytes", "1");
let _when_full = env::set_var("MqttBroker__WhenFull", "drop_old");
let _storage_folder = env::set_var("StorageFolder", "/iotedge/storage");

let settings = Settings::new().unwrap();

Expand All @@ -269,39 +296,13 @@ mod tests {
QueueFullAction::DropOld,
)
);
}

#[test]
#[serial(env_settings)]
fn check_other_env_vars_cant_be_overridden() {
let _broker_session_max_inflight_messages =
env::set_var("broker__session__max_inflight_messages", "17");
let _max_queued_messages = env::set_var("broker__session__max_queued_messages", "1001");
let _max_queued_bytes = env::set_var("broker__session__max_queued_bytes", "1");
let _when_full = env::set_var("broker__session__when_full", "drop_old");

let _tcp = env::set_var("listener__tcp__address", "0.0.0.0:1880");
let _tls = env::set_var("listener__tls__address", "0.0.0.0:1880");
let _system = env::set_var("listener__system__address", "0.0.0.0:1880");
let _port = env::set_var("auth__port", "7121");
let _base_url = env::set_var("auth__base_url", "/authWRONGticate");

let settings = Settings::new().unwrap();

let listener = &ListenerConfig::new(
Some(TcpTransportConfig::new("0.0.0.0:1883")),
Some(TlsTransportConfig::new("0.0.0.0:8883", None)),
TcpTransportConfig::new("0.0.0.0:1882"),
);
let auth = &AuthConfig::new(7120, "/authenticate/");

assert_eq!(settings.broker().session(), &SessionConfig::default());
assert_eq!(
settings.broker().persistence(),
&SessionPersistenceConfig::default()
&SessionPersistenceConfig::new(
"/iotedge/storage".into(),
Duration::from_secs(5 * MINS)
)
);
assert_eq!(settings.listener(), listener);
assert_eq!(settings.auth(), auth);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion mqtt/mqtt-generic/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"when_full": "drop_new"
},
"persistence": {
"path": "/tmp/mqttd/",
"folder_path": "/tmp/mqttd/",
"time_interval": "5m"
}
}
Expand Down
2 changes: 1 addition & 1 deletion mqtt/mqttd/src/app/edgehub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl Bootstrap for EdgeHubBootstrap {
) -> Result<(Broker<Self::Authorizer>, FilePersistor<VersionedFileFormat>)> {
info!("loading state...");
let persistence_config = settings.broker().persistence();
let state_dir = persistence_config.file_path();
let state_dir = persistence_config.folder_path();

fs::create_dir_all(state_dir.clone())?;
let mut persistor = FilePersistor::new(state_dir, VersionedFileFormat::default());
Expand Down
2 changes: 1 addition & 1 deletion mqtt/mqttd/src/app/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl Bootstrap for GenericBootstrap {
) -> Result<(Broker<Self::Authorizer>, FilePersistor<VersionedFileFormat>)> {
info!("loading state...");
let persistence_config = settings.broker().persistence();
let state_dir = persistence_config.file_path();
let state_dir = persistence_config.folder_path();

fs::create_dir_all(state_dir.clone())?;
let mut persistor = FilePersistor::new(state_dir, VersionedFileFormat::default());
Expand Down

0 comments on commit 6167323

Please sign in to comment.