ホームページ >バックエンド開発 >PHPチュートリアル >swoole_processのプロセス間通信
この 6 か月間、私は主に swoole をいじることに集中してきました。会社のプロジェクトでは、いくつかの swoole サービス ミドルウェアもリリースされました。間もなく発売されます。 swoole_server、swoole_processなどがよく使われるので、ここでゆっくりまとめてみましょう。
swoole_process ドキュメントからわかるように、swoole_process プロセスは 3 つの通信メソッドをサポートしています:
次に、各通信の原理と実装について詳しく紹介します。
これはパイプラインのドキュメントに記載されている内容です
int swoole_process::__construct(mixed $function, $redirect_stdin_stdout = false, $create_pipe = true )
* $redirect_stdin_stdout、子プロセスの標準入出力をリダイレクトします。 このオプションを有効にすると、インプロセス エコーは画面に出力されず、パイプに書き込まれます。キーボード入力の読み取りは、パイプからのデータの読み取りになります。 デフォルトでは読み取りをブロックします。 * $create_pipe、パイプを作成するかどうか。$redirect_stdin_stdout が有効になった後、このオプションはユーザー パラメーターを無視し、子プロセスにプロセス間通信がない場合は false に設定できます。
* $process オブジェクトが存在します。パイプが破棄されると自動的に閉じられます。パイプが子プロセスでリッスンされている場合、CLOSE イベントが受信されます。
* バージョン 1.7.22以降では、パイプのタイプを設定できます。デフォルトは SOCK_STREAM ストリーミング
* パラメーター $create_pipe が 2 の場合、パイプ タイプは SOCK_DGRAM
int swoole_process->write(string $data)
* swoole の基本的な使用方法 Unix ソケットは、IO を消費せずにカーネルによって実装されるフルメモリ通信です。 1プロセス書き込み、1プロセス読み取り、1024バイトのデータを毎回読み書きするテストでは、100万回の通信でわずか1.02秒しかかかりません。 ※パイプライン通信のデフォルトの方式はストリーミングであり、writeで書き込まれたデータがreadで最下層にマージされる場合があります。 swoole_process コンストラクターの 3 番目のパラメーターを 2 に設定して、データグラム形式に変更できます。
上記のドキュメントの抜粋からわかるように、パイプには SOCK_STREAM
と SOCK_DGRAM の 2 種類があり、これら 2 つはタイプとして確立されています。ソケットを使用するときに必要なパラメータ。パイプ演算子「|」は最も一般的な Linux コマンドで使用され、標準の Unix プログラミングでは、パイプの作成は関数 int Pipe(int filedes[2]) によって作成される匿名パイプでもあります。 ; または int mkfifo(const char *pathname, mode_t mode) によって名前付きパイプが作成されます。これらの 2 つのメソッドで作成されたパイプには SOCK_* パラメーターがありません。 以下は、パイプを作成するために swoole 拡張機能から取得したコードです。
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|bl", &callback, &redirect_stdin_and_stdout, &pipe_type) == FAILURE){ RETURN_FALSE;} ...... if (pipe_type > 0){ swPipe *_pipe = emalloc(sizeof(swWorker)); int socket_type = pipe_type == 1 ? SOCK_STREAM : SOCK_DGRAM; if (swPipeUnsock_create(_pipe, 1, socket_type) < 0) { RETURN_FALSE; } process->pipe_object = _pipe; process->pipe_master = _pipe->getFd(_pipe, SW_PIPE_MASTER); process->pipe_worker = _pipe->getFd(_pipe, SW_PIPE_WORKER); process->pipe = process->pipe_master; zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("pipe"), process->pipe_masterTSRMLS_CC);}上記のコードからわかるように、ここでのパイプは関数 swPipeUnsock_create
によって作成されたもので、以下はこの関数の完全な実装です
int swPipeUnsock_create(swPipe *p, int blocking, int protocol){ int ret; swPipeUnsock *object = sw_malloc(sizeof(swPipeUnsock)); if (object == NULL) { swWarn("malloc() failed."); return SW_ERR; } p->blocking = blocking; ret = socketpair(AF_UNIX, protocol, 0, object->socks); if (ret < 0) { swWarn("socketpair() failed. Error: %s [%d]", strerror(errno), errno); return SW_ERR; } else { //Nonblock if (blocking == 0) { swSetNonBlock(object->socks[0]); swSetNonBlock(object->socks[1]); } int sbsize = SwooleG.socket_buffer_size; swSocket_set_buffer_size(object->socks[0], sbsize); swSocket_set_buffer_size(object->socks[1], sbsize); p->object = object; p->read = swPipeUnsock_read; p->write = swPipeUnsock_write; p->getFd = swPipeUnsock_getFd; p->close = swPipeUnsock_close; } return 0;}上記のコードから、これがfunction socketpair
接続された (UNIX ファミリの) 名前のないソケットのペアが作成されます。 Linux では、このソケットのペアはパイプによって返されるファイル記述子として使用できます。唯一の違いは、ファイル記述子のいずれか 1 つが読み取りおよび書き込み可能であることです。したがって、メインプロセスと子プロセス間の通信は完全にソケット通信になります。
関連情報:
Linux での双方向プロセス間通信パイプラインの実装
ソケットペア2. IPC msgqueue
* $msgkey は、デフォルトでは、ftok(FILE)* $mode です。デフォルトは 2 で、競合モードを示します。作成されたすべてのサブプロセスはキューからデータを取得します
* モード 2 を使用した後は、作成されたサブプロセスは個別に通信できなくなります。特定のサブプロセスに送信するなど。
* $process オブジェクトは start を実行しませんが、push/pop を実行してデータをキューにプッシュ/抽出することもできます
* メッセージ キューの通信メソッドとパイプラインは公開されていません。メッセージ キューは EventLoop をサポートしません。メッセージ キューを使用した後は、同期ブロック モードのみを使用できます。
上記はドキュメントから抜粋した説明の一部です。または、ソース コードを参照して実装方法を確認してください。
static PHP_METHOD(swoole_process, useQueue){ long msgkey = 0; long mode = 2; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|ll", &msgkey, &mode) == FAILURE) { RETURN_FALSE; } swWorker *process = swoole_get_object(getThis()); if (msgkey <= 0) {#if PHP_MAJOR_VERSION == 7 msgkey = ftok(execute_data->func->op_array.filename->val, 0);#else msgkey = ftok(EG(active_op_array)->filename, 0);#endif } swMsgQueue *queue = emalloc(sizeof(swMsgQueue)); if (swMsgQueue_create(queue, 1, msgkey, 0) < 0) { RETURN_FALSE; } queue->delete = 0; process->queue = queue; process->ipc_mode = mode; RETURN_TRUE;}ソース コードからわかるように、$msgkey のデフォルト値は 0 です。値が 0 以下の場合、デフォルト値
ftok(FILE)
が使用されます。モードはここでは直接使用されませんが、プロセス オブジェクトの ipc_mode 属性に割り当てられます。 ただし、ここでは
swMsgQueue_createを使用してキューが作成されています。 これがこの関数のプロトタイプです
int swMsgQueue_create(swMsgQueue *q, int blocking, key_tmsg_key, long type){ int msg_id; if (blocking == 0) { q->ipc_wait = IPC_NOWAIT; } else { q->ipc_wait = 0; } q->blocking = blocking; msg_id = msgget(msg_key, IPC_CREAT | O_EXCL | 0666); if (msg_id < 0) { swWarn("msgget() failed. Error: %s[%d]", strerror(errno), errno); return SW_ERR; } else { q->msg_id = msg_id; q->type = type; } return 0;}このコードは非常に複雑です。 simple, create キーが $msg_key の msgqueue がブロックされています。完全な msgqueue 作成ロジックは次のとおりです。デフォルトのキーを設定しない場合、システムはデフォルト値をキーとして使用して、1 つ以上の msgqueue を作成することもできます。
上面说了第一个参数$msgkey,下面来看看第二个参数$mode的作用。上面的代码中将mode赋值给了process对象的ipc_mode属性,即系看它都在什么地方被用到。
首先看数据如队列 pop
static PHP_METHOD(swoole_process, push){ ...... struct { long type; char data[65536]; } message; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &data, &length) == FAILURE) { RETURN_FALSE; } ...... message.type = process->id; memcpy(message.data, data, length); if (swMsgQueue_push(process->queue, (swQueue_data *)&message, length) < 0) { php_error_docref(NULL TSRMLS_CC, E_WARNING, "msgsnd() failed. Error: %s[%d]", strerror(errno), errno); RETURN_FALSE; } RETURN_TRUE;}
从上面的代码可以看出,我们PHP调用push时候传入的值被放入了一个 message结构体中,然后再调用 swMsgQueue_push将数据入队列的。
下面是 swMsgQueue_push 的实现
int swMsgQueue_push(swMsgQueue *q, swQueue_data *in, int length){ int ret; while (1) { ret = msgsnd(q->msg_id, in, length, q->ipc_wait); if (ret < 0) { if (errno == EINTR) { continue; } else if (errno == EAGAIN) { swYield(); continue; } else { return -1; } } else { return ret; } } return 0;}
这里的代码很简单,就是调用系统调用 msgsnd将数据放入队列中。
现在看数据出队列 pop
static PHP_METHOD(swoole_process, pop){ ...... struct { long type; char data[SW_MSGMAX]; } message; if (process->ipc_mode == 2) { message.type = 0; } else { message.type = process->id; } int n = swMsgQueue_pop(process->queue, (swQueue_data *) &message, maxsize); if (n < 0) { php_error_docref(NULL TSRMLS_CC, E_WARNING, "msgrcv() failed. Error: %s[%d]", strerror(errno), errno); RETURN_FALSE; } SW_RETURN_STRINGL(message.data, n, 1);}
看到这里,终于看到我们之前传入的 $mode参数的作用了。如果我们如文档中所说默认值为2的话, message结构体的type就被设置为0,否则的话取当前进程的ID。说到这里有个要说的是,每个process进程在它的对象被new的时候,它的构造函数会将它自己的属性ID设置一个值,这个值是一个自增计数器,也就是会将一次初始化的process对象排队,所以每个process的id的值是不一样的。
重点是看 swMsgQueue_pop
int swMsgQueue_pop(swMsgQueue *q, swQueue_data *data, int length){ int flag = q->ipc_wait; long type = data->mtype; return msgrcv(q->msg_id, data, length, type, flag);}
看到了吧,type是 msgrcv系统调用需要的第4个参数。下面是对这个参数的解释:
1. =0:接收第一个消息
2. >0:接收类型等于msgtyp的第一个消息
3.<0:接收类型等于或者小于msgtyp绝对值的第一个消息
所以现在分析来看,swoole_process使用msgqueue的话是相当灵活的,让你能随心所欲实现你的需求。多队列、单队列、单队列不同类型,这样的组合带来的收益就是只有你想不到,没有swoole_process实现不了的通信模型。
参考文档:
msgget
msgsnd & msgrcv信号这个是最常见的了。所以这里讲的就不如上面两个详细了。swoole_process给了一个设置异步监听信号的函数
bool swoole_process::signal(int $signo, mixed $callback);
* 此方法基于signalfd和eventloop是异步IO,不能用于同步程序中
* 同步阻塞的程序可以使用pcntl扩展提供的pcntl_signal
* $callback如果为null,表示移除信号监听
文档中已经说明了,这个函数只能用于异步程序中,所以就不多做说明了。而关于同步程序中怎么使用pcntl_signal,rango的blog中已经有说明了,所以直接看就行 《 PHP官方的pcntl_signal性能极差》
好了,整理完了,我的思维也更清晰了。