Maison  >  Article  >  Java  >  Développement Java : Comment utiliser Akka Streams pour le traitement des flux et le transfert de données

Développement Java : Comment utiliser Akka Streams pour le traitement des flux et le transfert de données

WBOY
WBOYoriginal
2023-09-22 08:30:26896parcourir

Java开发:如何使用Akka Streams进行流处理和数据传输

Développement Java : Comment utiliser Akka Streams pour le traitement des flux et la transmission de données

Introduction :
Avec le développement rapide du Big Data et du traitement des données en temps réel, la demande de traitement des flux et de transmission de données continue d'augmenter. Dans le développement Java, Akka Streams est une bibliothèque puissante qui simplifie la mise en œuvre du traitement des flux et du transfert de données. Cet article présentera les concepts de base et l'utilisation d'Akka Streams, et fournira des exemples de code détaillés.

1. Présentation d'Akka Streams :
1.1 Qu'est-ce qu'Akka Streams :
Akka Streams fait partie du framework Akka et fournit un modèle de traitement de flux asynchrone, composable et contrôlable. Il utilise un mécanisme de contre-pression pour gérer des vitesses incohérentes de flux de données. Akka Streams est hautement évolutif et flexible et peut facilement gérer des flux de données à grande échelle.

1.2 Concepts de base :

  • Source : la source du flux de données, qui peut être un fichier, une base de données, une connexion réseau, etc. Une source peut émettre zéro ou plusieurs éléments de données.
  • Flow : Composants qui exploitent et transforment les flux de données, tels que le filtrage, la cartographie, l'agrégation, etc. Le flux peut recevoir un ou plusieurs éléments de données et générer un ou plusieurs éléments de données.
  • Sink : Le point final du flux de données, qui peut être un fichier, une base de données, une connexion réseau, etc. Le point de terminaison reçoit les données traitées par Flow et les traite.

2. Utilisation d'Akka Streams :
2.1 Introduction des dépendances :
Tout d'abord, nous devons introduire les dépendances d'Akka Streams dans le projet Java. Ajoutez les dépendances suivantes dans le fichier pom.xml :

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_2.12</artifactId>
    <version>2.6.17</version>
</dependency>

2.2 Implémenter un traitement de flux simple :
Ci-dessous, nous utilisons un exemple simple pour montrer comment utiliser Akka Streams pour le traitement de flux.

Tout d'abord, créez une source de données contenant des entiers :

Source<Integer, NotUsed> source = Source.range(1, 10);

Ensuite, créez un flux qui multiplie les données source par 2 :

Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).map(i -> i * 2);

Ensuite, créez un récepteur pour recevoir les données traitées en flux :

Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(System.out::println);

Placez la source, Flow et Sink sont combinés pour créer un traitement de flux complet :

RunnableGraph<NotUsed> runnableGraph = source.via(flow).to(sink);

Enfin, exécutez le traitement de flux :

CompletionStage<NotUsed> completionStage = runnableGraph.run(materializer);

Dans le code ci-dessus, nous utilisons différents composants fournis par Akka Streams pour implémenter un traitement de flux simple, y compris les sources de données, Flow et Couler. En connectant ces composants, nous pouvons définir et exécuter un processus complet de traitement de flux.

2.3 Implémenter la transmission de données :
En plus du traitement des flux, Akka Streams peut également être utilisé pour la transmission de données. Ci-dessous, nous prenons la transmission TCP comme exemple pour montrer comment utiliser Akka Streams pour la transmission de données.

Tout d'abord, créez un traitement de flux côté serveur :

final Flow<ByteString, ByteString, NotUsed> serverFlow = Flow.of(ByteString.class)
    .via(Tcp().delimiter(ByteString.fromString("
"), 256, true))
    .map(ByteString::utf8String)
    .map(s -> s + " processed")
    .map(ByteString::fromString);

Ensuite, démarrez le serveur :

final Source<Tcp.IncomingConnection, CompletionStage<Tcp.ServerBinding>> serverSource =
    Tcp().bind("localhost", 8888);

final Flow<Tcp.IncomingConnection, Tcp.IncomingConnection, NotUsed> handler = Flow.<Tcp.IncomingConnection>create()
    .mapAsync(1, connection -> {
        connection.handleWith(serverFlow, materializer);
        return CompletableFuture.completedFuture(connection);
    });

final CompletionStage<Tcp.ServerBinding> binding =
    serverSource.via(handler).to(Sink.ignore()).run(materializer);

Ensuite, créez un traitement de flux côté client :

final Sink<ByteString, CompletionStage<Done>> clientSink = Sink.ignore();

final Flow<String, ByteString, CompletionStage<OutgoingConnection>> connectionFlow =
    Tcp().outgoingConnection("localhost", 8888);

final Flow<ByteString, ByteString, CompletionStage<Done>> clientFlow = Flow.of(ByteString.class)
    .via(Tcp().delimiter(ByteString.fromString("
"), 256, true))
    .map(ByteString::utf8String)
    .map(s -> s + " processed")
    .map(ByteString::fromString);

final Flow<String, ByteString, CompletionStage<Tcp.OutgoingConnection>> flow =
    Flow.fromSinkAndSourceMat(clientSink, clientFlow, Keep.right());

CompletableFuture<Tcp.OutgoingConnection> connection =
    Source.single("data").viaMat(connectionFlow, Keep.right()).toMat(flow, Keep.left()).run(materializer);

Avec le code ci-dessus, nous créons un traitement de flux côté serveur et un traitement de flux client à l'extrémité et une transmission de données via TCP. Dans le traitement de flux côté serveur, nous traitons la chaîne reçue et l'envoyons au client. Dans le traitement du flux côté client, nous traitons la chaîne reçue et l'envoyons au serveur.

Résumé :
Cet article présente les concepts de base et l'utilisation d'Akka Streams, et fournit des exemples de code détaillés. Grâce à Akka Streams, nous pouvons facilement mettre en œuvre le traitement des flux et la transmission de données, améliorant ainsi l'efficacité et les performances du traitement des données. J'espère que cet article vous aidera à utiliser Akka Streams pour le traitement des flux et la transmission de données dans le développement Java.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn