首頁 >web前端 >js教程 >Kafka 基礎知識與實際範例

Kafka 基礎知識與實際範例

Linda Hamilton
Linda Hamilton原創
2024-12-28 09:26:11353瀏覽

在過去的幾周里,我一直在深入研究Kafka 並一路做筆記,我決定組織和構建一篇博客文章,在上面,除了概念和技巧之外,還有一個使用構建的實際示例NestJS和KafkaJs。

卡夫卡是什麼?

Apache Kafka 是一個分散式事件流平台,旨在處理即時事件。它能夠儲存、處理和檢索大規模、高吞吐量、低延遲的資料流,使其適合建立即時資料管道和事件驅動的應用程式。

主要特點:

  • 事件流: Kafka 將資料組織成主題,它們是事件的有序日誌。
  • 分散式架構:Kafka 是為了可擴展性和容錯能力而建構的。它作為稱為代理的節點叢集運行,可以跨多個伺服器分發資料。
  • 發布-訂閱模型:生產者將訊息寫入主題消費者從中讀取訊息。 Kafka支援多個消費者,讓不同的應用程式獨立處理同一個資料流。
  • 高效能: Kafka 針對高吞吐量進行了最佳化,每秒處理數百萬則訊息,延遲較低。
  • 持久儲存: Kafka 將訊息儲存在磁碟上,保留期限可配置,確保資料持久性和可靠性。
  • 分區與複製:主題分為分區以實現可擴展性,並跨代理進行複製以實現容錯。
  • 可重玩性:消費者可以透過重置其偏移量來重新讀取訊息,從而啟用資料重新處理或復原。
  • 集成和生態系統: Kafka 與各種系統集成,並擁有 Kafka Connect(用於數據集成)和 Kafka Streams(用於流處理)等工具。

優點

  • 可靠性:透過資料分發、複製和分區確保容錯。
  • 可擴充性:Kafka 可以處理大量資料並水平擴展而無需停機。
  • 持久性:訊息被及時存儲,確保彈性和資料持久性。
  • 效能:Kafka 在極端資料負載下保持高效能,處理大量資料而不會造成停機或資料遺失。

缺點

這些權衡是有意的設計選擇,旨在最大限度地提高 Kafka 的性能,但可能會給需要更大靈活性的用例帶來挑戰:

  • 靈活性有限: Kafka 缺乏對擴展查詢的支持,例如過濾報告中的特定資料。消費者必須處理這些任務,因為 Kafka 按訊息接收順序的偏移量檢索訊息。
  • 不適合長期儲存:Kafka 擅長串流數據,但不適合長期儲存歷史資料。資料重複會使大型資料集的儲存成本高昂。
  • 無通配符主題支援: Kafka 不允許使用通配符模式(例如 log-2024-*)從多個主題進行消費。

使用案例

  • 即時分析:在資料流發生時進行處理與分析。
  • 事件溯源: 將應用程式狀態的所有變更記錄為事件序列。
  • 日誌聚合:從分散式系統收集和管理日誌。
  • 資料管道:可靠且有效率地在系統之間傳輸資料。
  • 物聯網應用:即時處理來自物聯網設備的感測器資料。

卡夫卡如何運作?

Kafka 整合了隊列和發布-訂閱訊息傳遞模型的功能,為消費者提供每種方法的優勢。

  • 佇列 透過在多個消費者實例之間分配任務來實現可擴展的資料處理,但傳統佇列不支援多個訂閱者。
  • 發布-訂閱模型支援多個訂閱者,但無法在多個工作進程之間分配任務,因為每個訊息都會發送給所有訂閱者。

Kafka 採用分區日誌系統來結合佇列和發布-訂閱模型的優點。日誌是有序的記錄序列,被分成多個分區,每個分區分配給不同的訂閱者(消費者)。此設定使多個訂閱者能夠共享一個主題,同時保持可擴展性。

Kafka fundamentals with a practical example

活動、主題和分區

我們已經看到 Kafka 是一個旨在處理即時事件的平台,在討論如何處理這些事件之前,我們需要對它們進行定義:

事件是記錄應用程式的操作、事件或更改,例如付款、網站點擊或溫度讀數。

Kafka 中的

事件被建模為鍵/值對,其中鍵和值都被序列化為位元組序列。

  • 通常表示序列化的域物件或原始輸入,例如感測器輸出或其他應用程式資料。它們封裝了 Kafka 事件中傳輸的核心訊息。
  • 可以是複雜的域對象,但通常是簡單的類型,如字串或整數。鍵通常標識系統內的實體,例如特定使用者、訂單或連接的設備,而不是唯一標識單一事件(如關聯式資料庫中的主鍵)。

Kafka 將事件組織成有序日誌,稱為主題。當外部系統將事件寫入 Kafka 時,它會被附加到主題的末端。即使在閱讀後,訊息也會在主題中保留可配置的持續時間。與佇列不同,主題具有持久性、可複製性和容錯性,可以有效地儲存事件記錄。但日誌只能順序掃描,不能查詢。

主題作為日誌檔案儲存在磁碟上,但是磁碟具有有限的大小和 I/O 等限制。為了克服這個問題,Kafka 允許主題分為分區,將單一日誌分解為多個可以分佈在不同伺服器上的日誌。這種分區使 Kafka 能夠水平擴展,增強其處理大量事件和高吞吐量的能力。

Kafka 根據分割區是否有 key:

將訊息分配給分割區
  • 無鍵:訊息在所有分割區之間循環分發,確保資料均勻分佈,但不保留訊息順序。
  • With Key: 分區是透過對 key 進行哈希處理來確定的,確保具有相同 key 的訊息始終進入相同的分區並保持其順序。

經紀人

Kafka 使用名為 brokers 的節點作為分散式資料基礎設施運行,這些節點共同形成 Kafka 叢集。代理程式可以在裸機硬體、雲端執行個體、Kubernetes 管理的容器中、筆記型電腦上的 Docker 或任何可以執行 JVM 程序的地方運作。

經紀商關注:

  • 將新事件寫入分割區。
  • 從分割區讀取服務。
  • 跨代理複製分區。

它們不執行訊息計算或主題到主題的路由,從而保持設計簡單且有效率。

複製

Kafka 透過跨多個代理複製分區資料來確保資料的持久性和容錯性。分區的主要副本是領導副本,而其他副本是跟隨副本。資料被寫入領導者,領導者自動將更新複製到追隨者。

此複製過程可確保:

  • 資料安全,即使在代理程式或儲存發生故障的情況下也是如此。
  • 自動故障轉移,如果當前領導者失敗,另一個副本將接管作為領導者。

開發人員可以從這些保證中受益,而無需直接管理複製,因為 Kafka 會透明地處理它。

製片人

Kafka 生產者 是一個客戶端應用程序,它將資料發送(發布)到 Kafka 主題。它負責建立訊息(記錄)並將其傳送到 Kafka 叢集。生產者根據其配置和訊息金鑰的存在來決定儲存訊息的主題分區。生產者負責但不限於:

  • 訊息撰寫:
    • 每個訊息由一個鍵(可選)、一個值(實際資料)和元資料組成。
    • key決定訊息的分區,確保具有相同key的訊息的順序。
  • 分區分配:
    • 如果提供了金鑰,生產者將使用雜湊演算法來決定分區。
    • 沒有金鑰,訊息以循環方式跨分區分發以實現負載平衡。
  • 壓縮:

    生產者可以壓縮訊息以減少網路頻寬和儲存使用。

消費者

Kafka 消費者 是一個客戶端應用程序,它從Kafka 主題讀取訊息, 它按照自己的節奏從Kafka 分區檢索訊息,允許即時或批量處理數據。請注意,Kafka 不會將訊息推送給消費者,而是透過請求資料從 Kafka 分區中拉取訊息。

消費者也可以追蹤他們已處理的抵銷額。偏移量可以自動手動提交,確保消費者失敗時資料不會遺失。這允許靈活的消費,包括透過重置偏移量來重播訊息。

消費群

消費者組是一組消費者,它們合作消費來自某些主題的數據,從而允許分散式處理主題的訊息。

主題的分區在群組內的消費者之間劃分,確保每個訊息僅由群組內的一個消費者處理。多個消費組可以獨立消費同一個主題,互不干擾。

當新的消費者加入群組或現有消費者失敗時,Kafka 會在群組中的消費者之間重新分配分區,以確保覆蓋所有分區。

序列化和反序列化

Kafka中的序列化和反序列化是將資料在其原始格式和位元組數組之間進行轉換以進行傳輸和存儲,從而使生產者和消費者能夠高效地進行通訊。

序列化

是將物件或資料結構轉換為位元組流以便傳輸或儲存的過程。在生產者將資料傳送到 Kafka 主題之前,它將資料(鍵和值)序列化為位元組數組。

常見序列化格式:

  • JSON:人類可讀,廣泛相容。
  • Avro:緊湊高效,基於模式。
  • Protobuf:緊湊、基於模式且與語言無關。
  • 字串:簡單的基於文字的序列化。
  • 自訂序列化:滿足特定於應用程式的需求。

反序列化

是相反的過程,其中位元組流被轉換回其原始物件或資料結構。當消費者從 Kafka 主題讀取資料時,它將位元組數組反序列化回可用的格式進行處理。

壓縮

壓縮是指在儲存或傳輸訊息之前減少訊息的大小。它透過在生產者、代理商和消費者之間發送較小的有效負載來優化儲存使用、減少網路頻寬消耗並提高整體效能。

當生產者向 Kafka 主題發送訊息時,它可以在傳輸之前對訊息進行壓縮。壓縮的訊息原樣儲存在代理程式上,並由消費者在讀取訊息時解壓縮。

優點

  • 減少網路頻寬:較小的有效負載意味著透過網路傳輸的資料較少。
  • 較低的儲存需求:壓縮訊息佔用較少的磁碟空間。
  • 提高吞吐量:較小的訊息可以實現更快的資料傳輸和處理。

什麼時候使用?

  • 訊息大小較大的用例:壓縮大大減少了資料大小。
  • 高吞吐量系統:減少網路和儲存資源的壓力。
  • 批次:當生產者將多個訊息一起批次時,壓縮效果最佳。

雖然壓縮可以節省資源,但必須平衡 CPU 使用率和壓縮優勢之間的權衡,選擇適合您的用例的編解碼器。

支援的壓縮類型

  • 無: 無壓縮(預設)。
  • Gzip:壓縮比高,但CPU佔用率較高。
  • Snappy:平衡的壓縮速度和CPU使用率,適合即時使用案例。
  • LZ4:更快的壓縮和解壓縮,針對低延遲系統進行了最佳化。
  • Zstd: 高壓縮比,效能比 Gzip 更好,較新的 Kafka 版本支援。

調音

最佳化 Apache Kafka 的效能涉及微調各個元件以有效平衡吞吐量和延遲。本文僅觸及該主題的表面,以下是調優 Kafka 時需要考慮的一些方面:

  • 分區管理:

    • 分區計數:增加分區數量以增強並行性和吞吐量。但是,請避免過多的分區以防止管理開銷。根據您的消費者能力和所需的消費率調整分區數量。
  • 生產者配置:

    • 批次:配置batch.size和linger.ms以實現高效的訊息批次,減少請求數量並提高吞吐量。
    • 壓縮: 實作壓縮(例如,compression.type=snappy)以減少訊息大小,從而減少網路和儲存使用。請注意壓縮帶來的額外 CPU 開銷。
  • 消費者配置:

    • 取得設定:調整 fetch.min.bytes 和 fetch.max.wait.ms 以控制消費者檢索訊息的方式,根據應用程式的需求平衡延遲和吞吐量。

實際例子

想像一個應用程式記錄房間內的溫度並使用 Kafka 傳輸該數據,然後另一個應用程式處理該數據。為簡單起見,我們將僅關注 Kafka 方面,生產者和消費者都在同一應用程式中實現。在這種情況下,特定時刻記錄的每個溫度都代表一個事件:

{
  temperature: 42,
  timeStamp: new Date(),
};

所有程式碼都將在此儲存庫中。

首先,我們需要一個 Kafka 代理,但我們不需要在我們的機器中安裝 Kafka,只需使用這個 Docker Kafka 映像即可。

先拉取該影像:

docker 拉 apache/kafka

然後運行它,映射 Kafka 在我們機器上的同一連接埠上偵聽的連接埠:

docker run -d -p 9092:9092 --name Broker apache/kafka:latest

就是這樣,我們有一個正在運行的 Kafka 代理,在繼續之前,您可能想通過創建主題、發送和使用訊息來嘗試它,只需按照該圖像頁面上的說明進行操作即可。

為了建立我們的應用程序,我們將結合 NestJS 和 KafkaJS,首先使用 Nest CLI 建立應用程式

嵌套新的我的巢項目

在專案資料夾內安裝kafkajs

npm 我卡夫卡

並產生以下模組

巢g莫卡夫卡

nest g mo 製作人

巢 g mo 消費者

巢穴溫度

Kafka 模組 將處理所有 Kafka 特定的操作,包括管理用於連接、斷開連接、發送和接收訊息的消費者和生產者類別。這將是唯一直接與 kafkajs 套件互動的模組。

生產者和消費者模組將充當發布-訂閱平台(在本例中為 Kafka)與應用程式其餘部分之間的接口,抽象平台特定的詳細資訊。

溫度模組將管理事件。它不需要知道正在使用哪個發布-訂閱平台,只需要消費者和生產者即可運作。

建立模組後,我們也會建立一個資料夾 src/interface 並在其中新增以下介面:

{
  temperature: 42,
  timeStamp: new Date(),
};
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}

在 src/kafka/ 資料夾中新增實作這些介面的生產者和消費者類別:

// src/interfaces/consumer.interface.ts

export type ConsumerMessage = {
  key?: string;
  value: any;
};

export type OnMessage = (message: ConsumerMessage) => Promise<void>;

export interface IConsumer {
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  consume: (onMessage?: OnMessage) => Promise<void>;
  isConnected: () => boolean;
}
// src/kafka/kafka.producer.ts

export class KafkaProducer implements IProducer {
  private readonly logger = new Logger(KafkaProducer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly producer: Producer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
  ) {
    // The client must be configured with at least one broker
    this.kafka = new Kafka({
      brokers: [this.broker],
    });
    this.producer = this.kafka.producer();
  }

  async produce(
    message: Message,
    compression?: CompressionTypes,
    acks?: number,
    timeout?: number,
  ) {
    // To produce, at least a topic and a list of messages must be provided
    await this.producer.send({
      topic: this.topic,
      messages: [message],
      compression,
      timeout,
      acks,
    });
  }

  // To produce a message, the producer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the producer events
      // And storing the connection status
      this.producer.on('producer.connect', () => {
        this.logger.log(
          `producer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.producer.on('producer.disconnect', () => {
        this.logger.log(
          `producer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      // Connect to Kafka
      await this.producer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.producer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

不要忘記在 kafka.module.ts 中匯出這些類別

// src/kafka/kafka.cosumer.ts

export class KafkaConsumer implements IConsumer {
  private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
    private readonly groupId: string,
  ) {
    if (this.broker && this.topic && this.groupId) {
      // The client must be configured with at least one broker
      this.kafka = new Kafka({
        brokers: [this.broker],
      });
      this.consumer = this.kafka.consumer({ groupId: this.groupId });
    } else {
      this.logger.warn('Broker, topic and groupId must be provided');
    }
  }

  // The onMessage function will be called when a message is received
  async consume(onMessage: OnMessage) {
    // Here we subscribe to the topic ...
    await this.consumer.subscribe({ topic: this.topic });

    // ... and handle the messages
    await this.consumer.run({
      eachMessage: async (payload) => {
        try {
          this.logger.log(
            `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`,
          );

          await onMessage({
            key: payload.message.key?.toString(),
            value: payload.message.value.toString(),
          });
        } catch (err) {
          this.logger.error('error on consuming message', err);
        }
      },
    });
  }

  // To consume, the consumer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the consumer events
      // And storing the connection status
      this.consumer.on('consumer.connect', () => {
        this.logger.log(
          `consumer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.consumer.on('consumer.disconnect', () => {
        this.logger.log(
          `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      await this.consumer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.consumer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

現在我們可以轉到溫度模組並實例化這些 Kafka 類別並開始使用它們。然而,如果溫度模組不必擔心它正在使用哪個 pub-sub 平台,那就更好了。相反,它應該簡單地與注入的生產者和/或消費者一起工作,只專注於發送和接收訊息,而不管底層平台如何。這樣,如果我們決定將來切換到不同的 pub-sub 平台,我們不需要對溫度模組進行任何更改。

為了實現這種抽象,我們可以創建 Producer 和 Consumer 類別來處理 Kafka Producer 和 Consumer 實作的細節。讓我們從製作人開始:

// src/kafka/kafka.module.ts

@Module({
  imports: [],
  providers: [KafkaProducer, KafkaConsumer],
  exports: [KafkaProducer, KafkaConsumer],
})
export class KafkaModule {}
// src/producer/producer.service.ts

@Injectable()
export class ProducerService implements OnApplicationShutdown {
  // Expects any producer that implements the IProducer interface
  private readonly producer: IProducer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
  ) {
    this.producer = new KafkaProducer(broker, topic);
  }

  /** The produce() and message can receive more parameters,
   * refer to produce method in src/kafka/kafka.producer.ts
   */
  async produce(message: { key?: string; value: string }) {
    if (!this.producer.isConnected()) {
      await this.producer.connect();
    }
    await this.producer.produce(message);
  }

  async onApplicationShutdown() {
    await this.producer.disconnect();
  }
}

現在,消費者:

// src/producer/producer.module.ts

@Module({
  imports: [KafkaModule],
  providers: [
    ProducerService,
    {
      provide: 'broker',
      useValue: 'default-broker-value',
    },
    {
      provide: 'topic',
      useValue: 'default-topic-value',
    },
  ],
  exports: [ProducerService],
})
export class ProducerModule {}
// src/consumer/consumer.service.ts

@Injectable()
export class ConsumerService implements OnApplicationShutdown {
  // Expects any consumer that implements the IConsumer interface
  private readonly consumer: IConsumer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
    @Inject('groupId') groupId: string,
  ) {
    this.consumer = new KafkaConsumer(broker, topic, groupId);
  }

  async consume(onMessage: OnMessage) {
    if (!this.consumer.isConnected()) {
      await this.consumer.connect();
    }
    await this.consumer.consume(onMessage);
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}

現在,我們可以專注於建立溫度模組。在Temperature.service.ts 檔案中,我們將建立一個方法來註冊溫度,在本例中,該方法將簡單地使用生產者將溫度資料傳送到代理程式。此外,我們將實作一種方法來處理傳入訊息以用於演示目的。

這些方法可以由另一個服務或控制器呼叫。但是,為了簡單起見,在本範例中,我們將在應用程式啟動時利用 onModuleInit 方法直接呼叫它們。

{
  temperature: 42,
  timeStamp: new Date(),
};
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}

就是這樣!透過在 Docker 容器中執行代理,您可以啟動應用程式來傳送和接收訊息。此外,您可以使用以下命令在代理容器內開啟 shell:

docker exec --workdir /opt/kafka/bin/ -it Broker sh

從那裡,您可以直接與代理互動並向應用程式發送訊息、從中接收訊息、創建新主題等。

這是包含本範例程式碼的儲存庫。

以上是Kafka 基礎知識與實際範例的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn