Contoh kerja Apache Storm


Kami telah meneliti butiran teknikal teras Apache Storm, kini tiba masanya untuk menulis beberapa senario mudah.

Senario - Penganalisis Log Panggilan Mudah Alih

Panggilan mudah alih dan tempohnya akan diambil sebagai input kepada Apache Storm, Storm akan memproses dan mengumpulkan panggilan antara pemanggil dan penerima yang sama berserta dengan jumlah bilangan panggilan mereka.

Spout Creation

Spout ialah komponen yang digunakan untuk penjanaan data. Pada asasnya, muncung akan melaksanakan antara muka IRichSpout. Antara muka "IRichSpout" mempunyai kaedah penting berikut -

  • open - untuk menyediakan persekitaran pelaksanaan untuk Spout. Pelaksana akan menjalankan kaedah ini untuk memulakan kepala pemercik.

  • nextTuple - Memancarkan data yang dijana melalui pengumpul.

  • tutup - Kaedah ini dipanggil apabila muncung hampir menutup.

  • declareOutputFields - Isytiharkan mod output tupel.

  • ack - Sahkan bahawa tuple tertentu telah diproses.

  • gagal - Menentukan untuk tidak memproses dan tidak memproses semula tupel tertentu. Tandatangan kaedah

open

open adalah seperti berikut -

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - menyediakan konfigurasi ribut untuk muncung ini.

  • konteks - Menyediakan maklumat lengkap tentang lokasi muncung dalam topologi, ID tugasnya, maklumat input dan output.

  • pengumpul - membolehkan kami mengeluarkan tuple yang akan diproses oleh bolt. Tandatangan kaedah

nextTuple

nextTuple adalah seperti berikut -

nextTuple()

nextTuple() dipanggil secara berkala daripada gelung yang sama seperti kaedah ack() dan fail(). Ia mesti melepaskan kawalan benang apabila tiada kerja yang perlu dilakukan supaya kaedah lain mempunyai peluang untuk dipanggil. Oleh itu, baris pertama nextTuple menyemak sama ada pemprosesan telah selesai. Jika ya, ia harus tidur selama sekurang-kurangnya satu milisaat untuk mengurangkan beban pada pemproses sebelum kembali. Tandatangan kaedah

close

close adalah seperti berikut-

close()

declareOutputFields

declareOutputFields adalah sebagai tandatangan

reees

- Ia digunakan untuk mengisytiharkan output id aliran, medan output, dsb. Kaedah ini digunakan untuk menentukan mod output tupel. Tandatangan kaedah

ack

ack

adalah seperti berikut -

declareOutputFields(OutputFieldsDeclarer declarer)

Kaedah ini mengesahkan bahawa tuple tertentu telah diproses. Tandatangan kaedah

gagal

nextTuple

adalah seperti berikut -

ack(Object msgId)
Kaedah ini memberitahu bahawa tupel tertentu belum diproses sepenuhnya. Ribut akan memproses semula tupel tertentu.

FakeCallLogReaderSpout

Dalam senario kami, kami perlu mengumpul butiran log panggilan. Maklumat log panggilan disertakan.

Nombor Panggilan
  • Nombor Penerimaan

  • Tempoh
  • Memandangkan kami tidak mempunyai maklumat masa nyata log panggilan, kami akan menjana log panggilan palsu. Maklumat palsu akan dibuat menggunakan kelas Rawak. Kod program lengkap adalah seperti berikut.

Pengekodan - FakeCallLogReaderSpout.java

ack(Object msgId)

Penciptaan bolt

Bolt ialah komponen yang mengambil tupel sebagai input, memproses tupel dan menghasilkan tupel baharu sebagai output. Bolt akan melaksanakan antara muka IRichBolt. Dalam program ini, dua kelas Bolts
CallLogCreatorBolt dan CallLogCounterBolt digunakan untuk melaksanakan operasi.

Antara muka IRichBolt mempunyai kaedah berikut -

  • prepare - Menyediakan persekitaran untuk bolt dilaksanakan. Pelaksana akan menjalankan kaedah ini untuk memulakan muncung.

  • execute - mengendalikan input satu tuple

  • cleanup - dipanggil apabila muncung hampir ditutup.

  • declareOutputFields - Isytiharkan mod output tupel. Tandatangan kaedah

Prepare

prepare adalah seperti berikut -


import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
  • conf.conf untuk ini - menyediakan konfigurasi bolt.

  • konteks - Menyediakan maklumat lengkap tentang lokasi bolt dalam topologi, ID tugasnya, maklumat input dan output, dsb.

  • pengumpul - Membolehkan kami mengeluarkan tupel yang diproses. Tandatangan kaedah

execute

execute adalah seperti berikut -

prepare(Map conf, TopologyContext context, OutputCollector collector)

di mana tuple ialah tuple input untuk diproses. Kaedah

execute memproses satu tupel pada satu masa. Data Tuple boleh diakses melalui kaedah getValue kelas Tuple. Tuple input tidak perlu diproses dengan segera. Tuple boleh diproses dan dikeluarkan sebagai tuple keluaran tunggal. Tuple yang diproses boleh dipancarkan dengan menggunakan kelas OutputCollector. Tandatangan kaedah

