-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPopularHashtags.scala
More file actions
78 lines (60 loc) · 2.82 KB
/
PopularHashtags.scala
File metadata and controls
78 lines (60 loc) · 2.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package com.sundogsoftware.spark
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
/** Listens to a stream of Tweets and keeps track of the most popular
 * hashtags over a 5 minute window.
 *
 * Requires a twitter.txt file one directory above the working directory
 * containing twitter4j OAuth credentials, one "key value" pair per line.
 */
object PopularHashtags {

  /** Makes sure only ERROR messages get logged to avoid log spam. */
  def setupLogging(): Unit = {
    import org.apache.log4j.{Level, Logger}
    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)
  }

  /** Configures Twitter service credentials using twitter.txt in the main workspace directory.
   *
   * Each well-formed line ("key value", exactly two space-separated fields) is exposed
   * to twitter4j as a "twitter4j.oauth.<key>" system property; malformed lines are skipped.
   */
  def setupTwitter(): Unit = {
    import scala.io.Source
    // Hold on to the Source so we can close it — the original leaked the file handle.
    val credentials = Source.fromFile("../twitter.txt")
    try {
      for (line <- credentials.getLines) {
        val fields = line.split(" ")
        if (fields.length == 2) {
          System.setProperty("twitter4j.oauth." + fields(0), fields(1))
        }
      }
    } finally {
      credentials.close()
    }
  }

  /** Our main function where the action happens */
  def main(args: Array[String]): Unit = {

    // Configure Twitter credentials using twitter.txt
    setupTwitter()

    // Set up a Spark streaming context named "PopularHashtags" that runs locally using
    // all CPU cores and one-second batches of data
    val ssc = new StreamingContext("local[*]", "PopularHashtags", Seconds(1))

    // Get rid of log spam (should be called after the context is set up)
    setupLogging()

    // Create a DStream from Twitter using our streaming context
    val tweets = TwitterUtils.createStream(ssc, None)

    // Now extract the text of each status update into DStreams using map()
    val statuses = tweets.map(status => status.getText())

    // Blow out each word into a new DStream
    val tweetwords = statuses.flatMap(tweetText => tweetText.split(" "))

    // Now eliminate anything that's not a hashtag
    val hashtags = tweetwords.filter(word => word.startsWith("#"))

    // Map each hashtag to a key/value pair of (hashtag, 1) so we can count them up by adding up the values
    val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1))

    // Now count them up over a 5 minute window sliding every one second.
    // The inverse function (x - y) lets Spark subtract data leaving the window
    // instead of recounting the whole window each second; it requires checkpointing.
    val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow( (x,y) => x + y, (x,y) => x - y, Seconds(300), Seconds(1))

    // You will often see this written in the following shorthand:
    //val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow( _ + _, _ -_, Seconds(300), Seconds(1))

    // Sort the results by the count values, descending
    val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy(x => x._2, false))

    // Print the top 10
    sortedResults.print()

    // Set a checkpoint directory (required by reduceByKeyAndWindow with an
    // inverse function), and kick it all off
    // I could watch this all day!
    ssc.checkpoint("C:/checkpoint/")
    ssc.start()
    ssc.awaitTermination()
  }
}