The application given below processes a TCP/IP source data stream to produce the count of each unique word in a given Batch. Apart from the count, we also produce the timestamp at which each input occurred.
Stream Processing App              Input Generation Stub
     (reads from)                        (writes to)
         |______ netcat(nc) ________|
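In outline, the read side of this picture looks like the sketch below. This is only a skeleton; the complete program is listed in the Stream Processing App section further down, and the object name ReceiverSketch is purely illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Skeleton of the read side: attach to the netcat socket on
// localhost:9999 and group the incoming lines into 2-second batches.
// (ReceiverSketch is an illustrative name, not part of the project.)
object ReceiverSketch {
  def main(args: Array[String]) {
    val sc  = new SparkContext(new SparkConf().setAppName("Basic_TCP_IP"))
    val stc = new StreamingContext(sc, Seconds(2))   // Batch Interval
    val linesDstream = stc.socketTextStream("localhost", 9999)
    linesDstream.print()                             // peek at each batch
    stc.start()
    stc.awaitTermination()
  }
}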
The Input stream is processed in a Batch Interval of 2 seconds. Given below is a sample Input from the Stub,
python3.5 ./a21_stub.py inputfile=./c15_input2.yaml
Press ENTER when Ready...
[ 0][.9] => data=lion || secondscounter=0.9
[ 1][.8] => data=lion || secondscounter=1.8
[ 2][.7] => data=lion || secondscounter=2.7
[ 3][.6] => data=lion || secondscounter=3.6
[ 4][.5] => data=lion || secondscounter=4.5
[ 5][.4] => data=lion || secondscounter=5.4
[ 6][.4] => data=lion || secondscounter=6.4
[ 7][.2] => data=lion || secondscounter=7.2
[ 8][.1] => data=lion || secondscounter=8.1
Given below is the corresponding output for the sample input,
spark-submit --master local[*] --class a11_TCP_IP.a11_Basic.a11_HelloTcp --driver-java-options -XX:MaxPermSize=300m target/scala-2.10/sparkstreamingexamples_2.10-1.0.jar
2016-04-30 20:14:27.605 java[43814:21451071] Unable to load realm info from SCDynamicStore
(lion,(2,List(0.9, 1.8)))
(lion,(2,List(2.7, 3.6)))
(lion,(2,List(4.5, 5.4)))
(lion,(2,List(6.4, 7.2)))
(lion,(1,List(8.1)))
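Each record in the output has the shape (word, (count, List(secondscounter))): the first batch, for example, holds the inputs at 0.9 and 1.8 seconds, so lion comes out with a count of 2 and both timestamps. The standalone sketch below is illustrative only; it reproduces that first-batch result on plain Scala collections, using the same combine function the streaming job hands to reduceByKey.

// Illustrative only: reproduce the per-batch aggregation on plain
// Scala collections for the first batch (records at 0.9s and 1.8s).
object BatchReduceSketch {
  def main(args: Array[String]) {
    // One (word, (count, timestamps)) record per word occurrence,
    // exactly as wordsDs emits them in the streaming job.
    val records = Seq(
      ("lion", (1, List("0.9"))),
      ("lion", (1, List("1.8")))
    )

    // Same combine function as the reduceByKey in the streaming job:
    // add the counts and concatenate the timestamp lists.
    val reduced = records
      .groupBy(_._1)
      .mapValues(_.map(_._2).reduce { (x, y) => (x._1 + y._1, x._2 ::: y._2) })

    reduced.foreach(println)   // prints (lion,(2,List(0.9, 1.8)))
  }
}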
Stub
The Python code for the Input Generation Stub is available here...
Stream Processing App
package a11_TCP_IP.a11_Basic

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

import java.text.SimpleDateFormat
import java.util.Calendar

/*
spark-submit --master local[*] --class a11_TCP_IP.a11_Basic.a11_HelloTcp --driver-java-options -XX:MaxPermSize=300m target/scala-2.10/sparkstreamingexamples_2.10-1.0.jar
*/
object a11_HelloTcp {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Basic_TCP_IP")
    val sc = new SparkContext(conf)

    // Set Batch Interval
    val stc = new StreamingContext(sc, Seconds(2))

    // Create an Input stream that receives data from
    // TCP/IP Socket
    val linesDstream = stc.socketTextStream("localhost", 9999)

    val minNsec = this.getMinNsec()

    val splittedDs = linesDstream.map { x =>
      x.split("\\|\\|").map(_.trim)
    }

    val filteredListDs = splittedDs.map { x =>
      val retval = for {
        element <- x
        val keyNval = element.split("=")
        if keyNval.size >= 2
      } yield {
        val splitted = element.split("=")
        // Create Tuple of Key and Value
        splitted(0) -> splitted(1)
      }
      retval
    }

    val dnryList = filteredListDs.map { x => x.toMap }

    val filteredDnryListDs = dnryList.filter { x =>
      if (x.getOrElse("data", ()) != ()) true
      else false
    }

    val keyvaluepairsDs = filteredDnryListDs.map { x =>
      val data = x.getOrElse("data", "")
      (data, x)
    }

    val wordsDs = keyvaluepairsDs.flatMap { x =>
      // Underscore (_) is used as a Placeholder indicator
      val xsplitted = x._1.split(" ").map(_.trim)
      val wordNmeta = for {
        element <- xsplitted
      } yield {
        (element, (1, List(x._2.getOrElse("secondscounter", ""))))
      }
      wordNmeta
    }

    val reducedDs = wordsDs.reduceByKey { (x, y) =>
      val sum = x._1 + y._1
      val timelist = x._2 ::: y._2
      (sum, timelist)
    }

    // Print first 10 elements of every batch of data
    //reducedDs.print()

    // Apply the given function to each RDD in this DStream
    reducedDs.foreachRDD { rdd =>
      val minNSec = getMinNsec()
      //println(f"Batch at $minNSec%s")
      //println("-----------------------")
      rdd.collect().foreach(println)
      //println("")
    }

    // Start the Computation & wait for it to terminate
    stc.start()
    stc.awaitTermination()
  }

  def getMinNsec(): String = {
    val now = Calendar.getInstance().getTime()
    val myFormat = new SimpleDateFormat("mm.ss.SS")
    val minNmillisec = myFormat.format(now)
    val splitted = minNmillisec.split("\\.")
    val min = splitted(0)
    val sec = splitted(1)
    val millisec = splitted(2)
    val millisecTruncated = millisec(0)
    f"[$min%2s][$sec%2s.$millisecTruncated%s]"
  }
}
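To make the chain of map/filter transformations above concrete, here is a standalone sketch that applies the same per-line parsing to one sample line from the Stub. It uses plain Scala with no Spark, and the object name LineParseSketch is only for illustration.

// Illustrative only: apply the same per-line parsing as the DStream
// transformations above to one sample line from the stub.
object LineParseSketch {
  def main(args: Array[String]) {
    val line = "data=lion || secondscounter=0.9"

    // splittedDs: split on "||" and trim each field
    val fields = line.split("\\|\\|").map(_.trim)

    // filteredListDs / dnryList: keep "key=value" fields and build a Map
    val record = fields
      .filter(_.split("=").size >= 2)
      .map { field =>
        val kv = field.split("=")
        kv(0) -> kv(1)
      }
      .toMap

    println(record)   // Map(data -> lion, secondscounter -> 0.9)
  }
}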