Monday, May 2, 2016

Sliding window using reduceByKeyAndWindow()

This example demonstrates a sliding window using the reduceByKeyAndWindow() transformation. A Python stub (a21_stub.py) provides the input, which the application reads as text lines from a TCP socket on localhost:9999.

Input

In the stub output below, each line of the form "data=... || secondscounter=..." is one record sent to the streaming application.

python3.5 ./a21_stub.py inputfile=./c15_input2.yaml 
Press ENTER when Ready...
[  0]
     [.9] => data=lion || secondscounter=0.9
[  1]
     [.8] => data=lion || secondscounter=1.8
[  2]
     [.7] => data=lion || secondscounter=2.7
[  3]
     [.6] => data=lion || secondscounter=3.6
[  4]
     [.5] => data=lion || secondscounter=4.5
[  5]
     [.4] => data=lion || secondscounter=5.4
[  6]
     [.4] => data=lion || secondscounter=6.4
[  7]
     [.2] => data=lion || secondscounter=7.2
[  8]
     [.1] => data=lion || secondscounter=8.1
[  9]
     [.0] => data=lion || secondscounter=9.0
[ 10]
     [.0] => data=lion || secondscounter=10.0
     [.8] => data=lion || secondscounter=10.8
[ 11]
     [.7] => data=lion || secondscounter=11.7
[ 12]
     [.6] => data=lion || secondscounter=12.6

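Output

Each line below is printed once per 2-second slide and has the form (word,(count,List(secondscounter values))). It covers the records received in (up to) the last 6 seconds, the window length.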

(lion,(2,List(0.9, 1.8)))
(lion,(4,List(0.9, 1.8, 2.7, 3.6)))
(lion,(6,List(0.9, 1.8, 2.7, 3.6, 4.5, 5.4)))
(lion,(7,List(2.7, 3.6, 4.5, 5.4, 6.4, 7.2, 8.1)))
(lion,(7,List(4.5, 5.4, 6.4, 7.2, 8.1, 9.0, 10.0)))
(lion,(7,List(6.4, 7.2, 8.1, 9.0, 10.0, 10.8, 11.7)))
(lion,(5,List(9.0, 10.0, 10.8, 11.7, 12.6)))
(lion,(3,List(10.8, 11.7, 12.6)))
(lion,(1,List(12.6)))

Source Code

  1. package a11_TCP_IP.a15_Window
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.SparkContext
  5. import org.apache.spark.streaming.Seconds
  6. import org.apache.spark.streaming.StreamingContext
  7. import java.text.SimpleDateFormat
  8. import java.util.Calendar
  9.  
  10. /*
  11. spark-submit --master local[*] --class a11_TCP_IP.a15_Window.a11_WindowExample --driver-java-options -XX:MaxPermSize=300m target/scala-2.10/sparkstreamingexamples_2.10-1.0.jar
  12. */
  object a11_WindowExample {

    /** Counts words arriving on a TCP socket over a sliding window.
      *
      * Each input line is expected to look like `data=lion || secondscounter=0.9`,
      * i.e. `||`-separated `key=value` pairs. Every whitespace-separated word of
      * the `data` value is emitted with a count of 1 and the record's
      * `secondscounter` value. These are reduced per word over a 6-second window
      * that slides every 2 seconds, and each window's result is printed on the
      * driver.
      *
      * @param args unused
      */
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("Window_Example")
      val sc = new SparkContext(conf)

      // Batch interval: one micro-batch every 2 seconds.
      val stc = new StreamingContext(sc, Seconds(2))
      // Input stream of text lines received from a TCP/IP socket.
      val linesDstream = stc.socketTextStream("localhost", 9999)

      // Split each line into its trimmed "key=value" tokens.
      val splittedDs = linesDstream.map { line =>
        line.split("\\|\\|").map(_.trim)
      }

      // Keep only tokens that have both a key and a non-empty value.
      // The split is limited to two parts so that a value which itself
      // contains '=' is kept intact instead of being truncated.
      val filteredListDs = splittedDs.map { tokens =>
        for {
          token <- tokens
          keyNval = token.split("=", 2)
          if keyNval.length == 2 && keyNval(1).nonEmpty
        } yield keyNval(0) -> keyNval(1)
      }

      // Turn each record's (key, value) pairs into a Map, keyed by its "data" value.
      val dnryListDs = filteredListDs.map(_.toMap)
      val keyvaluepairsDs = dnryListDs.map { fields =>
        (fields.getOrElse("data", ""), fields)
      }

      // Emit (word, (1, List(secondscounter))) for every word of "data".
      // Splitting on runs of whitespace and dropping empty tokens avoids
      // counting "" as a word for records without a "data" value or with
      // repeated spaces.
      val wordsDs = keyvaluepairsDs.flatMap { case (data, fields) =>
        val counter = fields.getOrElse("secondscounter", "")
        data.split("\\s+").filter(_.nonEmpty).map(word => (word, (1, List(counter))))
      }

      /** Merges two partial results for the same word: adds the counts and
        * concatenates the secondscounter lists (left operand's entries first).
        */
      def reduceFn(x: (Int, List[String]), y: (Int, List[String])): (Int, List[String]) =
        (x._1 + y._1, x._2 ::: y._2)

      // Sliding window: 6-second window length, 2-second slide interval.
      // `reduceFn _` eta-expands the local method into a function value.
      val reducedDs = wordsDs.reduceByKeyAndWindow(reduceFn _, Seconds(6), Seconds(2))

      // Print every window's result on the driver. collect() pulls the whole
      // RDD to the driver, which is only reasonable for small demo windows.
      reducedDs.foreachRDD { rdd =>
        // To label each window with the wall-clock time, uncomment:
        // println(f"Window at ${getMinNsec()}%s")
        // println("-----------------------")
        rdd.collect().foreach(println)
        // println("")
      }

      // Start the computation and block until it is stopped or fails.
      stc.start()
      stc.awaitTermination()
    }

    /** Returns the current wall-clock time as `[mm][ss.t]`: minutes, seconds
      * and tenths of a second (truncated), e.g. `[07][42.3]`.
      */
    def getMinNsec(): String = {
      val now = Calendar.getInstance().getTime()
      // "SSS" zero-pads milliseconds to three digits, so the first digit is
      // always the tenths of a second. With "SS", 50 ms would be formatted
      // as "50" and misreported as 0.5 s.
      val myFormat = new SimpleDateFormat("mm.ss.SSS")
      val splitted = myFormat.format(now).split("\\.")
      val min = splitted(0)
      val sec = splitted(1)
      val tenths = splitted(2)(0)
      f"[$min%2s][$sec%2s.$tenths%s]"
    }
  }