pembersihan

pembersihan adalah seperti berikut -

execute(Tuple tuple)

isytiharOutputFields

isytiharnya ialahtandatangan -

cleanup()
Parameter di sini

pengisytihar

digunakan untuk mengisytiharkan id Strim keluaran, medan keluaran, dsb. Kaedah ini digunakan untuk menentukan mod keluaran tupel

.

Bolt pencipta log panggilan

Bolt pencipta log panggilan menerima tuple log panggilan. Tuple log panggilan mempunyai nombor pihak yang memanggil, nombor pihak yang menerima dan tempoh panggilan. Bolt ini hanya mencipta nilai baharu dengan menggabungkan nombor pihak yang memanggil dan nombor pihak yang menerima. Nilai baharu mempunyai format "Nombor Panggilan - Nombor Penerima" dan menamakan medan baharu "Panggilan". Kod lengkap ada di bawah.

Pengekodan - CallLogCreatorBolt.java

declareOutputFields(OutputFieldsDeclarer declarer)

Bolt Kaunter Log Panggilan

Bolt Pencipta Log Panggilan menerima tuple log panggilan. Tuple log panggilan mempunyai nombor pihak yang memanggil, nombor pihak yang menerima dan tempoh panggilan. Bolt ini hanya mencipta nilai baharu dengan menggabungkan nombor pihak yang memanggil dan nombor pihak yang menerima. Nilai baharu mempunyai format "Nombor Panggilan - Nombor Penerima" dan menamakan medan baharu "Panggilan". Kod lengkap ada di bawah.

Pengekodan - CallLogCounterBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Mencipta topologi

Topologi Ribut pada asasnya ialah struktur Jimat. Kelas TopologyBuilder menyediakan cara yang mudah dan mudah untuk mencipta topologi yang kompleks. Kelas TopologyBuilder mempunyai kaedah untuk menetapkan spout (setSpout) dan set bolt (setBolt) . Akhir sekali, TopologyBuilder mempunyai createTopology untuk mencipta topologi. Gunakan coretan kod berikut untuk mencipta topologi - kaedah

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

shuffleGrouping dan fieldsGrouping membantu menyediakan kumpulan aliran untuk muncung dan bolt.

Kluster Tempatan

Untuk tujuan pembangunan, kita boleh mencipta kluster tempatan menggunakan objek "LocalCluster" dan kemudian menyerahkan topologi menggunakan kaedah "submitTopology" kelas "LocalCluster". Salah satu parameter "submitTopology" ialah contoh kelas "Config". Kelas "Config" digunakan untuk menetapkan pilihan konfigurasi sebelum menyerahkan topologi. Pilihan konfigurasi ini akan digabungkan dengan konfigurasi kluster semasa runtime dan dihantar ke semua tugasan (spouts dan bolt) menggunakan kaedah penyediaan. Setelah topologi diserahkan kepada kluster, kami menunggu 10 saat untuk kluster mengira topologi yang diserahkan dan kemudian menutup kluster menggunakan kaedah "penutupan" "LocalCluster". Kod program lengkap adalah seperti berikut -

Pengekodan - LogAnalyserStorm.java

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

Bina dan jalankan aplikasi

Aplikasi lengkap mempunyai empat kod Java. Mereka adalah-

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java yang berikut boleh dibina


reee

Perintah untuk berlari -


import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

Output

Setelah aplikasi bermula, ia akan mengeluarkan butiran lengkap tentang proses permulaan kluster, pengendalian muncung dan bolt, dan akhirnya proses penutupan kluster. Dalam "CallLogCounterBolt" kami mencetak panggilan dan butiran kiraannya. Maklumat ini akan dipaparkan pada konsol seperti berikut -


javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

Bahasa bukan JVM ​​

Topologi Storm dilaksanakan melalui antara muka Thrift, yang memudahkan untuk menyerahkan topologi dalam mana-mana bahasa. Storm menyokong Ruby, Python dan banyak bahasa lain. Mari kita lihat ikatan ular sawa.

Python Bindings

Python ialah bahasa pengaturcaraan yang ditafsirkan untuk tujuan umum, interaktif, berorientasikan objek dan peringkat tinggi. Storm menyokong Python untuk melaksanakan topologinya. Python menyokong operasi penembakan, penambat, acking dan pembalakan.

Seperti yang anda tahu, bolt boleh ditakrifkan dalam mana-mana bahasa. Bolt yang ditulis dalam bahasa lain dilaksanakan sebagai subproses dan Storm berkomunikasi dengan mesej JSON melalui stdin/stdout. Mula-mula ambil contoh bolt WordCount yang menyokong pengikatan python.

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Kelas di sini

WordCountmelaksanakan antara muka IRichBolt dan berjalan dengan pelaksanaan python yang menyatakan parameter kaedah super "splitword.py". Sekarang buat pelaksanaan python yang dipanggil "splitword.py".

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Ini adalah contoh pelaksanaan dalam Python yang mengira perkataan dalam ayat yang diberikan. Begitu juga, anda boleh terikat dengan bahasa lain yang disokong juga.