Apache Kafka 是一個分散式事件流平台,旨在處理即時事件。它能夠儲存、處理和檢索大規模、高吞吐量、低延遲的資料流,使其適合建立即時資料管道和事件驅動的應用程式。
這些權衡是有意的設計選擇,旨在最大限度地提高 Kafka 的性能,但可能會給需要更大靈活性的用例帶來挑戰:
Kafka 整合了隊列和發布-訂閱訊息傳遞模型的功能,為消費者提供每種方法的優勢。
Kafka 採用分區日誌系統來結合佇列和發布-訂閱模型的優點。日誌是有序的記錄序列,被分成多個分區,每個分區分配給不同的訂閱者(消費者)。此設定使多個訂閱者能夠共享一個主題,同時保持可擴展性。
我們已經看到 Kafka 是一個旨在處理即時事件的平台,在討論如何處理這些事件之前,我們需要對它們進行定義:
Kafka 中的事件被建模為鍵/值對,其中鍵和值都被序列化為位元組序列。
Kafka 將事件組織成有序日誌,稱為主題。當外部系統將事件寫入 Kafka 時,它會被附加到主題的末端。即使在閱讀後,訊息也會在主題中保留可配置的持續時間。與佇列不同,主題具有持久性、可複製性和容錯性,可以有效地儲存事件記錄。但日誌只能順序掃描,不能查詢。
主題作為日誌檔案儲存在磁碟上,但是磁碟具有有限的大小和 I/O 等限制。為了克服這個問題,Kafka 允許主題分為分區,將單一日誌分解為多個可以分佈在不同伺服器上的日誌。這種分區使 Kafka 能夠水平擴展,增強其處理大量事件和高吞吐量的能力。
Kafka 根據分割區是否有 key:
將訊息分配給分割區Kafka 使用名為 brokers 的節點作為分散式資料基礎設施運行,這些節點共同形成 Kafka 叢集。代理程式可以在裸機硬體、雲端執行個體、Kubernetes 管理的容器中、筆記型電腦上的 Docker 或任何可以執行 JVM 程序的地方運作。
Kafka 透過跨多個代理複製分區資料來確保資料的持久性和容錯性。分區的主要副本是領導副本,而其他副本是跟隨副本。資料被寫入領導者,領導者自動將更新複製到追隨者。
開發人員可以從這些保證中受益,而無需直接管理複製,因為 Kafka 會透明地處理它。
Kafka 生產者 是一個客戶端應用程序,它將資料發送(發布)到 Kafka 主題。它負責建立訊息(記錄)並將其傳送到 Kafka 叢集。生產者根據其配置和訊息金鑰的存在來決定儲存訊息的主題和分區。生產者負責但不限於:
Kafka 消費者 是一個客戶端應用程序,它從Kafka 主題讀取訊息, 它按照自己的節奏從Kafka 分區檢索訊息,允許即時或批量處理數據。請注意,Kafka 不會將訊息推送給消費者,而是透過請求資料從 Kafka 分區中拉取訊息。
當新的消費者加入群組或現有消費者失敗時,Kafka 會在群組中的消費者之間重新分配分區,以確保覆蓋所有分區。
是將物件或資料結構轉換為位元組流以便傳輸或儲存的過程。在生產者將資料傳送到 Kafka 主題之前,它將資料(鍵和值)序列化為位元組數組。
是相反的過程,其中位元組流被轉換回其原始物件或資料結構。當消費者從 Kafka 主題讀取資料時,它將位元組數組反序列化回可用的格式進行處理。
當生產者向 Kafka 主題發送訊息時,它可以在傳輸之前對訊息進行壓縮。壓縮的訊息原樣儲存在代理程式上,並由消費者在讀取訊息時解壓縮。
雖然壓縮可以節省資源,但必須平衡 CPU 使用率和壓縮優勢之間的權衡,選擇適合您的用例的編解碼器。
最佳化 Apache Kafka 的效能涉及微調各個元件以有效平衡吞吐量和延遲。本文僅觸及該主題的表面,以下是調優 Kafka 時需要考慮的一些方面:
想像一個應用程式記錄房間內的溫度並使用 Kafka 傳輸該數據,然後另一個應用程式處理該數據。為簡單起見,我們將僅關注 Kafka 方面,生產者和消費者都在同一應用程式中實現。在這種情況下,特定時刻記錄的每個溫度都代表一個事件:
首先,我們需要一個 Kafka 代理,但我們不需要在我們的機器中安裝 Kafka,只需使用這個 Docker Kafka 映像即可。
docker 拉 apache/kafka
然後運行它,映射 Kafka 在我們機器上的同一連接埠上偵聽的連接埠:
docker run -d -p 9092:9092 --name Broker apache/kafka:latest
就是這樣,我們有一個正在運行的 Kafka 代理,在繼續之前,您可能想通過創建主題、發送和使用訊息來嘗試它,只需按照該圖像頁面上的說明進行操作即可。
為了建立我們的應用程序,我們將結合 NestJS 和 KafkaJS,首先使用 Nest CLI 建立應用程式
npm 我卡夫卡
nest g mo 製作人
巢 g mo 消費者
Kafka 模組 將處理所有 Kafka 特定的操作,包括管理用於連接、斷開連接、發送和接收訊息的消費者和生產者類別。這將是唯一直接與 kafkajs 套件互動的模組。
生產者和消費者模組將充當發布-訂閱平台(在本例中為 Kafka)與應用程式其餘部分之間的接口,抽象平台特定的詳細資訊。
建立模組後,我們也會建立一個資料夾 src/interface 並在其中新增以下介面:
// src/interfaces/producer.interface.ts export interface IProducer { produce: (message: any) => Promise<void>; connect: () => Promise<void>; disconnect: () => Promise<void>; isConnected: () => boolean; }
在 src/kafka/ 資料夾中新增實作這些介面的生產者和消費者類別:
// src/interfaces/consumer.interface.ts export type ConsumerMessage = { key?: string; value: any; }; export type OnMessage = (message: ConsumerMessage) => Promise<void>; export interface IConsumer { connect: () => Promise<void>; disconnect: () => Promise<void>; consume: (onMessage?: OnMessage) => Promise<void>; isConnected: () => boolean; }
// src/kafka/kafka.producer.ts export class KafkaProducer implements IProducer { private readonly logger = new Logger(KafkaProducer.name, { timestamp: true }); private readonly kafka: Kafka; private readonly producer: Producer; private connected: boolean = false; constructor( private readonly broker: string, private readonly topic: string, ) { // The client must be configured with at least one broker this.kafka = new Kafka({ brokers: [this.broker], }); this.producer = this.kafka.producer(); } async produce( message: Message, compression?: CompressionTypes, acks?: number, timeout?: number, ) { // To produce, at least a topic and a list of messages must be provided await this.producer.send({ topic: this.topic, messages: [message], compression, timeout, acks, }); } // To produce a message, the producer must be connected async connect() { try { // Just hooking up some logs in the producer events // And storing the connection status this.producer.on('producer.connect', () => { this.logger.log( `producer connected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = true; }); this.producer.on('producer.disconnect', () => { this.logger.log( `producer disconnected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = false; }); // Connect to Kafka await this.producer.connect(); } catch (err) { this.logger.error( `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`, err, ); } } async disconnect() { await this.producer.disconnect(); } isConnected(): boolean { return this.connected; } }
不要忘記在 kafka.module.ts 中匯出這些類別
// src/kafka/kafka.cosumer.ts export class KafkaConsumer implements IConsumer { private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true }); private readonly kafka: Kafka; private readonly consumer: Consumer; private connected: boolean = false; constructor( private readonly broker: string, private readonly topic: string, private readonly groupId: string, ) { if (this.broker && this.topic && this.groupId) { // The client must be configured with at least one broker this.kafka = new Kafka({ brokers: [this.broker], }); this.consumer = this.kafka.consumer({ groupId: this.groupId }); } else { this.logger.warn('Broker, topic and groupId must be provided'); } } // The onMessage function will be called when a message is received async consume(onMessage: OnMessage) { // Here we subscribe to the topic ... await this.consumer.subscribe({ topic: this.topic }); // ... and handle the messages await this.consumer.run({ eachMessage: async (payload) => { try { this.logger.log( `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`, ); await onMessage({ key: payload.message.key?.toString(), value: payload.message.value.toString(), }); } catch (err) { this.logger.error('error on consuming message', err); } }, }); } // To consume, the consumer must be connected async connect() { try { // Just hooking up some logs in the consumer events // And storing the connection status this.consumer.on('consumer.connect', () => { this.logger.log( `consumer connected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = true; }); this.consumer.on('consumer.disconnect', () => { this.logger.log( `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = false; }); await this.consumer.connect(); } catch (err) { this.logger.error( `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`, err, ); } } async disconnect() { await this.consumer.disconnect(); } isConnected(): boolean { return this.connected; } }
現在我們可以轉到溫度模組並實例化這些 Kafka 類別並開始使用它們。然而,如果溫度模組不必擔心它正在使用哪個 pub-sub 平台,那就更好了。相反,它應該簡單地與注入的生產者和/或消費者一起工作,只專注於發送和接收訊息,而不管底層平台如何。這樣,如果我們決定將來切換到不同的 pub-sub 平台,我們不需要對溫度模組進行任何更改。
為了實現這種抽象,我們可以創建 Producer 和 Consumer 類別來處理 Kafka Producer 和 Consumer 實作的細節。讓我們從製作人開始:
// src/kafka/kafka.module.ts @Module({ imports: [], providers: [KafkaProducer, KafkaConsumer], exports: [KafkaProducer, KafkaConsumer], }) export class KafkaModule {}
// src/producer/producer.service.ts @Injectable() export class ProducerService implements OnApplicationShutdown { // Expects any producer that implements the IProducer interface private readonly producer: IProducer; constructor( @Inject('broker') broker: string, @Inject('topic') topic: string, ) { this.producer = new KafkaProducer(broker, topic); } /** The produce() and message can receive more parameters, * refer to produce method in src/kafka/kafka.producer.ts */ async produce(message: { key?: string; value: string }) { if (!this.producer.isConnected()) { await this.producer.connect(); } await this.producer.produce(message); } async onApplicationShutdown() { await this.producer.disconnect(); } }
// src/producer/producer.module.ts @Module({ imports: [KafkaModule], providers: [ ProducerService, { provide: 'broker', useValue: 'default-broker-value', }, { provide: 'topic', useValue: 'default-topic-value', }, ], exports: [ProducerService], }) export class ProducerModule {}
// src/consumer/consumer.service.ts @Injectable() export class ConsumerService implements OnApplicationShutdown { // Expects any consumer that implements the IConsumer interface private readonly consumer: IConsumer; constructor( @Inject('broker') broker: string, @Inject('topic') topic: string, @Inject('groupId') groupId: string, ) { this.consumer = new KafkaConsumer(broker, topic, groupId); } async consume(onMessage: OnMessage) { if (!this.consumer.isConnected()) { await this.consumer.connect(); } await this.consumer.consume(onMessage); } async onApplicationShutdown() { await this.consumer.disconnect(); } }
現在,我們可以專注於建立溫度模組。在Temperature.service.ts 檔案中,我們將建立一個方法來註冊溫度,在本例中,該方法將簡單地使用生產者將溫度資料傳送到代理程式。此外,我們將實作一種方法來處理傳入訊息以用於演示目的。
這些方法可以由另一個服務或控制器呼叫。但是,為了簡單起見,在本範例中,我們將在應用程式啟動時利用 onModuleInit 方法直接呼叫它們。
就是這樣!透過在 Docker 容器中執行代理,您可以啟動應用程式來傳送和接收訊息。此外,您可以使用以下命令在代理容器內開啟 shell:
docker exec --workdir /opt/kafka/bin/ -it Broker sh
