Skip to content

Commit

Permalink
[BAHIR-186] SSL support in MQTT structured streaming
Browse files Browse the repository at this point in the history
Closes apache#74
  • Loading branch information
lukasz-antoniak authored and lresende committed Dec 12, 2018
1 parent 0601698 commit a73ab48
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 32 deletions.
38 changes: 25 additions & 13 deletions sql-streaming-mqtt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,31 @@ Setting values for option `localStorage` and `clientId` helps in recovering in c

This connector uses [Eclipse Paho Java Client](https://eclipse.org/paho/clients/java/). Client API documentation is located [here](http://www.eclipse.org/paho/files/javadoc/index.html).

* `brokerUrl` An URL MqttClient connects to. Set this or `path` as the URL of the Mqtt Server. e.g. tcp://localhost:1883.
* `persistence` By default it is used for storing incoming messages on disk. If `memory` is provided as value for this option, then recovery on restart is not supported.
* `topic` Topic MqttClient subscribes to.
* `clientId` clientId, this client is associated with. Provide the same value to recover a stopped source client. MQTT sink ignores client identifier, because Spark batch can be distributed across multiple workers whereas MQTT broker does not allow simultanous connections with same ID from multiple hosts.
* `QoS` The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe.
* `username` Sets the user name to use for the connection to Mqtt Server. Do not set it, if server does not need this. Setting it empty will lead to errors.
* `password` Sets the password to use for the connection.
* `cleanSession` Setting it true starts a clean session, removes all checkpointed messages by a previous run of this source. This is set to false by default.
* `connectionTimeout` Sets the connection timeout, a value of 0 is interpretted as wait until client connects. See `MqttConnectOptions.setConnectionTimeout` for more information.
* `keepAlive` Same as `MqttConnectOptions.setKeepAliveInterval`.
* `mqttVersion` Same as `MqttConnectOptions.setMqttVersion`.
* `maxInflight` Same as `MqttConnectOptions.setMaxInflight`
* `autoReconnect` Same as `MqttConnectOptions.setAutomaticReconnect`
| Parameter name | Description | Eclipse Paho reference |
|----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------|
| `brokerUrl` | URL MQTT client connects to. Specify this parameter or _path_. Example: _tcp://localhost:1883_, _ssl://localhost:1883_. | |
| `persistence` | Defines how incoming messages are stored. If _memory_ is provided as value for this option, recovery on restart is not supported. Otherwise messages are stored on disk and parameter _localStorage_ may define target directory. | |
| `topic` | Topic which client subscribes to. | |
| `clientId` | Uniquely identifies client instance. Provide the same value to recover a stopped source client. MQTT sink ignores client identifier, because Spark batch can be distributed across multiple workers whereas MQTT broker does not allow simultaneous connections with same ID from multiple hosts. | |
| `QoS` | The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe. | |
| `username` | User name used to authenticate with MQTT server. Do not set it, if server does not require authentication. Leaving empty may lead to errors. | `MqttConnectOptions.setUserName` |
| `password` | User password. | `MqttConnectOptions.setPassword` |
| `cleanSession` | Setting to _true_ starts a clean session, removes all check-pointed messages persisted during previous run. Defaults to `false`. | `MqttConnectOptions.setCleanSession` |
| `connectionTimeout` | Sets the connection timeout, a value of _0_ is interpreted as wait until client connects. | `MqttConnectOptions.setConnectionTimeout` |
| `keepAlive` | Sets the "keep alive" interval in seconds. | `MqttConnectOptions.setKeepAliveInterval` |
| `mqttVersion` | Specify MQTT protocol version. | `MqttConnectOptions.setMqttVersion` |
| `maxInflight` | Sets the maximum inflight requests. Useful for high volume traffic. | `MqttConnectOptions.setMaxInflight` |
| `autoReconnect` | Sets whether the client will automatically attempt to reconnect to the server upon connectivity disruption. | `MqttConnectOptions.setAutomaticReconnect` |
| `ssl.protocol` | SSL protocol. Example: _SSLv3_, _TLS_, _TLSv1_, _TLSv1.2_. | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.protocol` |
| `ssl.key.store` | Absolute path to key store file. | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.keyStore` |
| `ssl.key.store.password` | Key store password. | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.keyStorePassword` |
| `ssl.key.store.type` | Key store type. Example: _JKS_, _JCEKS_, _PKCS12_. | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.keyStoreType` |
| `ssl.key.store.provider` | Key store provider. Example: _IBMJCE_. | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.keyStoreProvider` |
| `ssl.trust.store` | Absolute path to trust store file. | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.trustStore` |
| `ssl.trust.store.password` | Trust store password. | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.trustStorePassword` |
| `ssl.trust.store.type` | Trust store type. Example: _JKS_, _JCEKS_, _PKCS12_. | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.trustStoreType` |
| `ssl.trust.store.provider` | Trust store provider. Example: _IBMJCEFIPS_. | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.trustStoreProvider` |
| `ssl.ciphers` | List of enabled cipher suites. Example: _SSL_RSA_WITH_AES_128_CBC_SHA_. | `MqttConnectOptions.setSSLProperties`, `com.ibm.ssl.enabledCipherSuites` |

## Environment variables

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.bahir.sql.streaming.mqtt

import java.util.Properties

import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttConnectOptions}
import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}

Expand All @@ -26,6 +28,23 @@ import org.apache.bahir.utils.Logging


private[mqtt] object MQTTUtils extends Logging {
// Since data source configuration properties are case-insensitive,
// we have to introduce our own keys. Also, good for vendor independence.
private[mqtt] val sslParamMapping = Map(
"ssl.protocol" -> "com.ibm.ssl.protocol",
"ssl.key.store" -> "com.ibm.ssl.keyStore",
"ssl.key.store.password" -> "com.ibm.ssl.keyStorePassword",
"ssl.key.store.type" -> "com.ibm.ssl.keyStoreType",
"ssl.key.store.provider" -> "com.ibm.ssl.keyStoreProvider",
"ssl.trust.store" -> "com.ibm.ssl.trustStore",
"ssl.trust.store.password" -> "com.ibm.ssl.trustStorePassword",
"ssl.trust.store.type" -> "com.ibm.ssl.trustStoreType",
"ssl.trust.store.provider" -> "com.ibm.ssl.trustStoreProvider",
"ssl.ciphers" -> "com.ibm.ssl.enabledCipherSuites",
"ssl.key.manager" -> "com.ibm.ssl.keyManager",
"ssl.trust.manager" -> "com.ibm.ssl.trustManager"
)

private[mqtt] def parseConfigParams(config: Map[String, String]):
(String, String, String, MqttClientPersistence, MqttConnectOptions, Int) = {
def e(s: String) = new IllegalArgumentException(s)
Expand Down Expand Up @@ -78,6 +97,13 @@ private[mqtt] object MQTTUtils extends Logging {
mqttConnectOptions.setPassword(p.toCharArray)
case _ =>
}
val sslProperties = new Properties()
config.foreach(e => {
if (e._1.startsWith("ssl.")) {
sslProperties.setProperty(sslParamMapping(e._1), e._2)
}
})
mqttConnectOptions.setSSLProperties(sslProperties)

(brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos)
}
Expand Down
Binary file not shown.
Binary file added sql-streaming-mqtt/src/test/resources/truststore.jks
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructT
import org.apache.bahir.utils.FileHelper


class MQTTStreamSinkSuite extends SparkFunSuite with SharedSparkContext with BeforeAndAfter {
class MQTTStreamSinkSuite(_ssl: Boolean) extends SparkFunSuite
with SharedSparkContext with BeforeAndAfter {
protected var mqttTestUtils: MQTTTestUtils = _
protected val tempDir: File = new File(System.getProperty("java.io.tmpdir") + "/mqtt-test/")
protected val messages = new mutable.HashMap[Int, String]
protected var testClient: MqttClient = _

before {
mqttTestUtils = new MQTTTestUtils(tempDir)
SparkEnv.get.conf.set("spark.mqtt.client.connect.attempts", "1")
mqttTestUtils = new MQTTTestUtils(tempDir, ssl = _ssl)
mqttTestUtils.setup()
tempDir.mkdirs()
messages.clear()
Expand All @@ -70,17 +72,22 @@ class MQTTStreamSinkSuite extends SparkFunSuite with SharedSparkContext with Bef
}

protected def sendToMQTT(dataFrame: DataFrame): StreamingQuery = {
dataFrame.writeStream
val protocol = if (_ssl) "ssl" else "tcp"
val writer = dataFrame.writeStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSinkProvider")
.option("topic", "test").option("localStorage", tempDir.getAbsolutePath)
.option("clientId", "clientId").option("QoS", "2")
.start("tcp://" + mqttTestUtils.brokerUri)
if (_ssl) {
writer.option("ssl.trust.store", mqttTestUtils.clientTrustStore.getAbsolutePath)
.option("ssl.trust.store.type", "JKS")
.option("ssl.trust.store.password", mqttTestUtils.clientTrustStorePassword)
}
writer.start(protocol + "://" + mqttTestUtils.brokerUri)
}
}

class BasicMQTTSinkSuite extends MQTTStreamSinkSuite {
class BasicMQTTSinkSuite extends MQTTStreamSinkSuite(false) {
test("broker down") {
SparkEnv.get.conf.set("spark.mqtt.client.connect.attempts", "1")
SparkSession.setActiveSession(SparkSession.builder().getOrCreate())
val provider = new MQTTStreamSinkProvider
val parameters = Map(
Expand Down Expand Up @@ -138,7 +145,19 @@ class BasicMQTTSinkSuite extends MQTTStreamSinkSuite {
}
}

class StressTestMQTTSink extends MQTTStreamSinkSuite {
class MQTTSSLSinkSuite extends MQTTStreamSinkSuite(true) {
test("verify SSL connectivity") {
val msg1 = "Hello, World!"
val msg2 = "MQTT is a message queue."
val (_, dataFrame) = createContextAndDF(msg1, msg2)

sendToMQTT(dataFrame).awaitTermination(5000)

assert(Set(msg1, msg2).equals(messages.values.toSet))
}
}

class StressTestMQTTSink extends MQTTStreamSinkSuite(false) {
// run with -Xmx1024m
test("Send and receive messages of size 100MB.") {
val freeMemory: Long = Runtime.getRuntime.freeMemory()
Expand Down
Loading

0 comments on commit a73ab48

Please sign in to comment.