- scala> import org.apache.spark.streaming._
- import org.apache.spark.streaming._
- scala> val stc = new StreamingContext(sc, Seconds(5))
- stc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@36429c28
- scala> val lines = stc.socketTextStream("localhost", 9999)
- lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@4feb2798
- scala> val words = lines.flatMap(_.split(" "))
- words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@389785db
- scala> val pairs = words.map(word => (word, 1))
- pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@2fad123b
- scala> val wordCounts = pairs.reduceByKey(_ + _)
- wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@6e9ad193
- scala> wordCounts.print()
- scala> /*wordCounts.foreachRDD { rdd =>
- | rdd.foreach(print)
- | }*/
- | stc.start()
- scala> stc.awaitTermination()
netcat (nc) input — typed into a listener on port 9999, the localhost:9999 source opened by socketTextStream:
[raj@Rajkumars-MacBook-Pro ~]$nc -lk 9999
This is a test
This is anothet test
Output in spark-shell (one batch every 5 seconds; the first batch received no input, so it printed no counts):
-------------------------------------------
Time: 1466196225000 ms
-------------------------------------------
-------------------------------------------
Time: 1466196230000 ms
-------------------------------------------
(a,1)
(is,2)
(test,2)
(anothet,1)
(This,2)