Maison  >  Article  >  interface Web  >  Analyse de la façon dont NodeJS fonctionne la file d'attente de messages RabbitMQ

Analyse de la façon dont NodeJS fonctionne la file d'attente de messages RabbitMQ

不言
不言original
2018-07-14 16:03:372774parcourir

Cet article présente principalement l'analyse de la façon dont NodeJS exploite la file d'attente de messages RabbitMQ. Il a une certaine valeur de référence. Maintenant, je le partage avec vous. Les amis dans le besoin peuvent s'y référer

1. Qu'est-ce qu'un. file d'attente des messages ?

Le message fait référence aux données transmises entre les applications. Les messages peuvent être très simples, contenant uniquement des chaînes de texte, ou plus complexes, contenant éventuellement des objets incorporés.

Message Queue (Message Queue) est une méthode de communication entre les applications. Les messages peuvent être renvoyés immédiatement après avoir été envoyés. Le système de messagerie garantit une livraison fiable des messages. L'éditeur du message publie simplement le message sur MQ et ne se soucie pas de savoir qui le reçoit, et le consommateur du message reçoit simplement le message de MQ, quel que soit celui qui le publie. De cette manière, ni l’éditeur ni l’utilisateur n’ont besoin de connaître l’existence de l’autre partie.

2. Quelles sont les files d'attente de messages couramment utilisées ?

RabbitMQ, RocketMQ, ActiveMQ, Kafka, ZeroMQ, MetaMq.

Même maintenant, certains NoSQL peuvent également être utilisés comme files d'attente de messages, comme Redis.

3. Scénarios d'utilisation de la file d'attente des messages ?

  • Traitement asynchrone

  • Découplage applicatif

  • Réduction des pics de trafic

4. Cas d'utilisation

Les entreprises à grande échelle auront leur propre système d'analyse des journaux. Comment le système de journaux est-il mis en œuvre ?

Illustration : Lorsqu'un utilisateur accède à une application, nous devons enregistrer l'enregistrement des opérations de l'utilisateur et le journal des exceptions du système. is Il enregistre les journaux générés par le système sur le disque du serveur, démarre les tâches planifiées sur le serveur, transfère régulièrement les informations des journaux du disque vers mq (producteur), et extrait également régulièrement les messages dans mq et les stocke dans le correspondant. base de données, telle qu'ElasticSearch ou Hive.

5. Comment installer RabbitMQ ?

Le cas ci-dessus présente un scénario d'utilisation de MQ. J'utilise RabbitMQ comme exemple qui peut être utilisé dans des projets réels.

Installez d'abord Brew (mac à titre d'exemple)

/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

Installez RabbitMQ

brew install rabbitmq

Exécutez RabbitMQ

Entrez /usr/local/Cellar /rabbitmq/3.7.7, exécutez

sbin/rabbitmq-server

pour démarrer le plug-in

et saisissez /usr/local/Cellar/rabbitmq/3.7.7/sbin

./rabbitmq-plugins enable rabbitmq_management

Connectez-vous à l'interface de gestion

Ouvrez le navigateur et saisissez : http://localhost:15672, le port par défaut de RabbitMQ 15672 est six. Nodejs exploite RabbitMQ

Vous pouvez trouver plusieurs SDK Node correspondants sur Internet, nous recommandons ici amqplib

1 Producer

/**
 * 对RabbitMQ的封装
 */
let amqp = require('amqplib');

class RabbitMQ {
    constructor() {
        this.hosts = [];
        this.index = 0;
        this.length = this.hosts.length;
        this.open = amqp.connect(this.hosts[this.index]);
    }
    sendQueueMsg(queueName, msg, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName).then(function (ok) {
                    return channel.sendToQueue(queueName, new Buffer(msg), {
                        persistent: true
                    });
                })
                    .then(function (data) {
                        if (data) {
                            errCallBack && errCallBack("success");
                            channel.close();
                        }
                    })
                    .catch(function () {
                        setTimeout(() => {
                            if (channel) {
                                channel.close();
                            }
                        }, 500)
                    });
            })
            .catch(function () {
                let num = self.index++;

                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index == 0;
                }
            });
    }
}

2.

/**
 * 对RabbitMQ的封装
 */
let amqp = require(&#39;amqplib&#39;);

class RabbitMQ {
    constructor() {
        this.open = amqp.connect(this.hosts[this.index]);
    }
    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName)
                    .then(function (ok) {
                        return channel.consume(queueName, function (msg) {
                            if (msg !== null) {
                                let data = msg.content.toString();
                                channel.ack(msg);
                                receiveCallBack && receiveCallBack(data);
                            }
                        })
                            .finally(function () {
                                setTimeout(() => {
                                    if (channel) {
                                        channel.close();
                                    }
                                }, 500)
                            });
                    })
            })
            .catch(function () {
                let num = self.index++;
                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index = 0;
                    self.open = amqp.connect(self.hosts[0]);
                }
            });
    }
3. Passer Le producteur envoie un message à MQ et crée une file d'attente

let mq = new RabbitMQ();
mq.sendQueueMsg(&#39;testQueue&#39;, &#39;my first message&#39;, (error) => {
    console.log(error)
})
Après l'exécution, nous ouvrons la plateforme de gestion et constatons que RabbitMQ a reçu un message :


Et RabbitMQ a ajouté une nouvelle file d'attente testQueue

4. Obtenez le message de la file d'attente spécifiée

let mq = new RabbitMQ();
mq.receiveQueueMsg(&#39;testQueue&#39;,(msg) => {    
   console.log(msg)
})// 输出结果:my first message复制代码
Ouvrez la plateforme de gestion RabbitMQ à ce moment, Le nombre de messages est devenu 0

En résumé : Nous avons brièvement parlé de certaines connaissances liées aux files d'attente de messages et RabbitMQ, et comment nous produisons et consommons des messages via nodejs.

Ce qui précède représente l'intégralité du contenu de cet article. J'espère qu'il sera utile à l'étude de chacun. Pour plus de contenu connexe, veuillez faire attention au site Web PHP chinois !

Recommandations associées :

Comment implémenter la fonction de téléchargement de fichiers à l'aide de JavaScript

Appel de baidu-aip via node.js -SDK réalise la fonction de reconnaissance de carte d'identité

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn