Apache Storm Trident


Trident is an extension of Storm. Like Storm, Trident was also developed by Twitter. The main motivation behind Trident is to provide a high-level abstraction on top of Storm, along with stateful stream processing and low-latency distributed querying.

Trident uses spouts and bolts, but these low-level components are automatically generated by Trident before execution. Trident has functions, filters, joins, grouping and aggregation.

Trident processes streams as a series of batches, referred to as transactions. Typically, the size of these mini-batches will be on the order of thousands or millions of tuples, depending on the input stream. In this way, Trident differs from Storm, which performs tuple-by-tuple processing.

The batch processing concept is very similar to database transactions. Every transaction is assigned a transaction ID. A transaction is considered successful once all of its processing completes; however, a failure while processing any of the transaction's tuples causes the entire transaction to be retransmitted. For each batch, Trident calls beginCommit at the beginning of the transaction and commit at its end.
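To make these hooks concrete, the following is a minimal sketch of a custom state implementing Trident's State interface. The class name DummyState and the println bodies are made up for illustration, but beginCommit and commit are the two methods Trident invokes around each batch.

import storm.trident.state.State;

public class DummyState implements State {
   @Override
   public void beginCommit(Long txid) {
      // invoked before the state updates for this batch are applied
      System.out.println("Begin commit for transaction " + txid);
   }

   @Override
   public void commit(Long txid) {
      // invoked once all state updates for this batch have succeeded
      System.out.println("Commit transaction " + txid);
   }
}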

Trident Topology

The Trident API exposes a simple option to create a Trident topology using the "TridentTopology" class. Basically, a Trident topology receives an input stream from a spout and performs an ordered sequence of operations (filter, aggregation, grouping, etc.) on the stream. Storm tuples are replaced by Trident tuples, and bolts are replaced by operations. A simple Trident topology can be created as follows -

TridentTopology topology = new TridentTopology();

Trident Tuples

A Trident tuple is a named list of values. The TridentTuple interface is the data model of a Trident topology and the basic unit of data that can be processed by a Trident topology.
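Since the fields of a trident tuple are named, values can be read either by position or by field name. The snippet below is a small illustration of both styles inside a function; the class name PrintSum and the field names "a" and "b" are assumptions made for this example.

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class PrintSum extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);             // access by position
      int b = tuple.getIntegerByField("b");    // access by field name
      collector.emit(new Values(a + b));
   }
}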

Trident Spout

The Trident spout is similar to the Storm spout, with additional options to use the features of Trident. Actually, we can still use IRichSpout, which we used in the Storm topology, but it will be non-transactional in nature and we won't be able to use the advantages provided by Trident.

The basic spout having all the functionality to use the features of Trident is "ITridentSpout". It supports transactional and opaque transactional semantics. The other spouts are IBatchSpout, IPartitionedTridentSpout, and IOpaquePartitionedTridentSpout.
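Of these, IBatchSpout is the simplest one to implement by hand. The skeleton below is only a sketch of what such an implementation might look like; the class name SingleValueBatchSpout, the emitted value, and the field name "value" are assumptions made for illustration.

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class SingleValueBatchSpout implements IBatchSpout {
   @Override
   public void open(Map conf, TopologyContext context) {}

   @Override
   public void emitBatch(long batchId, TridentCollector collector) {
      // emit the tuples that belong to the batch identified by batchId
      collector.emit(new Values("hello"));
   }

   @Override
   public void ack(long batchId) {
      // the batch identified by batchId has been processed successfully
   }

   @Override
   public void close() {}

   @Override
   public Map getComponentConfiguration() {
      return null;
   }

   @Override
   public Fields getOutputFields() {
      return new Fields("value");
   }
}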

In addition to these generic spouts, Trident has many sample implementations of trident spout. One of them is FeederBatchSpout, which we can use to send a named list of trident tuples easily, without worrying about batch processing, parallelism, etc.

FeederBatchSpout creation and data feeding can be done as follows -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", "duration"));
topology.newStream("fixed-batch-spout", testSpout);
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident Operation

Trident relies on the "Trident Operation" to handle the input stream of trident tuples. The Trident API has multiple built-in operations to handle simple to complex stream processing. These operations range from simple validation to complex grouping and aggregation of trident tuples. Let's go through the most important and frequently used operations.

Filtering

A filter is an object used to perform the task of input validation. A Trident filter gets a subset of trident tuple fields as input and returns either true or false depending on whether certain conditions are satisfied or not. If true is returned, then the tuple is kept in the output stream; otherwise, the tuple is removed from the stream. A filter basically inherits from the BaseFilter class and implements the isKeep method. Here is a sample implementation of a filter operation -

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

Input

[1, 2]
[1, 3]
[1, 4]

Output

[1, 2]
[1, 4]

A filter can be called in the topology using the "each" method. The "Fields" class can be used to specify the input (a subset of the trident tuple fields). The sample code is as follows -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter());

Function

A function is an object used to perform a simple operation on a single trident tuple. It takes a subset of trident tuple fields and emits zero or more new trident tuple fields.

A function basically inherits from the BaseFunction class and implements the execute method. A sample implementation is given below -

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

Input

[1, 2]
[1, 3]
[1, 4]

Output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Similar to the filter operation, a function operation can be called in the topology using the each method. The sample code is as follows -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"));

Aggregation

Aggregation is an object used to perform aggregation operations on an input batch, partition, or stream. Trident has three types of aggregation. They are as follows -

  • aggregate - Aggregates each batch of trident tuples in isolation. During the aggregation process, the tuples are initially repartitioned using global grouping to combine all partitions of the same batch into a single partition.

  • partitionAggregate - Aggregates each partition instead of the entire batch of trident tuples. The output of the partition aggregate completely replaces the input tuples and contains a single-field tuple.

  • persistentAggregate - Aggregates over all trident tuples across all batches and stores the result either in memory or in a database.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .aggregate(new Count(), new Fields("count"));

// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .partitionAggregate(new Count(), new Fields("count"));

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Aggregation operations can be created using either the CombinerAggregator, the ReducerAggregator, or the generic Aggregator interface. The Count aggregator used in the example above is one of the built-in aggregators. It is implemented using CombinerAggregator, as follows -

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}
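For comparison, a similar count can also be written against the ReducerAggregator interface; the sketch below (the class name ReducerCount is made up for this example) only illustrates that style and is not a built-in class.

import storm.trident.operation.ReducerAggregator;
import storm.trident.tuple.TridentTuple;

public class ReducerCount implements ReducerAggregator<Long> {
   @Override
   public Long init() {
      // starting value for the aggregation
      return 0L;
   }

   @Override
   public Long reduce(Long curr, TridentTuple tuple) {
      // fold each incoming tuple into the running value
      return curr + 1;
   }
}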

Grouping

The grouping operation is a built-in operation and can be called by the groupBy method. The groupBy method repartitions the stream by doing a partitionBy on the specified fields, and then within each partition, it groups together the tuples whose group fields are equal. Usually, we use "groupBy" along with "persistentAggregate" to get a grouped aggregation. The sample code is as follows -

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .groupBy(new Fields("d"))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Merge and Join

Merging and joining can be done by using the "merge" and "join" methods respectively. Merging combines one or more streams. Joining is similar to merging, except that joining uses the trident tuple fields from both sides to check and join the two streams. Moreover, joining works at the batch level only. The sample code is as follows -

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

State maintenance

Trident provides a mechanism for state maintenance. State information can be stored in the topology itself, or it can be stored in a separate database. The reason is that if any tuple fails during processing, the failed tuple is retried. This creates a problem while updating the state, because you are not sure whether the state of this tuple has already been updated. If the tuple failed before updating the state, then retrying the tuple will make the state stable. However, if the tuple failed after updating the state, then retrying the same tuple will again increase the count in the database and leave the state unstable. The following steps need to be performed to ensure that a message is processed only once (a small sketch of the idea follows the list) -

  • Process tuples in small batches.

  • Assign a unique ID to each batch. If the batch is retried, it is given the same unique ID.

  • State updates are ordered between batches. For example, the state updates of the second batch will not be possible until the state updates of the first batch have completed.
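The snippet below is only a rough illustration of the last points, not Trident's actual implementation: it keeps the last applied transaction ID next to the running count, so a retried batch carrying the same ID is not applied twice. The class name TxAwareCount and its methods are made up for this example.

public class TxAwareCount {
   private long lastAppliedTxId = -1;
   private long count = 0;

   public void applyBatch(long txId, long batchCount) {
      // illustrative only: skip a batch whose transaction ID was already applied,
      // so a retried batch does not increase the count a second time
      if (txId <= lastAppliedTxId) {
         return;
      }
      count += batchCount;
      lastAppliedTxId = txId;
   }

   public long getCount() {
      return count;
   }
}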

Distributed RPC

Distributed RPC is used to query and retrieve Trident topology results. Storm has a built-in distributed RPC server. The distributed RPC server receives RPC requests from clients and delivers them to the topology. The topology processes the request and sends the result to the distributed RPC server, which redirects it to the client. Trident's distributed RPC queries execute like normal RPC queries, except for the fact that these queries run in parallel.
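For example, against a remote cluster with a running DRPC server, a client query could look like the sketch below; the host name and the function name "call_count" are assumptions for this example (the working example later in this section uses LocalDRPC instead).

import backtype.storm.utils.DRPCClient;

public class DRPCQueryClient {
   public static void main(String[] args) throws Exception {
      // connect to the DRPC server (host and port are assumed here)
      DRPCClient client = new DRPCClient("drpc.server.location", 3772);

      // send the arguments to the "call_count" DRPC function of the topology
      // and print the result that the topology returns
      System.out.println(client.execute("call_count", "1234123401 - 1234123402"));
   }
}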

When to use Trident?

In many use cases, if the requirement is to process a query only once, we can achieve it by writing a topology in Trident. On the other hand, exactly-once processing is difficult to achieve with plain Storm. Hence, Trident is useful for use cases that require exactly-once processing. Trident is not suitable for all use cases, especially high-performance ones, because it adds complexity to Storm and manages state.

Working example of Trident

We will convert the call log analyzer application developed in the previous section into the Trident framework. A Trident application is relatively easier to write than plain Storm, thanks to its high-level API. The work of the Storm topology will be done using Trident's Function, Filter, Aggregate, GroupBy, Join and Merge operations. Finally, we will start the DRPC server using the LocalDRPC class and search for some keywords using the execute method of LocalDRPC.

Format call information

The purpose of the FormatCall class is to format call information including "caller number" and "receiver number". The complete program code is as follows -

Coding: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

The purpose of the CSVSplit class is to split the input string based on the comma (",") and emit every word in the string. This function is used to parse the input argument of the distributed query. The complete code is as follows -

Coding: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Log Analyzer

This is the main application. Initially, the application will initialize the TridentTopology and feed the caller information using FeederBatchSpout. A Trident topology stream can be created using the newStream method of the TridentTopology class. Similarly, a Trident topology DRPC stream can be created using the newDRPCStream method of the TridentTopology class. A simple DRPC server can be created using the LocalDRPC class. LocalDRPC has an execute method to search for some keyword. The complete code is given below.

Coding: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count",
         "1234123401 - 1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Building and running the application

The complete application has three Java codes. They are as follows -

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyserTrident.java

The application can be built using the following command -

javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*" *.java

The application can be run using the following command -

java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":. LogAnalyserTrident

Output

Once the application is started, it will output the complete details of the cluster startup process, operation processing, DRPC server and client information, and finally, the cluster shutdown process. This output will be displayed on the console as shown below.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends