Apache Tempête sur Twitter
Dans ce chapitre, nous discuterons des applications temps réel avec Apache Storm. Nous verrons comment Storm est utilisé sur Twitter.
Twitter est un service de réseau social en ligne qui fournit une plate-forme pour envoyer et recevoir des tweets d'utilisateurs. Les utilisateurs enregistrés peuvent lire et publier des tweets, mais les utilisateurs non enregistrés ne peuvent lire que les tweets. Le hashtag est utilisé pour classer les tweets par mot-clé en ajoutant # avant les mots-clés pertinents. Examinons maintenant un scénario en temps réel pour trouver les hashtags les plus utilisés pour chaque sujet.
Spout a été créé pour recevoir le plus rapidement possible les tweets soumis par les personnes. Twitter fournit la « Twitter Streaming API », un outil basé sur un service Web permettant de récupérer les tweets soumis par les personnes en temps réel. L'API Twitter Streaming est accessible à l'aide de n'importe quel langage de programmation.
twitter4jest une bibliothèque Java open source et non officielle qui fournit un module basé sur Java pour accéder facilement à l'API Twitter Streaming. twitter4j fournit un cadre basé sur un auditeur pour accéder aux tweets. Pour accéder à l'API Twitter Streaming, nous devons nous connecter au compte de développeur Twitter et obtenir les détails d'authentification OAuth suivants.
Customerkey- CustomerSecret
- accessToken
- AccessTookenSecret Storm fournit un bec Twitter, TwitterSampleSpout, dans son kit de démarrage. Nous l'utiliserons pour récupérer les tweets. L'e-mail nécessite des détails d'authentification OAuth et au moins un mot-clé. Ce bec émettra des tweets en temps réel basés sur des mots-clés. Le code complet du programme est le suivant.
Encodage : TwitterSampleSpout.java
import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import twitter4j.FilterQuery; import twitter4j.StallWarning; import twitter4j.Status; import twitter4j.StatusDeletionNotice; import twitter4j.StatusListener; import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; import twitter4j.auth.AccessToken; import twitter4j.conf.ConfigurationBuilder; import backtype.storm.Config; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; @SuppressWarnings("serial") public class TwitterSampleSpout extends BaseRichSpout { SpoutOutputCollector _collector; LinkedBlockingQueue<Status> queue = null; TwitterStream _twitterStream; String consumerKey; String consumerSecret; String accessToken; String accessTokenSecret; String[] keyWords; public TwitterSampleSpout(String consumerKey, String consumerSecret, String accessToken, String accessTokenSecret, String[] keyWords) { this.consumerKey = consumerKey; this.consumerSecret = consumerSecret; this.accessToken = accessToken; this.accessTokenSecret = accessTokenSecret; this.keyWords = keyWords; } public TwitterSampleSpout() { // TODO Auto-generated constructor stub } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { queue = new LinkedBlockingQueue<Status>(1000); _collector = collector; StatusListener listener = new StatusListener() { @Override public void onStatus(Status status) { queue.offer(status); } @Override public void onDeletionNotice(StatusDeletionNotice sdn) {} @Override public void onTrackLimitationNotice(int i) {} @Override public void onScrubGeo(long l, long l1) {} @Override public void onException(Exception ex) {} @Override public void onStallWarning(StallWarning arg0) { // TODO Auto-generated method stub } }; ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setDebugEnabled(true) .setOAuthConsumerKey(consumerKey) .setOAuthConsumerSecret(consumerSecret) .setOAuthAccessToken(accessToken) .setOAuthAccessTokenSecret(accessTokenSecret); _twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); _twitterStream.addListener(listener); if (keyWords.length == 0) { _twitterStream.sample(); }else { FilterQuery query = new FilterQuery().track(keyWords); _twitterStream.filter(query); } } @Override public void nextTuple() { Status ret = queue.poll(); if (ret == null) { Utils.sleep(50); } else { _collector.emit(new Values(ret)); } } @Override public void close() { _twitterStream.shutdown(); } @Override public Map<String, Object> getComponentConfiguration() { Config ret = new Config(); ret.setMaxTaskParallelism(1); return ret; } @Override public void ack(Object id) {} @Override public void fail(Object id) {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tweet")); } }
Hashtag Reader bec
Les tweets émis par le bec seront transmis au
HashtagReaderBolt qui traitera le tweet et émettra tous les hashtags disponibles. HashtagReaderBolt utilise la méthode getHashTagEntities fournie par twitter4j. getHashTagEntities lit les tweets et renvoie une liste de hashtags. Le code complet du programme est le suivant -
Encodage : HashtagReaderBolt.java
import java.util.HashMap; import java.util.Map; import twitter4j.*; import twitter4j.conf.*; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class HashtagReaderBolt implements IRichBolt { private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { Status tweet = (Status) tuple.getValueByField("tweet"); for(HashtagEntity hashtage : tweet.getHashtagEntities()) { System.out.println("Hashtag: " + hashtage.getText()); this.collector.emit(new Values(hashtage.getText())); } } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("hashtag")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Bec du compteur de hashtag
Le hashtag émis sera transmis à HashtagCounterBolt. Ce boulon traitera tous les hashtags et enregistrera chaque hashtag et son décompte en mémoire à l'aide d'un objet Java Map. Le code complet du programme est le suivant.
Codage : HashtagCounterBolt.java
import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class HashtagCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String key = tuple.getString(0); if(!counterMap.containsKey(key)){ counterMap.put(key, 1); }else{ Integer c = counterMap.get(key) + 1; counterMap.put(key, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println("Result: " + entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("hashtag")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Soumettre la topologie
Soumettre la topologie est l'application principale. La topologie Twitter se compose de TwitterSampleSpout, HashtagReaderBolt et HashtagCounterBolt. Le code de programme suivant montre comment soumettre une topologie.
Codage : TwitterHashtagStorm.java
import java.util.*; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; public class TwitterHashtagStorm { public static void main(String[] args) throws Exception{ String consumerKey = args[0]; String consumerSecret = args[1]; String accessToken = args[2]; String accessTokenSecret = args[3]; String[] arguments = args.clone(); String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length); Config config = new Config(); config.setDebug(true); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey, consumerSecret, accessToken, accessTokenSecret, keyWords)); builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt()) .shuffleGrouping("twitter-spout"); builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt()) .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TwitterHashtagStorm", config, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } }
Créez et exécutez l'application
L'application complète comporte quatre codes Java. Ils sont les suivants -
- HashtagReaderBolt.java
- HashtagCounterBolt.java
- TwitterHashtagStorm.java Vous pouvez compiler l'application en utilisant la commande suivante -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java
Exécutez l'application en utilisant ce qui suit command -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:. TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret> <keyword1> <keyword2> … <keywordN>
Output
L'application imprimera les hashtags actuellement disponibles et leur nombre. Le résultat devrait être similaire à ce qui suit :
Result: jazztastic : 1 Result: foodie : 1 Result: Redskins : 1 Result: Recipe : 1 Result: cook : 1 Result: android : 1 Result: food : 2 Result: NoToxicHorseMeat : 1 Result: Purrs4Peace : 1 Result: livemusic : 1 Result: VIPremium : 1 Result: Frome : 1 Result: SundayRoast : 1 Result: Millennials : 1 Result: HealthWithKier : 1 Result: LPs30DaysofGratitude : 1 Result: cooking : 1 Result: gameinsight : 1 Result: Countryfile : 1 Result: androidgames : 1