Rumah >Java >javaTutorial >Bina aplikasi pemprosesan strim menggunakan Spring Boot dan Apache Kafka Streams
Dengan kemunculan era data besar, semakin banyak perusahaan mula memberi perhatian kepada teknologi pemprosesan aliran untuk memenuhi keperluan pemprosesan dan analisis data masa nyata. Apache Kafka ialah sistem baris gilir mesej teragih berkemampuan tinggi dan berskala yang telah menjadi standard de facto dalam bidang pemprosesan strim. Spring Boot ialah alat untuk membangunkan aplikasi Spring dengan cepat, yang boleh membantu kami membina aplikasi pemprosesan strim dengan lebih pantas dan lebih mudah. Artikel ini akan memperkenalkan cara membina aplikasi pemprosesan strim menggunakan Spring Boot dan Apache Kafka Streams, dan membincangkan kelebihan dan kekurangan kedua-dua alatan ini dan cara mengoptimumkan prestasi aplikasi.
Sebelum kita mula membina aplikasi, kita perlu mencipta topik Kafka dahulu. Dalam artikel ini, kami akan mencipta topik yang dipanggil "klik pengguna" yang akan menyimpan peristiwa klik pengguna di tapak web.
Laksanakan arahan berikut dalam baris arahan:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user-clicks
Ini akan mencipta topik bernama "klik pengguna" pada pelayan Kafka dengan hanya satu partition dan salinan setempat .
Seterusnya, kami akan mencipta aplikasi asas menggunakan Spring Boot. Dalam Spring Boot, kita boleh menggunakan Spring Initializr untuk mencipta aplikasi asas dengan cepat. Semasa membuat aplikasi, pastikan anda memilih kebergantungan berikut:
Selepas mencipta aplikasi, kami akan Menambah pergantungan berikut:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.6.0</version> </dependency>
Ini akan memberikan kami API pemprosesan aliran Kafka.
Sekarang kita boleh mula menulis kod pemprosesan strim Kafka. Semasa membuat aplikasi, kami menentukan kelas pengawal yang dipanggil "Pengontrol Pengguna". Sekarang kami akan menambah pengendali permintaan POST bernama "klik" dalam kelas pengawal. Pengendali ini akan mendapatkan peristiwa klik pengguna daripada permintaan POST dan menghantarnya ke topik Kafka bernama "klik pengguna". Kodnya adalah seperti berikut:
@RestController public class UserController { private final KafkaTemplate<String, String> kafkaTemplate; @Autowired public UserController(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } @PostMapping("/clicks") public void clicks(@RequestBody String click) { kafkaTemplate.send("user-clicks", click); } }
Dalam kod di atas, kami menggunakan fungsi suntikan kebergantungan Spring untuk menyuntik objek KafkaTemplate bernama "kafkaTemplate". Objek ini boleh digunakan untuk menghantar mesej kepada topik Kafka.
Seterusnya, kami akan mencipta Topologi Penstriman Kafka untuk mengendalikan acara klik yang diterima daripada topik "klik pengguna". Dalam contoh kami, kami akan menggunakan API Aliran Kafka untuk melaksanakan topologi pemprosesan aliran.
Dalam aplikasi Spring Boot, kami akan mencipta kelas yang dipanggil "UserClicksStream" yang akan menggunakan API Kafka Streams untuk mengendalikan acara klik. Kodnya adalah seperti berikut:
@Configuration @EnableKafkaStreams public class UserClicksStream { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public KStream<String, String> kStream(StreamsBuilder builder) { KStream<String, String> stream = builder.stream("user-clicks"); stream.foreach((key, value) -> { System.out.println("Received: " + value); }); return stream; } @Bean public KafkaStreams kafkaStreams(StreamsBuilder builder) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-clicks-stream"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); return new KafkaStreams(builder.build(), props); } }
Dalam kod di atas, kami menggunakan fungsi suntikan kebergantungan Spring untuk menyuntik objek StreamsBuilder bernama "StreamsBuilder". Objek ini digunakan untuk mencipta topologi pemprosesan aliran Kafka.
Dalam kaedah kStream, kami mencipta objek KStream daripada topik "klik pengguna" dan mencetak acara yang diterima menggunakan kaedah foreach. froeach ialah operasi terminal yang akan kami gunakan dalam langkah seterusnya.
Dalam kaedah kafkaStreams, kami mencipta aplikasi bernama "user-clicks-stream" dan menentukan alamat pelayan Kafka. Aplikasi ini secara automatik akan melaksanakan operasi pemprosesan strim yang kami takrifkan dalam topologi sebelumnya.
Sekarang kami telah menulis semua kod untuk aplikasi itu. Sebelum menjalankan aplikasi, kita perlu memulakan pelayan Kafka.
Laksanakan arahan berikut dalam baris arahan:
bin/kafka-server-start.sh config/server.properties
Ini akan memulakan pelayan Kafka. Sekarang kita boleh mulakan permohonan kita.
Laksanakan arahan berikut dalam baris arahan:
mvn spring-boot:run
Ini akan melancarkan aplikasi kami. Kini kami boleh menghantar permintaan POST kepada aplikasi menggunakan mana-mana klien HTTP seperti cURL atau Postman. Setiap permintaan akan menjana acara klik dan mencetaknya dalam konsol.
Jika kami ingin melaksanakan lebih banyak operasi dalam topologi (seperti pengagregatan, pengiraan tetingkap, dll.), kami boleh menggunakan operasi lain yang disediakan oleh API Aliran Kafka untuk membina topologi.
Membina aplikasi pemprosesan strim menggunakan Spring Boot dan Apache Kafka Streams ialah cara yang pantas dan mudah untuk membantu kami memproses data masa nyata dengan lebih mudah. Walau bagaimanapun, kita perlu memberi perhatian kepada beberapa isu prestasi pengoptimuman, seperti reka bentuk topologi, saiz penimbal, masa pemprosesan aliran, dsb. Dengan memahami isu ini, kami boleh membina aplikasi pemprosesan strim yang cekap dengan lebih baik.
Atas ialah kandungan terperinci Bina aplikasi pemprosesan strim menggunakan Spring Boot dan Apache Kafka Streams. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!