forked from spirom/spark-streaming-with-kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDirectServerDemo.scala
55 lines (36 loc) · 1.13 KB
/
DirectServerDemo.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package util
/**
* Run this first to verify that the embedded Kafka setup is working for you.
* It starts an embedded Kafka server, creates a topic, publishes some messages,
* reads them back and shuts down the embedded server.
*/
object DirectServerDemo {
def main (args: Array[String]) {
val topic = "foo"
println("*** about to start embedded Kafka server")
val kafkaServer = new EmbeddedKafkaServer()
kafkaServer.start()
println("*** server started")
kafkaServer.createTopic(topic, 4)
println("*** topic [" + topic + "] created")
Thread.sleep(5000)
val kafkaClient = new SimpleKafkaClient(kafkaServer)
println("*** about to produce messages")
kafkaClient.send(topic, Seq(
("Key_1", "Value_1"),
("Key_2", "Value_2"),
("Key_3", "Value_3"),
("Key_4", "Value_4"),
("Key_5", "Value_5")
))
println("*** produced messages")
Thread.sleep(5000)
println("*** about to consume messages")
kafkaClient.consumeAndPrint(
topic,
5)
println("*** stopping embedded Kafka server")
kafkaServer.stop()
println("*** done")
}
}