Apache Storm工作實例
我們已經經歷了Apache Storm的核心技術細節,現在是時候寫一些簡單的場景。
場景- 行動呼叫日誌分析器
行動呼叫及其持續時間將作為對Apache Storm的輸入,Storm將處理和分組在相同呼叫者和接收者之間的呼叫及其呼叫總數。
Spout建立
Spout是用於資料產生的元件。基本上,一個spout將實作一個IRichSpout介面。 「IRichSpout」介面有以下重要方法 -
open -為Spout提供執行環境。執行器將運行此方法來初始化噴頭。
nextTuple -透過收集器發出產生的資料。
close -當spout將要關閉時呼叫此方法。
declareOutputFields -宣告元組的輸出模式。
ack -確認處理了特定元組。
fail -指定不處理和不重新處理特定元組。
open
open方法的簽章如下-
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
conf - 為此spout提供storm配置。
context - 提供有關拓撲中的spout位置,其任務ID,輸入和輸出資訊的完整資訊。
collector - 使我們能夠發出將由bolts處理的元組。
nextTuple
nextTuple方法的簽章如下-
nextTuple()
nextTuple()從與ack()和fail()方法相同的循環中定期調用。它必須釋放線程的控制,當沒有工作要做,以便其他方法有機會被呼叫。因此,nextTuple的第一行檢查處理是否已完成。如果是這樣,它應該休眠至少一毫秒,以減少處理器在返回之前的負載。
close
close方法的簽章如下-
close()
declareOutputFields
declareOutputFields方法的簽章如下-
declareOutputFields(OutputFieldsDeclarer declarer)
declarer -它用於宣告輸出流id,輸出欄位等
此方法用於指定元組的輸出模式。
ack
ack方法的簽章如下-
ack(Object msgId)
該方法確認已經處理了特定元組。
fail
nextTuple方法的簽章如下-
ack(Object msgId)
此方法通知特定元組尚未完全處理。 Storm將重新處理特定的元組。
FakeCallLogReaderSpout
在我們的場景中,我們需要收集呼叫日誌詳細資訊。呼叫日誌的資訊包含。
- 主叫號碼
- 接收號碼
- 持續時間
由於我們沒有呼叫日誌的即時訊息,我們將產生假呼叫日誌。假資訊將使用Random類別建立。完整的程式碼如下。
編碼- FakeCallLogReaderSpout.java
import java.util.*; //import storm tuple packages import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import Spout interface packages import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; //Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalities public class FakeCallLogReaderSpout implements IRichSpout { //Create instance for SpoutOutputCollector which passes tuples to bolt. private SpoutOutputCollector collector; private boolean completed = false; //Create instance for TopologyContext which contains topology data. private TopologyContext context; //Create instance for Random class. private Random randomGenerator = new Random(); private Integer idx = 0; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.context = context; this.collector = collector; } @Override public void nextTuple() { if(this.idx <= 1000) { List<String> mobileNumbers = new ArrayList<String>(); mobileNumbers.add("1234123401"); mobileNumbers.add("1234123402"); mobileNumbers.add("1234123403"); mobileNumbers.add("1234123404"); Integer localIdx = 0; while(localIdx++ < 100 && this.idx++ < 1000) { String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); while(fromMobileNumber == toMobileNumber) { toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); } Integer duration = randomGenerator.nextInt(60); this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration)); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("from", "to", "duration")); } //Override all the interface methods @Override public void close() {} public boolean isDistributed() { return false; } @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} @Override public Map<String, Object> getComponentConfiguration() { return null; } }
#Bolt建立
Bolt是使用元組作為輸入,處理元組,並產生新的元組作為輸出的組件。 Bolts將實作IRichBolt介面。在此程式中,使用兩個Bolts
類別CallLogCreatorBolt和CallLogCounterBolt來執行操作。
IRichBolt介面有以下方法 -
#prepare -為bolt提供執行的環境。執行器將運行此方法來初始化spout。
execute -處理單一元組的輸入
cleanup -當spout要關閉時呼叫。
declareOutputFields -宣告元組的輸出模式。
Prepare
prepare方法的簽章如下-prepare(Map conf, TopologyContext context, OutputCollector collector)
conf -為此bolt提供Storm設定。
context -提供有關拓撲中的bolt位置,其任務ID,輸入和輸出資訊等的完整資訊。
collector -使我們能夠發出處理的元組。
execute
execute方法的簽章如下-
execute(Tuple tuple)
這裡的元群組是要處理的輸入元組。
execute方法一次處理單一元組。元組資料可以透過Tuple類別的getValue方法存取。不必立即處理輸入元組。多元組可以被處理和輸出為單一輸出元組。處理的元組可以透過使用OutputCollector類別發出。
cleanup
cleanup方法的簽章如下-
cleanup()
declareOutputFields
#declareOutputFields方法的簽章如下-
declareOutputFields(OutputFieldsDeclarer declarer)
這裡的參數declarer用於宣告輸出流id,輸出欄位等。
此方法用於指定元組的輸出模式。
呼叫日誌建立者bolt
呼叫日誌建立者bolt接收呼叫日誌元群組。呼叫日誌元組具有主叫方號碼,接收方號碼和呼叫持續時間。此bolt透過組合主叫方號碼和接收方號碼簡單地建立一個新值。新值的格式為“來電號碼 - 接收方號碼”,並將其命名為新欄位“呼叫”。完整的程式碼如下。
編碼 - CallLogCreatorBolt.java
//import util packages import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; //import Storm IRichBolt package import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; //Create a class CallLogCreatorBolt which implement IRichBolt interface public class CallLogCreatorBolt implements IRichBolt { //Create instance for OutputCollector which collects and emits tuples to produce output private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String from = tuple.getString(0); String to = tuple.getString(1); Integer duration = tuple.getInteger(2); collector.emit(new Values(from + " - " + to, duration)); } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call", "duration")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
#呼叫日誌計數器Bolt
呼叫日誌建立者bolt接收呼叫日誌元組。呼叫日誌元組具有主叫方號碼,接收方號碼和呼叫持續時間。此bolt透過組合主叫方號碼和接收方號碼簡單地建立一個新值。新值的格式為“來電號碼 - 接收方號碼”,並將其命名為新欄位“呼叫”。完整的程式碼如下。
編碼 - CallLogCounterBolt.java
import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class CallLogCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String call = tuple.getString(0); Integer duration = tuple.getInteger(1); if(!counterMap.containsKey(call)){ counterMap.put(call, 1); }else{ Integer c = counterMap.get(call) + 1; counterMap.put(call, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
建立拓樸
Storm拓樸基本上是一個Thrift結構。 TopologyBuilder類別提供了簡單而容易的方法來創建複雜的拓撲。 TopologyBuilder類別具有設定spout(setSpout)和設定bolt(setBolt)的方法。最後,TopologyBuilder有createTopology來建立拓樸。使用以下程式碼片段建立拓撲 -
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
shuffleGrouping和fieldsGrouping方法有助於為spout和bolts設定流分組。
本地集群
為了開發目的,我們可以使用「LocalCluster」物件建立本地集群,然後使用「LocalCluster」類別的「submitTopology」方法提交拓撲。 「submitTopology」的參數之一是「Config」類別的實例。 “Config”類別用於在提交拓撲之前設定配置選項。此配置選項將在執行時與叢集配置合併,並使用prepare方法傳送至所有任務(spout和bolt)。一旦拓撲提交到集群,我們將等待10秒鐘,集群計算提交的拓撲,然後使用「LocalCluster」的「shutdown」方法關閉集群。完整的程式碼如下 -
編碼 - LogAnalyserStorm.java
import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import storm configuration packages import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; //Create main class LogAnalyserStorm submit topology. public class LogAnalyserStorm { public static void main(String[] args) throws Exception{ //Create Config instance for cluster configuration Config config = new Config(); config.setDebug(true); // TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology()); Thread.sleep(10000); //Stop the topology cluster.shutdown(); } }
建置和執行應用程式
完整的應用程式有四個Java程式碼。它們是-
- FakeCallLogReaderSpout.java
- #CallLogCreaterBolt.java
- CallLogCounterBolt.java ##LogAnalyerStorm.java
#LogAnalyerStorm.java
##應用程式可以使用以下命令構建-
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
應用程式可以使用以下命令運行- java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm輸出一旦應用程式啟動,它將輸出有關集群啟動過程,spout和螺栓處理的完整詳細信息,最後是集群關閉過程。在“CallLogCounterBolt”中,我們列印了通話及其計數詳細資訊。此資訊將顯示在控制台上如下 -
1234123402 - 1234123401 : 78 1234123402 - 1234123404 : 88 1234123402 - 1234123403 : 105 1234123401 - 1234123404 : 74 1234123401 - 1234123403 : 81 1234123401 - 1234123402 : 81 1234123403 - 1234123404 : 86 1234123404 - 1234123401 : 63 1234123404 - 1234123402 : 82 1234123403 - 1234123402 : 83 1234123404 - 1234123403 : 86 1234123403 - 1234123401 : 93
非JVM語言
Storm拓撲透過Thrift介面實現,這使得輕鬆地提交任何語言的拓撲。 Storm支援Ruby,Python和許多其他語言。讓我們來看看python綁定。
Python綁定
Python是一種通用的解釋,交互,物件導向和高階程式語言。 Storm支援Python實現其拓撲。 Python支援發射,錨定,acking和日誌操作。 如你所知,bolt可以用任何語言定義。用另一種語言編寫的bolt作為子進程執行,Storm透過stdin / stdout與JSON訊息進行通訊。首先拿一個支援python綁定的範例bolt WordCount。 public static class WordCount implements IRichBolt {
public WordSplit() {
super("python", "splitword.py");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
這裡的類別
實作
IRichBoltimport storm class WordCountBolt(storm.BasicBolt): def process(self, tup): words = tup.values[0].split(" ") for word in words: storm.emit([word]) WordCountBolt().run()