Apache Storm 작업 예


Apache Storm의 핵심 기술 세부 사항을 살펴보았습니다. 이제 몇 가지 간단한 시나리오를 작성할 차례입니다.

시나리오 - 모바일 통화 로그 분석기

모바일 통화 및 통화 시간은 Apache Storm에 대한 입력으로 사용되며 Storm은 총 통화 수와 함께 동일한 발신자와 수신자 간의 통화를 처리하고 그룹화합니다.

Spout Creation

Spout은 데이터 생성에 사용되는 컴포넌트입니다. 기본적으로 Spout는 IRichSpout 인터페이스를 구현합니다. "IRichSpout" 인터페이스에는 Spout의 실행 환경을 제공하기 위해 다음과 같은 중요한 메소드(

  • open)가 있습니다. 실행자는 이 메서드를 실행하여 스프링클러 헤드를 초기화합니다.

  • nextTuple - 생성된 데이터를 수집기를 통해 내보냅니다.

  • close - 스파우트가 닫히려고 할 때 이 메소드가 호출됩니다.

  • declareOutputFields - 튜플의 출력 모드를 선언합니다.

  • ack - 특정 튜플이 처리되었는지 확인합니다.

  • fail - 특정 튜플을 처리하지 않고 재처리하지 않도록 지정합니다.

open

open 메소드의 서명은 다음과 같습니다 -

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - 이 스파우트에 대한 storm 구성을 제공합니다.

  • context - 토폴로지에서 Spout의 위치, 작업 ID, 입력 및 출력 정보에 대한 완전한 정보를 제공합니다.

  • collector - 볼트로 처리될 튜플을 내보낼 수 있습니다.

nextTuple

nextTuple 메소드의 서명은 다음과 같습니다. -

nextTuple()

nextTuple()은 ack() 및 failure() 메소드와 동일한 루프에서 주기적으로 호출됩니다. 다른 메서드가 호출될 수 있도록 수행할 작업이 없으면 스레드 제어를 해제해야 합니다. 따라서 nextTuple의 첫 번째 줄은 처리가 완료되었는지 확인합니다. 그렇다면 반환하기 전에 프로세서의 로드를 줄이기 위해 최소 1밀리초 동안 절전 모드로 전환해야 합니다.

close

close 메소드의 시그니처는 다음과 같습니다 -

close()

declareOutputFields

declareOutputFields 메소드의 시그니처는 다음과 같습니다 -

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - 출력을 선언하는 데 사용됩니다. 스트림 ID, 출력 필드 등

이 방법은 튜플의 출력 모드를 지정하는 데 사용됩니다.

ack

ack 메소드의 서명은 다음과 같습니다. -

ack(Object msgId)

이 메소드는 특정 튜플이 처리되었음을 확인합니다.

fail

nextTuple 메소드의 시그니처는 다음과 같습니다. -

ack(Object msgId)

이 메소드는 특정 튜플이 완전히 처리되지 않았음을 알립니다. Storm은 특정 튜플을 다시 처리합니다.

FakeCallLogReaderSpout

우리 시나리오에서는 통화 기록 세부 정보를 수집해야 합니다. 통화 기록 정보가 포함되어 있습니다.

  • 발신번호
  • 수신번호
  • 지속시간

통화기록에 대한 실시간 정보가 없기 때문에 가짜 통화기록을 생성해 드립니다. Random 클래스를 사용하여 가짜 정보가 생성됩니다. 전체 프로그램 코드는 다음과 같습니다.

Coding - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Bolt 생성

Bolt는 튜플을 입력으로 사용하고, 튜플을 처리하고, 새 튜플을 출력으로 생성하는 구성 요소입니다. Bolts는 IRichBolt인터페이스를 구현합니다. 이 프로그램에서는 두 개의 Bolts
클래스인 CallLogCreatorBoltCallLogCounterBolt을 사용하여 작업을 수행합니다.

IRichBolt 인터페이스에는 다음과 같은 메소드가 있습니다. -

  • prepare - Bolt를 실행할 환경을 제공합니다. 실행자는 이 메서드를 실행하여 Spout를 초기화합니다.

  • execute - 단일 튜플의 입력을 처리합니다.

  • cleanup - Spout가 닫힐 때 호출됩니다.

  • declareOutputFields - 튜플의 출력 모드를 선언합니다.

Prepare

prepare 메소드의 서명은 다음과 같습니다 -


prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - 이 볼트에 대한 Storm 구성을 제공합니다.

  • context - 토폴로지의 볼트 위치, 작업 ID, 입력 및 출력 정보 등에 대한 완전한 정보를 제공합니다.

  • collector - 처리된 튜플을 내보낼 수 있습니다.

execute

execute 메소드의 서명은 다음과 같습니다.

execute(Tuple tuple)

여기서 tuple은 처리할 입력 튜플입니다.

execute 메서드는 한 번에 하나의 튜플을 처리합니다. Tuple 데이터는 Tuple 클래스의 getValue 메소드를 통해 액세스할 수 있습니다. 입력 튜플을 즉시 처리할 필요는 없습니다. 튜플은 단일 출력 튜플로 처리 및 출력될 수 있습니다. 처리된 튜플은 OutputCollector 클래스를 사용하여 내보낼 수 있습니다.

cleanup

cleanup 메소드의 서명은 다음과 같습니다. -

cleanup()

declareOutputFields

declareOutputFields메소드의 서명은 다음과 같습니다. -

declareOutputFields(OutputFieldsDeclarer declarer)

여기 매개변수 declarer는 다음과 같은 용도로 사용됩니다. 출력 스트림 ID, 출력 필드 등을 선언합니다.

이 방법은 튜플의 출력 모드를 지정하는 데 사용됩니다.

통화 기록 생성기 볼트

통화 기록 생성기 볼트는 통화 기록 튜플을 받습니다. 통화 로그 튜플에는 발신자 번호, 수신자 번호 및 통화 시간이 포함됩니다. 이 볼트는 단순히 발신자 번호와 수신자 번호를 결합하여 새로운 값을 생성합니다. 새 값의 형식은 "발신 번호 - 수신자 번호"이며 새 필드의 이름은 "Call"입니다. 전체 코드는 다음과 같습니다.

Encoding - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Call Log Counter Bolt

Call Log Creator bolt는 호출 로그 튜플을 받습니다. 통화 로그 튜플에는 발신자 번호, 수신자 번호 및 통화 시간이 포함됩니다. 이 볼트는 단순히 발신자 번호와 수신자 번호를 결합하여 새로운 값을 생성합니다. 새 값의 형식은 "발신 번호 - 수신자 번호"이며 새 필드의 이름은 "Call"입니다. 전체 코드는 다음과 같습니다.

Coding - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

토폴로지 만들기

Storm 토폴로지는 기본적으로 Thrift 구조입니다. TopologyBuilder 클래스는 복잡한 토폴로지를 만드는 간단하고 쉬운 방법을 제공합니다. TopologyBuilder 클래스에는 spout (setSpout) 및 bolt (setBolt) 을 설정하는 메서드가 있습니다. 마지막으로 TopologyBuilder에는 토폴로지를 생성하기 위한 createTopology가 있습니다. 다음 코드 조각을 사용하여 토폴로지를 만듭니다.

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGroupingfieldsGrouping 메서드는 Spout 및 Bolt에 대한 흐름 그룹화를 설정하는 데 도움이 됩니다.

Local Cluster

개발 목적으로 "LocalCluster" 개체를 사용하여 로컬 클러스터를 만든 다음 "LocalCluster" 클래스의 "submitTopology" 메서드를 사용하여 토폴로지를 제출할 수 있습니다. "submitTopology"의 매개변수 중 하나는 "Config" 클래스의 인스턴스입니다. "Config" 클래스는 토폴로지를 제출하기 전에 구성 옵션을 설정하는 데 사용됩니다. 이 구성 옵션은 런타임 시 클러스터 구성과 병합되고 prepare 메서드를 사용하여 모든 작업(Spout 및 Bolt)으로 전송됩니다. 토폴로지가 클러스터에 제출되면 클러스터가 제출된 토폴로지를 계산할 때까지 10초 동안 기다린 다음 "LocalCluster"의 "shutdown" 방법을 사용하여 클러스터를 종료합니다. 전체 프로그램 코드는 다음과 같습니다. -

Coding - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

애플리케이션 빌드 및 실행

완전한 애플리케이션에는 4개의 Java 코드가 있습니다.

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

애플리케이션은 다음 명령을 사용하여 빌드할 수 있습니다.

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

실행 명령 -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Output

애플리케이션이 시작되면 클러스터 시작 프로세스, Spout 및 볼트 처리, 마지막으로 클러스터 종료 프로세스에 대한 전체 세부 정보가 출력됩니다. "CallLogCounterBolt"에서는 통화 및 해당 횟수 세부 정보를 인쇄합니다. 이 정보는 다음과 같이 콘솔에 표시됩니다.

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

JVM이 아닌 언어 ​​

Storm 토폴로지는 Thrift 인터페이스를 통해 구현되므로 어떤 언어로든 토폴로지를 쉽게 제출할 수 있습니다. Storm은 Ruby, Python 및 기타 여러 언어를 지원합니다. Python 바인딩을 살펴보겠습니다.

Python 바인딩

Python은 범용 해석, 대화형, 객체 지향 및 고급 프로그래밍 언어입니다. Storm은 Python을 지원하여 토폴로지를 구현합니다. Python은 실행, 앵커링, 승인 및 로깅 작업을 지원합니다.

아시다시피 Bolt는 모든 언어로 정의될 수 있습니다. 다른 언어로 작성된 Bolt는 하위 프로세스로 실행되고 Storm은 stdin/stdout을 통해 JSON 메시지와 통신합니다. 먼저 Python 바인딩을 지원하는 샘플 볼트 WordCount를 가져옵니다.

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

여기의 클래스WordCountIRichBolt인터페이스를 구현하고 슈퍼 메소드 매개변수 "splitword.py"를 지정하는 Python 구현으로 실행됩니다. 이제 "splitword.py"라는 Python 구현을 만듭니다.

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

이것은 주어진 문장에서 단어 수를 세는 Python 구현의 예입니다. 마찬가지로 지원되는 다른 언어에도 바인딩할 수 있습니다.