Rumah  >  Artikel  >  Java  >  Pembangunan Java: Cara menggunakan Akka Streams untuk pemprosesan strim dan pemindahan data

Pembangunan Java: Cara menggunakan Akka Streams untuk pemprosesan strim dan pemindahan data

WBOY
WBOYasal
2023-09-22 08:30:26944semak imbas

Java开发:如何使用Akka Streams进行流处理和数据传输

Pembangunan Java: Cara menggunakan Akka Streams untuk pemprosesan strim dan pemindahan data

Pengenalan:
Dengan data besar dan sebenar- data masa Dengan perkembangan pesat pemprosesan, permintaan untuk pemprosesan aliran dan penghantaran data semakin meningkat. Dalam pembangunan Java, Akka Streams ialah perpustakaan berkuasa yang memudahkan pelaksanaan pemprosesan aliran dan pemindahan data. Artikel ini akan memperkenalkan konsep asas dan penggunaan Akka Streams, dan memberikan contoh kod terperinci.

1. Gambaran keseluruhan Aliran Akka:
1.1 Apakah Aliran Akka:
Aliran Akka ialah sebahagian daripada rangka kerja Akka dan menyediakan model Proses tak segerak, boleh digubah dan boleh dipantau. Ia menggunakan mekanisme tekanan belakang untuk mengendalikan kelajuan aliran data yang tidak konsisten. Akka Streams sangat berskala dan fleksibel serta boleh mengendalikan aliran data berskala besar dengan mudah.

1.2 Konsep asas:

  • Sumber: Sumber aliran data, yang boleh menjadi fail, pangkalan data, sambungan rangkaian, dsb. Sumber boleh mengeluarkan sifar atau lebih elemen data.
  • Aliran: Komponen yang mengendalikan dan mengubah aliran data, seperti penapisan, pemetaan, pengagregatan, dsb. Aliran boleh menerima satu atau lebih elemen data dan mengeluarkan satu atau lebih elemen data.
  • Sink: Titik akhir aliran data, yang boleh menjadi fail, pangkalan data, sambungan rangkaian, dsb. Titik akhir menerima data yang diproses oleh Flow dan memprosesnya.

2. Penggunaan Aliran Akka:
2.1 Memperkenalkan kebergantungan:
Pertama, kita perlu memperkenalkan kebergantungan Aliran Akka ke dalam projek Java. Tambahkan kebergantungan berikut dalam fail pom.xml:

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_2.12</artifactId>
    <version>2.6.17</version>
</dependency>

2.2 Laksanakan pemprosesan strim mudah:
Di bawah kami menggunakan contoh mudah untuk menunjukkan cara menggunakan Strim Akka untuk pemprosesan strim.

Pertama, cipta sumber data yang mengandungi integer:

Source<Integer, NotUsed> source = Source.range(1, 10);

Kemudian, buat Aliran yang mendarabkan data sumber dengan 2:

Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).map(i -> i * 2);
#🎜Teruskan Seterusnya🎜 , cipta Sink untuk menerima data yang diproses strim:

Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(System.out::println);

Gabungkan Sumber, Aliran dan Sink untuk membina pemprosesan strim lengkap:

RunnableGraph<NotUsed> runnableGraph = source.via(flow).to(sink);

Akhir sekali, jalankan pemprosesan Strim: #🎜 🎜#
CompletionStage<NotUsed> completionStage = runnableGraph.run(materializer);

Dalam kod di atas, kami menggunakan komponen berbeza yang disediakan oleh Akka Streams untuk melaksanakan pemprosesan strim mudah, termasuk sumber data, Aliran dan Sink. Dengan menyambungkan komponen ini, kami boleh menentukan dan menjalankan proses pemprosesan strim yang lengkap.

2.3 Laksanakan penghantaran data:

Selain pemprosesan strim, Akka Streams juga boleh digunakan untuk penghantaran data. Di bawah ini kami mengambil penghantaran TCP sebagai contoh untuk menunjukkan cara menggunakan Akka Streams untuk penghantaran data.


Pertama, buat pemprosesan strim sebelah pelayan:

final Flow<ByteString, ByteString, NotUsed> serverFlow = Flow.of(ByteString.class)
    .via(Tcp().delimiter(ByteString.fromString("
"), 256, true))
    .map(ByteString::utf8String)
    .map(s -> s + " processed")
    .map(ByteString::fromString);

Kemudian, mulakan pelayan:

final Source<Tcp.IncomingConnection, CompletionStage<Tcp.ServerBinding>> serverSource =
    Tcp().bind("localhost", 8888);

final Flow<Tcp.IncomingConnection, Tcp.IncomingConnection, NotUsed> handler = Flow.<Tcp.IncomingConnection>create()
    .mapAsync(1, connection -> {
        connection.handleWith(serverFlow, materializer);
        return CompletableFuture.completedFuture(connection);
    });

final CompletionStage<Tcp.ServerBinding> binding =
    serverSource.via(handler).to(Sink.ignore()).run(materializer);

Seterusnya, buat strim sebelah pelanggan pemprosesan:

final Sink<ByteString, CompletionStage<Done>> clientSink = Sink.ignore();

final Flow<String, ByteString, CompletionStage<OutgoingConnection>> connectionFlow =
    Tcp().outgoingConnection("localhost", 8888);

final Flow<ByteString, ByteString, CompletionStage<Done>> clientFlow = Flow.of(ByteString.class)
    .via(Tcp().delimiter(ByteString.fromString("
"), 256, true))
    .map(ByteString::utf8String)
    .map(s -> s + " processed")
    .map(ByteString::fromString);

final Flow<String, ByteString, CompletionStage<Tcp.OutgoingConnection>> flow =
    Flow.fromSinkAndSourceMat(clientSink, clientFlow, Keep.right());

CompletableFuture<Tcp.OutgoingConnection> connection =
    Source.single("data").viaMat(connectionFlow, Keep.right()).toMat(flow, Keep.left()).run(materializer);

Melalui kod di atas, kami mencipta pemprosesan strim sebelah pelayan dan pemprosesan strim sebelah klien, dan menghantar data melalui TCP. Dalam pemprosesan strim sebelah pelayan, kami memproses rentetan yang diterima dan menghantarnya kepada pelanggan. Dalam pemprosesan strim sebelah pelanggan, kami memproses rentetan yang diterima dan menghantarnya ke pelayan.

Ringkasan:

Artikel ini memperkenalkan konsep asas dan penggunaan Akka Streams, dan menyediakan contoh kod terperinci. Melalui Akka Streams, kami boleh melaksanakan pemprosesan strim dan penghantaran data dengan mudah, meningkatkan kecekapan dan prestasi pemprosesan data. Saya harap artikel ini akan membantu anda menggunakan Akka Streams untuk pemprosesan strim dan penghantaran data dalam pembangunan Java.

Atas ialah kandungan terperinci Pembangunan Java: Cara menggunakan Akka Streams untuk pemprosesan strim dan pemindahan data. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn