Monday, May 2, 2016

Sliding window using reduceByKeyAndWindow()

This example demonstrates a sliding window built with the reduceByKeyAndWindow() transformation. The Python stub a21_stub.py, invoked below, supplies the input; the application reads it from a TCP socket on localhost:9999.

Input

In the transcript below, each `data=... || secondscounter=...` line is a record sent to the streaming application; the `[  N]` lines mark elapsed seconds.

python3.5 ./a21_stub.py inputfile=./c15_input2.yaml 
Press ENTER when Ready...
[  0]
     [.9] => data=lion || secondscounter=0.9
[  1]
     [.8] => data=lion || secondscounter=1.8
[  2]
     [.7] => data=lion || secondscounter=2.7
[  3]
     [.6] => data=lion || secondscounter=3.6
[  4]
     [.5] => data=lion || secondscounter=4.5
[  5]
     [.4] => data=lion || secondscounter=5.4
[  6]
     [.4] => data=lion || secondscounter=6.4
[  7]
     [.2] => data=lion || secondscounter=7.2
[  8]
     [.1] => data=lion || secondscounter=8.1
[  9]
     [.0] => data=lion || secondscounter=9.0
[ 10]
     [.0] => data=lion || secondscounter=10.0
     [.8] => data=lion || secondscounter=10.8
[ 11]
     [.7] => data=lion || secondscounter=11.7
[ 12]
     [.6] => data=lion || secondscounter=12.6

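Output

Every 2 seconds (the slide interval) the application prints, for each word, its count over the last 6 seconds (the window) together with the secondscounter values of those records. Once the stub stops sending, the window drains and the counts fall (5, 3, 1).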

(lion,(2,List(0.9, 1.8)))
(lion,(4,List(0.9, 1.8, 2.7, 3.6)))
(lion,(6,List(0.9, 1.8, 2.7, 3.6, 4.5, 5.4)))
(lion,(7,List(2.7, 3.6, 4.5, 5.4, 6.4, 7.2, 8.1)))
(lion,(7,List(4.5, 5.4, 6.4, 7.2, 8.1, 9.0, 10.0)))
(lion,(7,List(6.4, 7.2, 8.1, 9.0, 10.0, 10.8, 11.7)))
(lion,(5,List(9.0, 10.0, 10.8, 11.7, 12.6)))
(lion,(3,List(10.8, 11.7, 12.6)))
(lion,(1,List(12.6)))

Source Code

package a11_TCP_IP.a15_Window

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import java.text.SimpleDateFormat
import java.util.Calendar

/*
spark-submit --master local[*] --class a11_TCP_IP.a15_Window.a11_WindowExample --driver-java-options -XX:MaxPermSize=300m target/scala-2.10/sparkstreamingexamples_2.10-1.0.jar
*/
/**
 * Spark Streaming demo of a sliding window built with `reduceByKeyAndWindow`.
 *
 * Reads lines of the form `data=lion || secondscounter=0.9` from a TCP socket
 * on localhost:9999 and splits the `data` value into words. Every 2 seconds it
 * prints, for each word, the number of occurrences seen in the last 6 seconds
 * together with the `secondscounter` values of those occurrences, e.g.
 * `(lion,(2,List(0.9, 1.8)))`.
 */
object a11_WindowExample {

  /**
   * Builds the streaming pipeline, starts it and blocks until the streaming
   * context terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Window_Example")
    val sc = new SparkContext(conf)

    // Batch interval: one RDD is produced every 2 seconds.
    val stc = new StreamingContext(sc, Seconds(2))

    // Input stream of text lines received over a TCP/IP socket.
    val linesDstream = stc.socketTextStream("localhost", 9999)

    // "data=lion || secondscounter=0.9" -> Map(data -> lion, secondscounter -> 0.9)
    val recordsDs = linesDstream.map(parseRecord)

    // One (word, (1, List(secondscounter))) pair per word of the "data" field.
    val wordsDs = recordsDs.flatMap(toWordEntries)

    // Sliding window: a 6-second window recomputed every 2 seconds. Spark
    // Streaming requires both durations to be multiples of the batch interval.
    val reducedDs = wordsDs.reduceByKeyAndWindow(reduceFn _, Seconds(6), Seconds(2))

    // Bring each windowed result to the driver and print it.
    reducedDs.foreachRDD { rdd =>
      // println(f"Window at ${getMinNsec()}%s")
      rdd.collect().foreach(println)
    }

    // Start the computation and wait for it to terminate.
    stc.start()
    stc.awaitTermination()
  }

  /**
   * Parses one input line of `key=value` tokens separated by `||` into a map.
   *
   * Only the first `=` separates key from value, so a value may itself contain
   * `=`. Keys and values are trimmed. Tokens without any `=` are dropped. If a
   * key repeats, the last occurrence wins.
   */
  private def parseRecord(line: String): Map[String, String] =
    line.split("\\|\\|").iterator
      .map(_.split("=", 2))
      .collect { case Array(key, value) => key.trim -> value.trim }
      .toMap

  /**
   * Expands a parsed record into one `(word, (1, List(secondscounter)))` pair
   * per whitespace-separated word of its `data` field.
   *
   * Empty words (from a missing `data` field or repeated whitespace) are
   * skipped rather than being counted under the key "".
   */
  private def toWordEntries(record: Map[String, String]): List[(String, (Int, List[String]))] = {
    val secondsCounter = record.getOrElse("secondscounter", "")
    record.getOrElse("data", "")
      .split("\\s+")
      .filter(_.nonEmpty)
      .map(word => (word, (1, List(secondsCounter))))
      .toList
  }

  /**
   * Combines two partial window results for the same word: adds the counts
   * and concatenates the lists of `secondscounter` values.
   */
  private def reduceFn(x: (Int, List[String]), y: (Int, List[String])): (Int, List[String]) =
    (x._1 + y._1, x._2 ::: y._2)

  /**
   * Returns the current wall-clock minute, second and tenth of a second,
   * formatted like `[07][42.3]`.
   */
  def getMinNsec(): String = {
    val now = Calendar.getInstance().getTime()
    // "SSS" zero-pads milliseconds to three digits, so the first digit is the
    // tenths of a second. With "SS", 50 ms would render as "50" and be shown
    // as .5 s.
    val fields = new SimpleDateFormat("mm.ss.SSS").format(now).split("\\.")
    val min = fields(0)
    val sec = fields(1)
    val tenths = fields(2).charAt(0)
    f"[$min%2s][$sec%2s.$tenths%s]"
  }
}