PHP and RabbitMQ: Advanced Examples
In Part 1 we covered the theory and a simple use case of the AMQP protocol in PHP with RabbitMQ as the broker. Now, let's dive into some more advanced examples.
Implement persistent messages in RabbitMQ to avoid data loss when the server crashes: set the message's 'delivery_mode' to 2 and declare the queue as durable.
We need to implement the following pattern: a producer publishes invoice tasks to a queue, and several workers share the work of processing them.
Let's look at our producer class. The WorkerSender::execute() method receives an invoice number. Next we create a connection, a channel and a queue as usual.
Notice that this time, when creating the message object, the constructor receives a second parameter: array('delivery_mode' => 2). Here we state that the message must not be lost if the RabbitMQ server crashes. Be aware that for this to work, the queue has to be declared durable as well.
```php
<?php
namespace Acme\AmqpWrapper;

# AMQPConnection matches the php-amqplib release used in this series;
# newer releases name this class AMQPStreamConnection
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

class WorkerSender
{
    /* ... SOME OTHER CODE HERE ... */

    /**
     * Sends an invoice generation task to the workers
     *
     * @param int $invoiceNum
     */
    public function execute($invoiceNum)
    {
        $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();

        $channel->queue_declare(
            'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
            false,              #passive - can use this to check whether a queue exists without modifying the server state
            true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
            false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
            false               #auto delete - queue is deleted when last consumer unsubscribes
        );

        $msg = new AMQPMessage(
            $invoiceNum,
            array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
        );

        $channel->basic_publish(
            $msg,               #message
            '',                 #exchange
            'invoice_queue'     #routing key (queue)
        );

        $channel->close();
        $connection->close();
    }
}
```

The following code can be used to receive the form data and execute the producer:

```php
<?php
chdir(dirname(__DIR__));
require_once('vendor/autoload.php');

use Acme\AmqpWrapper\WorkerSender;

$inputFilters = array(
    'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
);
$input = filter_input_array(INPUT_POST, $inputFilters);
$sender = new WorkerSender();
$sender->execute($input['invoiceNo']);
```

Please use whichever input sanitization/validation you feel comfortable with.

Things get a little more interesting on the consumer side, in the WorkerReceiver class (listed in full further below). As usual, we have to create a connection, obtain a channel and declare the queue (the queue's parameters have to be the same as in the producer).
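To get worker behaviour (dispatching messages among several processes), we have to declare the Quality of Service (QoS) parameters with $channel->basic_qos(). A prefetch count of 1 tells RabbitMQ not to hand a worker a new message until it has processed and acknowledged the previous one.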
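To see why the prefetch limit matters, here is a small simulation with no broker involved. It is only a sketch under simplifying assumptions: the task durations are made up, and both function names are hypothetical. It compares plain round-robin dispatch (RabbitMQ's default when no prefetch limit is set) with handing each task to whichever worker frees up first, which is roughly what a prefetch count of 1 achieves.

```php
<?php
// Simulation only: durations are invented task costs in seconds.

/** Round-robin: task i goes to worker i % n, however busy that worker is. */
function roundRobinFinishTimes(array $durations, int $workers): array
{
    $busyUntil = array_fill(0, $workers, 0);
    foreach ($durations as $i => $duration) {
        $busyUntil[$i % $workers] += $duration;
    }
    return $busyUntil;
}

/** Prefetch of 1: each task goes to the worker that becomes free first. */
function prefetchOneFinishTimes(array $durations, int $workers): array
{
    $busyUntil = array_fill(0, $workers, 0);
    foreach ($durations as $duration) {
        $next = array_search(min($busyUntil), $busyUntil, true);
        $busyUntil[$next] += $duration;
    }
    return $busyUntil;
}

$durations = [5, 1, 5, 1, 5, 1];

// Worker 0 receives every slow task: finish times 15 and 3
echo implode(', ', roundRobinFinishTimes($durations, 2)), "\n";

// The load evens out: finish times 11 and 7
echo implode(', ', prefetchOneFinishTimes($durations, 2)), "\n";
```

With alternating slow and fast tasks, round-robin leaves one worker idle while the other is still busy; the prefetch limit avoids that.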
Next, we start consuming, with a key difference in the parameters: we turn off automatic acks, since we will tell the RabbitMQ server when we have finished processing a message and are ready to receive a new one.
Now, how do we send that ack? Look at the WorkerReceiver::process() method (declared as the callback to run when a message is received). The calls to generatePdf() and sendEmail() are just dummy methods that simulate the time spent on both tasks. The $msg parameter contains not only the payload sent by the producer but also information about the delivery. With $msg->delivery_info['channel'] we obtain the channel the message was delivered on (the same type of object as the channel we opened for the consumer with $connection->channel();). Since we need to acknowledge on that channel that we have completed the process, we call its basic_ack() method, passing as parameter the delivery tag ($msg->delivery_info['delivery_tag']) that RabbitMQ generated automatically, so that the ack is associated with the correct message.
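The acknowledgement step can be isolated in a small helper. This is a sketch, not the article's code: processAndAck() is a hypothetical name, and $message is only assumed to expose a body and a delivery_info array the way the AMQPMessage objects in this article do. If the work throws, the ack is never sent, so the broker can redeliver the message once the worker's connection is gone.

```php
<?php
/**
 * Runs the work for one message and acknowledges it only if the work succeeded.
 *
 * @param object   $message anything with ->body and ->delivery_info['channel'|'delivery_tag']
 * @param callable $work    receives the message body
 */
function processAndAck($message, callable $work): void
{
    // An exception here leaves the message unacknowledged
    $work($message->body);

    $channel = $message->delivery_info['channel'];
    $channel->basic_ack($message->delivery_info['delivery_tag']);
}
```

Inside WorkerReceiver::process() this would correspond to running generatePdf() and sendEmail() first and calling basic_ack() last.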
Putting it all together, here is the complete WorkerReceiver class:

```php
<?php
namespace Acme\AmqpWrapper;

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

class WorkerReceiver
{
    /* ... SOME OTHER CODE HERE ... */

    /**
     * Process incoming request to generate pdf invoices and send them through
     * email.
     */
    public function listen()
    {
        $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();

        $channel->queue_declare(
            'invoice_queue',    #queue
            false,              #passive
            true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs
            false,              #exclusive - queues may only be accessed by the current connection
            false               #auto delete - the queue is deleted when all consumers have finished using it
        );

        /**
         * don't dispatch a new message to a worker until it has processed and
         * acknowledged the previous one. Instead, it will dispatch it to the
         * next worker that is not still busy.
         */
        $channel->basic_qos(
            null,   #prefetch size - prefetch window size in octets, null meaning "no specific limit"
            1,      #prefetch count - prefetch window in terms of whole messages
            null    #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel
        );

        /**
         * indicate interest in consuming messages from a particular queue. When they do
         * so, we say that they register a consumer or, simply put, subscribe to a queue.
         * Each consumer (subscription) has an identifier called a consumer tag
         */
        $channel->basic_consume(
            'invoice_queue',        #queue
            '',                     #consumer tag - Identifier for the consumer, valid within the current channel. just string
            false,                  #no local - TRUE: the server will not send messages to the connection that published them
            false,                  #no ack, false - acks turned on, true - off. send a proper acknowledgment from the worker, once we're done with a task
            false,                  #exclusive - queues may only be accessed by the current connection
            false,                  #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method
            array($this, 'process') #callback
        );

        while(count($channel->callbacks)) {
            $this->log->addInfo('Waiting for incoming messages');
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }

    /**
     * process received request
     *
     * @param AMQPMessage $msg
     */
    public function process(AMQPMessage $msg)
    {
        $this->generatePdf()->sendEmail();

        /**
         * If a consumer dies without sending an acknowledgement the AMQP broker
         * will redeliver it to another consumer or, if none are available at the
         * time, the broker will wait until at least one consumer is registered
         * for the same queue before attempting redelivery
         */
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }

    /**
     * Generates invoice's pdf
     *
     * @return WorkerReceiver
     */
    private function generatePdf()
    {
        /**
         * Mocking a pdf generation processing time. This will take between
         * 2 and 5 seconds
         */
        sleep(mt_rand(2, 5));
        return $this;
    }

    /**
     * Sends email
     *
     * @return WorkerReceiver
     */
    private function sendEmail()
    {
        /**
         * Mocking email sending time. This will take between 1 and 3 seconds
         */
        sleep(mt_rand(1,3));
        return $this;
    }
}
```

How do we fire up the workers? Simply create a file that invokes the WorkerReceiver::listen() method.
Now use the php command (e.g. php worker.php, or whatever name you gave that file) to fire up a worker. But wait, the purpose was to have two or more workers, wasn't it? No problem: fire up more workers the same way, so that several processes run the same file, and RabbitMQ will register the consumers and distribute the work among them according to the QoS parameters.
So far, we have been sending messages to the RabbitMQ server without the user having to wait for a reply. This is fine for asynchronous processes that may take more time than the user is willing to spend just to see an 'OK' message. But what if we actually need a reply? Say, the result of a complex calculation, so we can show it to the user?
Let's say we have a centralized login server (single sign-on) that works as an authentication mechanism isolated from the rest of our application(s). The only way to reach this server is through RabbitMQ. We need to implement a way to send the login credentials to this server and wait for a grant/deny access response.
We need to implement the following pattern: the client publishes a request, then waits on a reply queue of its own for the server's response.
As usual, let's look at the producer first.
Looking at the RpcSender::execute() method, please be aware that the $credentials parameter is an array in the form of ['username' => 'x', 'password' => 'y']. Again, we open a new connection and create a channel as usual.
The first difference comes from declaring the queue. First notice that we are using the list() construct to catch the result of $channel->queue_declare(). This is because we do not explicitly send a queue name while declaring it, so we need to find out how this queue is identified. We are only interested in the first element of the result array, which is the unique identifier of the queue (something like amq.gen-_u0kjvm8helfzqk9p0z9gg). The second change is that we need to declare this queue as exclusive, so there is no mix-up with results from other concurrent processes.
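A minimal sketch of that declaration follows. declareCallbackQueue() is a hypothetical helper, and $channel is assumed to be a php-amqplib channel (anything with a compatible queue_declare() works for illustration). The durable and auto-delete flags are assumptions, since the text only states that the name is left empty and the queue is exclusive.

```php
<?php
/**
 * Declares a broker-named, exclusive queue and returns its generated name.
 */
function declareCallbackQueue($channel): string
{
    // queue_declare() returns the queue name, message count and consumer count;
    // only the first element is needed
    list($callbackQueue, ,) = $channel->queue_declare(
        '',     // queue: empty, so the broker generates a name such as amq.gen-...
        false,  // passive
        false,  // durable (assumed: a reply queue does not need to survive restarts)
        true,   // exclusive: only this connection may use the queue
        false   // auto delete (assumed)
    );

    return $callbackQueue;
}
```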
Another big change is that the producer is going to be a consumer of a queue too: when executing $channel->basic_consume(), notice that we provide the $callback_queue value we got while declaring the queue. And like every consumer, we declare a callback to execute when the process receives a response.
Next, we have to create a correlation ID for the message, which is nothing more than a unique identifier for each message. In the example we use the output of uniqid(), but you can use whichever mechanism you prefer (as long as it does not create a race condition; it does not need to be a strong, crypto-safe RNG).
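For illustration, here are two ways to produce such an identifier using only built-in PHP functions. The second one is an alternative suggested here, not the article's choice, and the variable names are made up.

```php
<?php
// Option used in the article: an identifier derived from the current time
$fromUniqid = uniqid();

// Alternative (an assumption, not the article's choice): random bytes, which
// avoids clashes between processes calling uniqid() in the same microsecond
$fromRandomBytes = bin2hex(random_bytes(16));

echo $fromUniqid, "\n";       // 13 characters
echo $fromRandomBytes, "\n";  // 32 hexadecimal characters
```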
Now let's create the message, which has important changes compared to what we used in the first two examples. Besides providing a JSON-encoded string containing the credentials we want to validate, we need to give the AMQPMessage constructor an array with two properties defined: correlation_id (the identifier created above) and reply_to (the name of the callback queue obtained while declaring it).
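The request can be assembled along these lines. buildLoginRequest() is a hypothetical helper that only prepares the body and the property array; with php-amqplib the two parts would then be passed to new AMQPMessage($body, $properties).

```php
<?php
/**
 * Prepares the body and properties of an RPC login request.
 *
 * @param array  $credentials   ['username' => ..., 'password' => ...]
 * @param string $callbackQueue name of the queue the reply should be sent to
 */
function buildLoginRequest(array $credentials, string $callbackQueue): array
{
    return [
        'body' => json_encode($credentials),
        'properties' => [
            'correlation_id' => uniqid(),       // tags this particular request
            'reply_to'       => $callbackQueue, // tells the server where to answer
        ],
    ];
}

$request = buildLoginRequest(['username' => 'x', 'password' => 'y'], 'amq.gen-example');
echo $request['body'], "\n"; // {"username":"x","password":"y"}
```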
After publishing the message, we evaluate the response, which will be empty at the beginning. While the response value remains empty, we wait for a response from the channel with $channel->wait();.
Once we receive a response from the channel, the callback method is invoked (RpcSender::onResponse()). This method matches the received correlation ID against the generated one and, if they are the same, sets the response body, thus breaking the while loop.
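The wait-and-match logic can be sketched as a small class. RpcResponseWaiter is a hypothetical name rather than the article's RpcSender; the message is only assumed to offer get('correlation_id') and a body, and the channel a wait() method that triggers the registered callback, as php-amqplib objects do.

```php
<?php
class RpcResponseWaiter
{
    private $correlationId;
    private $response = null;

    public function __construct(string $correlationId)
    {
        $this->correlationId = $correlationId;
    }

    /** Callback for the reply queue: keeps only the answer to our own request. */
    public function onResponse($message): void
    {
        if ($message->get('correlation_id') === $this->correlationId) {
            $this->response = $message->body;
        }
    }

    /** Blocks on the channel until a matching response has arrived. */
    public function waitFor($channel)
    {
        while ($this->response === null) {
            $channel->wait();
        }

        return $this->response;
    }
}
```

Replies carrying a different correlation ID are simply ignored, so the loop keeps waiting until the right one shows up.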
What about the RPC consumer?
Same old connection and channel creation :)
Same with declaring the queue; however, this queue will have a predefined name ('rpc_queue'). We define the QoS parameters since we deactivate automatic acks, so we can notify when we have finished verifying the credentials and have a result.
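Those setup steps might look like the sketch below. listenForRpcRequests() is a hypothetical helper, $channel is assumed to be a php-amqplib channel, and the durable, exclusive and auto-delete flags are assumptions because the text only fixes the queue name, the QoS setting and manual acks.

```php
<?php
/**
 * Declares the request queue, limits prefetch to one message and registers
 * a consumer that acknowledges manually.
 */
function listenForRpcRequests($channel, callable $callback): void
{
    $channel->queue_declare(
        'rpc_queue', // queue: predefined name known to the clients
        false,       // passive
        false,       // durable (assumed)
        false,       // exclusive (assumed)
        false        // auto delete (assumed)
    );

    // One unacknowledged request per worker at a time
    $channel->basic_qos(null, 1, null);

    $channel->basic_consume(
        'rpc_queue', // queue
        '',          // consumer tag
        false,       // no local
        false,       // no ack: false keeps manual acknowledgements on
        false,       // exclusive
        false,       // no wait
        $callback    // invoked for every incoming request
    );
}
```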
The magic happens inside the declared callback. Once we are done authenticating the credentials (yes, I know the process is done against static username/password values; this tutorial is not about how to authenticate credentials ;) ), we have to create the return message with the same correlation ID the producer created. We can extract this from the request message with $req->get('correlation_id'), passing this value the same way we did in the producer.
Now we have to publish this message to the same queue that was created in the producer (the one with the 'random' name). We extract the queue's name with $req->get('reply_to') and use it as the routing key in basic_publish().
Once we have published the message, we have to send the ack notice to the channel with $req->delivery_info['channel']->basic_ack(), using the delivery tag in $req->delivery_info['delivery_tag'], so the broker knows the request has been handled. The producer stops waiting as soon as the reply arrives on its callback queue.
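The three steps of the callback (check, reply, ack) could be sketched like this. handleLoginRequest() is a hypothetical function, the 'granted'/'denied' reply bodies are made up, and $newMessage is a factory introduced here so the sketch does not depend on the library; with php-amqplib it would be function ($body, $props) { return new AMQPMessage($body, $props); }.

```php
<?php
/**
 * Validates the credentials in $request, answers on the reply queue and acks.
 *
 * @param object   $request    offers ->body, ->get($property) and ->delivery_info
 * @param callable $newMessage builds the reply message from (body, properties)
 * @param array    $validUser  static ['username' => ..., 'password' => ...] to check against
 */
function handleLoginRequest($request, callable $newMessage, array $validUser): void
{
    $credentials = json_decode($request->body, true);

    $granted = is_array($credentials)
        && ($credentials['username'] ?? null) === $validUser['username']
        && ($credentials['password'] ?? null) === $validUser['password'];

    // The reply carries the correlation ID of the request it answers
    $reply = $newMessage(
        $granted ? 'granted' : 'denied',
        ['correlation_id' => $request->get('correlation_id')]
    );

    $channel = $request->delivery_info['channel'];

    // Default exchange, routing key = the client's callback queue
    $channel->basic_publish($reply, '', $request->get('reply_to'));

    // Tell the broker this request has been dealt with
    $channel->basic_ack($request->delivery_info['delivery_tag']);
}
```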
Again, fire up the listening process and you are ready to go. You can even combine Examples 2 and 3 to have a multi-worker RPC process that handles the authentication requests, which can be scaled simply by firing up more workers.
There is a lot more to say about RabbitMQ and AMQP, such as virtual hosts, exchange types, server administration, etc. You can find more application patterns (such as routing and topics) in the RabbitMQ tutorials and documentation pages. There are also command-line tools to manage RabbitMQ, as well as a web-based interface.
If you liked this tutorial series and would like to see more about MQ and more real-world use cases, please let us know in the comments below!
Exchanges are declared with the exchange_declare() method of the AMQPChannel class. This method takes several parameters, including the exchange name, the exchange type (direct, topic, fanout, or headers), and optional parameters such as passive, durable, auto_delete, and arguments.