
Commit

@rollulus kafka-streams-plumber
zqhxuyuan committed Jul 5, 2017
1 parent 48011d1 commit f445a3c
Showing 22 changed files with 927 additions and 2 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -83,7 +83,7 @@

- [x] [kafkastreams-cep](<https://github.com/fhussonnois/kafkastreams-cep>)
- [x] [kafka-streams-drools](<https://github.com/benwatson528/kafka-streams-drools>)
- [ ] [siddhi-kafka-cep](<https://github.com/sbcd90/siddhi-kafka-cep>)
- [x] [siddhi-kafka-cep](<https://github.com/sbcd90/siddhi-kafka-cep>)

### bigdata

1 change: 1 addition & 0 deletions applications/kafka-streams-plumber
Submodule kafka-streams-plumber added at 3ff4c3
1 change: 1 addition & 0 deletions applications/pom.xml
@@ -19,6 +19,7 @@
<module>kafka-streams-drools</module>
<module>kafkastreams-cep</module>
<module>siddhi-kafka-cep</module>
<module>kafka-streams-plumber</module>
</modules>

</project>
13 changes: 13 additions & 0 deletions clients/pom.xml
@@ -27,6 +27,17 @@
</repositories>

<dependencies>
<dependency>
<groupId>org.clapper</groupId>
<artifactId>grizzled-slf4j_2.11</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.github.jsendnsca</groupId>
<artifactId>jsendnsca</artifactId>
<version>2.1.1</version>
</dependency>

<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
@@ -90,6 +101,8 @@
<type>pom</type>
</dependency>
-->


</dependencies>

<build>
@@ -7,7 +7,8 @@ import akka.stream.scaladsl._

import scala.concurrent.Future
import scala.util.{Failure, Success}

import scala.concurrent._
import ExecutionContext.Implicits.global

/**
* Created by zhengqh on 17/6/26.
109 changes: 109 additions & 0 deletions clients/src/main/scala/org/apache/kafka/cep/CEP.scala
@@ -0,0 +1,109 @@
package org.apache.kafka.cep

import java.util.Properties

import grizzled.slf4j.Logger
import kafka.consumer.{Consumer, ConsumerConfig}
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.serialize.ZkSerializer
import org.apache.kafka.cep.utils.{Config, Observed}

import scala.collection.JavaConversions._

trait DistributedMode {}

abstract class CEP(val config: Config) extends Observed {
implicit val system: CEP = this
val inDistributedMode: Boolean = this.isInstanceOf[DistributedMode]
var detectors: Set[Detector] = Set[Detector]()
val shell = new CEPShell(CEP.this)
val log = Logger(classOf[CEP])

//
def register[T <: Detector](detector: T) = detectors += detector

//def register(kafkaStream: KafkaStream) = messageTypes += messageStream

def resetZooKeeperOffsets: CEP = {
val zk: ZkClient = new ZkClient(config.getProperty("kafka.zk.connect"), 10000, 10000, new ZkSerializer {
override def deserialize(data: Array[Byte]): Object = if (data == null) null else new String(data)

def serialize(data: Object): Array[Byte] = data.toString.getBytes
})
if (zk.exists("/consumers/" + config.getProperty("consumer.id"))) {
log.info("Resetting consumed offsets.." + "/consumers/" + config.getProperty("consumer.id"))
zk.deleteRecursive("/consumers/" + config.getProperty("consumer.id"))
}
zk.close
this
}

def startWithShell {
start
shell.init
}

def start {
MessageImpulseGenerator.start
}

def stop: Unit = {
MessageImpulseGenerator.stop
}

def processImpulse(impulse: Any) {
detectors.foreach(d => {
d.handle(this, impulse)
})
}

def create[T <: Detector](clz: Class[T], args: AnyRef*): T = {
val constructors = clz.getConstructors()
.filter(c => {
c.getParameterTypes().size.equals(args.size)
//TODO match Seq[_] with consecutive args and then foldLeft to Boolean
})
val c = constructors(0)
println(c.getParameterTypes()(0))
println(args(0).getClass())
c.newInstance(args: _*).asInstanceOf[T]
}

object MessageImpulseGenerator {
val kafka08Config = new Properties()

config.filter(_._1.startsWith("kafka.")).foreach { case (key, value) => {
kafka08Config.put(key.substring("kafka.".length), value)
}}

if (!kafka08Config.containsKey("group.id")) {
throw new IllegalArgumentException("configuration must contain kafka.group.id")
}
if (!kafka08Config.containsKey("zookeeper.connect")) {
throw new IllegalArgumentException("configuration must contain kafka.zookeeper.connect")
}


val consumer = Consumer.create(new ConsumerConfig(kafka08Config))

def start = {
//consumer.
}
def stop = consumer.shutdown

// override def build: VDNADataStreamProcessor[VDNAMessage] = {
// new VDNADataStreamProcessor[VDNAMessage] {
// override def process(message: VDNAMessage) = {
// if (message != null) {
// CEP.this.processImpulse(message);
// }
// }
//
// override def onError[P <: VDNAMessage](vdnaDataStream: VDNADataStream[P], throwable: Throwable): Unit = {
// throwable.printStackTrace();
// }
// }
// }
}

}
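
For orientation, a minimal usage sketch of the class added above, written only against the members visible in this diff (register, resetZooKeeperOffsets, startWithShell). Detector itself is not part of the loaded diff, so the sketch assumes the constructor seen in ChainDetector/Composite and that handle is its only abstract member; DemoCEP, the echo detector, and DemoCEPRunner are hypothetical.

import java.util.concurrent.TimeUnit

import org.apache.kafka.cep.utils.{Config, Observed}

// Hypothetical concrete CEP: registers one trivial detector and starts the shell.
class DemoCEP(config: Config) extends CEP(config) {
  // the inherited `implicit val system: CEP = this` satisfies Detector's implicit parameter
  val echoDetector = new Detector(30, TimeUnit.SECONDS) {
    override def handle(observed: Observed, event: Any) = println("impulse: " + event)
  }
  register(echoDetector)
}

// Entry point; the Config must carry kafka.group.id and kafka.zookeeper.connect,
// as enforced by MessageImpulseGenerator above.
object DemoCEPRunner {
  def run(config: Config): Unit =
    new DemoCEP(config).resetZooKeeperOffsets.startWithShell
}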
88 changes: 88 additions & 0 deletions clients/src/main/scala/org/apache/kafka/cep/CEPShell.scala
@@ -0,0 +1,88 @@
package org.apache.kafka.cep

import java.util.concurrent.atomic.AtomicInteger

import org.apache.kafka.cep.utils.{Observed, Observer, Shell, ShellCommand}

import scala.io.Source

class CEPShell(val system: CEP) extends Shell(system) {
register(Exit)
register(Dump)
register(Next)
register(Show)
}

object Dump extends ShellCommand[CEP] {
override def getShortHelp = "show dump of the current outstanding composite events"
override def invoke(stdin: Source, args: String) = {
val dump = context.detectors.flatMap(d => d.outstanding)
if (dump != null) {
for (event <- dump) {
println(event)
}
}
println("---------------------------------------------------------------------------")
}
}

object Exit extends ShellCommand[CEP] {
override def getShortHelp = "Shutdown processors and exit."
override def invoke(stdin: Source, args: String) = {
println("Closing all connections..")
//context.cassandra.close
context.stop
println("Bye.")
shell.exit
}
}

object Show extends ShellCommand[CEP] {
override def getShortHelp = "Show regtistred detectors"
override def invoke(stdin: Source, args: String) = {
context.detectors.map(d => println(d))
}
}
object Next extends ShellCommand[CEP] {
val count: Int = 1
override def getShortHelp = "[n] Wait for next n top level events to occure [event-name-filter] Wait for the next single event of which name contains the filter argument"
override def invoke(stdin: Source, args: String) = {
var eventName: String = null
var count = new AtomicInteger(1)
args match {
case null => {}
case x if (x.forall(_.isDigit)) => count = new AtomicInteger(Integer.valueOf(args))
case _ => { eventName = args }
}

val observer = new Observer {
override def handle(observed: Observed, event: Any) = {
if (count.decrementAndGet >= 0) {
println(event)
}
context.synchronized {
context.notifyAll
}
}
}

context.detectors.foreach(d => if (eventName == null || (d.name.toLowerCase contains eventName.toLowerCase)) {
println("Adding observer for " + d);
d.addObserver(observer);
})

try {
while (count.get > 0) {
context.synchronized {
context.wait
}
}
} finally {
System.out.println("---------------------------------------------------------------------------")
for (d <- context.detectors) yield {
d.removeObserver(observer);
}

}
}
}
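
The shell is extended by registering further ShellCommand objects. Below is a hedged sketch in the same style as Dump and Show; the Count command and ExtendedCEPShell are hypothetical, and only detectors, d.name, and d.outstanding (all used elsewhere in this file) are assumed.

import scala.io.Source

import org.apache.kafka.cep.utils.ShellCommand

// Hypothetical command: print how many outstanding events each detector holds.
object Count extends ShellCommand[CEP] {
  override def getShortHelp = "Show the number of outstanding events per registered detector"
  override def invoke(stdin: Source, args: String) = {
    context.detectors.foreach(d => println(d.name + ": " + d.outstanding.size))
  }
}

// Registration mirrors the CEPShell constructor body above.
class ExtendedCEPShell(system: CEP) extends CEPShell(system) {
  register(Count)
}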
14 changes: 14 additions & 0 deletions clients/src/main/scala/org/apache/kafka/cep/ChainDetector.scala
@@ -0,0 +1,14 @@
package org.apache.kafka.cep

import java.util.concurrent.TimeUnit

import org.apache.kafka.cep.utils.Observed

abstract class ChainDetector(timeFrame: Long, unit: TimeUnit) (implicit system:CEP)
extends Detector(timeFrame, unit) {

override def handle(observed: Observed, event: Any) = {
// TODO detects events in sequence - e.g. each consequent event is checked only if the previous one happened
}

}
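
ChainDetector leaves the sequencing logic as a TODO. Purely to illustrate the idea in that comment, one possible shape is sketched below; OrderedChainDetector and the onChainComplete hook are hypothetical and not part of this commit.

import java.util.concurrent.TimeUnit

import org.apache.kafka.cep.utils.Observed

// Illustrative only: advance through the chain one detector at a time, so a
// later step is considered only after the previous one has been observed.
abstract class OrderedChainDetector(steps: Seq[Detector], timeFrame: Long, unit: TimeUnit)(implicit system: CEP)
  extends Detector(timeFrame, unit) {

  private var nextStep = 0 // index of the detector expected to fire next

  override def handle(observed: Observed, event: Any) = observed match {
    case d: Detector if nextStep < steps.length && d == steps(nextStep) =>
      nextStep += 1
      if (nextStep == steps.length) { // whole chain seen in order
        nextStep = 0
        onChainComplete(event)
      }
    case _ => () // out-of-order or unrelated impulse: ignore
  }

  protected def onChainComplete(lastEvent: Any): Unit
}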
28 changes: 28 additions & 0 deletions clients/src/main/scala/org/apache/kafka/cep/Composite.scala
@@ -0,0 +1,28 @@
package org.apache.kafka.cep

import java.util.concurrent.TimeUnit

import org.apache.kafka.cep.utils.Observed

class Composite(timeFrame: Long, unit: TimeUnit, compositeDetectors: Seq[Detector])(implicit val system: CEP)
extends Detector(timeFrame, unit) {

def this(detector: Detector)(implicit system: CEP) = this(0, TimeUnit.SECONDS, Seq(detector))

val detectors: Seq[Detector] = compositeDetectors.map(detector => detector(Composite.this))

override def toString: String = name + " = {" + detectors.foldLeft("")(_ + "\n" + _).replace("\n", "\n\t") + "\n}"

override def outstanding: List[Event] = detectors.flatMap(d => d.outstanding).toList ++ super.outstanding

override def handle(observed: Observed, impulse: Any) = observed match {
case detector: Detector if (detectors.contains(detector)) => {
val event = impulse.asInstanceOf[Event]
if (event.isComplete) {
mergeFutureEvent(event, detectors.indexOf(detector), detectors.length)
}
}
case _ => {}
}

}
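
A composite is assembled from existing detectors. Below is a hedged wiring sketch, assuming handle is Detector's only abstract member; PairCEP and its two no-op detectors are stand-ins, not part of this commit.

import java.util.concurrent.TimeUnit

import org.apache.kafka.cep.utils.{Config, Observed}

// Hypothetical wiring: both nested detectors must produce a complete event
// within the five-minute time frame for the composite to complete.
class PairCEP(config: Config) extends CEP(config) {
  private def noOpDetector() = new Detector(5, TimeUnit.MINUTES) {
    override def handle(observed: Observed, event: Any) = ()
  }
  val login = noOpDetector()    // stand-in for a real login detector
  val purchase = noOpDetector() // stand-in for a real purchase detector

  register(new Composite(5, TimeUnit.MINUTES, Seq(login, purchase)))
}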