Apache Storm working example
We have gone through the core technical details of Apache Storm; now it is time to work through a simple scenario.
Scenario - Mobile Call Log Analyzer
Mobile calls and their durations are fed as input to Apache Storm. Storm processes the stream, groups the calls between the same caller and receiver, and counts the total number of calls for each pair.
Spout creation
A spout is a component used for data generation. Basically, a spout implements the IRichSpout interface. The IRichSpout interface has the following important methods -
open - Provides an execution environment for the spout. The executor runs this method to initialize the spout.
nextTuple - Emits the generated data through the collector.
close - This method is called when the spout is about to shut down.
declareOutputFields - Declares the output schema of the tuple.
ack - Acknowledges that a specific tuple has been processed.
fail - Notifies that a specific tuple was not fully processed, so that it can be reprocessed.
open
The signature of the open method is as follows -
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
conf - Provides the Storm configuration for this spout.
context - Provides complete information about the spout's place in the topology, its task id, and its input and output information.
collector - Enables us to emit tuples that will be processed by the bolts.
nextTuple
The signature of nextTuple method is as follows -
nextTuple()
nextTuple() is called periodically from the same loop as the ack() and fail() methods. It must release control of the thread when there is no work to do, so that the other methods have a chance to be called. Therefore, the first line of nextTuple checks whether processing has finished. If so, it should sleep for at least one millisecond to reduce the load on the processor before returning.
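The idle-sleep pattern described above can be sketched without the Storm API. Here the executor's call to nextTuple and the spout's data source are simulated with plain Java; the `SOURCE` queue and the class name are illustrative assumptions, not Storm names:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class IdleSleepSketch {
    // Stand-in for the spout's data source; in a real spout this might
    // be a message queue or, as in this chapter, a random generator.
    static final Deque<String> SOURCE = new ArrayDeque<>();

    // Mimics nextTuple(): emit one item if available, otherwise sleep
    // briefly so the executor thread is not busy-spinning.
    static String nextTuple() throws InterruptedException {
        if (SOURCE.isEmpty()) {
            Thread.sleep(1);      // at least one millisecond when idle
            return null;          // nothing emitted this round
        }
        return SOURCE.poll();     // "emit" the next tuple
    }
}
```

The key point is that nextTuple must never block for long: it either emits quickly or yields the thread.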
close
The signature of the close method is as follows -
close()
declareOutputFields
The signature of the declareOutputFields method is as follows -
declareOutputFields(OutputFieldsDeclarer declarer)
declarer - It is used to declare output stream ids, output fields, etc.
This method is used to specify the output schema of the tuple.
ack
The signature of the ack method is as follows -
ack(Object msgId)
This method acknowledges that the specific tuple has been processed.
fail
The signature of the fail method is as follows -
fail(Object msgId)
This method notifies that a specific tuple has not been fully processed. Storm will reprocess such tuples.
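The interplay of emit, ack, and fail can be sketched without the Storm API. The class below is a hypothetical illustration of how a reliable spout could track in-flight tuples by message id, so that ack discards them and fail makes them available for replay; none of the names are part of Storm itself:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: a reliable spout remembers every emitted tuple
// by its message id until the topology acks or fails it.
public class PendingSketch {
    final Map<Long, String> pending = new HashMap<>();
    long nextId = 0;

    // Emit a tuple with a message id and remember it until acked.
    long emit(String tuple) {
        long id = nextId++;
        pending.put(id, tuple);
        return id;
    }

    // ack(msgId): the tuple was fully processed, forget it.
    void ack(long msgId) {
        pending.remove(msgId);
    }

    // fail(msgId): processing failed, return the tuple for replay.
    String fail(long msgId) {
        return pending.get(msgId);
    }
}
```

The FakeCallLogReaderSpout in this chapter leaves ack and fail empty because it emits unanchored tuples and does not need replay.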
FakeCallLogReaderSpout
In our scenario, we need to collect call log details. A call log record contains the following information -
- Calling Number
- Received Number
- Duration
Since we do not have real-time call log information, we will generate fake call logs. The fake information will be created using the Random class. The complete program code is given below.
Coding - FakeCallLogReaderSpout.java
import java.util.*;

//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeCallLogReaderSpout which implements the IRichSpout interface
public class FakeCallLogReaderSpout implements IRichSpout {
   //SpoutOutputCollector passes the tuples to the bolts
   private SpoutOutputCollector collector;
   private boolean completed = false;

   //TopologyContext contains the topology data
   private TopologyContext context;

   //Random generator for the fake call logs
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

            //make sure caller and receiver differ (compare by value, not ==)
            while(fromMobileNumber.equals(toMobileNumber)) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }

            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //The remaining interface methods need no special handling here
   @Override
   public void close() {}

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
Bolt Creation
A bolt is a component that takes tuples as input, processes them, and produces new tuples as output. Bolts implement the IRichBolt interface. In this program, two bolt classes, CallLogCreatorBolt and CallLogCounterBolt, are used to perform the operations.
The IRichBolt interface has the following methods -
prepare - Provides the environment for the bolt to execute. The executor runs this method to initialize the bolt.
execute - Processes a single tuple of input.
cleanup - Called when the bolt is about to shut down.
declareOutputFields - Declares the output schema of the tuple.
prepare
The signature of the prepare method is as follows -
prepare(Map conf, TopologyContext context, OutputCollector collector)
conf - Provides the Storm configuration for this bolt.
context - Provides complete information about the bolt's location in the topology, its task ID, input and output information, etc.
collector - Enables us to emit processed tuples.
execute
The signature of the execute method is as follows -
execute(Tuple tuple)
The tuple here is the input tuple to be processed. The execute method processes a single tuple at a time. The tuple data can be accessed through the getValue method of the Tuple class. The input tuple does not have to be processed immediately; multiple tuples can be processed and emitted as a single output tuple. Processed tuples can be emitted using the OutputCollector class.
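The read-emit-ack flow of execute can be illustrated with a stand-in collector. The RecordingCollector class below is a hypothetical stub, not Storm's OutputCollector; it only records what the bolt emits and acknowledges so the flow is visible:

```java
import java.util.ArrayList;
import java.util.List;

public class ExecuteSketch {
    // Hypothetical stand-in for OutputCollector: records what a bolt
    // emits and which input tuples it acknowledges.
    static class RecordingCollector {
        final List<String> emitted = new ArrayList<>();
        final List<Object[]> acked = new ArrayList<>();
        void emit(String value)  { emitted.add(value); }
        void ack(Object[] tuple) { acked.add(tuple); }
    }

    // Mirrors CallLogCreatorBolt.execute(): read fields by position,
    // combine caller and receiver, emit the result, ack the input.
    static void execute(Object[] tuple, RecordingCollector collector) {
        String from = (String) tuple[0];   // tuple.getString(0)
        String to   = (String) tuple[1];   // tuple.getString(1)
        collector.emit(from + " - " + to);
        collector.ack(tuple);
    }
}
```

In the real bolt, the same positions correspond to the "from", "to", and "duration" fields declared by the spout.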
cleanup
The signature of the cleanup method is as follows -
cleanup()
declareOutputFields
The signature of the declareOutputFields method is as follows -
declareOutputFields(OutputFieldsDeclarer declarer)
The declarer parameter here is used to declare output stream ids, output fields, etc.
This method is used to specify the output schema of the tuple.
Call log creator bolt
The call log creator bolt receives call log tuples. A call log tuple has the caller number, the receiver number, and the call duration. This bolt simply creates a new value by combining the caller number and the receiver number. The new value has the format "Caller number - Receiver number", and the new field is named "call". The complete code is given below.
Coding - CallLogCreatorBolt.java
//import util packages
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
//Create instance for OutputCollector which collects and emits tuples to produce output
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String from = tuple.getString(0);
String to = tuple.getString(1);
Integer duration = tuple.getInteger(2);
collector.emit(new Values(from + " - " + to, duration));
}
@Override
public void cleanup() {}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call", "duration"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Call log counter bolt
The call log counter bolt receives the call and its duration as a tuple. It creates a dictionary (Map) in its prepare method and, for every call, either initializes or increments the count stored in that dictionary. When the topology shuts down, the cleanup method prints each call and its count. The complete code is given below.
Coding - CallLogCounterBolt.java
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   //Map holding the count for every "caller - receiver" pair
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);

      if(!counterMap.containsKey(call)) {
         counterMap.put(call, 1);
      } else {
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry : counterMap.entrySet()) {
         System.out.println(entry.getKey() + " : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
Create topology
A Storm topology is basically a Thrift structure. The TopologyBuilder class provides a simple and easy way to create complex topologies. The TopologyBuilder class has methods to set a spout (setSpout) and to set a bolt (setBolt). Finally, TopologyBuilder has createTopology to create the topology. Use the following code snippet to create a topology -
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
The shuffleGrouping and fieldsGrouping methods set up the stream grouping between the spout and the bolts.
Local Cluster
For development purposes, we can create a local cluster using the LocalCluster object and then submit the topology using the submitTopology method of the LocalCluster class. One of the arguments of submitTopology is an instance of the Config class. The Config class is used to set configuration options before submitting the topology; this configuration is merged with the cluster configuration at runtime and sent to all tasks (spouts and bolts) through the prepare method. Once the topology has been submitted to the cluster, we wait 10 seconds while the cluster processes the submitted topology, and then shut down the cluster using the shutdown method of LocalCluster. The complete program code is as follows -
Coding - LogAnalyserStorm.java
import backtype.storm.tuple.Fields;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create the main class LogAnalyserStorm to submit the topology
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception {
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);

      //Stop the topology
      cluster.shutdown();
   }
}
Building and running the application
The complete application has four Java source files. They are -
- FakeCallLogReaderSpout.java
- CallLogCreatorBolt.java
- CallLogCounterBolt.java
- LogAnalyserStorm.java
The application can be built using the following command -
javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*" *.java
The application can be run using the following command -
java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":. LogAnalyserStorm
Output
Once the application starts, it outputs complete details about the cluster startup process, the spout and bolt processing, and finally the cluster shutdown process. In CallLogCounterBolt, we printed each call and its count. This information will be displayed on the console as follows -
1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93
Non-JVM languages
Storm topologies are implemented through Thrift interfaces, which makes it easy to submit topologies in any language. Storm supports Ruby, Python and many other languages. Let's take a look at the Python binding.
Python Bindings
Python is a general-purpose, interpreted, interactive, object-oriented, high-level programming language. Storm supports Python for implementing its topologies. Python supports emitting, anchoring, acking and logging operations.
As you know, bolts can be defined in any language. A bolt written in another language executes as a subprocess, and Storm communicates with that subprocess through JSON messages over stdin/stdout. First, take a sample bolt WordCount that uses the Python binding.
public static class WordCount extends ShellBolt implements IRichBolt {
   public WordCount() {
      super("python", "splitword.py");
   }

   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
The WordCount class here implements the IRichBolt interface and delegates to a Python implementation through the super constructor parameters "python" and "splitword.py". Now create the Python implementation called "splitword.py".
import storm

class WordCountBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
            storm.emit([word])

WordCountBolt().run()
This is a sample implementation in Python that splits a given sentence into words and emits each word as a separate tuple. Likewise, you can bind to the other supported languages as well.