아파치 스톰 트라이던트


Trident는 Storm의 확장입니다. Storm과 마찬가지로 Trident도 Twitter에서 개발되었습니다. Trident를 개발하는 주된 이유는 Storm을 기반으로 높은 수준의 추상화는 물론 상태 저장 스트리밍 처리 및 지연 시간이 짧은 분산 쿼리를 제공하는 것입니다.

Trident는 스파우트와 볼트를 사용하지만 이러한 하위 수준 구성 요소는 실행 전에 Trident에 의해 자동으로 생성됩니다. Trident에는 기능, 필터, 조인, 그룹화 및 집계가 있습니다.

Trident 프로세스는 트랜잭션이라는 일련의 배치로 스트리밍됩니다. 일반적으로 이러한 미니 배치의 크기는 입력 스트림에 따라 수천 또는 수백만 개의 튜플 정도입니다. 이러한 방식으로 Trident는 튜플별 처리를 수행한다는 점에서 Storm과 다릅니다.

일괄 처리 개념은 데이터베이스 트랜잭션과 매우 유사합니다. 각 거래에는 거래 ID가 할당됩니다. 모든 처리가 완료되면 거래가 성공한 것으로 간주됩니다. 그러나 트랜잭션의 튜플 중 하나를 처리하지 못하면 전체 트랜잭션이 재전송됩니다. 각 배치에 대해 Trident는 트랜잭션 시작 부분에서 BeginCommit을 호출하고 끝 부분에서 커밋을 호출합니다.

Trident Topology

Trident API는 "TridentTopology" 클래스를 사용하여 Trident 토폴로지를 생성하는 간단한 옵션을 제공합니다. 기본적으로 Trident 토폴로지는 유출로부터 입력 스트림을 수신하고 스트림에 대해 정렬된 작업 순서(필터링, 집계, 그룹화 등)를 수행합니다. Storm 튜플은 Trident 튜플로 대체되고 볼트는 작업으로 대체됩니다. 간단한 Trident 토폴로지는 다음과 같이 만들 수 있습니다.

TridentTopology topology = new TridentTopology();

Trident Tuples

Trident Tuples는 명명된 값 목록입니다. TridentTuple 인터페이스는 Trident 토폴로지를 위한 데이터 모델입니다. TridentTuple 인터페이스는 Trident 토폴로지에서 처리할 수 있는 데이터의 기본 단위입니다.

Trident Spout

Trident 스파우트는 Storm spout와 유사하며 Trident의 기능을 사용하는 추가 옵션이 있습니다. 실제로 IRichSpout을 계속 사용할 수 있고 Storm 토폴로지에서 사용할 수 있지만 본질적으로 트랜잭션이 아니며 Trident가 제공하는 이점을 사용할 수 없습니다.

트라이던트의 기능을 모두 활용한 기본 스파우트는 "ITridentSpout" 입니다. 트랜잭션과 불투명한 트랜잭션 의미 체계를 지원합니다. 다른 스파우트는 IBatchSpout, IPartitionedTridentSpout 및 IOpaquePartitionedTridentSpout입니다.

이러한 일반 스파우트 외에도 Trident에는 트라이던트 스파우트의 샘플 구현이 많이 있습니다. 그 중 하나는 FeederBatchSpout 출력으로, 일괄 처리, 병렬 처리 등에 대해 걱정할 필요 없이 삼지창 튜플의 명명된 목록을 보내는 데 사용할 수 있습니다.

FeederBatchSpout 생성 및 데이터 공급은 다음과 같이 수행할 수 있습니다. -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident 작업

Trident는 "Trident 작업"을 사용하여 트라이던트 튜플의 입력 스트림을 처리합니다. Trident API에는 단순부터 복잡한 스트림 처리를 처리하기 위한 여러 가지 기본 제공 작업이 있습니다. 이러한 작업은 간단한 유효성 검사부터 복잡한 그룹화 및 삼지창 튜플 집계까지 다양합니다. 가장 중요하고 자주 사용되는 작업을 살펴보겠습니다.

Filtering

필터는 입력 유효성 검사 작업을 수행하는 데 사용되는 개체입니다. Trident 필터는 Trident 튜플 필드의 하위 집합을 입력으로 사용하고 특정 조건이 충족되는지 여부에 따라 true 또는 false를 반환합니다. true가 반환되면 튜플은 출력 스트림에 저장되고, 그렇지 않으면 튜플이 스트림에서 제거됩니다. 필터는 기본적으로 BaseFilter 클래스를 상속하고 isKeep 메소드를 구현합니다. 다음은 필터 작업 구현의 예입니다.

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

필터 함수는 "each" 메서드를 사용하여 토폴로지에서 호출할 수 있습니다. "Fields" 클래스는 입력(삼지창 튜플의 하위 집합)을 지정하는 데 사용할 수 있습니다. 샘플 코드는 다음과 같습니다.

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Function

Function은 단일 삼지창 튜플에 대해 간단한 연산을 수행하는 데 사용되는 객체입니다. Trident 튜플 필드의 하위 집합을 취하고 0개 이상의 새로운 Trident 튜플 필드를 내보냅니다.

Function은 기본적으로 BaseFunction 클래스를 상속받아 execute 메소드를 구현합니다. 구현 예는 다음과 같습니다.

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

필터 작업과 유사하게 각 메서드를 사용하여 토폴로지에서 함수 작업을 호출할 수 있습니다. 샘플 코드는 다음과 같습니다. -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Aggregation

Aggregation은 입력 배치나 파티션 또는 스트림에 대해 집계 작업을 수행하는 데 사용되는 개체입니다. Trident에는 세 가지 유형의 집계가 있습니다.

  • aggregate - 삼지창 튜플의 각 배치를 개별적으로 집계합니다. 집계 프로세스 중에 튜플은 먼저 전역 그룹화를 사용하여 다시 분할되어 동일한 배치의 모든 파티션을 단일 파티션으로 결합합니다.

  • partitionAggregate - 전체 트라이던트 튜플 대신 각 파티션을 집계합니다. 분할된 컬렉션의 출력은 입력 튜플을 완전히 대체합니다. 분할된 컬렉션의 출력에는 단일 필드 튜플이 포함됩니다.

  • persistaggregate - 모든 배치의 모든 삼지창 튜플을 집계하고 결과를 메모리나 데이터베이스에 저장합니다.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

집계 작업은 CombinerAggregator, ReducerAggregator 또는 일반 Aggregator 인터페이스를 사용하여 생성할 수 있습니다. 위의 예에서 사용된 "count" aggregator는 내장된 aggregator 중 하나이며 "CombinerAggregator"를 사용하여 구현되며 다음과 같이 구현됩니다. -

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Group

그룹화 작업은 다음과 같은 작업을 수행할 수 있는 내장 작업입니다. groupBy 메소드에 의해 호출됩니다. groupBy 메서드는 지정된 필드에서 partitionBy를 수행하여 스트림을 다시 분할한 다음 각 파티션 내에서 그룹 필드가 동일한 튜플을 결합합니다. 일반적으로 그룹화된 집계를 얻으려면 "groupBy" 및 "perciousAggregate"를 사용합니다. 샘플 코드는 다음과 같습니다 -

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Merge and Join

Merge와 Join은 각각 "Merge" 메소드와 "Join" 메소드를 사용하여 수행할 수 있습니다. 병합은 하나 이상의 스트림을 결합합니다. 조인은 두 스트림을 검사하고 조인하기 위해 양쪽의 삼지창 튜플 필드를 사용한다는 점을 제외하면 병합과 유사합니다. 또한 조인은 일괄 처리 수준에서만 작동합니다. 샘플 코드는 다음과 같습니다 -

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

상태 유지

Trident는 상태 유지를 위한 메커니즘을 제공합니다. 상태 정보는 토폴로지 자체에 저장되거나 별도의 데이터베이스에 저장될 수 있습니다. 이유는 상태를 유지하기 위함이며, 처리 중 튜플이 실패하면 실패한 튜플을 다시 시도합니다. 이 튜플의 상태가 이전에 업데이트되었는지 확실하지 않기 때문에 상태를 업데이트할 때 문제가 발생합니다. 상태를 업데이트하기 전에 튜플이 실패한 경우 튜플을 다시 시도하면 상태가 안정화됩니다. 그러나 상태를 업데이트한 후 튜플이 실패하는 경우 동일한 튜플을 다시 시도하면 데이터베이스의 개수가 다시 증가하여 상태가 불안정해집니다. 메시지가 한 번만 처리되도록 하려면 다음 단계가 필요합니다.

  • 튜플을 작은 배치로 처리합니다.

  • 각 배치에 고유 ID를 할당합니다. 일괄 처리를 다시 시도하면 동일한 고유 ID가 부여됩니다.

  • 상태 업데이트는 일괄적으로 정렬됩니다. 예를 들어, 상태 업데이트의 첫 번째 일괄 처리가 완료될 때까지 상태 업데이트의 두 번째 일괄 처리는 불가능합니다.

분산 RPC

분산 RPC는 Trident 토폴로지 결과를 쿼리하고 검색하는 데 사용됩니다. Storm에는 분산 RPC 서버가 내장되어 있습니다. 분산 RPC 서버는 클라이언트로부터 RPC 요청을 수신하고 이를 토폴로지로 전달합니다. 토폴로지는 요청을 처리하고 그 결과를 분산 RPC 서버로 보내고, 그 결과를 클라이언트로 리디렉션합니다. Trident의 분산 RPC 쿼리는 이러한 쿼리가 병렬로 실행된다는 점을 제외하면 일반 RPC 쿼리처럼 실행됩니다.

트라이던트는 언제 사용하나요?

많은 사용 사례에서 쿼리를 한 번만 처리해야 하는 경우 Trident에서 토폴로지를 작성하여 이를 달성할 수 있습니다. 반면 Storm의 경우 정확한 원샷 처리가 어렵습니다. 따라서 Trident는 한 번에 처리해야 하는 사용 사례에 유용합니다. Trident는 Storm에 복잡성을 추가하고 상태를 관리하므로 모든 사용 사례, 특히 고성능 사용 사례에 적합하지 않습니다.

Trident의 작업 예

이전 섹션에서 개발한 통화 기록 분석 애플리케이션을 Trident 프레임워크로 변환하겠습니다. 높은 수준의 API 덕분에 Trident 애플리케이션은 일반 Storm보다 더 쉽습니다. Storm은 기본적으로 Trident에서 Function, Filter, Aggregate, GroupBy, Join 및 Merge 작업 중 하나를 수행해야 합니다. 마지막으로 LocalDRPC 클래스를 사용하여 DRPC 서버를 시작하고 LocalDRPC 클래스의 실행 메서드를 사용하여 일부 키워드를 검색합니다.

통화 정보 형식 지정

FormatCall 클래스의 목적은 "발신자 번호" 및 "수신자 번호"를 포함한 통화 정보 형식을 지정하는 것입니다. 전체 프로그램 코드는 다음과 같습니다. -

Encoding: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

CSVSplit 클래스의 목적은 "쉼표(,)"를 기준으로 입력 문자열을 분할하고 문자열의 각 단어를 내보내는 것입니다. 이 함수는 분산 쿼리의 입력 매개변수를 구문 분석하는 데 사용됩니다. 전체 코드는 다음과 같습니다 -

코딩: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

로그 분석기

이것이 주요 애플리케이션입니다. 처음에 애플리케이션은 FeederBatchSpout을 사용하여 TridentTopology를 초기화하고 발신자 정보를 제공합니다. Trident 토폴로지 스트림은 TridentTopology 클래스의 newStream 메서드를 사용하여 생성할 수 있습니다. 마찬가지로 TridentTopology 클래스의 newDRCPStream 메서드를 사용하여 Trident 토폴로지 DRPC 스트림을 생성할 수 있습니다. LocalDRPC 클래스 를 사용하여 간단한 DRCP 서버를 만들 수 있습니다. LocalDRPC에는 일부 키워드를 검색하는 실행 메소드가 있습니다. 전체 코드는 다음과 같습니다.

코딩: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

애플리케이션 빌드 및 실행

전체 애플리케이션에는 세 개의 Java 코드가 있습니다.

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

다음 명령을 사용하여 응용 프로그램을 빌드할 수 있습니다. -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

응용 프로그램은 다음 명령을 사용하여 실행할 수 있습니다.

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Output

애플리케이션이 시작되면 애플리케이션은 클러스터 시작 프로세스, 작업 처리, DRPC 서버 및 클라이언트 정보, 마지막으로 클러스터 종료 프로세스에 대한 전체 세부 정보를 출력합니다. 이 출력은 아래와 같이 콘솔에 표시됩니다.

rreee