Skip to content

Commit

Permalink
Embedded Kafka support in OpenWhisk Standalone mode (apache#4628)
Browse files Browse the repository at this point in the history
Enable support for Embedded Kafka to enable Kafka based feature development. It also support launching Kafdrop 3 based Ui for visualizing Kafka related metadata.
  • Loading branch information
chetanmeh authored Sep 19, 2019
1 parent e9dd207 commit 94fccbc
Show file tree
Hide file tree
Showing 9 changed files with 337 additions and 29 deletions.
67 changes: 55 additions & 12 deletions core/standalone/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,37 @@ $ java -jar openwhisk-standalone.jar -h
\ \ / \/ \___/| .__/ \___|_| |_|__/\__|_| |_|_|___/_|\_\
\___\/ tm |_|
--api-gw Enable API Gateway support
--api-gw-port <arg> Api Gateway Port
--clean Clean any existing state like database
-c, --config-file <arg> application.conf which overrides the default
standalone.conf
--couchdb Enable CouchDB support
-d, --data-dir <arg> Directory used for storage
--disable-color-logging Disables colored logging
-m, --manifest <arg> Manifest json defining the supported runtimes
-p, --port <arg> Server port
--api-gw Enable API Gateway support
--api-gw-port <arg> Api Gateway Port
--clean Clean any existing state like database
-c, --config-file <arg> application.conf which overrides the default
standalone.conf
--couchdb Enable CouchDB support
-d, --data-dir <arg> Directory used for storage
--dev-mode Developer mode speeds up the startup by
disabling preflight checks and avoiding
explicit pulls.
--disable-color-logging Disables colored logging
--kafka Enable embedded Kafka support
--kafka-docker-port <arg> Kafka port for use by docker based services.
If not specified then 9091 or some random
free port (if 9091 is busy) would be used
--kafka-port <arg> Kafka port. If not specified then 9092 or
some random free port (if 9092 is busy) would
be used
--kafka-ui Enable Kafka UI
-m, --manifest <arg> Manifest json defining the supported runtimes
-p, --port <arg> Server port
-v, --verbose
-h, --help Show help message
--version Show version of this program
--zk-port <arg> Zookeeper port. If not specified then 2181 or
some random free port (if 2181 is busy) would
be used
-h, --help Show help message
--version Show version of this program
OpenWhisk standalone server
```

Sections below would illustrate some of the supported options
Expand Down Expand Up @@ -204,7 +219,35 @@ Api Gateway mode can be enabled via `--api-gw` flag. In this mode upon launch a
would be launched on port `3234` (can be changed with `--api-gw-port`). In this mode you can make use of the
[api gateway][4] support.

#### Using Kafka

Standalone OpenWhisk supports launching an [embedded kafka][5]. This mode is mostly useful for developers working on OpenWhisk
implementation itself.

```
java -jar openwhisk-standalone.jar --kafka
```

It also supports launching a Kafka UI based on [Kafdrop 3][6] which enables seeing the topics created and structure of messages
exchanged on those topics.

```
java -jar openwhisk-standalone.jar --kafka --kafka-ui
```

By default the ui server would be accessible at http://localhost:9000. In case 9000 port is busy then a random port would
be selected. TO find out the port look for message in log like below (or grep for `whisk-kafka-drop-ui`)

```
[ 9092 ] localhost:9092 (kafka)
[ 9092 ] 192.168.65.2:9091 (kafka-docker)
[ 2181 ] Zookeeper (zookeeper)
[ 9000 ] http://localhost:9000 (whisk-kafka-drop-ui)
```

[1]: https://github.com/apache/incubator-openwhisk/blob/master/docs/cli.md
[2]: https://github.com/apache/incubator-openwhisk/blob/master/docs/samples.md
[3]: https://github.com/apache/incubator-openwhisk-apigateway
[4]: https://github.com/apache/incubator-openwhisk/blob/master/docs/apigateway.md
[5]: https://github.com/embeddedkafka/embedded-kafka
[6]: https://github.com/obsidiandynamics/kafdrop
5 changes: 5 additions & 0 deletions core/standalone/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ dependencies {
compile project(':tools:admin')
compile 'org.rogach:scallop_2.12:3.3.1'

//Tried with 0.16.0 has support for Kafka 0.11.0 https://github.com/embeddedkafka/embedded-kafka/tree/v0.16.0
//But that causes class compatability issue die to use of newer client version
compile ("io.github.embeddedkafka:embedded-kafka_2.12:2.1.1")
compile ("org.scala-lang:scala-reflect:${gradle.scala.version}")

testCompile 'junit:junit:4.11'
testCompile 'org.scalatest:scalatest_2.12:3.0.5'
}
Expand Down
2 changes: 2 additions & 0 deletions core/standalone/src/main/resources/logback-standalone.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

<!-- Kafka -->
<logger name="org.apache.kafka" level="ERROR" />
<logger name="org.apache.zookeeper" level="ERROR" />
<logger name="kafka.server" level="ERROR" />

<root level="${logback.log.level:-INFO}">
<appender-ref ref="console" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.standalone

import java.io.File

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import kafka.server.KafkaConfig
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.commons.io.FileUtils
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.WhiskConfig.kafkaHosts
import org.apache.openwhisk.core.entity.ControllerInstanceId
import org.apache.openwhisk.core.loadBalancer.{LeanBalancer, LoadBalancer, LoadBalancerProvider}
import org.apache.openwhisk.standalone.StandaloneDockerSupport.{checkOrAllocatePort, containerName, createRunCmd}

import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.io.Directory
import scala.util.Try

class KafkaLauncher(docker: StandaloneDockerClient,
kafkaPort: Int,
kafkaDockerPort: Int,
zkPort: Int,
workDir: File,
kafkaUi: Boolean)(implicit logging: Logging,
ec: ExecutionContext,
actorSystem: ActorSystem,
materializer: ActorMaterializer,
tid: TransactionId) {

def run(): Future[Seq[ServiceContainer]] = {
for {
kafkaSvcs <- runKafka()
uiSvcs <- if (kafkaUi) runKafkaUI() else Future.successful(Seq.empty[ServiceContainer])
} yield kafkaSvcs ++ uiSvcs
}

def runKafka(): Future[Seq[ServiceContainer]] = {

//Below setting based on https://rmoff.net/2018/08/02/kafka-listeners-explained/
// We configure two listeners where one is used for host based application and other is used for docker based application
// to connect to Kafka server running on host
// Here controller / invoker will use LISTENER_LOCAL since they run in the same JVM as the embedded Kafka
// and Kafka UI will run in a Docker container and use LISTENER_DOCKER
val brokerProps = Map(
KafkaConfig.ListenersProp -> s"LISTENER_LOCAL://localhost:$kafkaPort,LISTENER_DOCKER://localhost:$kafkaDockerPort",
KafkaConfig.AdvertisedListenersProp -> s"LISTENER_LOCAL://localhost:$kafkaPort,LISTENER_DOCKER://${StandaloneDockerSupport
.getLocalHostIp()}:$kafkaDockerPort",
KafkaConfig.ListenerSecurityProtocolMapProp -> "LISTENER_LOCAL:PLAINTEXT,LISTENER_DOCKER:PLAINTEXT",
KafkaConfig.InterBrokerListenerNameProp -> "LISTENER_LOCAL")
implicit val config: EmbeddedKafkaConfig =
EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort, customBrokerProperties = brokerProps)

val t = Try {
EmbeddedKafka.startZooKeeper(createDir("zookeeper"))
EmbeddedKafka.startKafka(createDir("kafka"))
}

Future
.fromTry(t)
.map(
_ =>
Seq(
ServiceContainer(kafkaPort, s"localhost:$kafkaPort", "kafka"),
ServiceContainer(
kafkaDockerPort,
s"${StandaloneDockerSupport.getLocalHostIp()}:$kafkaDockerPort",
"kafka-docker"),
ServiceContainer(zkPort, "Zookeeper", "zookeeper")))
}

def runKafkaUI(): Future[Seq[ServiceContainer]] = {
val hostIp = StandaloneDockerSupport.getLocalHostIp()
val port = checkOrAllocatePort(9000)
val env = Map(
"ZOOKEEPER_CONNECT" -> s"$hostIp:$zkPort",
"KAFKA_BROKERCONNECT" -> s"$hostIp:$kafkaDockerPort",
"JVM_OPTS" -> "-Xms32M -Xmx64M",
"SERVER_SERVLET_CONTEXTPATH" -> "/")

logging.info(this, s"Starting Kafka Drop UI port: $port")
val name = containerName("kafka-drop-ui")
val params = Map("-p" -> Set(s"$port:9000"))
val args = createRunCmd(name, env, params)

val f = docker.runDetached("obsidiandynamics/kafdrop", args, true)
f.map(_ => Seq(ServiceContainer(port, s"http://localhost:$port", name)))
}

private def createDir(name: String) = {
val dir = new File(workDir, name)
FileUtils.forceMkdir(dir)
Directory(dir)
}
}

object KafkaAwareLeanBalancer extends LoadBalancerProvider {
override def requiredProperties: Map[String, String] = LeanBalancer.requiredProperties ++ kafkaHosts

override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): LoadBalancer = LeanBalancer.instance(whiskConfig, instance)
}

object KafkaLauncher {
val preferredKafkaPort = 9092
val preferredKafkaDockerPort = preferredKafkaPort - 1
val preferredZkPort = 2181
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.openwhisk.core.cli.WhiskAdmin
import org.apache.openwhisk.core.controller.Controller
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.standalone.ColorOutput.clr
import org.apache.openwhisk.standalone.StandaloneDockerSupport.checkOrAllocatePort
import org.rogach.scallop.ScallopConf
import pureconfig.loadConfigOrThrow

Expand All @@ -41,6 +42,8 @@ import scala.concurrent.{Await, ExecutionContext}
import scala.io.AnsiColor
import scala.util.{Failure, Success, Try}

import KafkaLauncher._

class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
banner(StandaloneOpenWhisk.banner)
footer("\nOpenWhisk standalone server")
Expand All @@ -63,6 +66,34 @@ class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val apiGwPort = opt[Int](descr = "Api Gateway Port", default = Some(3234), noshort = true)
val dataDir = opt[File](descr = "Directory used for storage", default = Some(StandaloneOpenWhisk.defaultWorkDir))

val kafka = opt[Boolean](descr = "Enable embedded Kafka support", noshort = true)
val kafkaUi = opt[Boolean](descr = "Enable Kafka UI", noshort = true)

//The port option below express following usage. Note that "preferred"" port values are not configured as default
// on purpose
// - no config - Attempt to use default port. For e.g. 9092 for Kafka
// - non config if default preferred port is busy then select a random port
// - port config provided - Then this port would be used. If port is already busy
// then that service would not start. This is mostly meant to be used for test setups
// where test logic would determine a port and needs the service to start on that port only
val kafkaPort = opt[Int](
descr =
s"Kafka port. If not specified then $preferredKafkaPort or some random free port (if $preferredKafkaPort is busy) would be used",
noshort = true,
required = false)

val kafkaDockerPort = opt[Int](
descr = s"Kafka port for use by docker based services. If not specified then $preferredKafkaDockerPort or some random free port " +
s"(if $preferredKafkaDockerPort is busy) would be used",
noshort = true,
required = false)

val zkPort = opt[Int](
descr =
s"Zookeeper port. If not specified then $preferredZkPort or some random free port (if $preferredZkPort is busy) would be used",
noshort = true,
required = false)

verify()

val colorEnabled = !disableColorLogging()
Expand Down Expand Up @@ -148,8 +179,12 @@ object StandaloneOpenWhisk extends SLF4JLogging {
startApiGateway(conf, dockerClient, dockerSupport)
} else (-1, Seq.empty)

val (kafkaPort, kafkaSvcs) = if (conf.kafka()) {
startKafka(workDir, dockerClient, conf, conf.kafkaUi())
} else (-1, Seq.empty)

val couchSvcs = if (conf.couchdb()) Some(startCouchDb(dataDir, dockerClient)) else None
val svcs = Seq(apiGwSvcs, couchSvcs.toList).flatten
val svcs = Seq(apiGwSvcs, couchSvcs.toList, kafkaSvcs).flatten
if (svcs.nonEmpty) {
new ServiceInfoLogger(conf, svcs, dataDir).run()
}
Expand Down Expand Up @@ -380,7 +415,7 @@ object StandaloneOpenWhisk extends SLF4JLogging {
ec: ExecutionContext,
materializer: ActorMaterializer): ServiceContainer = {
implicit val tid: TransactionId = TransactionId(systemPrefix + "couchDB")
val port = StandaloneDockerSupport.checkOrAllocatePort(5984)
val port = checkOrAllocatePort(5984)
val dbDataDir = new File(dataDir, "couchdb")
FileUtils.forceMkdir(dbDataDir)
val db = new CouchDBLauncher(dockerClient, port, dbDataDir)
Expand All @@ -393,6 +428,42 @@ object StandaloneOpenWhisk extends SLF4JLogging {
Await.result(g, 5.minutes)
}

private def startKafka(workDir: File, dockerClient: StandaloneDockerClient, conf: Conf, kafkaUi: Boolean)(
implicit logging: Logging,
as: ActorSystem,
ec: ExecutionContext,
materializer: ActorMaterializer): (Int, Seq[ServiceContainer]) = {
val kafkaPort = getPort(conf.kafkaPort.toOption, preferredKafkaPort)
implicit val tid: TransactionId = TransactionId(systemPrefix + "kafka")
val k = new KafkaLauncher(
dockerClient,
kafkaPort,
getPort(conf.kafkaDockerPort.toOption, preferredKafkaDockerPort),
getPort(conf.zkPort.toOption, preferredZkPort),
workDir,
kafkaUi)

val f = k.run()
val g = f.andThen {
case Success(_) =>
logging.info(
this,
s"Kafka started successfully at http://${StandaloneDockerSupport.getLocalHostName()}:$kafkaPort")
case Failure(t) =>
logging.error(this, "Error starting Kafka" + t)
}
val services = Await.result(g, 5.minutes)

setConfigProp(WhiskConfig.kafkaHostList, s"localhost:$kafkaPort")
setSysProp("whisk.spi.MessagingProvider", "org.apache.openwhisk.connector.kafka.KafkaMessagingProvider")
setSysProp("whisk.spi.LoadBalancerProvider", "org.apache.openwhisk.standalone.KafkaAwareLeanBalancer")
(kafkaPort, services)
}

private def getPort(configured: Option[Int], preferred: Int): Int = {
configured.getOrElse(checkOrAllocatePort(preferred))
}

private def configureDevMode(): Unit = {
setSysProp("whisk.docker.standalone.container-factory.pull-standard-images", "false")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,21 @@
package org.apache.openwhisk.standalone

import common.WskProps
import org.apache.commons.io.{FileUtils, FilenameUtils}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{Canceled, Outcome}
import system.basic.WskRestBasicTests

@RunWith(classOf[JUnitRunner])
class StandaloneCouchTests extends WskRestBasicTests with StandaloneServerFixture {
class StandaloneCouchTests extends WskRestBasicTests with StandaloneServerFixture with StandaloneSanityTestSupport {
override implicit val wskprops = WskProps().copy(apihost = serverUrl)

override protected def extraArgs: Seq[String] =
Seq("--couchdb", "--data-dir", FilenameUtils.concat(FileUtils.getTempDirectoryPath, "standalone"))
Seq("--couchdb")

override protected def extraVMArgs: Seq[String] = Seq("-Dwhisk.standalone.couchdb.volumes-enabled=false")

//This is more of a sanity test. So just run one of the test which trigger interaction with couchdb
//and skip running all other tests
private val supportedTests = Set("Wsk Action REST should create, update, get and list an action")

override def withFixture(test: NoArgTest): Outcome = {
if (supportedTests.contains(test.name)) {
super.withFixture(test)
} else {
Canceled()
}
}
override protected def supportedTests: Set[String] =
Set("Wsk Action REST should create, update, get and list an action")
}
Loading

0 comments on commit 94fccbc

Please sign in to comment.