搜索
首页后端开发Golang用 Go 构建 Kafka 生产者和消费者

Building a Kafka Producer and Consumer in Go

Apache Kafka 是一个强大的分布式流平台,用于构建实时数据管道和流应用程序。在这篇博文中,我们将逐步使用 Golang 设置 Kafka 生产者和消费者。

先决条件

在我们开始之前,请确保您的计算机上安装了以下软件:

  • Go(1.16 或更高)

  • Docker(用于在本地运行 Kafka)

  • 卡夫卡

使用 Docker 设置 Kafka

为了快速设置 Kafka,我们将使用 Docker。在项目目录中创建 docker-compose.yml 文件:

yamlCopy codeversion: '3.7'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.13-2.7.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper

运行以下命令启动 Kafka 和 Zookeeper:

docker-compose up -d

在 Go 中创建 Kafka 生产者

首先,初始化一个新的 Go 模块:

go mod init kafka-example

安装 kafka-go 库:

go get github.com/segmentio/kafka-go

现在,创建一个文件 Producer.go 并添加以下代码:

package main

import (
    "context"
    "fmt"
    "github.com/segmentio/kafka-go"
    "log"
    "time"
)

func main() {
    writer := kafka.Writer{
        Addr:     kafka.TCP("localhost:9092"),
        Topic:    "example-topic",
        Balancer: &kafka.LeastBytes{},
    }

    defer writer.Close()

    for i := 0; i 



<p>此代码设置一个 Kafka 生产者,向 example-topic 主题发送 10 条消息。</p>

<p>运行生产者:<br>
</p>

<pre class="brush:php;toolbar:false">go run producer.go

您应该看到指示消息已生成的输出。

在 Go 中创建 Kafka 消费者

创建文件consumer.go并添加以下代码:

package main

import (
    "context"
    "fmt"
    "github.com/segmentio/kafka-go"
    "log"
)

func main() {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "example-topic",
        GroupID: "example-group",
    })

    defer reader.Close()

    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Fatal("could not read message " + err.Error())
        }
        fmt.Printf("Consumed message: %s\n", msg.Value)
    }
}

该消费者从 example-topic 主题读取消息并将其打印到控制台。

运行消费者:

go run consumer.go

您应该看到指示消息已被消耗的输出。

结论

在这篇博文中,我们演示了如何使用 Golang 设置 Kafka 生产者和消费者。这个简单的示例展示了生成和消费消息的基础知识,但 Kafka 的功能远远不止于此。借助 Kafka,您可以构建强大的、可扩展的实时数据处理系统。

随意探索更高级的功能,例如消息分区、基于密钥的消息分发以及与其他系统的集成。快乐编码!


就是这样!这篇博文简要介绍了如何将 Kafka 与 Go 结合使用,非常适合想要开始实时数据处理的开发人员。

以上是用 Go 构建 Kafka 生产者和消费者的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
测试代码依赖于INET功能的代码测试代码依赖于INET功能的代码May 03, 2025 am 12:20 AM

whentestinggocodewithinitfunctions,useexplicitseTupfunctionsorseParateTestFileSteSteTepteTementDippedDependendendencyOnInItfunctionsIdeFunctionSideFunctionsEffect.1)useexplicitsetupfunctionStocontrolglobalvaribalization.2)createSepEpontrolglobalvarialization

将GO的错误处理方法与其他语言进行比较将GO的错误处理方法与其他语言进行比较May 03, 2025 am 12:20 AM

go'serrorhandlingurturnserrorsasvalues,与Javaandpythonwhichuseexceptions.1)go'smethodensursexplitirorhanderling,propertingrobustcodebutincreasingverbosity.2)

设计有效界面的最佳实践设计有效界面的最佳实践May 03, 2025 am 12:18 AM

AnefactiveInterfaceoisminimal,clear and promotesloosecoupling.1)minimizeTheInterfaceForflexibility andeaseofimplementation.2)useInterInterfaceForeabStractionTosWapImplementations withCallingCallingCode.3)

集中式错误处理策略集中式错误处理策略May 03, 2025 am 12:17 AM

集中式错误处理在Go语言中可以提升代码的可读性和可维护性。其实现方式和优势包括:1.将错误处理逻辑从业务逻辑中分离,简化代码。2.通过集中处理错误,确保错误处理的一致性。3.使用defer和recover来捕获和处理panic,增强程序健壮性。

init in Init函数的替代方案,用于go中的包装初始化init in Init函数的替代方案,用于go中的包装初始化May 03, 2025 am 12:17 AM

Ingo,替代词Inivuntionsionializatializatializationfunctionsandsingletons.1)customInitializationfunctions hallowexpliticpliticpliticconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconconcontirization curssementializatizatupsetups.2)单次固定元素限制ininconinconcurrent

与GO接口键入断言和类型开关与GO接口键入断言和类型开关May 02, 2025 am 12:20 AM

Gohandlesinterfacesandtypeassertionseffectively,enhancingcodeflexibilityandrobustness.1)Typeassertionsallowruntimetypechecking,asseenwiththeShapeinterfaceandCircletype.2)Typeswitcheshandlemultipletypesefficiently,usefulforvariousshapesimplementingthe

使用errors.is和错误。使用errors.is和错误。May 02, 2025 am 12:11 AM

Go语言的错误处理通过errors.Is和errors.As函数变得更加灵活和可读。1.errors.Is用于检查错误是否与指定错误相同,适用于错误链的处理。2.errors.As不仅能检查错误类型,还能将错误转换为具体类型,方便提取错误信息。使用这些函数可以简化错误处理逻辑,但需注意错误链的正确传递和避免过度依赖以防代码复杂化。

在GO中进行性能调整:优化您的应用程序在GO中进行性能调整:优化您的应用程序May 02, 2025 am 12:06 AM

tomakegoapplicationsRunfasterandMorefly,useProflingTools,leverageConCurrency,andManageMoryfectily.1)usepprofforcpuorforcpuandmemoryproflingtoidentifybottlenecks.2)upitizegorizegoroutizegoroutinesandchannelstoparalletaparelalyizetasksandimproverperformance.3)

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

mPDF

mPDF

mPDF是一个PHP库,可以从UTF-8编码的HTML生成PDF文件。原作者Ian Back编写mPDF以从他的网站上“即时”输出PDF文件,并处理不同的语言。与原始脚本如HTML2FPDF相比,它的速度较慢,并且在使用Unicode字体时生成的文件较大,但支持CSS样式等,并进行了大量增强。支持几乎所有语言,包括RTL(阿拉伯语和希伯来语)和CJK(中日韩)。支持嵌套的块级元素(如P、DIV),

DVWA

DVWA

Damn Vulnerable Web App (DVWA) 是一个PHP/MySQL的Web应用程序,非常容易受到攻击。它的主要目标是成为安全专业人员在合法环境中测试自己的技能和工具的辅助工具,帮助Web开发人员更好地理解保护Web应用程序的过程,并帮助教师/学生在课堂环境中教授/学习Web应用程序安全。DVWA的目标是通过简单直接的界面练习一些最常见的Web漏洞,难度各不相同。请注意,该软件中

Dreamweaver Mac版

Dreamweaver Mac版

视觉化网页开发工具

SecLists

SecLists

SecLists是最终安全测试人员的伙伴。它是一个包含各种类型列表的集合,这些列表在安全评估过程中经常使用,都在一个地方。SecLists通过方便地提供安全测试人员可能需要的所有列表,帮助提高安全测试的效率和生产力。列表类型包括用户名、密码、URL、模糊测试有效载荷、敏感数据模式、Web shell等等。测试人员只需将此存储库拉到新的测试机上,他就可以访问到所需的每种类型的列表。

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)