Rumah >pembangunan bahagian belakang >tutorial php >PHP dan Rabbitmq: Contoh Lanjutan

PHP dan Rabbitmq: Contoh Lanjutan

Jennifer Aniston
Jennifer Anistonasal
2025-02-19 09:44:12533semak imbas

3

Dalam Bahagian 1 kita meliputi teori dan kes penggunaan mudah protokol AMQP dalam PHP dengan RabbitMQ sebagai broker. Sekarang, mari kita menyelam beberapa contoh yang lebih maju. PHP and RabbitMQ: Advanced Examples

Takeaways Key

PHP dan Rabbitmq: Contoh Lanjutan

Menggunakan PHP dan RabbitMQ untuk memproses data secara asynchronously di kalangan pekerja berganda, meningkatkan kecekapan dalam persekitaran transaksi tinggi.

Melaksanakan mesej berterusan dalam RabbitMQ untuk mengelakkan kehilangan data semasa kemalangan pelayan dengan menetapkan mesej 'Delivery_mode' kepada 2 dan mengisytiharkan beratur sebagai tahan lama.

Gunakan tetapan Kualiti Perkhidmatan (QoS) di RabbitMQ untuk mengawal pengedaran mesej di kalangan pekerja, memastikan tiada pekerja tunggal yang terharu.
  • Gunakan RabbitMQ untuk RPC (Panggilan Prosedur Jauh) dengan menghantar mesej yang memerlukan respons, berguna untuk tugas seperti Pengesahan Pengguna.
  • Sediakan beratur eksklusif, sementara untuk respons RPC untuk memastikan mesej diarahkan dengan betul dan selamat antara klien dan pelayan.
  • Mengendalikan respons RPC dengan berkesan dengan memadankan 'correlation_id' dalam respons dengan permintaan, memastikan pengendalian dan pemprosesan yang betul.
  • Contoh 1: Hantar permintaan untuk memproses data secara asynchronously di kalangan beberapa pekerja
  • Dalam contoh bahagian sebelumnya, kami mempunyai satu pengeluar, satu pengguna. Sekiranya pengguna mati, mesej akan terus ditumpuk dalam barisan sehingga pengguna bermula lagi. Ia kemudiannya akan memproses semua mesej, satu demi satu.
  • Ini boleh kurang daripada ideal dalam persekitaran pengguna serentak dengan jumlah permintaan yang saksama seminit. Nasib baik, skala pengguna sangat mudah, tetapi mari kita melaksanakan contoh lain.

Katakan kami mempunyai perkhidmatan penjanaan invois, di mana pengguna hanya perlu memberikan nombor invois, dan sistem secara automatik akan menghasilkan fail PDF dan menghantar e -mel kepada pengguna. Menjana dan menghantar e -mel boleh mengambil masa beberapa saat jika pelayan di mana proses generasi berjalan adalah sumber terhad. Sekarang katakan kita dikehendaki menyokong beberapa urus niaga sesaat, bagaimana kita dapat mencapai ini tanpa melampaui pelayan?

kita perlu melaksanakan corak berikut:

Mari lihat kelas pengeluar kami:

Kaedah Workersender :: Execute () akan menerima nombor invois. Seterusnya kami membuat sambungan, saluran dan barisan seperti biasa.

Sila perhatikan bahawa kali ini, sambil membuat objek mesej, pembina menerima parameter kedua: array ('delivery_mode' => 2). Dalam kes ini kita mahu menyatakan bahawa mesej itu tidak boleh hilang jika pelayan RabbitMQ terhempas. Harap maklum bahawa agar ini berfungsi, barisan itu harus diisytiharkan tahan lama juga.

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span>
Kod berikut boleh digunakan untuk menerima data borang dan melaksanakan pengeluar:

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span>

sila gunakan mana -mana sanitisasi input/pengesahan yang anda rasa selesa dengan.

perkara mendapat sedikit menarik di sisi pengguna:

<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>

Seperti biasa, kita perlu membuat sambungan, memperoleh saluran dan mengisytiharkan barisan (parameter barisan harus sama dengan pengeluar).

<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span>    <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>

Untuk mempunyai tingkah laku pekerja (menghantar mesej di kalangan beberapa proses), kita perlu mengisytiharkan parameter kualiti perkhidmatan (QoS) dengan $ channel-> basic_qos ():

  • saiz prefetch : tiada had tertentu, kita boleh mempunyai seberapa banyak pekerja yang kita perlukan
  • Prefetch Count : Berapa banyak mesej yang akan diambil setiap pekerja sebelum menghantar semula pengakuan. Ini akan menjadikan pekerja memproses mesej 1 pada satu masa.
  • Global : null bermaksud bahawa tetapan di atas akan dikenakan kepada pengguna ini sahaja

Seterusnya, kita akan mula memakan, dengan perbezaan utama dalam parameter. Kami akan mematikan ACK automatik, kerana kami akan memberitahu pelayan RabbitMQ apabila kami selesai memproses mesej dan bersedia untuk menerima yang baru.

Sekarang, bagaimana kita menghantar ACK itu? Sila lihat kaedah WorkerReceiver :: Process () (yang diisytiharkan sebagai kaedah panggil balik apabila mesej diterima). Panggilan ke GeneratedPDF () dan SendeMail () kaedah hanyalah kaedah dummy yang akan mensimulasikan masa yang dihabiskan untuk mencapai kedua -dua tugas. Parameter $ MSG bukan sahaja mengandungi muatan yang dihantar dari pengeluar, ia juga mengandungi maklumat mengenai objek yang digunakan oleh pengeluar. Kami boleh mengekstrak maklumat mengenai saluran yang digunakan oleh pengeluar dengan $ msg-> delivery_info ['channel'] (yang merupakan jenis objek yang sama ke saluran yang kami buka untuk pengguna dengan $ sambungan-> saluran ();). Oleh kerana kita perlu menghantar saluran pengeluar pengakuan bahawa kita telah menyelesaikan proses itu, kita akan menggunakan kaedah Basic_ack (), menghantar sebagai parameter tag penghantaran ($ msg-> delivery_info ['delivery_tag']) rabbitmq yang dihasilkan secara automatik mengikut urutan yang teratur dalam keadaan teratur secara automatik dihasilkan mengikut urutan yang teratur dalam urutan yang teratur dalam urutan yang teratur dalam urutan yang teratur dalam rangka. untuk mengaitkan dengan betul ke mana mesej ACK dimiliki.

bagaimana kita membakar pekerja? Cukup buat fail seperti yang berikut, memohon Kaedah WorkerReceiver :: Dengar ():

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerReceiver
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Process incoming request to generate pdf invoices and send them through 
</span></span><span><span>     * email.
</span></span><span><span>     */ 
</span></span><span>    <span>public function listen()
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue
</span></span><span>            <span>false,              #passive
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs
</span></span><span>            <span>false,              #exclusive - queues may only be accessed by the current connection
</span></span><span>            <span>false               #auto delete - the queue is deleted when all consumers have finished using it
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>/**
</span></span><span><span>         * don't dispatch a new message to a worker until it has processed and 
</span></span><span><span>         * acknowledged the previous one. Instead, it will dispatch it to the 
</span></span><span><span>         * next worker that is not still busy.
</span></span><span><span>         */
</span></span><span>        <span>$channel->basic_qos(
</span></span><span>            <span>null,   #prefetch size - prefetch window size in octets, null meaning "no specific limit"
</span></span><span>            <span>1,      #prefetch count - prefetch window in terms of whole messages
</span></span><span>            <span>null    #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel
</span></span><span>            <span>);
</span></span><span>        
</span><span>        <span>/**
</span></span><span><span>         * indicate interest in consuming messages from a particular queue. When they do 
</span></span><span><span>         * so, we say that they register a consumer or, simply put, subscribe to a queue.
</span></span><span><span>         * Each consumer (subscription) has an identifier called a consumer tag
</span></span><span><span>         */ 
</span></span><span>        <span>$channel->basic_consume(
</span></span><span>            <span>'invoice_queue',        #queue
</span></span><span>            <span>'',                     #consumer tag - Identifier for the consumer, valid within the current channel. just string
</span></span><span>            <span>false,                  #no local - TRUE: the server will not send messages to the connection that published them
</span></span><span>            <span>false,                  #no ack, false - acks turned on, true - off.  send a proper acknowledgment from the worker, once we're done with a task
</span></span><span>            <span>false,                  #exclusive - queues may only be accessed by the current connection
</span></span><span>            <span>false,                  #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method
</span></span><span>            <span>array($this, 'process') #callback
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>while(count($channel->callbacks)) {
</span></span><span>            <span>$this->log->addInfo('Waiting for incoming messages');
</span></span><span>            <span>$channel->wait();
</span></span><span>        <span>}
</span></span><span>        
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * process received request
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param AMQPMessage $msg
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function process(AMQPMessage $msg)
</span></span><span>    <span>{
</span></span><span>        <span>$this->generatePdf()->sendEmail();
</span></span><span>        
</span><span>        <span>/**
</span></span><span><span>         * If a consumer dies without sending an acknowledgement the AMQP broker 
</span></span><span><span>         * will redeliver it to another consumer or, if none are available at the 
</span></span><span><span>         * time, the broker will wait until at least one consumer is registered 
</span></span><span><span>         * for the same queue before attempting redelivery
</span></span><span><span>         */ 
</span></span><span>        <span>$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Generates invoice's pdf
</span></span><span><span>     * 
</span></span><span><span>     * <span>@return WorkerReceiver
</span></span></span><span><span>     */ 
</span></span><span>    <span>private function generatePdf()
</span></span><span>    <span>{
</span></span><span>        <span>/**
</span></span><span><span>         * Mocking a pdf generation processing time.  This will take between
</span></span><span><span>         * 2 and 5 seconds
</span></span><span><span>         */ 
</span></span><span>        <span>sleep(mt_rand(2, 5));
</span></span><span>        <span>return $this;
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends email
</span></span><span><span>     * 
</span></span><span><span>     * <span>@return WorkerReceiver
</span></span></span><span><span>     */ 
</span></span><span>    <span>private function sendEmail()
</span></span><span>    <span>{
</span></span><span>        <span>/**
</span></span><span><span>         * Mocking email sending time.  This will take between 1 and 3 seconds
</span></span><span><span>         */ 
</span></span><span>        <span>sleep(mt_rand(1,3));
</span></span><span>        <span>return $this;
</span></span><span>    <span>}
</span></span><span><span>}</span></span>

Sekarang gunakan arahan PHP (mis. PHP Worker.php atau mana -mana nama yang anda berikan kepada fail di atas) untuk membakar pekerja. Tetapi tunggu, tujuannya adalah untuk mempunyai dua atau lebih pekerja, bukan? Tidak ada masalah, membakar lebih banyak pekerja dengan cara yang sama untuk mempunyai pelbagai proses fail yang sama, dan RabbitMQ akan mendaftarkan pengguna dan mengedarkan kerja di antara mereka mengikut parameter QoS.

Contoh 2: Hantar permintaan RPC dan mengharapkan balasan

Setakat ini, kami telah menghantar mesej ke pelayan RabbitMQ tanpa pengguna perlu menunggu jawapan. Ini adalah OK untuk proses asynchronous yang mungkin mengambil lebih banyak masa daripada pengguna bersedia untuk menghabiskan hanya untuk melihat mesej 'OK'. Tetapi bagaimana jika kita benar -benar memerlukan jawapan? Katakan beberapa hasil daripada pengiraan yang kompleks, jadi kita dapat menunjukkannya kepada pengguna?

katakan kami mempunyai pelayan login berpusat (tanda tunggal) yang akan berfungsi sebagai mekanisme pengesahan yang diasingkan dari seluruh aplikasi kami. Satu -satunya cara untuk mencapai pelayan ini adalah melalui RabbitMQ. Kami perlu melaksanakan cara untuk menghantar kelayakan log masuk ke pelayan ini dan menunggu geran/menafikan respons akses.

kita perlu melaksanakan corak berikut:

Seperti biasa, mari kita lihat pengeluar terlebih dahulu:

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span>

Melihat kaedah RPCSender :: Execute, sila sedar bahawa parameter kelayakan $ adalah array dalam bentuk ['nama pengguna' => 'x', 'kata laluan' => 'y']. Sekali lagi, kami membuka sambungan baru dan membuat saluran seperti biasa.

<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>

Perbezaan pertama datang dari mengisytiharkan barisan. Notis pertama bahawa kami menggunakan senarai () membina untuk menangkap hasil dari $ saluran-> queue_declare (). Ini kerana kita tidak secara jelas menghantar nama giliran sambil mengisytiharkannya supaya kita perlu mengetahui bagaimana barisan ini dikenalpasti. Kami hanya berminat dengan elemen pertama array hasil, yang akan menjadi pengenal unik barisan (sesuatu seperti amq.gen-_u0kjvm8helfzqk9p0z9gg). Perubahan kedua ialah kita perlu mengisytiharkan barisan ini sebagai eksklusif, jadi tidak ada campuran dalam hasil dari proses serentak lain.

Satu lagi perubahan besar ialah pengeluar akan menjadi pengguna giliran juga, apabila melaksanakan $ channel-> basic_consume () sila perhatikan bahawa kami menyediakan nilai $ callback_queue yang kami dapat semasa mengisytiharkan barisan. Dan seperti setiap pengguna, kami akan mengisytiharkan panggilan balik untuk dilaksanakan apabila proses menerima respons.

<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span>    <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>

Seterusnya, kita perlu membuat ID korelasi untuk mesej, ini tidak lebih daripada pengecam unik untuk setiap mesej. Contohnya, kami menggunakan output Uniqid (), tetapi anda boleh menggunakan mana-mana mekanisme yang anda suka (selagi ia tidak mewujudkan keadaan perlumbaan, tidak perlu menjadi RNG yang kuat, crypto-selamat).

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerReceiver
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Process incoming request to generate pdf invoices and send them through 
</span></span><span><span>     * email.
</span></span><span><span>     */ 
</span></span><span>    <span>public function listen()
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue
</span></span><span>            <span>false,              #passive
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs
</span></span><span>            <span>false,              #exclusive - queues may only be accessed by the current connection
</span></span><span>            <span>false               #auto delete - the queue is deleted when all consumers have finished using it
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>/**
</span></span><span><span>         * don't dispatch a new message to a worker until it has processed and 
</span></span><span><span>         * acknowledged the previous one. Instead, it will dispatch it to the 
</span></span><span><span>         * next worker that is not still busy.
</span></span><span><span>         */
</span></span><span>        <span>$channel->basic_qos(
</span></span><span>            <span>null,   #prefetch size - prefetch window size in octets, null meaning "no specific limit"
</span></span><span>            <span>1,      #prefetch count - prefetch window in terms of whole messages
</span></span><span>            <span>null    #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel
</span></span><span>            <span>);
</span></span><span>        
</span><span>        <span>/**
</span></span><span><span>         * indicate interest in consuming messages from a particular queue. When they do 
</span></span><span><span>         * so, we say that they register a consumer or, simply put, subscribe to a queue.
</span></span><span><span>         * Each consumer (subscription) has an identifier called a consumer tag
</span></span><span><span>         */ 
</span></span><span>        <span>$channel->basic_consume(
</span></span><span>            <span>'invoice_queue',        #queue
</span></span><span>            <span>'',                     #consumer tag - Identifier for the consumer, valid within the current channel. just string
</span></span><span>            <span>false,                  #no local - TRUE: the server will not send messages to the connection that published them
</span></span><span>            <span>false,                  #no ack, false - acks turned on, true - off.  send a proper acknowledgment from the worker, once we're done with a task
</span></span><span>            <span>false,                  #exclusive - queues may only be accessed by the current connection
</span></span><span>            <span>false,                  #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method
</span></span><span>            <span>array($this, 'process') #callback
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>while(count($channel->callbacks)) {
</span></span><span>            <span>$this->log->addInfo('Waiting for incoming messages');
</span></span><span>            <span>$channel->wait();
</span></span><span>        <span>}
</span></span><span>        
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * process received request
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param AMQPMessage $msg
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function process(AMQPMessage $msg)
</span></span><span>    <span>{
</span></span><span>        <span>$this->generatePdf()->sendEmail();
</span></span><span>        
</span><span>        <span>/**
</span></span><span><span>         * If a consumer dies without sending an acknowledgement the AMQP broker 
</span></span><span><span>         * will redeliver it to another consumer or, if none are available at the 
</span></span><span><span>         * time, the broker will wait until at least one consumer is registered 
</span></span><span><span>         * for the same queue before attempting redelivery
</span></span><span><span>         */ 
</span></span><span>        <span>$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Generates invoice's pdf
</span></span><span><span>     * 
</span></span><span><span>     * <span>@return WorkerReceiver
</span></span></span><span><span>     */ 
</span></span><span>    <span>private function generatePdf()
</span></span><span>    <span>{
</span></span><span>        <span>/**
</span></span><span><span>         * Mocking a pdf generation processing time.  This will take between
</span></span><span><span>         * 2 and 5 seconds
</span></span><span><span>         */ 
</span></span><span>        <span>sleep(mt_rand(2, 5));
</span></span><span>        <span>return $this;
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends email
</span></span><span><span>     * 
</span></span><span><span>     * <span>@return WorkerReceiver
</span></span></span><span><span>     */ 
</span></span><span>    <span>private function sendEmail()
</span></span><span>    <span>{
</span></span><span>        <span>/**
</span></span><span><span>         * Mocking email sending time.  This will take between 1 and 3 seconds
</span></span><span><span>         */ 
</span></span><span>        <span>sleep(mt_rand(1,3));
</span></span><span>        <span>return $this;
</span></span><span>    <span>}
</span></span><span><span>}</span></span>
Sekarang mari kita buat mesej, yang mempunyai perubahan penting berbanding dengan apa yang kita gunakan dalam 2 contoh pertama. Selain daripada memberikan rentetan yang dikodkan JSON yang mengandungi kelayakan yang kami ingin mengesahkan, kami perlu menyediakan kepada pembina amqpmessage array dengan dua sifat yang ditakrifkan:

  • correlation_id : tag untuk mesej
  • Reply_to : Pengenal pasti barisan yang dihasilkan semasa mengisytiharkannya

Selepas menerbitkan mesej, kami akan menilai respons, yang akan kosong pada mulanya. Walaupun nilai tindak balas kekal kosong, kami akan menunggu respons dari saluran dengan $ saluran-> tunggu ();.

Sebaik sahaja kami menerima respons dari saluran, kaedah panggil balik akan dipanggil (RPCSender :: onResponse ()). Kaedah ini akan sepadan dengan ID korelasi yang diterima terhadap yang dihasilkan, dan jika ia sama, akan menetapkan badan tindak balas, dengan itu memecahkan gelung sementara.

bagaimana dengan pengguna RPC? Di sini ia adalah:

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span>

Sambungan lama dan penciptaan saluran yang sama :)

Sama seperti mengisytiharkan barisan, namun barisan ini akan mempunyai nama yang telah ditetapkan (' rpc_queue '). Kami akan menentukan parameter QoS kerana kami akan menyahaktifkan ACK automatik, jadi kami dapat memberitahu apabila kami selesai mengesahkan kelayakan dan mempunyai hasil.

<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>
Keajaiban datang dari dalam panggilan balik yang diisytiharkan. Sebaik sahaja kita selesai mengesahkan kelayakan (ya, saya tahu proses itu dilakukan terhadap nilai nama pengguna/kata laluan statik, tutorial ini bukan tentang cara mengesahkan kelayakan;)), kita perlu membuat mesej kembali dengan id korelasi yang sama dicipta. Kami boleh mengekstrak ini dari mesej permintaan dengan $ req-> get ('correlation_id'), lulus nilai ini dengan cara yang sama yang kami lakukan dalam pengeluar.

<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span>    <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
Sekarang kita perlu menerbitkan mesej ini ke barisan yang sama yang dicipta dalam pengeluar (yang mempunyai nama 'rawak'). Kami mengekstrak nama barisan dengan $ req-> get ('Reply_to') dan menggunakannya sebagai kunci penghalaan dalam asas_publish ().

Sebaik sahaja kami menerbitkan mesej itu, kami perlu menghantar notis ACK ke saluran dengan $ req-> delivery_info ['channel']-> basic_ack (), menggunakan tag penghantaran dalam $ req-> delivery_info ['delivery_tag' ] jadi pengeluar boleh berhenti menunggu.

Sekali lagi, api proses mendengar dan anda sudah bersedia untuk pergi. Anda juga boleh menggabungkan Contoh 2 dan 3 untuk mempunyai proses RPC multi-pekerja untuk melaksanakan permintaan pengesahan daripada yang boleh diperkecil hanya dengan menembak beberapa pekerja.

Terdapat lebih banyak lagi yang boleh dikatakan mengenai RabbitMQ dan AMQP, seperti tuan rumah maya, jenis pertukaran, pentadbiran pelayan, dan lain -lain ... anda boleh mencari lebih banyak corak aplikasi (seperti penghalaan, topik) di sini dan di halaman dokumentasi. Terdapat juga alat baris arahan untuk menguruskan RabbitMQ, serta antara muka berasaskan web.

Jika anda menyukai siri tutorial ini dan ingin melihat lebih lanjut mengenai MQ dan kes penggunaan dunia yang lebih nyata, sila beritahu kami di komen di bawah!

Soalan Lazim (Soalan Lazim) Mengenai PHP Rabbitmq Contoh Lanjutan

Apakah peranan Rabbitmq dalam PHP? Ia memainkan peranan penting dalam aplikasi PHP dengan membolehkan mereka mengendalikan beban tinggi dan tugas yang kompleks dengan lebih cekap. RabbitMQ menggunakan Protokol Beratur Mesej Lanjutan (AMQP) untuk memudahkan pertukaran mesej antara bahagian -bahagian yang berlainan aplikasi. Ini membolehkan proses decoupling, menjadikan aplikasi lebih berskala dan berdaya tahan. mesin anda. Ini boleh dilakukan melalui laman web RabbitMQ rasmi. Selepas pelayan dipasang, anda boleh memasang pelanjutan PHP AMQP, yang menyediakan fungsi yang diperlukan untuk berinteraksi dengan RabbitMQ. Ini boleh dilakukan dengan menggunakan pemasang pecl dengan perintah PECL memasang amqp.

bagaimana saya boleh membuat pertukaran rabbitmq dalam php?

daripada kelas amqpchannel. Kaedah ini mengambil beberapa parameter, termasuk nama pertukaran, jenis pertukaran (langsung, topik, fanout, atau tajuk), dan parameter pilihan seperti pasif, tahan lama, auto_delete, dan argumen. Adakah saya menghantar mesej ke barisan rabbitmq dalam php?

Kelas amqpmessage dengan kandungan mesej. Kemudian, anda boleh menggunakan kaedah Basic_publish kelas amqpChannel untuk menghantar mesej ke barisan. Kaedah Basic_publish mengambil mesej, pertukaran, dan kunci penghalaan sebagai parameter. Giliran Rabbitmq menggunakan kaedah Basic_consume kelas AmqpChannel. Kaedah ini mengambil beberapa parameter, termasuk nama giliran, tag pengguna, no_local, no_ack, eksklusif, dan fungsi panggilan balik yang akan dilaksanakan apabila mesej diterima. ?

Pengendalian ralat di RabbitMQ dengan PHP boleh dilakukan dengan menggunakan blok percubaan. Pelanjutan PHP AMQP melemparkan pengecualian kelas AMQPException apabila ralat berlaku. Anda boleh menangkap pengecualian ini dan mengendalikannya mengikut keperluan aplikasi anda. Delivery_mode harta kelas amqpmessage ke 2. Ini akan menjadikan Rabbitmq menyimpan mesej pada cakera, memastikan bahawa ia tidak akan hilang walaupun pelayan Rabbitmq terhempas atau Dimulakan semula.

Bagaimanakah saya boleh melaksanakan beratur keutamaan dalam RabbitMQ dengan PHP? Kemudian, apabila menghantar mesej, anda boleh menetapkan sifat keutamaan kelas AMQPMessage kepada nilai antara 0 dan keutamaan maksimum yang anda tentukan. > RabbitMQ boleh digunakan untuk Panggilan Prosedur Jauh (RPC) dalam PHP dengan menghantar mesej dengan balasan-ke harta yang ditetapkan ke giliran panggilan balik. Pelayan kemudian boleh menghantar respons kepada giliran panggil balik, dan pelanggan boleh mengambil respons dari sana. Plugin Pengurusan RabbitMQ, yang menyediakan antara muka berasaskan web untuk memantau dan menguruskan pelayan RabbitMQ anda. Anda juga boleh menggunakan kaedah kelas AmqpChannel untuk mendapatkan maklumat mengenai keadaan saluran, seperti bilangan mesej yang tidak diketahui.

Atas ialah kandungan terperinci PHP dan Rabbitmq: Contoh Lanjutan. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn