php小編草莓為大家帶來了一篇關於如何使用Azure Event Hubs Go SDK (azeventhubs)使用最新事件的指南。 Azure Event Hubs是一種高可擴展、即時資料傳輸服務,可用於處理大量事件資料。在本指南中,我們將介紹如何使用azeventhubs套件來與Azure Event Hubs進行交互,並示範如何使用最新事件功能來取得即時資料。透過本文的指導,您將能夠輕鬆地在Go應用程式中使用Azure Event Hubs,並利用最新事件功能來獲取與處理即時數據。
我正在從 azure-event-hubs-go/v3 遷移到較新的 azeventhubs Go SDK。在舊版 SDK 中,有一個 ReceiveOption 參數,讓我可以指定從哪裡開始消費事件。
在新的 SDK 中,我使用以下程式碼來初始化處理器:
processor, err := azeventhubs.NewProcessor( e.ConsumerClient, checkpointStore, &azeventhubs.ProcessorOptions{ UpdateInterval: time.Second, Prefetch: 0, StartPositions: azeventhubs.StartPositions{ Default: azeventhubs.StartPosition{ Latest: to.Ptr(true), EnqueuedTime: to.Ptr(time.Now()), Inclusive: true } } } )
但是,我注意到事件是從最後一個檢查點而不是最近發送的事件中消耗的。
我嘗試過的: 我已經嘗試過 ConsumingEventsUsingConsumerClient 和 ConsumingEventsWithCheckpoints 範例,但它們的行為方式相同,消耗來自最後一個檢查點的事件而不是最近的事件。
我的期望: 我希望處理器開始使用從設備發送的最新事件,該設備每秒發送一條訊息。如何使用 azeventhubs Go SDK 實作此行為?
我最初很難掌握 AMQP 的底層機制。不過,我很高興地報告,該問題已成功解決。
var wg sync.WaitGroup wg.Add(1) for _, partition := range p.PartitionIDs { go func(partition string) { defer wg.Done() partitionClient, err := consumerClient.NewPartitionClient(partition, nil) if err != nil { panic(err) } receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Second*30) defer cancel() for { events, err := partitionClient.ReceiveEvents(receiveCtx, 1, nil) if err != nil && !errors.Is(err, context.DeadlineExceeded) { panic(err) } for _, evt := range events { fmt.Printf("partition: %s\n", partition) fmt.Printf("Body: %s\n", string(evt.Body)) } } }(partition) } wg.Wait()
我對 Azure 客戶支援服務團隊提供的寶貴協助表示感謝。
以上是如何使用 Azure Event Hubs Go SDK (azeventhubs) 使用最新事件?的詳細內容。更多資訊請關注PHP中文網其他相關文章!