Maison >développement back-end >Golang >Comparaison et sélection de plusieurs modes de message à l'aide de RabbitMQ dans Golang

Comparaison et sélection de plusieurs modes de message à l'aide de RabbitMQ dans Golang

WBOY
WBOYoriginal
2023-09-28 12:10:441352parcourir

Comparaison et sélection de plusieurs modes de message à laide de RabbitMQ dans Golang

Comparaison et sélection de plusieurs modes de message à l'aide de RabbitMQ dans Golang

Introduction :
Dans les systèmes distribués, la file d'attente de messages est un mécanisme de communication courant utilisé pour découpler l'expéditeur et le destinataire d'un message et mettre en œuvre une communication asynchrone. RabbitMQ, l'une des files d'attente de messages les plus populaires à l'heure actuelle, propose une variété de modes de message parmi lesquels les développeurs peuvent choisir. Cet article comparera les quatre modes de message classiques dans RabbitMQ, à savoir la file d'attente simple, la file d'attente de travail, le mode publication/abonnement et le mode sujet, analysera leurs caractéristiques et les scénarios applicables, et donnera un exemple de code Golang.

1. Simple Queue

Simple Queue est le mode de messagerie le plus basique de RabbitMQ, qui envoie un message à un consommateur. Les messages sont envoyés à la file d'attente et lus à leur tour par un consommateur.

Caractéristiques :

  1. Un message ne peut être consommé que par un seul consommateur.
  2. Si plusieurs consommateurs écoutent la même file d'attente, les messages seront distribués de manière égale aux consommateurs.
  3. Les consommateurs ayant une vitesse de traitement rapide consommeront plus de messages.

Scénarios applicables :

  1. Scénarios d'application dans lesquels des tâches ou des messages doivent être distribués à plusieurs unités de travail, tels que la collecte de journaux, la distribution de tâches, etc.

Exemple de code :

package main

import (
    "log"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "simple_queue",
        false,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

2. File d'attente de travail (Work Queue)

Le mode de file d'attente de travail est un mécanisme d'équilibrage de charge de messages qui utilise plusieurs consommateurs pour traiter conjointement les messages dans une file d'attente. Lors de l'utilisation du mode file d'attente de travail, les messages sont envoyés à la file d'attente et obtenus et traités par les consommateurs dans l'ordre.

Caractéristiques :

  1. Un message ne peut être traité que par un seul consommateur.
  2. Les tâches traitées par chaque consommateur sont relativement égales, c'est-à-dire que les consommateurs ayant une vitesse de traitement rapide traiteront plus de messages.

Scénarios applicables :

  1. Traitement des tâches en arrière-plan, telles que le traitement d'images, le transcodage vidéo, etc.

Exemple de code :

package main

import (
    "log"
    "os"
    "strconv"
    "strings"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "work_queue",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    body := bodyFrom(os.Args)
    err = ch.Publish(
        "",
        q.Name,
        false,
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "text/plain",
            Body:         []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "Hello, World!"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return strconv.Itoa(os.Getpid()) + ":" + s
}

3. Mode publication/abonnement (Publish/Subscribe)

En mode publication/abonnement, les messages sont diffusés à tous les abonnés. Chaque abonné recevra le même message.

Caractéristiques :

  1. Chaque message sera diffusé à tous les abonnés.
  2. Différents abonnés peuvent avoir une logique de traitement différente pour les messages.

Scénarios applicables :

  1. Diffusion de messages, tels que la diffusion de journaux, la diffusion de notifications, etc.

Exemple de code :

package main

import (
    "log"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "logs",
        "fanout",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare an exchange")

    q, err := ch.QueueDeclare(
        "",
        false,
        false,
        true,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.QueueBind(
        q.Name,
        "",
        "logs",
        false,
        nil,
    )
    failOnError(err, "Failed to bind a queue")

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

4. Mode sujet (Sujet)

Le mode sujet est un mode de message plus complexe, qui envoie des messages aux abonnés correspondant au sujet en fonction des règles génériques du sujet.

Caractéristiques :

  1. Les messages sont acheminés via des règles de correspondance de sujets.
  2. Prend en charge la correspondance de sujets sous forme de caractère générique.
  3. Différents abonnés peuvent s'abonner selon les sujets qui les intéressent.

Scénarios applicables :

  1. Scénarios qui nécessitent un filtrage et un routage des messages en fonction des sujets.

Exemple de code :

package main

import (
    "log"
    "os"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "direct_logs",
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare an exchange")

    severity := severityFrom(os.Args)
    body := bodyFrom(os.Args)

    err = ch.Publish(
        "direct_logs",
        severity,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        },
    )
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

func severityFrom(args []string) string {
    var severity string
    if len(args) < 3 || os.Args[2] == "" {
        severity = "info"
    } else {
        severity = os.Args[2]
    }
    return severity
}

func bodyFrom(args []string) string {
    var s string
    if len(args) < 4 || os.Args[3] == "" {
        s = "Hello, World!"
    } else {
        s = strings.Join(args[3:], " ")
    }
    return s
}

Résumé :
RabbitMQ, en tant que système de file d'attente de messages haute performance, dispose de modes de message riches pour répondre aux besoins de différents scénarios. Selon les besoins réels de votre entreprise, vous pouvez choisir le mode de message correspondant. Cet article compare quatre modes de message typiques : file d'attente simple, file d'attente de travail, mode publication/abonnement et mode sujet, et donne l'exemple de code Golang correspondant. Les développeurs peuvent choisir le mode de message approprié pour créer un système distribué en fonction de leurs besoins.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn