>
鑰匙要點- >使用PHP和RabbitMQ在多名工人中不同步處理數據,從而提高了高交易環境中的效率。
- >在兔子中實現持久消息,以防止服務器崩潰期間的數據丟失,通過將消息“ evely_mode”設置為2,並將隊列宣佈為持久。
- >使用RabbitMQ中的服務質量(QoS)設置來控制工人之間的消息分佈,確保沒有任何單個工人被不知所措。
- 通過發送需要響應的消息,可用於使用RABBITMQ進行RPC(遠程過程調用) >設置RPC響應的獨家,臨時隊列,以確保在客戶端和服務器之間正確且安全地定向消息。
- >示例1:發送請求以在幾個工人中對數據異步處理
- 在上一部分的示例中,我們有一個生產者,一個消費者。 如果消費者死亡,則消息將繼續堆積在隊列中,直到消費者再次開始。然後,它將處理所有消息。 在並發的用戶環境中,每分鐘請求相當多,這可能是不理想的。 幸運的是,擴展消費者非常容易,但讓我們實現另一個示例。
- > >假設我們有一項發票生成服務,用戶只需要提供發票號碼,系統將自動生成PDF文件並將其通過電子郵件發送給用戶。 如果生成過程運行的服務器受到資源有限,則生成和發送電子郵件可能需要幾秒鐘。 現在假設我們需要每秒支持幾筆交易,我們如何在不壓倒服務器的情況下完成此操作?
讓我們看一下我們的製作人班:
> WorkerSender :: execute()方法將收到一個發票號。 接下來,我們像往常一樣創建一個連接,頻道和隊列。
>請注意,這次,在創建消息對象時,構造函數將接收第二個參數:array('veliver_mode'=> 2)。 在這種情況下,我們要聲明,如果RabbitMQ服務器崩潰,則不會丟失消息。 請注意,為了使這項工作,隊列也必須被聲明持久。
可以使用以下代碼接收表單數據並執行生產者: 請使用您感到滿意的輸入消毒/驗證。
>
現在,我們如何發送Ack? 請查看Workerreceiver :: process()方法(接收到消息時,該方法稱為回調方法)。對生成PDF()和sendemail()方法的調用只是虛擬方法,它將模擬完成這兩個任務所花費的時間。 $ msg參數不僅包含從生產者發送的有效載荷,還包含有關生產者使用的對象的信息。 我們可以使用$ msg-> velivery_info ['channel']提取有關生產者使用的通道的信息(這是我們使用$ Connection-> Channel();)為消費者打開的通道的對像類型。 由於我們需要向生產者的頻道發送確認我們已經完成了該過程的確認,因此我們將使用其basic_ack()方法,將作為參數發送作為參數($ msg-> delivery_info ['evely_tag'])rabbitmq自動生成的rabbitmq正確地將ACK屬於哪個消息相關聯。 我們如何解僱工人?只需創建以下文件,請調用Workerreceiver :: listing()方法:
>
>另一個很大的更改是,在執行$ channel-> basic_consume()時,生產者也將成為隊列的消費者,請注意,我們在聲明隊列時提供了$ callback_queue值。 像每個消費者一樣,我們將在該過程收到響應時聲明一個回調要執行。
接下來,我們必須為消息創建一個相關ID,這無非是每個消息的唯一標識符。 在示例中,我們正在使用uniqid()的輸出,但是您可以使用自己喜歡的任何機制(只要它不創建種族條件,就不需要是強大的,加密的安全RNG)。 現在,讓我們創建一條消息,與我們在前兩個示例中使用的消息相比,它具有重要的變化。除了分配一個包含我們要身份驗證的憑證的JSON編碼字符串外,我們還必須提供給AMQPMESSAGE構造函數一個具有兩個定義的屬性的數組: 。
RPC消費者呢? 在這裡是:
’)。 我們將定義QoS參數,因為我們將停用自動ACK,因此我們可以在驗證憑據驗證並具有結果時通知。
魔術來自聲明的回調。一旦我們對憑據進行身份驗證(是的,我知道該過程是針對靜態用戶名/密碼值完成的,本教程並不是關於如何對憑據進行身份驗證;)),我們必須創建具有相同關聯ID的返回消息(生產者)創建。 我們可以使用$ req-> get('correlation_id')從請求消息中提取此值,以相同的方式傳遞此值。
>發布消息後,我們必須使用$ req-> delivery_info ['channel''] - > basic_ack()將ACK通知發送到頻道,使用$ req-> evelyse_info ['velivery_tag'' ]這樣生產者可以停止等待。 >關於RabbitMQ和AMQP的更多要說,例如虛擬主機,交換類型,服務器管理等……您可以在此處和文檔頁面上找到更多的應用程序模式(例如路由,主題)。 還有一個命令行工具來管理RabbitMQ,還有一個基於Web的接口。 如果您喜歡這個教程系列,並且想查看有關MQ和更多現實世界用例的更多信息,請在下面的評論中告訴我們! >我如何在php中從php? 我如何處理rabbitmq中的PHP中的錯誤可以使用try-catch塊在兔子中使用PHP中的兔子中的錯誤處理?發生錯誤時,PHP AMQP擴展會引發AMQPException類的異常。您可以捕獲這些異常並根據您的應用程序的需求處理它們。 >如何使用PHP? <span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span> <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>
</span><span> <span>/**
</span></span><span><span> * Sends an invoice generation task to the workers
</span></span><span><span> *
</span></span><span><span> * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span> */
</span></span><span> <span>public function execute($invoiceNum)
</span></span><span> <span>{
</span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span> <span>$channel = $connection->channel();
</span></span><span>
</span><span> <span>$channel->queue_declare(
</span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->basic_publish(
</span></span><span> <span>$msg, #message
</span></span><span> <span>'', #exchange
</span></span><span> <span>'invoice_queue' #routing key (queue)
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->close();
</span></span><span> <span>$connection->close();
</span></span><span> <span>}
</span></span><span><span>}</span></span></span>
<span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span></span>
>像往常一樣,我們必須創建一個連接,得出頻道並聲明隊列(隊列的參數必須與生產者相同)。 <span><span><?php </span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span></span>
>為了具有工人行為(幾個程序中的派遣消息),我們必須使用$ channel-> basic_qos()聲明服務質量(QoS)參數:預摘要大小
>示例2:發送RPC請求並期望回复
到目前為止,我們一直在向RabbitMQ服務器發送消息,而無需等待回复。 對於異步過程,這可能比用戶願意花費更多的時間只是為了查看“確定”消息,這是可以的。 但是,如果我們真的需要答复怎麼辦?假設一個複雜的計算結果,因此我們可以將其顯示給用戶? <span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span> <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>
</span><span> <span>/**
</span></span><span><span> * Sends an invoice generation task to the workers
</span></span><span><span> *
</span></span><span><span> * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span> */
</span></span><span> <span>public function execute($invoiceNum)
</span></span><span> <span>{
</span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span> <span>$channel = $connection->channel();
</span></span><span>
</span><span> <span>$channel->queue_declare(
</span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->basic_publish(
</span></span><span> <span>$msg, #message
</span></span><span> <span>'', #exchange
</span></span><span> <span>'invoice_queue' #routing key (queue)
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->close();
</span></span><span> <span>$connection->close();
</span></span><span> <span>}
</span></span><span><span>}</span></span></span>
<span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span></span>
>
<span><span><?php </span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span></span>
>一旦我們收到來自頻道的響應,將調用回調方法(rpcsender :: onResponse())。 此方法將與已接收的相關ID與生成的相關ID匹配,如果它們是相同的,則將設置響應主體,從而破壞while循環。 <span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span> <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>
</span><span> <span>/**
</span></span><span><span> * Sends an invoice generation task to the workers
</span></span><span><span> *
</span></span><span><span> * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span> */
</span></span><span> <span>public function execute($invoiceNum)
</span></span><span> <span>{
</span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span> <span>$channel = $connection->channel();
</span></span><span>
</span><span> <span>$channel->queue_declare(
</span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->basic_publish(
</span></span><span> <span>$msg, #message
</span></span><span> <span>'', #exchange
</span></span><span> <span>'invoice_queue' #routing key (queue)
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->close();
</span></span><span> <span>$connection->close();
</span></span><span> <span>}
</span></span><span><span>}</span></span></span>
rpc_queue <span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span></span>
中的路由密鑰
<span><span><?php </span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span></span>
再次啟動聆聽過程,您就可以開始了。 您甚至可以組合示例2和3以具有多工程師的RPC進程來執行身份驗證請求,而不是通過啟動幾名工人來縮放。
>關於PHP RabbitMQ高級示例的常見問題(常見問題解答)
>兔子在PHP中的作用是什麼?它通過使其能夠更有效地處理高負載和復雜任務,在PHP應用中起著至關重要的作用。 RabbitMQ使用高級消息排隊協議(AMQP)來促進應用程序不同部分之間的消息交換。這允許流程解耦,使應用程序更具可擴展性和彈性。你的機器。這可以通過官方的RabbitMQ網站完成。安裝服務器後,您可以安裝PHP AMQP擴展名,該擴展提供了與RabbitMQ交互的必要功能。可以使用命令pecl install amqp。 AMQPCHANNEL課程。此方法採用多個參數,包括交換的名稱,交換的類型(直接,主題,粉絲或標題)以及可選參數,例如被動,耐用,auto_delete和grignestes。我是否在php中的兔子隊列中發送消息?帶有消息內容的AMQPMESSAGE類。然後,您可以使用AMQPCHANNEL類的Basic_Publish方法將消息發送到隊列。 BASIC_PUBLISH方法將消息,交換和路由密鑰作為參數。
>>
>如何確保使用php的兔子中的消息持久性? AMQPMESSAGE類的velivery_mode屬性到2。這將使RabbitMQ將消息存儲在磁盤上,即使RabbitMQ服務器崩潰或重新啟動。
>如何通過php?
在兔子中的優先級隊列在PHP中實現優先級排隊,可以通過在聲明隊列時設置X-Max-Prifority參數來實現。然後,在發送消息時,您可以將AMQPMESSAGE類的優先屬性設置為0和您指定的最大優先級之間的值。 我如何在PHP中使用RabbitMQ作為RPC? > RABBITMQ可以通過向回調隊列發送帶有回复屬性設置的消息,用於PHP中的遠程過程調用(RPC)。然後,服務器可以將響應發送到回調隊列,並且客戶端可以從那裡消耗響應。
>>監視PHP中的RabbitMQ中的RabbitMQ。 RabbitMQ管理插件,該插件提供了一個基於Web的接口,用於監視和管理RabbitMQ服務器。您還可以使用AMQPCHANNEL類的方法來獲取有關頻道狀態的信息,例如未經確定的消息的數量。
>
以上是PHP和RabbitMQ:高級示例的詳細內容。更多資訊請關注PHP中文網其他相關文章!

Laravel使用其直觀的閃存方法簡化了處理臨時會話數據。這非常適合在您的應用程序中顯示簡短的消息,警報或通知。 默認情況下,數據僅針對後續請求: $請求 -

PHP客戶端URL(curl)擴展是開發人員的強大工具,可以與遠程服務器和REST API無縫交互。通過利用Libcurl(備受尊敬的多協議文件傳輸庫),PHP curl促進了有效的執行

Laravel 提供简洁的 HTTP 响应模拟语法,简化了 HTTP 交互测试。这种方法显著减少了代码冗余,同时使您的测试模拟更直观。 基本实现提供了多种响应类型快捷方式: use Illuminate\Support\Facades\Http; Http::fake([ 'google.com' => 'Hello World', 'github.com' => ['foo' => 'bar'], 'forge.laravel.com' =>

您是否想為客戶最緊迫的問題提供實時的即時解決方案? 實時聊天使您可以與客戶進行實時對話,並立即解決他們的問題。它允許您為您的自定義提供更快的服務

PHP日誌記錄對於監視和調試Web應用程序以及捕獲關鍵事件,錯誤和運行時行為至關重要。它為系統性能提供了寶貴的見解,有助於識別問題並支持更快的故障排除

文章討論了PHP 5.3中介紹的PHP中的晚期靜態結合(LSB),允許靜態方法的運行時間分辨率調用以更靈活的繼承。 LSB的實用應用和潛在的觸摸


熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

AI Hentai Generator
免費產生 AI 無盡。

熱門文章

熱工具

MinGW - Minimalist GNU for Windows
這個專案正在遷移到osdn.net/projects/mingw的過程中,你可以繼續在那裡關注我們。 MinGW:GNU編譯器集合(GCC)的本機Windows移植版本,可自由分發的導入函式庫和用於建置本機Windows應用程式的頭檔;包括對MSVC執行時間的擴展,以支援C99功能。 MinGW的所有軟體都可以在64位元Windows平台上運作。

ZendStudio 13.5.1 Mac
強大的PHP整合開發環境

VSCode Windows 64位元 下載
微軟推出的免費、功能強大的一款IDE編輯器

DVWA
Damn Vulnerable Web App (DVWA) 是一個PHP/MySQL的Web應用程序,非常容易受到攻擊。它的主要目標是成為安全專業人員在合法環境中測試自己的技能和工具的輔助工具,幫助Web開發人員更好地理解保護網路應用程式的過程,並幫助教師/學生在課堂環境中教授/學習Web應用程式安全性。 DVWA的目標是透過簡單直接的介面練習一些最常見的Web漏洞,難度各不相同。請注意,該軟體中

mPDF
mPDF是一個PHP庫,可以從UTF-8編碼的HTML產生PDF檔案。原作者Ian Back編寫mPDF以從他的網站上「即時」輸出PDF文件,並處理不同的語言。與原始腳本如HTML2FPDF相比,它的速度較慢,並且在使用Unicode字體時產生的檔案較大,但支援CSS樣式等,並進行了大量增強。支援幾乎所有語言,包括RTL(阿拉伯語和希伯來語)和CJK(中日韓)。支援嵌套的區塊級元素(如P、DIV),