ホームページ >バックエンド開発 >PHPチュートリアル >译-PHP RabbitMQ チュートリアル-6

译-PHP RabbitMQ チュートリアル-6

WBOY
WBOYオリジナル
2016-06-23 13:59:221273ブラウズ

リモート プロシージャ コール (RPC)

(php-amqplib を使用)


2 番目のチュートリアルでは、時間のかかるタスクを複数のワーカーに分散するためにワーク キューを使用する方法を学びました。このチュートリアルでは、ワーク キューを使用して時間のかかるタスクを複数のワーカーに分散する方法を学習しました。

しかし、リモート コンピューター上で関数を実行して結果を待つ必要がある場合はどうなるでしょうか? それは別の話です。このパターンは一般にリモート プロシージャ コールまたは RPC として知られています。リモートコンピュータ上で関数を実行し、結果を待ちますか? 関数を実行し、その結果が返されるのを待ちますか?まあ、それは2つの異なることです。このパターンは、リモート プロシージャ コールまたは RPC と呼ばれることがよくあります。

このチュートリアルでは、RabbitMQ を使用して RPC システム (クライアントとスケーラブルな RPC サーバー) を構築します。分散する価値のある時間のかかるタスクがないため、ダミーを作成します。フィボナッチ数を返す RPC サービス

今回は、RabbitMQ を使用して RPC システム (クライアントと拡張可能な RPC サーバー) を構築します。分散するほど時間のかかるタスクはないので、フィボナッチ数列を返す疑似 RPC サービスを作成します。

クライアント インターフェイス (クライアント インターフェイス)

RPC サービスの使用方法を説明するために、RPC リクエストを送信し、応答を受信するまでブロックする call という名前のメソッドを公開します。 :

RPC サービスがどのように使用されるかを示すために、単純なクライアント クラスを作成します。このクラスは、RPC 要求を送信し、応答を受信するまでブロックするメソッド呼び出しを公開します。

$fibonacci_rpc = new FibonacciRpcClient();$response = $fibonacci_rpc->call(30);echo " [.] Got ", $response, "\n";

RPC に関するメモ

RPC はコンピューティングでは非常に一般的なパターンですが、プログラマーが関数呼び出しがローカルであるか、それとも遅い RPC であるかを認識していない場合に問題が発生します。 RPC を誤って使用すると、システムが予測不能になり、デバッグが不必要に複雑になります。RPC を誤って使用すると、保守不能なスパゲッティ コードが生成される可能性があります。

RPC はコンピューター コンピューティングでは一般的ですが、非常に扱いにくいものです。この問題は、プログラマがネイティブ メソッドを呼び出すべきか、それとも遅い RPC を呼び出すべきかわからないために発生します。このような混乱により、システムが予測不能になり、デバッグが不必要に複雑になります。 RPC を誤用すると、ソフトウェアが簡素化されるのではなく、保守不能で乱雑なコードが作成される可能性があります。

それを念頭に置いて、次のアドバイスを検討してください:

どの関数呼び出しがローカルでどれがリモートであるかが明らかであることを確認してください。

どの関数呼び出しがローカルでどれがリモートであるかが明らかであることを確認してください。 1 つはリモートです。

システムを文書化します。コンポーネント間の依存関係を明確にします。

  • システムを文書化します。コンポーネント間の依存関係を明確にします。
  • エラーケースを処理します。RPC サーバーが長時間ダウンした場合、クライアントはどのように反応すべきですか?
  • エラー処理。 RPC サービスが長時間閉じられた場合、クライアントはどのように反応すればよいでしょうか?
  • RPC を避けるべき理由は、可能であれば、非同期パイプラインを使用することです。RPC のようなブロックではなく、結果が非​​同期で次の計算ステージにプッシュされます。可能であれば、RPC の代わりに非同期パイプを使用する必要があります。ブロックと同様に、結果は次の計算ステージに非同期でプッシュされます。
  • コールバック キュー
  • 一般に、RabbitMQ 上で RPC を実行するのは簡単です。クライアントがリクエスト メッセージを送信し、サーバーが応答メッセージで応答するために、リクエストとともに「コールバック」キュー アドレスを送信する必要があります。デフォルトのキューを使用できます。試してみましょう:
  • 一般的に言えば、RabbitMQ で RPC を実行するのは簡単です。クライアントはリクエスト メッセージを送信し、サーバーはレスポンス メッセージで応答します。応答 (メッセージ) を受信するには、コールバック キュー アドレスを指定して要求を送信する必要があります。デフォルトのキューを試すことができます:

    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);$msg = new AMQPMessage(    $payload,    array('reply_to' => $queue_name));$channel->basic_publish($msg, '', 'rpc_queue');# ... then code to read a response message from the callback_queue ...

    メッセージ プロパティ

    AMQP プロトコルは、メッセージに付随する 14 のプロパティのセットを事前に定義します。次の例外を除き、ほとんどのプロパティはめったに使用されません。プロトコルは 14 のメッセージ属性を事前定義します。以下を除いて、ほとんどのプロパティはめったに使用されません:

  • delivery_mode: メッセージを永続的 (値 2) または一時的 (1) としてマークします。このプロパティは 2 番目のチュートリアルで覚えているかもしれません。
  • delivery_mode: 永続的な場合は 2、一時的な場合は 1 に設定します。 2 番目のチュートリアルでこれを使用したことを覚えているかもしれません。
  • content_type: エンコーディングの MIME タイプを記述するために使用されます。たとえば、よく使用される JSON エンコーディングの場合、このプロパティを次のように設定することをお勧めします。
  • content_type: エンコーディングの MIME タイプを記述するために使用されます。エンコーディング。たとえば、一般的に使用される JSON エンコーディングの場合、この属性を次のように設定することをお勧めします。 application/json
  • reply_to: コールバック キューに名前を付けるために一般的に使用されます。
  • reply_to: コールバック キューに名前を付けるために一般的に使用されます。
  • correlation_id: RPC 応答をリクエストと相関させるのに役立ちます。
  • correlation_id: RPC リクエストと応答を相関させるのに役立ちます。
  • 相関 ID (相関 ID)

    上記の方法では、RPC リクエストごとにコールバック キューを作成することをお勧めします。これは非常に非効率ですが、幸いなことに、より良い方法があります。上記のクライアントごとに 1 つのコールバック キューを作成しましょう。ここで説明する方法では、RPC リクエストごとにコールバック キューを作成します。これは非常に非効率的ですが、幸いなことにもっと良い方法があります。クライアントごとにコールバック キューを 1 つだけ作成できます。

    これにより、そのキューで応答を受信したときに、その応答がどのリクエストに属しているかが不明になるという新たな問題が発生します。後で、このプロパティをすべてのリクエストに対して一意の値に設定します。コールバック キューでメッセージを受信すると、このプロパティを確認し、それに基づいて応答とリクエストを照合することができます。不明な correlation_id 値が見つかった場合は、メッセージを安全に破棄できます。は私たちのリクエストに属しません

    しかし、これにより、キューが応答を受信したときに、それがどのリクエストに属しているかがわかりません。ここで、correlation_id が役に立ちます。リクエストごとに一意の correlation_id を設定します。後で、(応答) メッセージがコールバック キューで受信されると、このプロパティが確認され、それに基づいて要求と応答を照合できます。不明な correlation_id が検出された場合、メッセージはリクエストに属していないため、メッセージを安全に破棄できます。

    なぜエラーで失敗するのではなく、コールバック キュー内の不明なメッセージを無視する必要があるのか​​と疑問に思われるかもしれませんが、これはサーバー側で競合状態が発生する可能性があるためです。ただし、RPC サーバーが停止する可能性があります。応答を送信した直後であり、要求に対する確認メッセージを送信する前に、再起動された RPC サーバーが要求を再度処理することになるため、クライアントでは重複した応答を適切に処理する必要があり、RPC は理想的には次のように処理する必要があります。 idempotent.

    なぜ処理が失敗してエラーを返すように設定せずに、コールバック キュー内の不明なメッセージを無視する必要があるのか​​と疑問に思われるかもしれません。サーバー側の障害の可能性があるためです。可能性は非常に低いですが、応答を送信した後、メッセージが確認される前に RPC サービスがクラッシュする可能性があります。その場合、再起動された RPC サービスがリクエストを再度処理します。 このため、クライアントは重複した応答を適切に処理する必要があり、RPC サービスは冪等性が最も優れています。

    概要

    私たちの RPC は次のように動作します:

    RPC 実行プロセス:

  • When the Client starts up, it creates an anonymous exclusive callback queue.
  • 当客户端启动,便将创建一个匿名的专用回调队列。
  • For an RPC request, the Client sends a message with two properties: reply_to, which is set to the callback queue and correlation_id, which is set to a unique value for every request.
  • RPC请求中,客户端消息发送带有两个属性:reply_to用来设置回调队列,和correlation_id用来唯一标示每一个请求。
  • The request is sent to an rpc_queue queue.
  • 请求被发送到一个叫做rpc_queue的队列。
  • The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the reply_to field.
  • RPC worker(也叫:服务)在rpc_queue上守护,等待请求。每当来一个请求,它会进行处理并将结果以消息形式发送到客户端,经由reply_to指定的队列。
  • The client waits for data on the callback queue. When a message appears, it checks the correlation_id property. If it matches the value from the request it returns the response to the application.
  • 客户端在回调队列中等待数据。当出现一条消息,就会检查correlation_id属性。如果和请求时的correlation_id匹配,便会将响应返回给应用程序。
  • Putting it all together(合体!!!终极~\(???)/~啦啦啦!)

    The Fibonacci task:

    斐波那契任务:

    function fib($n) {

        if ($n == 0)        return 0;    if ($n == 1)        return 1;    return fib($n-1) + fib($n-2);}

    We declare our fibonacci function. It assumes only valid positive integer input. (Don't expect this one to work for big numbers, and it's probably the slowest recursive implementation possible).

    声明斐波那契函数。假定它只接收正整数。(别指望它能处理很大的数字,它可能是最慢的递归实现)

    The code for our RPC server rpc_server.php looks like this:

    rpc_server.php代码:

    <?phprequire_once __DIR__ . '/vendor/autoload.php';use PhpAmqpLib\Connection\AMQPConnection;use PhpAmqpLib\Message\AMQPMessage;$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');$channel = $connection->channel();$channel->queue_declare('rpc_queue', false, false, false, false);function fib($n) {    if ($n == 0)        return 0;    if ($n == 1)        return 1;    return fib($n-1) + fib($n-2);}echo " [x] Awaiting RPC requests\n";$callback = function($req) {    $n = intval($req->body);    echo " [.] fib(", $n, ")\n";    $msg = new AMQPMessage(        (string) fib($n),        array('correlation_id' => $req->get('correlation_id'))        );    $req->delivery_info['channel']->basic_publish(        $msg, '', $req->get('reply_to'));    $req->delivery_info['channel']->basic_ack(        $req->delivery_info['delivery_tag']);};$channel->basic_qos(null, 1, null);$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);while(count($channel->callbacks)) {    $channel->wait();}$channel->close();$connection->close();?>

    The server code is rather straightforward:

    服务端代码相当直白:

  • As usual we start by establishing the connection, channel and declaring the queue.
  • 一如既往先建立连接,频道,然后声明队列。
  • We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set the prefetch_count setting in $channel.basic_qos.
  • 没准我们想运行多个服务端进程。为了能在多个服务之间均等负载,我们得在$channel.basic_qos中设置prefech_count。
  • We use basic_consume to access the queue. Then we enter the while loop in which we wait for request messages, do the work and send the response back.
  • 使用basic_consume访问队列。然后进入while循环等待请求消息,处理消息,返回响应信息。
  • The code for our RPC client rpc_client.php:

    rpc_client.php代码:

    connection = new AMQPConnection(            'localhost', 5672, 'guest', 'guest');        $this->channel = $this->connection->channel();        list($this->callback_queue, ,) = $this->channel->queue_declare(            "", false, false, true, false);        $this->channel->basic_consume(            $this->callback_queue, '', false, false, false, false,            array($this, 'on_response'));    }    public function on_response($rep) {        if($rep->get('correlation_id') == $this->corr_id) {            $this->response = $rep->body;        }    }    public function call($n) {        $this->response = null;        $this->corr_id = uniqid();        $msg = new AMQPMessage(            (string) $n,            array('correlation_id' => $this->corr_id,                  'reply_to' => $this->callback_queue)            );        $this->channel->basic_publish($msg, '', 'rpc_queue');        while(!$this->response) {            $this->channel->wait();        }        return intval($this->response);    }};$fibonacci_rpc = new FibonacciRpcClient();$response = $fibonacci_rpc->call(30);echo " [.] Got ", $response, "\n";?>

    Now is a good time to take a look at our full example source code for rpc_client.php and rpc_server.php.

    是时候看看整个例子的源码了rpc_client.php and rpc_server.php.

    Our RPC service is now ready. We can start the server:

    RPC服务准备就绪,启动!

    $ php rpc_server.php [x] Awaiting RPC requests

    To request a fibonacci number run the client:

    运行客户端请求一个斐波那契数字:

    $ php rpc_client.php [x] Requesting fib(30)

    The design presented here is not the only possible implementation of a RPC service, but it has some important advantages:

    这里展现的设计不是RPC服务的唯一可能实现,但它却有一些重要的优势:

  • If the RPC server is too slow, you can scale up by just running another one. Try running a second rpc_server.php in a new console.
  • 如果RPC服务太慢,你可以按比例增加运行其数量。试试在新控制台运行第二个rpc_server.php服务
  • On the client side, the RPC requires sending and receiving only one message. No synchronous calls like queue_declare are required. As a result the RPC client needs only one network round trip for a single RPC request.
  • 在客户端,RPC要求只发送和接收一条消息。必须没有像队列声明一样的同步调用。结果呢,对于单一的RPC请求,客户端仅需要一个网络往返。
  • Our code is still pretty simplistic and doesn't try to solve more complex (but important) problems, like:

    我们的代码还是忒简化,并没有想解决更为复杂(但重要)的问题,像:

  • サーバーが実行されていない場合、クライアントはどのように反応すべきですか?
  • サーバーが実行されていない場合、クライアントはどのように反応すべきですか?
  • クライアントは RPC に対して何らかのタイムアウトを持つべきですか?
  • クライアントは RPC に対して何らかのタイムアウトを持つべきですか?
  • クライアントは RPC に対して何らかのタイムアウトを持つべきですか?
  • サーバーが誤動作して例外を発生させた場合、それをクライアントに転送する必要がありますか?
  • サーバーが誤動作して例外を発生させた場合、それをクライアントに転送する必要がありますか?
  • キューを表示する際に役立つ、処理前の無効な受信メッセージの保護 (境界、タイプのチェックなど)。 Rabbitmq-management プラグイン ここでは、キューを表示するための便利なプラグインをいくつか見つけることができます。
  • 声明:
    この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。