Streaming
A popular Spark extension is Spark Streaming. Naturally, the Kotlin Spark API also introduces a more Kotlin-esque approach to writing your streaming programs. There are examples for use with a checkpoint, Kafka, and SQL in the examples module. A quick example is provided below:
// Automatically provides ssc: JavaStreamingContext which starts and awaits termination or timeout
withSparkStreaming(batchDuration = Durations.seconds(1), timeout = 10_000) { // this: KSparkStreamingSession

    setRunAfterStart {
        println("Stream is started")
    }

    // create input stream for, for instance, Netcat: `$ nc -lk 9999`
    val lines: JavaReceiverInputDStream<String> = ssc.socketTextStream("localhost", 9999)

    // split input stream on space
    val words: JavaDStream<String> = lines.flatMap { it.split(" ").iterator() }

    // perform action on each formed RDD in the stream
    words.foreachRDD { rdd: JavaRDD<String>, _: Time ->

        // to convert the JavaRDD to a Dataset, we need a spark session using the RDD context
        withSpark(rdd) { // this: KSparkSession
            val dataframe: Dataset<TestRow> = rdd.map { TestRow(word = it) }.toDS()
            dataframe
                .groupByKey { it.word }
                .count()
                .show()
            // +-----+--------+
            // |  key|count(1)|
            // +-----+--------+
            // |hello|       1|
            // |   is|       1|
            // |    a|       1|
            // | this|       1|
            // | test|       3|
            // +-----+--------+
        }
    }
}
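The TestRow class used above is not part of the API; it comes from the examples. A minimal definition for this example, assuming just a single word column, could be:

// hypothetical helper class for the example above
data class TestRow(val word: String)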
Note that withSparkStreaming {} does not provide a spark session in the context. This is because it needs to be created from the right SparkConf depending on what you're doing with the data stream. This is why we provide withSpark(sc: SparkConf) {} inside the KSparkStreamingSession, as well as two helper functions for when you already have an instance of ssc: JavaStreamingContext or an RDD.
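As a minimal sketch of the SparkConf variant (the parameter name follows the signature quoted above, and the conf values are just placeholders), you could pass a configuration explicitly:

withSparkStreaming(...) { // this: KSparkStreamingSession
    val conf = SparkConf().setAppName("MyStreamingApp").setMaster("local[*]")

    withSpark(sc = conf) { // this: KSparkSession
        // use `spark` here, e.g. to create datasets or broadcast variables
    }
}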
For instance, if you want to create a dataset inside the KSparkStreamingSession context or you want to broadcast a variable, you can create an instance of a spark session from the ssc: JavaStreamingContext like this:
withSparkStreaming(...) { // this: KSparkStreamingSession

    // for instance
    val broadcastSomething: Broadcast<*> = withSpark(sscForConf = ssc) { // this: KSparkSession
        spark.broadcast(something)
    }
}
When using something like foreachRDD {}, the spark session must be created from the SparkConf of the RDD itself. For example:
withSparkStreaming(...) { // this: KSparkStreamingSession
    val stream: JavaDStream<*> = ...

    stream.foreachRDD { rdd: JavaRDD<*>, time: Time ->

        // for instance
        withSpark(rddForConf = rdd) { // this: KSparkSession
            rdd.toDS().show()
        }
    }
}
A feature of Spark Streaming is checkpointing. This can also be done easily from the Kotlin API:
withSparkStreaming(batchDuration = ..., checkPointPath = "/path/to/checkpoint") {
    // contents will only be run the first time;
    // the second time, the stream transformations will be read from the checkpoint
}
NOTE: If setRunAfterStart {} is used, this is also only executed if the checkpoint is empty.
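As a minimal sketch of a checkpointed word count (reusing the parameter names from the snippet above and a placeholder checkpoint path):

withSparkStreaming(
    batchDuration = Durations.seconds(1),
    checkPointPath = "/path/to/checkpoint"
) { // this: KSparkStreamingSession
    // this graph is built only on the first run; afterwards it is restored from the checkpoint
    val lines = ssc.socketTextStream("localhost", 9999)
    lines
        .flatMap { it.split(" ").iterator() }
        .countByValue()
        .print()
}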
Using the Java API, it's necessary to call mapToPair {} to obtain a JavaPairDStream before specific key/value functions like reduceByKey {} become available on a JavaDStream. In Kotlin, however, we have extension functions, which make these types of functions accessible directly on JavaDStream<Tuple2<*, *>>, so we can simply do:
val wordCounts: JavaDStream<Tuple2<String, Int>> = words
    .map { it X 1 }
    .reduceByKey { a: Int, b: Int -> a + b }
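Here, it X 1 pairs each word with an initial count of 1, producing Tuple2<String, Int> elements; calling wordCounts.print() afterwards would show the counts for each batch.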