트위터의 아파치 스톰
이 장에서는 Apache Storm을 사용한 실시간 애플리케이션에 대해 논의합니다. 트위터에서 Storm이 어떻게 사용되는지 살펴보겠습니다.
Twitter는 사용자 트윗을 보내고 받을 수 있는 플랫폼을 제공하는 온라인 소셜 네트워킹 서비스입니다. 등록된 사용자는 트윗을 읽고 게시할 수 있지만, 등록되지 않은 사용자는 트윗을 읽을 수만 있습니다. 해시태그는 관련 키워드 앞에 #을 붙여 트윗을 키워드별로 분류하는 데 사용됩니다. 이제 각 주제에 대해 가장 많이 사용되는 해시태그를 찾기 위해 실시간 시나리오를 살펴보겠습니다.
Spout는 사람들이 제출한 트윗을 최대한 빨리 수신하기 위해 만들어졌습니다. 트위터는 사람들이 제출한 트윗을 실시간으로 검색하기 위한 웹 서비스 기반 도구인 "Twitter Streaming API"를 제공합니다. Twitter 스트리밍 API는 모든 프로그래밍 언어를 사용하여 액세스할 수 있습니다.
twitter4j는 Twitter 스트리밍 API에 쉽게 액세스할 수 있도록 Java 기반 모듈을 제공하는 오픈 소스 및 비공식 Java 라이브러리입니다. twitter4j는 트윗에 액세스할 수 있는 리스너 기반 프레임워크를 제공합니다. Twitter 스트리밍 API에 액세스하려면 Twitter 개발자 계정에 로그인하고 다음 OAuth 인증 세부정보를 얻어야 합니다.
Customerkey- CustomerSecret
- accessToken
- AccessTookenSecret Storm은 스타터 키트에 Twitter Spout인 TwitterSampleSpout를 제공합니다. 우리는 이것을 트윗을 검색하는 데 사용할 것입니다. 이메일에는 OAuth 인증 세부정보와 하나 이상의 키워드가 필요합니다. 이 스파우트는 키워드를 기반으로 실시간 트윗을 내보냅니다. 전체 프로그램 코드는 다음과 같습니다.
인코딩: TwitterSampleSpout.java
import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import twitter4j.FilterQuery; import twitter4j.StallWarning; import twitter4j.Status; import twitter4j.StatusDeletionNotice; import twitter4j.StatusListener; import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; import twitter4j.auth.AccessToken; import twitter4j.conf.ConfigurationBuilder; import backtype.storm.Config; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; @SuppressWarnings("serial") public class TwitterSampleSpout extends BaseRichSpout { SpoutOutputCollector _collector; LinkedBlockingQueue<Status> queue = null; TwitterStream _twitterStream; String consumerKey; String consumerSecret; String accessToken; String accessTokenSecret; String[] keyWords; public TwitterSampleSpout(String consumerKey, String consumerSecret, String accessToken, String accessTokenSecret, String[] keyWords) { this.consumerKey = consumerKey; this.consumerSecret = consumerSecret; this.accessToken = accessToken; this.accessTokenSecret = accessTokenSecret; this.keyWords = keyWords; } public TwitterSampleSpout() { // TODO Auto-generated constructor stub } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { queue = new LinkedBlockingQueue<Status>(1000); _collector = collector; StatusListener listener = new StatusListener() { @Override public void onStatus(Status status) { queue.offer(status); } @Override public void onDeletionNotice(StatusDeletionNotice sdn) {} @Override public void onTrackLimitationNotice(int i) {} @Override public void onScrubGeo(long l, long l1) {} @Override public void onException(Exception ex) {} @Override public void onStallWarning(StallWarning arg0) { // TODO Auto-generated method stub } }; ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setDebugEnabled(true) .setOAuthConsumerKey(consumerKey) .setOAuthConsumerSecret(consumerSecret) .setOAuthAccessToken(accessToken) .setOAuthAccessTokenSecret(accessTokenSecret); _twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); _twitterStream.addListener(listener); if (keyWords.length == 0) { _twitterStream.sample(); }else { FilterQuery query = new FilterQuery().track(keyWords); _twitterStream.filter(query); } } @Override public void nextTuple() { Status ret = queue.poll(); if (ret == null) { Utils.sleep(50); } else { _collector.emit(new Values(ret)); } } @Override public void close() { _twitterStream.shutdown(); } @Override public Map<String, Object> getComponentConfiguration() { Config ret = new Config(); ret.setMaxTaskParallelism(1); return ret; } @Override public void ack(Object id) {} @Override public void fail(Object id) {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tweet")); } }
Hashtag Reader spout
Spout에서 내보낸 트윗은
HashtagReaderBolt로 전달되어 트윗을 처리하고 사용 가능한 모든 해시태그를 내보냅니다. HashtagReaderBolt는 twitter4j에서 제공하는 getHashTagEntities 메소드를 사용합니다. getHashTagEntities는 트윗을 읽고 해시태그 목록을 반환합니다. 전체 프로그램 코드는 다음과 같습니다. -
Encoding: HashtagReaderBolt.java
import java.util.HashMap; import java.util.Map; import twitter4j.*; import twitter4j.conf.*; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class HashtagReaderBolt implements IRichBolt { private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { Status tweet = (Status) tuple.getValueByField("tweet"); for(HashtagEntity hashtage : tweet.getHashtagEntities()) { System.out.println("Hashtag: " + hashtage.getText()); this.collector.emit(new Values(hashtage.getText())); } } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("hashtag")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Hashtag counter spout
내보낸 해시태그는 HashtagCounterBolt로 전달됩니다. 이 볼트는 모든 해시태그를 처리하고 Java Map 개체를 사용하여 각 해시태그와 해당 개수를 메모리에 저장합니다. 전체 프로그램 코드는 다음과 같습니다.
Coding: HashtagCounterBolt.java
import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class HashtagCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String key = tuple.getString(0); if(!counterMap.containsKey(key)){ counterMap.put(key, 1); }else{ Integer c = counterMap.get(key) + 1; counterMap.put(key, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println("Result: " + entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("hashtag")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Submit Topology
Submit Topology가 주요 애플리케이션입니다. Twitter 토폴로지는 TwitterSampleSpout, HashtagReaderBolt 및 HashtagCounterBolt로 구성됩니다. 다음 프로그램 코드는 토폴로지를 제출하는 방법을 보여줍니다.
코딩: TwitterHashtagStorm.java
import java.util.*; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; public class TwitterHashtagStorm { public static void main(String[] args) throws Exception{ String consumerKey = args[0]; String consumerSecret = args[1]; String accessToken = args[2]; String accessTokenSecret = args[3]; String[] arguments = args.clone(); String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length); Config config = new Config(); config.setDebug(true); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey, consumerSecret, accessToken, accessTokenSecret, keyWords)); builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt()) .shuffleGrouping("twitter-spout"); builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt()) .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TwitterHashtagStorm", config, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } }
애플리케이션 빌드 및 실행
전체 애플리케이션에는 4개의 Java 코드가 있습니다.
- HashtagReaderBolt.java
- HashtagCounterBolt.java
- TwitterHashtagStorm.java 다음 명령을 사용하여 애플리케이션을 컴파일할 수 있습니다. ee
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.javaOutput