首页 >后端开发 >php教程 >PHP和RabbitMQ:高级示例

PHP和RabbitMQ:高级示例

Jennifer Aniston
Jennifer Aniston原创
2025-02-19 09:44:12532浏览

PHP and RabbitMQ: Advanced Examples

PHP和RabbitMQ:高级示例

在第1部分中,我们涵盖了PHP中AMQP协议的简单用例,并以RabbitMQ为代理。现在,让我们深入研究一些更高级的例子。

>

钥匙要点

    >使用PHP和RabbitMQ在多名工人中不同步处理数据,从而提高了高交易环境中的效率。
  • >在兔子中实现持久消息,以防止服务器崩溃期间的数据丢失,通过将消息“ evely_mode”设置为2,并将队列宣布为持久。
  • >使用RabbitMQ中的服务质量(QoS)设置来控制工人之间的消息分布,确保没有任何单个工人被不知所措。
  • 通过发送需要响应的消息,可用于使用RABBITMQ进行RPC(远程过程调用)
  • >设置RPC响应的独家,临时队列,以确保在客户端和服务器之间正确且安全地定向消息。>>>>> 通过将响应中的“相关性_id”与请求匹配,确保正确处理和处理答复。
  • >示例1:发送请求以在几个工人中对数据异步处理
  • 在上一部分的示例中,我们有一个生产者,一个消费者。 如果消费者死亡,则消息将继续堆积在队列中,直到消费者再次开始。然后,它将处理所有消息。
  • 在并发的用户环境中,每分钟请求相当多,这可能是不理想的。 幸运的是,扩展消费者非常容易,但让我们实现另一个示例。
  • >
  • >假设我们有一项发票生成服务,用户只需要提供发票号码,系统将自动生成PDF文件并将其通过电子邮件发送给用户。 如果生成过程运行的服务器受到资源有限,则生成和发送电子邮件可能需要几秒钟。 现在假设我们需要每秒支持几笔交易,我们如何在不压倒服务器的情况下完成此操作?
我们需要实现以下模式:

让我们看一下我们的制作人班:

> WorkerSender :: execute()方法将收到一个发票号。 接下来,我们像往常一样创建一个连接,频道和队列。

>请注意,这次,在创建消息对象时,构造函数将接收第二个参数:array('veliver_mode'=> 2)。 在这种情况下,我们要声明,如果RabbitMQ服务器崩溃,则不会丢失消息。 请注意,为了使这项工作,队列也必须被声明持久。

可以使用以下代码接收表单数据并执行生产者:>

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span>

请使用您感到满意的输入消毒/验证。>

在消费者方面,事情变得有些有趣:

<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>
>像往常一样,我们必须创建一个连接,得出频道并声明队列(队列的参数必须与生产者相同)。

>

<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span>    <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
>为了具有工人行为(几个程序中的派遣消息),我们必须使用$ channel-> basic_qos()声明服务质量(QoS)参数:>

    预摘要大小
  • :没有具体的限制,我们可以拥有尽可能多的工人
  • 预摘要计数
  • :在发送确认之前,每个工人要检索多少消息。这将使工人一次处理1条消息。>
  • global
  • :null表示以上设置仅适用于该消费者
  • 接下来,我们将开始消耗参数的关键差异。我们将关闭自动ACK,因为当我们完成处理消息并准备接收新消息时,我们会告诉RabbitMQ服务器。

现在,我们如何发送Ack? 请查看Workerreceiver :: process()方法(接收到消息时,该方法称为回调方法)。对生成PDF()和sendemail()方法的调用只是虚拟方法,它将模拟完成这两个任务所花费的时间。 $ msg参数不仅包含从生产者发送的有效载荷,还包含有关生产者使用的对象的信息。 我们可以使用$ msg-> velivery_info ['channel']提取有关生产者使用的通道的信息(这是我们使用$ Connection-> Channel();)为消费者打开的通道的对象类型。 由于我们需要向生产者的频道发送确认我们已经完成了该过程的确认,因此我们将使用其basic_ack()方法,将作为参数发送作为参数($ msg-> delivery_info ['evely_tag'])rabbitmq自动生成的rabbitmq正确地将ACK属于哪个消息相关联。

