為了確保事件在 Apache Kafka 中以完全一致的順序發送和消費,必須了解訊息分區和消費者分配的工作原理。
在 Kafka 中使用分區
-
主題分區:
- Kafka 將訊息組織到主題內的 分區 中。每個分區都維護它接收到的訊息的順序,這表示訊息按照發送到該分區的順序進行處理。
- 為了確保順序,與相同上下文(例如使用者 ID 或交易 ID)相關的所有訊息都傳送到同一分區至關重要。這是透過在發送訊息時使用分區鍵來實現的。 Kafka 使用此鍵來確定使用雜湊函數將訊息傳送到哪個分區[1][5]。
-
訊息鍵:
- 發送訊息時,可以指定鍵。具有相同密鑰的所有訊息將被發送到同一分區,這確保它們按照生成的順序被消費。例如,如果使用使用者 ID 作為鍵,則與該使用者相關的所有事件都將進入同一個分割區。
消費者群體
-
消費者分配:
- Kafka 中的消費者分為 消費者群。每個組可以有多個消費者,但每個分區一次只能被組內的一個消費者讀取。
- 這表示如果您的消費者多於分區,則有些消費者將處於不活動狀態。為了維持秩序並最大化效率,建議至少有與組中消費者一樣多的分區。
-
偏移量管理:
- Kafka 使用 偏移量 儲存每個消費者的讀取狀態,偏移量是分區中每個訊息的增量數字識別碼。這使得消費者可以在出現故障時從上次中斷的地方繼續。
附加策略
- 避免過載:選擇分割區鍵時,重要的是要考慮流量分配,以避免某些分割區過載而其他分割區未充分利用。
- 複製和容錯:確保為分區配置足夠的複製(大於1),這不僅提高可用性,而且提高系統對故障的恢復能力。
使用Avro在Kafka中實現訊息生產和消費系統,確保訊息按順序處理並處理可能的故障,這裡有一個完整的範例。這包括 Avro 架構的定義、生產者和消費者程式碼以及處理錯誤的策略。
Avro 方案
首先,我們為有效負載定義 Avro 架構。我們將建立一個名為 user_signed_up.avsc 的文件,用於描述訊息的結構。
{ "type": "record", "name": "UserSignedUp", "namespace": "com.example", "fields": [ { "name": "userId", "type": "int" }, { "name": "userEmail", "type": "string" }, { "name": "timestamp", "type": "string" } // Formato ISO 8601 ] }
金鑰產生
為了確保訊息生產和消費的順序,我們將使用結構為 message-type-date 的 key,例如:user-signed-up-2024-11-04。
製片卡夫卡
以下是使用 Avro 方案向 Kafka 發送訊息的 生產者 的程式碼:
import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Properties; public class AvroProducer { private final KafkaProducer<string byte> producer; private final Schema schema; public AvroProducer(String bootstrapServers) throws IOException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); // Establecer la propiedad de reintentos, Número de reintentos properties.put(ProducerConfig.RETRIES_CONFIG, 3); // Asegura que todos los réplicas reconozcan la escritura, properties.put(ProducerConfig.ACKS_CONFIG, "all"); // Solo un mensaje a la vez properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Habilitar idempotencia, no quiero enviar duplicados properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void sendMessage(String topic, int userId, String userEmail) { GenericRecord record = new GenericData.Record(schema); record.put("userId", userId); record.put("userEmail", userEmail); record.put("timestamp", java.time.Instant.now().toString()); String key = String.format("user-signed-up-%s", java.time.LocalDate.now()); ProducerRecord<string byte> producerRecord = new ProducerRecord(topic, key, serialize(record)); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); **handleFailure(exception, producerRecord); ** } else { System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset()); } }); } private void handleFailure(Exception exception, ProducerRecord<string byte> producerRecord) { // Log the error for monitoring System.err.println("Error sending message: " + exception.getMessage()); // Implement local persistence as a fallback saveToLocalStorage(producerRecord); // Optionally: Notify an external monitoring system or alert } private void saveToLocalStorage(ProducerRecord<string byte> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje guardado localmente para reenvío: " + record.key()); } catch (IOException e) { System.err.println("Error saving to local storage: " + e.getMessage()); } } private byte[] serialize(GenericRecord record) { // Crear un ByteArrayOutputStream para almacenar los bytes serializados ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Crear un escritor de datos para el registro Avro DatumWriter<genericrecord> datumWriter = new GenericDatumWriter(record.getSchema()); // Crear un encoder para escribir en el ByteArrayOutputStream Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { // Escribir el registro en el encoder datumWriter.write(record, encoder); // Finalizar la escritura encoder.flush(); } catch (IOException e) { throw new AvroSerializationException("Error serializing Avro record", e); } // Devolver los bytes serializados return outputStream.toByteArray(); } public void close() { producer.close(); } } </genericrecord></string></string></string></string>
重試注意事項
**要注意的是,啟用重試時,如果處理不當,可能會有訊息重新排序的風險。
為了避免這種情況:
**max.in.flight.requests.per.connection:您可以將此屬性設為 1,以確保一次發送一條訊息並按順序處理。但是,這可能會影響效能。
透過此配置和正確的錯誤處理,您可以確保您的 Kafka 生產者更加健壯,能夠處理訊息生產中的故障,同時保持必要的順序。
**卡夫卡消費者
**讀取並處理訊息的消費者:
import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Properties; public class AvroConsumer { private final KafkaConsumer<string byte> consumer; private final Schema schema; public AvroConsumer(String bootstrapServers, String groupId) throws IOException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer"); this.consumer = new KafkaConsumer(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<string byte> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<string byte> record : records) { try { processMessage(record.value()); } catch (Exception e) { handleProcessingError(e, record); } } } } private void processMessage(byte[] data) throws IOException { DatumReader<genericrecord> reader = new GenericDatumReader(schema); var decoder = DecoderFactory.get().binaryDecoder(data, null); GenericRecord record = reader.read(null, decoder); System.out.printf("Consumido mensaje: %s - %s - %s%n", record.get("userId"), record.get("userEmail"), record.get("timestamp")); } private void handleProcessingError(Exception e, ConsumerRecord<string byte> record) { System.err.println("Error processing message: " + e.getMessage()); // Implement logic to save failed messages for later processing saveFailedMessage(record); } private void saveFailedMessage(ConsumerRecord<string byte> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_consumed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key()); } catch (IOException e) { System.err.println("Error saving consumed message to local storage: " + e.getMessage()); } } public void close() { consumer.close(); } } </string></string></genericrecord></string></string></string>
按鍵的現實範例
在具有許多不同事件和許多不同分區的環境中,實際的鍵可能類似於:
{ "type": "record", "name": "UserSignedUp", "namespace": "com.example", "fields": [ { "name": "userId", "type": "int" }, { "name": "userEmail", "type": "string" }, { "name": "timestamp", "type": "string" } // Formato ISO 8601 ] }
這允許將特定日期與特定類型相關的所有事件傳送到同一分割區並按順序處理。此外,如有必要,您可以透過包含更多詳細資訊(例如會話或交易 ID)來使金鑰多樣化。
透過這種使用 Avro 處理故障並確保 Kafka 中訊息順序的實作和策略,您可以建立一個健全且高效的事件管理系統。
現在是個能力更強的 Kafka 生產者和消費者。
具有斷路器、局部持久性和 DLQ 的 Kafka Producer。
import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Properties; public class AvroProducer { private final KafkaProducer<string byte> producer; private final Schema schema; public AvroProducer(String bootstrapServers) throws IOException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); // Establecer la propiedad de reintentos, Número de reintentos properties.put(ProducerConfig.RETRIES_CONFIG, 3); // Asegura que todos los réplicas reconozcan la escritura, properties.put(ProducerConfig.ACKS_CONFIG, "all"); // Solo un mensaje a la vez properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Habilitar idempotencia, no quiero enviar duplicados properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void sendMessage(String topic, int userId, String userEmail) { GenericRecord record = new GenericData.Record(schema); record.put("userId", userId); record.put("userEmail", userEmail); record.put("timestamp", java.time.Instant.now().toString()); String key = String.format("user-signed-up-%s", java.time.LocalDate.now()); ProducerRecord<string byte> producerRecord = new ProducerRecord(topic, key, serialize(record)); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); **handleFailure(exception, producerRecord); ** } else { System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset()); } }); } private void handleFailure(Exception exception, ProducerRecord<string byte> producerRecord) { // Log the error for monitoring System.err.println("Error sending message: " + exception.getMessage()); // Implement local persistence as a fallback saveToLocalStorage(producerRecord); // Optionally: Notify an external monitoring system or alert } private void saveToLocalStorage(ProducerRecord<string byte> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje guardado localmente para reenvío: " + record.key()); } catch (IOException e) { System.err.println("Error saving to local storage: " + e.getMessage()); } } private byte[] serialize(GenericRecord record) { // Crear un ByteArrayOutputStream para almacenar los bytes serializados ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Crear un escritor de datos para el registro Avro DatumWriter<genericrecord> datumWriter = new GenericDatumWriter(record.getSchema()); // Crear un encoder para escribir en el ByteArrayOutputStream Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { // Escribir el registro en el encoder datumWriter.write(record, encoder); // Finalizar la escritura encoder.flush(); } catch (IOException e) { throw new AvroSerializationException("Error serializing Avro record", e); } // Devolver los bytes serializados return outputStream.toByteArray(); } public void close() { producer.close(); } } </genericrecord></string></string></string></string>
具有 DLQ 管理功能的 Kafka 消費者。
import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Properties; public class AvroConsumer { private final KafkaConsumer<string byte> consumer; private final Schema schema; public AvroConsumer(String bootstrapServers, String groupId) throws IOException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer"); this.consumer = new KafkaConsumer(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<string byte> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<string byte> record : records) { try { processMessage(record.value()); } catch (Exception e) { handleProcessingError(e, record); } } } } private void processMessage(byte[] data) throws IOException { DatumReader<genericrecord> reader = new GenericDatumReader(schema); var decoder = DecoderFactory.get().binaryDecoder(data, null); GenericRecord record = reader.read(null, decoder); System.out.printf("Consumido mensaje: %s - %s - %s%n", record.get("userId"), record.get("userEmail"), record.get("timestamp")); } private void handleProcessingError(Exception e, ConsumerRecord<string byte> record) { System.err.println("Error processing message: " + e.getMessage()); // Implement logic to save failed messages for later processing saveFailedMessage(record); } private void saveFailedMessage(ConsumerRecord<string byte> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_consumed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key()); } catch (IOException e) { System.err.println("Error saving consumed message to local storage: " + e.getMessage()); } } public void close() { consumer.close(); } } </string></string></genericrecord></string></string></string>
user-signed-up-2024-11-04 order-created-2024-11-04 payment-processed-2024-11-04
代碼說明
斷路器:
Resilience4j 用於管理生產者的斷路器電路。配置失敗率閾值和開啟狀態下的等待時間。
局部持久化與DLQ:
失敗的訊息會儲存到本機檔案 (failed_messages.log) 和錯誤佇列 (dead_letter_queue.log)。
錯誤處理:
在生產者和消費者中,錯誤被適當處理並記錄。
DLQ 處理:
消費者在消費完主主題的訊息後,也會處理儲存在 DLQ 中的訊息。
記錄:
System.err 和 System.out 訊息用於記錄錯誤和重要事件。
最終考慮因素
透過此實現:
它確保訊息以彈性方式發送和處理。
針對臨時或持續錯誤提供適當的處理。
邏輯允許使用死信隊列進行有效恢復。
斷路器電路有助於防止系統在長時間故障時變得飽和。
這種方法創建了一個強大的系統,可以處理物理和邏輯問題,同時保持 Kafka 中訊息的有序傳遞。
以上是如何取得和消費使用 Kafka 傳遞和訂購的訊息的詳細內容。更多資訊請關注PHP中文網其他相關文章!

JVM通過JavaNativeInterface(JNI)和Java標準庫處理操作系統API差異:1.JNI允許Java代碼調用本地代碼,直接與操作系統API交互。 2.Java標準庫提供統一API,內部映射到不同操作系統API,確保代碼跨平台運行。

modularitydoesnotdirectlyaffectJava'splatformindependence.Java'splatformindependenceismaintainedbytheJVM,butmodularityinfluencesapplicationstructureandmanagement,indirectlyimpactingplatformindependence.1)Deploymentanddistributionbecomemoreefficientwi

BytecodeinJavaistheintermediaterepresentationthatenablesplatformindependence.1)Javacodeiscompiledintobytecodestoredin.classfiles.2)TheJVMinterpretsorcompilesthisbytecodeintomachinecodeatruntime,allowingthesamebytecodetorunonanydevicewithaJVM,thusfulf

javaachievesplatformIndependencEthroughThoJavavIrtualMachine(JVM),wodecutesbytecodeonyanydenanydevicewithajvm.1)javacodeiscompiledintobytecode.2)

JavaGUI開發中的平台獨立性面臨挑戰,但可以通過使用Swing、JavaFX,統一外觀,性能優化,第三方庫和跨平台測試來應對。 JavaGUI開發依賴於AWT和Swing,Swing旨在提供跨平台一致性,但實際效果因操作系統不同而異。解決方案包括:1)使用Swing和JavaFX作為GUI工具包;2)通過UIManager.setLookAndFeel()統一外觀;3)優化性能以適應不同平台;4)使用如ApachePivot或SWT的第三方庫;5)進行跨平台測試以確保一致性。

JavadevelovermentIrelyPlatForm-DeTueTososeVeralFactors.1)JVMVariationsAffectPerformanceNandBehaviorAcroSsdifferentos.2)Nativelibrariesviajnijniiniininiinniinindrododerplatefform.3)

Java代碼在不同平台上運行時會有性能差異。 1)JVM的實現和優化策略不同,如OracleJDK和OpenJDK。 2)操作系統的特性,如內存管理和線程調度,也會影響性能。 3)可以通過選擇合適的JVM、調整JVM參數和代碼優化來提升性能。

Java'splatFormentenceHaslimitations不包括PerformanceOverhead,versionCompatibilityIsissues,挑戰WithnativelibraryIntegration,Platform-SpecificFeatures,andjvminstallation/jvminstallation/jvmintenance/jeartenance.therefactorscomplicatorscomplicatethe“ writeOnce”


熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

SublimeText3 Linux新版
SublimeText3 Linux最新版

SAP NetWeaver Server Adapter for Eclipse
將Eclipse與SAP NetWeaver應用伺服器整合。

VSCode Windows 64位元 下載
微軟推出的免費、功能強大的一款IDE編輯器

ZendStudio 13.5.1 Mac
強大的PHP整合開發環境

SublimeText3漢化版
中文版,非常好用