In this exercise, we’ll use Spark Streaming to window (30s) some tweets on a topic, and display the counts of hashtags in each window every 5 seconds.

Steps

  1. git checkout ex6
  2. Add the dependencies:
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.0",
  "org.apache.spark" %% "spark-sql" % "1.4.0",
  "org.apache.spark" %% "spark-mllib" % "1.4.0",
  "org.apache.spark" %% "spark-graphx" % "1.4.0",
  "org.apache.spark" %% "spark-streaming" % "1.4.0",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.4.0"
)
  1. Add a class called TwiiterKeys:
object TwitterKeys {
  val consumerKey: String = "fill in"
  val secret: String = "fill in"
  val accessToken: String = "fill in"
  val accessTokenSecret: String = "fill in"
}

You’ll need to provide your own access tokens and keys here. In real apps, these would be passed in as args, etc.

  1. Imports:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
  1. Main:
object Main {
  def main(args: Array[String]){
    val sc = new SparkContext("local[2]", "hello-spark")
    val ssc = new StreamingContext(sc, Seconds(5))

    System.setProperty("twitter4j.oauth.consumerKey", TwitterKeys.consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", TwitterKeys.secret)
    System.setProperty("twitter4j.oauth.accessToken", TwitterKeys.accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterKeys.accessTokenSecret)

    val filters = Array("software")
    val stream = TwitterUtils.createStream(ssc,None, filters)

    stream
      .window(Seconds(30))
      .foreachRDD(x =>{
        x.flatMap(_.getHashtagEntities).map(x => (x.getText, 1)).countByKey()
        .foreach(println)
      })


    ssc.start()
    ssc.awaitTermination()

    sc.stop()
  }
}