首页 >web前端 >js教程 >Kafka 基础知识和实际示例

Kafka 基础知识和实际示例

Linda Hamilton
Linda Hamilton原创
2024-12-28 09:26:11356浏览

在过去的几周里,我一直在深入研究 Kafka 并一路做笔记,我决定组织和构建一篇博客文章,在上面,除了概念和技巧之外,还有一个使用构建的实际示例NestJS 和 KafkaJs。

卡夫卡是什么?

Apache Kafka 是一个分布式事件流平台,旨在处理实时事件。它能够存储、处理和检索大规模、高吞吐量、低延迟的数据流,使其适合构建实时数据管道和事件驱动的应用程序。

主要特点:

  • 事件流: Kafka 将数据组织成主题,它们是事件的有序日志。
  • 分布式架构:Kafka 是为了可扩展性和容错能力而构建的。它作为称为代理的节点集群运行,可以跨多个服务器分发数据。
  • 发布-订阅模型:生产者将消息写入主题消费者从中读取消息。 Kafka支持多个消费者,允许不同的应用程序独立处理同一个数据流。
  • 高性能: Kafka 针对高吞吐量进行了优化,每秒处理数百万条消息,延迟较低。
  • 持久存储: Kafka 将消息存储在磁盘上,保留期限可配置,确保数据持久性和可靠性。
  • 分区和复制:主题分为分区以实现可扩展性,并跨代理进行复制以实现容错。
  • 可重玩性:消费者可以通过重置其偏移量来重新读取消息,从而启用数据重新处理或恢复。
  • 集成和生态系统: Kafka 与各种系统集成,并拥有 Kafka Connect(用于数据集成)和 Kafka Streams(用于流处理)等工具。

优点

  • 可靠性:通过数据分发、复制和分区确保容错。
  • 可扩展性:Kafka 可以处理海量数据并水平扩展而无需停机。
  • 持久性:消息被及时存储,确保弹性和数据持久性。
  • 性能:Kafka 在极端数据负载下保持高性能,处理大量数据而不会造成停机或数据丢失。

缺点

这些权衡是有意的设计选择,旨在最大限度地提高 Kafka 的性能,但可能会给需要更大灵活性的用例带来挑战:

  • 灵活性有限: Kafka 缺乏对扩展查询的支持,例如过滤报告中的特定数据。消费者必须处理这些任务,因为 Kafka 按消息接收顺序的偏移量检索消息。
  • 不适合长期存储:Kafka 擅长流数据,但不适合长期存储历史数据。数据重复会使大型数据集的存储成本高昂。
  • 无通配符主题支持: Kafka 不允许使用通配符模式(例如 log-2024-*)从多个主题进行消费。

使用案例

  • 实时分析:在数据流发生时对其进行处理和分析。
  • 事件溯源: 将应用程序状态的所有更改记录为事件序列。
  • 日志聚合:从分布式系统收集和管理日志。
  • 数据管道:可靠且高效地在系统之间传输数据。
  • 物联网应用:实时处理来自物联网设备的传感器数据。

卡夫卡如何工作?

Kafka 集成了队列和发布-订阅消息传递模型的功能,为消费者提供每种方法的优势。

  • 队列 通过在多个消费者实例之间分配任务来实现可扩展的数据处理,但传统队列不支持多个订阅者。
  • 发布-订阅模型支持多个订阅者,但无法在多个工作进程之间分配任务,因为每条消息都会发送给所有订阅者。

Kafka 采用分区日志系统来结合队列和发布-订阅模型的优点。日志是有序的记录序列,被分为多个分区,每个分区分配给不同的订阅者(消费者)。此设置使多个订阅者能够共享一个主题,同时保持可扩展性。

Kafka fundamentals with a practical example

事件、主题和分区

我们已经看到 Kafka 是一个旨在处理实时事件的平台,在讨论如何处理这些事件之前,我们需要对它们进行定义:

事件是记录应用程序的操作、事件或更改,例如付款、网站点击或温度读数。

Kafka 中的

事件被建模为键/值对,其中键和值都被序列化为字节序列。

  • 通常表示序列化的域对象或原始输入,例如传感器输出或其他应用程序数据。它们封装了 Kafka 事件中传输的核心信息。
  • 可以是复杂的域对象,但通常是简单的类型,如字符串或整数。键通常标识系统内的实体,例如特定用户、订单或连接的设备,而不是唯一标识单个事件(如关系数据库中的主键)。

Kafka 将事件组织成有序日志,称为主题。当外部系统将事件写入 Kafka 时,它会被附加到主题的末尾。即使在阅读后,消息也会在主题中保留可配置的持续时间。与队列不同,主题具有持久性、可复制性和容错性,可以有效地存储事件记录。但日志只能顺序扫描,不能查询。

主题作为日志文件存储在磁盘上,但是磁盘具有有限的大小和 I/O 等限制。为了克服这个问题,Kafka 允许主题分为分区,将单个日志分解为多个可以分布在不同服务器上的日志。这种分区使 Kafka 能够水平扩展,增强其处理大量事件和高吞吐量的能力。

Kafka 根据分区是否有 key:

将消息分配给分区
  • 无键:消息在所有分区之间循环分发,确保数据均匀分布,但不保留消息顺序。
  • With Key: 分区是通过对 key 进行哈希处理来确定的,确保具有相同 key 的消息始终进入相同的分区并保持其顺序。

经纪人

Kafka 使用名为 brokers 的节点作为分布式数据基础设施运行,这些节点共同形成 Kafka 集群。代理可以在裸机硬件、云实例、Kubernetes 管理的容器中、笔记本电脑上的 Docker 中或任何可以运行 JVM 进程的地方运行。

经纪商关注:

  • 将新事件写入分区。
  • 从分区读取服务。
  • 跨代理复制分区。

它们不执行消息计算或主题到主题的路由,从而保持设计简单高效。

复制

Kafka 通过跨多个代理复制分区数据来确保数据的持久性和容错性。分区的主要副本是领导副本,而其他副本是跟随副本。数据被写入领导者,领导者自动将更新复制到追随者。

此复制过程可确保:

  • 数据安全,即使在代理或存储发生故障的情况下也是如此。
  • 自动故障转移,如果当前领导者失败,另一个副本将接管作为领导者。

开发人员可以从这些保证中受益,而无需直接管理复制,因为 Kafka 会透明地处理它。

制片人

Kafka 生产者 是一个客户端应用程序,它将数据发送(发布)到 Kafka 主题。它负责创建消息(记录)并将其传输到 Kafka 集群。生产者根据其配置和消息密钥的存在来确定存储消息的主题分区。生产者负责但不限于:

  • 消息撰写:
    • 每条消息由一个键(可选)、一个值(实际数据)和元数据组成。
    • key决定消息的分区,确保具有相同key的消息的顺序。
  • 分区分配:
    • 如果提供了密钥,生产者将使用哈希算法来确定分区。
    • 没有密钥,消息以循环方式跨分区分发以实现负载平衡。
  • 压缩:

    生产者可以压缩消息以减少网络带宽和存储使用。

消费者

Kafka 消费者 是一个客户端应用程序,它从 Kafka 主题读取消息, 它按照自己的节奏从 Kafka 分区检索消息,允许实时或批量处理数据。请注意,Kafka 不会将消息推送给消费者,而是通过请求数据从 Kafka 分区中拉取消息。

消费者还可以跟踪他们已处理的抵消额。偏移量可以自动手动提交,确保消费者失败时数据不会丢失。这允许灵活的消费,包括通过重置偏移量来重放消息。

消费群体

消费者组是一组消费者,它们合作消费来自某些主题的数据,从而允许分布式处理主题的消息。

主题的分区在组内的消费者之间划分,确保每条消息仅由组内的一个消费者处理。多个消费组可以独立消费同一个主题,互不干扰。

当新的消费者加入组或现有消费者失败时,Kafka 会在组中的消费者之间重新分配分区,以确保覆盖所有分区。

序列化和反序列化

Kafka中的序列化和反序列化是将数据在其原始格式和字节数组之间进行转换以进行传输和存储,从而使生产者和消费者能够高效地进行通信。

序列化

是将对象或数据结构转换为字节流以便传输或存储的过程。在生产者将数据发送到 Kafka 主题之前,它将数据(键和值)序列化为字节数组。

常见序列化格式:

  • JSON:人类可读,广泛兼容。
  • Avro:紧凑高效,基于模式。
  • Protobuf:紧凑、基于模式且与语言无关。
  • 字符串:简单的基于文本的序列化。
  • 自定义序列化:满足特定于应用程序的需求。

反序列化

是相反的过程,其中字节流被转换回其原始对象或数据结构。当消费者从 Kafka 主题读取数据时,它将字节数组反序列化回可用的格式进行处理。

压缩

压缩是指在存储或传输消息之前减小消息的大小。它通过在生产者、代理和消费者之间发送较小的有效负载来优化存储使用、减少网络带宽消耗并提高整体性能。

当生产者向 Kafka 主题发送消息时,它可以在传输之前对消息进行压缩。压缩的消息按原样存储在代理上,并由消费者在读取消息时解压缩。

优点

  • 减少网络带宽:较小的有效负载意味着通过网络传输的数据较少。
  • 较低的存储要求:压缩消息占用更少的磁盘空间。
  • 提高吞吐量:较小的消息可以实现更快的数据传输和处理。

什么时候使用?

  • 消息大小较大的用例:压缩大大减少了数据大小。
  • 高吞吐量系统:减少网络和存储资源的压力。
  • 批处理:当生产者将多个消息一起批处理时,压缩效果最佳。

虽然压缩可以节省资源,但必须平衡 CPU 使用率和压缩优势之间的权衡,选择适合您的用例的编解码器。

支持的压缩类型

  • 无: 无压缩(默认)。
  • Gzip:压缩比高,但CPU占用率较高。
  • Snappy:平衡的压缩速度和CPU使用率,适合实时用例。
  • LZ4:更快的压缩和解压缩,针对低延迟系统进行了优化。
  • Zstd: 高压缩比,性能比 Gzip 更好,较新的 Kafka 版本支持。

调音

优化 Apache Kafka 的性能涉及微调各个组件以有效平衡吞吐量和延迟。本文仅触及该主题的表面,以下是调优 Kafka 时需要考虑的一些方面:

  • 分区管理:

    • 分区计数:增加分区数量以增强并行性和吞吐量。但是,请避免过多的分区以防止管理开销。根据您的消费者能力和所需的消费率调整分区数量。
  • 生产者配置:

    • 批处理:配置batch.size和linger.ms以实现高效的消息批处理,减少请求数量并提高吞吐量。
    • 压缩: 实施压缩(例如,compression.type=snappy)以减小消息大小,从而减少网络和存储使用。请注意压缩带来的额外 CPU 开销。
  • 消费者配置:

    • 获取设置:调整 fetch.min.bytes 和 fetch.max.wait.ms 以控制消费者检索消息的方式,根据应用程序的需求平衡延迟和吞吐量。

实际例子

想象一个应用程序记录房间内的温度并使用 Kafka 传输该数据,然后另一个应用程序处理该数据。为简单起见,我们将仅关注 Kafka 方面,生产者和消费者都在同一应用程序中实现。在这种情况下,特定时刻记录的每个温度都代表一个事件:

{
  temperature: 42,
  timeStamp: new Date(),
};

所有代码都将在此存储库中。

首先,我们需要一个 Kafka 代理,但我们不需要在我们的机器中安装 Kafka,只需使用这个 Docker Kafka 镜像即可。

首先拉取该图像:

docker 拉 apache/kafka

然后运行它,映射 Kafka 在我们机器上的同一端口上侦听的端口:

docker run -d -p 9092:9092 --name Broker apache/kafka:latest

就是这样,我们有一个正在运行的 Kafka 代理,在继续之前,您可能想通过创建主题、发送和使用消息来尝试一下它,只需按照该图像页面上的说明进行操作即可。

为了构建我们的应用程序,我们将结合使用 NestJS 和 KafkaJS,首先使用 Nest CLI 创建应用程序

嵌套新的我的巢项目

在项目文件夹内安装kafkajs

npm 我卡夫卡

并生成以下模块

巢g莫卡夫卡

nest g mo 制作人

巢 g mo 消费者

巢穴温度

Kafka 模块 将处理所有 Kafka 特定的操作,包括管理用于连接、断开连接、发送和接收消息的消费者和生产者类。这将是唯一直接与 kafkajs 包交互的模块。

生产者和消费者模块将充当发布-订阅平台(在本例中为 Kafka)与应用程序其余部分之间的接口,抽象平台特定的详细信息。

温度模块将管理事件。它不需要知道正在使用哪个发布-订阅平台,只需要消费者和生产者即可运行。

创建模块后,我们还创建一个文件夹 src/interface 并在其中添加以下接口:

{
  temperature: 42,
  timeStamp: new Date(),
};
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}

在 src/kafka/ 文件夹中添加实现这些接口的生产者和消费者类:

// src/interfaces/consumer.interface.ts

export type ConsumerMessage = {
  key?: string;
  value: any;
};

export type OnMessage = (message: ConsumerMessage) => Promise<void>;

export interface IConsumer {
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  consume: (onMessage?: OnMessage) => Promise<void>;
  isConnected: () => boolean;
}
// src/kafka/kafka.producer.ts

export class KafkaProducer implements IProducer {
  private readonly logger = new Logger(KafkaProducer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly producer: Producer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
  ) {
    // The client must be configured with at least one broker
    this.kafka = new Kafka({
      brokers: [this.broker],
    });
    this.producer = this.kafka.producer();
  }

  async produce(
    message: Message,
    compression?: CompressionTypes,
    acks?: number,
    timeout?: number,
  ) {
    // To produce, at least a topic and a list of messages must be provided
    await this.producer.send({
      topic: this.topic,
      messages: [message],
      compression,
      timeout,
      acks,
    });
  }

  // To produce a message, the producer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the producer events
      // And storing the connection status
      this.producer.on('producer.connect', () => {
        this.logger.log(
          `producer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.producer.on('producer.disconnect', () => {
        this.logger.log(
          `producer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      // Connect to Kafka
      await this.producer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.producer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

不要忘记在 kafka.module.ts 中导出这些类

// src/kafka/kafka.cosumer.ts

export class KafkaConsumer implements IConsumer {
  private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
    private readonly groupId: string,
  ) {
    if (this.broker && this.topic && this.groupId) {
      // The client must be configured with at least one broker
      this.kafka = new Kafka({
        brokers: [this.broker],
      });
      this.consumer = this.kafka.consumer({ groupId: this.groupId });
    } else {
      this.logger.warn('Broker, topic and groupId must be provided');
    }
  }

  // The onMessage function will be called when a message is received
  async consume(onMessage: OnMessage) {
    // Here we subscribe to the topic ...
    await this.consumer.subscribe({ topic: this.topic });

    // ... and handle the messages
    await this.consumer.run({
      eachMessage: async (payload) => {
        try {
          this.logger.log(
            `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`,
          );

          await onMessage({
            key: payload.message.key?.toString(),
            value: payload.message.value.toString(),
          });
        } catch (err) {
          this.logger.error('error on consuming message', err);
        }
      },
    });
  }

  // To consume, the consumer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the consumer events
      // And storing the connection status
      this.consumer.on('consumer.connect', () => {
        this.logger.log(
          `consumer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.consumer.on('consumer.disconnect', () => {
        this.logger.log(
          `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      await this.consumer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.consumer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

现在我们可以转到温度模块并实例化这些 Kafka 类并开始使用它们。然而,如果温度模块不必担心它正在使用哪个 pub-sub 平台,那就更好了。相反,它应该简单地与注入的生产者和/或消费者一起工作,只专注于发送和接收消息,而不管底层平台如何。这样,如果我们决定将来切换到不同的 pub-sub 平台,我们不需要对温度模块进行任何更改。

为了实现这种抽象,我们可以创建 Producer 和 Consumer 类来处理 Kafka Producer 和 Consumer 实现的细节。让我们从制作人开始:

// src/kafka/kafka.module.ts

@Module({
  imports: [],
  providers: [KafkaProducer, KafkaConsumer],
  exports: [KafkaProducer, KafkaConsumer],
})
export class KafkaModule {}
// src/producer/producer.service.ts

@Injectable()
export class ProducerService implements OnApplicationShutdown {
  // Expects any producer that implements the IProducer interface
  private readonly producer: IProducer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
  ) {
    this.producer = new KafkaProducer(broker, topic);
  }

  /** The produce() and message can receive more parameters,
   * refer to produce method in src/kafka/kafka.producer.ts
   */
  async produce(message: { key?: string; value: string }) {
    if (!this.producer.isConnected()) {
      await this.producer.connect();
    }
    await this.producer.produce(message);
  }

  async onApplicationShutdown() {
    await this.producer.disconnect();
  }
}

现在,消费者:

// src/producer/producer.module.ts

@Module({
  imports: [KafkaModule],
  providers: [
    ProducerService,
    {
      provide: 'broker',
      useValue: 'default-broker-value',
    },
    {
      provide: 'topic',
      useValue: 'default-topic-value',
    },
  ],
  exports: [ProducerService],
})
export class ProducerModule {}
// src/consumer/consumer.service.ts

@Injectable()
export class ConsumerService implements OnApplicationShutdown {
  // Expects any consumer that implements the IConsumer interface
  private readonly consumer: IConsumer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
    @Inject('groupId') groupId: string,
  ) {
    this.consumer = new KafkaConsumer(broker, topic, groupId);
  }

  async consume(onMessage: OnMessage) {
    if (!this.consumer.isConnected()) {
      await this.consumer.connect();
    }
    await this.consumer.consume(onMessage);
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}

现在,我们可以专注于构建温度模块。在Temperature.service.ts 文件中,我们将创建一个方法来注册温度,在本例中,该方法将简单地使用生产者将温度数据发送到代理。此外,我们将实现一种方法来处理传入消息以用于演示目的。

这些方法可以由另一个服务或控制器调用。但是,为了简单起见,在本示例中,我们将在应用程序启动时利用 onModuleInit 方法直接调用它们。

{
  temperature: 42,
  timeStamp: new Date(),
};
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}

就是这样!通过在 Docker 容器中运行代理,您可以启动应用程序来发送和接收消息。此外,您可以使用以下命令在代理容器内打开 shell:

docker exec --workdir /opt/kafka/bin/ -it Broker sh

从那里,您可以直接与代理交互并向应用程序发送消息、从中接收消息、创建新主题等。

这是包含本示例代码的存储库。

以上是Kafka 基础知识和实际示例的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn