>
钥匙要点
> WorkerSender :: execute()方法将收到一个发票号。 接下来,我们像往常一样创建一个连接,频道和队列。
>请注意,这次,在创建消息对象时,构造函数将接收第二个参数:array('veliver_mode'=> 2)。 在这种情况下,我们要声明,如果RabbitMQ服务器崩溃,则不会丢失消息。 请注意,为了使这项工作,队列也必须被声明持久。
可以使用以下代码接收表单数据并执行生产者: 请使用您感到满意的输入消毒/验证。
>
现在,我们如何发送Ack? 请查看Workerreceiver :: process()方法(接收到消息时,该方法称为回调方法)。对生成PDF()和sendemail()方法的调用只是虚拟方法,它将模拟完成这两个任务所花费的时间。 $ msg参数不仅包含从生产者发送的有效载荷,还包含有关生产者使用的对象的信息。 我们可以使用$ msg-> velivery_info ['channel']提取有关生产者使用的通道的信息(这是我们使用$ Connection-> Channel();)为消费者打开的通道的对象类型。 由于我们需要向生产者的频道发送确认我们已经完成了该过程的确认,因此我们将使用其basic_ack()方法,将作为参数发送作为参数($ msg-> delivery_info ['evely_tag'])rabbitmq自动生成的rabbitmq正确地将ACK属于哪个消息相关联。 我们如何解雇工人?只需创建以下文件,请调用Workerreceiver :: listing()方法:
>
>另一个很大的更改是,在执行$ channel-> basic_consume()时,生产者也将成为队列的消费者,请注意,我们在声明队列时提供了$ callback_queue值。 像每个消费者一样,我们将在该过程收到响应时声明一个回调要执行。
接下来,我们必须为消息创建一个相关ID,这无非是每个消息的唯一标识符。 在示例中,我们正在使用uniqid()的输出,但是您可以使用自己喜欢的任何机制(只要它不创建种族条件,就不需要是强大的,加密的安全RNG)。 现在,让我们创建一条消息,与我们在前两个示例中使用的消息相比,它具有重要的变化。除了分配一个包含我们要身份验证的凭证的JSON编码字符串外,我们还必须提供给AMQPMESSAGE构造函数一个具有两个定义的属性的数组: 。
RPC消费者呢? 在这里是:
’)。 我们将定义QoS参数,因为我们将停用自动ACK,因此我们可以在验证凭据验证并具有结果时通知。
魔术来自声明的回调。一旦我们对凭据进行身份验证(是的,我知道该过程是针对静态用户名/密码值完成的,本教程并不是关于如何对凭据进行身份验证;)),我们必须创建具有相同关联ID的返回消息(生产者)创建。 我们可以使用$ req-> get('correlation_id')从请求消息中提取此值,以相同的方式传递此值。
>发布消息后,我们必须使用$ req-> delivery_info ['channel''] - > basic_ack()将ACK通知发送到频道,使用$ req-> evelyse_info ['velivery_tag'' ]这样生产者可以停止等待。 >关于RabbitMQ和AMQP的更多要说,例如虚拟主机,交换类型,服务器管理等……您可以在此处和文档页面上找到更多的应用程序模式(例如路由,主题)。 还有一个命令行工具来管理RabbitMQ,还有一个基于Web的接口。 如果您喜欢这个教程系列,并且想查看有关MQ和更多现实世界用例的更多信息,请在下面的评论中告诉我们! >我如何在php中从php? 我如何处理rabbitmq中的PHP中的错误可以使用try-catch块在兔子中使用PHP中的兔子中的错误处理?发生错误时,PHP AMQP扩展会引发AMQPException类的异常。您可以捕获这些异常并根据您的应用程序的需求处理它们。 >如何使用PHP?<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span> <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>
</span><span> <span>/**
</span></span><span><span> * Sends an invoice generation task to the workers
</span></span><span><span> *
</span></span><span><span> * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span> */
</span></span><span> <span>public function execute($invoiceNum)
</span></span><span> <span>{
</span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span> <span>$channel = $connection->channel();
</span></span><span>
</span><span> <span>$channel->queue_declare(
</span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->basic_publish(
</span></span><span> <span>$msg, #message
</span></span><span> <span>'', #exchange
</span></span><span> <span>'invoice_queue' #routing key (queue)
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->close();
</span></span><span> <span>$connection->close();
</span></span><span> <span>}
</span></span><span><span>}</span></span>
<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>
>像往常一样,我们必须创建一个连接,得出频道并声明队列(队列的参数必须与生产者相同)。<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
>为了具有工人行为(几个程序中的派遣消息),我们必须使用$ channel-> basic_qos()声明服务质量(QoS)参数:预摘要大小
>示例2:发送RPC请求并期望回复
到目前为止,我们一直在向RabbitMQ服务器发送消息,而无需等待回复。 对于异步过程,这可能比用户愿意花费更多的时间只是为了查看“确定”消息,这是可以的。 但是,如果我们真的需要答复怎么办?假设一个复杂的计算结果,因此我们可以将其显示给用户?<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span> <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>
</span><span> <span>/**
</span></span><span><span> * Sends an invoice generation task to the workers
</span></span><span><span> *
</span></span><span><span> * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span> */
</span></span><span> <span>public function execute($invoiceNum)
</span></span><span> <span>{
</span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span> <span>$channel = $connection->channel();
</span></span><span>
</span><span> <span>$channel->queue_declare(
</span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->basic_publish(
</span></span><span> <span>$msg, #message
</span></span><span> <span>'', #exchange
</span></span><span> <span>'invoice_queue' #routing key (queue)
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->close();
</span></span><span> <span>$connection->close();
</span></span><span> <span>}
</span></span><span><span>}</span></span>
<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>
>
<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
>一旦我们收到来自频道的响应,将调用回调方法(rpcsender :: onResponse())。 此方法将与已接收的相关ID与生成的相关ID匹配,如果它们是相同的,则将设置响应主体,从而破坏while循环。<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span> <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>
</span><span> <span>/**
</span></span><span><span> * Sends an invoice generation task to the workers
</span></span><span><span> *
</span></span><span><span> * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span> */
</span></span><span> <span>public function execute($invoiceNum)
</span></span><span> <span>{
</span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span> <span>$channel = $connection->channel();
</span></span><span>
</span><span> <span>$channel->queue_declare(
</span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->basic_publish(
</span></span><span> <span>$msg, #message
</span></span><span> <span>'', #exchange
</span></span><span> <span>'invoice_queue' #routing key (queue)
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->close();
</span></span><span> <span>$connection->close();
</span></span><span> <span>}
</span></span><span><span>}</span></span>
rpc_queue <span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>
中的路由密钥
<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
再次启动聆听过程,您就可以开始了。 您甚至可以组合示例2和3以具有多工程师的RPC进程来执行身份验证请求,而不是通过启动几名工人来缩放。
>关于PHP RabbitMQ高级示例的常见问题(常见问题解答)
>兔子在PHP中的作用是什么?它通过使其能够更有效地处理高负载和复杂任务,在PHP应用中起着至关重要的作用。 RabbitMQ使用高级消息排队协议(AMQP)来促进应用程序不同部分之间的消息交换。这允许流程解耦,使应用程序更具可扩展性和弹性。你的机器。这可以通过官方的RabbitMQ网站完成。安装服务器后,您可以安装PHP AMQP扩展名,该扩展提供了与RabbitMQ交互的必要功能。可以使用命令pecl install amqp。 AMQPCHANNEL课程。此方法采用多个参数,包括交换的名称,交换的类型(直接,主题,粉丝或标题)以及可选参数,例如被动,耐用,auto_delete和grignestes。我是否在php中的兔子队列中发送消息?带有消息内容的AMQPMESSAGE类。然后,您可以使用AMQPCHANNEL类的Basic_Publish方法将消息发送到队列。 BASIC_PUBLISH方法将消息,交换和路由密钥作为参数。
>>
>如何确保使用php的兔子中的消息持久性? AMQPMESSAGE类的velivery_mode属性到2。这将使RabbitMQ将消息存储在磁盘上,即使RabbitMQ服务器崩溃或重新启动。
>如何通过php?
在兔子中的优先级队列在PHP中实现优先级排队,可以通过在声明队列时设置X-Max-Prifority参数来实现。然后,在发送消息时,您可以将AMQPMESSAGE类的优先属性设置为0和您指定的最大优先级之间的值。我如何在PHP中使用RabbitMQ作为RPC? > RABBITMQ可以通过向回调队列发送带有回复属性设置的消息,用于PHP中的远程过程调用(RPC)。然后,服务器可以将响应发送到回调队列,并且客户端可以从那里消耗响应。
>>监视PHP中的RabbitMQ中的RabbitMQ。 RabbitMQ管理插件,该插件提供了一个基于Web的接口,用于监视和管理RabbitMQ服务器。您还可以使用AMQPCHANNEL类的方法来获取有关频道状态的信息,例如未经确定的消息的数量。
>
以上是PHP和RabbitMQ:高级示例的详细内容。更多信息请关注PHP中文网其他相关文章!