Saturday, April 30, 2016

TCP/IP Source Stream Processing

The application given below process the TCP/IP source data stream to produce the count for each unique word in a given Batch. Apart from the count, we are also producing the timestamp during which the input occurred

Stream Processing App   Input Generation Stub
(reads from)            (writes to)
    |______ netcat(nc) ________|

The Input stream is processed in a Batch Interval of 2 seconds. Given below is a sample Input  from the Stub,

python3.5 ./a21_stub.py inputfile=./c15_input2.yaml 
Press ENTER when Ready...
[  0]
     [.9] => data=lion || secondscounter=0.9
[  1]
     [.8] => data=lion || secondscounter=1.8
[  2]
     [.7] => data=lion || secondscounter=2.7
[  3]
     [.6] => data=lion || secondscounter=3.6
[  4]
     [.5] => data=lion || secondscounter=4.5
[  5]
     [.4] => data=lion || secondscounter=5.4
[  6]
     [.4] => data=lion || secondscounter=6.4
[  7]
     [.2] => data=lion || secondscounter=7.2
[  8]
     [.1] => data=lion || secondscounter=8.1

Given below is the equivalent output for the sample input,

spark-submit --master local[*] --class a11_TCP_IP.a11_Basic.a11_HelloTcp --driver-java-options -XX:MaxPermSize=300m target/scala-2.10/sparkstreamingexamples_2.10-1.0.jar 
2016-04-30 20:14:27.605 java[43814:21451071] Unable to load realm info from SCDynamicStore
(lion,(2,List(0.9, 1.8)))
(lion,(2,List(2.7, 3.6)))
(lion,(2,List(4.5, 5.4)))
(lion,(2,List(6.4, 7.2)))
(lion,(1,List(8.1)))

Stub

The Python code for the Input generation Stub is available here...

Stream Processing App
package a11_TCP_IP.a11_Basic

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import java.text.SimpleDateFormat
import java.util.Calendar

/*
spark-submit --master local[*] --class a11_TCP_IP.a11_Basic.a11_HelloTcp --driver-java-options -XX:MaxPermSize=300m target/scala-2.10/sparkstreamingexamples_2.10-1.0.jar  
 */
object a11_HelloTcp {
  
  def main(args: Array[String]) {
    
    val conf = new SparkConf().setAppName("Basic_TCP_IP")
    val sc = new SparkContext(conf)

    // Set Batch Interval
    val stc = new StreamingContext(sc, Seconds(2))
    
    //  Create an Input stream that receives data from
    //  TCP/IP Socket
    val linesDstream = stc.socketTextStream("localhost", 9999)

    val minNsec = this.getMinNsec()
    
    val splittedDs = linesDstream.map{ x =>
      x.split("\\|\\|")
       .map(_.trim)
    }

    val filteredListDs = splittedDs.map{ x =>
      val retval = for { element <- x
          val keyNval = element.split("=")
          if keyNval.size >= 2
        } yield {
          val splitted = element.split("=")
          // Create Tuple of Key and Value
          splitted(0) -> splitted(1)
        }
      retval
    }
    
    val dnryList = filteredListDs.map{ x =>
      x.toMap
    }  
    
    val filteredDnryListDs = dnryList.filter{ x =>
      if (x.getOrElse("data", ()) != ())
        true
      else
        false
    }
    
    val keyvaluepairsDs = filteredDnryListDs.map { x =>
      val data = x.getOrElse("data", "")
      (data, x)
    }

    val wordsDs = keyvaluepairsDs.flatMap{ x =>
      //Underscore (_) is used as a Placeholder indicator
      val xsplitted = x._1.split(" ").map(_.trim)
      val wordNmeta = for { element <- xsplitted
        } yield {
          (element, (  1, 
                       List((x._2.getOrElse("secondscounter", ""))
                    )))
        }
      wordNmeta  
    }
    
    val reducedDs = wordsDs.reduceByKey{ (x, y) =>
      val sum = x._1 + y._1
      val timelist = x._2 ::: y._2
      (sum, timelist)
    }
    
    // Print first **10 elements of every batch of data
    //reducedDs.print()
    
    // Apply the given function to each RDD in this DStream
    reducedDs.foreachRDD{ rdd =>
       val minNSec = getMinNsec()
       //println(f"Batch at $minNSec%s")
       //println("-----------------------")
       rdd.collect().foreach(println)
       //println("")
    }
    
    //  Start the Computation & wait for it to terminate
    stc.start()
    stc.awaitTermination()
  }
  
  def getMinNsec() : String = {
    val now = Calendar.getInstance().getTime()
    val myFormat = new SimpleDateFormat("mm.ss.SS")
    val minNmillisec = myFormat.format(now)
    val splitted = minNmillisec.split("\\.")
    val min = splitted(0)
    val sec = splitted(1)
    val millisec = splitted(2)
    val millisecTruncated = millisec(0)
    f"[$min%2s][$sec%2s.$millisecTruncated%s]"
  }
}