Apache Storm の動作例


Apache Storm の中核となる技術的な詳細を説明しました。次は、いくつかの簡単なシナリオを作成します。

シナリオ - モバイル通話ログ アナライザー

モバイル通話とその継続時間は Apache Storm への入力として使用され、Storm は同じ発信者と受信者間の通話とその合計通話数を処理およびグループ化します。 。

スパウトの作成

スパウトはデータ生成に使用されるコンポーネントです。基本的に、スパウトは IRichSpout インターフェイスを実装します。 「IRichSpout」インターフェイスには次の重要なメソッドがあります -

  • open - Spout の実行環境を提供します。エグゼキュータはこのメソッドを実行してスプリンクラー ヘッドを初期化します。

  • nextTuple - 生成されたデータをコレクターを通じて出力します。

  • #close - このメソッドは、注ぎ口が閉じようとしているときに呼び出されます。

  • declareOutputFields - タプルの出力モードを宣言します。

  • #ack - 特定のタプルが処理されたことを確認します。

  • #fail - 特定のタプルを処理および再処理しないことを指定します。

#open


open メソッドのシグネチャは次のとおりです -


open(Map conf, TopologyContext context, SpoutOutputCollector collector)

  • conf - このスパウトのストーム構成を提供します。

  • context - トポロジ内のスパウトの位置、そのタスク ID、入力および出力情報に関する完全な情報を提供します。

  • コレクター - ボルトによって処理されるタプルを出力できるようにします。

nextTuple

nextTuple メソッドのシグネチャは次のとおりです -


nextTuple()

nextTuple() は ack() と同じで始まり、 failed() メソッド ループ内で定期的に呼び出されます。実行する作業がない場合は、他のメソッドを呼び出せるようにスレッドの制御を解放する必要があります。したがって、nextTuple の最初の行では、処理が完了したかどうかを確認します。その場合、プロセッサの負荷を軽減するために、復帰する前に少なくとも 1 ミリ秒スリープする必要があります。

#close

close

メソッドのシグネチャは次のとおりです-

close()
declareOutputFields

declareOutputFields

メソッドのシグネチャは次のとおりです -

declareOutputFields(OutputFieldsDeclarer declarer)

declaler

-出力ストリーム ID、出力フィールドなどを宣言するために使用されます。 このメソッドはタプルの出力モードを指定するために使用されます。

ack

ack

メソッドの署名は次のとおりです -

ack(Object msgId)

このメソッドは、特定のタプルが処理されました。

fail

nextTuple

メソッドのシグネチャは次のとおりです -

ack(Object msgId)
このメソッドは、特定のタプルが完全に処理されていません。 Storm は特定のタプルを再処理します。

FakeCallLogReaderSpout

このシナリオでは、通話ログの詳細を収集する必要があります。通話履歴情報が含まれます。

発信番号
  • 受信番号

  • 通話時間
  • 通話履歴のリアルタイム情報がないため、 、偽の通話ログが生成されます。偽の情報はRandomクラスを使用して作成されます。完全なプログラムコードは次のとおりです。

コーディング - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

ボルトの作成

ボルトは、タプルを入力として使用し、タプルを処理し、新しいタプルを出力として生成する関数です。成分。ボルトは IRichBolt インターフェイスを実装します。このプログラムでは、2 つのボルト
クラス CallLogCreatorBoltCallLogCounterBolt を使用して操作を実行します。

IRichBolt インターフェイスには次のメソッドがあります。 -

  • prepare - ボルトを実行するための環境を提供します。エグゼキューターはこのメソッドを実行してスパウトを初期化します。

  • execute - 単一タプルの入力を処理します

  • cleanup - 注ぎ口を閉じるときに呼び出されます。

  • declareOutputFields - タプルの出力モードを宣言します。

Prepare

prepare メソッドのシグネチャは次のとおりです -


prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - このボルトの Storm 構成を提供します。

  • context - トポロジ内のボルトの位置、タスク ID、入力および出力情報などに関する完全な情報を提供します。

  • #コレクター - 処理されたタプルを出力できるようにします。

execute

executeメソッドのシグネチャは次のとおりです -

execute(Tuple tuple)

#ここの ## 要素 グループ は、処理される入力タプルです。

execute メソッドは、一度に 1 つのタプルを処理します。タプル データには、Tuple クラスの getValue メソッドを通じてアクセスできます。入力タプルはすぐに処理する必要はありません。タプルは、単一の出力タプルとして処理および出力できます。処理されたタプルは、OutputCollector クラスを使用して出力できます。

