Heim >Backend-Entwicklung >PHP-Tutorial >PHP und Rabbitmq: Fortgeschrittene Beispiele

PHP und Rabbitmq: Fortgeschrittene Beispiele

Jennifer Aniston
Jennifer AnistonOriginal
2025-02-19 09:44:12533Durchsuche

PHP and RabbitMQ: Advanced Examples

PHP und Rabbitmq: Fortgeschrittene Beispiele

In Teil 1 haben wir die Theorie und einen einfachen Anwendungsfall des AMQP -Protokolls in PHP mit Rabbitmq als Broker behandelt. Lassen Sie uns nun in einige fortgeschrittenere Beispiele eintauchen.

Key Takeaways

  • PHP und Rabbitmq verwenden, um Daten asynchron zwischen mehreren Arbeitnehmern zu verarbeiten und die Effizienz in Umgebungen mit hoher Transaktion zu verbessern.
  • Implementieren Sie persistente Nachrichten in RabbitMQ, um den Datenverlust während der Serverabstürze zu verhindern, indem Sie die Nachricht "Delivery_Mode" auf 2 festlegen und Warteschlangen als dauerhaft deklarieren.
  • Verwenden Sie die Einstellungen für Qualität des Dienstes (QoS) in Rabbitmq, um die Nachrichtenverteilung unter den Arbeitnehmern zu kontrollieren, und stellen Sie sicher, dass kein einzelner Arbeiter überwältigt ist.
  • Rabbitmq für RPC (Remote Procedure Call) verwenden, indem Nachrichten gesendet werden, die eine Antwort erfordern, die für Aufgaben wie Benutzerauthentifizierung nützlich ist.
  • exklusive, temporäre Warteschlangen für RPC -Antworten einrichten, um sicherzustellen, dass Nachrichten korrekt und sicher zwischen Client und Server gerichtet sind.
  • RPC -Antworten effektiv handhaben, indem Sie "correlation_id" in der Antwort mit der Anforderung übereinstimmen, um die korrekte Handhabung und Verarbeitung von Antworten sicherzustellen.

Beispiel 1: Senden Sie die Anforderung, Daten asynchron unter mehreren Arbeitnehmern zu verarbeiten

Im Beispiel des vorherigen Teils hatten wir einen Produzenten, einen Verbraucher. Wenn der Verbraucher starb, würden sich die Nachrichten weiter in der Warteschlange stapeln, bis der Verbraucher erneut begann. Es würde dann alle Nachrichten nacheinander verarbeiten.

Dies kann in einer gleichzeitigen Benutzerumgebung mit einer angemessenen Anzahl von Anfragen pro Minute weniger als ideal sein. Glücklicherweise ist die Skalierung der Verbraucher super einfach, aber lassen Sie uns ein weiteres Beispiel implementieren.

Nehmen wir an, wir haben einen Rechnungsgenerierungsdienst, in dem die Benutzer nur die Rechnungsnummer bereitstellen müssen, und das System generiert automatisch eine PDF -Datei und eine E -Mail an den Benutzer. Das Generieren und Senden der E -Mail kann noch einige Sekunden dauern, wenn der Server, auf dem der Erzeugungsprozess ausgeführt wird, Ressource Limited ist. Nehmen wir nun an, wir müssen mehrere Transaktionen pro Sekunde unterstützen, wie wir dies erreichen, ohne den Server zu überwältigen?

Wir müssen das folgende Muster implementieren:

Schauen wir uns unsere Produzentenklasse an:
<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span>

Die MOTORELTER :: Execute () erhält eine Rechnungsnummer. Als nächstes erstellen wir wie gewohnt eine Verbindung, Kanal und Warteschlange.
<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>

Bitte beachten Sie, dass der Konstruktor dieses Mal beim Erstellen des Nachrichtenobjekts einen zweiten Parameter erhält: Array ('lieferung_mode' => 2). In diesem Fall möchten wir angeben, dass die Nachricht nicht verloren gehen sollte, wenn der Rabbitmq -Server abstürzt. Bitte beachten Sie, dass die Warteschlange auch für dauerhaft erklärt werden muss.

Der folgende Code kann verwendet werden, um die Formulardaten zu empfangen und den Produzenten auszuführen:

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span>

Bitte verwenden Sie die Eingabeeinstellungen/Validierung, mit denen Sie sich wohl fühlen.

Die Dinge werden auf der Verbraucherseite ein bisschen interessant:

<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>

Wie gewohnt müssen wir eine Verbindung herstellen, einen Kanal ableiten und eine Warteschlange deklarieren (die Parameter der Warteschlange müssen wie der Produzent gleich sein).

<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span>    <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>

Um das Verhalten von Work-Verhaltensweisen (Versandnachrichten unter mehreren Prozessen) zu haben, müssen wir die Parameter der Qualität des Dienstes (QoS) mit $ Channel-> Basic_qos () deklarieren:

  • Vorabklagegröße : Keine spezifische Grenze, wir könnten so viele Arbeitnehmer haben, wie wir
  • brauchen
  • Bezählen Sie die Anzahl : Wie viele Nachrichten können Sie pro Arbeiter abrufen, bevor Sie eine Bestätigung zurücksenden. Dadurch wird der Arbeiter dazu verarbeitet, 1 Nachricht gleichzeitig zu verarbeiten.
  • global : null bedeutet, dass die obigen Einstellungen nur für diesen Verbraucher gelten

Als nächstes werden wir mit einem wichtigen Unterschied in den Parametern beginnen. Wir werden die automatischen ACKs ausschalten, da wir dem Rabbitmq -Server mitteilen werden, wenn wir die Meldung beendet haben und bereit sind, eine neue zu erhalten.

Wie schicken wir das ACK? Bitte werfen Sie einen Blick auf die METHODERRECEIVER :: Process () (die als Rückrufmethode deklariert wird, wenn eine Nachricht empfangen wird). Die Aufrufe von generierten Methoden fürPdf () und Sendemail () sind nur Dummy -Methoden, die die Zeit simulieren, die für die Erfüllung beider Aufgaben aufgewendet wird. Der $ msg -Parameter enthält nicht nur die vom Produzenten gesendete Nutzlast, sondern auch Informationen zu den vom Produzenten verwendeten Objekten. Wir können Informationen über den vom Produzent verwendeten Kanal mit $ msg-> lieferung_info ['Kanal'] extrahieren (was der gleiche Objekttyp wie der Kanal ist, den wir für den Verbraucher mit $ Connection-> Kanal () geöffnet haben;). Da wir dem Kanal des Produzenten eine Bestätigung, dass wir den Prozess abgeschlossen haben ordnungsgemäß zu assoziieren, zu welcher Nachricht der ACK gehört.

Wie steuern wir die Arbeiter an? Erstellen Sie einfach eine Datei wie Folgendes und rufen Sie die WorkerReceiver :: listen () auf:

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerReceiver
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Process incoming request to generate pdf invoices and send them through 
</span></span><span><span>     * email.
</span></span><span><span>     */ 
</span></span><span>    <span>public function listen()
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue
</span></span><span>            <span>false,              #passive
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs
</span></span><span>            <span>false,              #exclusive - queues may only be accessed by the current connection
</span></span><span>            <span>false               #auto delete - the queue is deleted when all consumers have finished using it
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>/**
</span></span><span><span>         * don't dispatch a new message to a worker until it has processed and 
</span></span><span><span>         * acknowledged the previous one. Instead, it will dispatch it to the 
</span></span><span><span>         * next worker that is not still busy.
</span></span><span><span>         */
</span></span><span>        <span>$channel->basic_qos(
</span></span><span>            <span>null,   #prefetch size - prefetch window size in octets, null meaning "no specific limit"
</span></span><span>            <span>1,      #prefetch count - prefetch window in terms of whole messages
</span></span><span>            <span>null    #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel
</span></span><span>            <span>);
</span></span><span>        
</span><span>        <span>/**
</span></span><span><span>         * indicate interest in consuming messages from a particular queue. When they do 
</span></span><span><span>         * so, we say that they register a consumer or, simply put, subscribe to a queue.
</span></span><span><span>         * Each consumer (subscription) has an identifier called a consumer tag
</span></span><span><span>         */ 
</span></span><span>        <span>$channel->basic_consume(
</span></span><span>            <span>'invoice_queue',        #queue
</span></span><span>            <span>'',                     #consumer tag - Identifier for the consumer, valid within the current channel. just string
</span></span><span>            <span>false,                  #no local - TRUE: the server will not send messages to the connection that published them
</span></span><span>            <span>false,                  #no ack, false - acks turned on, true - off.  send a proper acknowledgment from the worker, once we're done with a task
</span></span><span>            <span>false,                  #exclusive - queues may only be accessed by the current connection
</span></span><span>            <span>false,                  #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method
</span></span><span>            <span>array($this, 'process') #callback
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>while(count($channel->callbacks)) {
</span></span><span>            <span>$this->log->addInfo('Waiting for incoming messages');
</span></span><span>            <span>$channel->wait();
</span></span><span>        <span>}
</span></span><span>        
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * process received request
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param AMQPMessage $msg
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function process(AMQPMessage $msg)
</span></span><span>    <span>{
</span></span><span>        <span>$this->generatePdf()->sendEmail();
</span></span><span>        
</span><span>        <span>/**
</span></span><span><span>         * If a consumer dies without sending an acknowledgement the AMQP broker 
</span></span><span><span>         * will redeliver it to another consumer or, if none are available at the 
</span></span><span><span>         * time, the broker will wait until at least one consumer is registered 
</span></span><span><span>         * for the same queue before attempting redelivery
</span></span><span><span>         */ 
</span></span><span>        <span>$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Generates invoice's pdf
</span></span><span><span>     * 
</span></span><span><span>     * <span>@return WorkerReceiver
</span></span></span><span><span>     */ 
</span></span><span>    <span>private function generatePdf()
</span></span><span>    <span>{
</span></span><span>        <span>/**
</span></span><span><span>         * Mocking a pdf generation processing time.  This will take between
</span></span><span><span>         * 2 and 5 seconds
</span></span><span><span>         */ 
</span></span><span>        <span>sleep(mt_rand(2, 5));
</span></span><span>        <span>return $this;
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends email
</span></span><span><span>     * 
</span></span><span><span>     * <span>@return WorkerReceiver
</span></span></span><span><span>     */ 
</span></span><span>    <span>private function sendEmail()
</span></span><span>    <span>{
</span></span><span>        <span>/**
</span></span><span><span>         * Mocking email sending time.  This will take between 1 and 3 seconds
</span></span><span><span>         */ 
</span></span><span>        <span>sleep(mt_rand(1,3));
</span></span><span>        <span>return $this;
</span></span><span>    <span>}
</span></span><span><span>}</span></span>

Verwenden Sie nun den PHP -Befehl (z. B. PHP Worker.php oder welcher Namen, den Sie der oben genannten Datei gegeben haben), um den Arbeiter abzubauen. Aber warte, der Zweck war, zwei oder mehr Arbeiter zu haben, nicht wahr? Kein Problem, starten mehr Arbeitnehmer auf die gleiche Weise, um mehrere Prozesse derselben Datei zu haben, und Rabbitmq registriert die Verbraucher und vertreibt die Arbeiten gemäß den QoS -Parametern.

Beispiel 2: RPC -Anfragen senden und erwarten Sie eine Antwort

Bisher haben wir Nachrichten an den Rabbitmq -Server gesendet, ohne dass der Benutzer auf eine Antwort warten muss. Dies ist in Ordnung für asynchrone Prozesse, die möglicherweise mehr Zeit in Anspruch nehmen, als der Benutzer bereit ist, nur eine "OK" -Meldung zu sehen. Aber was ist, wenn wir tatsächlich eine Antwort brauchen? Nehmen wir einige Ergebnisse aus einer komplexen Berechnung an, damit wir es dem Benutzer zeigen können?

Nehmen wir an, wir haben einen zentralisierten Anmelderver (einzelnes Zeichen), der als Authentifizierungsmechanismus aus den übrigen Anwendungen isoliert wird. Die einzige Möglichkeit, diesen Server zu erreichen, ist über Rabbitmq. Wir müssen eine Möglichkeit implementieren, die Anmeldeinformationen an diesen Server zu senden und auf eine Zugriffsantwort zu warten/zu verweigern.

Wir müssen das folgende Muster implementieren:

Schauen wir uns zuerst den Produzenten an:

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span>

Betrachten Sie die Methode von RPCSender :: Execute ausführen, bitte beachten Sie, dass der Parameter von $ condentInicals ein Array in Form von ['Benutzername' => 'x', 'Passwort' => 'y'] ist. Auch hier öffnen wir eine neue Verbindung und erstellen wie gewohnt einen Kanal.

<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>

Der erste Unterschied ergibt sich aus der Erklärung einer Warteschlange. Beachten Sie zunächst, dass wir das List () -Konstrukt verwenden, um das Ergebnis von $ Channel-> queue_declare () zu fangen. Dies liegt daran, dass wir einen Warteschlangennamen nicht explizit senden, während wir ihn erklären. Deshalb müssen wir herausfinden, wie diese Warteschlange identifiziert wird. Wir interessieren uns nur für das erste Element des Ergebnisarrays, das eine eindeutige Kennung der Warteschlange sein wird (so etwas wie amq.gen-_u0kjvm8HelfZQK9p0z9gg). Die zweite Änderung ist, dass wir diese Warteschlange als exklusiv deklarieren müssen, sodass die Ergebnisse anderer gleichzeitiger Prozesse nicht vermischt werden.

.

Eine weitere große Änderung ist, dass der Produzent auch ein Verbraucher einer Warteschlange sein wird, wenn $ Channel-> Basic_consume () beachten, dass wir den Wert $ callback_queue bereitstellen, den wir während der Deklaration der Warteschlange erhalten haben. Und wie jeder Verbraucher werden wir einen Rückruf erklären, der ausgeführt wird, wenn der Prozess eine Antwort erhält.

<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span>    <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>

Als nächstes müssen wir eine Korrelations -ID für die Nachricht erstellen, dies ist nichts weiter als eine eindeutige Bezeichnung für jede Nachricht. Im Beispiel verwenden wir die Ausgabe von Uniqid (), aber Sie können den Mechanismus verwenden, den Sie bevorzugen (solange er keine Rassenbedingung erzeugt, muss keine starke, kryptosichere RNG sein).

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerReceiver
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Process incoming request to generate pdf invoices and send them through 
</span></span><span><span>     * email.
</span></span><span><span>     */ 
</span></span><span>    <span>public function listen()
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue
</span></span><span>            <span>false,              #passive
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs
</span></span><span>            <span>false,              #exclusive - queues may only be accessed by the current connection
</span></span><span>            <span>false               #auto delete - the queue is deleted when all consumers have finished using it
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>/**
</span></span><span><span>         * don't dispatch a new message to a worker until it has processed and 
</span></span><span><span>         * acknowledged the previous one. Instead, it will dispatch it to the 
</span></span><span><span>         * next worker that is not still busy.
</span></span><span><span>         */
</span></span><span>        <span>$channel->basic_qos(
</span></span><span>            <span>null,   #prefetch size - prefetch window size in octets, null meaning "no specific limit"
</span></span><span>            <span>1,      #prefetch count - prefetch window in terms of whole messages
</span></span><span>            <span>null    #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel
</span></span><span>            <span>);
</span></span><span>        
</span><span>        <span>/**
</span></span><span><span>         * indicate interest in consuming messages from a particular queue. When they do 
</span></span><span><span>         * so, we say that they register a consumer or, simply put, subscribe to a queue.
</span></span><span><span>         * Each consumer (subscription) has an identifier called a consumer tag
</span></span><span><span>         */ 
</span></span><span>        <span>$channel->basic_consume(
</span></span><span>            <span>'invoice_queue',        #queue
</span></span><span>            <span>'',                     #consumer tag - Identifier for the consumer, valid within the current channel. just string
</span></span><span>            <span>false,                  #no local - TRUE: the server will not send messages to the connection that published them
</span></span><span>            <span>false,                  #no ack, false - acks turned on, true - off.  send a proper acknowledgment from the worker, once we're done with a task
</span></span><span>            <span>false,                  #exclusive - queues may only be accessed by the current connection
</span></span><span>            <span>false,                  #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method
</span></span><span>            <span>array($this, 'process') #callback
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>while(count($channel->callbacks)) {
</span></span><span>            <span>$this->log->addInfo('Waiting for incoming messages');
</span></span><span>            <span>$channel->wait();
</span></span><span>        <span>}
</span></span><span>        
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * process received request
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param AMQPMessage $msg
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function process(AMQPMessage $msg)
</span></span><span>    <span>{
</span></span><span>        <span>$this->generatePdf()->sendEmail();
</span></span><span>        
</span><span>        <span>/**
</span></span><span><span>         * If a consumer dies without sending an acknowledgement the AMQP broker 
</span></span><span><span>         * will redeliver it to another consumer or, if none are available at the 
</span></span><span><span>         * time, the broker will wait until at least one consumer is registered 
</span></span><span><span>         * for the same queue before attempting redelivery
</span></span><span><span>         */ 
</span></span><span>        <span>$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Generates invoice's pdf
</span></span><span><span>     * 
</span></span><span><span>     * <span>@return WorkerReceiver
</span></span></span><span><span>     */ 
</span></span><span>    <span>private function generatePdf()
</span></span><span>    <span>{
</span></span><span>        <span>/**
</span></span><span><span>         * Mocking a pdf generation processing time.  This will take between
</span></span><span><span>         * 2 and 5 seconds
</span></span><span><span>         */ 
</span></span><span>        <span>sleep(mt_rand(2, 5));
</span></span><span>        <span>return $this;
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends email
</span></span><span><span>     * 
</span></span><span><span>     * <span>@return WorkerReceiver
</span></span></span><span><span>     */ 
</span></span><span>    <span>private function sendEmail()
</span></span><span>    <span>{
</span></span><span>        <span>/**
</span></span><span><span>         * Mocking email sending time.  This will take between 1 and 3 seconds
</span></span><span><span>         */ 
</span></span><span>        <span>sleep(mt_rand(1,3));
</span></span><span>        <span>return $this;
</span></span><span>    <span>}
</span></span><span><span>}</span></span>

Lassen Sie uns nun eine Nachricht erstellen, die wichtige Änderungen im Vergleich zu dem hat, was wir in den ersten beiden Beispielen gewohnt waren. Abgesehen von der Zuweisung einer JSON-kodierten Zeichenfolge, die die Anmeldeinformationen enthält, die wir authentifizieren möchten, müssen wir dem AMQPMessage Constructor ein Array mit zwei definierten Eigenschaften zur Verfügung stellen:

  • correlation_id : Ein Tag für die Nachricht
  • Reply_to : Die Warteschlangenkennung erzeugt, während sie es
  • deklariert hat

Nach der Veröffentlichung der Nachricht werden wir die Antwort bewerten, die zu Beginn leer ist. Während der Antwortwert leer bleibt, warten wir auf eine Antwort aus dem Kanal mit $ Channel-> Wait ();.

Sobald wir eine Antwort vom Kanal erhalten haben, wird die Rückrufmethode aufgerufen (RPCSender :: OnResponse ()). Diese Methode entspricht der empfangenen Korrelations -ID gegen die erzeugte. Wenn sie gleich sind, setzen Sie den Antwortkörper ein, wodurch die While -Schleife gebrochen wird.

Was ist mit dem RPC -Verbraucher? Hier ist es:

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span>

gleiche alte Verbindung und Kanalerstellung :)

Wie bei der Deklaration der Warteschlange hat diese Warteschlange jedoch einen vordefinierten Namen (‘ rpc_queue '). Wir werden die QoS -Parameter definieren, da wir automatische ACKs deaktivieren werden, damit wir mitteilen können, wenn wir die Anmeldeinformationen überprüfen und ein Ergebnis haben.

<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>

Die Magie kommt aus dem deklarierten Rückruf. Sobald wir die Anmeldeinformationen authentifiziert haben (ja, ich weiß, dass der Prozess gegen statische Benutzername/Kennwortwerte durchgeführt wird, geht es in diesem Tutorial nicht darum, wie Anmeldeinformationen authentifiziert werden können;)), sondern die Rückgabenachricht mit derselben Korrelations -ID des Produzenten erstellen erstellt. Wir können dies aus der Anforderungsnachricht mit $ req-> get ('correlation_id') extrahieren und diesen Wert genauso übergeben wie im Produzenten.

<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span>    <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>

Jetzt müssen wir diese Nachricht an die gleiche Warteschlange veröffentlichen, die im Produzenten erstellt wurde (die mit dem "zufälligen" Namen). Wir extrahieren den Namen der Warteschlange mit $ req-> get ('Reply_to') und verwenden ihn als Routing-Schlüssel in Basic_publish ().

Nachdem wir die Nachricht veröffentlicht haben, müssen wir den ACK-Hinweis mit $ req-> lieferung_info ['Kanal']-> basic_ack () an den Liefertag in $ req-> lieferung_info ['lieferung_tag' senden. ] so kann der Produzent aufhören zu warten.

starten Sie erneut einen Hörprozess und Sie sind bereit zu gehen. Sie können sogar Beispiele 2 und 3 kombinieren, um einen Multi-Arbeiter-RPC-Prozess zu haben, um die Authentifizierungsanfragen auszuführen, als nur durch das Aufrühren mehrerer Arbeiter skaliert werden kann.

Es gibt weit mehr zu sagen über Rabbitmq und AMQP, wie virtuelle Hosts, Exchange -Typen, Serververwaltung usw. Sie können hier und auf der Dokumentationsseite weitere Anwendungsmuster (wie Routing, Themen) finden. Es gibt auch ein Befehlszeilen -Tool zum Verwalten von Rabbitmq sowie eine webbasierte Schnittstelle.

Wenn Ihnen diese Tutorial -Serie gefallen hat und mehr über MQ und reale Anwendungsfälle erfahren möchten, teilen Sie uns bitte in den Kommentaren unten mit!

häufig gestellte Fragen (FAQs) zu PHP Rabbitmq Erweiterte Beispiele

Welche Rolle spielt Rabbitmq in PHP? Es spielt eine entscheidende Rolle in PHP -Anwendungen, indem sie es ermöglicht, hohe Lasten und komplexe Aufgaben effizienter zu behandeln. RabbitMQ verwendet das erweiterte Message Queuing Protocol (AMQP), um den Austausch von Nachrichten zwischen verschiedenen Teilen einer Anwendung zu erleichtern. Dies ermöglicht die Entkopplung von Prozessen, wodurch die Anwendung skalierbarer und widerstandsfähiger wird.

Wie installiere ich Rabbitmq für PHP? Ihre Maschine. Dies kann über die offizielle Rabbitmq -Website erfolgen. Nach der Installation des Servers können Sie die PHP AMQP -Erweiterung installieren, die die erforderlichen Funktionen für die Interaktion mit Rabbitmq bietet. Dies kann mit dem PECL -Installationsprogramm mit dem Befehl Pecl install AMQP durchgeführt werden. der AMQPChannel -Klasse. Diese Methode nimmt mehrere Parameter an, einschließlich des Namens des Austauschs, des Typs des Austauschs (Direkte, Thema, Fanout oder Header) und optionale Parameter wie Passiv, langlebig, auto_delete und Argumente.

Wie Sende ich eine Nachricht an eine Rabbitmq -Warteschlange in PHP? der AMQPMessage -Klasse mit dem Nachrichteninhalt. Anschließend können Sie die Basic_Publish -Methode der AMQPChannel -Klasse verwenden, um die Nachricht an die Warteschlange zu senden. Die Basic_Publish -Methode nimmt die Nachricht, den Austausch und den Routing -Schlüssel als Parameter an. Rabbitmq -Warteschlange unter Verwendung der Basic_consume -Methode der AMQPChannel -Klasse. Diese Methode enthält mehrere Parameter, einschließlich des Warteschlangennamens, des Verbraucher -Tags, der NO_Local, der NO_ACK, des exklusiven und einer Rückruffunktion, die ausgeführt wird, wenn eine Nachricht empfangen wird. ? Die PHP AMQP -Erweiterung löst Ausnahmen der AMQPException -Klasse aus, wenn ein Fehler auftritt. Sie können diese Ausnahmen erfassen und sie entsprechend den Anforderungen Ihrer Anwendung bewältigen. Delivery_Mode -Eigenschaft der AMQPMessage -Klasse auf 2.. Dadurch wird Rabbitmq die Nachricht auf der Festplatte speichern, um sicherzustellen, dass sie nicht verloren geht, selbst wenn der Rabbitmq -Server nicht mehr Abstürze oder Neustart.

Wie kann ich vorrangige Warteschlangen in Rabbitmq mit PHP? Wenn Sie dann eine Nachricht senden, können Sie die Prioritätseigenschaft der AMQPMessage -Klasse auf einen Wert zwischen 0 und die maximale Priorität festlegen, die Sie angegeben haben. > RabbitMQ kann für den Remote Procedure Call (RPC) in PHP verwendet werden, indem eine Nachricht mit einer Antwort-zu-Eigenschaft an eine Rückrufwarteschlange gesendet wird. Der Server kann dann die Antwort an die Rückrufwarteschlange senden und der Client kann die Antwort von dort aus konsumieren. Das RabbitMQ-Management-Plugin, das eine webbasierte Schnittstelle zur Überwachung und Verwaltung Ihres RabbitMQ-Servers bietet. Sie können auch die Methoden der AMQPChannel -Klasse verwenden, um Informationen über den Zustand des Kanals zu erhalten, z. B. die Anzahl der nicht anerkannten Nachrichten.

Das obige ist der detaillierte Inhalt vonPHP und Rabbitmq: Fortgeschrittene Beispiele. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn