Home >Backend Development >PHP Tutorial >php操作rabbitmq教程

php操作rabbitmq教程

WBOY
WBOYOriginal
2016-06-23 13:24:571612browse

1: 连接rabbitmq 新建exchange和queue

amqp_manager.php

$conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest');
$conn = new AMQPConnection($conn_args);
if ($conn->connect()) {
    echo "Established a connection to the broker \n";
}
else {
    echo "Cannot connect to the broker \n ";
    exit(0);
}

$channel = new AMQPChannel($conn);


$exchange = new AMQPExchange($channel);
$exchange->setName('lizhifeng');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE | AMQP_AUTODELETE) ;
$exchange->declare();     // 声明一个名为 lizhifeng的 路由器

// 添加一个名为queue1  的队列并绑定 key1
$queue = new AMQPQueue($channel);
$queue->setName('queue1');    
$queue->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
$queue->declare();  
$queue->bind('lizhifeng','key1');


// 添加一个名为queue2  的队列并绑定 key2
$queue = new AMQPQueue($channel);
$queue->setName('queue2');    
$queue->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
$queue->declare();  
$queue->bind('lizhifeng','key2');

// 将queue1 绑定到key3    注意key3不会覆盖key1   
// 而是key1和key3将同时生效
$queue = new AMQPQueue($channel);
$queue->setName('queue1');    
##$queue->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); 无需重复设置队列queue1的属性
##$queue->declare();   这里不需要再重复申明了
$queue->bind('lizhifeng','key3');


/*
// 删除exchange
$exchange = new AMQPExchange($channel);
$exchange->setName('lizhifeng');
$exchange->delete();

// 删除队列
$queue = new AMQPQueue($channel);
$queue->setName('queue1');
$queue->delete();
$queue = new AMQPQueue($channel);
$queue->setName('queue2');
$queue->delete();
*/
?>

2:连接rabbitmq 往exchange中写消息

amqp_server.php

$routingkey='key1';

$conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest');
$conn = new AMQPConnection($conn_args);
if ($conn->connect()) {
    echo "Established a connection to the broker \n";
}
else {
    echo "Cannot connect to the broker \n ";
}

$channel = new AMQPChannel($conn);
$exchange = new AMQPExchange($channel);
$exchange->setName('lizhifeng');

for($i=0;$i{
        if($routingkey=='key1')
    {
                $routingkey='key2';    // 路由到队列queue2
        }
    else if($routingkey=='key2')
    {
                $routingkey='key3';    // 路由到队列queue1
        }
    else
    {
        $routingkey='key1';    // 路由到队列queue1
    }
    $tmp=array();
    $tmp[]="第".$i."个消息的key为".$routingkey ;
    $message = json_encode($tmp);
        if($exchange->publish($message,$routingkey))
    {
        print $routingkey."\tok\n";
    }
    else
    {
        print "error\n" ;
    }
}

3:连接rabbitmq消费消息

amqp_client.php


//连接RabbitMQ
$conn_args = array( 'host'=>'127.0.0.1' , 'port'=> '5672', 'login'=>'guest' , 'password'=> 'guest','vhost' =>'/');
$conn = new AMQPConnection($conn_args);

if ($conn->connect()) {
    echo "Established a connection to the broker \n";
}
else {
    echo "Cannot connect to the broker \n ";
    exit();
}


$channel = new AMQPChannel($conn);
$q = new AMQPQueue($channel);
$q->setName('queue1');    
// 这里并不是创建新的队列,只是连接到名为quene1的队列
// 我的理解为队列其实在服务器上,消息已经被路由到不同的队列了, 我们只需取消息
while($messages = $q->get(AMQP_AUTOACK))
{
    var_dump(json_decode($messages->getBody(), true ));
}

$q = new AMQPQueue($channel);
$q->setName('queue2');    
while($messages = $q->get(AMQP_AUTOACK))
{
    var_dump(json_decode($messages->getBody(), true ));
}

$conn->disconnect();
?>



Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Previous article:PHP 模拟多进程Next article:php中的数种依赖注入