>  기사  >  웹 프론트엔드  >  NodeJS가 메시지 큐 RabbitMQ를 운영하는 방법 분석

NodeJS가 메시지 큐 RabbitMQ를 운영하는 방법 분석

不言
不言원래의
2018-07-14 16:03:372772검색

이 글은 NodeJS가 메시지 큐 RabbitMQ를 어떻게 운영하는지에 대한 분석을 주로 소개합니다. 이제 여러분과 공유합니다. 도움이 필요한 친구들이 참고할 수 있습니다.

1.

메시지는 애플리케이션 간에 전송되는 데이터를 의미합니다. 메시지는 텍스트 문자열만 포함하는 매우 간단한 메시지일 수도 있고, 포함된 개체를 포함하는 더 복잡한 메시지일 수도 있습니다.

메시지 큐(Message Queue)는 애플리케이션 간의 통신 방법으로, 메시지가 전송된 후 즉시 반환될 수 있습니다. 메시지 시스템은 안정적인 메시지 전달을 보장합니다. 메시지 게시자는 메시지를 MQ에 게시하고 누가 메시지를 받는지는 상관하지 않으며, 메시지 소비자는 누가 게시하는지에 관계없이 MQ에서 메시지를 받습니다. 이렇게 하면 게시자도 사용자도 상대방의 존재를 알 필요가 없습니다.

2. 일반적으로 사용되는 메시지 대기열은 무엇인가요?

RabbitMQ, RocketMQ, ActiveMQ, Kafka, ZeroMQ, MetaMq.

지금도 일부 NoSQL은 Redis와 같은 메시지 대기열로 사용될 수 있습니다.

3. 메시지 대기열의 사용 시나리오는 무엇입니까?

  • 비동기 처리

  • 애플리케이션 분리

  • 트래픽 피크 감소

4. 사용 사례

대규모 회사에는 자체 로그 분석 시스템이 어떻게 구현되나요?

예: 사용자가 애플리케이션에 액세스하면 사용자의 작업 기록과 시스템 예외 로그를 ​​기록해야 합니다. 일반적인 접근 방식은 시스템에서 생성된 로그를 서버 디스크에 저장하고 타이밍을 활성화하는 것입니다. 서버는 정기적으로 디스크의 로그 정보를 mq(생산자)로 전송하고, 정기적으로 mq에서 메시지를 추출하여 ElasticSearch 또는 Hive와 같은 해당 데이터베이스에 저장하는 작업을 수행합니다.

5. RabbitMQ를 설치하는 방법은 무엇인가요?

위 사례는 MQ의 사용 시나리오를 소개합니다. 여기서는 실제 프로젝트에서 Kafka를 사용할 수 있도록 RabbitMQ를 사용하고 있습니다.

Brew를 먼저 설치하세요(예: Mac)

/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

RabbitMQ 설치

brew install rabbitmq

RabbitMQ 실행

/usr/local/Cellar/rabbitmq/3.7.7로 이동하여

sbin/rabbitmq-server

플러그인 시작

Go을 실행하세요. /usr/local/Cellar/rabbitmq/3.7.7/sbin

./rabbitmq-plugins enable rabbitmq_management

관리 인터페이스에 로그인

브라우저를 열고 다음을 입력하세요: http://localhost:15672, RabbitMQ 기본 15672 포트 6은 Nodejs에서 RabbitMQ를 운영합니다

온라인에서 사용 가능합니다. 해당 Node SDK를 여러 개 찾았습니다. 여기에서 amqplib

1을 추천합니다. Producer

/**
 * 对RabbitMQ的封装
 */
let amqp = require('amqplib');

class RabbitMQ {
    constructor() {
        this.hosts = [];
        this.index = 0;
        this.length = this.hosts.length;
        this.open = amqp.connect(this.hosts[this.index]);
    }
    sendQueueMsg(queueName, msg, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName).then(function (ok) {
                    return channel.sendToQueue(queueName, new Buffer(msg), {
                        persistent: true
                    });
                })
                    .then(function (data) {
                        if (data) {
                            errCallBack && errCallBack("success");
                            channel.close();
                        }
                    })
                    .catch(function () {
                        setTimeout(() => {
                            if (channel) {
                                channel.close();
                            }
                        }, 500)
                    });
            })
            .catch(function () {
                let num = self.index++;

                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index == 0;
                }
            });
    }
}

2. Consumer

/**
 * 对RabbitMQ的封装
 */
let amqp = require(&#39;amqplib&#39;);

class RabbitMQ {
    constructor() {
        this.open = amqp.connect(this.hosts[this.index]);
    }
    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName)
                    .then(function (ok) {
                        return channel.consume(queueName, function (msg) {
                            if (msg !== null) {
                                let data = msg.content.toString();
                                channel.ack(msg);
                                receiveCallBack && receiveCallBack(data);
                            }
                        })
                            .finally(function () {
                                setTimeout(() => {
                                    if (channel) {
                                        channel.close();
                                    }
                                }, 500)
                            });
                    })
            })
            .catch(function () {
                let num = self.index++;
                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index = 0;
                    self.open = amqp.connect(self.hosts[0]);
                }
            });
    }

3를 통해 메시지를 MQ에 보내고 대기열을 만듭니다.

실행 후 관리 플랫폼을 열면 RabbitMQ가


메시지를 받았고 RabbitMQ는 새로운 대기열 testQueue

4을 추가했습니다. 지정된 대기열의 메시지를 가져옵니다

let mq = new RabbitMQ();
mq.sendQueueMsg(&#39;testQueue&#39;, &#39;my first message&#39;, (error) => {
    console.log(error)
})
.

이번에 RabbitMQ 관리 플랫폼을 오픈하고 메시지 개수가 0

으로 변경되었습니다. 요약: 메시지 큐와 RabbitMQ에 관련된 몇 가지 지식과 nodejs를 통해 메시지를 생성하고 소비하는 방법에 대해 간략하게 이야기했습니다. .

위 내용은 이 글의 전체 내용입니다. 모든 분들의 학습에 도움이 되었으면 좋겠습니다. 더 많은 관련 내용은 PHP 중국어 홈페이지를 주목해주세요!

관련 권장 사항:

JavaScript를 사용하여 파일 다운로드 기능을 구현하는 방법

node.js를 사용하여 baidu-aip-SDK를 호출하여 ID 카드 인식 기능을 구현합니다

위 내용은 NodeJS가 메시지 큐 RabbitMQ를 운영하는 방법 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.