在過去的幾周里,我一直在深入研究Kafka 並一路做筆記,我決定組織和構建一篇博客文章,在上面,除了概念和技巧之外,還有一個使用構建的實際示例NestJS和KafkaJs。
Apache Kafka 是一個分散式事件流平台,旨在處理即時事件。它能夠儲存、處理和檢索大規模、高吞吐量、低延遲的資料流,使其適合建立即時資料管道和事件驅動的應用程式。
這些權衡是有意的設計選擇,旨在最大限度地提高 Kafka 的性能,但可能會給需要更大靈活性的用例帶來挑戰:
Kafka 整合了隊列和發布-訂閱訊息傳遞模型的功能,為消費者提供每種方法的優勢。
Kafka 採用分區日誌系統來結合佇列和發布-訂閱模型的優點。日誌是有序的記錄序列,被分成多個分區,每個分區分配給不同的訂閱者(消費者)。此設定使多個訂閱者能夠共享一個主題,同時保持可擴展性。
我們已經看到 Kafka 是一個旨在處理即時事件的平台,在討論如何處理這些事件之前,我們需要對它們進行定義:
事件是記錄應用程式的操作、事件或更改,例如付款、網站點擊或溫度讀數。
Kafka 中的事件被建模為鍵/值對,其中鍵和值都被序列化為位元組序列。
Kafka 將事件組織成有序日誌,稱為主題。當外部系統將事件寫入 Kafka 時,它會被附加到主題的末端。即使在閱讀後,訊息也會在主題中保留可配置的持續時間。與佇列不同,主題具有持久性、可複製性和容錯性,可以有效地儲存事件記錄。但日誌只能順序掃描,不能查詢。
主題作為日誌檔案儲存在磁碟上,但是磁碟具有有限的大小和 I/O 等限制。為了克服這個問題,Kafka 允許主題分為分區,將單一日誌分解為多個可以分佈在不同伺服器上的日誌。這種分區使 Kafka 能夠水平擴展,增強其處理大量事件和高吞吐量的能力。
Kafka 根據分割區是否有 key:
將訊息分配給分割區Kafka 使用名為 brokers 的節點作為分散式資料基礎設施運行,這些節點共同形成 Kafka 叢集。代理程式可以在裸機硬體、雲端執行個體、Kubernetes 管理的容器中、筆記型電腦上的 Docker 或任何可以執行 JVM 程序的地方運作。
經紀商關注:
它們不執行訊息計算或主題到主題的路由,從而保持設計簡單且有效率。
Kafka 透過跨多個代理複製分區資料來確保資料的持久性和容錯性。分區的主要副本是領導副本,而其他副本是跟隨副本。資料被寫入領導者,領導者自動將更新複製到追隨者。
此複製過程可確保:
開發人員可以從這些保證中受益,而無需直接管理複製,因為 Kafka 會透明地處理它。
Kafka 生產者 是一個客戶端應用程序,它將資料發送(發布)到 Kafka 主題。它負責建立訊息(記錄)並將其傳送到 Kafka 叢集。生產者根據其配置和訊息金鑰的存在來決定儲存訊息的主題和分區。生產者負責但不限於:
壓縮:
生產者可以壓縮訊息以減少網路頻寬和儲存使用。
Kafka 消費者 是一個客戶端應用程序,它從Kafka 主題讀取訊息, 它按照自己的節奏從Kafka 分區檢索訊息,允許即時或批量處理數據。請注意,Kafka 不會將訊息推送給消費者,而是透過請求資料從 Kafka 分區中拉取訊息。
消費者也可以追蹤他們已處理的抵銷額。偏移量可以自動或手動提交,確保消費者失敗時資料不會遺失。這允許靈活的消費,包括透過重置偏移量來重播訊息。
消費者組是一組消費者,它們合作消費來自某些主題的數據,從而允許分散式處理主題的訊息。
主題的分區在群組內的消費者之間劃分,確保每個訊息僅由群組內的一個消費者處理。多個消費組可以獨立消費同一個主題,互不干擾。
當新的消費者加入群組或現有消費者失敗時,Kafka 會在群組中的消費者之間重新分配分區,以確保覆蓋所有分區。
Kafka中的序列化和反序列化是將資料在其原始格式和位元組數組之間進行轉換以進行傳輸和存儲,從而使生產者和消費者能夠高效地進行通訊。
是將物件或資料結構轉換為位元組流以便傳輸或儲存的過程。在生產者將資料傳送到 Kafka 主題之前,它將資料(鍵和值)序列化為位元組數組。
常見序列化格式:
是相反的過程,其中位元組流被轉換回其原始物件或資料結構。當消費者從 Kafka 主題讀取資料時,它將位元組數組反序列化回可用的格式進行處理。
壓縮是指在儲存或傳輸訊息之前減少訊息的大小。它透過在生產者、代理商和消費者之間發送較小的有效負載來優化儲存使用、減少網路頻寬消耗並提高整體效能。
當生產者向 Kafka 主題發送訊息時,它可以在傳輸之前對訊息進行壓縮。壓縮的訊息原樣儲存在代理程式上,並由消費者在讀取訊息時解壓縮。
雖然壓縮可以節省資源,但必須平衡 CPU 使用率和壓縮優勢之間的權衡,選擇適合您的用例的編解碼器。
最佳化 Apache Kafka 的效能涉及微調各個元件以有效平衡吞吐量和延遲。本文僅觸及該主題的表面,以下是調優 Kafka 時需要考慮的一些方面:
分區管理:
生產者配置:
消費者配置:
想像一個應用程式記錄房間內的溫度並使用 Kafka 傳輸該數據,然後另一個應用程式處理該數據。為簡單起見,我們將僅關注 Kafka 方面,生產者和消費者都在同一應用程式中實現。在這種情況下,特定時刻記錄的每個溫度都代表一個事件:
{ temperature: 42, timeStamp: new Date(), };
所有程式碼都將在此儲存庫中。
首先,我們需要一個 Kafka 代理,但我們不需要在我們的機器中安裝 Kafka,只需使用這個 Docker Kafka 映像即可。
先拉取該影像:
docker 拉 apache/kafka
然後運行它,映射 Kafka 在我們機器上的同一連接埠上偵聽的連接埠:
docker run -d -p 9092:9092 --name Broker apache/kafka:latest
就是這樣,我們有一個正在運行的 Kafka 代理,在繼續之前,您可能想通過創建主題、發送和使用訊息來嘗試它,只需按照該圖像頁面上的說明進行操作即可。
為了建立我們的應用程序,我們將結合 NestJS 和 KafkaJS,首先使用 Nest CLI 建立應用程式
嵌套新的我的巢項目
在專案資料夾內安裝kafkajs
npm 我卡夫卡
並產生以下模組
巢g莫卡夫卡
nest g mo 製作人
巢 g mo 消費者
巢穴溫度
Kafka 模組 將處理所有 Kafka 特定的操作,包括管理用於連接、斷開連接、發送和接收訊息的消費者和生產者類別。這將是唯一直接與 kafkajs 套件互動的模組。
生產者和消費者模組將充當發布-訂閱平台(在本例中為 Kafka)與應用程式其餘部分之間的接口,抽象平台特定的詳細資訊。
溫度模組將管理事件。它不需要知道正在使用哪個發布-訂閱平台,只需要消費者和生產者即可運作。
建立模組後,我們也會建立一個資料夾 src/interface 並在其中新增以下介面:
{ temperature: 42, timeStamp: new Date(), };
// src/interfaces/producer.interface.ts export interface IProducer { produce: (message: any) => Promise<void>; connect: () => Promise<void>; disconnect: () => Promise<void>; isConnected: () => boolean; }
在 src/kafka/ 資料夾中新增實作這些介面的生產者和消費者類別:
// src/interfaces/consumer.interface.ts export type ConsumerMessage = { key?: string; value: any; }; export type OnMessage = (message: ConsumerMessage) => Promise<void>; export interface IConsumer { connect: () => Promise<void>; disconnect: () => Promise<void>; consume: (onMessage?: OnMessage) => Promise<void>; isConnected: () => boolean; }
// src/kafka/kafka.producer.ts export class KafkaProducer implements IProducer { private readonly logger = new Logger(KafkaProducer.name, { timestamp: true }); private readonly kafka: Kafka; private readonly producer: Producer; private connected: boolean = false; constructor( private readonly broker: string, private readonly topic: string, ) { // The client must be configured with at least one broker this.kafka = new Kafka({ brokers: [this.broker], }); this.producer = this.kafka.producer(); } async produce( message: Message, compression?: CompressionTypes, acks?: number, timeout?: number, ) { // To produce, at least a topic and a list of messages must be provided await this.producer.send({ topic: this.topic, messages: [message], compression, timeout, acks, }); } // To produce a message, the producer must be connected async connect() { try { // Just hooking up some logs in the producer events // And storing the connection status this.producer.on('producer.connect', () => { this.logger.log( `producer connected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = true; }); this.producer.on('producer.disconnect', () => { this.logger.log( `producer disconnected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = false; }); // Connect to Kafka await this.producer.connect(); } catch (err) { this.logger.error( `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`, err, ); } } async disconnect() { await this.producer.disconnect(); } isConnected(): boolean { return this.connected; } }
不要忘記在 kafka.module.ts 中匯出這些類別
// src/kafka/kafka.cosumer.ts export class KafkaConsumer implements IConsumer { private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true }); private readonly kafka: Kafka; private readonly consumer: Consumer; private connected: boolean = false; constructor( private readonly broker: string, private readonly topic: string, private readonly groupId: string, ) { if (this.broker && this.topic && this.groupId) { // The client must be configured with at least one broker this.kafka = new Kafka({ brokers: [this.broker], }); this.consumer = this.kafka.consumer({ groupId: this.groupId }); } else { this.logger.warn('Broker, topic and groupId must be provided'); } } // The onMessage function will be called when a message is received async consume(onMessage: OnMessage) { // Here we subscribe to the topic ... await this.consumer.subscribe({ topic: this.topic }); // ... and handle the messages await this.consumer.run({ eachMessage: async (payload) => { try { this.logger.log( `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`, ); await onMessage({ key: payload.message.key?.toString(), value: payload.message.value.toString(), }); } catch (err) { this.logger.error('error on consuming message', err); } }, }); } // To consume, the consumer must be connected async connect() { try { // Just hooking up some logs in the consumer events // And storing the connection status this.consumer.on('consumer.connect', () => { this.logger.log( `consumer connected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = true; }); this.consumer.on('consumer.disconnect', () => { this.logger.log( `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = false; }); await this.consumer.connect(); } catch (err) { this.logger.error( `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`, err, ); } } async disconnect() { await this.consumer.disconnect(); } isConnected(): boolean { return this.connected; } }
現在我們可以轉到溫度模組並實例化這些 Kafka 類別並開始使用它們。然而,如果溫度模組不必擔心它正在使用哪個 pub-sub 平台,那就更好了。相反,它應該簡單地與注入的生產者和/或消費者一起工作,只專注於發送和接收訊息,而不管底層平台如何。這樣,如果我們決定將來切換到不同的 pub-sub 平台,我們不需要對溫度模組進行任何更改。
為了實現這種抽象,我們可以創建 Producer 和 Consumer 類別來處理 Kafka Producer 和 Consumer 實作的細節。讓我們從製作人開始:
// src/kafka/kafka.module.ts @Module({ imports: [], providers: [KafkaProducer, KafkaConsumer], exports: [KafkaProducer, KafkaConsumer], }) export class KafkaModule {}
// src/producer/producer.service.ts @Injectable() export class ProducerService implements OnApplicationShutdown { // Expects any producer that implements the IProducer interface private readonly producer: IProducer; constructor( @Inject('broker') broker: string, @Inject('topic') topic: string, ) { this.producer = new KafkaProducer(broker, topic); } /** The produce() and message can receive more parameters, * refer to produce method in src/kafka/kafka.producer.ts */ async produce(message: { key?: string; value: string }) { if (!this.producer.isConnected()) { await this.producer.connect(); } await this.producer.produce(message); } async onApplicationShutdown() { await this.producer.disconnect(); } }
現在,消費者:
// src/producer/producer.module.ts @Module({ imports: [KafkaModule], providers: [ ProducerService, { provide: 'broker', useValue: 'default-broker-value', }, { provide: 'topic', useValue: 'default-topic-value', }, ], exports: [ProducerService], }) export class ProducerModule {}
// src/consumer/consumer.service.ts @Injectable() export class ConsumerService implements OnApplicationShutdown { // Expects any consumer that implements the IConsumer interface private readonly consumer: IConsumer; constructor( @Inject('broker') broker: string, @Inject('topic') topic: string, @Inject('groupId') groupId: string, ) { this.consumer = new KafkaConsumer(broker, topic, groupId); } async consume(onMessage: OnMessage) { if (!this.consumer.isConnected()) { await this.consumer.connect(); } await this.consumer.consume(onMessage); } async onApplicationShutdown() { await this.consumer.disconnect(); } }
現在,我們可以專注於建立溫度模組。在Temperature.service.ts 檔案中,我們將建立一個方法來註冊溫度,在本例中,該方法將簡單地使用生產者將溫度資料傳送到代理程式。此外,我們將實作一種方法來處理傳入訊息以用於演示目的。
這些方法可以由另一個服務或控制器呼叫。但是,為了簡單起見,在本範例中,我們將在應用程式啟動時利用 onModuleInit 方法直接呼叫它們。
{ temperature: 42, timeStamp: new Date(), };
// src/interfaces/producer.interface.ts export interface IProducer { produce: (message: any) => Promise<void>; connect: () => Promise<void>; disconnect: () => Promise<void>; isConnected: () => boolean; }
就是這樣!透過在 Docker 容器中執行代理,您可以啟動應用程式來傳送和接收訊息。此外,您可以使用以下命令在代理容器內開啟 shell:
docker exec --workdir /opt/kafka/bin/ -it Broker sh
從那裡,您可以直接與代理互動並向應用程式發送訊息、從中接收訊息、創建新主題等。
這是包含本範例程式碼的儲存庫。
以上是Kafka 基礎知識與實際範例的詳細內容。更多資訊請關注PHP中文網其他相關文章!