Kafka fundamentals with a practical example
Over the past few weeks, I’ve been diving into Kafka and taking notes along the way, which I decided to organize and structure into this blog post. Apart from concepts and tips, it includes a practical example built with NestJS and KafkaJS.
Apache Kafka is a distributed event-streaming platform designed to handle real-time events. It enables storing, processing, and retrieving large-scale, high-throughput, low-latency data streams, making it suitable for building real-time data pipelines and event-driven applications.
Kafka makes some deliberate trade-offs: for example, logs can only be appended to and scanned sequentially rather than queried, and brokers do not route or transform messages. These are intentional design choices that maximize Kafka's performance but may pose challenges for use cases requiring greater flexibility.
Kafka integrates the features of both queuing and publish-subscribe messaging models, offering consumers the advantages of each approach.
Kafka employs a partitioned log system to combine the benefits of queuing and publish-subscribe models. Logs, which are ordered sequences of records, are divided into partitions, with each partition assigned to different subscribers (consumers). This setup enables multiple subscribers to share a topic while maintaining scalability.
We have seen that Kafka is a platform designed to handle real-time events. Before talking about how those events are handled, we need a definition for them:
An event is an action, incident, or change that's recorded by an application, for example, a payment, a website click, or a temperature reading.
Events in Kafka are modeled as key/value pairs, where both keys and values are serialized into byte sequences.
Kafka organizes events into ordered logs called topics. When an external system writes an event to Kafka, it is appended to the end of a topic. Messages remain in topics for a configurable duration, even after being read. Unlike queues, topics are durable, replicated, and fault-tolerant, efficiently storing event records. However, logs can only be scanned sequentially, not queried.
Topics are stored as log files on disk; however, disks have limitations, such as finite size and limited I/O throughput. To overcome this, Kafka allows topics to be divided into partitions, breaking a single log into multiple logs that can be distributed across different servers. This partitioning enables Kafka to scale horizontally, enhancing its capacity to handle large volumes of events and high throughput.
Kafka assigns messages to partitions based on whether they have a key: messages with a key are always routed to the same partition, determined by hashing the key, which preserves ordering per key; messages without a key are spread across partitions (typically in a round-robin fashion).
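As an illustration of the idea only (KafkaJS's default partitioner actually uses a murmur2 hash, so treat the helper below as a simplified sketch, not the real algorithm):

// Simplified illustration of key-based partition assignment (not Kafka's actual algorithm)
let roundRobinCounter = 0;

function choosePartition(key: string | null, numPartitions: number): number {
  if (key === null) {
    // No key: spread messages across partitions, round-robin style
    return roundRobinCounter++ % numPartitions;
  }
  // With a key: hash it so the same key always maps to the same partition
  let hash = 0;
  for (const char of key) {
    hash = (hash * 31 + char.charCodeAt(0)) | 0;
  }
  return Math.abs(hash) % numPartitions;
}

// Readings keyed by 'room-1' always land in the same partition, preserving their order
console.log(choosePartition('room-1', 3));
console.log(choosePartition(null, 3));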
Kafka operates as a distributed data infrastructure using nodes called brokers, which collectively form a Kafka cluster. Brokers can run on bare metal hardware, a cloud instance, in a container managed by Kubernetes, in Docker on your laptop, or wherever JVM processes can run.
Brokers focus on writing new events to partition logs, serving read requests from consumers, and replicating partitions across the cluster. They do not perform message computation or topic-to-topic routing, keeping their design simple and efficient.
Kafka ensures data durability and fault tolerance by replicating partition data across multiple brokers. The primary copy of a partition is the leader replica, while additional copies are follower replicas. Data is written to the leader, which automatically replicates updates to the followers.
This replication process ensures durability and high availability: if a broker holding a leader replica fails, one of the follower replicas can be promoted to leader and the data remains available.
Developers benefit from these guarantees without needing to manage replication directly, as Kafka handles it transparently.
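The replication factor is set when a topic is created. A minimal sketch using KafkaJS's admin client (the topic name and numbers are illustrative; a replication factor of 2 assumes a cluster with at least two brokers, so the single-broker Docker setup used later would need replicationFactor: 1):

import { Kafka } from 'kafkajs';

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const admin = kafka.admin();

async function createReplicatedTopic() {
  await admin.connect();
  // Each of the 3 partitions gets a leader replica plus 1 follower replica
  await admin.createTopics({
    topics: [{ topic: 'temperature', numPartitions: 3, replicationFactor: 2 }],
  });
  await admin.disconnect();
}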
A Kafka producer is a client application that sends (publishes) data to Kafka topics. It’s responsible for creating and transmitting messages (records) to the Kafka cluster. Producers determine the topic and partition where messages will be stored based on their configuration and the presence of a message key. A producer's responsibilities include, but are not limited to:
Compression:
Producers can compress messages to reduce network bandwidth and storage use.
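In KafkaJS, compression is chosen per send() call; GZIP is supported out of the box, while other codecs require extra packages. A minimal sketch, assuming a local broker and an illustrative topic name:

import { Kafka, CompressionTypes } from 'kafkajs';

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();

async function sendCompressed() {
  await producer.connect();
  await producer.send({
    topic: 'temperature',
    compression: CompressionTypes.GZIP, // compress the batch before it leaves the producer
    messages: [
      { key: 'room-1', value: JSON.stringify({ temperature: 42, timeStamp: new Date() }) },
    ],
  });
  await producer.disconnect();
}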
A Kafka consumer is a client application that reads messages from Kafka topics. It retrieves messages from Kafka partitions at its own pace, allowing for real-time or batch processing of data. Notice that Kafka does not push messages to consumers; consumers pull messages from Kafka partitions by requesting the data.
Consumers also keep track of the offsets they have processed. Offsets can be committed automatically or manually, ensuring data is not lost if a consumer fails. This allows for flexible consumption, including replaying messages by resetting the offset.
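A sketch of manual offset commits with KafkaJS (topic and group names are illustrative): by disabling autoCommit and committing only after a message has been handled, a consumer that crashes mid-processing will re-read the uncommitted messages instead of losing them.

import { Kafka } from 'kafkajs';

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'temperature-processors' });

async function consumeWithManualCommits() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'temperature', fromBeginning: true });
  await consumer.run({
    autoCommit: false, // we decide when an offset counts as processed
    eachMessage: async ({ topic, partition, message }) => {
      // Process the message (here we just log it)
      console.log(message.value?.toString());
      // Commit the offset of the *next* message, marking this one as processed
      await consumer.commitOffsets([
        { topic, partition, offset: (Number(message.offset) + 1).toString() },
      ]);
    },
  });
}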
A consumer group is a set of consumers that cooperate to consume data from one or more topics, allowing for distributed processing of a topic's messages.
Partitions of a topic are divided among the consumers in the group, ensuring each message is processed by only one consumer within the group. Multiple consumer groups can independently consume the same topic without interference.
When a new consumer joins a group or an existing consumer fails, Kafka reassigns partitions among the consumers in the group to ensure all partitions are covered.
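To see this in practice, you could run a sketch like the one below (group and topic names are illustrative): the two consumers sharing a groupId split the topic's partitions between them, while a consumer with a different groupId independently receives every message.

import { Kafka } from 'kafkajs';

const kafka = new Kafka({ brokers: ['localhost:9092'] });

// Both workers share the same groupId, so Kafka divides the topic's partitions
// between them and each message is processed by only one of them.
async function startWorker(name: string) {
  const consumer = kafka.consumer({ groupId: 'dashboard' });
  await consumer.connect();
  await consumer.subscribe({ topic: 'temperature' });
  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      console.log(`${name} got partition ${partition}: ${message.value?.toString()}`);
    },
  });
}

// A consumer in a different group receives every message again, independently.
async function startAuditor() {
  const consumer = kafka.consumer({ groupId: 'audit' });
  await consumer.connect();
  await consumer.subscribe({ topic: 'temperature', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ message }) => console.log(`audit: ${message.value?.toString()}`),
  });
}

startWorker('worker-1');
startWorker('worker-2');
startAuditor();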
Serialization and deserialization in Kafka are about converting data between its original format and a byte array for transmission and storage, allowing producers and consumers to communicate efficiently.
Serialization is the process of converting an object or data structure into a byte stream so it can be transmitted or stored. Before a producer sends data to a Kafka topic, it serializes the data (key and value) into byte arrays.
Common serialization formats include JSON, Avro, Protobuf, and plain strings.
Deserialization is the reverse process, where a byte stream is converted back into its original object or data structure. When a consumer reads data from a Kafka topic, it deserializes the byte array back into a usable format for processing.
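KafkaJS only ever sends and receives strings or Buffers, so in this kind of application the (de)serialization is plain JSON handled in your own code; a minimal sketch:

// Producer side: convert the event object into a byte-friendly string
function serialize(event: { temperature: number; timeStamp: Date }): string {
  return JSON.stringify(event);
}

// Consumer side: turn the raw bytes received from Kafka back into an object
// (note that Date values come back as ISO strings after a JSON round-trip)
function deserialize(raw: Buffer): { temperature: number; timeStamp: string } {
  return JSON.parse(raw.toString());
}

const wireValue = serialize({ temperature: 42, timeStamp: new Date() });
const reading = deserialize(Buffer.from(wireValue));
console.log(reading.temperature); // 42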
Compression reduces the size of messages before they are stored or transmitted. It optimizes storage usage, reduces network bandwidth consumption, and improves overall performance by sending smaller payloads between producers, brokers, and consumers.
When a producer sends messages to a Kafka topic, it can compress them before transmission. Compressed messages are stored on the brokers as-is and decompressed by consumers when they read them.
While compression saves resources, it's essential to balance the trade-off between CPU usage and compression benefits, choosing the codec (GZIP, Snappy, LZ4, or ZSTD) that best suits your use case.
Optimizing Apache Kafka's performance involves fine-tuning various components to balance throughput and latency effectively. This article only scratches the surface of the subject; here are some aspects to consider when tuning Kafka (a small KafkaJS-flavored sketch follows this list):
Partition Management: the number of partitions sets the upper bound on consumer parallelism; too few partitions limit throughput, while too many add overhead on brokers and slow down rebalances.
Producer Configuration: settings such as batch.size, linger.ms, compression.type, and acks trade latency against throughput and durability.
Consumer Configuration: settings such as fetch.min.bytes, fetch.max.wait.ms, and max.poll.records control how much data each fetch waits for and how many records are processed per poll.
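In KafkaJS these knobs show up as client options and per-call parameters. The values below are illustrative starting points, not recommendations:

import { Kafka, CompressionTypes } from 'kafkajs';

const kafka = new Kafka({ brokers: ['localhost:9092'] });

// Consumer side: control how much data each fetch waits for and returns
const consumer = kafka.consumer({
  groupId: 'temperature-processors',
  minBytes: 1024,                // wait until at least 1 KB is available...
  maxWaitTimeInMs: 500,          // ...or 500 ms have passed, whichever comes first
  maxBytesPerPartition: 1048576, // cap the data pulled from a single partition
});

// Producer side: durability vs. latency is controlled per send()
const producer = kafka.producer();

async function sendTuned(messages: { key?: string; value: string }[]) {
  await producer.send({
    topic: 'temperature',
    messages,
    acks: -1, // -1 waits for all in-sync replicas (safest, slowest)
    compression: CompressionTypes.GZIP,
  });
}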
Imagine an application that records the temperature in a room and transmits this data using Kafka, where another application processes it. For simplicity, we'll focus exclusively on the Kafka aspect, with both the producer and consumer implemented within the same application. In this scenario, each recorded temperature at a specific moment represents an event:
{ temperature: 42, timeStamp: new Date(), };
All the code will be in this repository.
First, we need a Kafka broker, but instead of installing Kafka on our machine, let’s just use the apache/kafka Docker image.
Start by pulling that image:
docker pull apache/kafka
Then run it, mapping the port that Kafka listens on (9092) to the same port on our machine:
docker run -d -p 9092:9092 --name broker apache/kafka:latest
That’s it, we have a Kafka broker running. Before continuing, you might want to play around with it by creating topics and sending and consuming messages; to do that, just follow the instructions on the image's page.
To build our application, we’re going to use NestJS with KafkaJS. Start by creating the app with the Nest CLI:
nest new my-nest-project
Inside the project folder install kafkajs
npm i kafkajs
And generate the following modules
nest g mo kafka
nest g mo producer
nest g mo consumer
nest g mo temperature
The Kafka module will handle all Kafka-specific operations, including managing consumer and producer classes for connecting, disconnecting, sending, and receiving messages. This will be the only module directly interacting with the kafkajs package.
The Producer and Consumer modules will act as interfaces between the pub-sub platform (Kafka, in this case) and the rest of the application, abstracting platform-specific details.
The Temperature module will manage the events. It doesn't need to know which pub-sub platform is being used, it only requires a consumer and a producer to function.
With the modules created, let’s also create a folder src/interfaces and add the following interfaces to it:
// src/interfaces/producer.interface.ts
export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}
// src/interfaces/consumer.interface.ts
export type ConsumerMessage = {
  key?: string;
  value: any;
};

export type OnMessage = (message: ConsumerMessage) => Promise<void>;

export interface IConsumer {
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  consume: (onMessage?: OnMessage) => Promise<void>;
  isConnected: () => boolean;
}

In the src/kafka/ folder, add the producer and consumer classes that implement those interfaces:
// src/kafka/kafka.producer.ts
import { Logger } from '@nestjs/common';
import { CompressionTypes, Kafka, Message, Producer } from 'kafkajs';
import { IProducer } from '../interfaces/producer.interface';

export class KafkaProducer implements IProducer {
  private readonly logger = new Logger(KafkaProducer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly producer: Producer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
  ) {
    // The client must be configured with at least one broker
    this.kafka = new Kafka({
      brokers: [this.broker],
    });
    this.producer = this.kafka.producer();
  }

  async produce(
    message: Message,
    compression?: CompressionTypes,
    acks?: number,
    timeout?: number,
  ) {
    // To produce, at least a topic and a list of messages must be provided
    await this.producer.send({
      topic: this.topic,
      messages: [message],
      compression,
      timeout,
      acks,
    });
  }

  // To produce a message, the producer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the producer events
      // And storing the connection status
      this.producer.on('producer.connect', () => {
        this.logger.log(
          `producer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });
      this.producer.on('producer.disconnect', () => {
        this.logger.log(
          `producer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });
      // Connect to Kafka
      await this.producer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.producer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}
// src/kafka/kafka.consumer.ts
import { Logger } from '@nestjs/common';
import { Consumer, Kafka } from 'kafkajs';
import { IConsumer, OnMessage } from '../interfaces/consumer.interface';

export class KafkaConsumer implements IConsumer {
  private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
    private readonly groupId: string,
  ) {
    if (this.broker && this.topic && this.groupId) {
      // The client must be configured with at least one broker
      this.kafka = new Kafka({
        brokers: [this.broker],
      });
      this.consumer = this.kafka.consumer({ groupId: this.groupId });
    } else {
      this.logger.warn('Broker, topic and groupId must be provided');
    }
  }

  // The onMessage function will be called when a message is received
  async consume(onMessage: OnMessage) {
    // Here we subscribe to the topic ...
    await this.consumer.subscribe({ topic: this.topic });
    // ... and handle the messages
    await this.consumer.run({
      eachMessage: async (payload) => {
        try {
          this.logger.log(
            `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`,
          );
          await onMessage({
            key: payload.message.key?.toString(),
            value: payload.message.value.toString(),
          });
        } catch (err) {
          this.logger.error('error on consuming message', err);
        }
      },
    });
  }

  // To consume, the consumer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the consumer events
      // And storing the connection status
      this.consumer.on('consumer.connect', () => {
        this.logger.log(
          `consumer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });
      this.consumer.on('consumer.disconnect', () => {
        this.logger.log(
          `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });
      await this.consumer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.consumer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

Don’t forget to export these classes in kafka.module.ts:
// src/kafka/kafka.module.ts
import { Module } from '@nestjs/common';
import { KafkaConsumer } from './kafka.consumer';
import { KafkaProducer } from './kafka.producer';

@Module({
  imports: [],
  providers: [KafkaProducer, KafkaConsumer],
  exports: [KafkaProducer, KafkaConsumer],
})
export class KafkaModule {}

As it is now, we could go to the temperature module, instantiate those Kafka classes, and start using them. However, it would be better if the temperature module didn’t have to worry about which pub-sub platform it was using. Instead, it should simply work with an injected producer and/or consumer, focusing solely on sending and receiving messages, regardless of the underlying platform. This way, if we decide to switch to a different pub-sub platform in the future, we won’t need to make any changes to the temperature module.
To achieve this abstraction, we can create Producer and Consumer classes that handle the specifics of Kafka’s Producer and Consumer implementations. Let’s start with the Producer:
// src/producer/producer.service.ts
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import { IProducer } from '../interfaces/producer.interface';
import { KafkaProducer } from '../kafka/kafka.producer';

@Injectable()
export class ProducerService implements OnApplicationShutdown {
  // Expects any producer that implements the IProducer interface
  private readonly producer: IProducer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
  ) {
    this.producer = new KafkaProducer(broker, topic);
  }

  /** The produce() and message can receive more parameters,
   * refer to the produce method in src/kafka/kafka.producer.ts */
  async produce(message: { key?: string; value: string }) {
    if (!this.producer.isConnected()) {
      await this.producer.connect();
    }
    await this.producer.produce(message);
  }

  async onApplicationShutdown() {
    await this.producer.disconnect();
  }
}
The producer module wires up the ProducerService and provides the broker and topic values it expects:

// src/producer/producer.module.ts
import { Module } from '@nestjs/common';
import { KafkaModule } from '../kafka/kafka.module';
import { ProducerService } from './producer.service';

@Module({
  imports: [KafkaModule],
  providers: [
    ProducerService,
    {
      provide: 'broker',
      useValue: 'default-broker-value',
    },
    {
      provide: 'topic',
      useValue: 'default-topic-value',
    },
  ],
  exports: [ProducerService],
})
export class ProducerModule {}

Now, the Consumer:
// src/consumer/consumer.service.ts
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import { IConsumer, OnMessage } from '../interfaces/consumer.interface';
import { KafkaConsumer } from '../kafka/kafka.consumer';

@Injectable()
export class ConsumerService implements OnApplicationShutdown {
  // Expects any consumer that implements the IConsumer interface
  private readonly consumer: IConsumer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
    @Inject('groupId') groupId: string,
  ) {
    this.consumer = new KafkaConsumer(broker, topic, groupId);
  }

  async consume(onMessage: OnMessage) {
    if (!this.consumer.isConnected()) {
      await this.consumer.connect();
    }
    await this.consumer.consume(onMessage);
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}
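The consumer module is not shown in the original notes, but it mirrors the producer module; a minimal sketch, assuming the same token-based configuration with placeholder values:

// src/consumer/consumer.module.ts (sketch)
import { Module } from '@nestjs/common';
import { KafkaModule } from '../kafka/kafka.module';
import { ConsumerService } from './consumer.service';

@Module({
  imports: [KafkaModule],
  providers: [
    ConsumerService,
    { provide: 'broker', useValue: 'default-broker-value' },
    { provide: 'topic', useValue: 'default-topic-value' },
    { provide: 'groupId', useValue: 'default-groupId-value' },
  ],
  exports: [ConsumerService],
})
export class ConsumerModule {}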
Now, we can focus on building the temperature module. In the temperature.service.ts file, we’ll create a method to register a temperature, which in this example will simply send the temperature data to the broker using a producer. Additionally, we’ll implement a method to handle incoming messages for demonstration purposes.
These methods can be invoked by another service or a controller. However, for simplicity, in this example, we’ll call them directly when the application starts, utilizing the onModuleInit method.
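The original temperature files are not reproduced here, so the sketch below is just one way they could look, assuming a TemperatureModule that imports the ProducerModule and ConsumerModule and that both are configured to point at the same topic:

// src/temperature/temperature.service.ts (sketch)
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConsumerService } from '../consumer/consumer.service';
import { ProducerService } from '../producer/producer.service';

@Injectable()
export class TemperatureService implements OnModuleInit {
  private readonly logger = new Logger(TemperatureService.name);

  constructor(
    private readonly producer: ProducerService,
    private readonly consumer: ConsumerService,
  ) {}

  // Publish a temperature reading as a JSON-serialized event
  async registerTemperature(temperature: number) {
    await this.producer.produce({
      key: 'room-1',
      value: JSON.stringify({ temperature, timeStamp: new Date() }),
    });
  }

  // For demonstration, start consuming and send a sample reading on startup
  async onModuleInit() {
    await this.consumer.consume(async (message) => {
      this.logger.log(`received temperature event: ${message.value}`);
    });
    await this.registerTemperature(42);
  }
}

// src/temperature/temperature.module.ts (sketch)
import { Module } from '@nestjs/common';
import { ConsumerModule } from '../consumer/consumer.module';
import { ProducerModule } from '../producer/producer.module';
import { TemperatureService } from './temperature.service';

@Module({
  imports: [ProducerModule, ConsumerModule],
  providers: [TemperatureService],
})
export class TemperatureModule {}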
That's it! With the broker running in the Docker container, you can start the application to send and receive messages. Additionally, you can open a shell inside the broker container using the following command:
docker exec --workdir /opt/kafka/bin/ -it broker sh
From there, you can interact with the broker directly and send messages to the application, receive messages from it, create new topics, etc.
This is the repository with the code of this example.