首頁  >  文章  >  後端開發  >  如何使用 Azure Event Hubs Go SDK (azeventhubs) 使用最新事件?

如何使用 Azure Event Hubs Go SDK (azeventhubs) 使用最新事件?

WBOY
WBOY轉載
2024-02-09 08:15:08508瀏覽

如何使用 Azure Event Hubs Go SDK (azeventhubs) 使用最新事件?

php小編草莓為大家帶來了一篇關於如何使用Azure Event Hubs Go SDK (azeventhubs)使用最新事件的指南。 Azure Event Hubs是一種高可擴展、即時資料傳輸服務,可用於處理大量事件資料。在本指南中,我們將介紹如何使用azeventhubs套件來與Azure Event Hubs進行交互,並示範如何使用最新事件功能來取得即時資料。透過本文的指導,您將能夠輕鬆地在Go應用程式中使用Azure Event Hubs,並利用最新事件功能來獲取與處理即時數據。

問題內容

我正在從 azure-event-hubs-go/v3 遷移到較新的 azeventhubs Go SDK。在舊版 SDK 中,有一個 ReceiveOption 參數,讓我可以指定從哪裡開始消費事件。

在新的 SDK 中,我使用以下程式碼來初始化處理器:

processor, err := azeventhubs.NewProcessor(
    e.ConsumerClient, 
    checkpointStore, 
    &azeventhubs.ProcessorOptions{
        UpdateInterval: time.Second, 
        Prefetch: 0, 
        StartPositions: azeventhubs.StartPositions{
            Default: azeventhubs.StartPosition{
                Latest: to.Ptr(true), 
                EnqueuedTime: to.Ptr(time.Now()), 
                Inclusive: true
            }
        }
    }
)

但是,我注意到事件是從最後一個檢查點而不是最近發送的事件中消耗的。

我嘗試過的: 我已經嘗試過 ConsumingEventsUsingConsumerClientConsumingEventsWithCheckpoints 範例,但它們的行為方式相同,消耗來自最後一個檢查點的事件而不是最近的事件。

我的期望: 我希望處理器開始使用從設備發送的最新事件,該設備每秒發送一條訊息。如何使用 azeventhubs Go SDK 實作此行為?

解決方法

我最初很難掌握 AMQP 的底層機制。不過,我很高興地報告,該問題已成功解決。

var wg sync.WaitGroup
wg.Add(1)

for _, partition := range p.PartitionIDs {
    go func(partition string) {
        defer wg.Done()

        partitionClient, err := consumerClient.NewPartitionClient(partition, nil)
        if err != nil {
            panic(err)
        }

        receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
        defer cancel()

        for {
            events, err := partitionClient.ReceiveEvents(receiveCtx, 1, nil)

            if err != nil && !errors.Is(err, context.DeadlineExceeded) {
                panic(err)
            }

            for _, evt := range events {
                fmt.Printf("partition: %s\n", partition)
                fmt.Printf("Body: %s\n", string(evt.Body))
            }
        }
    }(partition)
}

wg.Wait()

我對 Azure 客戶支援服務團隊提供的寶貴協助表示感謝。

以上是如何使用 Azure Event Hubs Go SDK (azeventhubs) 使用最新事件?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:stackoverflow.com。如有侵權,請聯絡admin@php.cn刪除