Apache Storm in Twitter
In this chapter, we will discuss real-time applications of Apache Storm. We will see how Storm is used in Twitter.
Twitter is an online social networking service that provides a platform for sending and receiving user tweets. Registered users can read and post tweets, but unregistered users can only read them. Hashtags categorize tweets by keyword: a # is placed before each relevant keyword. Now let's look at a real-time scenario: finding the most used hashtags for each topic.
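As a small illustration of the hashtag convention described above, the following sketch pulls hashtags out of raw tweet text with a regular expression. The class name HashtagExtractor and the pattern are assumptions made for this example only; the topology in this chapter reads the hashtags that twitter4j has already parsed rather than using a regex.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Storm or twitter4j.
class HashtagExtractor {
    // Assumption: a hashtag is '#' followed by letters, digits, or underscores.
    private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

    static List<String> extract(String tweetText) {
        List<String> tags = new ArrayList<>();
        Matcher m = HASHTAG.matcher(tweetText);
        while (m.find()) {
            tags.add(m.group(1)); // keep the keyword without the leading '#'
        }
        return tags;
    }

    public static void main(String[] args) {
        System.out.println(extract("Sunday cooking with #food and #SundayRoast"));
    }
}
```

Running main prints [food, SundayRoast].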
Spout Creation
The purpose of the spout is to receive tweets submitted by people as quickly as possible. Twitter provides the "Twitter Streaming API", a web service-based tool for retrieving tweets in real time. The Twitter Streaming API can be accessed from any programming language.
twitter4j is an open source, unofficial Java library that provides a Java-based module for easy access to the Twitter Streaming API. twitter4j provides a listener-based framework to access tweets. To access the Twitter Streaming API, we need to sign in to a Twitter developer account and obtain the following OAuth authentication details.
- ConsumerKey
- ConsumerSecret
- AccessToken
- AccessTokenSecret
Storm provides a Twitter spout, TwitterSampleSpout. We will use it to retrieve tweets. The spout takes the OAuth authentication details and a list of keywords, and emits real-time tweets that match those keywords. As the code shows, when no keywords are given it falls back to Twitter's sample stream. The complete program code is as follows.
Coding: TwitterSampleSpout.java
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
   SpoutOutputCollector _collector;
   LinkedBlockingQueue<Status> queue = null;
   TwitterStream _twitterStream;

   String consumerKey;
   String consumerSecret;
   String accessToken;
   String accessTokenSecret;
   String[] keyWords;

   public TwitterSampleSpout(String consumerKey, String consumerSecret,
      String accessToken, String accessTokenSecret, String[] keyWords) {
      this.consumerKey = consumerKey;
      this.consumerSecret = consumerSecret;
      this.accessToken = accessToken;
      this.accessTokenSecret = accessTokenSecret;
      this.keyWords = keyWords;
   }

   public TwitterSampleSpout() {
      // TODO Auto-generated constructor stub
   }

   @Override
   public void open(Map conf, TopologyContext context,
      SpoutOutputCollector collector) {
      // Bounded buffer between the twitter4j listener thread and nextTuple()
      queue = new LinkedBlockingQueue<Status>(1000);
      _collector = collector;

      StatusListener listener = new StatusListener() {
         @Override
         public void onStatus(Status status) {
            // offer() does not block; the tweet is dropped if the queue is full
            queue.offer(status);
         }

         @Override
         public void onDeletionNotice(StatusDeletionNotice sdn) {}

         @Override
         public void onTrackLimitationNotice(int i) {}

         @Override
         public void onScrubGeo(long l, long l1) {}

         @Override
         public void onException(Exception ex) {}

         @Override
         public void onStallWarning(StallWarning arg0) {
            // TODO Auto-generated method stub
         }
      };

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
      _twitterStream.addListener(listener);

      // No keywords (or none set via the no-arg constructor): use the sample stream
      if (keyWords == null || keyWords.length == 0) {
         _twitterStream.sample();
      } else {
         FilterQuery query = new FilterQuery().track(keyWords);
         _twitterStream.filter(query);
      }
   }

   @Override
   public void nextTuple() {
      Status ret = queue.poll();

      if (ret == null) {
         // Nothing buffered yet; back off briefly instead of spinning
         Utils.sleep(50);
      } else {
         _collector.emit(new Values(ret));
      }
   }

   @Override
   public void close() {
      _twitterStream.shutdown();
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      Config ret = new Config();
      ret.setMaxTaskParallelism(1);
      return ret;
   }

   @Override
   public void ack(Object id) {}

   @Override
   public void fail(Object id) {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("tweet"));
   }
}
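The spout hands tweets from the twitter4j listener to nextTuple through a bounded LinkedBlockingQueue: offer never blocks and returns false when the queue is full, and poll returns null when nothing is waiting. The following sketch shows that hand-off in isolation, under the assumption that plain strings can stand in for Status objects. The class QueueHandOffDemo and its method names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; plain strings stand in for twitter4j Status objects.
class QueueHandOffDemo {
    private final LinkedBlockingQueue<String> queue;

    QueueHandOffDemo(int capacity) {
        queue = new LinkedBlockingQueue<>(capacity);
    }

    // Producer side (the listener's onStatus in the spout): never blocks.
    // Returns false when the queue is full, meaning the item was dropped.
    boolean onItem(String item) {
        return queue.offer(item);
    }

    // Consumer side (the spout's nextTuple): returns null when nothing is waiting.
    String next() {
        return queue.poll();
    }

    public static void main(String[] args) {
        QueueHandOffDemo demo = new QueueHandOffDemo(2);
        System.out.println(demo.onItem("tweet-1")); // accepted
        System.out.println(demo.onItem("tweet-2")); // accepted
        System.out.println(demo.onItem("tweet-3")); // queue full, dropped
        System.out.println(demo.next());
        System.out.println(demo.next());
        System.out.println(demo.next());            // queue empty
    }
}
```

One consequence of this design is that tweets arriving faster than the topology consumes them are silently discarded once the buffer fills, which is usually acceptable for a sampling demo but worth knowing.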
Hashtag Reader Bolt
Tweets emitted by the spout are forwarded to HashtagReaderBolt, which processes each tweet and emits every hashtag it contains. HashtagReaderBolt uses the getHashtagEntities method provided by twitter4j, which returns an array of the hashtags found in a tweet. The complete program code is as follows -
Coding: HashtagReaderBolt.java
import java.util.HashMap;
import java.util.Map;

import twitter4j.*;
import twitter4j.conf.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagReaderBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      // The spout emits the whole twitter4j Status object in the "tweet" field
      Status tweet = (Status) tuple.getValueByField("tweet");

      // Emit one tuple per hashtag found in the tweet
      for (HashtagEntity hashtag : tweet.getHashtagEntities()) {
         System.out.println("Hashtag: " + hashtag.getText());
         this.collector.emit(new Values(hashtag.getText()));
      }
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
Hashtag Counter Bolt
The emitted hashtags are forwarded to HashtagCounterBolt. This bolt processes every hashtag and keeps each hashtag and its count in memory in a Java Map object. The complete program code is as follows.
Coding: HashtagCounterBolt.java
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String key = tuple.getString(0);

      // First sighting starts at 1; later sightings increment the stored count
      if (!counterMap.containsKey(key)) {
         counterMap.put(key, 1);
      } else {
         Integer c = counterMap.get(key) + 1;
         counterMap.put(key, c);
      }

      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      // Called when the local cluster shuts down: print the final counts
      for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
         System.out.println("Result: " + entry.getKey() + " : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
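The counting step in execute can also be written more compactly with Map.merge, which is available from Java 8 onward. The sketch below shows only the counting logic with the Storm types left out; the class HashtagCounter and its methods are hypothetical names used for illustration, and it assumes a Java 8 or newer runtime, which may not match the environment of an older Storm release.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone counter; mirrors the bolt's in-memory map without Storm.
class HashtagCounter {
    private final Map<String, Integer> counts = new HashMap<>();

    // Inserts 1 for a new hashtag, otherwise adds 1 to the existing count.
    void record(String hashtag) {
        counts.merge(hashtag, 1, Integer::sum);
    }

    // Returns 0 for hashtags that were never recorded.
    int countOf(String hashtag) {
        return counts.getOrDefault(hashtag, 0);
    }

    public static void main(String[] args) {
        HashtagCounter counter = new HashtagCounter();
        counter.record("food");
        counter.record("cook");
        counter.record("food");
        System.out.println("food : " + counter.countOf("food"));
        System.out.println("cook : " + counter.countOf("cook"));
    }
}
```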
Submitting a Topology
The main application builds and submits the topology. The Twitter topology consists of TwitterSampleSpout, HashtagReaderBolt, and HashtagCounterBolt. The following program code shows how to submit the topology.
Coding: TwitterHashtagStorm.java
import java.util.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TwitterHashtagStorm {
   public static void main(String[] args) throws Exception {
      // The first four arguments are the OAuth details; the rest are keywords
      String consumerKey = args[0];
      String consumerSecret = args[1];
      String accessToken = args[2];
      String accessTokenSecret = args[3];

      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);

      Config config = new Config();
      config.setDebug(true);

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
         consumerSecret, accessToken, accessTokenSecret, keyWords));

      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
         .shuffleGrouping("twitter-spout");

      // Group by the "hashtag" field so each hashtag is always counted by the same task
      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("TwitterHashtagStorm", config,
         builder.createTopology());

      // Let the topology run for 10 seconds, then stop the local cluster
      Thread.sleep(10000);
      cluster.shutdown();
   }
}
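The topology wires the counter bolt with fieldsGrouping on the "hashtag" field, so tuples carrying the same hashtag always reach the same bolt task. That is what keeps a per-task in-memory count meaningful once the bolt runs with more than one task. The idea can be pictured as hashing the field value onto a task index. The sketch below is a simplified illustration under that assumption, not Storm's actual routing code, and the class and method names are hypothetical.

```java
// Hypothetical illustration of field-based routing; not Storm's implementation.
class FieldsGroupingSketch {
    // Maps a hashtag to a task index in the range [0, numTasks).
    // floorMod keeps the result non-negative even when hashCode() is negative.
    static int taskFor(String hashtag, int numTasks) {
        return Math.floorMod(hashtag.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        // The same hashtag maps to the same task every time it is seen.
        System.out.println(taskFor("food", numTasks) == taskFor("food", numTasks));
        System.out.println("food -> task " + taskFor("food", numTasks));
        System.out.println("cook -> task " + taskFor("cook", numTasks));
    }
}
```

With shuffleGrouping, by contrast, tuples are distributed without regard to their content, which suits the reader bolt because it keeps no state between tweets.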
Building and running the application
The complete application consists of four Java files. They are as follows -
- TwitterSampleSpout.java
- HashtagReaderBolt.java
- HashtagCounterBolt.java
- TwitterHashtagStorm.java
You can compile the application using the following command -
javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*" *.java
Execute the application using the following command -
java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*":. TwitterHashtagStorm <consumerkey> <consumersecret> <accesstoken> <accesstokensecret> <keyword1> <keyword2> … <keywordN>
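The command line above passes the four OAuth values first and the keywords after them. TwitterHashtagStorm reads args[0] to args[3] directly, so running it with fewer than four arguments fails with an ArrayIndexOutOfBoundsException. A minimal sketch of a friendlier check follows; the class TwitterArgs and its error message are hypothetical additions for illustration and are not part of the original application.

```java
import java.util.Arrays;

// Hypothetical argument holder; validates the layout used on the command line.
class TwitterArgs {
    final String consumerKey;
    final String consumerSecret;
    final String accessToken;
    final String accessTokenSecret;
    final String[] keyWords;

    TwitterArgs(String[] args) {
        if (args.length < 4) {
            throw new IllegalArgumentException(
                "Expected: <consumerkey> <consumersecret> <accesstoken> "
                + "<accesstokensecret> [keyword ...]");
        }
        consumerKey = args[0];
        consumerSecret = args[1];
        accessToken = args[2];
        accessTokenSecret = args[3];
        // Everything after the fourth argument is treated as a keyword.
        keyWords = Arrays.copyOfRange(args, 4, args.length);
    }

    public static void main(String[] args) {
        // Placeholder values only; real credentials come from the developer account.
        TwitterArgs parsed = new TwitterArgs(
            new String[] {"ck", "cs", "at", "ats", "food", "music"});
        System.out.println(Arrays.toString(parsed.keyWords));
    }
}
```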
Output
The application will print the currently available hashtags and their count. The output should resemble the following -
Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1
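The results are printed in HashMap iteration order, not by frequency. To get at the most used hashtags, the counts can be sorted before printing. The following sketch shows one way to do that, assuming the counts are available as a Map like the one HashtagCounterBolt keeps. The class TopHashtags, the alphabetical tie-break, and the sample counts are choices made for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical ranking helper; not part of the original application.
class TopHashtags {
    // Returns up to n hashtags, highest count first; ties are ordered alphabetically.
    static List<String> top(Map<String, Integer> counts, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        List<String> result = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            result.add(entries.get(i).getKey());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("food", 2);
        counts.put("cook", 1);
        counts.put("android", 1);
        System.out.println(top(counts, 2));
    }
}
```

Running main prints [food, android].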