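This example demonstrates a sliding window using the transformation reduceByKeyAndWindow(), with a batch interval of 2 seconds, a window duration of 6 seconds and a slide duration of 2 seconds. The Python stub a21_stub.py (invoked below) provides the input to this application.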
Input
Input for the Streaming Application is denoted by this color
Output
Source Code
Input
Input for the Streaming Application is denoted by this color
python3.5 ./a21_stub.py inputfile=./c15_input2.yaml
Press ENTER when Ready...
[ 0]
[.9] => data=lion || secondscounter=0.9
[ 1]
[.8] => data=lion || secondscounter=1.8
[ 2]
[.7] => data=lion || secondscounter=2.7
[ 3]
[.6] => data=lion || secondscounter=3.6
[ 4]
[.5] => data=lion || secondscounter=4.5
[ 5]
[.4] => data=lion || secondscounter=5.4
[ 6]
[.4] => data=lion || secondscounter=6.4
[ 7]
[.2] => data=lion || secondscounter=7.2
[ 8]
[.1] => data=lion || secondscounter=8.1
[ 9]
[.0] => data=lion || secondscounter=9.0
[ 10]
[.0] => data=lion || secondscounter=10.0
[.8] => data=lion || secondscounter=10.8
[ 11]
[.7] => data=lion || secondscounter=11.7
[ 12]
[.6] => data=lion || secondscounter=12.6
Output
(lion,(2,List(0.9, 1.8)))
(lion,(4,List(0.9, 1.8, 2.7, 3.6)))
(lion,(6,List(0.9, 1.8, 2.7, 3.6, 4.5, 5.4)))
(lion,(7,List(2.7, 3.6, 4.5, 5.4, 6.4, 7.2, 8.1)))
(lion,(7,List(4.5, 5.4, 6.4, 7.2, 8.1, 9.0, 10.0)))
(lion,(7,List(6.4, 7.2, 8.1, 9.0, 10.0, 10.8, 11.7)))
(lion,(5,List(9.0, 10.0, 10.8, 11.7, 12.6)))
(lion,(3,List(10.8, 11.7, 12.6)))
(lion,(1,List(12.6)))
Source Code
- package a11_TCP_IP.a15_Window
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.streaming.Seconds
- import org.apache.spark.streaming.StreamingContext
- import java.text.SimpleDateFormat
- import java.util.Calendar
- /*
- spark-submit --master local[*] --class a11_TCP_IP.a15_Window.a11_WindowExample --driver-java-options -XX:MaxPermSize=300m target/scala-2.10/sparkstreamingexamples_2.10-1.0.jar
- */
- /**
-  * Spark Streaming demo of a sliding window built with `reduceByKeyAndWindow`.
-  *
-  * Lines such as `data=lion || secondscounter=0.9` are read from a TCP socket on
-  * localhost:9999. Every 2 seconds the job prints, for each word received during
-  * the last 6 seconds, how often it occurred together with the `secondscounter`
-  * value of every occurrence, e.g. `(lion,(2,List(0.9, 1.8)))`.
-  */
- object a11_WindowExample {
-   /**
-    * Entry point: wires the socket source, the per-line parsing and the sliding
-    * window together, then blocks until the streaming context terminates.
-    *
-    * @param args command-line arguments (not used)
-    */
-   def main(args: Array[String]): Unit = {
-     val conf = new SparkConf().setAppName("Window_Example")
-     val sc = new SparkContext(conf)
-     // Set Batch Interval: received data is cut into one RDD every 2 seconds
-     val stc = new StreamingContext(sc, Seconds(2))
-     // Create an Input stream that receives data from the TCP/IP Socket
-     val linesDstream = stc.socketTextStream("localhost", 9999)
-     // line -> Map(field -> value) -> one (word, (1, List(secondscounter))) per word
-     val wordsDs = linesDstream
-       .map(line => parseFields(line))
-       .flatMap(fields => toWordRecords(fields))
-     // Sliding window with Window duration of 6 seconds and sliding duration of
-     // 2 seconds: on every slide the records of the last three batches are
-     // merged per word with reduceFn.
-     // reduceFn _ : turns the method into a function value whose arguments are
-     //              supplied by Spark at runtime
-     val reducedDs = wordsDs.reduceByKeyAndWindow(reduceFn _, Seconds(6), Seconds(2))
-     // Apply the given function to each RDD in this DStream
-     reducedDs.foreachRDD { rdd =>
-       // Debug aid: uncomment to label every window with the wall-clock time.
-       // println(s"Window at ${getMinNsec()}")
-       rdd.collect().foreach(println)
-     }
-     // Start the Computation & wait for it to terminate
-     stc.start()
-     stc.awaitTermination()
-   }
-   /**
-    * Parses one `key=value || key=value` input line into a field map.
-    *
-    * Tokens that lack a key or a value are ignored. Only the first `=` of a
-    * token separates key from value, so a value may itself contain `=`.
-    */
-   private def parseFields(line: String): Map[String, String] =
-     line.split("\\|\\|")
-       .map(_.trim.split("=", 2).map(_.trim))
-       .collect { case Array(key, value) if key.nonEmpty && value.nonEmpty => key -> value }
-       .toMap
-   /**
-    * Emits one `(word, (1, List(secondscounter)))` record per word of the `data`
-    * field. A missing `secondscounter` is recorded as the empty string. Empty
-    * tokens (no `data` field, repeated spaces) are dropped so that the empty
-    * string is never counted as a word.
-    */
-   private def toWordRecords(fields: Map[String, String]): List[(String, (Int, List[String]))] = {
-     val seconds = fields.getOrElse("secondscounter", "")
-     fields.getOrElse("data", "")
-       .split(" ")
-       .map(_.trim)
-       .filter(_.nonEmpty)
-       .map(word => (word, (1, List(seconds))))
-       .toList
-   }
-   /** Merges two partial results for a word: adds the counts, concatenates the time lists. */
-   private def reduceFn(x: (Int, List[String]), y: (Int, List[String])): (Int, List[String]) =
-     (x._1 + y._1, x._2 ::: y._2)
-   /**
-    * Formats a point in time as `[mm][ss.t]`: minutes, seconds and tenths of a
-    * second, e.g. `[07][42.3]`.
-    *
-    * @param now the instant to format; defaults to the current time
-    * @return the formatted minute / second / tenth-of-second stamp
-    */
-   def getMinNsec(now: java.util.Date = Calendar.getInstance().getTime()): String = {
-     // "SSS" always yields three millisecond digits, so the first digit really is
-     // the tenths of a second ("SS" renders 50 ms as "50" and would report .5).
-     val stamp = new SimpleDateFormat("mm.ss.SSS").format(now)
-     val Array(min, sec, millis) = stamp.split("\\.")
-     val tenths = millis.take(1)
-     s"[${min}][${sec}.${tenths}]"
-   }
- }