import java.util.{Arrays, Properties}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import util.{EmbeddedKafkaServer, SimpleKafkaClient, SparkKafkaSink}

/**
  * This example creates two streams based on two different consumer groups, so both streams
  * get a copy of the same data. It's simply a matter of specifying two different consumer
  * group names in the two calls to createDirectStream() -- no special
  * configuration is needed.
  */
object MultipleConsumerGroups {

  /**
    * Publish some data to a topic. Encapsulated here to ensure serializability.
    *
    * @param max number of records to publish
    * @param sc SparkContext used to parallelize the records
    * @param topic destination Kafka topic
    * @param config Kafka producer configuration
    */
  def send(max: Int, sc: SparkContext, topic: String, config: Properties): Unit = {
    // put some data in an RDD and publish to Kafka
    val numbers = 1 to max
    val numbersRDD = sc.parallelize(numbers, 4)

    val kafkaSink = sc.broadcast(SparkKafkaSink(config))

    println("*** producing data")

    numbersRDD.foreach { n =>
      kafkaSink.value.send(topic, "key_" + n, "string_" + n)
    }
  }

  def main (args: Array[String]) {
    val topic = "foo"
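
    // start an embedded Kafka broker and create the target topic with four partitions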
    val kafkaServer = new EmbeddedKafkaServer()
    kafkaServer.start()
    kafkaServer.createTopic(topic, 4)

    val conf = new SparkConf().setAppName("MultipleConsumerGroups").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // streams will produce data every second
    val ssc = new StreamingContext(sc, Seconds(1))

    val max = 1000

    //
    // the first stream subscribes to consumer group Group1
    //

    val props1: Properties = SimpleKafkaClient.getBasicStringStringConsumer(kafkaServer, "Group1")
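
    // the consumer Properties are handed to Subscribe as a java.util.Map of Kafka parameters, hence the cast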
    val kafkaStream1 =
      KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](
          Arrays.asList(topic),
          props1.asInstanceOf[java.util.Map[String, Object]]
        )
      )
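
    // print each batch's size, contents and partitioning so the output of the two streams can be compared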
    kafkaStream1.foreachRDD(r => {
      println("*** [stream 1] got an RDD, size = " + r.count())
      r.foreach(s => println("*** [stream 1] " + s))
      if (r.count() > 0) {
        println("*** [stream 1] " + r.getNumPartitions + " partitions")
        r.glom().foreach(a => println("*** [stream 1] partition size = " + a.size))
      }
    })

    //
    // a second stream, subscribing to the second consumer group (Group2), will
    // see all of the same data
    //

    val props2: Properties = SimpleKafkaClient.getBasicStringStringConsumer(kafkaServer, "Group2")

    val kafkaStream2 =
      KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](
          Arrays.asList(topic),
          props2.asInstanceOf[java.util.Map[String, Object]]
        )
      )

    kafkaStream2.foreachRDD(r => {
      println("*** [stream 2] got an RDD, size = " + r.count())
      r.foreach(s => println("*** [stream 2] " + s))
      if (r.count() > 0) {
        println("*** [stream 2] " + r.getNumPartitions + " partitions")
        r.glom().foreach(a => println("*** [stream 2] partition size = " + a.size))
      }
    })
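
    // start streaming: from here on, each consumer group independently receives every record published to the topic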
    ssc.start()

    println("*** started termination monitor")

    // streams seem to need some time to get going
    Thread.sleep(5000)
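
    // this thread publishes the test data, waits a while, and then requests a graceful shutdown of streaming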
    val producerThread = new Thread("Streaming Termination Controller") {
      override def run() {
        val client = new SimpleKafkaClient(kafkaServer)

        send(max, sc, topic, client.basicStringStringProducer)

        Thread.sleep(5000)

        println("*** requesting streaming termination")
        ssc.stop(stopSparkContext = false, stopGracefully = true)
      }
    }
    producerThread.start()

    try {
      ssc.awaitTermination()
      println("*** streaming terminated")
    } catch {
      case e: Exception => {
        println("*** streaming exception caught in monitor thread")
      }
    }

    // stop Spark
    sc.stop()

    // stop Kafka
    kafkaServer.stop()

    println("*** done")
  }
}