阿帕契風暴三叉戟


Trident是Storm的延伸。像Storm,Trident也是Twitter開發的。開發Trident的主要原因是在Storm上提供高階抽象,以及狀態流程處理和低延遲分散式查詢。

Trident使用spout和bolt,但這些低階元件在執行前會由Trident自動產生。 Trident具有函數,過濾器,聯接,分組和聚合。

Trident將流處理為一系列批次,稱為事務。通常,這些小批量的大小將是大約數千或數百萬個元組,這取決於輸入流。這樣,Trident不同於Storm,它執行元組一元組處理。

批次概念非常類似於資料庫事務。每個事務都分配了一個事務ID。該事務被認為是成功的,一旦其所有的處理完成。然而,處理事務的元組中的一個的失敗將導致整個事務被重傳。對於每個批次,Trident將在事務開始時呼叫beginCommit,並在結束時提交。

Trident拓樸

Trident API公開了一個簡單的選項,使用「TridentTopology」類別建立Trident拓樸。基本上,Trident拓撲從流出接收輸入流,並對流上執行有序的操作序列(濾波,聚合,分組等)。 Storm元組被替換為Trident元組,bolt被操作取代。一個簡單的Trident拓樸可以建立如下 -

TridentTopology topology = new TridentTopology();

Trident Tuples

#Trident Tuples是一個命名的值清單。 TridentTuple介面是Trident拓樸的資料模型。 TridentTuple介面是可由Trident拓樸處理的資料的基本單位。

Trident Spout

Trident spout與類似Storm spout,附加選項使用Trident的功能。實際上,我們仍然可以使用IRichSpout,我們在Storm拓撲中使用它,但它本質上是非事務性的,我們將無法使用Trident提供的優點。

具有使用Trident的特徵的所有功能的基本spout是「ITridentSpout」。它支援事務和不透明的事務語義。其他的spouts是IBatchSpout,IPartitionedTridentSpout和IOpaquePartitionedTridentSpout。

除了這些通用spouts,Trident有許多樣本實施trident spout。其中之一是FeederBatchSpout輸出,我們可以使用它來發送trident tuples的命名列表,而不必擔心批次處理,並行性等。

FeederBatchSpout建立和資料饋送可以如下所示完成 -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident運算

Trident依賴「Trident作業」來處理trident tuples的輸入流。 Trident API具有多個內建操作來處理簡單到複雜的流處理。這些操作的範圍從簡單驗證到複雜的trident tuples分組和聚合。讓我們經歷最重要且經常使用的操作。

過濾

過濾器是用來執行輸入驗證任務的物件。 Trident過濾器取得trident tuples欄位的子集作為輸入,並根據是否滿足某些條件傳回真或假。如果傳回true,則該元組會儲存在輸出流中;否則,從流中移除元組。過濾器基本上將繼承自BaseFilter類別並實作isKeep方法。這裡是一個濾波器操作的範例實作 -

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

可以使用「each」方法在拓撲中呼叫過濾器功能。 「Fields」類別可以用來指定輸入(trident tuple的子集)。範例程式碼如下 -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

函數

函數是用來對單一trident tuple執行簡單操作的物件。它需要一個trident tuple字段的子集,並發出零個或多個新的trident tuple字段。

函數基本上從BaseFunction類別繼承並實作execute方法。下面給出了一個範例實作:

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

與過濾操作類似,可以使用每個方法在拓撲中呼叫函數操作。範例程式碼如下 -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

聚合

#聚合是用於對輸入批次或分割區或流執行聚合操作的物件。 Trident有三種類型的聚集。他們如下 -

  • aggregate -單獨聚合每批trident tuple。在聚合過程期間,首先使用全域分組將元組重新分區,以將同一批次的所有分區組合到單一分區中。

  • partitionAggregate -聚合每個分區,而不是整個trident tuple。分區集合的輸出完全替換輸入元組。分區集合的輸出包含單一字段元組。

  • persistentaggregate -聚合所有批次中的所有trident tuple,並將結果儲存在記憶體或資料庫中。

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

可以使用CombinerAggregator,ReducerAggregator或通用Aggregator介面建立聚合操作。上面範例中使用的「計數」聚合器是內建聚合器之一,它使用「CombinerAggregator」實現,實現如下-

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

分組

分組運算是一個內建操作,可以由groupBy方法呼叫。 groupBy方法透過在指定欄位上執行partitionBy來重新分區流,然後在每個分區中,它將群組欄位相等的元組組合在一起。通常,我們使用“groupBy”以及“persistentAggregate”來獲得分組聚合。範例程式碼如下 -

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

合併和連接

合併和連接可以分別透過使用「合併」和「連接」方法來完成。合併組合一個或多個流。加入類似於合併,除了加入使用來自兩邊的trident tuple字段來檢查和連接兩個流的事實。此外,加入將只在批量級別工作。範例程式碼如下 -

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

狀態維護

Trident提供了狀態維護的機制。狀態資訊可以儲存在拓撲本身中,否則也可以將其儲存在單獨的資料庫中。原因是維護一個狀態,如果任何元組在處理過程中失敗,則重試失敗的元組。這會在更新狀態時產生問題,因為您不確定此元組的狀態是否已在先前更新。如果在更新狀態之前元組已經失敗,則重試該元組將使狀態穩定。然而,如果元組在更新狀態後失敗,則重試相同的元組將再次增加資料庫中的計數並使狀態不穩定。需要執行以下步驟以確保訊息僅處理一次 -

  • 小批量處理元組。

  • 為每個批次分配唯一的ID。如果重試批次,則給予相同的唯一ID。

  • 狀態更新在批次之間排序。例如,第二批次的狀態更新將不可能,直到第一批次的狀態更新完成為止。

分散式RPC

分散式RPC用於查詢和檢索Trident拓樸結果。 Storm有一個內建的分散式RPC伺服器。分散式RPC伺服器從客戶端接收RPC請求並將其傳遞到拓撲。拓撲處理請求並將結果傳送到分散式RPC伺服器,分散式RPC伺服器將其重定向到客戶端。 Trident的分散式RPC查詢像正常的RPC查詢一樣執行,除了這些查詢並行運行的事實。

什麼時候使用Trident?

在許多使用情況下,如果要求是只處理一次查詢,我們可以透過在Trident中編寫拓撲來實現。另一方面,在Storm的情況下將難以實現精確的一次處理。因此,Trident將對那些需要一次處理的用例有用。 Trident不適用於所有用例,特別是高效能用例,因為它增加了Storm的複雜度並管理狀態。

Trident的工作實例

我們將把上一節中製定的呼叫日誌分析器應用程式轉換為Trident框架。由於其高級API,Trident應用程式將比普通風暴更容易。 Storm基本上需要執行Trident中的Function,Filter,Aggregate,GroupBy,Join和Merge作業中的任何一個。最後,我們將使用LocalDRPC類別啟動DRPC伺服器,並使用LocalDRPC類別的execute方法搜尋一些關鍵字。

格式化通話資訊

FormatCall類別的目的是格式化包含「來電者號碼」和「接收者號碼」的通話資訊。完整的程式碼如下-

編碼:FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

CSVSplit類別的目的是基於“comma(,)”拆分輸入字符串,並發出字串中的每個字。此函數用於解析分散式查詢的輸入參數。完整的程式碼如下 -

編碼:CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

日誌分析器

這是主要的應用程式。最初,應用程式將使用FeederBatchSpout初始化TridentTopology並提供呼叫者資訊。 Trident拓樸流可以使用TridentTopology類別的newStream方法建立。類似地,Trident拓樸DRPC流可以使用TridentTopology類別的newDRCPStream方法建立。可以使用LocalDRPC類別建立一個簡單的DRCP伺服器LocalDRPC有execute方法來搜尋一些關鍵字。完整的程式碼如下。

編碼:LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

建置和執行應用程式

完整的應用程式有三個Java程式碼。他們如下-

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

可以使用以下指令來建構應用程式-

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

可以使用以下命令運行應用程式-

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

輸出

一旦應用程式啟動,應用程式將輸出有關叢集啟動過程,操作處理,DRPC伺服器和客戶端資訊的完整詳細信息,以及最後的叢集關閉過程。此輸出將顯示在控制台上,如下所示。

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends