Heim >Java >javaLernprogramm >Java-Entwicklung: So verwenden Sie Akka Streams für die Stream-Verarbeitung und Datenübertragung
Java-Entwicklung: So verwenden Sie Akka Streams für die Stream-Verarbeitung und Datenübertragung
Einführung:
Mit der rasanten Entwicklung von Big Data und Echtzeit-Datenverarbeitung steigt die Nachfrage nach Stream-Verarbeitung und Datenübertragung weiter. In der Java-Entwicklung ist Akka Streams eine leistungsstarke Bibliothek, die die Implementierung der Stream-Verarbeitung und Datenübertragung vereinfacht. In diesem Artikel werden die grundlegenden Konzepte und die Verwendung von Akka Streams vorgestellt und detaillierte Codebeispiele bereitgestellt.
1. Übersicht über Akka Streams:
1.1 Was ist Akka Streams:
Akka Streams ist Teil des Akka-Frameworks und bietet ein asynchrones, zusammensetzbares und überwachbares Stream-Verarbeitungsmodell. Es verwendet einen Gegendruckmechanismus, um inkonsistente Geschwindigkeiten von Datenströmen zu bewältigen. Akka Streams ist hoch skalierbar und flexibel und kann problemlos große Datenströme verarbeiten.
1.2 Grundkonzepte:
2. Verwendung von Akka Streams:
2.1 Einführung von Abhängigkeiten:
Zuerst müssen wir die Abhängigkeiten von Akka Streams in das Java-Projekt einführen. Fügen Sie die folgenden Abhängigkeiten in der pom.xml-Datei hinzu:
<dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream_2.12</artifactId> <version>2.6.17</version> </dependency>
2.2 Implementieren Sie eine einfache Stream-Verarbeitung:
Im Folgenden zeigen wir anhand eines einfachen Beispiels, wie Sie Akka Streams für die Stream-Verarbeitung verwenden.
Erstellen Sie zunächst eine Datenquelle mit Ganzzahlen:
Source<Integer, NotUsed> source = Source.range(1, 10);
Erstellen Sie dann einen Fluss, der die Quelldaten mit 2 multipliziert:
Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).map(i -> i * 2);
Als nächstes erstellen Sie eine Senke, um die streamverarbeiteten Daten zu empfangen:
Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(System.out::println);
Platzieren Sie die Quelle. Flow und Sink werden kombiniert, um eine vollständige Stream-Verarbeitung zu erstellen:
RunnableGraph<NotUsed> runnableGraph = source.via(flow).to(sink);
Schließlich führen Sie die Stream-Verarbeitung aus:
CompletionStage<NotUsed> completionStage = runnableGraph.run(materializer);
Im obigen Code verwenden wir verschiedene von Akka Streams bereitgestellte Komponenten, um eine einfache Stream-Verarbeitung zu implementieren, einschließlich Datenquellen, Flow und Waschbecken. Durch die Verbindung dieser Komponenten können wir einen vollständigen Stream-Verarbeitungsprozess definieren und ausführen.
2.3 Datenübertragung implementieren:
Neben der Stream-Verarbeitung können Akka Streams auch zur Datenübertragung verwendet werden. Im Folgenden nehmen wir die TCP-Übertragung als Beispiel, um zu demonstrieren, wie Akka Streams für die Datenübertragung verwendet wird.
Erstellen Sie zunächst eine serverseitige Stream-Verarbeitung:
final Flow<ByteString, ByteString, NotUsed> serverFlow = Flow.of(ByteString.class) .via(Tcp().delimiter(ByteString.fromString(" "), 256, true)) .map(ByteString::utf8String) .map(s -> s + " processed") .map(ByteString::fromString);
Dann starten Sie den Server:
final Source<Tcp.IncomingConnection, CompletionStage<Tcp.ServerBinding>> serverSource = Tcp().bind("localhost", 8888); final Flow<Tcp.IncomingConnection, Tcp.IncomingConnection, NotUsed> handler = Flow.<Tcp.IncomingConnection>create() .mapAsync(1, connection -> { connection.handleWith(serverFlow, materializer); return CompletableFuture.completedFuture(connection); }); final CompletionStage<Tcp.ServerBinding> binding = serverSource.via(handler).to(Sink.ignore()).run(materializer);
Als nächstes erstellen Sie eine clientseitige Stream-Verarbeitung:
final Sink<ByteString, CompletionStage<Done>> clientSink = Sink.ignore(); final Flow<String, ByteString, CompletionStage<OutgoingConnection>> connectionFlow = Tcp().outgoingConnection("localhost", 8888); final Flow<ByteString, ByteString, CompletionStage<Done>> clientFlow = Flow.of(ByteString.class) .via(Tcp().delimiter(ByteString.fromString(" "), 256, true)) .map(ByteString::utf8String) .map(s -> s + " processed") .map(ByteString::fromString); final Flow<String, ByteString, CompletionStage<Tcp.OutgoingConnection>> flow = Flow.fromSinkAndSourceMat(clientSink, clientFlow, Keep.right()); CompletableFuture<Tcp.OutgoingConnection> connection = Source.single("data").viaMat(connectionFlow, Keep.right()).toMat(flow, Keep.left()).run(materializer);
Mit dem obigen Code erstellen wir eine serverseitige Stream-Verarbeitung und eine Client-Stream-Verarbeitung am Ende und Datenübertragung über TCP. Bei der serverseitigen Stream-Verarbeitung verarbeiten wir den empfangenen String und senden ihn an den Client. Bei der clientseitigen Stream-Verarbeitung verarbeiten wir den empfangenen String und senden ihn an den Server.
Zusammenfassung:
Dieser Artikel stellt die grundlegenden Konzepte und die Verwendung von Akka Streams vor und bietet detaillierte Codebeispiele. Durch Akka Streams können wir Stream-Verarbeitung und Datenübertragung einfach implementieren und so die Effizienz und Leistung der Datenverarbeitung verbessern. Ich hoffe, dieser Artikel hilft Ihnen dabei, Akka Streams für die Stream-Verarbeitung und Datenübertragung in der Java-Entwicklung zu verwenden.
Das obige ist der detaillierte Inhalt vonJava-Entwicklung: So verwenden Sie Akka Streams für die Stream-Verarbeitung und Datenübertragung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!