
Keeping a NATS subscription alive when using a custom handler in Go

Posted by WBOY (reposted), 2024-02-09 08:30:33


This article shows how to keep a NATS subscription alive in Go when incoming messages are passed to a custom handler function. Message queues are often used to process asynchronous tasks, and NATS is a lightweight messaging system with high performance and good scalability. Wrapping its Go client so that callers supply their own handler makes message processing more flexible, but the subscription then has to stay active long enough for messages to arrive.

Question content

I am writing a wrapper on top of the Go NATS client, and I want it to accept a handler function that is called on the consumer side whenever a message arrives from the NATS server. I want the custom Subscribe method to stay alive until it receives a message from NATS.

Publish:

func (busConfig BusConfig) Publish(service string, data []byte) error {
    pubErr := conn.Publish(service, data)
    if pubErr != nil {
        return pubErr
    }
    return nil
}

Subscribe:

func (busConfig BusConfig) Subscribe(subject string, handler func(msg []byte)) {
    fmt.Println("subscribing on : ", subject)

    //wg := sync.WaitGroup{}
    //wg.Add(1)
    subscription, err := conn.Subscribe(subject, func(msg *nats.Msg) {
        go func() {
            handler(msg.Data)
        }()
        //wg.Done()
    })
    if err != nil {
        fmt.Println("subscriber error : ", err)
    }
    //wg.Wait()
    defer subscription.Unsubscribe()

}

Test case:

func TestLifeCycleEvent(t *testing.T) {
    busClient := GetBusClient()
    busClient.Subscribe(SUBJECT, func(input []byte) {
        fmt.Println("Life cycle event received :", string(input))
    })

    busClient.Publish(SUBJECT, []byte("complete notification"))
}

I can see that the message is published, but the subscriber never receives it. I tried using a WaitGroup to keep the Subscribe method alive, but I don't think that is the right solution.

Solution

You don't see the message being delivered because Subscribe is asynchronous: it returns immediately, and the client invokes your callback from a separate goroutine when a message arrives.

Your application exits immediately after calling busClient.Publish(). It doesn't wait for anything to happen inside Subscribe().

Subscribe() on a NATS connection is typically used in a long-running application that exits only under certain conditions, such as receiving a shutdown signal. A WaitGroup will work here, but it is a fix for tests rather than something you would normally do in a real application.

You should also call Flush() on the NATS connection before the program exits, so that all buffered messages have actually been sent to the server.

If you want a synchronous subscription instead, use SubscribeSync() on the connection and fetch each message with NextMsg().

For a complete example, see: https://natsbyexample.com/examples/messaging/publish-subscribe/execute


Statement:
This article is reproduced from stackoverflow.com.