首页 >后端开发 >Golang >如何使用 Azure Event Hubs Go SDK (azeventhubs) 使用最新事件?

如何使用 Azure Event Hubs Go SDK (azeventhubs) 使用最新事件?

WBOY
WBOY转载
2024-02-09 08:15:08575浏览

如何使用 Azure Event Hubs Go SDK (azeventhubs) 使用最新事件?

php小编草莓为大家带来了一篇关于如何使用Azure Event Hubs Go SDK (azeventhubs)使用最新事件的指南。Azure Event Hubs是一种高可扩展、实时数据传输服务,可以用于处理大量事件数据。在本指南中,我们将介绍如何使用azeventhubs包来与Azure Event Hubs进行交互,并演示如何使用最新事件功能来获取实时数据。通过本文的指导,您将能够轻松地在Go应用程序中使用Azure Event Hubs,并利用最新事件功能来获取与处理实时数据。

问题内容

我正在从 azure-event-hubs-go/v3 迁移到较新的 azeventhubs Go SDK。在旧版 SDK 中,有一个 ReceiveOption 参数,允许我指定从哪里开始消费事件。

在新的 SDK 中,我使用以下代码来初始化处理器:

processor, err := azeventhubs.NewProcessor(
    e.ConsumerClient, 
    checkpointStore, 
    &azeventhubs.ProcessorOptions{
        UpdateInterval: time.Second, 
        Prefetch: 0, 
        StartPositions: azeventhubs.StartPositions{
            Default: azeventhubs.StartPosition{
                Latest: to.Ptr(true), 
                EnqueuedTime: to.Ptr(time.Now()), 
                Inclusive: true
            }
        }
    }
)

但是,我注意到事件是从最后一个检查点而不是最近发送的事件中消耗的。

我尝试过的: 我已经尝试过 ConsumingEventsUsingConsumerClientConsumingEventsWithCheckpoints 示例,但它们的行为方式相同,消耗来自最后一个检查点的事件而不是最近的事件。

我的期望: 我希望处理器开始使用从设备发送的最新事件,该设备每秒发送一条消息。如何使用 azeventhubs Go SDK 实现此行为?

解决方法

我最初很难掌握 AMQP 的底层机制。不过,我很高兴地报告,该问题已成功解决。

var wg sync.WaitGroup
wg.Add(1)

for _, partition := range p.PartitionIDs {
    go func(partition string) {
        defer wg.Done()

        partitionClient, err := consumerClient.NewPartitionClient(partition, nil)
        if err != nil {
            panic(err)
        }

        receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
        defer cancel()

        for {
            events, err := partitionClient.ReceiveEvents(receiveCtx, 1, nil)

            if err != nil && !errors.Is(err, context.DeadlineExceeded) {
                panic(err)
            }

            for _, evt := range events {
                fmt.Printf("partition: %s\n", partition)
                fmt.Printf("Body: %s\n", string(evt.Body))
            }
        }
    }(partition)
}

wg.Wait()

我对 Azure 客户支持服务团队提供的宝贵帮助表示感谢。

以上是如何使用 Azure Event Hubs Go SDK (azeventhubs) 使用最新事件?的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文转载于:stackoverflow.com。如有侵权,请联系admin@php.cn删除