In this exercise, we’ll use Spark Streaming to collect tweets on a topic into a 30-second sliding window and, every 5 seconds, display the count of each hashtag seen in the current window.
// Spark modules pulled in for this exercise. They all share the same
// organization ("org.apache.spark") and version ("1.4.0"), so only the
// module names are listed and the coordinates are built in one place.
libraryDependencies ++= Seq(
  "spark-core",
  "spark-sql",
  "spark-mllib",
  "spark-graphx",
  "spark-streaming",
  "spark-streaming-twitter"
).map(module => "org.apache.spark" %% module % "1.4.0")
/**
 * Twitter OAuth credentials used by the streaming job.
 *
 * Every value below is the placeholder string "fill in" and must be replaced
 * with real credentials before the job can authenticate.
 */
object TwitterKeys {

  // --- Application (consumer) credentials ---

  /** OAuth consumer key. */
  val consumerKey: String = "fill in"

  /** OAuth consumer secret. */
  val secret: String = "fill in"

  // --- User (access token) credentials ---

  /** OAuth access token. */
  val accessToken: String = "fill in"

  /** OAuth access token secret. */
  val accessTokenSecret: String = "fill in"
}
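You’ll need to provide your own access tokens and keys here. In a real application, these would be supplied at runtime (for example, as command-line arguments or environment variables) rather than hard-coded in the source.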
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
/**
 * Streams tweets matching a keyword filter and prints, for every batch, the
 * number of occurrences of each hashtag seen in the last 30 seconds.
 */
object Main {

  /**
   * Starts a local Spark Streaming job and blocks until it terminates.
   *
   * The Spark contexts are always stopped on the way out, including when
   * stream setup or the streaming job itself fails.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Two local threads: a receiver-based stream occupies one of them, so at
    // least one more is needed to process the received data.
    val sc = new SparkContext("local[2]", "hello-spark")
    // Batch interval: a new batch (and a new printout) every 5 seconds.
    val ssc = new StreamingContext(sc, Seconds(5))

    try {
      // createStream is called with no explicit authorization (None), so the
      // credentials are published as twitter4j.oauth.* system properties.
      System.setProperty("twitter4j.oauth.consumerKey", TwitterKeys.consumerKey)
      System.setProperty("twitter4j.oauth.consumerSecret", TwitterKeys.secret)
      System.setProperty("twitter4j.oauth.accessToken", TwitterKeys.accessToken)
      System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterKeys.accessTokenSecret)

      val filters = Array("software")
      val tweets = TwitterUtils.createStream(ssc, None, filters)

      tweets
        // 30-second window; no slide duration is given, so it is evaluated
        // once per batch interval.
        .window(Seconds(30))
        .foreachRDD { windowedTweets =>
          // countByValue returns the counts to the driver as a local Map, so
          // the println below runs on the driver, one (hashtag, count) per line.
          windowedTweets
            .flatMap(_.getHashtagEntities)
            .map(_.getText)
            .countByValue()
            .foreach(println)
        }

      ssc.start()
      // Blocks until the streaming context is stopped; an error raised by the
      // streaming job is rethrown here.
      ssc.awaitTermination()
    } finally {
      // Release both the streaming context and the underlying SparkContext
      // whether the job ended normally or with an exception.
      ssc.stop(stopSparkContext = true, stopGracefully = false)
    }
  }
}