Trident de tempête Apache


Trident est une extension de Storm. Comme Storm, Trident est également développé par Twitter. La principale raison du développement de Trident est de fournir des abstractions de haut niveau au-dessus de Storm, ainsi qu'un traitement de streaming avec état et des requêtes distribuées à faible latence.

Trident utilise des becs et des boulons, mais ces composants de bas niveau sont automatiquement générés par Trident avant l'exécution. Trident a des fonctions, des filtres, des jointures, du regroupement et de l'agrégation.

Trident traite les flux en une série de lots, appelés transactions. Généralement, la taille de ces mini-lots sera de l'ordre de milliers ou de millions de tuples, selon le flux d'entrée. De cette manière, Trident diffère de Storm en ce sens qu'il effectue un traitement tuple par tuple.

Le concept de traitement par lots est très similaire aux transactions de base de données. Chaque transaction se voit attribuer un identifiant de transaction. La transaction est considérée comme réussie une fois que tout son traitement est terminé. Cependant, l'échec du traitement de l'un des tuples de la transaction entraînera la retransmission de l'intégralité de la transaction. Pour chaque lot, Trident appellera beginCommit au début de la transaction et commit à la fin.

Topologie Trident

L'API Trident expose une option simple pour créer une topologie Trident à l'aide de la classe "TridentTopology". Fondamentalement, la topologie Trident reçoit un flux d'entrée d'un flux sortant et effectue une séquence ordonnée d'opérations (filtrage, agrégation, regroupement, etc.) sur le flux. Les tuples Storm sont remplacés par des tuples Trident et les boulons sont remplacés par des opérations. Une topologie Trident simple peut être créée comme suit -

TridentTopology topology = new TridentTopology();

Trident Tuples

Trident Tuples est une liste nommée de valeurs. L'interface TridentTuple est le modèle de données pour la topologie Trident. L'interface TridentTuple est l'unité de données de base qui peut être traitée par une topologie Trident.

Trident Bec

Le bec Trident est similaire au bec Storm, avec des options supplémentaires utilisant la fonctionnalité Trident. En fait, nous pouvons toujours utiliser IRichSpout, nous l'utilisons dans la topologie Storm, mais il est de nature non transactionnelle et nous ne pourrons pas profiter des avantages offerts par Trident.

Le bec de base avec toutes les fonctionnalités utilisant les fonctionnalités de Trident est "ITridentSpout". Il prend en charge les transactions et la sémantique opaque des transactions. Les autres becs sont IBatchSpout, IPartitionedTridentSpout et IOpaquePartitionedTridentSpout.

En plus de ces becs génériques, Trident propose de nombreux exemples d'implémentations de becs trident. L'un d'eux est la sortie FeederBatchSpout, que nous pouvons utiliser pour envoyer une liste nommée de tuples tridents sans avoir à nous soucier du traitement par lots, du parallélisme, etc.

La création de FeederBatchSpout et l'alimentation des données peuvent être effectuées comme suit -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Opération Trident

Trident s'appuie sur "Trident Operation" pour gérer le flux d'entrée des tuples trident. L'API Trident dispose de plusieurs opérations intégrées pour gérer le traitement de flux simple à complexe. Ces opérations vont de la simple validation au regroupement et à l'agrégation complexes de tuples tridents. Passons en revue les opérations les plus importantes et les plus fréquemment utilisées.

Filtering

Les filtres sont des objets utilisés pour effectuer des tâches de validation d'entrée. Les filtres Trident prennent un sous-ensemble de champs de tuples trident en entrée et renvoient vrai ou faux selon que certaines conditions sont remplies. Si true est renvoyé, le tuple est enregistré dans le flux de sortie ; sinon, le tuple est supprimé du flux. Le filtre héritera essentiellement de la classe BaseFilter et implémentera la méthode isKeep. Voici un exemple d'implémentation d'une opération de filtrage -

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

Les fonctions de filtre peuvent être appelées dans une topologie en utilisant la méthode "each". La classe "Fields" peut être utilisée pour spécifier des entrées (sous-ensembles de tuples trident). L'exemple de code est le suivant -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Function

Function est un objet utilisé pour effectuer des opérations simples sur un seul tuple trident. Il prend un sous-ensemble des champs de tuple trident et émet zéro ou plusieurs nouveaux champs de tuple trident.

Function hérite essentiellement de la classe BaseFunction et implémente la méthode execute. Un exemple d'implémentation est donné ci-dessous :

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Semblable à l'opération de filtrage, chaque méthode peut être utilisée pour appeler des opérations de fonction dans la topologie. L'exemple de code est le suivant -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Aggregation

L'agrégation est un objet utilisé pour effectuer des opérations d'agrégation sur des lots d'entrée, des partitions ou des flux. Trident dispose de trois types d'agrégations. Ils sont les suivants -

  • aggregate - agrège chaque lot de tuples trident individuellement. Au cours du processus d'agrégation, les tuples sont d'abord répartis à l'aide d'un regroupement global pour combiner toutes les partitions du même lot en une seule partition.

  • partitionAggregate - Agrégez chaque partition au lieu du tuple trident entier. La sortie de la collection partitionnée remplace complètement les tuples d'entrée. La sortie d'une collection partitionnée contient un seul tuple de champ.

  • persistentaggregate - Agrégez tous les tuples tridents dans tous les lots et stockez les résultats en mémoire ou dans une base de données.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Les opérations d'agrégation peuvent être créées à l'aide de CombinerAggregator, ReductionrAggregator ou de l'interface générique Aggregator. L'agrégateur "count" utilisé dans l'exemple ci-dessus est l'un des agrégateurs intégrés, qui est implémenté à l'aide de "CombinerAggregator" et est implémenté comme suit -

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Group

L'opération de regroupement est une opération intégrée qui peut être appelé par la méthode groupBy. La méthode groupBy répartit le flux en effectuant partitionBy sur le champ spécifié, puis au sein de chaque partition, elle combine des tuples dont les champs de groupe sont égaux. Habituellement, nous utilisons « groupBy » et « persistentAggregate » pour obtenir une agrégation groupée. L'exemple de code est le suivant : 

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Merge et Join

Merge et Join peuvent être effectués respectivement en utilisant les méthodes "Merge" et "Join". La fusion combine un ou plusieurs flux. La jointure est similaire à la fusion, à l'exception du fait que la jointure utilise des champs de tuples tridents des deux côtés pour examiner et joindre les deux flux. De plus, les jointures ne fonctionneront qu’au niveau du lot. L'exemple de code est le suivant -

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

Maintenance de l'état

Trident fournit un mécanisme de maintenance de l'état. Les informations d'état peuvent être stockées dans la topologie elle-même ou dans une base de données distincte. La raison est de maintenir un état et si un tuple échoue pendant le traitement, le tuple ayant échoué est réessayé. Cela crée des problèmes lors de la mise à jour de l'état, car vous n'êtes pas sûr que l'état de ce tuple ait déjà été mis à jour. Si le tuple a échoué avant de mettre à jour l'état, réessayer le tuple stabilisera l'état. Cependant, si le tuple échoue après la mise à jour de l'état, réessayer le même tuple incrémentera à nouveau le décompte dans la base de données et rendra l'état instable. Les étapes suivantes sont nécessaires pour garantir que les messages ne sont traités qu'une seule fois : 

  • Traitez les tuples par petits lots.

  • Attribuez un identifiant unique à chaque lot. Si le lot est réessayé, le même identifiant unique est attribué.

  • Mises à jour de statut triées entre les lots. Par exemple, le deuxième lot de mises à jour de statut ne sera pas possible tant que le premier lot de mises à jour de statut ne sera pas terminé.

Distributed RPC

Distributed RPC est utilisé pour interroger et récupérer les résultats de la topologie Trident. Storm dispose d'un serveur RPC distribué intégré. Le serveur RPC distribué reçoit les requêtes RPC des clients et les transmet à la topologie. La topologie traite la requête et envoie le résultat au serveur RPC distribué, qui le redirige vers le client. Les requêtes RPC distribuées de Trident s'exécutent comme des requêtes RPC normales, à l'exception du fait qu'elles s'exécutent en parallèle.

Quand utiliser Trident ?

Dans de nombreux cas d'utilisation, si l'exigence est de traiter une requête une seule fois, nous pouvons y parvenir en écrivant la topologie dans Trident. En revanche, un traitement précis en une seule fois sera difficile à réaliser dans le cas de Storm. Par conséquent, Trident sera utile pour les cas d'utilisation qui doivent être traités en une seule fois. Trident ne convient pas à tous les cas d'utilisation, en particulier aux cas d'utilisation hautes performances, car il ajoute de la complexité à Storm et gère l'état.

Exemple fonctionnel de Trident

Nous allons convertir l'application d'analyse de journal d'appels développée dans la section précédente vers le framework Trident. Grâce à son API de haut niveau, les applications Trident seront plus simples que celles de Storm classique. Storm doit essentiellement effectuer l’une des opérations Fonction, Filtre, Agrégation, GroupBy, Rejoindre et Fusionner dans Trident. Enfin, nous allons démarrer le serveur DRPC en utilisant la classe LocalDRPC et rechercher quelques mots-clés en utilisant la méthode d'exécution de la classe LocalDRPC.

Formater les informations sur l'appel

Le but de la classe FormatCall est de formater les informations sur l'appel, y compris le "numéro de l'appelant" et le "numéro du destinataire". Le code complet du programme est le suivant -

Encodage : FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

Le but de la classe CSVSplit est de diviser la chaîne d'entrée en fonction de "virgule(,)" et d'émettre chaque mot de la chaîne. Cette fonction est utilisée pour analyser les paramètres d'entrée d'une requête distribuée. Le code complet est le suivant -

Codage : CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Log Analyzer

C'est l'application principale. Initialement, l'application initialisera TridentTopology à l'aide de FeederBatchSpout et fournira des informations sur l'appelant. Les flux de topologie Trident peuvent être créés à l'aide de la méthode newStream de la classe TridentTopology. De même, un flux DRPC de topologie Trident peut être créé à l'aide de la méthode newDRPCStream de la classe TridentTopology. Un simple serveur DRCP peut être créé à l'aide de la classe LocalDRPC. LocalDRPCdispose d'une méthode d'exécution pour rechercher certains mots-clés. Le code complet est ci-dessous.

Codage : LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Création et exécution de l'application

L'application complète comporte trois codes Java. Ils sont les suivants : 

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

L'application peut être construite à l'aide de la commande suivante-

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

L'application peut être exécutée à l'aide de la commande suivante-

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Sortie

Une fois l'application démarrée, l'application affichera des détails complets sur le processus de démarrage du cluster, le traitement des opérations, les informations sur le serveur et le client DRPC, et enfin le processus d'arrêt du cluster. Cette sortie sera affichée sur la console comme indiqué ci-dessous.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends