Exemple de travail d'Apache Storm


Nous avons passé en revue les principaux détails techniques d'Apache Storm, il est maintenant temps d'écrire quelques scénarios simples.

Scénario - Analyseur de journal d'appels mobiles

Les appels mobiles et leur durée seront pris en compte comme entrée dans Apache Storm, Storm traitera et regroupera les appels entre le même appelant et le même destinataire ainsi que leur nombre total d'appels.

Création de Spout

Spout est un composant utilisé pour la génération de données. Fondamentalement, un spout implémentera une interface IRichSpout. L'interface "IRichSpout" dispose des méthodes importantes suivantes -

  • open - pour fournir un environnement d'exécution pour Spout. L'exécuteur exécutera cette méthode pour initialiser la tête d'arrosage.

  • nextTuple - Émet les données générées via le collecteur.

  • close - Cette méthode est appelée lorsque le bec est sur le point de se fermer.

  • declareOutputFields - Déclarez le mode de sortie du tuple.

  • ack - Confirmez qu'un tuple spécifique a été traité.

  • fail - Spécifie de ne pas traiter et de ne pas retraiter un tuple spécifique. La signature de la méthode

open

open est la suivante -

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - fournit une configuration de tempête pour ce bec.

  • context - Fournit des informations complètes sur l'emplacement du bec dans la topologie, son ID de tâche, les informations d'entrée et de sortie.

  • collector - nous permet d'émettre des tuples qui seront traités par des boulons. La signature de la méthode

nextTuple

nextTuple est la suivante -

nextTuple()

nextTuple() est appelée périodiquement à partir de la même boucle que les méthodes ack() et fail(). Il doit libérer le contrôle du thread lorsqu'il n'y a aucun travail à faire afin que d'autres méthodes aient une chance d'être appelées. Par conséquent, la première ligne de nextTuple vérifie si le traitement est terminé. Si tel est le cas, il doit rester en veille pendant au moins une milliseconde pour réduire la charge du processeur avant de revenir. La signature de la méthode

close

close est la suivante-

close()

declareOutputFields

declareOutputFieldsLa signature de la méthode est la suivante-

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - Il est utilisé pour déclarer la sortie identifiant de flux, champs de sortie, etc.

cette méthode utilisée pour spécifier le mode de sortie des tuples. La signature de la méthode

ack

ack est la suivante -

ack(Object msgId)

Cette méthode confirme qu'un tuple spécifique a été traité. La signature de la méthode

fail

nextTuple est la suivante -

ack(Object msgId)

Cette méthode notifie qu'un tuple spécifique n'a pas été entièrement traité. Storm retraitera des tuples spécifiques.

FakeCallLogReaderSpout

Dans notre scénario, nous devons collecter les détails du journal d'appels. Les informations du journal des appels sont incluses.

    Numéro d'appel
  • Numéro de réception

  • Durée
Comme nous n'avons pas d'informations en temps réel sur les journaux d'appels, nous générerons de faux journaux d'appels. De fausses informations seront créées à l’aide de la classe Random. Le code complet du programme est le suivant.

Codage - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Création de Bolt

Bolt est un composant qui prend des tuples en entrée, traite les tuples et produit de nouveaux tuples en sortie. Bolts implémentera l'interface IRichBolt. Dans ce programme, deux classes Bolts
CallLogCreatorBolt et CallLogCounterBolt sont utilisées pour effectuer des opérations.

L'interface IRichBolt a les méthodes suivantes -

  • prepare - Fournit l'environnement pour que Bolt s'exécute. L'exécuteur exécutera cette méthode pour initialiser le bec.

  • execute - gère l'entrée d'un seul tuple

  • cleanup - appelé lorsque le bec est sur le point de se fermer.

  • declareOutputFields - Déclarez le mode de sortie du tuple. La signature de la méthode

Prepare

prepare est la suivante -


prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - fournit la configuration Storm pour ce boulon.

  • context - Fournit des informations complètes sur l'emplacement du boulon dans la topologie, son ID de tâche, les informations d'entrée et de sortie, etc.

  • collector - Nous permet d'émettre des tuples traités. La signature de la méthode

execute

execute est la suivante -

execute(Tuple tuple)

où le tuple est le tuple d'entrée à traiter. La méthode

execute traite un seul tuple à la fois. Les données Tuple sont accessibles via la méthode getValue de la classe Tuple. Les tuples d'entrée ne doivent pas être traités immédiatement. Les tuples peuvent être traités et générés sous la forme d'un tuple de sortie unique. Les tuples traités peuvent être émis à l'aide de la classe OutputCollector. La signature de la méthode

cleanup

cleanup est la suivante -

cleanup()

declareOutputFields

declareOutputFieldsLa signature de la méthode est la suivante -

declareOutputFields(OutputFieldsDeclarer declarer)

Le paramètre ici declarer est utilisé pour déclarez l'identifiant du flux de sortie, les champs de sortie, etc.

Cette méthode est utilisée pour spécifier le mode de sortie des tuples.

Boulon du créateur du journal d'appels

Le boulon du créateur du journal d'appels reçoit un tuple du journal d'appels. Le tuple du journal d'appels contient le numéro de l'appelant, le numéro du destinataire et la durée de l'appel. Ce boulon crée simplement une nouvelle valeur en combinant le numéro de l'appelant et le numéro du destinataire. La nouvelle valeur a le format "Numéro appelant - Numéro du destinataire" et nomme le nouveau champ "Appel". Le code complet est ci-dessous.

Encodage - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Call Log Counter Bolt

Call Log Creator Bolt reçoit des tuples de journal d'appels. Le tuple du journal d'appels contient le numéro de l'appelant, le numéro du destinataire et la durée de l'appel. Ce boulon crée simplement une nouvelle valeur en combinant le numéro de l'appelant et le numéro du destinataire. La nouvelle valeur a le format "Numéro appelant - Numéro du destinataire" et nomme le nouveau champ "Appel". Le code complet est ci-dessous.

Codage - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Création de la topologie

La topologie Storm est fondamentalement une structure Thrift. La classe TopologyBuilder offre un moyen simple et facile de créer des topologies complexes. La classe TopologyBuilder a des méthodes pour définir le bec (setSpout) et le boulon (setBolt) . Enfin, TopologyBuilder dispose de createTopology pour créer des topologies. Utilisez l'extrait de code suivant pour créer la topologie - les méthodes

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGrouping et fieldsGrouping aident à configurer le regroupement de flux pour les becs et les boulons.

Cluster Local

À des fins de développement, nous pouvons créer un cluster local à l'aide de l'objet "LocalCluster" puis soumettre la topologie à l'aide de la méthode "submitTopology" de la classe "LocalCluster". L'un des paramètres de "submitTopology" est une instance de la classe "Config". La classe "Config" est utilisée pour définir les options de configuration avant de soumettre la topologie. Cette option de configuration sera fusionnée avec la configuration du cluster au moment de l'exécution et envoyée à toutes les tâches (becs et boulons) à l'aide de la méthode de préparation. Une fois la topologie soumise au cluster, nous attendons 10 secondes que le cluster calcule la topologie soumise, puis arrêtons le cluster en utilisant la méthode "shutdown" de "LocalCluster". Le code complet du programme est le suivant -

Codage - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

Construisez et exécutez l'application

L'application complète comporte quatre codes Java. Ils sont-

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

L'application peut être construite à l'aide de la commande suivante-

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

Commande pour courir -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Résultat

Une fois l'application démarrée, elle affichera des détails complets sur le processus de démarrage du cluster, la gestion du bec et des boulons, et enfin le processus d'arrêt du cluster. Dans "CallLogCounterBolt", nous imprimons l'appel et ses détails de comptage. Ces informations seront affichées sur la console comme suit -

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Langues non JVM

La topologie Storm est implémentée via l'interface Thrift, ce qui facilite la soumission de topologies dans n'importe quelle langue. Storm prend en charge Ruby, Python et de nombreux autres langages. Jetons un coup d'œil aux liaisons Python.

Python Bindings

Python est un langage de programmation interprété, interactif, orienté objet et de haut niveau à usage général. Storm prend en charge Python pour implémenter sa topologie. Python prend en charge les opérations de déclenchement, d'ancrage, d'accusé de réception et de journalisation.

Comme vous le savez, le boulon peut être défini dans n'importe quelle langue. Un boulon écrit dans un autre langage s'exécute en tant que sous-processus et Storm communique avec les messages JSON via stdin/stdout. Prenez d’abord un exemple de boulon WordCount qui prend en charge les liaisons Python.

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

La classe iciWordCountimplémente l'interface IRichBoltet s'exécute avec l'implémentation python en spécifiant le paramètre de super méthode "splitword.py". Créez maintenant une implémentation Python appelée "splitword.py".

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

Il s'agit d'un exemple d'implémentation en Python qui compte les mots dans une phrase donnée. De même, vous pouvez également vous lier à d’autres langues prises en charge.