Heim  >  Artikel  >  Web-Frontend  >  Analyse, wie NodeJS die Nachrichtenwarteschlange RabbitMQ betreibt

Analyse, wie NodeJS die Nachrichtenwarteschlange RabbitMQ betreibt

不言
不言Original
2018-07-14 16:03:372739Durchsuche

Dieser Artikel stellt hauptsächlich die Analyse vor, wie NodeJS die Nachrichtenwarteschlange RabbitMQ betreibt. Es hat einen bestimmten Referenzwert. Jetzt können Freunde in Not darauf verweisen.

1. Was ist eine Nachricht? Warteschlange ?

Nachricht bezieht sich auf Daten, die zwischen Anwendungen übertragen werden. Nachrichten können sehr einfach sein und nur Textzeichenfolgen enthalten, oder komplexer sein und möglicherweise eingebettete Objekte enthalten.

Nachrichtenwarteschlange (Message Queue) ist eine Kommunikationsmethode zwischen Anwendungen. Nachrichten können sofort nach dem Senden zurückgegeben werden. Das Nachrichtensystem gewährleistet eine zuverlässige Zustellung von Nachrichten. Der Nachrichtenherausgeber veröffentlicht die Nachricht einfach in MQ und kümmert sich nicht darum, wer sie erhält, und der Nachrichtenkonsument erhält die Nachricht einfach von MQ, unabhängig davon, wer sie veröffentlicht. Auf diese Weise müssen weder der Herausgeber noch der Benutzer die Existenz der anderen Partei kennen.

2. Was sind die am häufigsten verwendeten Nachrichtenwarteschlangen?

RabbitMQ, RocketMQ, ActiveMQ, Kafka, ZeroMQ, MetaMq.

Auch jetzt können einige NoSQL-Datenbanken auch als Nachrichtenwarteschlangen verwendet werden, beispielsweise Redis.

3. Nutzungsszenarien der Nachrichtenwarteschlange?

  • Asynchrone Verarbeitung

  • Anwendungsentkopplung

  • Verkehrsspitzenreduzierung

4. Anwendungsfälle

Großunternehmen verfügen über ein eigenes Protokollanalysesystem.

Abbildung: Wenn ein Benutzer auf eine Anwendung zugreift, müssen wir den Vorgangsdatensatz des Benutzers und das Ausnahmeprotokoll des Systems aufzeichnen. Der herkömmliche Ansatz Es speichert die vom System generierten Protokolle auf der Serverfestplatte, startet geplante Aufgaben auf dem Server, überträgt regelmäßig die Protokollinformationen von der Festplatte an mq (Produzent) und holt auch regelmäßig die Nachrichten in mq heraus und speichert sie im entsprechenden Verzeichnis Datenbank wie ElasticSearch oder Hive.

5. Wie installiere ich RabbitMQ?

Der obige Fall stellt ein Verwendungsszenario von MQ vor. Ich verwende RabbitMQ als Beispiel für die Verwendung in realen Projekten.

Brew zuerst installieren (Mac als Beispiel)

/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

RabbitMQ installieren

brew install rabbitmq

RabbitMQ ausführen

Geben Sie /usr/local/Cellar/rabbitmq/3.7 ein .7. Führen Sie

sbin/rabbitmq-server

aus, um das Plug-in zu starten

und geben Sie /usr/local/Cellar/rabbitmq/3.7.7/sbin

./rabbitmq-plugins enable rabbitmq_management

ein Verwaltungsschnittstelle

Öffnen Sie den Browser und geben Sie ein: http://localhost:15672, RabbitMQ ist standardmäßig auf Port 15672 eingestellt. Nodejs betreibt RabbitMQ

Node SDK, hier empfehlen wir amqplib

1. Produzent

/**
 * 对RabbitMQ的封装
 */
let amqp = require('amqplib');

class RabbitMQ {
    constructor() {
        this.hosts = [];
        this.index = 0;
        this.length = this.hosts.length;
        this.open = amqp.connect(this.hosts[this.index]);
    }
    sendQueueMsg(queueName, msg, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName).then(function (ok) {
                    return channel.sendToQueue(queueName, new Buffer(msg), {
                        persistent: true
                    });
                })
                    .then(function (data) {
                        if (data) {
                            errCallBack && errCallBack("success");
                            channel.close();
                        }
                    })
                    .catch(function () {
                        setTimeout(() => {
                            if (channel) {
                                channel.close();
                            }
                        }, 500)
                    });
            })
            .catch(function () {
                let num = self.index++;

                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index == 0;
                }
            });
    }
}

2. Verbraucher

/**
 * 对RabbitMQ的封装
 */
let amqp = require(&#39;amqplib&#39;);

class RabbitMQ {
    constructor() {
        this.open = amqp.connect(this.hosts[this.index]);
    }
    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName)
                    .then(function (ok) {
                        return channel.consume(queueName, function (msg) {
                            if (msg !== null) {
                                let data = msg.content.toString();
                                channel.ack(msg);
                                receiveCallBack && receiveCallBack(data);
                            }
                        })
                            .finally(function () {
                                setTimeout(() => {
                                    if (channel) {
                                        channel.close();
                                    }
                                }, 500)
                            });
                    })
            })
            .catch(function () {
                let num = self.index++;
                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index = 0;
                    self.open = amqp.connect(self.hosts[0]);
                }
            });
    }

3. Senden Sie eine Nachricht an MQ über den Produzenten und erstellen Sie eine Warteschlange

let mq = new RabbitMQ();
mq.sendQueueMsg(&#39;testQueue&#39;, &#39;my first message&#39;, (error) => {
    console.log(error)
})

Nach der Ausführung haben wir die Verwaltungsplattform geöffnet und stellte fest, dass RabbbitMQ eine Nachricht erhalten hatte:


und RabbitMQ fügte eine neue Warteschlange testQueue hinzu

4. Erhalten Sie Nachrichten aus der angegebenen Warteschlange

let mq = new RabbitMQ();
mq.receiveQueueMsg(&#39;testQueue&#39;,(msg) => {    
   console.log(msg)
})// 输出结果:my first message复制代码

Öffnen Sie zu diesem Zeitpunkt die RabbitMQ-Verwaltungsplattform und die Anzahl der Nachrichten ist 0 geworden

Zusammenfassend: Wir Ich habe kurz über einige Kenntnisse im Zusammenhang mit Nachrichtenwarteschlangen und RabbitMQ gesprochen und darüber, wie wir Nachrichten über NodeJS produzieren und konsumieren.

Das Obige ist der gesamte Inhalt dieses Artikels. Ich hoffe, er wird für das Studium aller hilfreich sein. Weitere verwandte Inhalte finden Sie auf der chinesischen PHP-Website.

Verwandte Empfehlungen:

So implementieren Sie die Datei-Download-Funktion mit JavaScript

Aufruf von baidu-aip über node.js -SDK realisiert die Funktion der ID-Kartenerkennung

Das obige ist der detaillierte Inhalt vonAnalyse, wie NodeJS die Nachrichtenwarteschlange RabbitMQ betreibt. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn