Friday, June 17, 2016

Hello Streaming : A Sliding Window Example

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> val stc = new StreamingContext(sc, Seconds(3))
stc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@4374cd21

scala> val lines = stc.socketTextStream("localhost", 9999)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@1e3ef59f

scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@61cbbfd7

scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@5b8088e

scala> //val wordCounts = pairs.reduceByKey(_ + _)

scala> val wordCounts = pairs.reduceByKeyAndWindow(((x:Int, y:Int) => x + y), 
     |                                         Seconds(15), Seconds(3))
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@7ca28846

scala> wordCounts.foreachRDD { rdd => 
     |   println("-------------------")
     |   rdd.foreach(println)    
     | }

scala> stc.start()

scala> stc.awaitTermination()

netcat(nc) Input

mountain@mountain:~$ nc -lk 9999
1
2
3
4
5
6
7
8
9
10
11
12
13
14

15

Output from spark-shell

-------------------
(3,1)
(5,1)
(1,1)
(6,1)
(4,1)
(2,1)
-------------------
(2,1)
(4,1)
(6,1)
(3,1)
(5,1)
(9,1)
(1,1)
(8,1)
(7,1)
-------------------
(5,1)
(4,1)
(6,1)
(2,1)
(13,1)
(8,1)
(11,1)
(3,1)
(9,1)
(12,1)
(1,1)
(7,1)
(10,1)
-------------------
(2,1)
(13,1)
(6,1)
(5,1)
(14,1)
(3,1)
(9,1)
(12,1)
(1,1)
(8,1)
(11,1)
(4,1)
(15,1)
(7,1)
(10,1)
-------------------
(5,1)
(2,1)
(13,1)
(6,1)
(14,1)
(3,1)
(4,1)
(15,1)
(9,1)
(12,1)
(1,1)
(8,1)
(11,1)
(7,1)
(10,1)
-------------------
(13,1)
(15,1)
(8,1)
(11,1)
(9,1)
(12,1)
(14,1)
(7,1)
(10,1)
-------------------
(11,1)
(13,1)
(15,1)
(12,1)
(14,1)
(10,1)
-------------------
(14,1)
(15,1)

-------------------

Hello Streaming : A Word Count Example

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> val stc = new StreamingContext(sc, Seconds(5))
stc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@36429c28

scala> val lines = stc.socketTextStream("localhost", 9999)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@4feb2798

scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@389785db

scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@2fad123b

scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@6e9ad193

scala> wordCounts.print()

scala> /*wordCounts.foreachRDD { rdd => 
     |   rdd.foreach(print)    
     | }*/
     | stc.start()

scala> stc.awaitTermination()

netcat(nc) input

[raj@Rajkumars-MacBook-Pro ~]$nc -lk 9999
This is a test


This is anothet test

Output in spark-shell

-------------------------------------------
Time: 1466196225000 ms
-------------------------------------------

-------------------------------------------
Time: 1466196230000 ms
-------------------------------------------
(a,1)
(is,2)
(test,2)
(anothet,1)


(This,2)

Monday, May 2, 2016

Sliding window using : reduceByKeyAndWindow()

This example demonstrates the Sliding window using the Transformation reduceByKeyAndWindow(). This Python stub is used to provide Input to this Application

Input

Input for the Streaming Application is denoted by this color

python3.5 ./a21_stub.py inputfile=./c15_input2.yaml 
Press ENTER when Ready...
[  0]
     [.9] => data=lion || secondscounter=0.9
[  1]
     [.8] => data=lion || secondscounter=1.8
[  2]
     [.7] => data=lion || secondscounter=2.7
[  3]
     [.6] => data=lion || secondscounter=3.6
[  4]
     [.5] => data=lion || secondscounter=4.5
[  5]
     [.4] => data=lion || secondscounter=5.4
[  6]
     [.4] => data=lion || secondscounter=6.4
[  7]
     [.2] => data=lion || secondscounter=7.2
[  8]
     [.1] => data=lion || secondscounter=8.1
[  9]
     [.0] => data=lion || secondscounter=9.0
[ 10]
     [.0] => data=lion || secondscounter=10.0
     [.8] => data=lion || secondscounter=10.8
[ 11]
     [.7] => data=lion || secondscounter=11.7
[ 12]
     [.6] => data=lion || secondscounter=12.6

Output

(lion,(2,List(0.9, 1.8)))
(lion,(4,List(0.9, 1.8, 2.7, 3.6)))
(lion,(6,List(0.9, 1.8, 2.7, 3.6, 4.5, 5.4)))
(lion,(7,List(2.7, 3.6, 4.5, 5.4, 6.4, 7.2, 8.1)))
(lion,(7,List(4.5, 5.4, 6.4, 7.2, 8.1, 9.0, 10.0)))
(lion,(7,List(6.4, 7.2, 8.1, 9.0, 10.0, 10.8, 11.7)))
(lion,(5,List(9.0, 10.0, 10.8, 11.7, 12.6)))
(lion,(3,List(10.8, 11.7, 12.6)))
(lion,(1,List(12.6)))

Source Code

package a11_TCP_IP.a15_Window

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import java.text.SimpleDateFormat
import java.util.Calendar

/*
spark-submit --master local[*] --class a11_TCP_IP.a15_Window.a11_WindowExample --driver-java-options -XX:MaxPermSize=300m target/scala-2.10/sparkstreamingexamples_2.10-1.0.jar
*/
object a11_WindowExample {
  
  def main(args: Array[String]) {
    
    val conf = new SparkConf().setAppName("Window_Example")
    val sc = new SparkContext(conf)

    // Set Batch Interval
    val stc = new StreamingContext(sc, Seconds(2))
    
    // Create an Input stream that receives data from
    // TCP/IP Socket
    val linesDstream = stc.socketTextStream("localhost", 9999)

    val splittedDs = linesDstream.map{ x =>
      x.split("\\|\\|")
       .map(_.trim)
    }

    val filteredListDs = splittedDs.map{ x =>
      val retval = for { element <- x
          val keyNval = element.split("=")
          // Ensure key & value are both present
          if keyNval.size >= 2
        } yield {
          val splitted = element.split("=")
          // Create Tuple of Key and Value
          splitted(0) -> splitted(1)
        }
      retval
    }
    
    // Convert the Splitted tokens into a Map 
    val dnryListDs = filteredListDs.map{ x => x.toMap }
    
    val keyvaluepairsDs = dnryListDs.map { x =>
      val data = x.getOrElse("data", "")
      (data, x)
    }

    val wordsDs = keyvaluepairsDs.flatMap{ x =>
      val xsplitted = x._1.split(" ").map(_.trim)
      val wordNmeta = for { element <- xsplitted
        } yield {
          (element, (  1, 
                       List((x._2.getOrElse("secondscounter", ""))
                    )))
        }
      wordNmeta  
    }
    
    def reduceFn(  x:(Int, List[String]), y:(Int, List[String])  ) 
                                  : (Int, List[String]) = {
      val sum = x._1 + y._1
      val timelist = x._2 ::: y._2
      (sum, timelist)
    }
    
    // Sliding window with Window duration of 6 seconds and
    // sliding duration of 2 seconds
    // reduceFn _ : Is a Partially Applied Function
    //            : Denotes, all the arguments are passed @ runtime
    val reducedDs = wordsDs.reduceByKeyAndWindow(reduceFn _, 
                                        Seconds(6), Seconds(2))
    
    // Apply the given function to each RDD in this DStream
    reducedDs.foreachRDD{ rdd =>
       val minNSec = getMinNsec()
       //println(f"Window at $minNSec%s")
       //println("-----------------------")
       rdd.collect().foreach(println)
       //println("")
    }
    
    // Start the Computation & wait for it to terminate
    stc.start()
    stc.awaitTermination()
  }
  
  def getMinNsec() : String = {
    val now = Calendar.getInstance().getTime()
    val myFormat = new SimpleDateFormat("mm.ss.SS")
    val minNmillisec = myFormat.format(now)
    val splitted = minNmillisec.split("\\.")
    val min = splitted(0)
    val sec = splitted(1)
    val millisec = splitted(2)
    val millisecTruncated = millisec(0)
    f"[$min%2s][$sec%2s.$millisecTruncated%s]"
  }
}

Saturday, April 30, 2016

Python Input Generation Stub for netcat (nc)

Input Generation Stub is executed as below and the  input is send to nc,. This Stub is used for generating Input for this Spark Streaming Application

python3.5 ./a21_stub.py inputfile=./c15_input2.yaml 
Press ENTER when Ready...
[  0]
     [.9] => data=lion || secondscounter=0.9
[  1]
     [.8] => data=lion || secondscounter=1.8
[  2]
     [.7] => data=lion || secondscounter=2.7
[  3]
     [.6] => data=lion || secondscounter=3.6
[  4]
     [.5] => data=lion || secondscounter=4.5
[  5]
     [.4] => data=lion || secondscounter=5.4
[  6]
     [.4] => data=lion || secondscounter=6.4
[  7]
     [.2] => data=lion || secondscounter=7.2
[  8]
     [.1] => data=lion || secondscounter=8.1

Program starts in Main()
'''==================================================================
Thread that sends data to 'nc'
Reference :
    http://www.techbeamers.com/python-multithreading-concepts/
    https://docs.python.org/3/library/queue.html
    http://stackoverflow.com/questions/11829982/piping-data-from-python-to-an-external-command
=================================================================='''
# a11 > a21_stub.py
import sys
import threading
import time
import yaml
import logging.config
from a22_Formatter import Formatter

def Get_arguments():
    log = logging.getLogger(__name__)
    argDny = {}
    for arg in sys.argv[1:]:
        log.debug("Arg -> %s", arg)
        key, value = arg.split("=")
        argDny[key] = value
    return argDny

def Load_input_file(argDny):
    with open(argDny['inputfile'], 'rt') as fin:
        text = fin.read()
    inputDny = yaml.load(text)
    return inputDny    
 
def Load_meta_info(inputList):
     metaDny = inputList[0]
     return metaDny
 
def Load_data(inputList):
     dataLst = inputList[1:]
     return dataLst
 
        
def Setup_logger():
    with open("./z11_logging.yaml", "rt") as fin:
        config = yaml.load(fin.read())
    logging.root.handlers = []
    logging.config.dictConfig(config)
    
    log = logging.getLogger(__name__)

def Main():    
    
    Setup_logger()

    log = logging.getLogger(__name__)
    
    # Get command line arguments
    argDny = Get_arguments()
    log.debug("argDny -> %s", argDny)
    
    # Load input file into Dictionary
    inputList = Load_input_file(argDny)
    log.debug("inputDny -> %s", inputList)
    
    # Get Meta data from inputList
    metaDny = Load_meta_info(inputList)
    log.debug("metaDny -> %s", metaDny)
    
    # Get Data List
    dataLst = Load_data(inputList)
    log.debug("dataList -> %s", dataLst)

    if metaDny['type'] == "simple":
        #thrd = MinFormatter(metaDny, dataLst)
        thrd= Formatter(metaDny, dataLst)
        log.info("Press ENTER when Ready...")
        input("")
        thrd.start()
        thrd.Place_data_in_queue()
        thrd.join()
            
Main()

# a11 > a22_Formatter.py
from datetime import datetime, timedelta
import logging.config
import time
import threading
import queue
from math import floor, modf
import sys
import subprocess
import re

'''===================================================================
Formatter
    -    Generates dummy input to be sent to netcat (nc)
==================================================================='''
class Formatter(threading.Thread):
    
    '''===============================================================
    __init__()
    ==============================================================='''
    def __init__(self, metaDny, dataLst):
        threading.Thread.__init__(self)
        self.log = logging.getLogger(__name__)
        self.fifoQ = queue.Queue()
        self.metaDny = metaDny
        self.dataLst = dataLst
        self.counter = 0
        self.diffInSec = 0
        self.decimalSeconds = 0
        self.firstDtime = datetime.now()
        self.curDtime = datetime.now()
        self.process = self.Connect_to_nc()

    '''===============================================================
    Connect_to_nc()
    ==============================================================='''
    def Connect_to_nc(self):
        ncCommand = "nc -lk 9999"
        splitted = re.split(" ", ncCommand)
        process = subprocess.Popen(splitted, 
                                   stdin=subprocess.PIPE,   \
                                   stdout=subprocess.PIPE,  \
                                   stderr=subprocess.PIPE,  \
                                   universal_newlines=True)
        
        return process
    '''===============================================================
    Increment_counter()
    ==============================================================='''
    def Increment_counter(self, delta):
        for no in range(delta):
            self.counter += 1
            self.Print_counter()

    '''===============================================================
    Print_counter()
    ==============================================================='''
    def Print_counter(self):
        minNsec = self.curDtime.strftime("[%M][%S]")
        '''if self.counter == 0:
            self.log.info("%s%s", "[min][sec]" + " " * 1, \
                                            "[seconds-counter]")
            self.log.info("")'''
        '''self.log.info("%s : [%s]", minNsec, \
                      (" " * 3 + str(self.counter))[-3:])'''
        self.log.info("[%s]", (" " * 3 + str(self.counter))[-3:])


    '''===============================================================
    Print_data()
    ==============================================================='''            
    def Print_data(self, data):
        self.log.debug("self.diffInSec -> %s", self.diffInSec)
        '''self.log.info("%s => %s", \
                    (" " * 16 + "[." + self.decimalSeconds + "]"),\
                                                     data)'''
        self.log.info("%s => %s", \
                    (" " * 5 + "[." + self.decimalSeconds + "]"),\
                                                     data)
        
    '''===============================================================
    Get_element_fifoq()
    ==============================================================='''   
    def Get_element_from_fifoq(self, size):
        dataList = []
        for inc in range(size):
            data = self.fifoQ.get_nowait()
            dataList.append(data)
        return dataList    
    
    '''===============================================================
    Add_meta_info()
    ==============================================================='''            
    def Add_meta_info(self, data):
        dataNmeta = []
        dataNmeta.append("data=" + data)
        metaList = self.metaDny['addtodata']
        for meta in metaList:
            self.log.debug("Meta -> %s", meta)
            if(meta == "minsec") :
                minsec = self.curDtime.\
                        strftime("[%M][%S.")
                minsec = minsec + str(self.decimalSeconds) + "]"
                dataNmeta.append("minsec=" + minsec)        
            if(meta == "secondscounter"):
                dataNmeta.append("secondscounter=" + \
                                 str(self.counter) + \
                                 "." + str(self.decimalSeconds))
        return dataNmeta
    
    
    '''===============================================================
    Send_data_to_nc()
    ==============================================================='''            
    def Send_data_to_nc(self, dataList):
        for data in dataList:
            dataNmeta = self.Add_meta_info(data)
            self.log.debug("dataNmeta -> %s", dataNmeta)
            formattedData = ' || '.join(dataNmeta)
            self.log.debug("formattedData -> %s", formattedData)
            self.Print_data(formattedData)
            self.process.stdin.write(formattedData + "\n")
            self.process.stdin.flush()
            
    '''===============================================================
    Place_data_in_queue()
    ==============================================================='''                
    def Place_data_in_queue(self):
        while True :
            for element in self.dataLst:
                delay = element['delay']
                data = element['data']
                #self.log.debug("element -> %s", element)
                #self.log.debug("data -> %s", data)
                #self.log.debug("delay -> %s", delay)
                
                if delay.endswith('s'):
                    delayTime = delay[:-1]
                    #self.log.debug("Delaying by : %s", delayTime)
                    time.sleep(float(delayTime))
                else:
                    sys.exit(1)
                
                if data != 'None':
                    self.fifoQ.put(data)
            if self.metaDny['mode'] == "singlepassmode":
                break            
    '''===============================================================
    run()
    ==============================================================='''                
    def run(self):
        floored = 0
        self.firstDtime = datetime.now()
        self.Print_counter()
        while True:
            time.sleep(0.1)
            self.curDtime = datetime.now()
            self.diffInSec = (self.curDtime - self.firstDtime).\
                                            total_seconds()
            self.log.debug("diffInSec -> %s", self.diffInSec)
            #rounded = round(self.diffInSec, 1)
            #self.log.debug("rounded -> %s", rounded)
            #self.decimalSeconds = str(rounded).split(".")[1]
            self.decimalSeconds = str(self.diffInSec).\
                                        split(".")[1][:1]
            self.log.debug("decimalSeconds -> %s", \
                           self.decimalSeconds)
            self.log.debug("self.diffInSec -> %s", self.diffInSec)
            floored = floor(self.diffInSec)
            self.log.debug("floored -> %s", floored)
            delta = floored - self.counter
            if delta >= 1:
                self.Increment_counter(delta)
            size = self.fifoQ.qsize()
            if(size > 0):  
                dataList = self.Get_element_from_fifoq(size)
                self.Send_data_to_nc(dataList)


Please note the YAML files(given below) should not have any TAB characters. Configure your editor to replace TAB by Spaces (Editors like VI, Eclipse have options to configure this setting)

# a11 > z11_logging.yaml
version         : 1
formatters      :
  simple        :
    #format      : "%(levelname)-7s || %(asctime)s || [ %(filename)s ] [ %(funcName)s() ] [ %(lineno)s ] || %(message)s"
    #format      : "%(levelname)-7s || %(asctime)s || %(filename)s > %(funcName)s() > %(lineno)s ] || %(message)s"
    format      : "%(message)s"
handlers        :
  console       :
    class       : logging.StreamHandler
    level       : DEBUG
    formatter   : simple
    stream      : ext://sys.stdout
    
  file_handler  :
    class       : logging.handlers.RotatingFileHandler
    level       : DEBUG
    formatter   : simple
    maxBytes    : 10485760
    backupCount : 2
    encoding    : utf8
    filename    : purge.log
    
root            :
    level       : INFO
    handlers    : [console, file_handler]           

# a11 > c15_input2.yaml
- meta:
  type            : simple
  # Can be minsec (or) secondscounter
  addtodata       : 
    - secondscounter
  # Can be singpassmode (or) loopmode  
  mode            : loopmode    
  
  # 'None' is used to provide delay without sending data to the
  # 'nc' server
- data  : lion
  delay : 0.9s  

TCP/IP Source Stream Processing

The application given below process the TCP/IP source data stream to produce the count for each unique word in a given Batch. Apart from the count, we are also producing the timestamp during which the input occurred

Stream Processing App   Input Generation Stub
(reads from)            (writes to)
    |______ netcat(nc) ________|

The Input stream is processed in a Batch Interval of 2 seconds. Given below is a sample Input  from the Stub,

python3.5 ./a21_stub.py inputfile=./c15_input2.yaml 
Press ENTER when Ready...
[  0]
     [.9] => data=lion || secondscounter=0.9
[  1]
     [.8] => data=lion || secondscounter=1.8
[  2]
     [.7] => data=lion || secondscounter=2.7
[  3]
     [.6] => data=lion || secondscounter=3.6
[  4]
     [.5] => data=lion || secondscounter=4.5
[  5]
     [.4] => data=lion || secondscounter=5.4
[  6]
     [.4] => data=lion || secondscounter=6.4
[  7]
     [.2] => data=lion || secondscounter=7.2
[  8]
     [.1] => data=lion || secondscounter=8.1

Given below is the equivalent output for the sample input,

spark-submit --master local[*] --class a11_TCP_IP.a11_Basic.a11_HelloTcp --driver-java-options -XX:MaxPermSize=300m target/scala-2.10/sparkstreamingexamples_2.10-1.0.jar 
2016-04-30 20:14:27.605 java[43814:21451071] Unable to load realm info from SCDynamicStore
(lion,(2,List(0.9, 1.8)))
(lion,(2,List(2.7, 3.6)))
(lion,(2,List(4.5, 5.4)))
(lion,(2,List(6.4, 7.2)))
(lion,(1,List(8.1)))

Stub

The Python code for the Input generation Stub is available here...

Stream Processing App
package a11_TCP_IP.a11_Basic

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import java.text.SimpleDateFormat
import java.util.Calendar

/*
spark-submit --master local[*] --class a11_TCP_IP.a11_Basic.a11_HelloTcp --driver-java-options -XX:MaxPermSize=300m target/scala-2.10/sparkstreamingexamples_2.10-1.0.jar  
 */
object a11_HelloTcp {
  
  def main(args: Array[String]) {
    
    val conf = new SparkConf().setAppName("Basic_TCP_IP")
    val sc = new SparkContext(conf)

    // Set Batch Interval
    val stc = new StreamingContext(sc, Seconds(2))
    
    //  Create an Input stream that receives data from
    //  TCP/IP Socket
    val linesDstream = stc.socketTextStream("localhost", 9999)

    val minNsec = this.getMinNsec()
    
    val splittedDs = linesDstream.map{ x =>
      x.split("\\|\\|")
       .map(_.trim)
    }

    val filteredListDs = splittedDs.map{ x =>
      val retval = for { element <- x
          val keyNval = element.split("=")
          if keyNval.size >= 2
        } yield {
          val splitted = element.split("=")
          // Create Tuple of Key and Value
          splitted(0) -> splitted(1)
        }
      retval
    }
    
    val dnryList = filteredListDs.map{ x =>
      x.toMap
    }  
    
    val filteredDnryListDs = dnryList.filter{ x =>
      if (x.getOrElse("data", ()) != ())
        true
      else
        false
    }
    
    val keyvaluepairsDs = filteredDnryListDs.map { x =>
      val data = x.getOrElse("data", "")
      (data, x)
    }

    val wordsDs = keyvaluepairsDs.flatMap{ x =>
      //Underscore (_) is used as a Placeholder indicator
      val xsplitted = x._1.split(" ").map(_.trim)
      val wordNmeta = for { element <- xsplitted
        } yield {
          (element, (  1, 
                       List((x._2.getOrElse("secondscounter", ""))
                    )))
        }
      wordNmeta  
    }
    
    val reducedDs = wordsDs.reduceByKey{ (x, y) =>
      val sum = x._1 + y._1
      val timelist = x._2 ::: y._2
      (sum, timelist)
    }
    
    // Print first **10 elements of every batch of data
    //reducedDs.print()
    
    // Apply the given function to each RDD in this DStream
    reducedDs.foreachRDD{ rdd =>
       val minNSec = getMinNsec()
       //println(f"Batch at $minNSec%s")
       //println("-----------------------")
       rdd.collect().foreach(println)
       //println("")
    }
    
    //  Start the Computation & wait for it to terminate
    stc.start()
    stc.awaitTermination()
  }
  
  def getMinNsec() : String = {
    val now = Calendar.getInstance().getTime()
    val myFormat = new SimpleDateFormat("mm.ss.SS")
    val minNmillisec = myFormat.format(now)
    val splitted = minNmillisec.split("\\.")
    val min = splitted(0)
    val sec = splitted(1)
    val millisec = splitted(2)
    val millisecTruncated = millisec(0)
    f"[$min%2s][$sec%2s.$millisecTruncated%s]"
  }
}