Heim >Backend-Entwicklung >PHP-Tutorial >php操作rabbitmq教程

php操作rabbitmq教程

WBOY
Original
2016-06-23 13:24:57 | 1620 Aufrufe

1: 连接rabbitmq 新建exchange和queue

amqp_manager.php

// Step 1: connect to RabbitMQ, then create the exchange, the queues and
// the bindings between them.
$conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest');
$conn = new AMQPConnection($conn_args);

// php-amqp signals a failed connect() by throwing AMQPConnectionException.
// NOTE(review): the return value of connect() is reported to differ between
// extension releases, so success is decided by "no exception + isConnected()"
// rather than by the return value — confirm against the installed version.
try {
    $conn->connect();
    $connected = $conn->isConnected();
} catch (AMQPConnectionException $e) {
    $connected = false;
}

if (!$connected) {
    echo "Cannot connect to the broker \n ";
    exit(1); // non-zero status so that callers can detect the failure
}
echo "Established a connection to the broker \n";

$channel = new AMQPChannel($conn);

// Declare a direct exchange named "lizhifeng".
$exchange = new AMQPExchange($channel);
$exchange->setName('lizhifeng');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
// declareExchange() is used instead of the legacy declare() spelling.
$exchange->declareExchange();

// Declare queue1 and queue2 and bind each one to its own routing key
// (queue1 <- key1, queue2 <- key2).
foreach (array('queue1' => 'key1', 'queue2' => 'key2') as $queueName => $bindingKey) {
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    $queue->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
    // declareQueue() is used instead of the legacy declare() spelling.
    $queue->declareQueue();
    $queue->bind('lizhifeng', $bindingKey);
}

// Additionally bind queue1 to key3. Per the original author's note, key3 does
// not replace key1: both bindings are in effect at the same time.
// The queue already exists at this point, so its flags are not set again and
// it is not declared a second time.
$queue = new AMQPQueue($channel);
$queue->setName('queue1');
$queue->bind('lizhifeng', 'key3');


/*
// Cleanup (disabled): delete the exchange
$exchange = new AMQPExchange($channel);
$exchange->setName('lizhifeng');
$exchange->delete();

// Cleanup (disabled): delete the queues
$queue = new AMQPQueue($channel);
$queue->setName('queue1');
$queue->delete();
$queue = new AMQPQueue($channel);
$queue->setName('queue2');
$queue->delete();
*/
?>

2:连接rabbitmq 往exchange中写消息

amqp_server.php

// Step 2: connect to RabbitMQ and publish messages to the "lizhifeng" exchange,
// rotating the routing key on every message.
$routingkey = 'key1';

// NOTE(review): the loop bound of the published listing was lost (the text
// after "$i" up to "{" is missing, most likely stripped as HTML). 100 is an
// assumed value — adjust to the intended number of messages.
$messageCount = 100;

$conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest');
$conn = new AMQPConnection($conn_args);

// php-amqp signals a failed connect() by throwing AMQPConnectionException.
// NOTE(review): the return value of connect() is reported to differ between
// extension releases, so success is decided by "no exception + isConnected()"
// — confirm against the installed version.
try {
    $conn->connect();
    $connected = $conn->isConnected();
} catch (AMQPConnectionException $e) {
    $connected = false;
}

if (!$connected) {
    echo "Cannot connect to the broker \n ";
    exit(1); // stop here: a channel cannot be opened on a dead connection
}
echo "Established a connection to the broker \n";

$channel = new AMQPChannel($conn);
$exchange = new AMQPExchange($channel);
$exchange->setName('lizhifeng');

// Successor of each routing key; any other value falls back to key1,
// giving the rotation key1 -> key2 -> key3 -> key1 -> ...
//   key2 is routed to queue2, key3 and key1 are routed to queue1.
$nextRoutingKey = array('key1' => 'key2', 'key2' => 'key3');

for ($i = 0; $i < $messageCount; $i++) {
    $routingkey = isset($nextRoutingKey[$routingkey]) ? $nextRoutingKey[$routingkey] : 'key1';

    // The payload is a one-element JSON array holding a descriptive string.
    $tmp = array();
    $tmp[] = "第".$i."个消息的key为".$routingkey;
    $message = json_encode($tmp);

    // Treat an explicit false return or a thrown AMQP exception as failure.
    // NOTE(review): comparing against false (instead of testing truthiness)
    // keeps this working if publish() returns nothing on success — confirm
    // against the installed php-amqp version.
    try {
        $published = ($exchange->publish($message, $routingkey) !== false);
    } catch (AMQPException $e) {
        $published = false;
    }

    if ($published) {
        print $routingkey."\tok\n";
    } else {
        print "error\n";
    }
}

3:连接rabbitmq消费消息

amqp_client.php


// Step 3: connect to RabbitMQ and consume the messages waiting in the queues.
$conn_args = array( 'host'=>'127.0.0.1' , 'port'=> '5672', 'login'=>'guest' , 'password'=> 'guest','vhost' =>'/');
$conn = new AMQPConnection($conn_args);

// php-amqp signals a failed connect() by throwing AMQPConnectionException.
// NOTE(review): the return value of connect() is reported to differ between
// extension releases, so success is decided by "no exception + isConnected()"
// — confirm against the installed version.
try {
    $conn->connect();
    $connected = $conn->isConnected();
} catch (AMQPConnectionException $e) {
    $connected = false;
}

if (!$connected) {
    echo "Cannot connect to the broker \n ";
    exit(1); // non-zero status so that callers can detect the failure
}
echo "Established a connection to the broker \n";


$channel = new AMQPChannel($conn);

// Drain queue1 first, then queue2.
// Setting the name does not create a new queue; it only attaches this object
// to the queue of that name. As the original author puts it: the queues live
// on the server and the messages have already been routed into them, so all
// that is left to do here is fetch them.
foreach (array('queue1', 'queue2') as $queueName) {
    $q = new AMQPQueue($channel);
    $q->setName($queueName);

    // get() is polled with AMQP_AUTOACK until it yields a falsy value, which
    // ends the loop for this queue.
    while ($envelope = $q->get(AMQP_AUTOACK)) {
        var_dump(json_decode($envelope->getBody(), true));
    }
}

$conn->disconnect();
?>



Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Vorheriger Artikel: PHP 模拟多进程 | Nächster Artikel: php中的数种依赖注入