首頁 >web前端 >js教程 >對於NodeJS如何操作訊息佇列RabbitMQ的分析

對於NodeJS如何操作訊息佇列RabbitMQ的分析

不言
不言原創
2018-07-14 16:03:372836瀏覽

這篇文章主要介紹了關於NodeJS如何操作訊息佇列RabbitMQ的分析,有著一定的參考價值,現在分享給大家,有需要的朋友可以參考一下

一. 什麼是訊息佇列?

訊息(Message)是指在應用程式間傳送的資料。訊息可以非常簡單,例如只包含文字字串,也可以更複雜,可能包含嵌入物件。

訊息佇列(Message Queue)是一種應用間的通訊方式,訊息傳送後可以立即返回,由訊息系統來確保訊息的可靠傳遞。訊息發佈者只管把訊息發佈到 MQ 中而不用管誰來取,訊息使用者只管從 MQ 取訊息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。

二. 常用的訊息佇列有哪些?

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq。

甚至現在部分NoSQL也可做訊息佇列,如Redis。

三. 訊息佇列的使用場景?

  • 非同步處理

  • 應用解耦

  • 流量削峰

四. 使用案例

上規模的公司都會有自己的日誌分析系統,日誌系統是怎麼實現的呢?

 

圖解:使用者在存取應用程式的時候,我們要記錄使用者的操作記錄和系統的例外日誌,常規的做法是將系統產生的日誌保存到伺服器磁碟,在伺服器中開啟定時任務,定時將磁碟的日誌資訊傳入mq中(生產者),也定時將mq中的消息取出並存到對應的資料庫,如ElasticSearch或Hive中。

五. 如何安裝RabbitMQ?

上面的案例介紹了MQ的一個使用場景,我這裡是用RabbitMQ舉例,現實專案中可能用到的是Kafka。

先安裝brew(mac為例)

/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

安裝RabbitMQ

brew install rabbitmq

執行RabbitMQ

進入到/usr/local/Cellar/rabbitmq/3.7 .7,執行

sbin/rabbitmq-server

啟動外掛

進入到/usr/local/Cellar/rabbitmq/3.7.7/sbin

./rabbitmq-plugins enable rabbitmq_management

登陸管理介面

開啟瀏覽器輸入:http://localhost:15672,RabbitMQ預設15672連接埠六. Nodejs操作RabbitMQ

     

網路上可以找到好幾個對應的Node SDK,這裡推薦amqplib

1.生產者

/**
 * 对RabbitMQ的封装
 */
let amqp = require('amqplib');

class RabbitMQ {
    constructor() {
        this.hosts = [];
        this.index = 0;
        this.length = this.hosts.length;
        this.open = amqp.connect(this.hosts[this.index]);
    }
    sendQueueMsg(queueName, msg, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName).then(function (ok) {
                    return channel.sendToQueue(queueName, new Buffer(msg), {
                        persistent: true
                    });
                })
                    .then(function (data) {
                        if (data) {
                            errCallBack && errCallBack("success");
                            channel.close();
                        }
                    })
                    .catch(function () {
                        setTimeout(() => {
                            if (channel) {
                                channel.close();
                            }
                        }, 500)
                    });
            })
            .catch(function () {
                let num = self.index++;

                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index == 0;
                }
            });
    }
}

2. 消費者

/**
 * 对RabbitMQ的封装
 */
let amqp = require(&#39;amqplib&#39;);

class RabbitMQ {
    constructor() {
        this.open = amqp.connect(this.hosts[this.index]);
    }
    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName)
                    .then(function (ok) {
                        return channel.consume(queueName, function (msg) {
                            if (msg !== null) {
                                let data = msg.content.toString();
                                channel.ack(msg);
                                receiveCallBack && receiveCallBack(data);
                            }
                        })
                            .finally(function () {
                                setTimeout(() => {
                                    if (channel) {
                                        channel.close();
                                    }
                                }, 500)
                            });
                    })
            })
            .catch(function () {
                let num = self.index++;
                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index = 0;
                    self.open = amqp.connect(self.hosts[0]);
                }
            });
    }

3. 透過生產者向MQ發送一個訊息,並建立佇列

let mq = new RabbitMQ();
mq.sendQueueMsg(&#39;testQueue&#39;, &#39;my first message&#39;, (error) => {
    console.log(error)
})

執行之後,我們開啟管理平台,發現RabbbitMQ已經接受到了一個訊息:

並且RabbbitMQ新增了一個佇列testQueue

4. 取得指定佇列的訊息

let mq = new RabbitMQ();
mq.receiveQueueMsg(&#39;testQueue&#39;,(msg) => {    
   console.log(msg)
})// 输出结果:my first message复制代码

此時開啟RabbitMQ管理平台,訊息數量已經變成0

綜上:我們簡單講述了訊息佇列及RabbitMQ相關的一些知識,以及我們如何透過nodejs來生產與消費訊息。

以上就是本文的全部內容,希望對大家的學習有所幫助,更多相關內容請關注PHP中文網!

相關推薦:

JavaScript如何實作檔案的下載功能

#透過node.js來調取baidu-aip -SDK實作身分證辨識的功能

以上是對於NodeJS如何操作訊息佇列RabbitMQ的分析的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn