Maison >développement back-end >Golang >Utilisation du gestionnaire personnalisé Nats Golang Retainer la méthode d'abonnement

Utilisation du gestionnaire personnalisé Nats Golang Retainer la méthode d'abonnement

WBOY
WBOYavant
2024-02-09 08:30:331153parcourir

使用自定义处理程序 nats golang 保留订阅方法

l'éditeur php Banana vous présentera comment utiliser un gestionnaire personnalisé nats golang pour conserver la méthode d'abonnement dans cet article. Pendant le processus de développement, nous devons souvent utiliser des files d'attente de messages pour gérer des tâches asynchrones, et nats golang est un système de file d'attente de messages léger avec des performances et une évolutivité élevées. En personnalisant le gestionnaire, nous pouvons conserver la méthode d'abonnement et obtenir un traitement des messages et un contrôle des processus plus flexibles. Ci-dessous, nous présenterons en détail comment implémenter cette fonction dans Nats Golang.

Contenu de la question

J'écris un wrapper au-dessus du client nats dans Golang et je souhaite obtenir une fonction de gestionnaire que je peux appeler du consommateur une fois que je reçois un message du serveur nats. Je souhaite conserver la méthode d'abonnement personnalisée jusqu'à ce qu'elle reçoive le message de Nats.

Publié par :

func (busconfig busconfig) publish(service string, data []byte) error {
    puberr := conn.publish(service, data)
    if puberr != nil {
        return puberr
    }
    return nil
}

Abonnez-vous :

func (busconfig busconfig) subscribe(subject string, handler func(msg []byte)) {
    fmt.println("subscrbing on : ", subject)

    //wg := sync.waitgroup{}
    //wg.add(1)
    subscription, err := conn.subscribe(subject, func(msg *nats.msg) {
        go func() {
            handler(msg.data)
        }()
        //wg.done()
    })
    if err != nil {
        fmt.println("subscriber error : ", err)
    }
    //wg.wait()
    defer subscription.unsubscribe()

}

Cas de test :

func TestLifeCycleEvent(t *testing.T) {
    busClient := GetBusClient()
    busClient.Subscribe(SUBJECT, func(input []byte) {
        fmt.Println("Life cycle event received :", string(input))
    })

    busClient.Publish(SUBJECT, []byte("complete notification"))
}

Je vois que le message est publié mais pas abonné, j'ai essayé d'utiliser waitgroup pour conserver la méthode d'abonnement, mais je pense que ce n'est pas la bonne solution.

Solution de contournement

Vous ne pouvez pas voir le message en cours de transmission car Subscribe est une méthode asynchrone qui génère une goroutine pour gérer le message entrant et appeler le rappel.

AppelbusClient.Publish() 之后,您的应用程序立即退出。它不会等待 Subscribe() sur tout ce qui se passe en interne.

Lorsque vous utilisez nats.Subscribe(), vous disposez généralement d'une application de longue durée qui se ferme dans certaines conditions (comme la réception d'un signal d'arrêt). WaitGroup fonctionnera ici, mais probablement pas pour de vraies applications, juste à des fins de test.

Vous devez également appeler la méthode Flush() sur la connexion NATS pour vous assurer que tous les messages mis en mémoire tampon sont envoyés avant de quitter le programme.

Si vous souhaitez une méthode synchronisée, vous pouvez utiliser nats.SubscribeSync()

Voir l'exemple : https://natsbyexample.com/examples/messaging/publish-subscribe/execute

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer