Friday, June 17, 2016

Hello Streaming: A Sliding Window Example

  1. scala> import org.apache.spark.streaming._
  2. import org.apache.spark.streaming._
  3.  
  4. scala> val stc = new StreamingContext(sc, Seconds(3))
  5. stc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@4374cd21
  6.  
  7. scala> val lines = stc.socketTextStream("localhost", 9999)
  8. lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@1e3ef59f
  9.  
  10. scala> val words = lines.flatMap(_.split(" "))
  11. words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@61cbbfd7
  12.  
  13. scala> val pairs = words.map(word => (word, 1))
  14. pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@5b8088e
  15.  
  16. scala> //val wordCounts = pairs.reduceByKey(_ + _)
  17.  
  18. scala> val wordCounts = pairs.reduceByKeyAndWindow(((x:Int, y:Int) => x + y),
  19. | Seconds(15), Seconds(3))
  20. wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@7ca28846
  21.  
  22. scala> wordCounts.foreachRDD { rdd =>
  23. | println("-------------------")
  24. | rdd.foreach(println)
  25. | }
  26.  
  27. scala> stc.start()
  28.  
  29. scala> stc.awaitTermination()

netcat (nc) Input

mountain@mountain:~$ nc -lk 9999
1
2
3
4
5
6
7
8
9
10
11
12
13
14

15

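Output from spark-shell (one block is printed per 3-second slide; each block counts the words received during the preceding 15-second window)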

-------------------
(3,1)
(5,1)
(1,1)
(6,1)
(4,1)
(2,1)
-------------------
(2,1)
(4,1)
(6,1)
(3,1)
(5,1)
(9,1)
(1,1)
(8,1)
(7,1)
-------------------
(5,1)
(4,1)
(6,1)
(2,1)
(13,1)
(8,1)
(11,1)
(3,1)
(9,1)
(12,1)
(1,1)
(7,1)
(10,1)
-------------------
(2,1)
(13,1)
(6,1)
(5,1)
(14,1)
(3,1)
(9,1)
(12,1)
(1,1)
(8,1)
(11,1)
(4,1)
(15,1)
(7,1)
(10,1)
-------------------
(5,1)
(2,1)
(13,1)
(6,1)
(14,1)
(3,1)
(4,1)
(15,1)
(9,1)
(12,1)
(1,1)
(8,1)
(11,1)
(7,1)
(10,1)
-------------------
(13,1)
(15,1)
(8,1)
(11,1)
(9,1)
(12,1)
(14,1)
(7,1)
(10,1)
-------------------
(11,1)
(13,1)
(15,1)
(12,1)
(14,1)
(10,1)
-------------------
(14,1)
(15,1)

-------------------