search
HomeBackend DevelopmentPHP TutorialPHP and RabbitMQ: Advanced Examples

PHP and RabbitMQ: Advanced Examples

PHP and RabbitMQ: Advanced Examples

In part 1 we covered the theory and a simple use case of the AMQP protocol in PHP with RabbitMQ as the broker. Now, let’s dive into some more advanced examples.

Key Takeaways

  • Employ PHP and RabbitMQ to process data asynchronously among multiple workers, enhancing efficiency in high-transaction environments.
  • Implement persistent messages in RabbitMQ to prevent data loss during server crashes by setting message ‘delivery_mode’ to 2 and declaring queues as durable.
  • Use the Quality of Service (QoS) settings in RabbitMQ to control message distribution among workers, ensuring no single worker is overwhelmed.
  • Utilize RabbitMQ for RPC (Remote Procedure Call) by sending messages that require a response, useful for tasks like user authentication.
  • Set up exclusive, temporary queues for RPC responses to ensure messages are directed correctly and securely between the client and server.
  • Handle RPC responses effectively by matching ‘correlation_id’ in the response with the request, ensuring the correct handling and processing of replies.

Example 1: send request to process data asynchronously among several workers

In the example of the previous part, we had one producer, one consumer. If the consumer died, messages would continue to stack up in the queue until the consumer started again. It would then process all the messages, one by one.

This can be less than ideal in a concurrent user environment with a fair amount of requests per minute. Fortunately, scaling the consumers is super easy, but let’s implement another example.

Let’s say we have an invoice generation service, where the users just need to provide the invoice number, and the system will automatically generate a pdf file and email it to the user. Generating and sending the email could take even several seconds if the server on which the generation process runs is resource limited. Now suppose we are required to support several transactions per second, how do we accomplish this without overwhelming the server?

We need to implement the following pattern:

Let’s look at our producer class:

<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span></span>

The WorkerSender::execute() method will receive an invoice number. Next we create a connection, channel and queue as usual.

<span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span></span>

Please notice that this time, while creating the message object, the constructor receives a second parameter: array('delivery_mode' => 2). In this case we want to state that the message should not be lost if the RabbitMQ server crashes. Please be aware that in order for this to work, the queue has to be declared durable, too.

The following code can be used to receive the form data and execute the producer:

<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span></span>

Please use whichever input sanitization/validation you feel comfortable with.

Things get a little bit interesting on the consumer side:

<span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span></span>

As usual, we have to create a connection, derive a channel and declare a queue (the queue’s parameters have to be de same as the producer).

<span><span><?php </span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span>    <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span></span>

In order to have worker behavior (dispatch messages among several proceses), we have to declare the Quality of Service (qos) parameters with $channel->basic_qos():

  • Prefetch size: no specific limit, we could have as many workers as we need
  • Prefetch count: how many messages to retrieve per worker before sending back an acknowledgement. This will make the worker to process 1 message at a time.
  • Global: null means that above settings will apply to this consumer only

Next, we will start consuming, with a key difference in the parameters. We will turn off automatic ack’s, since we will tell the RabbitMQ server when we have finished processing the message and be ready to receive a new one.

Now, how do we send that ack? Please take a look at the WorkerReceiver::process() method (which is declared as a callback method when a message is received). The calls to generatedPdf() and sendEmail() methods are just dummy methods that will simulate the time spent to accomplish both tasks. The $msg parameter not only contains the payload sent from the producer, it also contains information about the objects used by the producer. We can extract information about the channel used by the producer with $msg->delivery_info['channel'] (which is the same object type to the channel we opened for the consumer with $connection->channel();). Since we need to send the producer’s channel an acknowledgement that we have completed the process, we will use its basic_ack() method, sending as a parameter the delivery tag ($msg->delivery_info['delivery_tag']) RabbitMQ automatically generated in order to associate correctly to which message the ack belongs to.

How do we fire up the workers? Just create a file like the following, invoking the WorkerReceiver::listen() method:

<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerReceiver
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Process incoming request to generate pdf invoices and send them through 
</span></span><span><span>     * email.
</span></span><span><span>     */ 
</span></span><span>    <span>public function listen()
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue
</span></span><span>            <span>false,              #passive
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs
</span></span><span>            <span>false,              #exclusive - queues may only be accessed by the current connection
</span></span><span>            <span>false               #auto delete - the queue is deleted when all consumers have finished using it
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>/**
</span></span><span><span>         * don't dispatch a new message to a worker until it has processed and 
</span></span><span><span>         * acknowledged the previous one. Instead, it will dispatch it to the 
</span></span><span><span>         * next worker that is not still busy.
</span></span><span><span>         */
</span></span><span>        <span>$channel->basic_qos(
</span></span><span>            <span>null,   #prefetch size - prefetch window size in octets, null meaning "no specific limit"
</span></span><span>            <span>1,      #prefetch count - prefetch window in terms of whole messages
</span></span><span>            <span>null    #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel
</span></span><span>            <span>);
</span></span><span>        
</span><span>        <span>/**
</span></span><span><span>         * indicate interest in consuming messages from a particular queue. When they do 
</span></span><span><span>         * so, we say that they register a consumer or, simply put, subscribe to a queue.
</span></span><span><span>         * Each consumer (subscription) has an identifier called a consumer tag
</span></span><span><span>         */ 
</span></span><span>        <span>$channel->basic_consume(
</span></span><span>            <span>'invoice_queue',        #queue
</span></span><span>            <span>'',                     #consumer tag - Identifier for the consumer, valid within the current channel. just string
</span></span><span>            <span>false,                  #no local - TRUE: the server will not send messages to the connection that published them
</span></span><span>            <span>false,                  #no ack, false - acks turned on, true - off.  send a proper acknowledgment from the worker, once we're done with a task
</span></span><span>            <span>false,                  #exclusive - queues may only be accessed by the current connection
</span></span><span>            <span>false,                  #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method
</span></span><span>            <span>array($this, 'process') #callback
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>while(count($channel->callbacks)) {
</span></span><span>            <span>$this->log->addInfo('Waiting for incoming messages');
</span></span><span>            <span>$channel->wait();
</span></span><span>        <span>}
</span></span><span>        
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * process received request
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param AMQPMessage $msg
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function process(AMQPMessage $msg)
</span></span><span>    <span>{
</span></span><span>        <span>$this->generatePdf()->sendEmail();
</span></span><span>        
</span><span>        <span>/**
</span></span><span><span>         * If a consumer dies without sending an acknowledgement the AMQP broker 
</span></span><span><span>         * will redeliver it to another consumer or, if none are available at the 
</span></span><span><span>         * time, the broker will wait until at least one consumer is registered 
</span></span><span><span>         * for the same queue before attempting redelivery
</span></span><span><span>         */ 
</span></span><span>        <span>$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Generates invoice's pdf
</span></span><span><span>     * 
</span></span><span><span>     * <span>@return WorkerReceiver
</span></span></span><span><span>     */ 
</span></span><span>    <span>private function generatePdf()
</span></span><span>    <span>{
</span></span><span>        <span>/**
</span></span><span><span>         * Mocking a pdf generation processing time.  This will take between
</span></span><span><span>         * 2 and 5 seconds
</span></span><span><span>         */ 
</span></span><span>        <span>sleep(mt_rand(2, 5));
</span></span><span>        <span>return $this;
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends email
</span></span><span><span>     * 
</span></span><span><span>     * <span>@return WorkerReceiver
</span></span></span><span><span>     */ 
</span></span><span>    <span>private function sendEmail()
</span></span><span>    <span>{
</span></span><span>        <span>/**
</span></span><span><span>         * Mocking email sending time.  This will take between 1 and 3 seconds
</span></span><span><span>         */ 
</span></span><span>        <span>sleep(mt_rand(1,3));
</span></span><span>        <span>return $this;
</span></span><span>    <span>}
</span></span><span><span>}</span></span></span>

Now use the php command (e.g. php worker.php or whichever name you have given to above file) to fire up the worker. But wait, the purpose was to have two or more workers, wasn’t it? No problem, fire up more workers in the same way in order to have multiple processes of the same file, and RabbitMQ will register the consumers and distribute work among them according to the QoS parameters.

Example 2: send RPC requests and expect a reply

So far, we have been sending messages to the RabbitMQ server without the user having to wait for a reply. This is ok for asynchronous processes that might take more time than the user is willing to spend just to see an ‘OK’ message. But what if we actually need a reply? Let’s say some result from a complex calculation, so we can show it to the user?

Let’s say we have a centralized login server (single sign on) that will work as an authentication mechanism isolated from the rest of our application(s). The only way to reach this server is through RabbitMQ. We need to implement a way to send the login credentials to this server and wait for a grant/deny access response.

We need to implement the following pattern:

As usual, let’s look at the producer first:

<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span></span>

Looking at RpcSender::execute method, please be aware that the $credentials parameter is an array in the form of ['username'=>'x', 'password'=>'y']. Again, we open a new connection and create a channel as usual.

<span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span></span>

The first difference comes from declaring a queue. First notice that we are using the list() construct to catch the result from $channel->queue_declare(). This is because we do not explicitly send a queue name while declaring it so we need to find out how this queue is identified. We are only interested in the first element of the result array, which will be an unique identifier of the queue (something like amq.gen-_U0kJVm8helFzQk9P0z9gg). The second change is that we need to declare this queue as exclusive, so there is no mix up in the results from other concurrent processes.

Another big change is that the producer will be a consumer of a queue too, when executing $channel->basic_consume() please notice that we are providing the $callback_queue value we got while declaring the queue. And like every consumer, we will declare a callback to execute when the process receives a response.

<span><span><?php </span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span>    <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span></span>

Next, we have to create a correlation id for the message, this is nothing more than a unique identifier for each message. In the example we are using uniqid()’s output, but you can use whichever mechanism you prefer (as long as it does not create a race condition, does not need to be a strong, crypto-safe RNG).

<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerReceiver
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Process incoming request to generate pdf invoices and send them through 
</span></span><span><span>     * email.
</span></span><span><span>     */ 
</span></span><span>    <span>public function listen()
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue
</span></span><span>            <span>false,              #passive
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs
</span></span><span>            <span>false,              #exclusive - queues may only be accessed by the current connection
</span></span><span>            <span>false               #auto delete - the queue is deleted when all consumers have finished using it
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>/**
</span></span><span><span>         * don't dispatch a new message to a worker until it has processed and 
</span></span><span><span>         * acknowledged the previous one. Instead, it will dispatch it to the 
</span></span><span><span>         * next worker that is not still busy.
</span></span><span><span>         */
</span></span><span>        <span>$channel->basic_qos(
</span></span><span>            <span>null,   #prefetch size - prefetch window size in octets, null meaning "no specific limit"
</span></span><span>            <span>1,      #prefetch count - prefetch window in terms of whole messages
</span></span><span>            <span>null    #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel
</span></span><span>            <span>);
</span></span><span>        
</span><span>        <span>/**
</span></span><span><span>         * indicate interest in consuming messages from a particular queue. When they do 
</span></span><span><span>         * so, we say that they register a consumer or, simply put, subscribe to a queue.
</span></span><span><span>         * Each consumer (subscription) has an identifier called a consumer tag
</span></span><span><span>         */ 
</span></span><span>        <span>$channel->basic_consume(
</span></span><span>            <span>'invoice_queue',        #queue
</span></span><span>            <span>'',                     #consumer tag - Identifier for the consumer, valid within the current channel. just string
</span></span><span>            <span>false,                  #no local - TRUE: the server will not send messages to the connection that published them
</span></span><span>            <span>false,                  #no ack, false - acks turned on, true - off.  send a proper acknowledgment from the worker, once we're done with a task
</span></span><span>            <span>false,                  #exclusive - queues may only be accessed by the current connection
</span></span><span>            <span>false,                  #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method
</span></span><span>            <span>array($this, 'process') #callback
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>while(count($channel->callbacks)) {
</span></span><span>            <span>$this->log->addInfo('Waiting for incoming messages');
</span></span><span>            <span>$channel->wait();
</span></span><span>        <span>}
</span></span><span>        
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * process received request
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param AMQPMessage $msg
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function process(AMQPMessage $msg)
</span></span><span>    <span>{
</span></span><span>        <span>$this->generatePdf()->sendEmail();
</span></span><span>        
</span><span>        <span>/**
</span></span><span><span>         * If a consumer dies without sending an acknowledgement the AMQP broker 
</span></span><span><span>         * will redeliver it to another consumer or, if none are available at the 
</span></span><span><span>         * time, the broker will wait until at least one consumer is registered 
</span></span><span><span>         * for the same queue before attempting redelivery
</span></span><span><span>         */ 
</span></span><span>        <span>$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Generates invoice's pdf
</span></span><span><span>     * 
</span></span><span><span>     * <span>@return WorkerReceiver
</span></span></span><span><span>     */ 
</span></span><span>    <span>private function generatePdf()
</span></span><span>    <span>{
</span></span><span>        <span>/**
</span></span><span><span>         * Mocking a pdf generation processing time.  This will take between
</span></span><span><span>         * 2 and 5 seconds
</span></span><span><span>         */ 
</span></span><span>        <span>sleep(mt_rand(2, 5));
</span></span><span>        <span>return $this;
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends email
</span></span><span><span>     * 
</span></span><span><span>     * <span>@return WorkerReceiver
</span></span></span><span><span>     */ 
</span></span><span>    <span>private function sendEmail()
</span></span><span>    <span>{
</span></span><span>        <span>/**
</span></span><span><span>         * Mocking email sending time.  This will take between 1 and 3 seconds
</span></span><span><span>         */ 
</span></span><span>        <span>sleep(mt_rand(1,3));
</span></span><span>        <span>return $this;
</span></span><span>    <span>}
</span></span><span><span>}</span></span></span>

Now let’s create a message, which has important changes compared to what we were used to in the first 2 examples. Aside from assigning a json-encoded string containing the credentials we want to authenticate, we have to provide to the AMQPMessage constructor an array with two properties defined:

  • correlation_id: a tag for the message
  • reply_to: the queue identifier generated while declaring it

After publishing the message, we will evaluate the response, which will be empty at the beginning. While the response value remains empty, we will wait for a response from the channel with $channel->wait();.

Once we receive a response from the channel, the callback method will be invoked (RpcSender::onResponse()). This method will match the received correlation id against the one generated, and if they are the same, will set the response body, thus breaking the while loop.

What about the RPC consumer? Here it is:

<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span></span>

Same old connection and channel creation :)

Same as declaring the queue, however this queue will have a predefined name (‘rpc_queue‘). We will define the QoS parameters since we will deactivate automatic acks, so we can notify when we are finished verifying the credentials and have a result.

<span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span></span>

The magic comes from within the declared callback. Once we are done authenticating the credentials (yes, I know the process is done against static username/password values, this tutorial is not about how to authenticate credentials ;) ), we have to create the return message with the same correlation id the producer created. We can extract this from the request message with $req->get('correlation_id'), passing this value the same way we did it in the producer.

<span><span><?php </span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span>    <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span></span>

Now we have to publish this message to the same queue that was created in the producer (the one with the ‘random’ name). We extract the queue’s name with $req->get('reply_to') and use it as the routing key in basic_publish().

Once we published the message, we have to send the ack notice to the channel with $req->delivery_info['channel']->basic_ack(), using the delivery tag in $req->delivery_info['delivery_tag'] so the producer can stop waiting.

Again, fire up a listening process and you are ready to go. You can even combine examples 2 and 3 to have a multi-worker rpc process to perform the authentication requests than can be scaled just by firing up several workers.

There is far more to be said about RabbitMQ and AMQP, like virtual hosts, exchange types, server administration, etc… you can find more application patterns (like routing, topics) here and at the documentation page. There is also a command line tool to manage RabbitMQ, as well as a web based interface.

If you liked this tutorial series and would like to see more about MQ and more real world use cases, please let us know in the comments below!

Frequently Asked Questions (FAQs) about PHP RabbitMQ Advanced Examples

What is the role of RabbitMQ in PHP?

RabbitMQ is a message broker that allows applications to communicate with each other asynchronously. It plays a crucial role in PHP applications by enabling them to handle high loads and complex tasks more efficiently. RabbitMQ uses the Advanced Message Queuing Protocol (AMQP) to facilitate the exchange of messages between different parts of an application. This allows for the decoupling of processes, making the application more scalable and resilient.

How do I install RabbitMQ for PHP?

To install RabbitMQ for PHP, you need to first install RabbitMQ server on your machine. This can be done through the official RabbitMQ website. After the server is installed, you can install the PHP AMQP extension, which provides the necessary functions to interact with RabbitMQ. This can be done using the PECL installer with the command pecl install amqp.

How can I create a RabbitMQ exchange in PHP?

In PHP, you can create a RabbitMQ exchange using the exchange_declare method of the AMQPChannel class. This method takes several parameters, including the name of the exchange, the type of the exchange (direct, topic, fanout, or headers), and optional parameters such as passive, durable, auto_delete, and arguments.

How do I send a message to a RabbitMQ queue in PHP?

To send a message to a RabbitMQ queue in PHP, you first need to create an instance of the AMQPMessage class with the message content. Then, you can use the basic_publish method of the AMQPChannel class to send the message to the queue. The basic_publish method takes the message, the exchange, and the routing key as parameters.

How can I consume messages from a RabbitMQ queue in PHP?

In PHP, you can consume messages from a RabbitMQ queue using the basic_consume method of the AMQPChannel class. This method takes several parameters, including the queue name, the consumer tag, no_local, no_ack, exclusive, and a callback function that will be executed when a message is received.

How can I handle errors in RabbitMQ with PHP?

Error handling in RabbitMQ with PHP can be done using try-catch blocks. The PHP AMQP extension throws exceptions of the AMQPException class when an error occurs. You can catch these exceptions and handle them according to your application’s needs.

How can I ensure message durability in RabbitMQ with PHP?

To ensure message durability in RabbitMQ with PHP, you can set the delivery_mode property of the AMQPMessage class to 2. This will make RabbitMQ store the message on disk, ensuring that it will not be lost even if the RabbitMQ server crashes or restarts.

How can I implement priority queues in RabbitMQ with PHP?

Priority queues in RabbitMQ can be implemented in PHP by setting the x-max-priority argument when declaring the queue. Then, when sending a message, you can set the priority property of the AMQPMessage class to a value between 0 and the maximum priority you specified.

How can I use RabbitMQ for RPC in PHP?

RabbitMQ can be used for Remote Procedure Call (RPC) in PHP by sending a message with a reply-to property set to a callback queue. The server can then send the response to the callback queue, and the client can consume the response from there.

How can I monitor RabbitMQ in PHP?

Monitoring RabbitMQ in PHP can be done using the RabbitMQ management plugin, which provides a web-based interface for monitoring and managing your RabbitMQ server. You can also use the AMQPChannel class’s methods to get information about the state of the channel, such as the number of unacknowledged messages.

The above is the detailed content of PHP and RabbitMQ: Advanced Examples. For more information, please follow other related articles on the PHP Chinese website!

Statement
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
11 Best PHP URL Shortener Scripts (Free and Premium)11 Best PHP URL Shortener Scripts (Free and Premium)Mar 03, 2025 am 10:49 AM

Long URLs, often cluttered with keywords and tracking parameters, can deter visitors. A URL shortening script offers a solution, creating concise links ideal for social media and other platforms. These scripts are valuable for individual websites a

Introduction to the Instagram APIIntroduction to the Instagram APIMar 02, 2025 am 09:32 AM

Following its high-profile acquisition by Facebook in 2012, Instagram adopted two sets of APIs for third-party use. These are the Instagram Graph API and the Instagram Basic Display API.As a developer building an app that requires information from a

Working with Flash Session Data in LaravelWorking with Flash Session Data in LaravelMar 12, 2025 pm 05:08 PM

Laravel simplifies handling temporary session data using its intuitive flash methods. This is perfect for displaying brief messages, alerts, or notifications within your application. Data persists only for the subsequent request by default: $request-

Build a React App With a Laravel Back End: Part 2, ReactBuild a React App With a Laravel Back End: Part 2, ReactMar 04, 2025 am 09:33 AM

This is the second and final part of the series on building a React application with a Laravel back-end. In the first part of the series, we created a RESTful API using Laravel for a basic product-listing application. In this tutorial, we will be dev

Simplified HTTP Response Mocking in Laravel TestsSimplified HTTP Response Mocking in Laravel TestsMar 12, 2025 pm 05:09 PM

Laravel provides concise HTTP response simulation syntax, simplifying HTTP interaction testing. This approach significantly reduces code redundancy while making your test simulation more intuitive. The basic implementation provides a variety of response type shortcuts: use Illuminate\Support\Facades\Http; Http::fake([ 'google.com' => 'Hello World', 'github.com' => ['foo' => 'bar'], 'forge.laravel.com' =>

cURL in PHP: How to Use the PHP cURL Extension in REST APIscURL in PHP: How to Use the PHP cURL Extension in REST APIsMar 14, 2025 am 11:42 AM

The PHP Client URL (cURL) extension is a powerful tool for developers, enabling seamless interaction with remote servers and REST APIs. By leveraging libcurl, a well-respected multi-protocol file transfer library, PHP cURL facilitates efficient execution of various network protocols, including HTTP, HTTPS, and FTP. This extension offers granular control over HTTP requests, supports multiple concurrent operations, and provides built-in security features.

12 Best PHP Chat Scripts on CodeCanyon12 Best PHP Chat Scripts on CodeCanyonMar 13, 2025 pm 12:08 PM

Do you want to provide real-time, instant solutions to your customers' most pressing problems? Live chat lets you have real-time conversations with customers and resolve their problems instantly. It allows you to provide faster service to your custom

Announcement of 2025 PHP Situation SurveyAnnouncement of 2025 PHP Situation SurveyMar 03, 2025 pm 04:20 PM

The 2025 PHP Landscape Survey investigates current PHP development trends. It explores framework usage, deployment methods, and challenges, aiming to provide insights for developers and businesses. The survey anticipates growth in modern PHP versio

See all articles

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

AI Hentai Generator

AI Hentai Generator

Generate AI Hentai for free.

Hot Article

R.E.P.O. Energy Crystals Explained and What They Do (Yellow Crystal)
2 weeks agoBy尊渡假赌尊渡假赌尊渡假赌
Repo: How To Revive Teammates
4 weeks agoBy尊渡假赌尊渡假赌尊渡假赌
Hello Kitty Island Adventure: How To Get Giant Seeds
3 weeks agoBy尊渡假赌尊渡假赌尊渡假赌

Hot Tools

Dreamweaver Mac version

Dreamweaver Mac version

Visual web development tools

Safe Exam Browser

Safe Exam Browser

Safe Exam Browser is a secure browser environment for taking online exams securely. This software turns any computer into a secure workstation. It controls access to any utility and prevents students from using unauthorized resources.

Zend Studio 13.0.1

Zend Studio 13.0.1

Powerful PHP integrated development environment

SAP NetWeaver Server Adapter for Eclipse

SAP NetWeaver Server Adapter for Eclipse

Integrate Eclipse with SAP NetWeaver application server.

SublimeText3 English version

SublimeText3 English version

Recommended: Win version, supports code prompts!