Apache Storm working example


We have gone through the core technical details of Apache Storm, now it is time to write some simple scenarios.

Scenario - Mobile Call Log Analyzer

Mobile calls and their durations will be fed as input to Apache Storm. Storm will process the stream, group the calls between the same caller and receiver, and count the total number of calls between each pair.

Spout creation

A spout is a component used for data generation. Basically, a spout implements the IRichSpout interface. The IRichSpout interface has the following important methods -

  • open - Provides the spout with an environment to execute. The executors will run this method to initialize the spout.

  • nextTuple - Emits the generated data through the collector.

  • close - This method is called when a spout is going to shut down.

  • declareOutputFields - Declares the output schema of the tuple.

  • ack - Acknowledges that a specific tuple has been processed.

  • fail - Notifies that a specific tuple has not been fully processed and should be reprocessed.

open

The signature of the open method is as follows -

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - Provide storm configuration for this spout.

  • context - Provides complete information about the spout's location in the topology, its task ID, input and output information.

  • collector - enables us to emit tuples that will be processed by bolts.

nextTuple

The signature of nextTuple method is as follows -

nextTuple()

nextTuple() is called periodically from the same loop as the ack() and fail() methods. It must release control of the thread when there is no work to do, so that the other methods have a chance to be called. Hence, the first line of nextTuple checks whether processing has finished. If so, it should sleep for at least one millisecond before returning, to reduce the load on the processor.
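The idle-sleep pattern described above can be sketched without any Storm classes. Note that IdleSpoutSketch, its completed flag, and getEmitted are illustrative names invented for this sketch; a real spout would implement IRichSpout and emit through a SpoutOutputCollector.

```java
//A minimal, standalone sketch of the idle-sleep pattern in nextTuple().
//IdleSpoutSketch and its fields are illustrative, not part of the Storm API.
public class IdleSpoutSketch {
   private boolean completed = false;
   private int emitted = 0;

   //Called repeatedly by the executor loop, like nextTuple().
   public void nextTuple() throws InterruptedException {
      //First, check whether processing is finished.
      if (completed) {
         //No work to do: yield the thread so ack()/fail() can run.
         Thread.sleep(1);
         return;
      }
      emitted++;               //stands in for collector.emit(...)
      if (emitted >= 3) {
         completed = true;     //pretend the data source is exhausted
      }
   }

   public int getEmitted() {
      return emitted;
   }

   public static void main(String[] args) throws InterruptedException {
      IdleSpoutSketch spout = new IdleSpoutSketch();
      //The executor keeps calling nextTuple even after the data runs out.
      for (int i = 0; i < 5; i++) {
         spout.nextTuple();
      }
      System.out.println(spout.getEmitted());   //prints 3
   }
}
```

Even after the data runs out, the executor keeps calling nextTuple; the sleep keeps that loop from spinning the CPU.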

close

The signature of the close method is as follows -

close()

declareOutputFields

The signature of the declareOutputFields method is as follows -

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - It is used to declare output stream ids, output fields, etc.

This method is used to specify the output schema of the tuple.

ack

The signature of the ack method is as follows -

ack(Object msgId)

This method acknowledges that the specific tuple has been processed.

fail

The signature of the fail method is as follows -

fail(Object msgId)

This method notifies that a specific tuple has not been fully processed. Storm will reprocess specific tuples.

FakeCallLogReaderSpout

In our scenario, we need to collect call log details. The call log information includes -

  • Calling Number
  • Received Number
  • Duration

Since we do not have real-time call log information, we will generate fake call logs. The fake information will be created using the Random class. The complete program code is given below.
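The core of the fake data generation can also be tried as a standalone program. FakeCallLogGenerator and nextRecord are hypothetical names used only for this sketch; the number list and the 0-59 duration range match the spout code that follows.

```java
import java.util.List;
import java.util.Random;

//Standalone sketch of the fake call-log generation used by the spout.
//FakeCallLogGenerator and nextRecord are illustrative names; the real
//spout emits the same three fields as a Values tuple.
public class FakeCallLogGenerator {
   private static final List<String> NUMBERS = List.of(
      "1234123401", "1234123402", "1234123403", "1234123404");

   //Returns {from, to, duration} for one fake call.
   public static String[] nextRecord(Random random) {
      String from = NUMBERS.get(random.nextInt(NUMBERS.size()));
      String to = NUMBERS.get(random.nextInt(NUMBERS.size()));
      //Re-draw the receiver until it differs from the caller
      //(compare strings with equals(), not ==).
      while (from.equals(to)) {
         to = NUMBERS.get(random.nextInt(NUMBERS.size()));
      }
      int duration = random.nextInt(60);   //0-59, as in the spout
      return new String[] { from, to, Integer.toString(duration) };
   }

   public static void main(String[] args) {
      Random random = new Random();
      for (int i = 0; i < 5; i++) {
         String[] record = nextRecord(random);
         System.out.println(record[0] + " -> " + record[1] + " (" + record[2] + ")");
      }
   }
}
```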

Coding - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeCallLogReaderSpout which implements the IRichSpout interface to access its functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            //Compare with equals(), not ==, so identical numbers are re-drawn
            while(fromMobileNumber.equals(toMobileNumber)) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Bolt Creation

A bolt is a component that takes tuples as input, processes them, and produces new tuples as output. Bolts implement the IRichBolt interface. In this program, two bolt classes, CallLogCreatorBolt and CallLogCounterBolt, are used to perform the operations.

The IRichBolt interface has the following methods -

  • prepare - Provides the bolt with an environment to execute. The executors will run this method to initialize the bolt.

  • execute - Processes a single tuple of input.

  • cleanup - Called when a bolt is going to shut down.

  • declareOutputFields - Declares the output schema of the tuple.

prepare

The signature of the prepare method is as follows -


prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - Provides Storm configuration for this bolt.

  • context - Provides complete information about the bolt's location in the topology, its task ID, input and output information, etc.

  • collector - Enables us to emit the processed tuple.

execute

The signature of the execute method is as follows -

execute(Tuple tuple)

Here, tuple is the input tuple to be processed.

The execute method processes a single tuple at a time. The tuple data can be accessed through the getValue method of the Tuple class. It is not necessary to process the input tuple immediately. Multiple tuples can be processed and output as a single output tuple. The processed tuple can be emitted by using the OutputCollector class.

cleanup

The signature of the cleanup method is as follows -

cleanup()

declareOutputFields

The signature of the declareOutputFields method is as follows -

declareOutputFields(OutputFieldsDeclarer declarer)

Here, the parameter declarer is used to declare output stream ids, output fields, etc.

This method is used to specify the output schema of the tuple.

Call log creator bolt

The call log creator bolt receives the call log tuple. The call log tuple has the caller number, the receiver number, and the call duration. This bolt simply creates a new value by combining the caller number and the receiver number. The format of the new value is "Caller number - Receiver number", and it is named as a new field, "call". The complete code is given below.
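The transformation this bolt performs can be sketched without any Storm classes. CallTupleSketch and toCallTuple are hypothetical names for this illustration; the real bolt emits the same pair through an OutputCollector as a Values tuple.

```java
//Standalone sketch of CallLogCreatorBolt's transformation.
//CallTupleSketch and toCallTuple are illustrative names only.
public class CallTupleSketch {
   //Combines caller and receiver into the "call" field and keeps the duration.
   public static Object[] toCallTuple(String from, String to, Integer duration) {
      return new Object[] { from + " - " + to, duration };
   }

   public static void main(String[] args) {
      Object[] out = toCallTuple("1234123401", "1234123402", 30);
      System.out.println(out[0] + " : " + out[1]);   //prints 1234123401 - 1234123402 : 30
   }
}
```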

Coding - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Call Log Counter Bolt

The call log counter bolt receives the call and its duration as a tuple. This bolt initializes a dictionary (Map) object in the prepare method. In the execute method, it checks the tuple and creates a new entry in the dictionary object for every new "call" value in the tuple and sets a value of 1. For an already existing entry, it just increments the value. In simple terms, this bolt saves the call and its count in the dictionary object. Instead of saving the call and its count in the dictionary, we could also save it to a data source. The complete program code is as follows -
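The counting logic can be tried on its own. CallCounter is a hypothetical stand-in for the bolt's dictionary handling; Map.merge is used here as an idiomatic one-liner for the containsKey/put pair that appears in the bolt below.

```java
import java.util.HashMap;
import java.util.Map;

//Standalone sketch of the per-call counting done by CallLogCounterBolt.
//CallCounter is an illustrative name, not part of the Storm API.
public class CallCounter {
   private final Map<String, Integer> counterMap = new HashMap<>();

   public void count(String call) {
      //merge() inserts 1 for a new key, otherwise adds 1 to the old value
      counterMap.merge(call, 1, Integer::sum);
   }

   public int getCount(String call) {
      return counterMap.getOrDefault(call, 0);
   }

   public static void main(String[] args) {
      CallCounter counter = new CallCounter();
      counter.count("1234123401 - 1234123402");
      counter.count("1234123401 - 1234123402");
      counter.count("1234123403 - 1234123404");
      System.out.println(counter.getCount("1234123401 - 1234123402"));   //prints 2
   }
}
```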

Coding - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Create topology

A Storm topology is basically a Thrift structure. The TopologyBuilder class provides simple and easy methods to create complex topologies. The TopologyBuilder class has methods to set a spout (setSpout) and to set a bolt (setBolt). Finally, TopologyBuilder has createTopology to create the topology. Use the following code snippet to create the topology -

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

The shuffleGrouping and fieldsGrouping methods help to set the stream groupings for the spout and bolts.
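Why fieldsGrouping keeps the counts correct can be illustrated with a simplified model: the grouping field is hashed onto the bolt's task list, so every tuple with the same "call" value reaches the same CallLogCounterBolt task. FieldsGroupingSketch and taskFor are hypothetical names; this is only a sketch of the idea, not Storm's actual partitioning code.

```java
//Simplified model of fieldsGrouping: route by hash of the grouping field.
//taskFor is a hypothetical helper, not Storm's real partitioner.
public class FieldsGroupingSketch {
   public static int taskFor(String fieldValue, int numTasks) {
      //floorMod keeps the result non-negative even for negative hash codes
      return Math.floorMod(fieldValue.hashCode(), numTasks);
   }

   public static void main(String[] args) {
      String call = "1234123401 - 1234123402";
      //The same "call" value always maps to the same task index,
      //so one counter task sees every tuple for that call.
      System.out.println(taskFor(call, 4) == taskFor(call, 4));   //prints true
   }
}
```

With shuffleGrouping, by contrast, tuples are distributed randomly across tasks, which is fine for the stateless creator bolt but would scatter the counts for the counter bolt.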

Local Cluster

For development purposes, we can create a local cluster using the "LocalCluster" object and then submit the topology using the "submitTopology" method of the "LocalCluster" class. One of the arguments of "submitTopology" is an instance of the "Config" class. The "Config" class is used to set configuration options before submitting the topology. This configuration will be merged with the cluster configuration at run time and sent to all tasks (spouts and bolts) with the prepare method. Once the topology is submitted to the cluster, we wait 10 seconds for the cluster to process the submitted topology, and then shut down the cluster using the "shutdown" method of "LocalCluster". The complete program code is as follows -

Coding - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm to submit the topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //Create the topology using TopologyBuilder
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

Building and running the application

The complete application has four Java codes. They are -

  • FakeCallLogReaderSpout.java
  • CallLogCreatorBolt.java
  • CallLogCounterBolt.java
  • LogAnalyserStorm.java

The application can be built using the following command -

javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*" *.java

The application can be run using the following command -

java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":. LogAnalyserStorm

Output

Once the application starts, it will output the complete details about the cluster startup process, the spout and bolt processing, and finally, the cluster shutdown process. In "CallLogCounterBolt", we print the call and its count details. This information will be displayed on the console as follows -

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Non-JVM languages

A Storm topology is implemented through Thrift interfaces, which makes it easy to submit topologies in any language. Storm supports Ruby, Python and many other languages. Let's take a look at the Python binding.

Python Bindings

Python is a general-purpose, interpreted, interactive, object-oriented, high-level programming language. Storm supports Python for implementing its topologies. Python supports emitting, anchoring, acking, and logging operations.

As you know, bolts can be defined in any language. Bolts written in another language are executed as sub-processes, and Storm communicates with those sub-processes using JSON messages over stdin/stdout. First, take a sample bolt WordCount that supports the Python binding.

public static class WordCount extends ShellBolt implements IRichBolt {
   public WordCount() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

Here the class WordCount extends ShellBolt and implements the IRichBolt interface; the super method parameters "python" and "splitword.py" tell Storm to run the bolt with the Python implementation splitword.py. Now create the Python implementation named "splitword.py".

import storm

class WordCountBolt(storm.BasicBolt):
   def process(self, tup):
      words = tup.values[0].split(" ")
      for word in words:
         storm.emit([word])

WordCountBolt().run()

This is a sample implementation in Python that splits the words of a given sentence and emits each word. Likewise, you can bind with the other supported languages as well.