Home >Backend Development >PHP Tutorial >用PHP尝试RabbitMQ(amqp扩展)

用PHP尝试RabbitMQ(amqp扩展)

WBOY
WBOYOriginal
2016-06-20 12:49:521063browse

装好了amqp后就可以开始编写代码了:

消费者:接收消息

逻辑:
创建连接-->创建channel-->创建交换机-->创建队列-->绑定交换机/队列/路由键-->接收消息

<?php   /************************************* * PHP amqp(RabbitMQ) Demo - consumer * Author: Linvo * Date: 2012/7/30 *************************************/ //配置信息 $conn_args = array(     'host' => '192.168.1.93',      'port' => '5672',      'login' => 'guest',      'password' => 'guest',     'vhost'=>'/' );   $e_name = 'e_linvo'; //交换机名 $q_name = 'q_linvo'; //队列名 $k_route = 'key_1'; //路由key  //创建连接和channel $conn = new AMQPConnection($conn_args);   if (!$conn->connect()) {       die("Cannot connect to the broker!\n");   }   $channel = new AMQPChannel($conn);    //创建交换机    $ex = new AMQPExchange($channel);   $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型  $ex->setFlags(AMQP_DURABLE); //持久化 echo "Exchange Status:".$ex->declare()."\n";      //创建队列    $q = new AMQPQueue($channel); $q->setName($q_name);   $q->setFlags(AMQP_DURABLE); //持久化  echo "Message Total:".$q->declare()."\n";    //绑定交换机与队列,并指定路由键 echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";  //阻塞模式接收消息 echo "Message:\n";   while(True){     $q->consume('processMessage');       //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答  } $conn->disconnect();    /** * 消费回调函数 * 处理消息 */ function processMessage($envelope, $queue) {     $msg = $envelope->getBody();     echo $msg."\n"; //处理消息     $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 }



生产者:发送消息
逻辑:
创建连接-->创建channel-->创建交换机对象-->发送消息

<?php   /************************************* * PHP amqp(RabbitMQ) Demo - publisher * Author: Linvo * Date: 2012/7/30 *************************************/ //配置信息 $conn_args = array(     'host' => '192.168.1.93',      'port' => '5672',      'login' => 'guest',      'password' => 'guest',     'vhost'=>'/' );   $e_name = 'e_linvo'; //交换机名 //$q_name = 'q_linvo'; //无需队列名 $k_route = 'key_1'; //路由key  //创建连接和channel $conn = new AMQPConnection($conn_args);   if (!$conn->connect()) {       die("Cannot connect to the broker!\n");   }   $channel = new AMQPChannel($conn);    //消息内容 $message = "TEST MESSAGE! 测试消息!";    //创建交换机对象    $ex = new AMQPExchange($channel);   $ex->setName($e_name);    //发送消息 //$channel->startTransaction(); //开始事务  for($i=0; $i<5; ++$i){     echo "Send Message:".$ex->publish($message, $k_route)."\n";  } //$channel->commitTransaction(); //提交事务  $conn->disconnect();



  
需要注意的地方是:

queue对象有两个方法可用于取消息:consume和get。
前者是阻塞的,无消息时会被挂起,适合循环中使用;
后者则是非阻塞的,取消息时有则取,无则返回false。

测试截图

运行消费者:

 

运行生产者,发消息:


消费者接收到消息:


 

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Previous article:php Socket框架Next article:php二维数组添加元素