#cleanup

cleanup

メソッドのシグネチャは次のとおりです。

cleanup()

declareOutputFields

declareOutputFields

メソッドのシグネチャは次のとおりです -

declareOutputFields(OutputFieldsDeclarer declarer)
ここのパラメータ

declaler

は、出力ストリーム ID、出力フィールドなどを宣言するために使用されます。 このメソッドは、タプルの出力モード

を指定するために使用されます。

通話ログ クリエイター ボルト

通話ログ クリエイター ボルトは、通話ログ タプルを受け取ります。通話ログ タプルには、発信者番号、受信者番号、および通話時間が含まれます。このボルトは、発信側番号と受信側番号を組み合わせて新しい値を作成するだけです。新しい値の形式は「発信者番号 - 受信者番号」であり、新しいフィールドの名前は「Call」です。完全なコードは以下のとおりです。

エンコーディング - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

通話ログ カウンター ボルト

通話ログ作成者ボルトは通話ログ タプルを受け取ります。通話ログ タプルには、発信者番号、受信者番号、および通話時間が含まれます。このボルトは、発信側番号と受信側番号を組み合わせて新しい値を作成するだけです。新しい値の形式は「発信者番号 - 受信者番号」であり、新しいフィールドの名前は「Call」です。完全なコードは以下のとおりです。

コーディング - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

トポロジの作成

Storm トポロジは基本的に Thrift 構造です。 TopologyBuilder クラスは、複雑なトポロジを作成するためのシンプルかつ簡単な方法を提供します。 TopologyBuilder クラスには、スパウト (setSpout) を設定するメソッドとボルト (setBolt) を設定するメソッドがあります。最後に、TopologyBuilder には、トポロジを作成するための createTopology があります。次のコード スニペットを使用してトポロジを作成します。

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGrouping メソッドと fieldsGrouping メソッドは、スパウトとボルトのフロー グループ化の設定に役立ちます。

ローカル クラスター

開発目的では、「LocalCluster」オブジェクトを使用してローカル クラスターを作成し、「LocalCluster」クラスの「submitTopology」メソッドを使用してトポロジを送信できます。 「submitTopology」のパラメータの 1 つは、「Config」クラスのインスタンスです。 「Config」クラスは、トポロジを送信する前に構成オプションを設定するために使用されます。この構成オプションは実行時にクラスター構成とマージされ、prepare メソッドを使用してすべてのタスク (スパウトとボルト) に送信されます。トポロジがクラスターに送信されたら、クラスターが送信されたトポロジを計算するまで 10 秒待ってから、「LocalCluster」の「シャットダウン」メソッドを使用してクラスターをシャットダウンします。完全なプログラム コードは次のとおりです。 -

コーディング - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

アプリケーションの構築と実行

完全なアプリケーションには 4 つの Java コードがあります。それらは -

    #FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java
アプリケーションは次のコマンドを使用して構築できます -


javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

アプリケーションは次のコマンドを使用して実行できます -


java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

出力

アプリケーションが起動すると、クラスターの起動プロセス、スパウトとボルトの処理、そして最後にクラスターのシャットダウン プロセスに関する完全な詳細が出力されます。 「CallLogCounterBolt」では、通話とそのカウントの詳細を出力します。この情報は、次のようにコンソールに表示されます。


1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

非 JVM 言語

Storm トポロジは Thrift インターフェイスを介して実装されているため、任意の言語でトポロジを簡単に送信できます。 。 Storm は Ruby、Python、その他多くの言語をサポートしています。 Pythonのバインディングを見てみましょう。

Python バインディング

Python は、汎用の解釈型、対話型、オブジェクト指向の高レベル プログラミング言語です。 Storm は、トポロジを実装するために Python をサポートしています。 Python は、起動、アンカー、確認、ロギングの操作をサポートします。

ご存知のとおり、ボルトはどの言語でも定義できます。別の言語で書かれたボルトはサブプロセスとして実行され、Storm は stdin/stdout 経由で JSON メッセージと通信します。まず、Python バインディングをサポートするサンプル ボルト WordCount を取得します。

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

ここのクラス

WordCountIRichBolt インターフェイスを実装し、スーパー メソッド パラメーター "splitword.py" を指定する Python 実装で実行されます。次に、「splitword.py」という Python 実装を作成します。

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

これは、指定された文内の単語をカウントする Python でのサンプル実装です。同様に、サポートされている他の言語にもバインドできます。