Heim >Backend-Entwicklung >Golang >Vergleich und Auswahl mehrerer Nachrichtenmodi mit RabbitMQ in Golang

Vergleich und Auswahl mehrerer Nachrichtenmodi mit RabbitMQ in Golang

WBOY
WBOYOriginal
2023-09-28 12:10:441367Durchsuche

Vergleich und Auswahl mehrerer Nachrichtenmodi mit RabbitMQ in Golang

Vergleich und Auswahl mehrerer Nachrichtenmodi mit RabbitMQ in Golang

Einführung:
In verteilten Systemen ist die Nachrichtenwarteschlange ein gängiger Kommunikationsmechanismus, der verwendet wird, um Sender und Empfänger einer Nachricht zu entkoppeln und eine asynchrone Kommunikation zu implementieren. RabbitMQ ist derzeit eine der beliebtesten Nachrichtenwarteschlangen und bietet Entwicklern eine Vielzahl von Nachrichtenmodi zur Auswahl. In diesem Artikel werden die vier klassischen Nachrichtenmodi in RabbitMQ, nämlich einfache Warteschlange, Arbeitswarteschlange, Veröffentlichungs-/Abonnementmodus und Themenmodus, verglichen, ihre Eigenschaften und anwendbaren Szenarien analysiert und Golang-Beispielcode gegeben.

1. Simple Queue

Simple Queue ist der einfachste Messaging-Modus in RabbitMQ, der eine Nachricht an einen Verbraucher sendet. Nachrichten werden an die Warteschlange gesendet und wiederum von einem Verbraucher gelesen.

Eigenschaften:

  1. Eine Nachricht kann nur von einem Verbraucher konsumiert werden.
  2. Wenn mehrere Verbraucher dieselbe Warteschlange abhören, werden die Nachrichten gleichmäßig an die Verbraucher verteilt.
  3. Verbraucher mit hoher Verarbeitungsgeschwindigkeit werden mehr Nachrichten konsumieren.

Anwendbare Szenarien:

  1. Anwendungsszenarien, in denen Aufgaben oder Nachrichten auf mehrere Arbeitseinheiten verteilt werden müssen, z. B. Protokollerfassung, Aufgabenverteilung usw.

Beispielcode:

package main

import (
    "log"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "simple_queue",
        false,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

2. Arbeitswarteschlange (Arbeitswarteschlange)

Der Arbeitswarteschlangenmodus ist ein Nachrichtenlastausgleichsmechanismus, der mehrere Verbraucher verwendet, um Nachrichten in einer Warteschlange gemeinsam zu verarbeiten. Bei Verwendung des Arbeitswarteschlangenmodus werden Nachrichten an die Warteschlange gesendet und von den Verbrauchern der Reihe nach abgerufen und verarbeitet.

Eigenschaften:

  1. Eine Nachricht kann nur von einem Verbraucher verarbeitet werden.
  2. Die von jedem Verbraucher verarbeiteten Aufgaben sind relativ gleich, dh Verbraucher mit hoher Verarbeitungsgeschwindigkeit verarbeiten mehr Nachrichten.

Anwendbare Szenarien:

  1. Hintergrundaufgabenverarbeitung wie Bildverarbeitung, Videotranskodierung usw.

Beispielcode:

package main

import (
    "log"
    "os"
    "strconv"
    "strings"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "work_queue",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    body := bodyFrom(os.Args)
    err = ch.Publish(
        "",
        q.Name,
        false,
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "text/plain",
            Body:         []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "Hello, World!"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return strconv.Itoa(os.Getpid()) + ":" + s
}

3. Publish/Subscribe-Modus (Publish/Subscribe)

Im Publish/Subscribe-Modus werden Nachrichten an alle Abonnenten gesendet. Jeder Abonnent erhält die gleiche Nachricht.

Funktionen:

  1. Jede Nachricht wird an alle Abonnenten gesendet.
  2. Verschiedene Abonnenten können unterschiedliche Verarbeitungslogiken für Nachrichten haben.

Anwendbare Szenarien:

  1. Broadcast-Nachrichten, wie Protokoll-Broadcast, Benachrichtigungs-Broadcast usw.

Beispielcode:

package main

import (
    "log"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "logs",
        "fanout",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare an exchange")

    q, err := ch.QueueDeclare(
        "",
        false,
        false,
        true,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.QueueBind(
        q.Name,
        "",
        "logs",
        false,
        nil,
    )
    failOnError(err, "Failed to bind a queue")

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

4. Themenmodus (Thema)

Der Themenmodus ist ein komplexerer Nachrichtenmodus, der Nachrichten an Abonnenten sendet, die dem Thema entsprechen, basierend auf den Platzhalterregeln des Themas.

Funktionen:

  1. Nachrichten werden über passende Themenregeln weitergeleitet.
  2. Unterstützt den Themenabgleich in Wildcard-Form.
  3. Verschiedene Abonnenten können je nach Themen, die sie interessieren, abonnieren.

Anwendbare Szenarien:

  1. Szenarien, die eine Nachrichtenfilterung und -weiterleitung basierend auf Themen erfordern.

Beispielcode:

package main

import (
    "log"
    "os"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "direct_logs",
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare an exchange")

    severity := severityFrom(os.Args)
    body := bodyFrom(os.Args)

    err = ch.Publish(
        "direct_logs",
        severity,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        },
    )
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

func severityFrom(args []string) string {
    var severity string
    if len(args) < 3 || os.Args[2] == "" {
        severity = "info"
    } else {
        severity = os.Args[2]
    }
    return severity
}

func bodyFrom(args []string) string {
    var s string
    if len(args) < 4 || os.Args[3] == "" {
        s = "Hello, World!"
    } else {
        s = strings.Join(args[3:], " ")
    }
    return s
}

Zusammenfassung:
RabbitMQ verfügt als leistungsstarkes Nachrichtenwarteschlangensystem über umfangreiche Nachrichtenmodi, um den Anforderungen verschiedener Szenarien gerecht zu werden. Entsprechend den tatsächlichen Geschäftsanforderungen können Sie den entsprechenden Nachrichtenmodus auswählen. Dieser Artikel vergleicht vier typische Nachrichtenmodi: einfache Warteschlange, Arbeitswarteschlange, Veröffentlichungs-/Abonnementmodus und Themenmodus und gibt den entsprechenden Golang-Beispielcode. Entwickler können den geeigneten Nachrichtenmodus auswählen, um ein verteiltes System basierend auf ihren Anforderungen aufzubauen.

Das obige ist der detaillierte Inhalt vonVergleich und Auswahl mehrerer Nachrichtenmodi mit RabbitMQ in Golang. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn