PHP AMQP 扩展 应用
周末休息 ,闲来研究下AMQP 的PHP 扩展,花了一天时间才调试好
高级消息队列协议(AMQP)是一个异步消息传递所使用的应用层协议规范。作为线路层协议,而不是API(例如JMS),AMQP 客户端能够无视消息的来源任意发送和接受信息。现在,已经有相当一部分不同平台的服务器和客户端可以投入使用
我的AMQP服务器是使用RabbitMQ ,RabbitMQ 的安装网上很多。
重点说下 PHP 的扩展 php-amqp,我用的是最新的 amqp-1.0.1
文档是旧的 新的掺杂在一起 因此弄了好久才完成。
首先说下流程 即 Client - AMQP server - Client
左边的Client向右边的Client发送消息,流程:
1, 获取Conection
2, 获取Channel
3, 定义Exchange,Queue
4, 使用一个RoutingKey将Queue Binding到一个Exchange上
5, 通过指定一个Exchange和一个RoutingKey来将消息发送到对应的Queue上,
6, 接收方在接收时也是获取connection,接着获取channel,然后指定一个Queue直接到它关心的Queue上取消息,它对Exchange,RoutingKey及如何binding都不关心,到对应的Queue上去取消息就OK了
以下是PHP 的实现:
生产消息:
<?php //设置你的连接 $conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest'); $conn = new AMQPConnection($conn_args); if ($conn->connect()) { echo "Established a connection to the broker \n"; } else { echo "Cannot connect to the broker \n "; } //你的消息 $message = json_encode(array('Hello World!','php','c++')); //创建channel $channel = new AMQPChannel($conn); //创建exchange $ex = new AMQPExchange($channel); $ex->setName('exchange');//创建名字 $ex->setType(AMQP_EX_TYPE_DIRECT); $ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); echo "exchange status:".$ex->declare(); echo "\n"; //创建队列 $q = new AMQPQueue($channel); //设置队列名字 如果不存在则添加 $q->setName('queue'); $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); echo "queue status: ".$q->declare(); echo "\n"; echo 'queue bind: '.$q->bind('exchange','route.key');//将你的队列绑定到routingKey echo "\n"; $channel->startTransaction(); echo "send: ".$ex->publish($message, 'route.key'); //将你的消息通过制定routingKey发送 $channel->commitTransaction(); $conn->disconnect(); ?>
<?php $conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest' ,'vhost'=>'/'); $conn = new AMQPConnection($conn_args); $conn->connect(); $channel = new AMQPChannel($conn); $q = new AMQPQueue($channel); $q->setName('queue2'); $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); echo "queue status: ".$q->declare(); echo "==========\n"; $messages = $q->get(AMQP_AUTOACK); print_r($messages->getBody()); echo "\n"; // disconnect $conn->disconnect(); ?>