3
Dalam Bahagian 1 kita meliputi teori dan kes penggunaan mudah protokol AMQP dalam PHP dengan RabbitMQ sebagai broker. Sekarang, mari kita menyelam beberapa contoh yang lebih maju.
Melaksanakan mesej berterusan dalam RabbitMQ untuk mengelakkan kehilangan data semasa kemalangan pelayan dengan menetapkan mesej 'Delivery_mode' kepada 2 dan mengisytiharkan beratur sebagai tahan lama.
Gunakan tetapan Kualiti Perkhidmatan (QoS) di RabbitMQ untuk mengawal pengedaran mesej di kalangan pekerja, memastikan tiada pekerja tunggal yang terharu.
- Gunakan RabbitMQ untuk RPC (Panggilan Prosedur Jauh) dengan menghantar mesej yang memerlukan respons, berguna untuk tugas seperti Pengesahan Pengguna.
- Sediakan beratur eksklusif, sementara untuk respons RPC untuk memastikan mesej diarahkan dengan betul dan selamat antara klien dan pelayan.
- Mengendalikan respons RPC dengan berkesan dengan memadankan 'correlation_id' dalam respons dengan permintaan, memastikan pengendalian dan pemprosesan yang betul.
- Contoh 1: Hantar permintaan untuk memproses data secara asynchronously di kalangan beberapa pekerja
- Dalam contoh bahagian sebelumnya, kami mempunyai satu pengeluar, satu pengguna. Sekiranya pengguna mati, mesej akan terus ditumpuk dalam barisan sehingga pengguna bermula lagi. Ia kemudiannya akan memproses semua mesej, satu demi satu.
- Ini boleh kurang daripada ideal dalam persekitaran pengguna serentak dengan jumlah permintaan yang saksama seminit. Nasib baik, skala pengguna sangat mudah, tetapi mari kita melaksanakan contoh lain.
Katakan kami mempunyai perkhidmatan penjanaan invois, di mana pengguna hanya perlu memberikan nombor invois, dan sistem secara automatik akan menghasilkan fail PDF dan menghantar e -mel kepada pengguna. Menjana dan menghantar e -mel boleh mengambil masa beberapa saat jika pelayan di mana proses generasi berjalan adalah sumber terhad. Sekarang katakan kita dikehendaki menyokong beberapa urus niaga sesaat, bagaimana kita dapat mencapai ini tanpa melampaui pelayan?
kita perlu melaksanakan corak berikut:
Mari lihat kelas pengeluar kami:
Kaedah Workersender :: Execute () akan menerima nombor invois. Seterusnya kami membuat sambungan, saluran dan barisan seperti biasa.
Sila perhatikan bahawa kali ini, sambil membuat objek mesej, pembina menerima parameter kedua: array ('delivery_mode' => 2). Dalam kes ini kita mahu menyatakan bahawa mesej itu tidak boleh hilang jika pelayan RabbitMQ terhempas. Harap maklum bahawa agar ini berfungsi, barisan itu harus diisytiharkan tahan lama juga.
<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>; </span></span><span> </span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>; </span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>; </span></span><span> </span><span><span>class WorkerSender </span></span><span><span>{ </span></span><span> <span>/* ... SOME OTHER CODE HERE ... */ </span></span><span> </span><span> <span>/** </span></span><span><span> * Sends an invoice generation task to the workers </span></span><span><span> * </span></span><span><span> * <span>@param <span>int</span> $invoiceNum </span></span></span><span><span> */ </span></span><span> <span>public function execute($invoiceNum) </span></span><span> <span>{ </span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); </span></span><span> <span>$channel = $connection->channel(); </span></span><span> </span><span> <span>$channel->queue_declare( </span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters </span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state </span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart </span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes </span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes </span></span><span> <span>); </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->basic_publish( </span></span><span> <span>$msg, #message </span></span><span> <span>'', #exchange </span></span><span> <span>'invoice_queue' #routing key (queue) </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->close(); </span></span><span> <span>$connection->close(); </span></span><span> <span>} </span></span><span><span>}</span></span></span>Kod berikut boleh digunakan untuk menerima data borang dan melaksanakan pengeluar:
<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>; </span></span><span> </span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>; </span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>; </span></span><span> </span><span><span>class WorkerSender </span></span><span><span>{ </span></span><span> <span>/* ... SOME OTHER CODE HERE ... */ </span></span><span> </span><span> <span>/** </span></span><span><span> * Sends an invoice generation task to the workers </span></span><span><span> * </span></span><span><span> * <span>@param <span>int</span> $invoiceNum </span></span></span><span><span> */ </span></span><span> <span>public function execute($invoiceNum) </span></span><span> <span>{ </span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); </span></span><span> <span>$channel = $connection->channel(); </span></span><span> </span><span> <span>$channel->queue_declare( </span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters </span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state </span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart </span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes </span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes </span></span><span> <span>); </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->basic_publish( </span></span><span> <span>$msg, #message </span></span><span> <span>'', #exchange </span></span><span> <span>'invoice_queue' #routing key (queue) </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->close(); </span></span><span> <span>$connection->close(); </span></span><span> <span>} </span></span><span><span>}</span></span></span>
sila gunakan mana -mana sanitisasi input/pengesahan yang anda rasa selesa dengan.
perkara mendapat sedikit menarik di sisi pengguna:
<span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */ </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span><span>/* ... SOME CODE HERE ... */</span></span></span>
Seperti biasa, kita perlu membuat sambungan, memperoleh saluran dan mengisytiharkan barisan (parameter barisan harus sama dengan pengeluar).
<span><span><?php </span></span><span><span>chdir(dirname(__DIR__)); </span></span><span><span>require_once('vendor/autoload.php'); </span></span><span> </span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>; </span></span><span> </span><span><span>$inputFilters = array( </span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT, </span></span><span><span>); </span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters); </span></span><span><span>$sender = new WorkerSender(); </span></span><span><span>$sender->execute($input['invoiceNo']);</span></span></span>
Untuk mempunyai tingkah laku pekerja (menghantar mesej di kalangan beberapa proses), kita perlu mengisytiharkan parameter kualiti perkhidmatan (QoS) dengan $ channel-> basic_qos ():
- saiz prefetch : tiada had tertentu, kita boleh mempunyai seberapa banyak pekerja yang kita perlukan
- Prefetch Count : Berapa banyak mesej yang akan diambil setiap pekerja sebelum menghantar semula pengakuan. Ini akan menjadikan pekerja memproses mesej 1 pada satu masa.
- Global : null bermaksud bahawa tetapan di atas akan dikenakan kepada pengguna ini sahaja
Seterusnya, kita akan mula memakan, dengan perbezaan utama dalam parameter. Kami akan mematikan ACK automatik, kerana kami akan memberitahu pelayan RabbitMQ apabila kami selesai memproses mesej dan bersedia untuk menerima yang baru.
Sekarang, bagaimana kita menghantar ACK itu? Sila lihat kaedah WorkerReceiver :: Process () (yang diisytiharkan sebagai kaedah panggil balik apabila mesej diterima). Panggilan ke GeneratedPDF () dan SendeMail () kaedah hanyalah kaedah dummy yang akan mensimulasikan masa yang dihabiskan untuk mencapai kedua -dua tugas. Parameter $ MSG bukan sahaja mengandungi muatan yang dihantar dari pengeluar, ia juga mengandungi maklumat mengenai objek yang digunakan oleh pengeluar. Kami boleh mengekstrak maklumat mengenai saluran yang digunakan oleh pengeluar dengan $ msg-> delivery_info ['channel'] (yang merupakan jenis objek yang sama ke saluran yang kami buka untuk pengguna dengan $ sambungan-> saluran ();). Oleh kerana kita perlu menghantar saluran pengeluar pengakuan bahawa kita telah menyelesaikan proses itu, kita akan menggunakan kaedah Basic_ack (), menghantar sebagai parameter tag penghantaran ($ msg-> delivery_info ['delivery_tag']) rabbitmq yang dihasilkan secara automatik mengikut urutan yang teratur dalam keadaan teratur secara automatik dihasilkan mengikut urutan yang teratur dalam urutan yang teratur dalam urutan yang teratur dalam urutan yang teratur dalam rangka. untuk mengaitkan dengan betul ke mana mesej ACK dimiliki.
bagaimana kita membakar pekerja? Cukup buat fail seperti yang berikut, memohon Kaedah WorkerReceiver :: Dengar ():
<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>; </span></span><span> </span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>; </span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>; </span></span><span> </span><span><span>class WorkerReceiver </span></span><span><span>{ </span></span><span> <span>/* ... SOME OTHER CODE HERE ... */ </span></span><span> </span><span> <span>/** </span></span><span><span> * Process incoming request to generate pdf invoices and send them through </span></span><span><span> * email. </span></span><span><span> */ </span></span><span> <span>public function listen() </span></span><span> <span>{ </span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); </span></span><span> <span>$channel = $connection->channel(); </span></span><span> </span><span> <span>$channel->queue_declare( </span></span><span> <span>'invoice_queue', #queue </span></span><span> <span>false, #passive </span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs </span></span><span> <span>false, #exclusive - queues may only be accessed by the current connection </span></span><span> <span>false #auto delete - the queue is deleted when all consumers have finished using it </span></span><span> <span>); </span></span><span> </span><span> <span>/** </span></span><span><span> * don't dispatch a new message to a worker until it has processed and </span></span><span><span> * acknowledged the previous one. Instead, it will dispatch it to the </span></span><span><span> * next worker that is not still busy. </span></span><span><span> */ </span></span><span> <span>$channel->basic_qos( </span></span><span> <span>null, #prefetch size - prefetch window size in octets, null meaning "no specific limit" </span></span><span> <span>1, #prefetch count - prefetch window in terms of whole messages </span></span><span> <span>null #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel </span></span><span> <span>); </span></span><span> </span><span> <span>/** </span></span><span><span> * indicate interest in consuming messages from a particular queue. When they do </span></span><span><span> * so, we say that they register a consumer or, simply put, subscribe to a queue. </span></span><span><span> * Each consumer (subscription) has an identifier called a consumer tag </span></span><span><span> */ </span></span><span> <span>$channel->basic_consume( </span></span><span> <span>'invoice_queue', #queue </span></span><span> <span>'', #consumer tag - Identifier for the consumer, valid within the current channel. just string </span></span><span> <span>false, #no local - TRUE: the server will not send messages to the connection that published them </span></span><span> <span>false, #no ack, false - acks turned on, true - off. send a proper acknowledgment from the worker, once we're done with a task </span></span><span> <span>false, #exclusive - queues may only be accessed by the current connection </span></span><span> <span>false, #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method </span></span><span> <span>array($this, 'process') #callback </span></span><span> <span>); </span></span><span> </span><span> <span>while(count($channel->callbacks)) { </span></span><span> <span>$this->log->addInfo('Waiting for incoming messages'); </span></span><span> <span>$channel->wait(); </span></span><span> <span>} </span></span><span> </span><span> <span>$channel->close(); </span></span><span> <span>$connection->close(); </span></span><span> <span>} </span></span><span> </span><span> <span>/** </span></span><span><span> * process received request </span></span><span><span> * </span></span><span><span> * <span>@param AMQPMessage $msg </span></span></span><span><span> */ </span></span><span> <span>public function process(AMQPMessage $msg) </span></span><span> <span>{ </span></span><span> <span>$this->generatePdf()->sendEmail(); </span></span><span> </span><span> <span>/** </span></span><span><span> * If a consumer dies without sending an acknowledgement the AMQP broker </span></span><span><span> * will redeliver it to another consumer or, if none are available at the </span></span><span><span> * time, the broker will wait until at least one consumer is registered </span></span><span><span> * for the same queue before attempting redelivery </span></span><span><span> */ </span></span><span> <span>$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); </span></span><span> <span>} </span></span><span> </span><span> <span>/** </span></span><span><span> * Generates invoice's pdf </span></span><span><span> * </span></span><span><span> * <span>@return WorkerReceiver </span></span></span><span><span> */ </span></span><span> <span>private function generatePdf() </span></span><span> <span>{ </span></span><span> <span>/** </span></span><span><span> * Mocking a pdf generation processing time. This will take between </span></span><span><span> * 2 and 5 seconds </span></span><span><span> */ </span></span><span> <span>sleep(mt_rand(2, 5)); </span></span><span> <span>return $this; </span></span><span> <span>} </span></span><span> </span><span> <span>/** </span></span><span><span> * Sends email </span></span><span><span> * </span></span><span><span> * <span>@return WorkerReceiver </span></span></span><span><span> */ </span></span><span> <span>private function sendEmail() </span></span><span> <span>{ </span></span><span> <span>/** </span></span><span><span> * Mocking email sending time. This will take between 1 and 3 seconds </span></span><span><span> */ </span></span><span> <span>sleep(mt_rand(1,3)); </span></span><span> <span>return $this; </span></span><span> <span>} </span></span><span><span>}</span></span></span>
Sekarang gunakan arahan PHP (mis. PHP Worker.php atau mana -mana nama yang anda berikan kepada fail di atas) untuk membakar pekerja. Tetapi tunggu, tujuannya adalah untuk mempunyai dua atau lebih pekerja, bukan? Tidak ada masalah, membakar lebih banyak pekerja dengan cara yang sama untuk mempunyai pelbagai proses fail yang sama, dan RabbitMQ akan mendaftarkan pengguna dan mengedarkan kerja di antara mereka mengikut parameter QoS.
Contoh 2: Hantar permintaan RPC dan mengharapkan balasan
Setakat ini, kami telah menghantar mesej ke pelayan RabbitMQ tanpa pengguna perlu menunggu jawapan. Ini adalah OK untuk proses asynchronous yang mungkin mengambil lebih banyak masa daripada pengguna bersedia untuk menghabiskan hanya untuk melihat mesej 'OK'. Tetapi bagaimana jika kita benar -benar memerlukan jawapan? Katakan beberapa hasil daripada pengiraan yang kompleks, jadi kita dapat menunjukkannya kepada pengguna?
katakan kami mempunyai pelayan login berpusat (tanda tunggal) yang akan berfungsi sebagai mekanisme pengesahan yang diasingkan dari seluruh aplikasi kami. Satu -satunya cara untuk mencapai pelayan ini adalah melalui RabbitMQ. Kami perlu melaksanakan cara untuk menghantar kelayakan log masuk ke pelayan ini dan menunggu geran/menafikan respons akses.
kita perlu melaksanakan corak berikut:
Seperti biasa, mari kita lihat pengeluar terlebih dahulu:
<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>; </span></span><span> </span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>; </span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>; </span></span><span> </span><span><span>class WorkerSender </span></span><span><span>{ </span></span><span> <span>/* ... SOME OTHER CODE HERE ... */ </span></span><span> </span><span> <span>/** </span></span><span><span> * Sends an invoice generation task to the workers </span></span><span><span> * </span></span><span><span> * <span>@param <span>int</span> $invoiceNum </span></span></span><span><span> */ </span></span><span> <span>public function execute($invoiceNum) </span></span><span> <span>{ </span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); </span></span><span> <span>$channel = $connection->channel(); </span></span><span> </span><span> <span>$channel->queue_declare( </span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters </span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state </span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart </span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes </span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes </span></span><span> <span>); </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->basic_publish( </span></span><span> <span>$msg, #message </span></span><span> <span>'', #exchange </span></span><span> <span>'invoice_queue' #routing key (queue) </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->close(); </span></span><span> <span>$connection->close(); </span></span><span> <span>} </span></span><span><span>}</span></span></span>
Melihat kaedah RPCSender :: Execute, sila sedar bahawa parameter kelayakan $ adalah array dalam bentuk ['nama pengguna' => 'x', 'kata laluan' => 'y']. Sekali lagi, kami membuka sambungan baru dan membuat saluran seperti biasa.
<span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */ </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span><span>/* ... SOME CODE HERE ... */</span></span></span>
Perbezaan pertama datang dari mengisytiharkan barisan. Notis pertama bahawa kami menggunakan senarai () membina untuk menangkap hasil dari $ saluran-> queue_declare (). Ini kerana kita tidak secara jelas menghantar nama giliran sambil mengisytiharkannya supaya kita perlu mengetahui bagaimana barisan ini dikenalpasti. Kami hanya berminat dengan elemen pertama array hasil, yang akan menjadi pengenal unik barisan (sesuatu seperti amq.gen-_u0kjvm8helfzqk9p0z9gg). Perubahan kedua ialah kita perlu mengisytiharkan barisan ini sebagai eksklusif, jadi tidak ada campuran dalam hasil dari proses serentak lain.
Satu lagi perubahan besar ialah pengeluar akan menjadi pengguna giliran juga, apabila melaksanakan $ channel-> basic_consume () sila perhatikan bahawa kami menyediakan nilai $ callback_queue yang kami dapat semasa mengisytiharkan barisan. Dan seperti setiap pengguna, kami akan mengisytiharkan panggilan balik untuk dilaksanakan apabila proses menerima respons.
<span><span><?php </span></span><span><span>chdir(dirname(__DIR__)); </span></span><span><span>require_once('vendor/autoload.php'); </span></span><span> </span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>; </span></span><span> </span><span><span>$inputFilters = array( </span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT, </span></span><span><span>); </span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters); </span></span><span><span>$sender = new WorkerSender(); </span></span><span><span>$sender->execute($input['invoiceNo']);</span></span></span>
Seterusnya, kita perlu membuat ID korelasi untuk mesej, ini tidak lebih daripada pengecam unik untuk setiap mesej. Contohnya, kami menggunakan output Uniqid (), tetapi anda boleh menggunakan mana-mana mekanisme yang anda suka (selagi ia tidak mewujudkan keadaan perlumbaan, tidak perlu menjadi RNG yang kuat, crypto-selamat).
<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>; </span></span><span> </span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>; </span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>; </span></span><span> </span><span><span>class WorkerReceiver </span></span><span><span>{ </span></span><span> <span>/* ... SOME OTHER CODE HERE ... */ </span></span><span> </span><span> <span>/** </span></span><span><span> * Process incoming request to generate pdf invoices and send them through </span></span><span><span> * email. </span></span><span><span> */ </span></span><span> <span>public function listen() </span></span><span> <span>{ </span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); </span></span><span> <span>$channel = $connection->channel(); </span></span><span> </span><span> <span>$channel->queue_declare( </span></span><span> <span>'invoice_queue', #queue </span></span><span> <span>false, #passive </span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs </span></span><span> <span>false, #exclusive - queues may only be accessed by the current connection </span></span><span> <span>false #auto delete - the queue is deleted when all consumers have finished using it </span></span><span> <span>); </span></span><span> </span><span> <span>/** </span></span><span><span> * don't dispatch a new message to a worker until it has processed and </span></span><span><span> * acknowledged the previous one. Instead, it will dispatch it to the </span></span><span><span> * next worker that is not still busy. </span></span><span><span> */ </span></span><span> <span>$channel->basic_qos( </span></span><span> <span>null, #prefetch size - prefetch window size in octets, null meaning "no specific limit" </span></span><span> <span>1, #prefetch count - prefetch window in terms of whole messages </span></span><span> <span>null #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel </span></span><span> <span>); </span></span><span> </span><span> <span>/** </span></span><span><span> * indicate interest in consuming messages from a particular queue. When they do </span></span><span><span> * so, we say that they register a consumer or, simply put, subscribe to a queue. </span></span><span><span> * Each consumer (subscription) has an identifier called a consumer tag </span></span><span><span> */ </span></span><span> <span>$channel->basic_consume( </span></span><span> <span>'invoice_queue', #queue </span></span><span> <span>'', #consumer tag - Identifier for the consumer, valid within the current channel. just string </span></span><span> <span>false, #no local - TRUE: the server will not send messages to the connection that published them </span></span><span> <span>false, #no ack, false - acks turned on, true - off. send a proper acknowledgment from the worker, once we're done with a task </span></span><span> <span>false, #exclusive - queues may only be accessed by the current connection </span></span><span> <span>false, #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method </span></span><span> <span>array($this, 'process') #callback </span></span><span> <span>); </span></span><span> </span><span> <span>while(count($channel->callbacks)) { </span></span><span> <span>$this->log->addInfo('Waiting for incoming messages'); </span></span><span> <span>$channel->wait(); </span></span><span> <span>} </span></span><span> </span><span> <span>$channel->close(); </span></span><span> <span>$connection->close(); </span></span><span> <span>} </span></span><span> </span><span> <span>/** </span></span><span><span> * process received request </span></span><span><span> * </span></span><span><span> * <span>@param AMQPMessage $msg </span></span></span><span><span> */ </span></span><span> <span>public function process(AMQPMessage $msg) </span></span><span> <span>{ </span></span><span> <span>$this->generatePdf()->sendEmail(); </span></span><span> </span><span> <span>/** </span></span><span><span> * If a consumer dies without sending an acknowledgement the AMQP broker </span></span><span><span> * will redeliver it to another consumer or, if none are available at the </span></span><span><span> * time, the broker will wait until at least one consumer is registered </span></span><span><span> * for the same queue before attempting redelivery </span></span><span><span> */ </span></span><span> <span>$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); </span></span><span> <span>} </span></span><span> </span><span> <span>/** </span></span><span><span> * Generates invoice's pdf </span></span><span><span> * </span></span><span><span> * <span>@return WorkerReceiver </span></span></span><span><span> */ </span></span><span> <span>private function generatePdf() </span></span><span> <span>{ </span></span><span> <span>/** </span></span><span><span> * Mocking a pdf generation processing time. This will take between </span></span><span><span> * 2 and 5 seconds </span></span><span><span> */ </span></span><span> <span>sleep(mt_rand(2, 5)); </span></span><span> <span>return $this; </span></span><span> <span>} </span></span><span> </span><span> <span>/** </span></span><span><span> * Sends email </span></span><span><span> * </span></span><span><span> * <span>@return WorkerReceiver </span></span></span><span><span> */ </span></span><span> <span>private function sendEmail() </span></span><span> <span>{ </span></span><span> <span>/** </span></span><span><span> * Mocking email sending time. This will take between 1 and 3 seconds </span></span><span><span> */ </span></span><span> <span>sleep(mt_rand(1,3)); </span></span><span> <span>return $this; </span></span><span> <span>} </span></span><span><span>}</span></span></span>Sekarang mari kita buat mesej, yang mempunyai perubahan penting berbanding dengan apa yang kita gunakan dalam 2 contoh pertama. Selain daripada memberikan rentetan yang dikodkan JSON yang mengandungi kelayakan yang kami ingin mengesahkan, kami perlu menyediakan kepada pembina amqpmessage array dengan dua sifat yang ditakrifkan:
- correlation_id : tag untuk mesej
- Reply_to : Pengenal pasti barisan yang dihasilkan semasa mengisytiharkannya
Selepas menerbitkan mesej, kami akan menilai respons, yang akan kosong pada mulanya. Walaupun nilai tindak balas kekal kosong, kami akan menunggu respons dari saluran dengan $ saluran-> tunggu ();.
Sebaik sahaja kami menerima respons dari saluran, kaedah panggil balik akan dipanggil (RPCSender :: onResponse ()). Kaedah ini akan sepadan dengan ID korelasi yang diterima terhadap yang dihasilkan, dan jika ia sama, akan menetapkan badan tindak balas, dengan itu memecahkan gelung sementara.
bagaimana dengan pengguna RPC? Di sini ia adalah:
<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>; </span></span><span> </span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>; </span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>; </span></span><span> </span><span><span>class WorkerSender </span></span><span><span>{ </span></span><span> <span>/* ... SOME OTHER CODE HERE ... */ </span></span><span> </span><span> <span>/** </span></span><span><span> * Sends an invoice generation task to the workers </span></span><span><span> * </span></span><span><span> * <span>@param <span>int</span> $invoiceNum </span></span></span><span><span> */ </span></span><span> <span>public function execute($invoiceNum) </span></span><span> <span>{ </span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); </span></span><span> <span>$channel = $connection->channel(); </span></span><span> </span><span> <span>$channel->queue_declare( </span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters </span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state </span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart </span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes </span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes </span></span><span> <span>); </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->basic_publish( </span></span><span> <span>$msg, #message </span></span><span> <span>'', #exchange </span></span><span> <span>'invoice_queue' #routing key (queue) </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->close(); </span></span><span> <span>$connection->close(); </span></span><span> <span>} </span></span><span><span>}</span></span></span>
Sambungan lama dan penciptaan saluran yang sama :)
Sama seperti mengisytiharkan barisan, namun barisan ini akan mempunyai nama yang telah ditetapkan (' rpc_queue '). Kami akan menentukan parameter QoS kerana kami akan menyahaktifkan ACK automatik, jadi kami dapat memberitahu apabila kami selesai mengesahkan kelayakan dan mempunyai hasil.
<span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */ </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span><span>/* ... SOME CODE HERE ... */</span></span></span>Keajaiban datang dari dalam panggilan balik yang diisytiharkan. Sebaik sahaja kita selesai mengesahkan kelayakan (ya, saya tahu proses itu dilakukan terhadap nilai nama pengguna/kata laluan statik, tutorial ini bukan tentang cara mengesahkan kelayakan;)), kita perlu membuat mesej kembali dengan id korelasi yang sama dicipta. Kami boleh mengekstrak ini dari mesej permintaan dengan $ req-> get ('correlation_id'), lulus nilai ini dengan cara yang sama yang kami lakukan dalam pengeluar.
<span><span><?php </span></span><span><span>chdir(dirname(__DIR__)); </span></span><span><span>require_once('vendor/autoload.php'); </span></span><span> </span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>; </span></span><span> </span><span><span>$inputFilters = array( </span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT, </span></span><span><span>); </span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters); </span></span><span><span>$sender = new WorkerSender(); </span></span><span><span>$sender->execute($input['invoiceNo']);</span></span></span>Sekarang kita perlu menerbitkan mesej ini ke barisan yang sama yang dicipta dalam pengeluar (yang mempunyai nama 'rawak'). Kami mengekstrak nama barisan dengan $ req-> get ('Reply_to') dan menggunakannya sebagai kunci penghalaan dalam asas_publish ().
Sebaik sahaja kami menerbitkan mesej itu, kami perlu menghantar notis ACK ke saluran dengan $ req-> delivery_info ['channel']-> basic_ack (), menggunakan tag penghantaran dalam $ req-> delivery_info ['delivery_tag' ] jadi pengeluar boleh berhenti menunggu.
Sekali lagi, api proses mendengar dan anda sudah bersedia untuk pergi. Anda juga boleh menggabungkan Contoh 2 dan 3 untuk mempunyai proses RPC multi-pekerja untuk melaksanakan permintaan pengesahan daripada yang boleh diperkecil hanya dengan menembak beberapa pekerja.
Terdapat lebih banyak lagi yang boleh dikatakan mengenai RabbitMQ dan AMQP, seperti tuan rumah maya, jenis pertukaran, pentadbiran pelayan, dan lain -lain ... anda boleh mencari lebih banyak corak aplikasi (seperti penghalaan, topik) di sini dan di halaman dokumentasi. Terdapat juga alat baris arahan untuk menguruskan RabbitMQ, serta antara muka berasaskan web.
Jika anda menyukai siri tutorial ini dan ingin melihat lebih lanjut mengenai MQ dan kes penggunaan dunia yang lebih nyata, sila beritahu kami di komen di bawah!
Soalan Lazim (Soalan Lazim) Mengenai PHP Rabbitmq Contoh Lanjutan
Apakah peranan Rabbitmq dalam PHP? Ia memainkan peranan penting dalam aplikasi PHP dengan membolehkan mereka mengendalikan beban tinggi dan tugas yang kompleks dengan lebih cekap. RabbitMQ menggunakan Protokol Beratur Mesej Lanjutan (AMQP) untuk memudahkan pertukaran mesej antara bahagian -bahagian yang berlainan aplikasi. Ini membolehkan proses decoupling, menjadikan aplikasi lebih berskala dan berdaya tahan. mesin anda. Ini boleh dilakukan melalui laman web RabbitMQ rasmi. Selepas pelayan dipasang, anda boleh memasang pelanjutan PHP AMQP, yang menyediakan fungsi yang diperlukan untuk berinteraksi dengan RabbitMQ. Ini boleh dilakukan dengan menggunakan pemasang pecl dengan perintah PECL memasang amqp.
daripada kelas amqpchannel. Kaedah ini mengambil beberapa parameter, termasuk nama pertukaran, jenis pertukaran (langsung, topik, fanout, atau tajuk), dan parameter pilihan seperti pasif, tahan lama, auto_delete, dan argumen. Adakah saya menghantar mesej ke barisan rabbitmq dalam php?
Kelas amqpmessage dengan kandungan mesej. Kemudian, anda boleh menggunakan kaedah Basic_publish kelas amqpChannel untuk menghantar mesej ke barisan. Kaedah Basic_publish mengambil mesej, pertukaran, dan kunci penghalaan sebagai parameter. Giliran Rabbitmq menggunakan kaedah Basic_consume kelas AmqpChannel. Kaedah ini mengambil beberapa parameter, termasuk nama giliran, tag pengguna, no_local, no_ack, eksklusif, dan fungsi panggilan balik yang akan dilaksanakan apabila mesej diterima. ?
Bagaimanakah saya boleh melaksanakan beratur keutamaan dalam RabbitMQ dengan PHP? Kemudian, apabila menghantar mesej, anda boleh menetapkan sifat keutamaan kelas AMQPMessage kepada nilai antara 0 dan keutamaan maksimum yang anda tentukan. > RabbitMQ boleh digunakan untuk Panggilan Prosedur Jauh (RPC) dalam PHP dengan menghantar mesej dengan balasan-ke harta yang ditetapkan ke giliran panggilan balik. Pelayan kemudian boleh menghantar respons kepada giliran panggil balik, dan pelanggan boleh mengambil respons dari sana. Plugin Pengurusan RabbitMQ, yang menyediakan antara muka berasaskan web untuk memantau dan menguruskan pelayan RabbitMQ anda. Anda juga boleh menggunakan kaedah kelas AmqpChannel untuk mendapatkan maklumat mengenai keadaan saluran, seperti bilangan mesej yang tidak diketahui.
Atas ialah kandungan terperinci PHP dan Rabbitmq: Contoh Lanjutan. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Untuk melindungi permohonan dari serangan XSS yang berkaitan dengan sesi, langkah-langkah berikut diperlukan: 1. Tetapkan bendera httponly dan selamat untuk melindungi kuki sesi. 2. Kod eksport untuk semua input pengguna. 3. Melaksanakan Dasar Keselamatan Kandungan (CSP) untuk mengehadkan sumber skrip. Melalui dasar-dasar ini, serangan XSS yang berkaitan dengan sesi dapat dilindungi dengan berkesan dan data pengguna dapat dipastikan.

Kaedah untuk mengoptimumkan prestasi sesi PHP termasuk: 1. Mula sesi kelewatan, 2. Gunakan pangkalan data untuk menyimpan sesi, 3. Data sesi kompres, 4. Mengurus kitaran hayat sesi, dan 5. Melaksanakan perkongsian sesi. Strategi ini dapat meningkatkan kecekapan aplikasi dalam persekitaran konkurensi yang tinggi.

Thesession.gc_maxlifetimesettinginphpdeterminesthelifespanofsessiondata, setInseconds.1) it'sconfiguredinphp.iniorviaini_set (). 2) abalanceisneededtoavoidperformanceissuesandunexpectedlogouts.3) php'sgarbageCollectionisprobabilistic, influedbygc_probabi

Dalam PHP, anda boleh menggunakan fungsi session_name () untuk mengkonfigurasi nama sesi. Langkah -langkah tertentu adalah seperti berikut: 1. Gunakan fungsi session_name () untuk menetapkan nama sesi, seperti session_name ("my_session"). 2. Selepas menetapkan nama sesi, hubungi session_start () untuk memulakan sesi. Mengkonfigurasi nama sesi boleh mengelakkan konflik data sesi antara pelbagai aplikasi dan meningkatkan keselamatan, tetapi memberi perhatian kepada keunikan, keselamatan, panjang dan penetapan masa sesi.

ID sesi hendaklah dijadikan semula secara teratur pada log masuk, sebelum operasi sensitif, dan setiap 30 minit. 1. Meningkatkan semula ID Sesi semasa log masuk untuk mengelakkan serangan tetap sesi. 2. Regenerate sebelum operasi sensitif untuk meningkatkan keselamatan. 3. Penjanaan semula secara berkala mengurangkan risiko penggunaan jangka panjang, tetapi pengalaman pengguna perlu ditimbang.

Menetapkan Parameter Cookie Sesi di PHP boleh dicapai melalui fungsi session_set_cookie_params (). 1) Gunakan fungsi ini untuk menetapkan parameter, seperti masa tamat, laluan, nama domain, bendera keselamatan, dan lain -lain; 2) hubungi session_start () untuk membuat parameter berkuatkuasa; 3) menyesuaikan parameter secara dinamik mengikut keperluan, seperti status log masuk pengguna; 4) Perhatikan untuk menetapkan bendera selamat dan httponly untuk meningkatkan keselamatan.

Tujuan utama menggunakan sesi dalam PHP adalah untuk mengekalkan status pengguna antara halaman yang berbeza. 1) Sesi dimulakan melalui fungsi session_start (), mewujudkan ID sesi yang unik dan menyimpannya dalam cookie pengguna. 2) Data sesi disimpan di pelayan, yang membolehkan data diluluskan antara permintaan yang berbeza, seperti status log masuk dan kandungan keranjang belanja.

Bagaimana untuk berkongsi sesi antara subdomain? Dilaksanakan dengan menetapkan kuki sesi untuk nama domain biasa. 1. Tetapkan domain cookie sesi ke .example.com di sebelah pelayan. 2. Pilih kaedah penyimpanan sesi yang sesuai, seperti memori, pangkalan data atau cache yang diedarkan. 3. Lulus ID Sesi melalui kuki, dan pelayan mengambil semula dan mengemas kini data sesi berdasarkan ID.


Alat AI Hot

Undresser.AI Undress
Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover
Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool
Gambar buka pakaian secara percuma

Clothoff.io
Penyingkiran pakaian AI

Video Face Swap
Tukar muka dalam mana-mana video dengan mudah menggunakan alat tukar muka AI percuma kami!

Artikel Panas

Alat panas

SublimeText3 Linux versi baharu
SublimeText3 Linux versi terkini

VSCode Windows 64-bit Muat Turun
Editor IDE percuma dan berkuasa yang dilancarkan oleh Microsoft

MinGW - GNU Minimalis untuk Windows
Projek ini dalam proses untuk dipindahkan ke osdn.net/projects/mingw, anda boleh terus mengikuti kami di sana. MinGW: Port Windows asli bagi GNU Compiler Collection (GCC), perpustakaan import yang boleh diedarkan secara bebas dan fail pengepala untuk membina aplikasi Windows asli termasuk sambungan kepada masa jalan MSVC untuk menyokong fungsi C99. Semua perisian MinGW boleh dijalankan pada platform Windows 64-bit.

Dreamweaver Mac版
Alat pembangunan web visual

DVWA
Damn Vulnerable Web App (DVWA) ialah aplikasi web PHP/MySQL yang sangat terdedah. Matlamat utamanya adalah untuk menjadi bantuan bagi profesional keselamatan untuk menguji kemahiran dan alatan mereka dalam persekitaran undang-undang, untuk membantu pembangun web lebih memahami proses mengamankan aplikasi web, dan untuk membantu guru/pelajar mengajar/belajar dalam persekitaran bilik darjah Aplikasi web keselamatan. Matlamat DVWA adalah untuk mempraktikkan beberapa kelemahan web yang paling biasa melalui antara muka yang mudah dan mudah, dengan pelbagai tahap kesukaran. Sila ambil perhatian bahawa perisian ini