The application given below processes a TCP/IP source data stream to produce the count of each unique word in a given Batch. Apart from the count, we also record the timestamp at which each input occurred.
Stream Processing App                    Input Generation Stub
      (reads from)                            (writes to)
           |______________ netcat(nc) ______________|
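The app can also be exercised without the Stub. A minimal sketch, assuming the localhost:9999 endpoint that the code below connects to: start a netcat listener and type records in the same data=... || secondscounter=... format by hand,
nc -lk 9999
data=lion || secondscounter=0.9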
The Input stream is processed with a Batch Interval of 2 seconds. Given below is a sample Input from the Stub,
python3.5 ./a21_stub.py inputfile=./c15_input2.yaml
Press ENTER when Ready...
[ 0][.9] => data=lion || secondscounter=0.9
[ 1][.8] => data=lion || secondscounter=1.8
[ 2][.7] => data=lion || secondscounter=2.7
[ 3][.6] => data=lion || secondscounter=3.6
[ 4][.5] => data=lion || secondscounter=4.5
[ 5][.4] => data=lion || secondscounter=5.4
[ 6][.4] => data=lion || secondscounter=6.4
[ 7][.2] => data=lion || secondscounter=7.2
[ 8][.1] => data=lion || secondscounter=8.1
Given below is the corresponding output for the sample input. Each record has the form (word, (count, List(secondscounter values))), with one line per unique word per 2-second Batch,
spark-submit --master local[*] --class a11_TCP_IP.a11_Basic.a11_HelloTcp --driver-java-options -XX:MaxPermSize=300m target/scala-2.10/sparkstreamingexamples_2.10-1.0.jar
2016-04-30 20:14:27.605 java[43814:21451071] Unable to load realm info from SCDynamicStore
(lion,(2,List(0.9, 1.8)))
(lion,(2,List(2.7, 3.6)))
(lion,(2,List(4.5, 5.4)))
(lion,(2,List(6.4, 7.2)))
(lion,(1,List(8.1)))
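To see how the first Batch yields (lion,(2,List(0.9, 1.8))), here is a minimal sketch of the same per-word reduce using plain Scala collections rather than Spark; the ReduceSketch object and its hard-coded batch contents are illustrative only, mirroring the (word, (count, List(secondscounter))) shape built by the app below,
// Plain-Scala sketch (not Spark) of the per-word reduce performed over one Batch
object ReduceSketch extends App {
  // The two records that fall inside the first 2-second window, already mapped
  // to (word, (count, List(secondscounter))) as the streaming app does
  val firstBatch = Seq(("lion", (1, List("0.9"))), ("lion", (1, List("1.8"))))
  val reduced = firstBatch
    .groupBy(_._1)
    .mapValues(_.map(_._2).reduce((x, y) => (x._1 + y._1, x._2 ::: y._2)))
  reduced.foreach(println)   // prints: (lion,(2,List(0.9, 1.8)))
}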
Stub
The Python code for the Input Generation Stub is available here...
Stream Processing App
package a11_TCP_IP.a11_Basic
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import java.text.SimpleDateFormat
import java.util.Calendar
/*
spark-submit --master local[*] --class a11_TCP_IP.a11_Basic.a11_HelloTcp --driver-java-options -XX:MaxPermSize=300m target/scala-2.10/sparkstreamingexamples_2.10-1.0.jar
*/
object a11_HelloTcp {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Basic_TCP_IP")
val sc = new SparkContext(conf)
// Set Batch Interval
val stc = new StreamingContext(sc, Seconds(2))
// Create an Input stream that receives data from
// TCP/IP Socket
val linesDstream = stc.socketTextStream("localhost", 9999)
val minNsec = this.getMinNsec()
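// Split each incoming line on the "||" delimiter and trim the resulting tokens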
val splittedDs = linesDstream.map{ x =>
x.split("\\|\\|")
.map(_.trim)
}
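// Keep only tokens of the form key=value and map each to a (key, value) pair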
val filteredListDs = splittedDs.map{ x =>
val retval = for { element <- x
keyNval = element.split("=")
if keyNval.size >= 2
} yield {
// Create Tuple of Key and Value
keyNval(0) -> keyNval(1)
}
retval
}
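// Convert each line's (key, value) pairs into a Map (dictionary)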
val dnryList = filteredListDs.map{ x =>
x.toMap
}
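// Keep only records that carry a "data" field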
val filteredDnryListDs = dnryList.filter{ x =>
x.contains("data")
}
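// Key each record by its "data" value, keeping the full Map for the metadata lookup that follows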
val keyvaluepairsDs = filteredDnryListDs.map { x =>
val data = x.getOrElse("data", "")
(data, x)
}
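// Split the "data" value into words and emit (word, (1, List(secondscounter))) for each word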
val wordsDs = keyvaluepairsDs.flatMap{ x =>
//Underscore (_) is used as a Placeholder indicator
val xsplitted = x._1.split(" ").map(_.trim)
val wordNmeta = for { element <- xsplitted
} yield {
(element, (1, List(x._2.getOrElse("secondscounter", ""))))
}
wordNmeta
}
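// Sum the per-word counts and concatenate the timestamp lists within each Batch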
val reducedDs = wordsDs.reduceByKey{ (x, y) =>
val sum = x._1 + y._1
val timelist = x._2 ::: y._2
(sum, timelist)
}
// Print the first 10 elements of every batch of data
//reducedDs.print()
// Apply the given function to each RDD in this DStream
reducedDs.foreachRDD{ rdd =>
val minNSec = getMinNsec()
//println(f"Batch at $minNSec%s")
//println("-----------------------")
rdd.collect().foreach(println)
//println("")
}
// Start the Computation & wait for it to terminate
stc.start()
stc.awaitTermination()
}
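// Format the current time as [minutes][seconds.tenths-of-a-second]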
def getMinNsec() : String = {
val now = Calendar.getInstance().getTime()
val myFormat = new SimpleDateFormat("mm.ss.SS")
val minNmillisec = myFormat.format(now)
val splitted = minNmillisec.split("\\.")
val min = splitted(0)
val sec = splitted(1)
val millisec = splitted(2)
val millisecTruncated = millisec(0)
f"[$min%2s][$sec%2s.$millisecTruncated%s]"
}
}