This example demonstrates a sliding window using the transformation reduceByKeyAndWindow(). A Python stub is used to provide input to this application.
Input
Input for the Streaming Application is denoted by this color
Output
Source Code
Input
Input for the Streaming Application is denoted by this color
python3.5 ./a21_stub.py inputfile=./c15_input2.yaml
Press ENTER when Ready...
[ 0]
[.9] => data=lion || secondscounter=0.9
[ 1]
[.8] => data=lion || secondscounter=1.8
[ 2]
[.7] => data=lion || secondscounter=2.7
[ 3]
[.6] => data=lion || secondscounter=3.6
[ 4]
[.5] => data=lion || secondscounter=4.5
[ 5]
[.4] => data=lion || secondscounter=5.4
[ 6]
[.4] => data=lion || secondscounter=6.4
[ 7]
[.2] => data=lion || secondscounter=7.2
[ 8]
[.1] => data=lion || secondscounter=8.1
[ 9]
[.0] => data=lion || secondscounter=9.0
[ 10]
[.0] => data=lion || secondscounter=10.0
[.8] => data=lion || secondscounter=10.8
[ 11]
[.7] => data=lion || secondscounter=11.7
[ 12]
[.6] => data=lion || secondscounter=12.6
Output
(lion,(2,List(0.9, 1.8)))
(lion,(4,List(0.9, 1.8, 2.7, 3.6)))
(lion,(6,List(0.9, 1.8, 2.7, 3.6, 4.5, 5.4)))
(lion,(7,List(2.7, 3.6, 4.5, 5.4, 6.4, 7.2, 8.1)))
(lion,(7,List(4.5, 5.4, 6.4, 7.2, 8.1, 9.0, 10.0)))
(lion,(7,List(6.4, 7.2, 8.1, 9.0, 10.0, 10.8, 11.7)))
(lion,(5,List(9.0, 10.0, 10.8, 11.7, 12.6)))
(lion,(3,List(10.8, 11.7, 12.6)))
(lion,(1,List(12.6)))
Source Code
package a11_TCP_IP.a15_Window

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import java.text.SimpleDateFormat
import java.util.Calendar

/*
spark-submit --master local[*] --class a11_TCP_IP.a15_Window.a11_WindowExample --driver-java-options -XX:MaxPermSize=300m target/scala-2.10/sparkstreamingexamples_2.10-1.0.jar
*/

/**
 * Sliding-window example built on `reduceByKeyAndWindow`.
 *
 * Lines are read from a TCP socket on localhost:9999. Each line is expected to
 * look like `key=value || key=value` (for example
 * `data=lion || secondscounter=0.9`). Every word found under the `data` key is
 * counted over a 6-second window that slides every 2 seconds, and the
 * `secondscounter` values of the contributing lines are collected alongside
 * the count.
 */
object a11_WindowExample {

  /**
   * Builds the streaming pipeline, starts it, and blocks until it terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Window_Example")
    val sc = new SparkContext(conf)

    // Batch interval of 2 seconds
    val stc = new StreamingContext(sc, Seconds(2))

    // Input stream that receives text lines from a TCP/IP socket
    val linesDstream = stc.socketTextStream("localhost", 9999)

    // One key/value dictionary per input line
    val recordsDs = linesDstream.map { line => parseRecord(line) }

    // One (word, (1, List(secondscounter))) element per word of the "data" field
    val wordsDs = recordsDs.flatMap { record => toWordEvents(record) }

    // Sliding window: window duration 6 seconds, slide duration 2 seconds.
    // `mergeCounts _` turns the method into a function value for Spark.
    val reducedDs = wordsDs.reduceByKeyAndWindow(mergeCounts _, Seconds(6), Seconds(2))

    // Print every (word, (count, timestamps)) pair of each windowed RDD.
    // getMinNsec() is available to stamp the output with the wall-clock time
    // when debugging.
    reducedDs.foreachRDD { rdd =>
      rdd.collect().foreach(println)
    }

    // Start the computation and wait for it to terminate
    stc.start()
    stc.awaitTermination()
  }

  /**
   * Parses one input line into a key/value dictionary.
   *
   * The line is split on `||`; each trimmed token is split on `=`. Tokens that
   * do not yield both a key and a value are dropped. When a key repeats, the
   * last occurrence wins (standard `toMap` behaviour).
   *
   * @param line raw text line received from the socket
   * @return the key/value pairs found in the line
   */
  private def parseRecord(line: String): Map[String, String] =
    line
      .split("\\|\\|")
      .map(_.trim)
      .map(_.split("="))
      // Keep only tokens where both key and value are present
      .collect { case Array(key, value, _*) => key -> value }
      .toMap

  /**
   * Expands a parsed record into one countable element per word of its
   * `data` field.
   *
   * @param record key/value dictionary produced by [[parseRecord]]
   * @return for each space-separated word: the word paired with a count of 1
   *         and a single-element list holding the record's `secondscounter`
   *         value (empty string when that key is absent)
   */
  private def toWordEvents(record: Map[String, String]): List[(String, (Int, List[String]))] = {
    val data = record.getOrElse("data", "")
    val secondsCounter = record.getOrElse("secondscounter", "")
    data.split(" ").map(_.trim).toList.map { word =>
      (word, (1, List(secondsCounter)))
    }
  }

  /**
   * Reduce function for the window: adds the counts and concatenates the
   * timestamp lists, keeping the left operand's entries first.
   *
   * @param x accumulated (count, timestamps) pair
   * @param y next (count, timestamps) pair
   * @return the combined (count, timestamps) pair
   */
  private def mergeCounts(
      x: (Int, List[String]),
      y: (Int, List[String])): (Int, List[String]) =
    (x._1 + y._1, x._2 ::: y._2)

  /**
   * Formats the current wall-clock time as `[mm][ss.t]`, where `t` is the
   * tenths-of-a-second digit.
   *
   * @return the formatted minute / second / tenth-of-second stamp
   */
  def getMinNsec(): String = {
    val now = Calendar.getInstance().getTime()
    // "SSS" always yields three zero-padded digits, so the first character is
    // the tenths digit. With "SS" a value such as 50 ms is rendered as "50"
    // and would be misreported as .5 seconds.
    val Array(min, sec, millisec) =
      new SimpleDateFormat("mm.ss.SSS").format(now).split("\\.")
    val tenths = millisec.head
    f"[$min%2s][$sec%2s.$tenths%s]"
  }
}