Rumah >Java >javaTutorial >Pemahaman mendalam tentang mekanisme pelaksanaan asas baris gilir mesej Kafka
. Kafka pada asalnya dibangunkan oleh LinkedIn dan kini merupakan projek peringkat tertinggi Yayasan Perisian Apache.
SenibinaKafka ialah sistem teragih yang terdiri daripada berbilang pelayan. Setiap pelayan dipanggil nod, dan setiap nod adalah proses bebas. Nod disambungkan melalui rangkaian untuk membentuk kelompok.
Data dalam gugusan Kafka disimpan dalam partition, dan setiap partition ialah fail log tidak boleh ubah yang teratur. Partition ialah unit asas storan data Kafka dan unit asas Kafka untuk replikasi dan failover data. Data dalam kelompok Kafka diakses oleh pengeluar dan pengguna. Pengeluar menulis data ke gugusan Kafka, dan pengguna membaca data daripada gugusan Kafka.
Storan DataData dalam Kafka disimpan dalam partition, dan setiap partition ialah fail log yang teratur dan tidak boleh diubah. Partition ialah unit asas storan data Kafka dan unit asas Kafka untuk replikasi dan failover data.
Setiap partition mempunyai ID unik dan terdiri daripada nod pendahulu dan berbilang nod replika. Nod pemimpin bertanggungjawab untuk menulis data ke partition, dan nod replika bertanggungjawab untuk menyalin data daripada nod ketua.
Apabila pengeluar menulis data ke gugusan Kafka, data akan ditulis ke nod ketua. Nod pemimpin akan mereplikasi data ke nod replika. Apabila pengguna membaca data daripada gugusan Kafka, data itu dibaca daripada nod replika.
Replikasi dataReplikasi data dalam Kafka dicapai melalui mekanisme penyalinan. Setiap partition mempunyai nod ketua dan berbilang nod replika. Nod pemimpin bertanggungjawab untuk menulis data ke partition, dan nod replika bertanggungjawab untuk menyalin data daripada nod ketua.
Apabila nod ketua gagal, salah satu nod replika menjadi nod ketua baharu. Nod pemimpin baharu akan terus menulis data ke partition dan menyalin data daripada nod replika lain.
Mekanisme replikasi data dalam Kafka boleh memastikan kebolehpercayaan dan ketersediaan data. Walaupun nod pemimpin gagal, data tidak hilang dan pengguna masih boleh membaca data daripada gugusan Kafka.
FailoverFailover dalam Kafka dilaksanakan melalui mekanisme replika. Apabila nod pemimpin gagal, salah satu nod replika menjadi nod pemimpin baharu. Nod pemimpin baharu akan terus menulis data ke partition dan menyalin data daripada nod replika lain.
Mekanisme failover dalam Kafka memastikan kebolehpercayaan dan ketersediaan data. Walaupun nod pemimpin gagal, data tidak hilang dan pengguna masih boleh membaca data daripada gugusan Kafka.
PengeluarPengeluar ialah pelanggan yang menulis data ke gugusan Kafka. Pengeluar boleh menjadi mana-mana pelanggan yang boleh menghantar permintaan HTTP, seperti aplikasi Java, aplikasi Python atau aplikasi C++. Apabila pengeluar menulis data ke gugusan Kafka, ia perlu menentukan partition yang hendak ditulis. Pengeluar boleh memilih untuk menulis data ke partition tertentu atau menulis data ke partition rawak.
Pengeluar juga boleh menentukan kunci mesej dan nilai mesej data. Kekunci mesej digunakan untuk mengenal pasti secara unik mesej, dan nilai mesej ialah kandungan sebenar mesej itu.
PenggunaPengguna ialah pelanggan yang membaca data daripada gugusan Kafka. Pengguna boleh menjadi mana-mana pelanggan yang boleh menerima permintaan HTTP, seperti aplikasi Java, aplikasi Python atau aplikasi C++.
Apabila pengguna membaca data daripada gugusan Kafka, mereka perlu menentukan partition untuk dibaca. Pengguna boleh memilih untuk membaca data daripada partition tertentu atau daripada semua partition.
Pengguna juga boleh menentukan offset untuk dibaca. Offset digunakan untuk mengenal pasti secara unik mesej dalam partition. Pengguna boleh memilih untuk mula membaca data daripada offset tertentu atau mula membaca data daripada offset terkini.
Senario aplikasiKafka boleh digunakan dalam pelbagai senario aplikasi, seperti:
Pengumpulan log: Kafka boleh digunakan untuk mengumpul dan menyimpan data log daripada sistem yang berbeza. Analisis data: Kafka boleh digunakan untuk mengumpul dan menyimpan data daripada sistem yang berbeza, dan kemudian menganalisis data. Pemprosesan strim: Kafka boleh digunakan untuk memproses aliran data daripada sistem yang berbeza.Seni bina dipacu acara: Kafka boleh digunakan untuk melaksanakan seni bina dipacu acara.
Contoh Kodimport org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class KafkaProducerExample { public static void main(String[] args) { // Create a Kafka producer Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); // Create a Kafka record ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "hello, world"); // Send the record to Kafka producer.send(record); // Close the producer producer.close(); } }
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { // Create a Kafka consumer Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // Subscribe to a topic consumer.subscribe(Collections.singletonList("my-topic")); // Poll for new records while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println(record.key() + ": " + record.value()); } } // Close the consumer consumer.close(); } }
Atas ialah kandungan terperinci Pemahaman mendalam tentang mekanisme pelaksanaan asas baris gilir mesej Kafka. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!