MultipleStreams.scala

import java.util.{Arrays, Properties}

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import util.{EmbeddedKafkaServer, SimpleKafkaClient, SparkKafkaSink}

/**
 * This example creates two streams based on a single consumer group, so they divide up the data.
 * There's an interesting partitioning interaction here, as the streams each get data from two of the four
 * topic partitions, and each produces RDDs with two partitions.
 */
object MultipleStreams {

  /**
   * Publish some data to a topic. Encapsulated here to ensure serializability.
   * @param max    number of records to publish
   * @param sc     SparkContext used to parallelize the records
   * @param topic  destination Kafka topic
   * @param config Kafka producer configuration
   */
  def send(max: Int, sc: SparkContext, topic: String, config: Properties): Unit = {

    // put some data in an RDD and publish to Kafka
    val numbers = 1 to max
    val numbersRDD = sc.parallelize(numbers, 4)
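
    // The producer configuration is wrapped in SparkKafkaSink and broadcast so it is shipped to the
    // executors once; SparkKafkaSink (from this repo's util package) presumably creates its underlying
    // KafkaProducer lazily on each executor, so one producer is reused across records instead of being
    // constructed per record.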
    val kafkaSink = sc.broadcast(SparkKafkaSink(config))

    println("*** producing data")

    numbersRDD.foreach { n =>
      kafkaSink.value.send(topic, "key_" + n, "string_" + n)
    }
  }

  def main (args: Array[String]) {

    val topic = "foo"

    val kafkaServer = new EmbeddedKafkaServer()
    kafkaServer.start()
    kafkaServer.createTopic(topic, 4)
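    // The topic gets four partitions; since both streams below join the same consumer group,
    // Kafka assigns two of the four partitions to each stream.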

    val conf = new SparkConf().setAppName("MultipleStreams").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // one-second batch interval: each stream emits an RDD every second
    val ssc = new StreamingContext(sc, Seconds(1))

    val max = 1000

    //
    // the first stream subscribes to the default consumer group configured in our SimpleKafkaClient class
    //

    val props: Properties = SimpleKafkaClient.getBasicStringStringConsumer(kafkaServer)
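
    // Note: the asInstanceOf cast below succeeds at runtime because java.util.Properties extends
    // Hashtable[AnyRef, AnyRef] and the generic parameters are erased, so the Properties object can be
    // passed where ConsumerStrategies.Subscribe expects a java.util.Map[String, Object].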
    val kafkaStream1 =
      KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](
          Arrays.asList(topic),
          props.asInstanceOf[java.util.Map[String, Object]]
        )
      )
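
    // Each batch RDD mirrors the Kafka partitions assigned to this stream, so with two of the four topic
    // partitions assigned here, the glom() output below should show RDDs with two partitions.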
    kafkaStream1.foreachRDD(r => {
      println("*** [stream 1] got an RDD, size = " + r.count())
      r.foreach(s => println("*** [stream 1] " + s))
      if (r.count() > 0) {
        println("*** [stream 1] " + r.getNumPartitions + " partitions")
        r.glom().foreach(a => println("*** [stream 1] partition size = " + a.size))
      }
    })

    //
    // a second stream that uses the same props, and hence the same consumer group
    //

    val kafkaStream2 =
      KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](
          Arrays.asList(topic),
          props.asInstanceOf[java.util.Map[String, Object]]
        )
      )
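
    // Because both streams subscribe with the same group.id, Kafka's group coordination splits the four
    // topic partitions between them, so each stream should see roughly half of the published records.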
    kafkaStream2.foreachRDD(r => {
      println("*** [stream 2] got an RDD, size = " + r.count())
      r.foreach(s => println("*** [stream 2] " + s))
      if (r.count() > 0) {
        println("*** [stream 2] " + r.getNumPartitions + " partitions")
        r.glom().foreach(a => println("*** [stream 2] partition size = " + a.size))
      }
    })

    ssc.start()

    println("*** started termination monitor")

    // streams seem to need some time to get going
    Thread.sleep(5000)
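
    // This background thread publishes the data, waits for it to be consumed, and then requests a
    // graceful shutdown of the StreamingContext so that main() can proceed past awaitTermination().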
    val producerThread = new Thread("Streaming Termination Controller") {
      override def run() {
        val client = new SimpleKafkaClient(kafkaServer)

        send(max, sc, topic, client.basicStringStringProducer)
        Thread.sleep(5000)
        println("*** requesting streaming termination")
        ssc.stop(stopSparkContext = false, stopGracefully = true)
      }
    }
    producerThread.start()

    try {
      ssc.awaitTermination()
      println("*** streaming terminated")
    } catch {
      case e: Exception => {
        println("*** streaming exception caught in monitor thread")
      }
    }

    // stop Spark
    sc.stop()

    // stop Kafka
    kafkaServer.stop()

    println("*** done")
  }
}