Apache Storm Trident


Trident ialah lanjutan daripada Storm. Seperti Storm, Trident juga dibangunkan oleh Twitter. Sebab utama untuk membangunkan Trident adalah untuk menyediakan abstraksi peringkat tinggi di atas Storm, serta pemprosesan penstriman stateful dan pertanyaan teragih kependaman rendah.

Trident menggunakan muncung dan bolt, tetapi komponen aras rendah ini dijana secara automatik oleh Trident sebelum pelaksanaan. Trident mempunyai fungsi, penapis, cantuman, pengelompokan dan pengagregatan.

Trident memproses aliran ke dalam satu siri kelompok, dipanggil transaksi. Biasanya, saiz kumpulan mini ini akan mengikut susunan beribu atau berjuta tupel, bergantung pada aliran input. Dengan cara ini, Trident berbeza daripada Storm kerana ia melakukan pemprosesan tuple-by-tuple.

Konsep pemprosesan kelompok sangat serupa dengan transaksi pangkalan data. Setiap transaksi diberikan ID transaksi. Urus niaga dianggap berjaya setelah semua pemprosesannya selesai. Walau bagaimanapun, kegagalan untuk memproses salah satu tupel transaksi akan menyebabkan keseluruhan transaksi dihantar semula. Untuk setiap kumpulan, Trident akan memanggil beginCommit pada permulaan transaksi dan commit pada penghujung.

Trident Topology

API Trident mendedahkan pilihan mudah untuk mencipta topologi Trident menggunakan kelas "TridentTopology". Pada asasnya, topologi Trident menerima aliran input daripada aliran keluar dan melaksanakan urutan operasi yang tertib (penapisan, pengagregatan, pengelompokan, dll.) pada aliran. Tuple ribut digantikan dengan tupel Trident, dan bolt digantikan dengan operasi. Topologi Trident mudah boleh dibuat seperti berikut -

TridentTopology topology = new TridentTopology();

Trident Tuples

Trident Tuples ialah senarai nilai yang dinamakan. Antara muka TridentTuple ialah model data untuk topologi Trident. Antara muka TridentTuple ialah unit asas data yang boleh diproses oleh topologi Trident.

Trident Spout

Trident spout adalah serupa dengan Storm, dengan pilihan tambahan menggunakan fungsi Trident. Sebenarnya, kami masih boleh menggunakan IRichSpout, kami menggunakannya dalam topologi Storm, tetapi ia tidak bersifat transaksional dan kami tidak akan dapat menggunakan kelebihan yang ditawarkan oleh Trident.

Pancaran asas dengan semua ciri menggunakan ciri Trident ialah "ITridentSpout". Ia menyokong transaksi dan semantik transaksi legap. Spout lain ialah IBatchSpout, IPartitionedTridentSpout dan IOpaquePartitionedTridentSpout.

Selain muncung generik ini, Trident mempunyai banyak contoh pelaksanaan muncung trisula. Salah satunya ialah output FeederBatchSpout, yang boleh kami gunakan untuk menghantar senarai nama tuple trisula tanpa perlu risau tentang batching, paralelisme, dsb.

Penciptaan FeederBatchSpout dan suapan data boleh dilakukan seperti berikut -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Operasi Trident

Trident bergantung pada "Operasi Trident" untuk mengendalikan aliran input tupel trisula. API Trident mempunyai berbilang operasi terbina dalam untuk mengendalikan pemprosesan strim yang mudah kepada kompleks. Operasi ini terdiri daripada pengesahan mudah kepada pengumpulan kompleks dan pengagregatan tupel trisula. Mari kita lalui operasi yang paling penting dan kerap digunakan.

Penapis

Penapis ialah objek yang digunakan untuk melaksanakan tugas pengesahan input. Penapis trisula mengambil subset medan tupel trisula sebagai input dan mengembalikan benar atau salah berdasarkan sama ada syarat tertentu dipenuhi. Jika benar dikembalikan, tupel disimpan dalam aliran keluaran jika tidak, tupel dikeluarkan daripada aliran. Penapis pada asasnya akan mewarisi daripada kelas BaseFilter dan melaksanakan kaedah isKeep. Berikut ialah contoh pelaksanaan operasi penapis -

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

Fungsi penapis boleh dipanggil dalam topologi menggunakan kaedah "setiap". Kelas "Fields" boleh digunakan untuk menentukan input (subset tuple trisula). Kod sampel adalah seperti berikut -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Fungsi

Fungsi ialah objek yang digunakan untuk melakukan operasi mudah pada satu tupel trisula. Ia memerlukan subset medan tupel trisula dan mengeluarkan sifar atau lebih medan tupel trisula baharu.

Fungsi pada asasnya mewarisi daripada kelas BaseFunction dan melaksanakan kaedah execute. Contoh pelaksanaan diberikan di bawah:

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Sama seperti operasi penapis, setiap kaedah boleh digunakan untuk memanggil operasi fungsi dalam topologi. Kod sampel adalah seperti berikut -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Aggregation

Aggregation ialah objek yang digunakan untuk melakukan operasi pengagregatan pada kelompok input atau partition atau strim. Trident mempunyai tiga jenis agregasi. Ia adalah seperti berikut -

  • agregat - agregat setiap kumpulan tupel trisula secara individu. Semasa proses pengagregatan, tupel mula-mula dipartisikan semula menggunakan pengelompokan global untuk menggabungkan semua partition kumpulan yang sama menjadi satu partition.

  • partitionAggregate - Agregat setiap partition dan bukannya keseluruhan tuple trisula. Output koleksi terbahagi sepenuhnya menggantikan tupel input. Output koleksi terbahagi mengandungi tuple medan tunggal.

  • persistentaggregate - Agregat semua tupel trisula dalam semua kelompok dan simpan hasilnya dalam memori atau pangkalan data.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Operasi pengagregatan boleh dibuat menggunakan antara muka CombinerAggregator, ReducerAggregator atau Aggregator generik. Agregator "count" yang digunakan dalam contoh di atas adalah salah satu agregator terbina dalam, yang dilaksanakan menggunakan "CombinerAggregator" dan dilaksanakan seperti berikut -

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Group

Operasi pengelompokan ialah operasi terbina dalam yang boleh dipanggil dengan kaedah kumpulanBy. Kaedah groupBy membahagikan semula strim dengan melakukan partitionBy pada medan yang ditentukan, dan kemudian dalam setiap partition, ia menggabungkan tupel yang medan kumpulannya adalah sama. Biasanya, kami menggunakan "groupBy" dan "persistentAggregate" untuk mendapatkan pengagregatan berkumpulan. Kod contoh adalah seperti berikut -

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Gabung dan Sertai

Gabung dan Sertai boleh dilakukan dengan menggunakan kaedah "Gabung" dan "Sertai" masing-masing. Gabung menggabungkan satu atau lebih strim. Pencantuman adalah serupa dengan penggabungan, kecuali fakta bahawa pencantuman menggunakan medan tuple trisula dari kedua-dua belah pihak untuk memeriksa dan menyertai kedua-dua aliran. Selain itu, penyertaan hanya akan berfungsi pada peringkat kelompok. Kod sampel adalah seperti berikut -

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

Penyelenggaraan negeri

Trident menyediakan mekanisme untuk penyelenggaraan negeri. Maklumat keadaan boleh disimpan dalam topologi itu sendiri, atau ia boleh disimpan dalam pangkalan data yang berasingan. Sebabnya adalah untuk mengekalkan keadaan dan jika mana-mana tuple gagal semasa pemprosesan, tuple yang gagal akan dicuba semula. Ini menimbulkan masalah semasa mengemas kini keadaan, kerana anda tidak pasti sama ada keadaan tupel ini telah dikemas kini sebelum ini. Jika tupel telah gagal sebelum mengemas kini keadaan, mencuba semula tupel akan menstabilkan keadaan. Walau bagaimanapun, jika tuple gagal selepas mengemas kini keadaan, mencuba semula tupel yang sama akan menambah kiraan dalam pangkalan data sekali lagi dan menjadikan keadaan tidak stabil. Langkah berikut diperlukan untuk memastikan mesej diproses sekali sahaja -

  • Proses tupel dalam kelompok kecil.

  • Tetapkan ID unik kepada setiap kumpulan. Jika kumpulan dicuba semula, ID unik yang sama diberikan.

  • Kemas kini status diisih antara kelompok. Sebagai contoh, kumpulan kedua kemas kini status tidak akan dapat dilakukan sehingga kumpulan pertama kemas kini status selesai.

RPC Teragih

RPC Teragih digunakan untuk bertanya dan mendapatkan keputusan topologi Trident. Storm mempunyai pelayan RPC teragih terbina dalam. Pelayan RPC yang diedarkan menerima permintaan RPC daripada pelanggan dan menghantarnya ke topologi. Topologi memproses permintaan dan menghantar hasilnya kepada pelayan RPC yang diedarkan, yang mengalihkannya kepada klien. Pertanyaan RPC teragih Trident dilaksanakan seperti pertanyaan RPC biasa, kecuali fakta bahawa pertanyaan ini dijalankan secara selari.

Bila hendak menggunakan Trident?

Dalam banyak kes penggunaan, jika keperluan adalah untuk memproses pertanyaan sekali sahaja, kita boleh mencapainya dengan menulis topologi dalam Trident. Sebaliknya, pemprosesan satu pukulan yang tepat akan sukar dicapai dalam kes Storm. Oleh itu, Trident akan berguna untuk kes penggunaan yang perlu diproses sekali gus. Trident tidak sesuai untuk semua kes penggunaan, terutamanya kes penggunaan berprestasi tinggi, kerana ia menambahkan kerumitan pada Storm dan mengurus keadaan.

Contoh kerja Trident

Kami akan menukar aplikasi penganalisis log panggilan yang dibangunkan di bahagian sebelumnya kepada rangka kerja Trident. Terima kasih kepada API peringkat tingginya, aplikasi Trident akan menjadi lebih mudah daripada Storm biasa. Ribut pada asasnya perlu melaksanakan mana-mana satu operasi Fungsi, Penapis, Agregat, GroupBy, Sertai dan Gabung dalam Trident. Akhir sekali, kami akan memulakan pelayan DRPC menggunakan kelas LocalDRPC dan mencari beberapa kata kunci menggunakan kaedah pelaksanaan kelas LocalDRPC.

Format maklumat panggilan

Tujuan kelas FormatCall adalah untuk memformat maklumat panggilan termasuk "nombor pemanggil" dan "nombor penerima". Kod program lengkap adalah seperti berikut -

Pengekodan: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

Tujuan kelas CSVSplit adalah untuk memisahkan rentetan input berdasarkan "koma(,)" dan mengeluarkan setiap perkataan dalam rentetan. Fungsi ini digunakan untuk menghuraikan parameter input pertanyaan yang diedarkan. Kod lengkap adalah seperti berikut -

Pengekodan: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Log Analyzer

Ini adalah aplikasi utama. Pada mulanya, aplikasi akan memulakan TridentTopology menggunakan FeederBatchSpout dan memberikan maklumat pemanggil. Aliran topologi Trident boleh dibuat menggunakan kaedah newStream kelas TridentTopology. Begitu juga, aliran DRPC topologi Trident boleh dibuat menggunakan kaedah newDRCPStream kelas TridentTopology. Pelayan DRCP mudah boleh dibuat menggunakan kelas LocalDRPC. LocalDRPCtelah melaksanakan kaedah untuk mencari beberapa kata kunci. Kod lengkap ada di bawah.

Pengekodan: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Membina dan menjalankan aplikasi

Aplikasi lengkap mempunyai tiga kod Java. Ia adalah seperti berikut-

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

Aplikasi boleh dibina menggunakan arahan berikut-

rreee boleh dijalankan menggunakan arahan berikut

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

Output


Sebaik sahaja aplikasi dimulakan, aplikasi akan mengeluarkan butiran lengkap tentang proses permulaan kluster, pemprosesan operasi, pelayan DRPC dan maklumat pelanggan, dan akhirnya proses penutupan kluster. Output ini akan dipaparkan pada konsol seperti yang ditunjukkan di bawah.

rreeee