ホームページ >ウェブフロントエンド >jsチュートリアル >NodeJS によるメッセージキュー RabbitMQ の操作方法の分析

NodeJS によるメッセージキュー RabbitMQ の操作方法の分析

不言
不言オリジナル
2018-07-14 16:03:372837ブラウズ

この記事では、NodeJS がどのようにメッセージ キューを操作するかを主に紹介します。これには、必要な参考値があります。

1.

メッセージとは、アプリケーション間で送信されるデータを指します。メッセージは、テキスト文字列のみを含む非常に単純な場合もあれば、埋め込みオブジェクトを含む場合があるより複雑な場合もあります。

メッセージ キュー (Message Queue) は、メッセージを送信後すぐに返すことができるメッセージ システムです。メッセージ発行者はメッセージを MQ に発行するだけで、誰がメッセージを取得するかは気にしません。また、メッセージ コンシューマーは、誰が発行するかに関係なく、MQ からメッセージを取得するだけです。これにより、発行者もユーザーも相手の存在を知る必要がなくなります。

2. 一般的に使用されるメッセージ キューとは何ですか?

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq。

現在でも、Redis などの一部の NoSQL はメッセージ キューとして使用できます。

3. メッセージキューの使用シナリオは何ですか?

  • 非同期処理

  • アプリケーションのデカップリング

  • トラフィックピークの削減

4. ユースケース

大規模企業は独自のログ分析システムをどのように実装していますか?

図: ユーザーがアプリケーションにアクセスするとき、ユーザーの操作記録とシステム例外ログを記録する必要があります。従来のアプローチは、システムによって生成されたログをサーバー ディスクに保存し、タイミングを有効にすることです。サーバーのタスクは、ログ情報をディスクから mq (プロデューサー) に定期的に転送し、mq からメッセージを定期的に抽出して、ElasticSearch や Hive などの対応するデータベースに保存することです。

5. RabbitMQ をインストールするには?

上記のケースでは、実際のプロジェクトで使用できる例として RabbitMQ を使用しています。

最初にbrewをインストールします(例としてMac)

/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

RabbitMQをインストールします

brew install rabbitmq

RabbitMQを実行します

/usr/local/Cellar/rabbitmq/3.7.7に移動し、実行します

sbin/rabbitmq-server

プラグインを開始します

Go /usr/local/Cellar/rabbitmq/3.7.7/sbin

./rabbitmq-plugins enable rabbitmq_management

管理インターフェイスにログインします

ブラウザを開いて次のように入力します: http://localhost:15672、RabbitMQ のデフォルト 15672 Nodejs は RabbitMQ を動作させます

対応するノード SDK がいくつか見つかりました。ここでは amqplib

1. プロデューサー

/**
 * 对RabbitMQ的封装
 */
let amqp = require('amqplib');

class RabbitMQ {
    constructor() {
        this.hosts = [];
        this.index = 0;
        this.length = this.hosts.length;
        this.open = amqp.connect(this.hosts[this.index]);
    }
    sendQueueMsg(queueName, msg, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName).then(function (ok) {
                    return channel.sendToQueue(queueName, new Buffer(msg), {
                        persistent: true
                    });
                })
                    .then(function (data) {
                        if (data) {
                            errCallBack && errCallBack("success");
                            channel.close();
                        }
                    })
                    .catch(function () {
                        setTimeout(() => {
                            if (channel) {
                                channel.close();
                            }
                        }, 500)
                    });
            })
            .catch(function () {
                let num = self.index++;

                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index == 0;
                }
            });
    }
}

2. Consumer

/**
 * 对RabbitMQ的封装
 */
let amqp = require(&#39;amqplib&#39;);

class RabbitMQ {
    constructor() {
        this.open = amqp.connect(this.hosts[this.index]);
    }
    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName)
                    .then(function (ok) {
                        return channel.consume(queueName, function (msg) {
                            if (msg !== null) {
                                let data = msg.content.toString();
                                channel.ack(msg);
                                receiveCallBack && receiveCallBack(data);
                            }
                        })
                            .finally(function () {
                                setTimeout(() => {
                                    if (channel) {
                                        channel.close();
                                    }
                                }, 500)
                            });
                    })
            })
            .catch(function () {
                let num = self.index++;
                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index = 0;
                    self.open = amqp.connect(self.hosts[0]);
                }
            });
    }

3. プロデューサーを通じて MQ にメッセージを送信し、キュー

let mq = new RabbitMQ();
mq.sendQueueMsg(&#39;testQueue&#39;, &#39;my first message&#39;, (error) => {
    console.log(error)
})
を作成します。

実行後、管理プラットフォームを開くと、RabbitMQ がメッセージ


を受信して​​いることがわかり、RabbitMQ は新しいキュー testQueue

4 指定されたキューのメッセージを取得します。

この時点で、RabbitMQ 管理プラットフォームを開くと、メッセージの数が 0 に変わりました

要約すると、メッセージ キューと RabbitMQ に関連する知識と、nodejs を介してメッセージを生成および消費する方法について簡単に説明しました。 。

上記がこの記事の全内容です。その他の関連コンテンツについては、PHP 中国語 Web サイトをご覧ください。

関連する推奨事項:

JavaScriptを使用してファイルダウンロード機能を実装する方法


node.jsを使用してbaidu-aip-SDKを呼び出し、IDカード認識機能を実装する

以上がNodeJS によるメッセージキュー RabbitMQ の操作方法の分析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。