RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queuing Protocol)的開源實作。
AMQP :高階訊息佇列協議,是應用層協定的一個開放標準,為面向訊息的中間件設計。訊息中間件主要用於元件之間的解耦,訊息的發送者無需知道訊息使用者的存在,反之亦然。 AMQP的主要特徵是面向訊息、佇列、路由(包括點對點和發布/訂閱)、可靠性、安全性。 RabbitMQ是一個開源的AMQP實現,伺服器端以Erlang語言編寫,支援多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支援AJAX。用於在分散式系統中儲存轉發訊息,在易用性、擴展性、高可用性等方面表現不俗。
RabbitMQ 最初起源於金融系統,用於在分散式系統中儲存轉發訊息,在易用性、擴展性、高可用性等方面表現不俗。具體特點包括:
- 可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 - 灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。 - 消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 - 高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 - 多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 - 多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 - 管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。 - 跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。 - 插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
- Message 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。 - Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。 - Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 - Routing Key 路由关键字,exchange根据这个关键字进行消息投递。 - Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 - Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。 - Connection 网络连接,比如一个TCP连接。 - Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 - Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 - Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。 - Broker 表示消息队列服务器实体。它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
Exchange分發訊息時根據類型的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。 headers 匹配AMQP 訊息的header 而不是路由鍵,此外headers 交換器和direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種類型:
# directtopic 交換器透過模式匹配分配訊息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時佇列需要綁定到一個模式上。它將路由鍵和綁定鍵的字串切分成單詞,這些單詞之間用點隔開。它同樣也會辨識兩個通配符:符號「#」和符號「」。 #配對0個或多個單字,配對不多不少一個單字。
ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的物件。
如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。
要持久化队列queue的持久化需要在声明时指定durable=True;
这里要注意,队列的名字一定要是Broker中不存在的,不然不能改变此队列的任何属性.
队列和交换机有一个创建时候指定的标志durable,durable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列中的消息会在重启后恢复
消息持久化包括3部分
- 1.exchange持久化,在声明时指定durable => true hannel.ExchangeDeclare(ExchangeName,"direct",durable:true,autoDelete:false,arguments:null);//声明消息队列,且为可持久化的 - 2.queue持久化,在声明时指定durable => true channel.QueueDeclare(QueueName,durable:true,exclusive:false,autoDelete:false,arguments:null);//声明消息队列,且为可持久化的 - 3.消息持久化,在投递时指定delivery_mode => 2(1是非持久化). channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的,如果exchange和queue两者之间有一个持久化,一个非持久化,则不允许建立绑定.
注意:一旦创建了队列和交换机,就不能修改其标志了,例如,创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。
关于持久化的进一步讨论: 为了数据不丢失,我们采用了: 在数据处理结束后发送ack,这样RabbitMQ Server会认为Message Deliver 成功。 持久化queue,可以防止RabbitMQ Server 重启或者crash引起的数据丢失。 持久化Message,理由同上。 但是这样能保证数据100%不丢失吗?答案是否定的。问题就在与RabbitMQ需要时间去把这些信息存到磁盘上,这个time window虽然短,但是它的确还是有。在这个时间窗口内如果数据没有保存,数据还会丢失。还有另一个原因就是RabbitMQ并不是为每个Message都做fsync:它可能仅仅是把它保存到Cache里,还没来得及保存到物理磁盘上。因此这个持久化还是有问题。但是对于大多数应用来说,这已经足够了。当然为了保持一致性,你可以把每次的publish放到一个transaction中。这个transaction的实现需要user defined codes。那么商业系统会做什么呢?一种可能的方案是在系统panic时或者异常重启时或者断电时,应该给各个应用留出时间去flash cache,保证每个应用都能exit gracefully。
你可能也注意到了,分发机制不是那么优雅,默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。n是取余后的,它不管Consumer是否还有unacked Message,只是按照这个默认的机制进行分发.
那么如果有个Consumer工作比较重,那么就会导致有的Consumer基本没事可做,有的Consumer却毫无休息的机会,那么,Rabbit是如何处理这种问题呢?
前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。
通过basic.qos方法设置prefetch_count=1,这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message,换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它
1
channel.basic_qos(prefetch_count=1)
注意,这种方法可能会导致queue满。当然,这种情况下你可能需要添加更多的Consumer,或者创建更多的virtualHost来细化你的设计。
RabbitMQ使用ProtoBuf序列化消息,它可作为RabbitMQ的Message的数据格式进行传输,由于是结构化的数据,这样就极大的方便了Consumer的数据高效处理,当然也可以使用XML,与XML相比, ProtoBuf有以下优势:
1.简单
2.size小了3-10倍
3.速度快了20-100倍
4.易于编程
6.减少了语义的歧义.
,ProtoBuf具有速度和空间的优势,使得它现在应用非常广泛
MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。 但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。
RabbitMQ 中实现RPC 的机制是:
客户端发送请求(消息)时,在消息的属性(MessageProperties ,在AMQP 协议中定义了14中properties ,这些属性会随着消息一起发送)中设置两个值replyTo (一个Queue 名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue 中)和correlationId (此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
服务器端收到消息并处理
服务器端处理完消息后,将生成一条应答消息到replyTo 指定的Queue ,同时带上correlationId 属性
客户端之前已订阅replyTo 指定的Queue ,从中收到服务器的应答消息后,根据其中的correlationId 属性分析哪条请求被执行了,根据执行结果进行后续业务处理
按照目前網路上的資料,RabbitMQ 、activeM 、ZeroMQ 三者中,綜合來看,RabbitMQ 是首選。
ZeroMq 不支持,ActiveMq 和RabbitMq 都支持。持久化訊息主要是指我們機器在不可抗力因素等情況下掛掉了,訊息不會遺失的機制。
可靠性、靈活的路由、叢集、事務、高可用的佇列、訊息排序、問題追蹤、視覺化管理工具、插件系統等等。
RabbitMq / Kafka 最好,ActiveMq 次之,ZeroMq 最差。當然ZeroMq 也可以做到,不過自己必須手動寫程式碼實現,程式碼量不小。尤其是可靠性中的:持久性、投遞確認、發布者證實和高可用性。
毋庸置疑,RabbitMQ 最高,原因是它的實現語言是天生具備高並發高可用的erlang 語言。
RabbitMq 比Kafka 成熟,在可用性上,穩定性上,可靠性上, RabbitMq 勝於 Kafka (理論上)。
另外,Kafka 的定位主要在日誌等方面, 因為Kafka 設計的初衷就是處理日誌的,可以看做是一個日誌(訊息)系統一個重要元件,針對性很強,所以如果業務方面還是建議選擇RabbitMq 。
還有就是,Kafka 的效能(吞吐量、TPS )比RabbitMq 高出來很多。
選型最後總結:
如果我們系統中已經有選擇 Kafka ,或 RabbitMq ,並且完全可以滿足現在的業務,建議就不用重複去增加和造輪子。
可以在 Kafka 和 RabbitMq 中選擇一個適合自己團隊和業務的,這才是最重要的。但毋庸置疑現階段,綜合考量沒有第三選擇。
更多Laravel相關技術文章,請造訪Laravel教學專欄進行學習!
以上是RabbitMQ的應用場景以及基本原理介紹的詳細內容。更多資訊請關注PHP中文網其他相關文章!