アパッチ ストーム トライデント


Trident は Storm の拡張機能です。 Storm と同様に、Trident も Twitter によって開発されています。 Trident を開発する主な理由は、Storm 上で高レベルの抽象化、ステートフル ストリーミング処理、低遅延の分散クエリを提供することです。

Trident はスパウトとボルトを使用しますが、これらの下位コンポーネントは実行前に Trident によって自動的に生成されます。 Trident には、関数、フィルター、結合、グループ化、集計があります。

Trident プロセスは、トランザクションと呼ばれる一連のバッチにストリームされます。通常、これらのミニバッチのサイズは、入力ストリームに応じて、数千または数百万のタプルのオーダーになります。このように、Trident はタプルごとに処理を行う点で Storm とは異なります。

バッチ処理の概念はデータベース トランザクションと非常によく似ています。各トランザクションにはトランザクション ID が割り当てられます。すべての処理が完了すると、トランザクションは成功したとみなされます。ただし、トランザクションのタプルの 1 つの処理に失敗すると、トランザクション全体が再送信されます。各バッチについて、Trident はトランザクションの開始時に beginCommit を呼び出し、最後にコミットします。

Trident トポロジ

Trident API は、「TridentTopology」クラスを使用して Trident トポロジを作成する簡単なオプションを公開します。基本的に、Trident トポロジはアウトフローから入力ストリームを受け取り、そのストリームに対して順序付けられた一連の操作 (フィルタリング、集約、グループ化など) を実行します。 Storm タプルは Trident タプルに置き換えられ、ボルトはオペレーションに置き換えられます。単純な Trident トポロジは次のように作成できます。

TridentTopology topology = new TridentTopology();

Trident タプル

Trident タプルは名前付きの値のリストです。 TridentTuple インターフェイスは、Trident トポロジのデータ モデルです。 TridentTuple インターフェイスは、Trident トポロジで処理できるデータの基本単位です。

トライデントの注ぎ口

トライデントの注ぎ口はストームの注ぎ口に似ていますが、トライデントの機能を使用する追加のオプションがあります。実際には、IRichSpout を引き続き使用することができ、Storm トポロジーで使用していますが、本質的に非トランザクションであるため、Trident が提供する利点を使用することはできません。

トライデントの特性を使用するためのすべての機能を備えたベーススパウトは「ITridentSpout」です。トランザクションと不透明なトランザクション セマンティクスをサポートします。他のスパウトには、IBatchSpout、IPartitionedTridentSpout、および IOpaquePartitionedTridentSpout があります。

これらの一般的なスパウトに加えて、Trident にはトライデント スパウトのサンプル実装が多数あります。そのうちの 1 つは FeederBatchSpout 出力です。これを使用すると、バッチ処理や並列処理などを気にせずに、トライデント タプルの名前付きリストを送信できます。

FeederBatchSpout の作成とデータ フィードは次のように行うことができます -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident 操作

Trident は、「Trident 操作」に依存して trident の入力ストリームを処理します。タプル。 Trident API には、単純なストリーム処理から複雑なストリーム処理までを処理するための複数の組み込みオペレーションがあります。これらの操作は、単純な検証から、トライデント タプルの複雑なグループ化および集計まで多岐にわたります。最も重要で頻繁に使用される操作を見てみましょう。

フィルタリング

フィルタは、入力検証タスクを実行するために使用されるオブジェクトです。 Trident フィルターは、trident タプル フィールドのサブセットを入力として受け取り、特定の条件が満たされるかどうかに基づいて true または false を返します。 true が返された場合、タプルは出力ストリームに保存され、それ以外の場合、タプルはストリームから削除されます。フィルターは基本的に BaseFilter クラスを継承し、isKeep メソッドを実装します。フィルター操作の実装例を次に示します。

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

フィルター関数は、「each」メソッドを使用してトポロジ内で呼び出すことができます。 「Fields」クラスは、入力 (トライデント タプルのサブセット) を指定するために使用できます。サンプル コードは次のとおりです。

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Function

Function は、単一の trident タプルに対して単純な操作を実行するために使用されるオブジェクトです。トライデント タプル フィールドのサブセットを取得し、0 個以上の新しいトライデント タプル フィールドを生成します。

関数 は、基本的に BaseFunction クラスから継承し、execute メソッドを実装します。実装例を以下に示します。

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

フィルター操作と同様に、関数操作は、各メソッドを使用してトポロジ内で呼び出すことができます。サンプル コードは次のとおりです。

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Aggregation

Aggregation は、入力バッチ、パーティション、またはストリームに対して集計操作を実行するために使用されるオブジェクトです。 Trident には 3 種類の集計があります。それらは次のとおりです -

  • aggregate - トライデント タプルの各バッチを個別に集計します。集約プロセス中、最初にグローバル グループ化を使用してタプルが再パーティション化され、同じバッチのすべてのパーティションが 1 つのパーティションに結合されます。

  • partitionAggregate - trident タプル全体ではなく、各パーティションを集約します。パーティション化されたコレクションの出力は、入力タプルを完全に置き換えます。パーティション化されたコレクションの出力には、単一のフィールド タプルが含まれます。

  • persistenttaggregate - すべてのバッチ内のすべての trident タプルを集約し、結果をメモリまたはデータベースに保存します。

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

集計操作は、CombinerAggregator、ReducerAggregator、または汎用 Aggregator インターフェイスを使用して作成できます。上記の例で使用されている「count」アグリゲーターは、組み込みアグリゲーターの 1 つであり、「CombinerAggregator」を使用して実装され、次のように実装されます -

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Grouping

グループ化操作は組み込み操作であり、groupBy メソッドによって呼び出すことができます。 groupBy メソッドは、指定されたフィールドに対して PartitionBy を実行することによってストリームを再分割し、各パーティション内でグループ フィールドが等しいタプルを結合します。通常、グループ化された集計を取得するには、「groupBy」と「persistentAggregate」を使用します。サンプル コードは次のとおりです。

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Merge と Join

マージと結合は、それぞれ "Merge" メソッドと "Join" メソッドを使用して実行できます。 Merge は 1 つ以上のストリームを結合します。結合はマージと似ていますが、結合では両側のトライデント タプル フィールドを使用して 2 つのストリームを調べて結合するという点が異なります。さらに、結合はバッチ レベルでのみ機能します。サンプルコードは次のとおりです -

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

状態維持

Trident は状態維持のためのメカニズムを提供します。状態情報は、トポロジ自体に保存することも、別のデータベースに保存することもできます。その理由は、状態を維持するためであり、処理中にタプルが失敗した場合は、失敗したタプルが再試行されます。このタプルの状態が以前に更新されたかどうかがわからないため、状態を更新するときに問題が発生します。状態を更新する前にタプルが失敗した場合、タプルを再試行すると状態が安定します。ただし、状態の更新後にタプルが失敗した場合、同じタプルを再試行すると、データベース内のカウントが再び増加し、状態が不安定になります。メッセージが 1 回だけ処理されるようにするには、次の手順が必要です。

  • タプルを小さなバッチで処理します。

  • 各バッチに一意​​の ID を割り当てます。バッチが再試行されると、同じ一意の ID が与えられます。

  • ステータス更新はバッチ間でソートされます。たとえば、ステータス更新の 2 番目のバッチは、ステータス更新の最初のバッチが完了するまで実行できません。

分散 RPC

分散 RPC は、Trident トポロジ結果のクエリと取得に使用されます。 Storm には分散 RPC サーバーが組み込まれています。分散 RPC サーバーは、クライアントから RPC 要求を受信し、それをトポロジに配信します。トポロジは要求を処理し、結果を分散 RPC サーバーに送信し、分散 RPC サーバーはそれをクライアントにリダイレクトします。 Trident の分散 RPC クエリは、これらのクエリが並列で実行されることを除いて、通常の RPC クエリと同様に実行されます。

Trident をいつ使用するか?

多くのユースケースでは、クエリを 1 回だけ処理するという要件がある場合、Trident でトポロジを記述することでこれを実現できます。一方、Storm の場合、正確なワンショット処理を実現することは困難です。したがって、Trident は、一度に処理する必要があるユースケースに役立ちます。 Trident は、Storm に複雑さを加え、状態を管理するため、すべてのユースケース、特に高パフォーマンスのユースケースに適しているわけではありません。

Trident の動作例

前のセクションで開発した通話ログ アナライザー アプリケーションを Trident フレームワークに変換します。高度な API のおかげで、Trident アプリケーションは通常の Storm よりも簡単になります。 Storm は基本的に、Trident で Function、Filter、Aggregate、GroupBy、Join、Merge 操作のいずれかを実行する必要があります。最後に、LocalDRPC クラスを使用して DRPC サーバーを起動し、LocalDRPC クラスの実行メソッドを使用していくつかのキーワードを検索します。

通話情報のフォーマット

FormatCall クラスの目的は、「発信者番号」と「受信者番号」を含む通話情報をフォーマットすることです。完全なプログラム コードは次のとおりです。

Encoding: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

CSVSplit クラスの目的は、「」に基づいて入力文字を分割することです。 comma(,)" 文字列を作成し、文字列内の各単語を出力します。この関数は、分散クエリの入力パラメータを解析するために使用されます。完全なコードは次のとおりです -

コーディング: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

ログ アナライザー

これがメイン アプリケーションです。最初に、アプリケーションは FeederBatchSpout を使用して TridentTopology を初期化し、呼び出し元の情報を提供します。 Trident トポロジ ストリームは、TridentTopology クラスの newStream メソッドを使用して作成できます。同様に、Trident トポロジ DRPC ストリームは、TridentTopology クラスの newDRCPStream メソッドを使用して作成できます。 LocalDRPC クラス を使用して、単純な DRCP サーバーを作成できます。 LocalDRPCいくつかのキーワードを検索するための実行メソッドがあります。完全なコードは以下のとおりです。

コーディング: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

アプリケーションの構築と実行

完全なアプリケーションには 3 つの Java コードがあります。それらは次のとおりです -

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

を構築できます次のコマンドを使用します。 アプリケーション -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

次のコマンドを使用して、アプリケーションを実行できます。 -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

出力

アプリケーションが起動したら、を実行すると、アプリケーションはクラスターの起動プロセス、運用プロセス、DRPC サーバーとクライアントの情報、そして最後にクラスターのシャットダウン プロセスに関する完全な詳細を出力します。この出力は、以下に示すようにコンソールに表示されます。

えええええ