首頁 >後端開發 >Golang >使用自訂處理程序 nats golang 保留訂閱方法

使用自訂處理程序 nats golang 保留訂閱方法

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB轉載
2024-02-09 08:30:331165瀏覽

使用自定义处理程序 nats golang 保留订阅方法

php小編香蕉在本文中將為你介紹如何使用自訂處理程序 nats golang 來保留訂閱方法。在開發過程中,我們經常需要使用訊息佇列來處理非同步任務,而 nats golang 是一個輕量級的訊息佇列系統,具有高效能和可擴展性。透過自訂處理程序,我們可以保留訂閱方法,實現更靈活的訊息處理和流程控制。下面我們將詳細介紹如何在 nats golang 中實現這項功能。

問題內容

我正在golang 的nats 用戶端之上編寫包裝器,我想獲取處理函數,一旦我從nats 伺服器收到訊息,就可以從消費者呼叫該函數。 我想保留自訂訂閱方法,直到它收到 nats 的訊息。

發布:

func (busconfig busconfig) publish(service string, data []byte) error {
    puberr := conn.publish(service, data)
    if puberr != nil {
        return puberr
    }
    return nil
}

訂閱:

func (busconfig busconfig) subscribe(subject string, handler func(msg []byte)) {
    fmt.println("subscrbing on : ", subject)

    //wg := sync.waitgroup{}
    //wg.add(1)
    subscription, err := conn.subscribe(subject, func(msg *nats.msg) {
        go func() {
            handler(msg.data)
        }()
        //wg.done()
    })
    if err != nil {
        fmt.println("subscriber error : ", err)
    }
    //wg.wait()
    defer subscription.unsubscribe()

}

測試案例:

func TestLifeCycleEvent(t *testing.T) {
    busClient := GetBusClient()
    busClient.Subscribe(SUBJECT, func(input []byte) {
        fmt.Println("Life cycle event received :", string(input))
    })

    busClient.Publish(SUBJECT, []byte("complete notification"))
}

我看到訊息已發布但未訂閱,我嘗試使用 waitgroup 保留訂閱方法,但我認為這不是正確的解決方案。

解決方法

您看不到正在傳遞的訊息,因為 Subscribe 是一個非同步方法,它會產生一個 goroutine 來處理傳入訊息並呼叫回呼。

呼叫 busClient.Publish() 之後,您的應用程式立即退出。它不會等待 Subscribe() 內部發生任何事情。

當您使用 nats.Subscribe() 時,您通常會有一個長時間運行的應用程序,該應用程式會在特定條件下退出(例如收到關閉訊號)。 WaitGroup 可以在這裡工作,但可能不適用於實際應用程序,僅用於測試。

您還應該在 NATS 連線上呼叫 Flush() 方法,以確保在退出程式之前已發送所有緩衝的訊息。

如果想要同步方法,可以使用nats.SubscribeSync()

#查看範例:https://natsbyexample.com/examples/messaging/發布-訂閱/執行

以上是使用自訂處理程序 nats golang 保留訂閱方法的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:stackoverflow.com。如有侵權,請聯絡admin@php.cn刪除