Heim >Backend-Entwicklung >PHP-Tutorial >PHP und Rabbitmq: Fortgeschrittene Beispiele
In Teil 1 haben wir die Theorie und einen einfachen Anwendungsfall des AMQP -Protokolls in PHP mit Rabbitmq als Broker behandelt. Lassen Sie uns nun in einige fortgeschrittenere Beispiele eintauchen.
Im Beispiel des vorherigen Teils hatten wir einen Produzenten, einen Verbraucher. Wenn der Verbraucher starb, würden sich die Nachrichten weiter in der Warteschlange stapeln, bis der Verbraucher erneut begann. Es würde dann alle Nachrichten nacheinander verarbeiten.
Dies kann in einer gleichzeitigen Benutzerumgebung mit einer angemessenen Anzahl von Anfragen pro Minute weniger als ideal sein. Glücklicherweise ist die Skalierung der Verbraucher super einfach, aber lassen Sie uns ein weiteres Beispiel implementieren.
Nehmen wir an, wir haben einen Rechnungsgenerierungsdienst, in dem die Benutzer nur die Rechnungsnummer bereitstellen müssen, und das System generiert automatisch eine PDF -Datei und eine E -Mail an den Benutzer. Das Generieren und Senden der E -Mail kann noch einige Sekunden dauern, wenn der Server, auf dem der Erzeugungsprozess ausgeführt wird, Ressource Limited ist. Nehmen wir nun an, wir müssen mehrere Transaktionen pro Sekunde unterstützen, wie wir dies erreichen, ohne den Server zu überwältigen?
Wir müssen das folgende Muster implementieren:
Schauen wir uns unsere Produzentenklasse an:
<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>; </span></span><span> </span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>; </span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>; </span></span><span> </span><span><span>class WorkerSender </span></span><span><span>{ </span></span><span> <span>/* ... SOME OTHER CODE HERE ... */ </span></span><span> </span><span> <span>/** </span></span><span><span> * Sends an invoice generation task to the workers </span></span><span><span> * </span></span><span><span> * <span>@param <span>int</span> $invoiceNum </span></span></span><span><span> */ </span></span><span> <span>public function execute($invoiceNum) </span></span><span> <span>{ </span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); </span></span><span> <span>$channel = $connection->channel(); </span></span><span> </span><span> <span>$channel->queue_declare( </span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters </span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state </span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart </span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes </span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes </span></span><span> <span>); </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->basic_publish( </span></span><span> <span>$msg, #message </span></span><span> <span>'', #exchange </span></span><span> <span>'invoice_queue' #routing key (queue) </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->close(); </span></span><span> <span>$connection->close(); </span></span><span> <span>} </span></span><span><span>}</span></span>
Die MOTORELTER :: Execute () erhält eine Rechnungsnummer. Als nächstes erstellen wir wie gewohnt eine Verbindung, Kanal und Warteschlange.
<span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */ </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span><span>/* ... SOME CODE HERE ... */</span></span>
Bitte beachten Sie, dass der Konstruktor dieses Mal beim Erstellen des Nachrichtenobjekts einen zweiten Parameter erhält: Array ('lieferung_mode' => 2). In diesem Fall möchten wir angeben, dass die Nachricht nicht verloren gehen sollte, wenn der Rabbitmq -Server abstürzt. Bitte beachten Sie, dass die Warteschlange auch für dauerhaft erklärt werden muss.
Der folgende Code kann verwendet werden, um die Formulardaten zu empfangen und den Produzenten auszuführen:
<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>; </span></span><span> </span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>; </span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>; </span></span><span> </span><span><span>class WorkerSender </span></span><span><span>{ </span></span><span> <span>/* ... SOME OTHER CODE HERE ... */ </span></span><span> </span><span> <span>/** </span></span><span><span> * Sends an invoice generation task to the workers </span></span><span><span> * </span></span><span><span> * <span>@param <span>int</span> $invoiceNum </span></span></span><span><span> */ </span></span><span> <span>public function execute($invoiceNum) </span></span><span> <span>{ </span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); </span></span><span> <span>$channel = $connection->channel(); </span></span><span> </span><span> <span>$channel->queue_declare( </span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters </span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state </span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart </span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes </span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes </span></span><span> <span>); </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->basic_publish( </span></span><span> <span>$msg, #message </span></span><span> <span>'', #exchange </span></span><span> <span>'invoice_queue' #routing key (queue) </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->close(); </span></span><span> <span>$connection->close(); </span></span><span> <span>} </span></span><span><span>}</span></span>
Bitte verwenden Sie die Eingabeeinstellungen/Validierung, mit denen Sie sich wohl fühlen.
Die Dinge werden auf der Verbraucherseite ein bisschen interessant:
<span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */ </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span><span>/* ... SOME CODE HERE ... */</span></span>
Wie gewohnt müssen wir eine Verbindung herstellen, einen Kanal ableiten und eine Warteschlange deklarieren (die Parameter der Warteschlange müssen wie der Produzent gleich sein).
<span><span><?php </span></span><span><span>chdir(dirname(__DIR__)); </span></span><span><span>require_once('vendor/autoload.php'); </span></span><span> </span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>; </span></span><span> </span><span><span>$inputFilters = array( </span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT, </span></span><span><span>); </span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters); </span></span><span><span>$sender = new WorkerSender(); </span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
Um das Verhalten von Work-Verhaltensweisen (Versandnachrichten unter mehreren Prozessen) zu haben, müssen wir die Parameter der Qualität des Dienstes (QoS) mit $ Channel-> Basic_qos () deklarieren:
Als nächstes werden wir mit einem wichtigen Unterschied in den Parametern beginnen. Wir werden die automatischen ACKs ausschalten, da wir dem Rabbitmq -Server mitteilen werden, wenn wir die Meldung beendet haben und bereit sind, eine neue zu erhalten.
Wie schicken wir das ACK? Bitte werfen Sie einen Blick auf die METHODERRECEIVER :: Process () (die als Rückrufmethode deklariert wird, wenn eine Nachricht empfangen wird). Die Aufrufe von generierten Methoden fürPdf () und Sendemail () sind nur Dummy -Methoden, die die Zeit simulieren, die für die Erfüllung beider Aufgaben aufgewendet wird. Der $ msg -Parameter enthält nicht nur die vom Produzenten gesendete Nutzlast, sondern auch Informationen zu den vom Produzenten verwendeten Objekten. Wir können Informationen über den vom Produzent verwendeten Kanal mit $ msg-> lieferung_info ['Kanal'] extrahieren (was der gleiche Objekttyp wie der Kanal ist, den wir für den Verbraucher mit $ Connection-> Kanal () geöffnet haben;). Da wir dem Kanal des Produzenten eine Bestätigung, dass wir den Prozess abgeschlossen haben ordnungsgemäß zu assoziieren, zu welcher Nachricht der ACK gehört.
Wie steuern wir die Arbeiter an? Erstellen Sie einfach eine Datei wie Folgendes und rufen Sie die WorkerReceiver :: listen () auf:
<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>; </span></span><span> </span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>; </span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>; </span></span><span> </span><span><span>class WorkerReceiver </span></span><span><span>{ </span></span><span> <span>/* ... SOME OTHER CODE HERE ... */ </span></span><span> </span><span> <span>/** </span></span><span><span> * Process incoming request to generate pdf invoices and send them through </span></span><span><span> * email. </span></span><span><span> */ </span></span><span> <span>public function listen() </span></span><span> <span>{ </span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); </span></span><span> <span>$channel = $connection->channel(); </span></span><span> </span><span> <span>$channel->queue_declare( </span></span><span> <span>'invoice_queue', #queue </span></span><span> <span>false, #passive </span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs </span></span><span> <span>false, #exclusive - queues may only be accessed by the current connection </span></span><span> <span>false #auto delete - the queue is deleted when all consumers have finished using it </span></span><span> <span>); </span></span><span> </span><span> <span>/** </span></span><span><span> * don't dispatch a new message to a worker until it has processed and </span></span><span><span> * acknowledged the previous one. Instead, it will dispatch it to the </span></span><span><span> * next worker that is not still busy. </span></span><span><span> */ </span></span><span> <span>$channel->basic_qos( </span></span><span> <span>null, #prefetch size - prefetch window size in octets, null meaning "no specific limit" </span></span><span> <span>1, #prefetch count - prefetch window in terms of whole messages </span></span><span> <span>null #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel </span></span><span> <span>); </span></span><span> </span><span> <span>/** </span></span><span><span> * indicate interest in consuming messages from a particular queue. When they do </span></span><span><span> * so, we say that they register a consumer or, simply put, subscribe to a queue. </span></span><span><span> * Each consumer (subscription) has an identifier called a consumer tag </span></span><span><span> */ </span></span><span> <span>$channel->basic_consume( </span></span><span> <span>'invoice_queue', #queue </span></span><span> <span>'', #consumer tag - Identifier for the consumer, valid within the current channel. just string </span></span><span> <span>false, #no local - TRUE: the server will not send messages to the connection that published them </span></span><span> <span>false, #no ack, false - acks turned on, true - off. send a proper acknowledgment from the worker, once we're done with a task </span></span><span> <span>false, #exclusive - queues may only be accessed by the current connection </span></span><span> <span>false, #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method </span></span><span> <span>array($this, 'process') #callback </span></span><span> <span>); </span></span><span> </span><span> <span>while(count($channel->callbacks)) { </span></span><span> <span>$this->log->addInfo('Waiting for incoming messages'); </span></span><span> <span>$channel->wait(); </span></span><span> <span>} </span></span><span> </span><span> <span>$channel->close(); </span></span><span> <span>$connection->close(); </span></span><span> <span>} </span></span><span> </span><span> <span>/** </span></span><span><span> * process received request </span></span><span><span> * </span></span><span><span> * <span>@param AMQPMessage $msg </span></span></span><span><span> */ </span></span><span> <span>public function process(AMQPMessage $msg) </span></span><span> <span>{ </span></span><span> <span>$this->generatePdf()->sendEmail(); </span></span><span> </span><span> <span>/** </span></span><span><span> * If a consumer dies without sending an acknowledgement the AMQP broker </span></span><span><span> * will redeliver it to another consumer or, if none are available at the </span></span><span><span> * time, the broker will wait until at least one consumer is registered </span></span><span><span> * for the same queue before attempting redelivery </span></span><span><span> */ </span></span><span> <span>$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); </span></span><span> <span>} </span></span><span> </span><span> <span>/** </span></span><span><span> * Generates invoice's pdf </span></span><span><span> * </span></span><span><span> * <span>@return WorkerReceiver </span></span></span><span><span> */ </span></span><span> <span>private function generatePdf() </span></span><span> <span>{ </span></span><span> <span>/** </span></span><span><span> * Mocking a pdf generation processing time. This will take between </span></span><span><span> * 2 and 5 seconds </span></span><span><span> */ </span></span><span> <span>sleep(mt_rand(2, 5)); </span></span><span> <span>return $this; </span></span><span> <span>} </span></span><span> </span><span> <span>/** </span></span><span><span> * Sends email </span></span><span><span> * </span></span><span><span> * <span>@return WorkerReceiver </span></span></span><span><span> */ </span></span><span> <span>private function sendEmail() </span></span><span> <span>{ </span></span><span> <span>/** </span></span><span><span> * Mocking email sending time. This will take between 1 and 3 seconds </span></span><span><span> */ </span></span><span> <span>sleep(mt_rand(1,3)); </span></span><span> <span>return $this; </span></span><span> <span>} </span></span><span><span>}</span></span>
Verwenden Sie nun den PHP -Befehl (z. B. PHP Worker.php oder welcher Namen, den Sie der oben genannten Datei gegeben haben), um den Arbeiter abzubauen. Aber warte, der Zweck war, zwei oder mehr Arbeiter zu haben, nicht wahr? Kein Problem, starten mehr Arbeitnehmer auf die gleiche Weise, um mehrere Prozesse derselben Datei zu haben, und Rabbitmq registriert die Verbraucher und vertreibt die Arbeiten gemäß den QoS -Parametern.
Bisher haben wir Nachrichten an den Rabbitmq -Server gesendet, ohne dass der Benutzer auf eine Antwort warten muss. Dies ist in Ordnung für asynchrone Prozesse, die möglicherweise mehr Zeit in Anspruch nehmen, als der Benutzer bereit ist, nur eine "OK" -Meldung zu sehen. Aber was ist, wenn wir tatsächlich eine Antwort brauchen? Nehmen wir einige Ergebnisse aus einer komplexen Berechnung an, damit wir es dem Benutzer zeigen können?
Nehmen wir an, wir haben einen zentralisierten Anmelderver (einzelnes Zeichen), der als Authentifizierungsmechanismus aus den übrigen Anwendungen isoliert wird. Die einzige Möglichkeit, diesen Server zu erreichen, ist über Rabbitmq. Wir müssen eine Möglichkeit implementieren, die Anmeldeinformationen an diesen Server zu senden und auf eine Zugriffsantwort zu warten/zu verweigern.
Wir müssen das folgende Muster implementieren:
Schauen wir uns zuerst den Produzenten an:
<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>; </span></span><span> </span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>; </span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>; </span></span><span> </span><span><span>class WorkerSender </span></span><span><span>{ </span></span><span> <span>/* ... SOME OTHER CODE HERE ... */ </span></span><span> </span><span> <span>/** </span></span><span><span> * Sends an invoice generation task to the workers </span></span><span><span> * </span></span><span><span> * <span>@param <span>int</span> $invoiceNum </span></span></span><span><span> */ </span></span><span> <span>public function execute($invoiceNum) </span></span><span> <span>{ </span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); </span></span><span> <span>$channel = $connection->channel(); </span></span><span> </span><span> <span>$channel->queue_declare( </span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters </span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state </span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart </span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes </span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes </span></span><span> <span>); </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->basic_publish( </span></span><span> <span>$msg, #message </span></span><span> <span>'', #exchange </span></span><span> <span>'invoice_queue' #routing key (queue) </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->close(); </span></span><span> <span>$connection->close(); </span></span><span> <span>} </span></span><span><span>}</span></span>
Betrachten Sie die Methode von RPCSender :: Execute ausführen, bitte beachten Sie, dass der Parameter von $ condentInicals ein Array in Form von ['Benutzername' => 'x', 'Passwort' => 'y'] ist. Auch hier öffnen wir eine neue Verbindung und erstellen wie gewohnt einen Kanal.
<span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */ </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span><span>/* ... SOME CODE HERE ... */</span></span>
Der erste Unterschied ergibt sich aus der Erklärung einer Warteschlange. Beachten Sie zunächst, dass wir das List () -Konstrukt verwenden, um das Ergebnis von $ Channel-> queue_declare () zu fangen. Dies liegt daran, dass wir einen Warteschlangennamen nicht explizit senden, während wir ihn erklären. Deshalb müssen wir herausfinden, wie diese Warteschlange identifiziert wird. Wir interessieren uns nur für das erste Element des Ergebnisarrays, das eine eindeutige Kennung der Warteschlange sein wird (so etwas wie amq.gen-_u0kjvm8HelfZQK9p0z9gg). Die zweite Änderung ist, dass wir diese Warteschlange als exklusiv deklarieren müssen, sodass die Ergebnisse anderer gleichzeitiger Prozesse nicht vermischt werden.
.Eine weitere große Änderung ist, dass der Produzent auch ein Verbraucher einer Warteschlange sein wird, wenn $ Channel-> Basic_consume () beachten, dass wir den Wert $ callback_queue bereitstellen, den wir während der Deklaration der Warteschlange erhalten haben. Und wie jeder Verbraucher werden wir einen Rückruf erklären, der ausgeführt wird, wenn der Prozess eine Antwort erhält.
<span><span><?php </span></span><span><span>chdir(dirname(__DIR__)); </span></span><span><span>require_once('vendor/autoload.php'); </span></span><span> </span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>; </span></span><span> </span><span><span>$inputFilters = array( </span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT, </span></span><span><span>); </span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters); </span></span><span><span>$sender = new WorkerSender(); </span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
Als nächstes müssen wir eine Korrelations -ID für die Nachricht erstellen, dies ist nichts weiter als eine eindeutige Bezeichnung für jede Nachricht. Im Beispiel verwenden wir die Ausgabe von Uniqid (), aber Sie können den Mechanismus verwenden, den Sie bevorzugen (solange er keine Rassenbedingung erzeugt, muss keine starke, kryptosichere RNG sein).
<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>; </span></span><span> </span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>; </span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>; </span></span><span> </span><span><span>class WorkerReceiver </span></span><span><span>{ </span></span><span> <span>/* ... SOME OTHER CODE HERE ... */ </span></span><span> </span><span> <span>/** </span></span><span><span> * Process incoming request to generate pdf invoices and send them through </span></span><span><span> * email. </span></span><span><span> */ </span></span><span> <span>public function listen() </span></span><span> <span>{ </span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); </span></span><span> <span>$channel = $connection->channel(); </span></span><span> </span><span> <span>$channel->queue_declare( </span></span><span> <span>'invoice_queue', #queue </span></span><span> <span>false, #passive </span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs </span></span><span> <span>false, #exclusive - queues may only be accessed by the current connection </span></span><span> <span>false #auto delete - the queue is deleted when all consumers have finished using it </span></span><span> <span>); </span></span><span> </span><span> <span>/** </span></span><span><span> * don't dispatch a new message to a worker until it has processed and </span></span><span><span> * acknowledged the previous one. Instead, it will dispatch it to the </span></span><span><span> * next worker that is not still busy. </span></span><span><span> */ </span></span><span> <span>$channel->basic_qos( </span></span><span> <span>null, #prefetch size - prefetch window size in octets, null meaning "no specific limit" </span></span><span> <span>1, #prefetch count - prefetch window in terms of whole messages </span></span><span> <span>null #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel </span></span><span> <span>); </span></span><span> </span><span> <span>/** </span></span><span><span> * indicate interest in consuming messages from a particular queue. When they do </span></span><span><span> * so, we say that they register a consumer or, simply put, subscribe to a queue. </span></span><span><span> * Each consumer (subscription) has an identifier called a consumer tag </span></span><span><span> */ </span></span><span> <span>$channel->basic_consume( </span></span><span> <span>'invoice_queue', #queue </span></span><span> <span>'', #consumer tag - Identifier for the consumer, valid within the current channel. just string </span></span><span> <span>false, #no local - TRUE: the server will not send messages to the connection that published them </span></span><span> <span>false, #no ack, false - acks turned on, true - off. send a proper acknowledgment from the worker, once we're done with a task </span></span><span> <span>false, #exclusive - queues may only be accessed by the current connection </span></span><span> <span>false, #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method </span></span><span> <span>array($this, 'process') #callback </span></span><span> <span>); </span></span><span> </span><span> <span>while(count($channel->callbacks)) { </span></span><span> <span>$this->log->addInfo('Waiting for incoming messages'); </span></span><span> <span>$channel->wait(); </span></span><span> <span>} </span></span><span> </span><span> <span>$channel->close(); </span></span><span> <span>$connection->close(); </span></span><span> <span>} </span></span><span> </span><span> <span>/** </span></span><span><span> * process received request </span></span><span><span> * </span></span><span><span> * <span>@param AMQPMessage $msg </span></span></span><span><span> */ </span></span><span> <span>public function process(AMQPMessage $msg) </span></span><span> <span>{ </span></span><span> <span>$this->generatePdf()->sendEmail(); </span></span><span> </span><span> <span>/** </span></span><span><span> * If a consumer dies without sending an acknowledgement the AMQP broker </span></span><span><span> * will redeliver it to another consumer or, if none are available at the </span></span><span><span> * time, the broker will wait until at least one consumer is registered </span></span><span><span> * for the same queue before attempting redelivery </span></span><span><span> */ </span></span><span> <span>$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); </span></span><span> <span>} </span></span><span> </span><span> <span>/** </span></span><span><span> * Generates invoice's pdf </span></span><span><span> * </span></span><span><span> * <span>@return WorkerReceiver </span></span></span><span><span> */ </span></span><span> <span>private function generatePdf() </span></span><span> <span>{ </span></span><span> <span>/** </span></span><span><span> * Mocking a pdf generation processing time. This will take between </span></span><span><span> * 2 and 5 seconds </span></span><span><span> */ </span></span><span> <span>sleep(mt_rand(2, 5)); </span></span><span> <span>return $this; </span></span><span> <span>} </span></span><span> </span><span> <span>/** </span></span><span><span> * Sends email </span></span><span><span> * </span></span><span><span> * <span>@return WorkerReceiver </span></span></span><span><span> */ </span></span><span> <span>private function sendEmail() </span></span><span> <span>{ </span></span><span> <span>/** </span></span><span><span> * Mocking email sending time. This will take between 1 and 3 seconds </span></span><span><span> */ </span></span><span> <span>sleep(mt_rand(1,3)); </span></span><span> <span>return $this; </span></span><span> <span>} </span></span><span><span>}</span></span>
Lassen Sie uns nun eine Nachricht erstellen, die wichtige Änderungen im Vergleich zu dem hat, was wir in den ersten beiden Beispielen gewohnt waren. Abgesehen von der Zuweisung einer JSON-kodierten Zeichenfolge, die die Anmeldeinformationen enthält, die wir authentifizieren möchten, müssen wir dem AMQPMessage Constructor ein Array mit zwei definierten Eigenschaften zur Verfügung stellen:
Nach der Veröffentlichung der Nachricht werden wir die Antwort bewerten, die zu Beginn leer ist. Während der Antwortwert leer bleibt, warten wir auf eine Antwort aus dem Kanal mit $ Channel-> Wait ();.
Sobald wir eine Antwort vom Kanal erhalten haben, wird die Rückrufmethode aufgerufen (RPCSender :: OnResponse ()). Diese Methode entspricht der empfangenen Korrelations -ID gegen die erzeugte. Wenn sie gleich sind, setzen Sie den Antwortkörper ein, wodurch die While -Schleife gebrochen wird.
Was ist mit dem RPC -Verbraucher? Hier ist es:
<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>; </span></span><span> </span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>; </span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>; </span></span><span> </span><span><span>class WorkerSender </span></span><span><span>{ </span></span><span> <span>/* ... SOME OTHER CODE HERE ... */ </span></span><span> </span><span> <span>/** </span></span><span><span> * Sends an invoice generation task to the workers </span></span><span><span> * </span></span><span><span> * <span>@param <span>int</span> $invoiceNum </span></span></span><span><span> */ </span></span><span> <span>public function execute($invoiceNum) </span></span><span> <span>{ </span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); </span></span><span> <span>$channel = $connection->channel(); </span></span><span> </span><span> <span>$channel->queue_declare( </span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters </span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state </span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart </span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes </span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes </span></span><span> <span>); </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->basic_publish( </span></span><span> <span>$msg, #message </span></span><span> <span>'', #exchange </span></span><span> <span>'invoice_queue' #routing key (queue) </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->close(); </span></span><span> <span>$connection->close(); </span></span><span> <span>} </span></span><span><span>}</span></span>
gleiche alte Verbindung und Kanalerstellung :)
Wie bei der Deklaration der Warteschlange hat diese Warteschlange jedoch einen vordefinierten Namen (‘ rpc_queue '). Wir werden die QoS -Parameter definieren, da wir automatische ACKs deaktivieren werden, damit wir mitteilen können, wenn wir die Anmeldeinformationen überprüfen und ein Ergebnis haben.
<span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */ </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span><span>/* ... SOME CODE HERE ... */</span></span>
Die Magie kommt aus dem deklarierten Rückruf. Sobald wir die Anmeldeinformationen authentifiziert haben (ja, ich weiß, dass der Prozess gegen statische Benutzername/Kennwortwerte durchgeführt wird, geht es in diesem Tutorial nicht darum, wie Anmeldeinformationen authentifiziert werden können;)), sondern die Rückgabenachricht mit derselben Korrelations -ID des Produzenten erstellen erstellt. Wir können dies aus der Anforderungsnachricht mit $ req-> get ('correlation_id') extrahieren und diesen Wert genauso übergeben wie im Produzenten.
<span><span><?php </span></span><span><span>chdir(dirname(__DIR__)); </span></span><span><span>require_once('vendor/autoload.php'); </span></span><span> </span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>; </span></span><span> </span><span><span>$inputFilters = array( </span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT, </span></span><span><span>); </span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters); </span></span><span><span>$sender = new WorkerSender(); </span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
Jetzt müssen wir diese Nachricht an die gleiche Warteschlange veröffentlichen, die im Produzenten erstellt wurde (die mit dem "zufälligen" Namen). Wir extrahieren den Namen der Warteschlange mit $ req-> get ('Reply_to') und verwenden ihn als Routing-Schlüssel in Basic_publish ().
Nachdem wir die Nachricht veröffentlicht haben, müssen wir den ACK-Hinweis mit $ req-> lieferung_info ['Kanal']-> basic_ack () an den Liefertag in $ req-> lieferung_info ['lieferung_tag' senden. ] so kann der Produzent aufhören zu warten.
starten Sie erneut einen Hörprozess und Sie sind bereit zu gehen. Sie können sogar Beispiele 2 und 3 kombinieren, um einen Multi-Arbeiter-RPC-Prozess zu haben, um die Authentifizierungsanfragen auszuführen, als nur durch das Aufrühren mehrerer Arbeiter skaliert werden kann.
Es gibt weit mehr zu sagen über Rabbitmq und AMQP, wie virtuelle Hosts, Exchange -Typen, Serververwaltung usw. Sie können hier und auf der Dokumentationsseite weitere Anwendungsmuster (wie Routing, Themen) finden. Es gibt auch ein Befehlszeilen -Tool zum Verwalten von Rabbitmq sowie eine webbasierte Schnittstelle.
Wenn Ihnen diese Tutorial -Serie gefallen hat und mehr über MQ und reale Anwendungsfälle erfahren möchten, teilen Sie uns bitte in den Kommentaren unten mit!
Wie installiere ich Rabbitmq für PHP? Ihre Maschine. Dies kann über die offizielle Rabbitmq -Website erfolgen. Nach der Installation des Servers können Sie die PHP AMQP -Erweiterung installieren, die die erforderlichen Funktionen für die Interaktion mit Rabbitmq bietet. Dies kann mit dem PECL -Installationsprogramm mit dem Befehl Pecl install AMQP durchgeführt werden. der AMQPChannel -Klasse. Diese Methode nimmt mehrere Parameter an, einschließlich des Namens des Austauschs, des Typs des Austauschs (Direkte, Thema, Fanout oder Header) und optionale Parameter wie Passiv, langlebig, auto_delete und Argumente.
Das obige ist der detaillierte Inhalt vonPHP und Rabbitmq: Fortgeschrittene Beispiele. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!