>

我们如何解雇工人?只需创建以下文件,请调用Workerreceiver :: listing()方法:

现在,使用php命令(例如php worker.php或您给出上述文件的任何名称)来启动工人。 但是等等,目的是有两个或更多工人,不是吗?没问题,以相同的方式启动更多的工人,以便拥有同一文件的多个进程,而RabbitMQ将根据QoS参数注册消费者并在其中分发工作。

>示例2:发送RPC请求并期望回复

到目前为止,我们一直在向RabbitMQ服务器发送消息,而无需等待回复。 对于异步过程,这可能比用户愿意花费更多的时间只是为了查看“确定”消息,这是可以的。 但是,如果我们真的需要答复怎么办?假设一个复杂的计算结果,因此我们可以将其显示给用户?

>

>假设我们有一个集中式登录服务器(单个登录),该服务器将用作与应用程序其余部分隔离的身份验证机制。 到达该服务器的唯一方法是通过RabbitMQ。 我们需要实现一种将登录凭据发送到该服务器的方法,并等待授予/拒绝访问响应。

我们需要实现以下模式:


>照常,让我们首先看一下制作人:>

>查看RPCSENDER ::执行方法,请注意,$凭据参数是['username'=>'x','password'=>'y']的形式的数组。 同样,我们打开一个新连接并照常创建频道。
<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span>

第一个区别来自宣布队列。 首先请注意,我们正在使用list()构造来从$ channel-> queue_declare()捕获结果。这是因为我们在声明时没有明确发送队列名称,因此我们需要找出如何识别该队列。 我们只对结果数组的第一个元素感兴趣,该元素将是队列的唯一标识符(例如AMQ.gen-_U0KJVM8HELFZQK9P0Z9GG)。 第二个更改是我们需要将此队列声明为独家,因此其他并发过程的结果没有混合。
<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>
>

>另一个很大的更改是,在执行$ channel-> basic_consume()时,生产者也将成为队列的消费者,请注意,我们在声明队列时提供了$ callback_queue值。 像每个消费者一样,我们将在该过程收到响应时声明一个回调要执行。

>

接下来,我们必须为消息创建一个相关ID,这无非是每个消息的唯一标识符。 在示例中,我们正在使用uniqid()的输出,但是您可以使用自己喜欢的任何机制(只要它不创建种族条件,就不需要是强大的,加密的安全RNG)。

>
<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span>    <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>

现在,让我们创建一条消息,与我们在前两个示例中使用的消息相比,它具有重要的变化。除了分配一个包含我们要身份验证的凭证的JSON编码字符串外,我们还必须提供给AMQPMESSAGE构造函数一个具有两个定义的属性的数组:>

  • 回复_TO
  • :在声明它时生成的队列标识符>
  • 发布消息后,我们将评估响应,该响应将在开始时是空的。 虽然响应值仍然为空,但我们将等待通过$ channel-> wait();。
>一旦我们收到来自频道的响应,将调用回调方法(rpcsender :: onResponse())。 此方法将与已接收的相关ID与生成的相关ID匹配,如果它们是相同的,则将设置响应主体,从而破坏while循环。

。 RPC消费者呢? 在这里是:

相同的旧连接和频道创建:)

>与声明队列相同,但是该队列将具有预定义的名称('
<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span>
rpc_queue

’)。 我们将定义QoS参数,因为我们将停用自动ACK,因此我们可以在验证凭据验证并具有结果时通知。

>

魔术来自声明的回调。一旦我们对凭据进行身份验证(是的,我知道该过程是针对静态用户名/密码值完成的,本教程并不是关于如何对凭据进行身份验证;)),我们必须创建具有相同关联ID的返回消息(生产者)创建。 我们可以使用$ req-> get('correlation_id')从请求消息中提取此值,以相同的方式传递此值。

>现在,我们必须将此消息发布到制作人(带有“随机”名称)中创建的同一队列。 我们使用$ req-> get('reply_to')提取队列名称,并将其用作basic_publish()。
<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>
中的路由密钥

>发布消息后,我们必须使用$ req-> delivery_info ['channel''] - > basic_ack()将ACK通知发送到频道,使用$ req-> evelyse_info ['velivery_tag'' ]这样生产者可以停止等待。

<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span>    <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
再次启动聆听过程,您就可以开始了。 您甚至可以组合示例2和3以具有多工程师的RPC进程来执行身份验证请求,而不是通过启动几名工人来缩放。

>关于RabbitMQ和AMQP的更多要说,例如虚拟主机,交换类型,服务器管理等……您可以在此处和文档页面上找到更多的应用程序模式(例如路由,主题)。 还有一个命令行工具来管理RabbitMQ,还有一个基于Web的接口。

>

如果您喜欢这个教程系列,并且想查看有关MQ和更多现实世界用例的更多信息,请在下面的评论中告诉我们!

>关于PHP RabbitMQ高级示例的常见问题(常见问题解答)

>兔子在PHP中的作用是什么?它通过使其能够更有效地处理高负载和复杂任务,在PHP应用中起着至关重要的作用。 RabbitMQ使用高级消息排队协议(AMQP)来促进应用程序不同部分之间的消息交换。这允许流程解耦,使应用程序更具可扩展性和弹性。你的机器。这可以通过官方的RabbitMQ网站完成。安装服务器后,您可以安装PHP AMQP扩展名,该扩展提供了与RabbitMQ交互的必要功能。可以使用命令pecl install amqp。 AMQPCHANNEL课程。此方法采用多个参数,包括交换的名称,交换的类型(直接,主题,粉丝或标题)以及可选参数,例如被动,耐用,auto_delete和grignestes。我是否在php中的兔子队列中发送消息?带有消息内容的AMQPMESSAGE类。然后,您可以使用AMQPCHANNEL类的Basic_Publish方法将消息发送到队列。 BASIC_PUBLISH方法将消息,交换和路由密钥作为参数。

>

>我如何在php中从php?

中的兔子队列中消费消息,您可以在使用AMQPCHANNEL类的basic_consume方法兔队列。此方法采用几个参数,包括队列名称,消费者标签,no_local,no_ack,exclusive和一个回调函数,该函数将在接收到消息时将执行。

>

我如何处理rabbitmq中的PHP中的错误可以使用try-catch块在兔子中使用PHP中的兔子中的错误处理?发生错误时,PHP AMQP扩展会引发AMQPException类的异常。您可以捕获这些异常并根据您的应用程序的需求处理它们。

>如何确保使用php的兔子中的消息持久性? AMQPMESSAGE类的velivery_mode属性到2。这将使RabbitMQ将消息存储在磁盘上,即使RabbitMQ服务器崩溃或重新启动。

>如何通过php?

在兔子中的优先级队列在PHP中实现优先级排队,可以通过在声明队列时设置X-Max-Prifority参数来实现。然后,在发送消息时,您可以将AMQPMESSAGE类的优先属性设置为0和您指定的最大优先级之间的值。

我如何在PHP中使用RabbitMQ作为RPC? > RABBITMQ可以通过向回调队列发送带有回复属性设置的消息,用于PHP中的远程过程调用(RPC)。然后,服务器可以将响应发送到回调队列,并且客户端可以从那里消耗响应。

>

>如何使用PHP?

>监视PHP中的RabbitMQ中的RabbitMQ。 RabbitMQ管理插件,该插件提供了一个基于Web的接口,用于监视和管理RabbitMQ服务器。您还可以使用AMQPCHANNEL类的方法来获取有关频道状态的信息,例如未经确定的消息的数量。

>

以上是PHP和RabbitMQ:高级示例的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn