Friday, June 17, 2016

Hello Streaming: A Word Count Example

  1. scala> import org.apache.spark.streaming._
  2. import org.apache.spark.streaming._
  3.  
  4. scala> val stc = new StreamingContext(sc, Seconds(5))
  5. stc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@36429c28
  6.  
  7. scala> val lines = stc.socketTextStream("localhost", 9999)
  8. lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@4feb2798
  9.  
  10. scala> val words = lines.flatMap(_.split(" "))
  11. words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@389785db
  12.  
  13. scala> val pairs = words.map(word => (word, 1))
  14. pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@2fad123b
  15.  
  16. scala> val wordCounts = pairs.reduceByKey(_ + _)
  17. wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@6e9ad193
  18.  
  19. scala> wordCounts.print()
  20.  
  21. scala> /*wordCounts.foreachRDD { rdd =>
  22. | rdd.foreach(print)
  23. | }*/
  24. | stc.start()
  25.  
  26. scala> stc.awaitTermination()

Netcat (nc) input: start a listener on port 9999 and type lines into it.

[raj@Rajkumars-MacBook-Pro ~]$nc -lk 9999
This is a test


This is anothet test

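Output in spark-shell: one batch is printed every 5 seconds (the interval passed to the StreamingContext). The first batch shown is empty, and the second contains the word counts for the lines typed above.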

-------------------------------------------
Time: 1466196225000 ms
-------------------------------------------

-------------------------------------------
Time: 1466196230000 ms
-------------------------------------------
(a,1)
(is,2)
(test,2)
(anothet,1)


(This,2)