Skip to content

Commit

Permalink
[SPARKTA-256][PLUGINS] Kafka output
Browse files Browse the repository at this point in the history
  • Loading branch information
eambrosio committed Jan 22, 2016
1 parent d6efec2 commit e0f6cf3
Show file tree
Hide file tree
Showing 7 changed files with 385 additions and 2 deletions.
28 changes: 28 additions & 0 deletions plugins/output-kafka/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven descriptor for the Kafka output plugin module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Inherits configuration from the shared "plugins" parent module. -->
<parent>
<groupId>com.stratio.sparkta</groupId>
<artifactId>plugins</artifactId>
<version>0.9.0-SNAPSHOT</version>
</parent>

<artifactId>output-kafka</artifactId>

<!-- Both Spark artifacts use the "provided" scope: they are needed to compile
     but are expected to be supplied by the runtime environment. -->
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- No version is declared for spark-sql here; presumably it is managed by a
     parent POM (TODO confirm). -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Copyright (C) 2015 Stratio (http://stratio.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.stratio.sparkta.plugin.output.kafka

import java.io.{Serializable => JSerializable}

import com.stratio.sparkta.plugin.output.kafka.producer.KafkaProducer
import com.stratio.sparkta.sdk.TypeOp.TypeOp
import com.stratio.sparkta.sdk.WriteOp.WriteOp
import com.stratio.sparkta.sdk._
import org.apache.spark.sql._


/**
 * Output plugin that publishes the rows of a DataFrame to Kafka as JSON strings.
 *
 * All constructor arguments are passed unchanged to the parent `Output`; the actual
 * sending is done by the `send` method mixed in from `KafkaProducer`.
 */
class KafkaOutput(keyName: String,
                  version: Option[Int],
                  properties: Map[String, JSerializable],
                  operationTypes: Option[Map[String, (WriteOp, TypeOp)]],
                  bcSchema: Option[Seq[TableSchema]])
  extends Output(keyName, version, properties, operationTypes, bcSchema) with KafkaProducer {

  /**
   * Converts `dataFrame` to JSON and sends every resulting string, partition by
   * partition, using `tableName` as the topic.
   *
   * @param dataFrame     rows to publish
   * @param tableName     passed to `send` as the topic
   * @param timeDimension not used by this output
   */
  override def upsert(dataFrame: DataFrame, tableName: String, timeDimension: String): Unit = {
    val jsonRows = dataFrame.toJSON
    jsonRows.foreachPartition { partition =>
      for (json <- partition) send(properties, tableName, json)
    }
  }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* Copyright (C) 2015 Stratio (http://stratio.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.stratio.sparkta.plugin.output.kafka.producer

import java.io.{Serializable => JSerializable}
import java.util.Properties
import com.stratio.sparkta.sdk.ValidatingPropertyMap._

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

import scala.collection.mutable
import scala.util.{Success, Failure, Try}

/**
 * Mixin that lets a component publish string messages to Kafka through the
 * producers managed by the `KafkaProducer` companion object.
 */
trait KafkaProducer {

  /**
   * Builds a `KeyedMessage` from `topic` and `message` and sends it with the
   * producer that the companion object resolves for `properties`.
   *
   * @param properties plugin configuration used to look up (or create) the producer
   * @param topic      destination topic
   * @param message    payload to send
   */
  def send(properties: Map[String, JSerializable], topic: String, message: String): Unit = {
    // The message is built before the producer lookup, as in the original ordering.
    val record = new KeyedMessage[String, String](topic, message)
    val kafkaProducer = KafkaProducer.getProducer(topic, properties)
    kafkaProducer.send(record)
  }
}

/**
 * Factory and per-JVM cache of Kafka producers.
 *
 * Producers are cached under the key computed by `getProducerKey`, which is the
 * formatted broker list. Callers that target the same brokers therefore share a
 * single producer, even if their other settings (acks, producer type, ...) differ.
 */
object KafkaProducer {
  private val DefaultHostPort: String = "localhost:9092"
  private val DefaultKafkaSerializer: String = "kafka.serializer.StringEncoder"
  private val DefaultRequiredAcks: String = "0"
  private val DefaultProducerType = "sync"
  private val DefaultBatchNumMessages = "200"

  private val BrokerListKey = "metadata.broker.list"

  // Shared by every caller in this JVM; all access goes through `producers.synchronized`
  // (see `getInstance`) because the map itself is not thread-safe.
  private val producers: mutable.Map[String, Producer[String, String]] = mutable.Map.empty

  /** Fails with the error used for every missing or unreadable setting. */
  private def mandatory(field: String): Nothing =
    throw new IllegalStateException(s"The field $field is mandatory")

  /**
   * Reads `key` from the properties as a string.
   * The third argument (the default) is ignored: a missing key is an error here;
   * defaults are applied by `extractOptions` before this function is ever called.
   */
  private val getString: ((Map[String, JSerializable], String, String) => String) = (properties, key, _) =>
    properties.get(key).map(_.toString).getOrElse(mandatory(key))

  /**
   * Reads `key` as a list of host/port entries and renders it as a comma separated
   * "host:port" string. The third argument (the default) is ignored.
   *
   * Throws IllegalStateException when the value cannot be read as such a list or an
   * entry lacks "host" or "port". A non-numeric port still surfaces as the
   * NumberFormatException raised by `toInt`.
   */
  private val getList: ((Map[String, JSerializable], String, String) => String) = (properties, key, _) =>
    Try(properties.getMapFromJsoneyString(key)) match {
      case Success(brokers) =>
        brokers.map { broker =>
          val host = broker.get("host").map(_.toString).getOrElse(mandatory(s"host of $key"))
          val port = broker.get("port").map(_.toString.toInt).getOrElse(mandatory(s"port of $key"))
          s"$host:$port"
        }.mkString(",")
      case Failure(cause) =>
        // Keep the underlying parse error so the root cause is not lost.
        throw new IllegalStateException(s"The field $key is mandatory", cause)
    }

  // Producer setting -> (function that reads it from the plugin properties, default value).
  private val mandatoryOptions: Map[String, ((Map[String, JSerializable], String, String) => AnyRef, String)] = Map(
    BrokerListKey -> (getList, DefaultHostPort),
    "serializer.class" -> (getString, DefaultKafkaSerializer),
    "request.required.acks" -> (getString, DefaultRequiredAcks),
    "producer.type" -> (getString, DefaultProducerType),
    "batch.num.messages" -> (getString, DefaultBatchNumMessages))

  /**
   * Builds the producer `Properties` for every option described in `map`.
   *
   * @param properties plugin configuration supplied by the user
   * @param map        option name -> (reader function, default value)
   * @return one entry per option: the reader's result when the option is present in
   *         `properties`, otherwise its default
   */
  def extractOptions(properties: Map[String, JSerializable],
                     map: Map[String, ((Map[String, JSerializable], String, String) => AnyRef, String)]): Properties = {
    val props = new Properties()
    map.foreach { case (key, (func, default)) =>
      val value: AnyRef = if (properties.contains(key)) func(properties, key, default) else default
      props.put(key, value)
    }
    props
  }

  /**
   * Returns the cached producer for the brokers configured in `properties`,
   * creating it on first use. `topic` does not influence which producer is returned.
   */
  def getProducer(topic: String, properties: Map[String, JSerializable]): Producer[String, String] =
    getInstance(getProducerKey(topic, properties), properties)

  /**
   * Returns the producer cached under `key`, creating and caching it from
   * `properties` if there is none yet.
   *
   * The lookup and the insertion happen under one lock so that concurrent callers
   * cannot corrupt the map or create two producers for the same key.
   */
  def getInstance(key: String, properties: Map[String, JSerializable]): Producer[String, String] =
    producers.synchronized {
      producers.getOrElseUpdate(key, createProducer(properties))
    }

  /** Creates a new, uncached producer configured from `properties` plus the defaults. */
  def createProducer(properties: Map[String, JSerializable]): Producer[String, String] = {
    val props: Properties = extractOptions(properties, mandatoryOptions)
    new Producer[String, String](new ProducerConfig(props))
  }

  /**
   * Computes the cache key for a producer: the formatted broker list.
   * `topic` is accepted for interface compatibility but is not part of the key.
   *
   * Throws IllegalStateException when the broker list is missing or unreadable.
   */
  def getProducerKey(topic: String, properties: Map[String, JSerializable]): String =
    getList(properties, BrokerListKey, DefaultHostPort)
}
112 changes: 112 additions & 0 deletions plugins/output-kafka/src/test/scala/KafkaProducerTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/**
* Copyright (C) 2015 Stratio (http://stratio.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import java.io.Serializable
import java.util.Properties

import com.stratio.sparkta.plugin.output.kafka.producer.KafkaProducer
import kafka.producer
import org.apache.kafka.clients.producer.Producer
import org.junit.runner.RunWith
import org.scalatest._
import org.scalatest.junit.JUnitRunner

import scala.util.Try

/**
* Created by eambrosio on 20/01/16.
*/

/**
 * Unit tests for the `KafkaProducer` companion object: cache key computation,
 * option extraction and producer creation/caching.
 */
@RunWith(classOf[JUnitRunner])
class KafkaProducerTest extends FlatSpec with Matchers {

  // Stub option table: every reader ignores its input and returns a fixed value, so
  // the tests can tell "reader was invoked" apart from "user value was copied".
  val mandatoryOptions: Map[String, ((Map[String, Serializable], String, String) => AnyRef, String)] = Map(
    "metadata.broker.list" -> ((_, _, _) => "localhost:9092", "localhost:9092"),
    "serializer.class" -> ((_, _, _) => "kafka.serializer.StringEncoder", "kafka.serializer.StringEncoder"),
    "request.required.acks" -> ((_, _, _) => "1", "1"),
    "producer.type" -> ((_, _, _) => "sync", "sync"),
    "batch.num.messages" -> ((_, _, _) => "200", "200")
  )

  val validProperties: Map[String, Serializable] = Map(
    "metadata.broker.list" -> """[{"host":"localhost","port":"9092"},{"host":"localhost2","port":"90922"}]""",
    "serializer.class" -> "kafka.serializer.StringEncoder",
    "request.required.acks" -> 1,
    "producer.type" -> "async",
    "batch.num.messages" -> 200
  )

  val noValidProperties: Map[String, Serializable] = Map(
    "metadata.broker.list" -> "",
    "serializer.class" -> "",
    "request.required.acks" -> "",
    "producer.type" -> "",
    "batch.num.messages" -> ""
  )

  // The key is the broker list only; the topic argument does not take part in it.
  "getProducerKey" should "return the broker list, ignoring the topic" in {
    KafkaProducer.getProducerKey("myTopic", validProperties) shouldBe "localhost:9092,localhost2:90922"
  }

  "getProducerKey" should "return error if no broker.list exists" in {
    intercept[IllegalStateException] {
      KafkaProducer.getProducerKey("myTopic", noValidProperties)
    }
  }

  "extractOptions" should "extract mandatory options" in {
    val options: Properties = KafkaProducer.extractOptions(validProperties, mandatoryOptions)
    options.size shouldBe 5
    options.get("metadata.broker.list") shouldBe "localhost:9092"
    options.get("serializer.class") shouldBe "kafka.serializer.StringEncoder"
    options.get("request.required.acks") shouldBe "1"
    options.get("producer.type") shouldBe "sync"
    options.get("batch.num.messages") shouldBe "200"
  }

  "extractOptions" should "extract default mandatory options when map is empty" in {
    val options: Properties = KafkaProducer.extractOptions(Map.empty, mandatoryOptions)
    options.size shouldBe 5
    options.get("metadata.broker.list") shouldBe "localhost:9092"
    options.get("serializer.class") shouldBe "kafka.serializer.StringEncoder"
    options.get("request.required.acks") shouldBe "1"
    options.get("producer.type") shouldBe "sync"
    options.get("batch.num.messages") shouldBe "200"
  }

  "createProducer" should "return a valid KafkaProducer" in {
    Try(KafkaProducer.createProducer(validProperties)).isSuccess shouldBe true
  }

  "createProducer" should "return exception with no valid properties" in {
    Try(KafkaProducer.createProducer(noValidProperties)).isSuccess shouldBe false
  }

  "getInstance" should "create new instance the first time" in {
    // The previous version evaluated an isInstanceOf expression and discarded it,
    // so nothing was asserted. The static type already pins the producer class;
    // what is left to verify is that creation succeeds and yields an instance.
    val instance: producer.Producer[String, String] = KafkaProducer.getInstance("myTopic", validProperties)

    instance should not be (null)
  }

  "getInstance" should "return the same instance always" in {
    val first: producer.Producer[String, String] = KafkaProducer.getInstance("myTopic", validProperties)
    val second: producer.Producer[String, String] = KafkaProducer.getInstance("myTopic", validProperties)

    // Reference identity is the property under test: the cache must hand back the
    // very same producer object.
    first should be theSameInstanceAs (second)
  }

  "getProducer" should "return a KafKAProducer" in {
    Try(KafkaProducer.getProducer("myTopic", validProperties)).isSuccess shouldBe true
  }
}
1 change: 1 addition & 0 deletions plugins/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
<module>output-csv</module>
<module>output-parquet</module>
<module>output-solr</module>
<module>output-kafka</module>
<module>operator-count</module>
<module>operator-sum</module>
<module>operator-max</module>
Expand Down
86 changes: 85 additions & 1 deletion web/src/data-templates/output.json
Original file line number Diff line number Diff line change
Expand Up @@ -538,5 +538,89 @@
"qa": "fragment-details-redis-port"
}
]
}
},{
"name": "Kafka",
"modelType": "Kafka",
"description": {
"short": "Apache Kafka is publish-subscribe messaging rethought as a distributed commit log.",
"long": "Apache Kafka is publish-subscribe messaging rethought as a distributed commit log. Based on the receiver-based approach (https://spark.apache.org/docs/latest/streaming-kafka-integration.html)",
"learnMore": "https://stratio.atlassian.net/wiki/display/SPARKTA0x9/7.4+Outputs#id-7.4Outputs-8.4.9Kafka"
},
"properties": [
{
"propertyId": "metadata.broker.list",
"propertyName": "",
"propertyType": "list",
"regexp": "",
"default": "",
"required": true,
"tooltip": "Kafka host/port to connect",
"qa": "fragment-details-kafka-metadata-broker-list",
"fields": [
{
"propertyId": "host",
"propertyName": "_HOST_",
"propertyType": "text",
"regexp": "((([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(((?![0-9]+$)(?!.*-$)(?!-)[a-zA-Z0-9-]{2,63}))",
"default": "localhost",
"required": true,
"tooltip": "Kafka's address.",
"hidden": false,
"qa": "fragment-details-kafka-host"
},
{
"propertyId": "port",
"propertyName": "_PORT_",
"propertyType": "text",
"regexp": "(0|([1-9]\\d{0,3}|[1-5]\\d{4}|[6][0-5][0-5]([0-2]\\d|[3][0-5])))",
"default": "9092",
"required": true,
"tooltip": "Kafka's port.",
"hidden": false,
"qa": "fragment-details-kafka-port"
}
]
},
{
"propertyId": "serializer.class",
"propertyName": "_SERIALIZER_",
"propertyType": "text",
"regexp": "",
"default": "kafka.serializer.StringEncoder",
"required": true,
"tooltip": "The serializer class for messages",
"qa": "fragment-details-kafka-serializer-class"
},
{
"propertyId": "request.required.acks",
"propertyName": "_REQUIRED_ACKS_",
"propertyType": "text",
"regexp": "0|1",
"default": "0",
"required": true,
"tooltip": "Specify whether producer waits for an acknowledgment from the broker (1), or not (0)",
"qa": "fragment-details-kafka-request-required-acks"
},
{
"propertyId": "producer.type",
"propertyName": "_PRODUCER_TYPE_",
"propertyType": "text",
"regexp": "async|sync",
"default": "async",
"required": true,
"tooltip": "This parameter specifies whether the messages are sent asynchronously in a background thread",
"qa": "fragment-details-kafka-producer-type"
},
{
"propertyId": "batch.num.messages",
"propertyName": "_BATCH_NUM_MESSAGES_",
"propertyType": "text",
"regexp": "[0-9]+",
"default": "200",
"required": true,
"tooltip": "The number of messages to send in one batch when using async mode",
"qa": "fragment-details-kafka-batch-num-messages"
}
]
}
]
Loading

0 comments on commit e0f6cf3

Please sign in to comment.