Home >Backend Development >PHP Tutorial >JMS中的讯息通信模型

JMS中的讯息通信模型

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOriginal
2016-06-13 12:17:021220browse

JMS中的消息通信模型

1. MQ简介:

JMS中的讯息通信模型

消息队列(Message Queue,简称MQ),是应用程序与应用程序之间的一种通信方法。应用程序通过发送和检索出入列队的针对应用程序的数据 - 消息来通信,而无需专用连接来链接它们。程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如RPC远程过程调用的技术。队列的使用消除了接收和发送应用程序需同时执行的要求。

2. JMS基本概念

JMS(Java Message Service) 即Java消息服务,是由Sun开发的。它提供标准的产生、发送、接收消息的接口简化企业应用的开发。

JMS是一系列的接口及相关语义的集合,通过这些接口和和其中的方法,JMS客户端如何去访问消息系统,完成创建、发送、接收和读取企业消息系统中消息。

它支持两种消息通信模型:点对点模型(point-to-point、P2P)和发布者/订阅者模型(Pub/Sub)。P2P模型规定了一个消息只能有一个接收者;Pub/Sub 模型允许一个消息可以有多个接收者。

2.1 P2P模型 - 打电话模型

对于P2P模型,消息生产者产生一个消息后,把这个消息发送到一个Queue(队列)中,然后消息接收者再从这个Queue中读取,一旦这个消息被一个接收者读取之后,它就在这个Queue中消失了,所以一个消息只能被一个接收者消费。

JMS中的讯息通信模型

2.2 Pub/Sub模型 - 订报纸模型

与P2P模型不同,Pub/Sub模型中,消息生产者产生一个消息后,把这个消息发送到一个Topic中,这个Topic可以同时有多个接收者在监听,当一个消息到达这个Topic之后,所有消息接收者都会收到这个消息。

JMS中的讯息通信模型

3. 支持JMS的开源MQ - ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

主要特点:

  1. 多种语言和协议编写客户端。语言: Java、 C、 C++、 C#、 Ruby、 Perl、 Python、 PHP。应用协议: OpenWire、Stomp、Rest、WSNotification、XMPP、AMQP
  2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  3. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  4. 支持高速的消息持久化
  5. 从设计上保证了高性能的集群,客户端-服务器,点对点
  6. 支持Ajax

注:

在查询资料的过程中发现,PHP与ActiveMQ整合的方式可以直接使用Rest接口调用,但更高效的方式应该是安装Stomp协议的PHP扩展。

Stomp 提供了客户端和代理之间进行广泛消息传输的框架。Stomp 是一个非常简单而且易用的通讯协议实现,尽管代理端的编写可能非常复杂,但是编写一个 Stomp 客户端却是很简单的事情,另外你可以使用 Telnet 来与你的 Stomp 代理进行交互。

PHP可以通过PECL编译安装Stomp扩展。

stomp_common.php

<code>//connection ActiveMQ    function openMQ(&$queue=FALSE) {    $amq = array(        &#39;url&#39; => &#39;tcp://127.0.0.1:61613&#39;,        &#39;id&#39; => &#39;xxx&#39;,        &#39;pswd&#39; => &#39;xxx&#39;,        &#39;queue&#39; => &#39;/queue/mytest&#39;,        &#39;enable&#39; => TRUE    );    $link = stomp_connect($amq[&#39;url&#39;], $amq[&#39;id&#39;], $amq[&#39;pswd&#39;]);    if (!$link) {        die("Can&#39;t connect MQ !!");    } else {        return $link;    }}//send a message to the queuefunction sendMQ($data) {    $link = openMQ();    foreach ($data as $pitem) {        //使用 persistent message        $result = stomp_send($link, $amq[&#39;queue&#39;], $pitem, array("persistent" => "true"));        if (FALSE === $result) {            //do something        }    }}//receive messagefunction receiveMQ() {    $link = openMQ($queue);    //stomp_subscribe($link, $queue);    stomp_subscribe($link, $queue, array("activemq.prefetchSize" => 1000));        while (1) {        if (TRUE === stomp_has_frame($link)) {            $frame = stomp_read_frame($link);                if (FALSE !== $frame) {                stomp_ack($link, $frame[&#39;headers&#39;][&#39;message-id&#39;]);             } else {                //do something                break;                }            } else {                break;            }        }        stomp_unsubscribe($link, $queue);        stomp_close($link);    }    </code>

stomp_sender.php

<code>// 消息发送服务</code>

stomp_consumer.php

<code>// 消息消费服务</code>
Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn