To ensure that events are produced and consumed in a consistent order in Apache Kafka, it is essential to understand how message partitioning and consumer assignment work.
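Topic Partitioning: A topic is split into partitions, and Kafka guarantees ordering only within a single partition, not across the whole topic.
Message Keys: With the default partitioner, records that share a key are routed to the same partition, so they keep their relative order as long as the number of partitions does not change.
Consumer Assignment: Within a consumer group, each partition is assigned to exactly one consumer at a time, so the records of a partition are read sequentially by a single consumer.
Offset Management: Each consumer group tracks its position in a partition through committed offsets; committing an offset only after the record has been processed avoids skipping records after a restart.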
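The relationship between keys and partitions can be sketched as follows. This is only an illustration: Kafka's default partitioner hashes the serialized key with murmur2, while the sketch uses String.hashCode() as a dependency-free stand-in. The class name KeyPartitioningSketch and the partition count of 6 are assumptions made for the example.

```java
import java.util.List;

final class KeyPartitioningSketch {

    // Stand-in for Kafka's default partitioner (which uses murmur2 on the key bytes).
    // The point is only that the mapping is deterministic: same key, same partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 6; // hypothetical partition count
        List<String> keys = List.of(
                "user-signed-up-2024-11-04",
                "order-created-2024-11-04",
                "user-signed-up-2024-11-04"); // repeated key lands on the same partition
        for (String key : keys) {
            System.out.printf("%s -> partition %d%n", key, partitionFor(key, numPartitions));
        }
    }
}
```

Because the mapping depends on the partition count, adding partitions to a topic later changes where a key lands, which is why ordering per key holds only while the partition count is stable.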
The following example implements message production and consumption in Kafka using Avro, keeping messages in order and handling possible failures. It includes the definition of the Avro schema, the producer and consumer code, and strategies for handling errors.
Avro Schema
First, we define the Avro schema for our payload. We will create a file called user_signed_up.avsc that describes the structure of the message.
{
  "type": "record",
  "name": "UserSignedUp",
  "namespace": "com.example",
  "fields": [
    { "name": "userId", "type": "int" },
    { "name": "userEmail", "type": "string" },
    { "name": "timestamp", "type": "string", "doc": "ISO 8601 format" }
  ]
}
Key Generation
To ensure order in the production and consumption of messages, we will use a key structured as message-type-date, for example: user-signed-up-2024-11-04.
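A minimal sketch of building such a key is shown below. The class and method names (EventKeys, keyFor) are hypothetical. The date is passed in explicitly rather than read from LocalDate.now(), because LocalDate.now() uses the system default time zone and producers in different zones could otherwise generate different keys around midnight.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;

final class EventKeys {

    // Builds a key of the form message-type-date, e.g. user-signed-up-2024-11-04.
    // LocalDate.toString() already produces the ISO 8601 form yyyy-MM-dd.
    static String keyFor(String messageType, LocalDate date) {
        return messageType + "-" + date;
    }

    public static void main(String[] args) {
        // Using UTC here is an assumption; any zone works as long as all producers agree.
        LocalDate today = LocalDate.now(ZoneOffset.UTC);
        System.out.println(keyFor("user-signed-up", today));
    }
}
```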
Kafka Producer
Here is the code for the producer that sends messages to Kafka using the Avro schema:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.time.LocalDate;
import java.util.Base64;
import java.util.Properties;

public class AvroProducer {
    private final KafkaProducer<String, byte[]> producer;
    private final Schema schema;

    public AvroProducer(String bootstrapServers) throws IOException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // serialize() below already produces Avro binary, so the value is sent as raw bytes.
        // (KafkaAvroSerializer would expect a record object, not a byte[].)
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // Number of retries for transient send failures
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Wait until all in-sync replicas acknowledge the write
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // Only one in-flight request at a time
        properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        // Enable idempotence so that retries do not produce duplicates
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        this.producer = new KafkaProducer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void sendMessage(String topic, int userId, String userEmail) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("userId", userId);
        record.put("userEmail", userEmail);
        record.put("timestamp", Instant.now().toString());

        String key = String.format("user-signed-up-%s", LocalDate.now());
        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record));

        producer.send(producerRecord, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
                handleFailure(exception, producerRecord);
            } else {
                System.out.printf("Message sent to partition %d with offset %d%n",
                        metadata.partition(), metadata.offset());
            }
        });
    }

    private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) {
        // Log the error for monitoring
        System.err.println("Error sending message: " + exception.getMessage());
        // Implement local persistence as a fallback
        saveToLocalStorage(producerRecord);
        // Optionally: notify an external monitoring system or alert
    }

    private void saveToLocalStorage(ProducerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file for later processing.
            // The Avro payload is binary, so it is Base64-encoded to keep one message per line.
            String line = record.key() + ": " + Base64.getEncoder().encodeToString(record.value()) + "\n";
            Files.write(Paths.get("failed_messages.log"), line.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            System.out.println("Message saved locally for resending: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving to local storage: " + e.getMessage());
        }
    }

    private byte[] serialize(GenericRecord record) {
        // Buffer that receives the serialized bytes
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        // Writer that knows how to encode the record according to its schema
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());
        // Binary encoder that writes into the buffer
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
        try {
            datumWriter.write(record, encoder);
            // Flush so that all bytes reach the buffer
            encoder.flush();
        } catch (IOException e) {
            throw new UncheckedIOException("Error serializing Avro record", e);
        }
        return outputStream.toByteArray();
    }

    public void close() {
        producer.close();
    }
}
Retry Considerations
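It is important to note that enabling retries carries a risk of message reordering if it is not handled properly: a failed batch can be retried after a later batch has already been written.
To avoid this:
max.in.flight.requests.per.connection: Set this property to 1 so that the producer has only one unacknowledged request per connection at a time and a retried batch cannot be overtaken by a later one. However, this may reduce throughput. When enable.idempotence is true, Kafka also preserves ordering for values up to 5.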
With this configuration and proper error handling, you can ensure that your Kafka producer is more robust and capable of handling failures in message production while maintaining the necessary order.
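The ordering-related settings discussed above can be collected in one place, as in the sketch below. It uses the plain configuration key strings so that it runs without the Kafka client on the classpath; the class and method names (OrderedProducerSettings, orderedProducerProperties) are hypothetical, and the retry count of 3 is simply the value used earlier in this article.

```java
import java.util.Properties;

final class OrderedProducerSettings {

    // Returns producer properties aimed at ordered, duplicate-free delivery.
    // The returned object can be passed to a KafkaProducer constructor together
    // with the key and value serializer settings.
    static Properties orderedProducerProperties(String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("acks", "all");                 // wait for all in-sync replicas
        p.setProperty("retries", "3");                // retry transient failures
        p.setProperty("enable.idempotence", "true");  // retries do not create duplicates
        p.setProperty("max.in.flight.requests.per.connection", "1"); // strictest ordering setting
        return p;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        orderedProducerProperties("localhost:9092")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```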
Kafka Consumer
The consumer that reads and processes the messages:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumer {
    private final KafkaConsumer<String, byte[]> consumer;
    private final Schema schema;

    public AvroConsumer(String bootstrapServers, String groupId) throws IOException {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The value arrives as raw Avro binary and is decoded in processMessage()
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        this.consumer = new KafkaConsumer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            // Records of a partition are iterated in offset order
            for (ConsumerRecord<String, byte[]> record : records) {
                try {
                    processMessage(record.value());
                } catch (Exception e) {
                    handleProcessingError(e, record);
                }
            }
        }
    }

    private void processMessage(byte[] data) throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        GenericRecord record = reader.read(null, decoder);
        System.out.printf("Consumed message: %s - %s - %s%n",
                record.get("userId"), record.get("userEmail"), record.get("timestamp"));
    }

    private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) {
        System.err.println("Error processing message: " + e.getMessage());
        // Save failed messages for later processing
        saveFailedMessage(record);
    }

    private void saveFailedMessage(ConsumerRecord<String, byte[]> record) {
        try {
            // Persist the failed message to a local file for later processing.
            // The Avro payload is binary, so it is Base64-encoded to keep one message per line.
            String line = record.key() + ": " + Base64.getEncoder().encodeToString(record.value()) + "\n";
            Files.write(Paths.get("failed_consumed_messages.log"), line.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            System.out.println("Consumed message saved locally for reprocessing: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving consumed message to local storage: " + e.getMessage());
        }
    }

    public void close() {
        consumer.close();
    }
}
Realistic Example of Keys
In an environment with many different events and many different partitions, a realistic key might be something like:
user-signed-up-2024-11-04
order-created-2024-11-04
payment-processed-2024-11-04
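This allows all events of a specific type on a specific date to be sent to the same partition and processed in order. The trade-off is that a single partition receives all of that event type's traffic for the day, which limits parallelism. You can diversify the keys by including more details if necessary (such as a session or transaction ID); ordering is then guaranteed only among events that share the full key.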
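One way to add such a detail to the key is sketched below. The names DetailedEventKeys and keyFor, and the sample value session-42, are hypothetical; the only idea taken from the text is appending an optional identifier to the message-type-date key.

```java
import java.time.LocalDate;

final class DetailedEventKeys {

    // Builds message-type-date, optionally followed by a detail such as a session
    // or transaction ID. Events are ordered relative to each other only when they
    // share the complete key.
    static String keyFor(String messageType, LocalDate date, String detail) {
        String base = messageType + "-" + date;
        return (detail == null || detail.isEmpty()) ? base : base + "-" + detail;
    }

    public static void main(String[] args) {
        LocalDate date = LocalDate.of(2024, 11, 4);
        System.out.println(keyFor("order-created", date, null));
        System.out.println(keyFor("order-created", date, "session-42"));
    }
}
```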
With this implementation and strategies for handling failures and ensuring message order in Kafka using Avro, you can build a robust and efficient system for managing events.
Next, a somewhat more capable Kafka producer and consumer.
Kafka Producer with Circuit Breaker, Local Persistence and DLQ
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.util.Base64;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class AvroProducer {
    private static final Path FAILED_MESSAGES = Paths.get("failed_messages.log");
    private static final Path DEAD_LETTER_QUEUE = Paths.get("dead_letter_queue.log");

    private final KafkaProducer<String, byte[]> producer;
    private final Schema schema;
    private final CircuitBreaker circuitBreaker;

    public AvroProducer(String bootstrapServers) throws IOException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // serialize() below already produces Avro binary, so the value is sent as raw bytes
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        this.producer = new KafkaProducer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));

        // Failure rate threshold and wait time in the open state.
        // The concrete values are illustrative; tune them for your workload.
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
                .failureRateThreshold(50)
                .waitDurationInOpenState(Duration.ofSeconds(30))
                .build();
        this.circuitBreaker = CircuitBreaker.of("kafkaProducer", config);
    }

    public void sendMessage(String topic, int userId, String userEmail) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("userId", userId);
        record.put("userEmail", userEmail);
        record.put("timestamp", Instant.now().toString());

        String key = String.format("user-signed-up-%s", LocalDate.now());
        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record));

        // While the breaker is open, skip Kafka and fall back to local persistence
        if (!circuitBreaker.tryAcquirePermission()) {
            handleFailure(new IllegalStateException("Circuit breaker is open"), producerRecord);
            return;
        }

        long start = System.nanoTime();
        try {
            producer.send(producerRecord, (metadata, exception) -> {
                long elapsed = System.nanoTime() - start;
                if (exception != null) {
                    circuitBreaker.onError(elapsed, TimeUnit.NANOSECONDS, exception);
                    handleFailure(exception, producerRecord);
                } else {
                    circuitBreaker.onSuccess(elapsed, TimeUnit.NANOSECONDS);
                    System.out.printf("Message sent to partition %d with offset %d%n",
                            metadata.partition(), metadata.offset());
                }
            });
        } catch (RuntimeException e) {
            // send() can also fail synchronously, for example if the producer has been closed
            circuitBreaker.onError(System.nanoTime() - start, TimeUnit.NANOSECONDS, e);
            handleFailure(e, producerRecord);
        }
    }

    private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) {
        System.err.println("Error sending message: " + exception.getMessage());
        // Save the message to the local file and to the dead letter queue
        appendTo(FAILED_MESSAGES, producerRecord);
        appendTo(DEAD_LETTER_QUEUE, producerRecord);
    }

    private void appendTo(Path file, ProducerRecord<String, byte[]> record) {
        try {
            // The Avro payload is binary, so it is Base64-encoded to keep one message per line
            String line = record.key() + ": " + Base64.getEncoder().encodeToString(record.value()) + "\n";
            Files.write(file, line.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            System.err.println("Error saving to " + file + ": " + e.getMessage());
        }
    }

    private byte[] serialize(GenericRecord record) {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
        try {
            datumWriter.write(record, encoder);
            encoder.flush();
        } catch (IOException e) {
            throw new UncheckedIOException("Error serializing Avro record", e);
        }
        return outputStream.toByteArray();
    }

    public void close() {
        producer.close();
    }
}
Kafka Consumer with DLQ Management
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class AvroConsumer {
    private static final Path DEAD_LETTER_QUEUE = Paths.get("dead_letter_queue.log");

    private final KafkaConsumer<String, byte[]> consumer;
    private final Schema schema;

    public AvroConsumer(String bootstrapServers, String groupId) throws IOException {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The value arrives as raw Avro binary and is decoded in processMessage()
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        this.consumer = new KafkaConsumer<>(properties);
        this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc"));
    }

    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, byte[]> record : records) {
                try {
                    processMessage(record.value());
                } catch (Exception e) {
                    handleProcessingError(e, record);
                }
            }
            // After a batch from the main topic, retry whatever is waiting in the DLQ
            if (!records.isEmpty()) {
                processDeadLetterQueue();
            }
        }
    }

    private void processMessage(byte[] data) throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        GenericRecord record = reader.read(null, decoder);
        System.out.printf("Consumed message: %s - %s - %s%n",
                record.get("userId"), record.get("userEmail"), record.get("timestamp"));
    }

    private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) {
        System.err.println("Error processing message: " + e.getMessage());
        saveToDeadLetterQueue(record);
    }

    private void saveToDeadLetterQueue(ConsumerRecord<String, byte[]> record) {
        try {
            // The Avro payload is binary, so it is Base64-encoded to keep one message per line
            String line = record.key() + ": " + Base64.getEncoder().encodeToString(record.value()) + "\n";
            Files.write(DEAD_LETTER_QUEUE, line.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            System.out.println("Message moved to the DLQ for reprocessing: " + record.key());
        } catch (IOException e) {
            System.err.println("Error saving message to the DLQ: " + e.getMessage());
        }
    }

    private void processDeadLetterQueue() {
        if (!Files.exists(DEAD_LETTER_QUEUE)) {
            return;
        }
        try {
            List<String> stillFailing = new ArrayList<>();
            for (String line : Files.readAllLines(DEAD_LETTER_QUEUE, StandardCharsets.UTF_8)) {
                int separator = line.lastIndexOf(": ");
                if (separator < 0) {
                    continue; // skip malformed lines
                }
                try {
                    processMessage(Base64.getDecoder().decode(line.substring(separator + 2)));
                } catch (Exception e) {
                    // Keep the entry so that it is retried on the next pass
                    stillFailing.add(line);
                }
            }
            // Rewrite the DLQ with only the entries that failed again
            Files.write(DEAD_LETTER_QUEUE, stillFailing, StandardCharsets.UTF_8);
        } catch (IOException e) {
            System.err.println("Error processing the DLQ: " + e.getMessage());
        }
    }

    public void close() {
        consumer.close();
    }
}
Code Explanation
Circuit Breaker:
Resilience4j is used to manage the producer's circuit breaker. A failure rate threshold and a wait duration in the open state are configured.
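To make the closed, open and half-open states concrete, here is a dependency-free sketch of the pattern. It is not Resilience4j and is deliberately simpler: it trips after a number of consecutive failures instead of computing a failure rate over a sliding window. The class SimpleCircuitBreaker, its method names and the injected clock are all assumptions made for illustration.

```java
import java.util.function.LongSupplier;

final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;  // consecutive failures that open the breaker
    private final long openMillis;       // how long to stay open before allowing a trial call
    private final LongSupplier clock;    // injected time source (milliseconds), eases testing
    private State state = State.CLOSED;
    private int consecutiveFailures = 0;
    private long openedAt = 0;

    SimpleCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    // Returns true if a call may be attempted right now.
    synchronized boolean allowRequest() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= openMillis) {
            state = State.HALF_OPEN; // wait time elapsed: let trial calls through
        }
        return state != State.OPEN;
    }

    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        state = State.CLOSED;
    }

    synchronized void recordFailure() {
        consecutiveFailures++;
        // A failed trial call, or too many failures in a row, (re)opens the breaker.
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN;
            openedAt = clock.getAsLong();
        }
    }

    synchronized State state() {
        return state;
    }
}
```

A producer would call allowRequest() before sending, then recordSuccess() or recordFailure() from the send callback; when allowRequest() returns false, the message goes straight to the fallback path instead of Kafka.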
Local Persistence and DLQ:
Failed messages are saved to both a local file (failed_messages.log) and an error queue (dead_letter_queue.log).
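A minimal sketch of this fallback is shown below, assuming each failed message is stored as one text line of the form key: Base64(payload). That line format, the class name FailedMessageStore and its methods are assumptions; only the two file names come from the text above. Base64 is used because Avro binary payloads are not valid text.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Base64;

final class FailedMessageStore {

    // One line per message: the key, a separator, and the Base64-encoded payload.
    static String formatLine(String key, byte[] value) {
        return key + ": " + Base64.getEncoder().encodeToString(value);
    }

    // Appends the message to the given file, creating the file if needed.
    static void append(Path file, String key, byte[] value) {
        try {
            String line = formatLine(key, value) + System.lineSeparator();
            Files.write(file, line.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not persist failed message to " + file, e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = {0, (byte) 0xFF, 42}; // stands in for a serialized Avro record
        append(Paths.get("failed_messages.log"), "user-signed-up-2024-11-04", payload);
        append(Paths.get("dead_letter_queue.log"), "user-signed-up-2024-11-04", payload);
    }
}
```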
Error Handling:
In both the producer and the consumer, errors are caught and logged, and the affected message is persisted so that it can be reprocessed later.
DLQ processing:
The consumer also processes messages stored in the DLQ after consuming messages from the main topic.
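The replay step can be sketched independently of Kafka, assuming the DLQ stores one line per message in the form key: Base64(payload). The class DeadLetterReplay, the Handler interface and that line format are hypothetical. Entries that fail again are returned so that they can be written back and retried on a later pass.

```java
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

final class DeadLetterReplay {

    // Callback that processes one recovered message; it may throw to signal failure.
    interface Handler {
        void handle(String key, byte[] value) throws Exception;
    }

    // Retries every DLQ line and returns the lines that still fail (or cannot be parsed).
    static List<String> replay(List<String> lines, Handler handler) {
        List<String> stillFailing = new ArrayList<>();
        for (String line : lines) {
            int separator = line.lastIndexOf(": ");
            if (separator < 0) {
                stillFailing.add(line); // malformed entry: keep it for inspection
                continue;
            }
            try {
                byte[] value = Base64.getDecoder().decode(line.substring(separator + 2));
                handler.handle(line.substring(0, separator), value);
            } catch (Exception e) {
                stillFailing.add(line); // processing failed again: keep for the next pass
            }
        }
        return stillFailing;
    }

    public static void main(String[] args) {
        String entry = "user-signed-up-2024-11-04: "
                + Base64.getEncoder().encodeToString(new byte[] {1, 2, 3});
        List<String> remaining = replay(List.of(entry),
                (key, value) -> System.out.println("Reprocessed " + key + " (" + value.length + " bytes)"));
        System.out.println("Entries still failing: " + remaining.size());
    }
}
```

Messages replayed this way are processed later than their original neighbours, so any logic that depends on strict ordering needs to tolerate late arrivals from the DLQ.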
Logging:
System.err and System.out messages are used to log errors and important events.
Final Considerations
With this implementation:
Messages are sent and processed in a resilient manner.
Both temporary and persistent errors are handled.
The Dead Letter Queue allows failed messages to be recovered and reprocessed.
The circuit breaker helps prevent the system from becoming saturated in the event of prolonged failures.
This approach creates a robust system that can handle physical and logical problems while maintaining orderly delivery of messages in Kafka.