Apache Storm 작업 예
Apache Storm의 핵심 기술 세부 사항을 살펴보았습니다. 이제 몇 가지 간단한 시나리오를 작성할 차례입니다.
시나리오 - 모바일 통화 로그 분석기
모바일 통화 및 통화 시간은 Apache Storm에 대한 입력으로 사용되며 Storm은 총 통화 수와 함께 동일한 발신자와 수신자 간의 통화를 처리하고 그룹화합니다.
Spout Creation
Spout은 데이터 생성에 사용되는 컴포넌트입니다. 기본적으로 Spout는 IRichSpout 인터페이스를 구현합니다. "IRichSpout" 인터페이스에는 Spout의 실행 환경을 제공하기 위해 다음과 같은 중요한 메소드(
open)가 있습니다. 실행자는 이 메서드를 실행하여 스프링클러 헤드를 초기화합니다.
nextTuple - 생성된 데이터를 수집기를 통해 내보냅니다.
close - 스파우트가 닫히려고 할 때 이 메소드가 호출됩니다.
declareOutputFields - 튜플의 출력 모드를 선언합니다.
ack - 특정 튜플이 처리되었는지 확인합니다.
fail - 특정 튜플을 처리하지 않고 재처리하지 않도록 지정합니다.
open
open 메소드의 서명은 다음과 같습니다 -
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
conf - 이 스파우트에 대한 storm 구성을 제공합니다.
context - 토폴로지에서 Spout의 위치, 작업 ID, 입력 및 출력 정보에 대한 완전한 정보를 제공합니다.
collector - 볼트로 처리될 튜플을 내보낼 수 있습니다.
nextTuple
nextTuple 메소드의 서명은 다음과 같습니다. -
nextTuple()
nextTuple()은 ack() 및 failure() 메소드와 동일한 루프에서 주기적으로 호출됩니다. 다른 메서드가 호출될 수 있도록 수행할 작업이 없으면 스레드 제어를 해제해야 합니다. 따라서 nextTuple의 첫 번째 줄은 처리가 완료되었는지 확인합니다. 그렇다면 반환하기 전에 프로세서의 로드를 줄이기 위해 최소 1밀리초 동안 절전 모드로 전환해야 합니다.
close
close 메소드의 시그니처는 다음과 같습니다 -
close()
declareOutputFields
declareOutputFields 메소드의 시그니처는 다음과 같습니다 -
declareOutputFields(OutputFieldsDeclarer declarer)
declarer - 출력을 선언하는 데 사용됩니다. 스트림 ID, 출력 필드 등
이 방법은 튜플의 출력 모드를 지정하는 데 사용됩니다.
ack
ack 메소드의 서명은 다음과 같습니다. -
ack(Object msgId)
이 메소드는 특정 튜플이 처리되었음을 확인합니다.
fail
nextTuple 메소드의 시그니처는 다음과 같습니다. -
ack(Object msgId)
이 메소드는 특정 튜플이 완전히 처리되지 않았음을 알립니다. Storm은 특정 튜플을 다시 처리합니다.
FakeCallLogReaderSpout
우리 시나리오에서는 통화 기록 세부 정보를 수집해야 합니다. 통화 기록 정보가 포함되어 있습니다.
- 발신번호
- 수신번호
- 지속시간
통화기록에 대한 실시간 정보가 없기 때문에 가짜 통화기록을 생성해 드립니다. Random 클래스를 사용하여 가짜 정보가 생성됩니다. 전체 프로그램 코드는 다음과 같습니다.
Coding - FakeCallLogReaderSpout.java
import java.util.*; //import storm tuple packages import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import Spout interface packages import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; //Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalities public class FakeCallLogReaderSpout implements IRichSpout { //Create instance for SpoutOutputCollector which passes tuples to bolt. private SpoutOutputCollector collector; private boolean completed = false; //Create instance for TopologyContext which contains topology data. private TopologyContext context; //Create instance for Random class. private Random randomGenerator = new Random(); private Integer idx = 0; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.context = context; this.collector = collector; } @Override public void nextTuple() { if(this.idx <= 1000) { List<String> mobileNumbers = new ArrayList<String>(); mobileNumbers.add("1234123401"); mobileNumbers.add("1234123402"); mobileNumbers.add("1234123403"); mobileNumbers.add("1234123404"); Integer localIdx = 0; while(localIdx++ < 100 && this.idx++ < 1000) { String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); while(fromMobileNumber == toMobileNumber) { toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); } Integer duration = randomGenerator.nextInt(60); this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration)); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("from", "to", "duration")); } //Override all the interface methods @Override public void close() {} public boolean isDistributed() { return false; } @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Bolt 생성
Bolt는 튜플을 입력으로 사용하고, 튜플을 처리하고, 새 튜플을 출력으로 생성하는 구성 요소입니다. Bolts는 IRichBolt인터페이스를 구현합니다. 이 프로그램에서는 두 개의 Bolts
클래스인 CallLogCreatorBolt 및 CallLogCounterBolt을 사용하여 작업을 수행합니다.
IRichBolt 인터페이스에는 다음과 같은 메소드가 있습니다. -
prepare - Bolt를 실행할 환경을 제공합니다. 실행자는 이 메서드를 실행하여 Spout를 초기화합니다.
execute - 단일 튜플의 입력을 처리합니다.
cleanup - Spout가 닫힐 때 호출됩니다.
declareOutputFields - 튜플의 출력 모드를 선언합니다.
Prepare
prepare 메소드의 서명은 다음과 같습니다 -prepare(Map conf, TopologyContext context, OutputCollector collector)
conf - 이 볼트에 대한 Storm 구성을 제공합니다.
context - 토폴로지의 볼트 위치, 작업 ID, 입력 및 출력 정보 등에 대한 완전한 정보를 제공합니다.
collector - 처리된 튜플을 내보낼 수 있습니다.
execute
execute 메소드의 서명은 다음과 같습니다.
execute(Tuple tuple)
여기서 tuple은 처리할 입력 튜플입니다.
execute 메서드는 한 번에 하나의 튜플을 처리합니다. Tuple 데이터는 Tuple 클래스의 getValue 메소드를 통해 액세스할 수 있습니다. 입력 튜플을 즉시 처리할 필요는 없습니다. 튜플은 단일 출력 튜플로 처리 및 출력될 수 있습니다. 처리된 튜플은 OutputCollector 클래스를 사용하여 내보낼 수 있습니다.
cleanup
cleanup 메소드의 서명은 다음과 같습니다. -
cleanup()
declareOutputFields
declareOutputFields메소드의 서명은 다음과 같습니다. -
declareOutputFields(OutputFieldsDeclarer declarer)
여기 매개변수 declarer는 다음과 같은 용도로 사용됩니다. 출력 스트림 ID, 출력 필드 등을 선언합니다.
이 방법은 튜플의 출력 모드를 지정하는 데 사용됩니다.
통화 기록 생성기 볼트
통화 기록 생성기 볼트는 통화 기록 튜플을 받습니다. 통화 로그 튜플에는 발신자 번호, 수신자 번호 및 통화 시간이 포함됩니다. 이 볼트는 단순히 발신자 번호와 수신자 번호를 결합하여 새로운 값을 생성합니다. 새 값의 형식은 "발신 번호 - 수신자 번호"이며 새 필드의 이름은 "Call"입니다. 전체 코드는 다음과 같습니다.
Encoding - CallLogCreatorBolt.java
//import util packages import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; //import Storm IRichBolt package import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; //Create a class CallLogCreatorBolt which implement IRichBolt interface public class CallLogCreatorBolt implements IRichBolt { //Create instance for OutputCollector which collects and emits tuples to produce output private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String from = tuple.getString(0); String to = tuple.getString(1); Integer duration = tuple.getInteger(2); collector.emit(new Values(from + " - " + to, duration)); } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call", "duration")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Call Log Counter Bolt
Call Log Creator bolt는 호출 로그 튜플을 받습니다. 통화 로그 튜플에는 발신자 번호, 수신자 번호 및 통화 시간이 포함됩니다. 이 볼트는 단순히 발신자 번호와 수신자 번호를 결합하여 새로운 값을 생성합니다. 새 값의 형식은 "발신 번호 - 수신자 번호"이며 새 필드의 이름은 "Call"입니다. 전체 코드는 다음과 같습니다.
Coding - CallLogCounterBolt.java
import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class CallLogCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String call = tuple.getString(0); Integer duration = tuple.getInteger(1); if(!counterMap.containsKey(call)){ counterMap.put(call, 1); }else{ Integer c = counterMap.get(call) + 1; counterMap.put(call, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
토폴로지 만들기
Storm 토폴로지는 기본적으로 Thrift 구조입니다. TopologyBuilder 클래스는 복잡한 토폴로지를 만드는 간단하고 쉬운 방법을 제공합니다. TopologyBuilder 클래스에는 spout (setSpout) 및 bolt (setBolt) 을 설정하는 메서드가 있습니다. 마지막으로 TopologyBuilder에는 토폴로지를 생성하기 위한 createTopology가 있습니다. 다음 코드 조각을 사용하여 토폴로지를 만듭니다.
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
shuffleGrouping 및 fieldsGrouping 메서드는 Spout 및 Bolt에 대한 흐름 그룹화를 설정하는 데 도움이 됩니다.
Local Cluster
개발 목적으로 "LocalCluster" 개체를 사용하여 로컬 클러스터를 만든 다음 "LocalCluster" 클래스의 "submitTopology" 메서드를 사용하여 토폴로지를 제출할 수 있습니다. "submitTopology"의 매개변수 중 하나는 "Config" 클래스의 인스턴스입니다. "Config" 클래스는 토폴로지를 제출하기 전에 구성 옵션을 설정하는 데 사용됩니다. 이 구성 옵션은 런타임 시 클러스터 구성과 병합되고 prepare 메서드를 사용하여 모든 작업(Spout 및 Bolt)으로 전송됩니다. 토폴로지가 클러스터에 제출되면 클러스터가 제출된 토폴로지를 계산할 때까지 10초 동안 기다린 다음 "LocalCluster"의 "shutdown" 방법을 사용하여 클러스터를 종료합니다. 전체 프로그램 코드는 다음과 같습니다. -
Coding - LogAnalyserStorm.java
import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import storm configuration packages import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; //Create main class LogAnalyserStorm submit topology. public class LogAnalyserStorm { public static void main(String[] args) throws Exception{ //Create Config instance for cluster configuration Config config = new Config(); config.setDebug(true); // TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology()); Thread.sleep(10000); //Stop the topology cluster.shutdown(); } }
애플리케이션 빌드 및 실행
완전한 애플리케이션에는 4개의 Java 코드가 있습니다.
- FakeCallLogReaderSpout.java
- CallLogCreaterBolt.java
- CallLogCounterBolt.java
- LogAnalyerStorm.java
애플리케이션은 다음 명령을 사용하여 빌드할 수 있습니다.
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
실행 명령 -
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm
Output
애플리케이션이 시작되면 클러스터 시작 프로세스, Spout 및 볼트 처리, 마지막으로 클러스터 종료 프로세스에 대한 전체 세부 정보가 출력됩니다. "CallLogCounterBolt"에서는 통화 및 해당 횟수 세부 정보를 인쇄합니다. 이 정보는 다음과 같이 콘솔에 표시됩니다.
1234123402 - 1234123401 : 78 1234123402 - 1234123404 : 88 1234123402 - 1234123403 : 105 1234123401 - 1234123404 : 74 1234123401 - 1234123403 : 81 1234123401 - 1234123402 : 81 1234123403 - 1234123404 : 86 1234123404 - 1234123401 : 63 1234123404 - 1234123402 : 82 1234123403 - 1234123402 : 83 1234123404 - 1234123403 : 86 1234123403 - 1234123401 : 93
JVM이 아닌 언어
Storm 토폴로지는 Thrift 인터페이스를 통해 구현되므로 어떤 언어로든 토폴로지를 쉽게 제출할 수 있습니다. Storm은 Ruby, Python 및 기타 여러 언어를 지원합니다. Python 바인딩을 살펴보겠습니다.
Python 바인딩
Python은 범용 해석, 대화형, 객체 지향 및 고급 프로그래밍 언어입니다. Storm은 Python을 지원하여 토폴로지를 구현합니다. Python은 실행, 앵커링, 승인 및 로깅 작업을 지원합니다.
아시다시피 Bolt는 모든 언어로 정의될 수 있습니다. 다른 언어로 작성된 Bolt는 하위 프로세스로 실행되고 Storm은 stdin/stdout을 통해 JSON 메시지와 통신합니다. 먼저 Python 바인딩을 지원하는 샘플 볼트 WordCount를 가져옵니다.
public static class WordCount implements IRichBolt { public WordSplit() { super("python", "splitword.py"); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
여기의 클래스WordCount는 IRichBolt인터페이스를 구현하고 슈퍼 메소드 매개변수 "splitword.py"를 지정하는 Python 구현으로 실행됩니다. 이제 "splitword.py"라는 Python 구현을 만듭니다.
import storm class WordCountBolt(storm.BasicBolt): def process(self, tup): words = tup.values[0].split(" ") for word in words: storm.emit([word]) WordCountBolt().run()
이것은 주어진 문장에서 단어 수를 세는 Python 구현의 예입니다. 마찬가지로 지원되는 다른 언어에도 바인딩할 수 있습니다.