Contoh kerja Apache Storm
Kami telah meneliti butiran teknikal teras Apache Storm, kini tiba masanya untuk menulis beberapa senario mudah.
Senario - Penganalisis Log Panggilan Mudah Alih
Panggilan mudah alih dan tempohnya akan diambil sebagai input kepada Apache Storm, Storm akan memproses dan mengumpulkan panggilan antara pemanggil dan penerima yang sama berserta dengan jumlah bilangan panggilan mereka.
Spout Creation
Spout ialah komponen yang digunakan untuk penjanaan data. Pada asasnya, muncung akan melaksanakan antara muka IRichSpout. Antara muka "IRichSpout" mempunyai kaedah penting berikut -
open - untuk menyediakan persekitaran pelaksanaan untuk Spout. Pelaksana akan menjalankan kaedah ini untuk memulakan kepala pemercik.
nextTuple - Memancarkan data yang dijana melalui pengumpul.
tutup - Kaedah ini dipanggil apabila muncung hampir menutup.
declareOutputFields - Isytiharkan mod output tupel.
ack - Sahkan bahawa tuple tertentu telah diproses.
gagal - Menentukan untuk tidak memproses dan tidak memproses semula tupel tertentu. Tandatangan kaedah
open
open adalah seperti berikut -
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
conf - menyediakan konfigurasi ribut untuk muncung ini.
konteks - Menyediakan maklumat lengkap tentang lokasi muncung dalam topologi, ID tugasnya, maklumat input dan output.
pengumpul - membolehkan kami mengeluarkan tuple yang akan diproses oleh bolt. Tandatangan kaedah
nextTuple
nextTuple adalah seperti berikut -
nextTuple()
nextTuple() dipanggil secara berkala daripada gelung yang sama seperti kaedah ack() dan fail(). Ia mesti melepaskan kawalan benang apabila tiada kerja yang perlu dilakukan supaya kaedah lain mempunyai peluang untuk dipanggil. Oleh itu, baris pertama nextTuple menyemak sama ada pemprosesan telah selesai. Jika ya, ia harus tidur selama sekurang-kurangnya satu milisaat untuk mengurangkan beban pada pemproses sebelum kembali. Tandatangan kaedah
close
close adalah seperti berikut-
close()
declareOutputFields
declareOutputFields adalah sebagai tandatangan
reees- Ia digunakan untuk mengisytiharkan output id aliran, medan output, dsb. Kaedah ini digunakan untuk menentukan mod output tupel. Tandatangan kaedah
ackack
adalah seperti berikut -
declareOutputFields(OutputFieldsDeclarer declarer)
Kaedah ini mengesahkan bahawa tuple tertentu telah diproses. Tandatangan kaedah
gagal
nextTuple adalah seperti berikut - ack(Object msgId)
Kaedah ini memberitahu bahawa tupel tertentu belum diproses sepenuhnya. Ribut akan memproses semula tupel tertentu.
FakeCallLogReaderSpout
Dalam senario kami, kami perlu mengumpul butiran log panggilan. Maklumat log panggilan disertakan.
Nombor Panggilan- Nombor Penerimaan
Tempoh Memandangkan kami tidak mempunyai maklumat masa nyata log panggilan, kami akan menjana log panggilan palsu. Maklumat palsu akan dibuat menggunakan kelas Rawak. Kod program lengkap adalah seperti berikut.
Pengekodan - FakeCallLogReaderSpout.java
ack(Object msgId)
Penciptaan bolt
Bolt ialah komponen yang mengambil tupel sebagai input, memproses tupel dan menghasilkan tupel baharu sebagai output. Bolt akan melaksanakan antara muka IRichBolt. Dalam program ini, dua kelas Bolts
CallLogCreatorBolt dan CallLogCounterBolt digunakan untuk melaksanakan operasi.
Antara muka IRichBolt mempunyai kaedah berikut -
prepare - Menyediakan persekitaran untuk bolt dilaksanakan. Pelaksana akan menjalankan kaedah ini untuk memulakan muncung.
execute - mengendalikan input satu tuple
cleanup - dipanggil apabila muncung hampir ditutup.
declareOutputFields - Isytiharkan mod output tupel. Tandatangan kaedah
Prepare
prepare adalah seperti berikut -import java.util.*; //import storm tuple packages import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import Spout interface packages import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; //Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalities public class FakeCallLogReaderSpout implements IRichSpout { //Create instance for SpoutOutputCollector which passes tuples to bolt. private SpoutOutputCollector collector; private boolean completed = false; //Create instance for TopologyContext which contains topology data. private TopologyContext context; //Create instance for Random class. private Random randomGenerator = new Random(); private Integer idx = 0; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.context = context; this.collector = collector; } @Override public void nextTuple() { if(this.idx <= 1000) { List<String> mobileNumbers = new ArrayList<String>(); mobileNumbers.add("1234123401"); mobileNumbers.add("1234123402"); mobileNumbers.add("1234123403"); mobileNumbers.add("1234123404"); Integer localIdx = 0; while(localIdx++ < 100 && this.idx++ < 1000) { String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); while(fromMobileNumber == toMobileNumber) { toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); } Integer duration = randomGenerator.nextInt(60); this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration)); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("from", "to", "duration")); } //Override all the interface methods @Override public void close() {} public boolean isDistributed() { return false; } @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} @Override public Map<String, Object> getComponentConfiguration() { return null; } }
conf.conf untuk ini - menyediakan konfigurasi bolt.
konteks - Menyediakan maklumat lengkap tentang lokasi bolt dalam topologi, ID tugasnya, maklumat input dan output, dsb.
pengumpul - Membolehkan kami mengeluarkan tupel yang diproses. Tandatangan kaedah
execute
execute adalah seperti berikut -
prepare(Map conf, TopologyContext context, OutputCollector collector)
di mana tuple ialah tuple input untuk diproses. Kaedah
execute memproses satu tupel pada satu masa. Data Tuple boleh diakses melalui kaedah getValue kelas Tuple. Tuple input tidak perlu diproses dengan segera. Tuple boleh diproses dan dikeluarkan sebagai tuple keluaran tunggal. Tuple yang diproses boleh dipancarkan dengan menggunakan kelas OutputCollector. Tandatangan kaedah
pembersihan
pembersihan adalah seperti berikut -
execute(Tuple tuple)
isytiharOutputFields
isytiharnya ialahtandatangan -
cleanup()Parameter di sini pengisytihar
digunakan untuk mengisytiharkan id Strim keluaran, medan keluaran, dsb. Kaedah ini digunakan untuk menentukan mod keluaran tupel
.Bolt pencipta log panggilan
Bolt pencipta log panggilan menerima tuple log panggilan. Tuple log panggilan mempunyai nombor pihak yang memanggil, nombor pihak yang menerima dan tempoh panggilan. Bolt ini hanya mencipta nilai baharu dengan menggabungkan nombor pihak yang memanggil dan nombor pihak yang menerima. Nilai baharu mempunyai format "Nombor Panggilan - Nombor Penerima" dan menamakan medan baharu "Panggilan". Kod lengkap ada di bawah.
Pengekodan - CallLogCreatorBolt.java
declareOutputFields(OutputFieldsDeclarer declarer)
Bolt Kaunter Log Panggilan
Bolt Pencipta Log Panggilan menerima tuple log panggilan. Tuple log panggilan mempunyai nombor pihak yang memanggil, nombor pihak yang menerima dan tempoh panggilan. Bolt ini hanya mencipta nilai baharu dengan menggabungkan nombor pihak yang memanggil dan nombor pihak yang menerima. Nilai baharu mempunyai format "Nombor Panggilan - Nombor Penerima" dan menamakan medan baharu "Panggilan". Kod lengkap ada di bawah.
Pengekodan - CallLogCounterBolt.java
//import util packages import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; //import Storm IRichBolt package import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; //Create a class CallLogCreatorBolt which implement IRichBolt interface public class CallLogCreatorBolt implements IRichBolt { //Create instance for OutputCollector which collects and emits tuples to produce output private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String from = tuple.getString(0); String to = tuple.getString(1); Integer duration = tuple.getInteger(2); collector.emit(new Values(from + " - " + to, duration)); } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call", "duration")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Mencipta topologi
Topologi Ribut pada asasnya ialah struktur Jimat. Kelas TopologyBuilder menyediakan cara yang mudah dan mudah untuk mencipta topologi yang kompleks. Kelas TopologyBuilder mempunyai kaedah untuk menetapkan spout (setSpout) dan set bolt (setBolt) . Akhir sekali, TopologyBuilder mempunyai createTopology untuk mencipta topologi. Gunakan coretan kod berikut untuk mencipta topologi - kaedah
import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class CallLogCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String call = tuple.getString(0); Integer duration = tuple.getInteger(1); if(!counterMap.containsKey(call)){ counterMap.put(call, 1); }else{ Integer c = counterMap.get(call) + 1; counterMap.put(call, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
shuffleGrouping dan fieldsGrouping membantu menyediakan kumpulan aliran untuk muncung dan bolt.
Kluster Tempatan
Untuk tujuan pembangunan, kita boleh mencipta kluster tempatan menggunakan objek "LocalCluster" dan kemudian menyerahkan topologi menggunakan kaedah "submitTopology" kelas "LocalCluster". Salah satu parameter "submitTopology" ialah contoh kelas "Config". Kelas "Config" digunakan untuk menetapkan pilihan konfigurasi sebelum menyerahkan topologi. Pilihan konfigurasi ini akan digabungkan dengan konfigurasi kluster semasa runtime dan dihantar ke semua tugasan (spouts dan bolt) menggunakan kaedah penyediaan. Setelah topologi diserahkan kepada kluster, kami menunggu 10 saat untuk kluster mengira topologi yang diserahkan dan kemudian menutup kluster menggunakan kaedah "penutupan" "LocalCluster". Kod program lengkap adalah seperti berikut -
Pengekodan - LogAnalyserStorm.java
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
Bina dan jalankan aplikasi
Aplikasi lengkap mempunyai empat kod Java. Mereka adalah-
- FakeCallLogReaderSpout.java
- CallLogCreaterBolt.java
- CallLogCounterBolt.java
- LogAnalyerStorm.java yang berikut boleh dibina
reee
import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import storm configuration packages import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; //Create main class LogAnalyserStorm submit topology. public class LogAnalyserStorm { public static void main(String[] args) throws Exception{ //Create Config instance for cluster configuration Config config = new Config(); config.setDebug(true); // TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology()); Thread.sleep(10000); //Stop the topology cluster.shutdown(); } }OutputSetelah aplikasi bermula, ia akan mengeluarkan butiran lengkap tentang proses permulaan kluster, pengendalian muncung dan bolt, dan akhirnya proses penutupan kluster. Dalam "CallLogCounterBolt" kami mencetak panggilan dan butiran kiraannya. Maklumat ini akan dipaparkan pada konsol seperti berikut -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.javaBahasa bukan JVM Topologi Storm dilaksanakan melalui antara muka Thrift, yang memudahkan untuk menyerahkan topologi dalam mana-mana bahasa. Storm menyokong Ruby, Python dan banyak bahasa lain. Mari kita lihat ikatan ular sawa. Python BindingsPython ialah bahasa pengaturcaraan yang ditafsirkan untuk tujuan umum, interaktif, berorientasikan objek dan peringkat tinggi. Storm menyokong Python untuk melaksanakan topologinya. Python menyokong operasi penembakan, penambat, acking dan pembalakan. Seperti yang anda tahu, bolt boleh ditakrifkan dalam mana-mana bahasa. Bolt yang ditulis dalam bahasa lain dilaksanakan sebagai subproses dan Storm berkomunikasi dengan mesej JSON melalui stdin/stdout. Mula-mula ambil contoh bolt WordCount yang menyokong pengikatan python.
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStormKelas di sini
WordCountmelaksanakan antara muka IRichBolt dan berjalan dengan pelaksanaan python yang menyatakan parameter kaedah super "splitword.py". Sekarang buat pelaksanaan python yang dipanggil "splitword.py".
1234123402 - 1234123401 : 78 1234123402 - 1234123404 : 88 1234123402 - 1234123403 : 105 1234123401 - 1234123404 : 74 1234123401 - 1234123403 : 81 1234123401 - 1234123402 : 81 1234123403 - 1234123404 : 86 1234123404 - 1234123401 : 63 1234123404 - 1234123402 : 82 1234123403 - 1234123402 : 83 1234123404 - 1234123403 : 86 1234123403 - 1234123401 : 93Ini adalah contoh pelaksanaan dalam Python yang mengira perkataan dalam ayat yang diberikan. Begitu juga, anda boleh terikat dengan bahasa lain yang disokong juga.