TwitterのApache Storm


この章では、Apache Storm のリアルタイム アプリケーションについて説明します。 Storm が Twitter でどのように使用されるかを見ていきます。

Twitter

Twitter は、ユーザーのツイートを送受信するためのプラットフォームを提供するオンライン ソーシャル ネットワーキング サービスです。登録ユーザーはツイートの閲覧と投稿が可能ですが、未登録ユーザーはツイートの閲覧のみが可能です。ハッシュタグは、関連するキーワードの前に # を追加することで、ツイートをキーワードごとに分類するために使用されます。次に、各トピックで最も使用されているハッシュタグを見つけるリアルタイム シナリオを見てみましょう。

Spout は、人々が投稿したツイートをできるだけ早く受け取るために作成されました。 Twitter は、人々が投稿したツイートをリアルタイムに取得するための Web サービス ベースのツール「Twitter Streaming API」を提供しています。 Twitter Streaming API には、任意のプログラミング言語を使用してアクセスできます。

twitter4j

は、Twitter Streaming API に簡単にアクセスするための Java ベースのモジュールを提供するオープン ソースの非公式 Java ライブラリです。 twitter4j は、ツイートにアクセスするためのリスナーベースのフレームワークを提供します。 Twitter Streaming API にアクセスするには、Twitter 開発者アカウントにログインし、次の OAuth 認証の詳細を取得する必要があります。

Customerkey
  • CustomerSecret
  • accessToken
  • AccessTookenSecret
  • Storm は Twitter スパウトである TwitterSampleSpout を提供します。これをツイートの取得に使用します。電子メールには、OAuth 認証の詳細と少なくとも 1 つのキーワードが必要です。このスパウトはキーワードに基づいてリアルタイムのツイートを発信します。完全なプログラムコードは次のとおりです。

エンコーディング: TwitterSampleSpout.java

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;

import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
   SpoutOutputCollector _collector;
   LinkedBlockingQueue<Status> queue = null;
   TwitterStream _twitterStream;
		
   String consumerKey;
   String consumerSecret;
   String accessToken;
   String accessTokenSecret;
   String[] keyWords;
		
   public TwitterSampleSpout(String consumerKey, String consumerSecret,
      String accessToken, String accessTokenSecret, String[] keyWords) {
         this.consumerKey = consumerKey;
         this.consumerSecret = consumerSecret;
         this.accessToken = accessToken;
         this.accessTokenSecret = accessTokenSecret;
         this.keyWords = keyWords;
   }
		
   public TwitterSampleSpout() {
      // TODO Auto-generated constructor stub
   }
		
   @Override
   public void open(Map conf, TopologyContext context,
      SpoutOutputCollector collector) {
         queue = new LinkedBlockingQueue<Status>(1000);
         _collector = collector;
         StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
               queue.offer(status);
            }
					
            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {}
					
            @Override
            public void onTrackLimitationNotice(int i) {}
					
            @Override
            public void onScrubGeo(long l, long l1) {}
					
            @Override
            public void onException(Exception ex) {}
					
            @Override
            public void onStallWarning(StallWarning arg0) {
               // TODO Auto-generated method stub
            }
         };
				
         ConfigurationBuilder cb = new ConfigurationBuilder();
				
         cb.setDebugEnabled(true)
            .setOAuthConsumerKey(consumerKey)
            .setOAuthConsumerSecret(consumerSecret)
            .setOAuthAccessToken(accessToken)
            .setOAuthAccessTokenSecret(accessTokenSecret);
					
         _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
         _twitterStream.addListener(listener);
				
         if (keyWords.length == 0) {
            _twitterStream.sample();
         }else {
            FilterQuery query = new FilterQuery().track(keyWords);
            _twitterStream.filter(query);
         }
   }
			
   @Override
   public void nextTuple() {
      Status ret = queue.poll();
				
      if (ret == null) {
         Utils.sleep(50);
      } else {
         _collector.emit(new Values(ret));
      }
   }
			
   @Override
   public void close() {
      _twitterStream.shutdown();
   }
			
   @Override
   public Map<String, Object> getComponentConfiguration() {
      Config ret = new Config();
      ret.setMaxTaskParallelism(1);
      return ret;
   }
			
   @Override
   public void ack(Object id) {}
			
   @Override
   public void fail(Object id) {}
			
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("tweet"));
   }
}

ハッシュタグ リーダー spout

スパウトによって送信されたツイートは、ツイートを処理する

HashtagReaderBolt

に転送され、利用可能なすべてのハッシュタグを発行します。 HashtagReaderBolt は、twitter4j が提供する getHashTagEntities メソッドを使用します。 getHashTagEntities はツイートを読み取り、ハッシュタグのリストを返します。完全なプログラム コードは次のとおりです。
エンコーディング: HashtagReaderBolt.java

import java.util.HashMap;
import java.util.Map;

import twitter4j.*;
import twitter4j.conf.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagReaderBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      Status tweet = (Status) tuple.getValueByField("tweet");
      for(HashtagEntity hashtage : tweet.getHashtagEntities()) {
         System.out.println("Hashtag: " + hashtage.getText());
         this.collector.emit(new Values(hashtage.getText()));
      }
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

ハッシュタグ カウンター スパウト

発行されたハッシュタグは HashtagCounterBolt に転送されます。このボルトはすべてのハッシュタグを処理し、Java Map オブジェクトを使用して各ハッシュタグとその数をメモリに保存します。完全なプログラムコードは次のとおりです。

エンコーディング: HashtagCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String key = tuple.getString(0);

      if(!counterMap.containsKey(key)){
         counterMap.put(key, 1);
      }else{
         Integer c = counterMap.get(key) + 1;
         counterMap.put(key, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Submit Topology

Submit Topology はメイン アプリケーションです。 Twitter トポロジは、TwitterSampleSpout、HashtagReaderBolt、および HashtagCounterBolt で構成されます。次のプログラム コードは、トポロジを送信する方法を示しています。

コーディング: TwitterHashtagStorm.java

import java.util.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TwitterHashtagStorm {
   public static void main(String[] args) throws Exception{
      String consumerKey = args[0];
      String consumerSecret = args[1];
		
      String accessToken = args[2];
      String accessTokenSecret = args[3];
		
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);
		
      Config config = new Config();
      config.setDebug(true);
		
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
         consumerSecret, accessToken, accessTokenSecret, keyWords));

      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
         .shuffleGrouping("twitter-spout");

      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("TwitterHashtagStorm", config,
         builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

アプリケーションの構築と実行

完全なアプリケーションには 4 つの Java コードがあります。それらは次のとおりです -


TwitterSampleSpout.java
  • HashtagReaderBolt.java
  • HashtagCounterBolt.java
  • TwitterHashtagStorm.java
  • 次のコマンドを使用してアプリケーションをコンパイルできます -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java

次のコマンドを使用してアプリケーションを実行します -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:.
TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret>
<keyword1> <keyword2> … <keywordN>

Output

## アプリケーションは、現在利用可能なハッシュタグとその数を出力します。出力は次のようになります -

ああああ