import java.util.{Arrays, Properties}

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import util.{EmbeddedKafkaServer, SimpleKafkaClient}

/**
 * The most basic streaming example: starts a Kafka server, creates a topic, creates a stream
 * to process that topic, and publishes some data from a separate thread using a KafkaProducer.
 *
 * Notice there's quite a lot of waiting. It takes some time for streaming to get going,
 * and data published too early tends to be missed by the stream. (No doubt this is partly
 * because this example uses the simplest method of creating the stream, and thus doesn't
 * get an opportunity to set auto.offset.reset to "earliest".)
 *
 * Also, data that is published takes some time to propagate to the stream.
 * This seems inevitable, and is almost guaranteed to be slower
 * in a self-contained example like this.
 */
object SimpleStreaming {

  def main(args: Array[String]): Unit = {

    val topic = "foo"

    val kafkaServer = new EmbeddedKafkaServer()
    kafkaServer.start()
    kafkaServer.createTopic(topic, 4)

    val conf = new SparkConf().setAppName("SimpleStreaming").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // the streaming context produces a batch every second
    val ssc = new StreamingContext(sc, Seconds(1))

    // number of messages to publish
    val max = 1000
    // Create the stream.
    val props: Properties = SimpleKafkaClient.getBasicStringStringConsumer(kafkaServer)
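
    // Not part of the original example: a sketch of how the offset reset policy could be set on
    // these consumer properties before the stream is built, so that records published before the
    // stream starts polling are picked up (assuming the consumer group has no committed offsets):
    //
    //   props.put("auto.offset.reset", "earliest")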

    val kafkaStream =
      KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](
          Arrays.asList(topic),
          props.asInstanceOf[java.util.Map[String, Object]]
        )
      )
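
    // The stream delivers ConsumerRecord[String, String] elements. As an aside (not used below),
    // the keys and values could be projected out like this:
    //
    //   val pairs = kafkaStream.map(record => (record.key, record.value))
    //
    // LocationStrategies.PreferConsistent simply distributes the topic's partitions evenly
    // across the available executors.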

    // Now, whenever this Kafka stream produces data, the resulting RDD will be printed.
    kafkaStream.foreachRDD(r => {
      println("*** got an RDD, size = " + r.count())
      r.foreach(s => println(s))
      if (r.count() > 0) {
        // let's see how many partitions the resulting RDD has -- with the direct stream this
        // reflects the partitioning of the topic (four here), not how the data was published
        println("*** " + r.getNumPartitions + " partitions")
        r.glom().foreach(a => println("*** partition size = " + a.size))
      }
    })
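
    // As an aside (not part of the original example), each RDD produced by the direct stream
    // also carries the Kafka offset ranges it covers; a sketch of how they could be inspected:
    //
    //   import org.apache.spark.streaming.kafka010.HasOffsetRanges
    //   kafkaStream.foreachRDD { rdd =>
    //     rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach(println)
    //   }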

    ssc.start()
    println("*** started streaming context")

    // streams seem to need some time to get going
    Thread.sleep(5000)

    val producerThread = new Thread("Streaming Termination Controller") {
      override def run(): Unit = {
        val client = new SimpleKafkaClient(kafkaServer)

        val numbers = 1 to max
        val producer = new KafkaProducer[String, String](client.basicStringStringProducer)
        numbers.foreach { n =>
          // NOTE:
          // 1) the keys and values are strings, which is important when receiving them
          // 2) we don't specify which Kafka partition to send to, so a hash of the key
          //    is used to determine this
          producer.send(new ProducerRecord(topic, "key_" + n, "string_" + n))
        }
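
        // If explicit partition placement were wanted (a sketch, not used in this example),
        // ProducerRecord has an overload taking the partition number as its second argument:
        //
        //   producer.send(new ProducerRecord(topic, n % 4, "key_" + n, "string_" + n))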

        Thread.sleep(5000)
        println("*** requesting streaming termination")
        ssc.stop(stopSparkContext = false, stopGracefully = true)
      }
    }
    producerThread.start()

    try {
      ssc.awaitTermination()
      println("*** streaming terminated")
    } catch {
      case e: Exception =>
        println("*** streaming exception caught in monitor thread")
    }

    // stop Spark
    sc.stop()

    // stop Kafka
    kafkaServer.stop()

    println("*** done")
  }
}