博客好久没更新了,这大半年来主要精力放在了折腾swoole上,公司项目也上线了几个swoole的服务于中间件,最近马上也要上线一个中间件。swoole_server、swoole_process等都用的比较多了,现在就来慢慢总结。
从 swoole_process文档中可以看出,swoole_process进程间支持3种通信方式:
接下来就详细介绍下每一种通信的原理以及实现。
关于管道文档中是这么说的
int swoole_process::__construct(mixed $function, $redirect_stdin_stdout = false, $create_pipe = true)
* $redirect_stdin_stdout,重定向子进程的标准输入和输出。 启用此选项后,在进程内echo将不是打印屏幕,而是写入到管道。读取键盘输入将变为从管道中读取数据。 默认为阻塞读取。* $create_pipe,是否创建管道,启用$redirect_stdin_stdout后,此选项将忽略用户参数,强制为true 如果子进程内没有进程间通信,可以设置为false
* $process对象在销毁时会自动关闭管道,子进程内如果监听了管道会收到CLOSE事件
* 1.7.22或更高版本允许设置管道的类型,默认为SOCK_STREAM流式
* 参数$create_pipe为2时,管道类型将设置为SOCK_DGRAM
int swoole_process->write(string $data)
* swoole底层使用Unix Socket实现通信,UnixSocket是内核实现的全内存通信,无任何IO消耗。在1进程write,1进程read,每次读写1024字节数据的测试中,100万次通信仅需1.02秒。* 管道通信默认的方式是流式,write写入的数据在read可能会被底层合并。可以设置swoole_process构造函数的第三个参数为2改变为数据报式。
从上面摘自文档中的部分就可以看出,管道有两种类型: SOCK_STREAM和 SOCK_DGRAM,而这两个正是建立socket时候需要的类型参数。我们最常见的linux命令中使用管道操作符“|”,而在标准unix编程中,管道的创建也是通过函数 int pipe(int filedes[2])创建的匿名管道;或者通过 int mkfifo(const char *pathname, mode_t mode)创建的命名管道,这两种方式创建的管道都没有SOCK_*参数的。
下面是摘自swoole扩展中创建pipe的代码
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|bl", &callback, &redirect_stdin_and_stdout, &pipe_type) == FAILURE){ RETURN_FALSE;} ...... if (pipe_type > 0){ swPipe *_pipe = emalloc(sizeof(swWorker)); int socket_type = pipe_type == 1 ? SOCK_STREAM : SOCK_DGRAM; if (swPipeUnsock_create(_pipe, 1, socket_type) < 0) { RETURN_FALSE; } process->pipe_object = _pipe; process->pipe_master = _pipe->getFd(_pipe, SW_PIPE_MASTER); process->pipe_worker = _pipe->getFd(_pipe, SW_PIPE_WORKER); process->pipe = process->pipe_master; zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("pipe"), process->pipe_masterTSRMLS_CC);}
从上面的代码中可以看出,这里的pipe是通过函数 swPipeUnsock_create创建的,下面是这个函数的完整实现
int swPipeUnsock_create(swPipe *p, int blocking, int protocol){ int ret; swPipeUnsock *object = sw_malloc(sizeof(swPipeUnsock)); if (object == NULL) { swWarn("malloc() failed."); return SW_ERR; } p->blocking = blocking; ret = socketpair(AF_UNIX, protocol, 0, object->socks); if (ret < 0) { swWarn("socketpair() failed. Error: %s [%d]", strerror(errno), errno); return SW_ERR; } else { //Nonblock if (blocking == 0) { swSetNonBlock(object->socks[0]); swSetNonBlock(object->socks[1]); } int sbsize = SwooleG.socket_buffer_size; swSocket_set_buffer_size(object->socks[0], sbsize); swSocket_set_buffer_size(object->socks[1], sbsize); p->object = object; p->read = swPipeUnsock_read; p->write = swPipeUnsock_write; p->getFd = swPipeUnsock_getFd; p->close = swPipeUnsock_close; } return 0;}
从上面的代码就可以很直观的看到,是通过函数 socketpair创建了一对已连接的(UNIX族)无名socket。在Linux中,完全可以把这一对socket当成pipe返回的文件描述符一样使用,唯一的区别就是这一对文件描述符中的任何一个都可读和可写。所以这样主进程和子进程间的通信就完全是在进行socket通信。
相关资料:
Linux 上实现双向进程间通信管道
socketpair
bool swoole_process->useQueue(int $msgkey = 0, int $mode = 2);
* $msgkey是消息队列的key,默认会使用ftok(FILE)* $mode通信模式,默认为2,表示争抢模式,所有创建的子进程都会从队列中取数据
* 使用模式2后,创建的子进程无法进行单独通信,比如发给特定子进程。
* $process对象并未执行start,也可以执行push/pop向队列推送/提取数据
* 消息队列通信方式与管道不可公用。消息队列不支持EventLoop,使用消息队列后只能使用同步阻塞模式
上面是摘自文档中的一些描述。还是直接看源码怎么实现的
static PHP_METHOD(swoole_process, useQueue){ long msgkey = 0; long mode = 2; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|ll", &msgkey, &mode) == FAILURE) { RETURN_FALSE; } swWorker *process = swoole_get_object(getThis()); if (msgkey <= 0) {#if PHP_MAJOR_VERSION == 7 msgkey = ftok(execute_data->func->op_array.filename->val, 0);#else msgkey = ftok(EG(active_op_array)->filename, 0);#endif } swMsgQueue *queue = emalloc(sizeof(swMsgQueue)); if (swMsgQueue_create(queue, 1, msgkey, 0) < 0) { RETURN_FALSE; } queue->delete = 0; process->queue = queue; process->ipc_mode = mode; RETURN_TRUE;}
从源码可以看到,$msgkey的默认值为0,且当它的值小于等于0的时候,就会用使用默认值 ftok(FILE)。而mode在这里并没有直接使用,而是将它赋值给了process对象的ipc_mode属性。
但是这里使用 swMsgQueue_create创建了队列,下面是这个函数的原型
int swMsgQueue_create(swMsgQueue *q, int blocking, key_tmsg_key, long type){ int msg_id; if (blocking == 0) { q->ipc_wait = IPC_NOWAIT; } else { q->ipc_wait = 0; } q->blocking = blocking; msg_id = msgget(msg_key, IPC_CREAT | O_EXCL | 0666); if (msg_id < 0) { swWarn("msgget() failed. Error: %s[%d]", strerror(errno), errno); return SW_ERR; } else { q->msg_id = msg_id; q->type = type; } return 0;}
这个代码很简单,创建了一个key为$msg_key并且是阻塞性的msgqueue。这里就是完整的msgqueue的创建逻辑,当你没有给它设置默认的key的时候,系统取一个默认的值作为key,当然你也能根据你的需要设置任意的key来创建一个或者多个msgqueue。
上面说了第一个参数$msgkey,下面来看看第二个参数$mode的作用。上面的代码中将mode赋值给了process对象的ipc_mode属性,即系看它都在什么地方被用到。
首先看数据如队列 pop
static PHP_METHOD(swoole_process, push){ ...... struct { long type; char data[65536]; } message; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &data, &length) == FAILURE) { RETURN_FALSE; } ...... message.type = process->id; memcpy(message.data, data, length); if (swMsgQueue_push(process->queue, (swQueue_data *)&message, length) < 0) { php_error_docref(NULL TSRMLS_CC, E_WARNING, "msgsnd() failed. Error: %s[%d]", strerror(errno), errno); RETURN_FALSE; } RETURN_TRUE;}
从上面的代码可以看出,我们PHP调用push时候传入的值被放入了一个 message结构体中,然后再调用 swMsgQueue_push将数据入队列的。
下面是 swMsgQueue_push 的实现
int swMsgQueue_push(swMsgQueue *q, swQueue_data *in, int length){ int ret; while (1) { ret = msgsnd(q->msg_id, in, length, q->ipc_wait); if (ret < 0) { if (errno == EINTR) { continue; } else if (errno == EAGAIN) { swYield(); continue; } else { return -1; } } else { return ret; } } return 0;}
这里的代码很简单,就是调用系统调用 msgsnd将数据放入队列中。
现在看数据出队列 pop
static PHP_METHOD(swoole_process, pop){ ...... struct { long type; char data[SW_MSGMAX]; } message; if (process->ipc_mode == 2) { message.type = 0; } else { message.type = process->id; } int n = swMsgQueue_pop(process->queue, (swQueue_data *) &message, maxsize); if (n < 0) { php_error_docref(NULL TSRMLS_CC, E_WARNING, "msgrcv() failed. Error: %s[%d]", strerror(errno), errno); RETURN_FALSE; } SW_RETURN_STRINGL(message.data, n, 1);}
看到这里,终于看到我们之前传入的 $mode参数的作用了。如果我们如文档中所说默认值为2的话, message结构体的type就被设置为0,否则的话取当前进程的ID。说到这里有个要说的是,每个process进程在它的对象被new的时候,它的构造函数会将它自己的属性ID设置一个值,这个值是一个自增计数器,也就是会将一次初始化的process对象排队,所以每个process的id的值是不一样的。
重点是看 swMsgQueue_pop
int swMsgQueue_pop(swMsgQueue *q, swQueue_data *data, int length){ int flag = q->ipc_wait; long type = data->mtype; return msgrcv(q->msg_id, data, length, type, flag);}
看到了吧,type是 msgrcv系统调用需要的第4个参数。下面是对这个参数的解释:
1. =0:接收第一个消息
2. >0:接收类型等于msgtyp的第一个消息
3.
所以现在分析来看,swoole_process使用msgqueue的话是相当灵活的,让你能随心所欲实现你的需求。多队列、单队列、单队列不同类型,这样的组合带来的收益就是只有你想不到,没有swoole_process实现不了的通信模型。
参考文档:
msgget
msgsnd & msgrcv信号这个是最常见的了。所以这里讲的就不如上面两个详细了。swoole_process给了一个设置异步监听信号的函数
bool swoole_process::signal(int $signo, mixed $callback);
* 此方法基于signalfd和eventloop是异步IO,不能用于同步程序中
* 同步阻塞的程序可以使用pcntl扩展提供的pcntl_signal
* $callback如果为null,表示移除信号监听
文档中已经说明了,这个函数只能用于异步程序中,所以就不多做说明了。而关于同步程序中怎么使用pcntl_signal,rango的blog中已经有说明了,所以直接看就行 《 PHP官方的pcntl_signal性能极差》
好了,整理完了,我的思维也更清晰了。