
PHP amqp message queue: RabbitMQ exchange types, the direct exchange (3)

WBOY
2016-07-13 10:37:21

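1. AMQP_EX_TYPE_DIRECT: the direct exchange type

A direct exchange delivers a message to every queue whose binding key exactly matches the message's routing key. It covers 1-to-1 and 1-to-N delivery (as well as N-to-1 and N-to-N).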
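
A direct exchange routes a message to the queues whose binding key equals the message's routing key. The sketch below is a small in-memory model of that rule, not broker code. The helpers bindQueue and routeDirect and the queues 'audit' and 'errors' are hypothetical names introduced for illustration; only the 'logs' queue and key appear in this article's scripts.

```php
<?php
// Toy in-memory model of a direct exchange (no broker involved).
// $bindings maps a binding key to the list of queue names bound with it.
function bindQueue(array $bindings, string $queue, string $bindingKey): array
{
    $bindings[$bindingKey][] = $queue;
    return $bindings;
}

// A direct exchange delivers to every queue whose binding key
// is exactly equal to the routing key of the message.
function routeDirect(array $bindings, string $routingKey): array
{
    return $bindings[$routingKey] ?? [];
}

$bindings = [];
$bindings = bindQueue($bindings, 'logs', 'logs');      // same binding as in this article
$bindings = bindQueue($bindings, 'audit', 'logs');     // hypothetical second queue, same key
$bindings = bindQueue($bindings, 'errors', 'errors');  // hypothetical queue, different key

print_r(routeDirect($bindings, 'logs'));    // both queues bound with 'logs' get a copy (1 to N)
print_r(routeDirect($bindings, 'errors'));  // only the 'errors' queue
print_r(routeDirect($bindings, 'log'));     // no exact match, so no queue receives it
```

Two queues bound with the same key give 1-to-N delivery; several senders publishing with that key give the N-to-1 and N-to-N cases.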

The receiver, receive.php:
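<?php

// With no arguments, the connection settings come from the extension's
// ini defaults (typically localhost:5672 with the guest account).
$connect = new AMQPConnection();
$connect->connect();

$channel = new AMQPChannel($connect);

// Declare the direct exchange. Current releases of the amqp extension
// name these methods declareExchange()/declareQueue() instead of declare().
$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName('logs');
$queue->declareQueue();

// Bind the queue to the exchange with the routing key 'logs'.
$queue->bind('exchange', 'logs');

// consume() blocks and calls callback() for every delivery;
// it returns only when the callback returns false.
$queue->consume('callback');

$connect->disconnect();

function callback($envelope, $queue) {
    var_dump($envelope->getBody());
    // Acknowledge the message so the broker removes it from the queue.
    // (nack() would reject it instead.)
    $queue->ack($envelope->getDeliveryTag());
}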

The sender, send.php:
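<?php

$connect = new AMQPConnection();
$connect->connect();

$channel = new AMQPChannel($connect);

// Declare the same direct exchange as the receiver.
$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange();

// Publish with the routing key 'logs'; the exchange routes the message
// to the queue that was bound with that key.
$exchange->publish('direct type test', 'logs');
var_dump("Send Message OK");

$connect->disconnect();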

The running results are as shown in the figure




Next, create receive_one.php and receive_two.php, both containing the code below (alternatively, run the same receiver several times from separate command prompts). To make the output easier to follow, also change send.php to the version shown after it.
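<?php

$connect = new AMQPConnection();
$connect->connect();

$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange();

// Both receivers declare the same queue; redeclaring an existing queue
// with identical settings is harmless, so no error suppression is needed.
$queue = new AMQPQueue($channel);
$queue->setName('logs');
$queue->declareQueue();

$queue->bind('exchange', 'logs');

// consume() blocks and calls callback() for every delivery;
// it returns only when the callback returns false.
$queue->consume('callback');

$connect->disconnect();

function callback($envelope, $queue) {
    var_dump($envelope->getBody());
    // Acknowledge the message so the broker removes it from the queue.
    $queue->ack($envelope->getDeliveryTag());
}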




send.php
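<?php

$connect = new AMQPConnection();
$connect->connect();

$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange();

// Send the messages 1 to 4 with the routing key 'logs'.
for ($index = 1; $index < 5; $index++) {
    $exchange->publish($index, 'logs');
    var_dump("Send:$index");
}

// The exchange is deliberately not deleted here: deleting it would also
// remove the receivers' bindings, and later runs would reach no queue.
$connect->disconnect();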

The running results are as follows

The queue hands the messages out to the receivers in turn, so each receiver processes its share. This looks ideal, but when tasks differ in cost, fair scheduling is needed.
For example, suppose messages 1 and 3 are simple tasks and 2 and 4 are complex ones. With many such tasks, receive_one.php finishes quickly and sits idle while receive_two.php carries the heavy load. To test this, raise the loop bound in send.php from 5 to 50:
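
By default RabbitMQ hands messages to the consumers of a queue in turn (round-robin). Below is a toy model of that hand-out, assuming two receivers and the four messages sent by send.php. roundRobin is a hypothetical helper, not part of the amqp extension, and which receiver is served first in practice depends on the order in which the receivers subscribed.

```php
<?php
// Toy model of round-robin dispatch: the message at position i
// goes to consumer number (i mod N). No broker is involved.
function roundRobin(array $messages, array $consumers): array
{
    $assigned = array_fill_keys($consumers, []);
    $count = count($consumers);
    foreach (array_values($messages) as $i => $message) {
        $assigned[$consumers[$i % $count]][] = $message;
    }
    return $assigned;
}

$assigned = roundRobin(range(1, 4), ['receive_one', 'receive_two']);
print_r($assigned);
```

In this model receive_one is handed messages 1 and 3 and receive_two is handed 2 and 4, regardless of how long each message takes to process.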
for ($index = 1; $index < 50; $index++) {
    $exchange->publish($index,'logs');
    var_dump("Send:$index");
}

In receive_two.php, add sleep(3) to the callback to simulate a slow task:
function callback($envelope, $queue) {
    var_dump($envelope->getBody());
    sleep(3); // pretend each message takes three seconds to process
    $queue->ack($envelope->getDeliveryTag());
}

When we run the program, the results are as follows


By the time receive_one has finished all of its messages, receive_two has processed only one, and from then on receive_one sits idle while receive_two works through its backlog. To avoid this, call $channel->setPrefetchCount(1); on the receiving end.
With this setting a receiver is not sent a new message until it has acknowledged the one it is working on; in the meantime new messages go to receivers that are free.
In receive_one.php and receive_two.php, find
$channel = new AMQPChannel($connect);
and change it to
$channel = new AMQPChannel($connect);
$channel->setPrefetchCount(1);
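
The effect of the prefetch limit can be estimated with a simplified model. This is not broker code: dispatchRoundRobin and dispatchPrefetchOne are hypothetical helpers, and the costs (1 second per message for receive_one, 3 seconds for receive_two, 8 messages in total) are assumptions chosen for illustration. The second function approximates a prefetch count of 1 by giving each message to whichever receiver becomes free first.

```php
<?php
// Simplified timing model; $costs maps a receiver name to seconds per message.

// Without a prefetch limit: messages are pre-assigned in turn, and the run
// ends when the most heavily loaded receiver finishes.
function dispatchRoundRobin(int $messageCount, array $costs): array
{
    $names = array_keys($costs);
    $handled = array_fill_keys($names, 0);
    for ($i = 0; $i < $messageCount; $i++) {
        $handled[$names[$i % count($names)]]++;
    }
    $finish = 0;
    foreach ($handled as $name => $n) {
        $finish = max($finish, $n * $costs[$name]);
    }
    return ['handled' => $handled, 'finish' => $finish];
}

// With a prefetch count of 1: a receiver gets its next message only after
// finishing the current one, so each message goes to the receiver that
// becomes free first (ties go to the first one listed).
function dispatchPrefetchOne(int $messageCount, array $costs): array
{
    $handled = array_fill_keys(array_keys($costs), 0);
    $freeAt = array_fill_keys(array_keys($costs), 0);
    for ($i = 0; $i < $messageCount; $i++) {
        $next = null;
        foreach ($freeAt as $name => $time) {
            if ($next === null || $time < $freeAt[$next]) {
                $next = $name;
            }
        }
        $freeAt[$next] += $costs[$next];
        $handled[$next]++;
    }
    return ['handled' => $handled, 'finish' => max($freeAt)];
}

$costs = ['receive_one' => 1, 'receive_two' => 3];  // assumed seconds per message
$roundRobin = dispatchRoundRobin(8, $costs);
$prefetchOne = dispatchPrefetchOne(8, $costs);
print_r($roundRobin);
print_r($prefetchOne);
```

Under these assumptions the round-robin run gives each receiver 4 messages and finishes after 12 seconds, while the prefetch-limited run gives receive_one 6 messages and receive_two 2 and finishes after 6 seconds.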



