Apache Storm di Twitter
Dalam bab ini, kita akan membincangkan tentang aplikasi masa nyata Apache Storm. Kita akan lihat bagaimana Storm digunakan dalam Twitter.
Twitter ialah perkhidmatan rangkaian sosial dalam talian yang menyediakan platform untuk menghantar dan menerima tweet pengguna. Pengguna berdaftar boleh membaca dan menyiarkan tweet, tetapi pengguna tidak berdaftar hanya boleh membaca tweet. Hashtag digunakan untuk mengkategorikan tweet mengikut kata kunci dengan menambahkan # sebelum kata kunci yang berkaitan. Sekarang mari kita lihat senario masa nyata untuk mencari hashtag yang paling banyak digunakan untuk setiap topik.
Spout dicipta untuk menerima tweet yang dihantar oleh orang secepat mungkin. Twitter menyediakan "API Penstriman Twitter", alat berasaskan perkhidmatan web untuk mendapatkan semula tweet yang dihantar oleh orang dalam masa nyata. API Penstriman Twitter boleh diakses menggunakan mana-mana bahasa pengaturcaraan.
twitter4jialah pustaka Java sumber terbuka dan tidak rasmi yang menyediakan modul berasaskan Java untuk mengakses API Penstriman Twitter dengan mudah. twitter4j menyediakan rangka kerja berasaskan pendengar untuk mengakses tweet. Untuk mengakses Twitter Streaming API, kami perlu log masuk ke akaun pembangun Twitter dan mendapatkan butiran pengesahan OAuth berikut.
Customerkey- CustomerSecret
- accessToken
- AccessTookenSecret Storm menyediakan muncung twitter, TwitterSampleSpout, dalam kit permulaannya. Kami akan menggunakan ini untuk mendapatkan semula tweet. E-mel memerlukan butiran pengesahan OAuth dan sekurang-kurangnya satu kata kunci. Muncung ini akan mengeluarkan tweet masa nyata berdasarkan kata kunci. Kod program lengkap adalah seperti berikut.
Pengekodan: TwitterSampleSpout.java
import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import twitter4j.FilterQuery; import twitter4j.StallWarning; import twitter4j.Status; import twitter4j.StatusDeletionNotice; import twitter4j.StatusListener; import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; import twitter4j.auth.AccessToken; import twitter4j.conf.ConfigurationBuilder; import backtype.storm.Config; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; @SuppressWarnings("serial") public class TwitterSampleSpout extends BaseRichSpout { SpoutOutputCollector _collector; LinkedBlockingQueue<Status> queue = null; TwitterStream _twitterStream; String consumerKey; String consumerSecret; String accessToken; String accessTokenSecret; String[] keyWords; public TwitterSampleSpout(String consumerKey, String consumerSecret, String accessToken, String accessTokenSecret, String[] keyWords) { this.consumerKey = consumerKey; this.consumerSecret = consumerSecret; this.accessToken = accessToken; this.accessTokenSecret = accessTokenSecret; this.keyWords = keyWords; } public TwitterSampleSpout() { // TODO Auto-generated constructor stub } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { queue = new LinkedBlockingQueue<Status>(1000); _collector = collector; StatusListener listener = new StatusListener() { @Override public void onStatus(Status status) { queue.offer(status); } @Override public void onDeletionNotice(StatusDeletionNotice sdn) {} @Override public void onTrackLimitationNotice(int i) {} @Override public void onScrubGeo(long l, long l1) {} @Override public void onException(Exception ex) {} @Override public void onStallWarning(StallWarning arg0) { // TODO Auto-generated method stub } }; ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setDebugEnabled(true) .setOAuthConsumerKey(consumerKey) .setOAuthConsumerSecret(consumerSecret) .setOAuthAccessToken(accessToken) .setOAuthAccessTokenSecret(accessTokenSecret); _twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); _twitterStream.addListener(listener); if (keyWords.length == 0) { _twitterStream.sample(); }else { FilterQuery query = new FilterQuery().track(keyWords); _twitterStream.filter(query); } } @Override public void nextTuple() { Status ret = queue.poll(); if (ret == null) { Utils.sleep(50); } else { _collector.emit(new Values(ret)); } } @Override public void close() { _twitterStream.shutdown(); } @Override public Map<String, Object> getComponentConfiguration() { Config ret = new Config(); ret.setMaxTaskParallelism(1); return ret; } @Override public void ack(Object id) {} @Override public void fail(Object id) {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tweet")); } }
Hashtag Reader spout
Tweet yang dikeluarkan oleh spout akan dimajukan ke
HashtagReaderBolt yang akan memproses tweet dan mengeluarkan semua hashtag yang tersedia. HashtagReaderBolt menggunakan kaedah getHashTagEntities yang disediakan oleh twitter4j. getHashTagEntities membaca tweet dan mengembalikan senarai hashtag. Kod program lengkap adalah seperti berikut -
Pengekodan: HashtagReaderBolt.java
import java.util.HashMap; import java.util.Map; import twitter4j.*; import twitter4j.conf.*; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class HashtagReaderBolt implements IRichBolt { private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { Status tweet = (Status) tuple.getValueByField("tweet"); for(HashtagEntity hashtage : tweet.getHashtagEntities()) { System.out.println("Hashtag: " + hashtage.getText()); this.collector.emit(new Values(hashtage.getText())); } } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("hashtag")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Hashtag counter spout
Hashtag yang dikeluarkan akan dimajukan ke HashtagCounterBolt. Bolt ini akan memproses semua hashtag dan menyimpan setiap hashtag dan kiraannya dalam ingatan menggunakan objek Peta Java. Kod program lengkap adalah seperti berikut.
Pengekodan: HashtagCounterBolt.java
import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class HashtagCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String key = tuple.getString(0); if(!counterMap.containsKey(key)){ counterMap.put(key, 1); }else{ Integer c = counterMap.get(key) + 1; counterMap.put(key, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println("Result: " + entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("hashtag")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Submit Topology
Submit Topologi ialah aplikasi utama. Topologi Twitter terdiri daripada TwitterSampleSpout, HashtagReaderBolt dan HashtagCounterBolt. Kod program berikut menunjukkan cara menghantar topologi.
Pengekodan: TwitterHashtagStorm.java
import java.util.*; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; public class TwitterHashtagStorm { public static void main(String[] args) throws Exception{ String consumerKey = args[0]; String consumerSecret = args[1]; String accessToken = args[2]; String accessTokenSecret = args[3]; String[] arguments = args.clone(); String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length); Config config = new Config(); config.setDebug(true); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey, consumerSecret, accessToken, accessTokenSecret, keyWords)); builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt()) .shuffleGrouping("twitter-spout"); builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt()) .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TwitterHashtagStorm", config, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } }
Bina dan jalankan aplikasi
Aplikasi lengkap mempunyai empat kod Java. Ia adalah seperti berikut -
- HashtagReaderBolt.java
- HashtagCounterBolt.java
- TwitterHashtagStorm.java
Anda boleh menyusun perintah berikut
Laksanakan aplikasi menggunakan yang berikut arahan -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java
Output
Apl akan mencetak hashtags yang tersedia pada masa ini dan kiraan mereka. Output hendaklah serupa dengan yang berikut -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:. TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret> <keyword1> <keyword2> … <keywordN>