Apache Storm工作實例


我們已經經歷了Apache Storm的核心技術細節,現在是時候寫一些簡單的場景。

場景- 行動呼叫日誌分析器

行動呼叫及其持續時間將作為對Apache Storm的輸入,Storm將處理和分組在相同呼叫者和接收者之間的呼叫及其呼叫總數。

Spout建立

Spout是用於資料產生的元件。基本上,一個spout將實作一個IRichSpout介面。 「IRichSpout」介面有以下重要方法 -

  • open -為Spout提供執行環境。執行器將運行此方法來初始化噴頭。

  • nextTuple -透過收集器發出產生的資料。

  • close -當spout將要關閉時呼叫此方法。

  • declareOutputFields -宣告元組的輸出模式。

  • ack -確認處理了特定元組。

  • fail -指定不處理和不重新處理特定元組。

open

open方法的簽章如下-

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - 為此spout提供storm配置。

  • context - 提供有關拓撲中的spout位置,其任務ID,輸入和輸出資訊的完整資訊。

  • collector - 使我們能夠發出將由bolts處理的元組。

nextTuple

nextTuple方法的簽章如下-

nextTuple()

nextTuple()從與ack()和fail()方法相同的循環中定期調用。它必須釋放線程的控制,當沒有工作要做,以便其他方法有機會被呼叫。因此,nextTuple的第一行檢查處理是否已完成。如果是這樣,它應該休眠至少一毫秒,以減少處理器在返回之前的負載。

close

close方法的簽章如下-

close()

declareOutputFields

declareOutputFields方法的簽章如下-

declareOutputFields(OutputFieldsDeclarer declarer)

declarer -它用於宣告輸出流id,輸出欄位等

此方法用於指定元組的輸出模式。

ack

ack方法的簽章如下-

ack(Object msgId)

該方法確認已經處理了特定元組。

fail

nextTuple方法的簽章如下-

ack(Object msgId)

此方法通知特定元組尚未完全處理。 Storm將重新處理特定的元組。

FakeCallLogReaderSpout

在我們的場景中,我們需要收集呼叫日誌詳細資訊。呼叫日誌的資訊包含。

  • 主叫號碼
  • 接收號碼
  • 持續時間

由於我們沒有呼叫日誌的即時訊息,我們將產生假呼叫日誌。假資訊將使用Random類別建立。完整的程式碼如下。

編碼-  FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

#Bolt建立

Bolt是使用元組作為輸入,處理元組,並產生新的元組作為輸出的組件。 Bolts將實作IRichBolt介面。在此程式中,使用兩個Bolts
類別CallLogCreatorBoltCallLogCounterBolt來執行操作。

IRichBolt介面有以下方法 -

  • #prepare -為bolt提供執行的環境。執行器將運行此方法來初始化spout。

  • execute -處理單一元組的輸入

  • cleanup -當spout要關閉時呼叫。

  • declareOutputFields -宣告元組的輸出模式。

Prepare

prepare方法的簽章如下-


prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf -為此bolt提供Storm設定。

  • context -提供有關拓撲中的bolt位置,其任務ID,輸入和輸出資訊等的完整資訊。

  • collector -使我們能夠發出處理的元組。

execute

execute方法的簽章如下-

execute(Tuple tuple)

這裡的元群組是要處理的輸入元組。

execute方法一次處理單一元組。元組資料可以透過Tuple類別的getValue方法存取。不必立即處理輸入元組。多元組可以被處理和輸出為單一輸出元組。處理的元組可以透過使用OutputCollector類別發出。

cleanup

cleanup方法的簽章如下-

cleanup()

declareOutputFields

#declareOutputFields方法的簽章如下-

declareOutputFields(OutputFieldsDeclarer declarer)

這裡的參數declarer用於宣告輸出流id,輸出欄位等。

此方法用於指定元組的輸出模式

呼叫日誌建立者bolt

呼叫日誌建立者bolt接收呼叫日誌元群組。呼叫日誌元組具有主叫方號碼,接收方號碼和呼叫持續時間。此bolt透過組合主叫方號碼和接收方號碼簡單地建立一個新值。新值的格式為“來電號碼 - 接收方號碼”,並將其命名為新欄位“呼叫”。完整的程式碼如下。

編碼 -  CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

#呼叫日誌計數器Bolt

呼叫日誌建立者bolt接收呼叫日誌元組。呼叫日誌元組具有主叫方號碼,接收方號碼和呼叫持續時間。此bolt透過組合主叫方號碼和接收方號碼簡單地建立一個新值。新值的格式為“來電號碼 - 接收方號碼”,並將其命名為新欄位“呼叫”。完整的程式碼如下。

編碼 -  CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

建立拓樸

Storm拓樸基本上是一個Thrift結構。 TopologyBuilder類別提供了簡單而容易的方法來創建複雜的拓撲。 TopologyBuilder類別具有設定spout(setSpout)和設定bolt(setBolt)的方法。最後,TopologyBuilder有createTopology來建立拓樸。使用以下程式碼片段建立拓撲 -

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGroupingfieldsGrouping方法有助於為spout和bolts設定流分組。

本地集群

為了開發目的,我們可以使用「LocalCluster」物件建立本地集群,然後使用「LocalCluster」類別的「submitTopology」方法提交拓撲。 「submitTopology」的參數之一是「Config」類別的實例。 “Config”類別用於在提交拓撲之前設定配置選項。此配置選項將在執行時與叢集配置合併,並使用prepare方法傳送至所有任務(spout和bolt)。一旦拓撲提交到集群,我們將等待10秒鐘,集群計算提交的拓撲,然後使用「LocalCluster」的「shutdown」方法關閉集群。完整的程式碼如下 -

編碼 -  LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

建置和執行應用程式

完整的應用程式有四個Java程式碼。它們是-

  • FakeCallLogReaderSpout.java
  • #CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • ##LogAnalyerStorm.java


#LogAnalyerStorm.java


##應用程式可以使用以下命令構建-

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

應用程式可以使用以下命令運行-


java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

輸出

一旦應用程式啟動,它將輸出有關集群啟動過程,spout和螺栓處理的完整詳細信息,最後是集群關閉過程。在“CallLogCounterBolt”中,我們列印了通話及其計數詳細資訊。此資訊將顯示在控制台上如下 -

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

非JVM語言

Storm拓撲透過Thrift介面實現,這使得輕鬆地提交任何語言的拓撲。 Storm支援Ruby,Python和許多其他語言。讓我們來看看python綁定。

Python綁定

Python是一種通用的解釋,交互,物件導向和高階程式語言。 Storm支援Python實現其拓撲。 Python支援發射,錨定,acking和日誌操作。 如你所知,bolt可以用任何語言定義。用另一種語言編寫的bolt作為子進程執行,Storm透過stdin / stdout與JSON訊息進行通訊。首先拿一個支援python綁定的範例bolt WordCount。

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}
這裡的類別

WordCount

實作

IRichBolt
介面和運行與python實作指定超級方法參數「splitword.py」。現在建立一個名為「splitword.py」的python實作。
import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()
這是Python的範例實現,它計算給定句子中的單字。同樣,您也可以與其他支援語言綁定。
#############