Simple Clojure interface to Kafka.
It is currently released only as a snapshot while the API settles. API documentation is also available.
Note: Kafka binaries are not currently published to any public repositories. Additionally, the 0.7 release was published as source. This library uses a build of the 0.7 incubator release published on Clojars.
Add the following to your Leiningen project.clj:
[clj-kafka "0.0.2-0.7-SNAPSHOT"]
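For context, here is a minimal sketch of where that vector sits in a project.clj. The project name, project version, and Clojure version are placeholders chosen for illustration, not requirements of clj-kafka; snapshot builds are assumed to resolve from Clojars, as noted above.

```clojure
;; project.clj (sketch): my-kafka-app and the versions marked below are hypothetical.
(defproject my-kafka-app "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.4.0"]          ; assumed Clojure version
                 [clj-kafka "0.0.2-0.7-SNAPSHOT"]])     ; the dependency from this README
```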
clj-kafka currently only supports Kafka 0.7.
The producer allows batching of messages:
(use 'clj-kafka.producer)
(def p (producer {"zk.connect" "localhost:2181"}))
(send-messages p "test" (->> ["message payload 1" "message payload 2"]
                             (map #(.getBytes %))
                             (map message)))
Or send a single message:
(def p (producer {"zk.connect" "localhost:2181"}))
(send-messages p "test" (message (.getBytes "payload")))
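The simple consumer fetches messages directly from a broker. Here fetch builds a request for a topic, partition, starting offset, and maximum fetch size in bytes:
(use 'clj-kafka.consumer.simple)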
(def c (consumer "localhost" 9092))
(def f (fetch "test" 0 0 4096))
(messages c f)
({:message {:crc 1513777821, :payload #<byte[] [B@3088890d>, :size 1089}, :offset 1093}
 {:message {:crc 4119364266, :payload #<byte[] [B@3088890d>, :size 968}, :offset 2065}
 {:message {:crc 3827222527, :payload #<byte[] [B@3088890d>, :size 1137}, :offset 3206})
The Zookeeper consumer uses broker information contained within Zookeeper to consume messages. This consumer also allows the client to automatically commit consumed offsets so they're not retrieved again.
(use 'clj-kafka.consumer.zk)
(use 'clj-kafka.core)
(def config {"zk.connect" "localhost:2181"
             "groupid"    "my-task-group"})
(with-consumer [c (consumer config)]
  (take 5 (messages c "test")))
({:crc 3417370184, :payload #<byte[] [B@698b41da>, :size 22}
 {:crc 3417370184, :payload #<byte[] [B@698b41da>, :size 22}
 {:crc 960674935, :payload #<byte[] [B@698b41da>, :size 86}
 {:crc 3651343620, :payload #<byte[] [B@698b41da>, :size 20}
 {:crc 2012604996, :payload #<byte[] [B@698b41da>, :size 20})
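Both consumers return the payload as a raw byte array, so you will usually want to decode it. A minimal sketch, assuming the payloads are UTF-8 text: payload->string is a hypothetical helper, not part of clj-kafka, and the sample maps are built by hand to mirror the shapes shown above so the snippet runs without a broker.

```clojure
;; Hypothetical helper, not part of clj-kafka: decode a message map's
;; :payload byte array as UTF-8 text (assumes the producer sent UTF-8).
(defn payload->string
  [message]
  (String. ^bytes (:payload message) "UTF-8"))

;; Hand-built stand-ins for real results, so the sketch runs without Kafka.
(def zk-message
  {:payload (.getBytes "payload" "UTF-8")})

(def simple-entry
  {:message {:payload (.getBytes "message payload 1" "UTF-8")}
   :offset  1093})

(payload->string zk-message)               ; Zookeeper consumer shape
(payload->string (:message simple-entry))  ; simple consumer shape
```

With the Zookeeper consumer this could be mapped over the result, for example (map payload->string (take 5 (messages c "test"))), realized inside with-consumer because the sequence is lazy.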
Copyright © 2012 Paul Ingles
Distributed under the Eclipse Public License, the same as Clojure.