Home  >  Article  >  php教程  >  Linux I/O multiplexing

Linux I/O multiplexing

高洛峰
高洛峰Original
2016-11-05 10:09:131413browse

Everything in Linux is a file. Whether it is the character files we store on the disk, executable files or our I/O devices connected to the computer, they are all abstracted into files by VFS. For example, the standard input device is the keyboard by default. We When operating the standard input device, you are actually operating a file with a file descriptor of 0 that is opened by default. All software needs to operate the hardware through the OS, and the OS needs a corresponding driver to operate all hardware. In this driver The corresponding configuration and usage of this hardware are configured. Linux I/O is divided into four types: blocking I/O, non-blocking I/O, I/O multiplexing, and signal-driven I/O. For I/O device drivers, two configurations are generally provided: blocking and non-blocking. The driver for one of our most common I/O devices, the keyboard (standard input device), is blocking by default.
Multiplexing is to enable the process to obtain the data it wants from multiple blocking I/O and continue to perform the next task. The main idea is to monitor multiple file descriptors at the same time. If the setting state of a file descriptor is triggered, the process will continue to execute. If the setting state of any file descriptor is not triggered, the process will enter sleep
One of the main uses of path multiplexing is to implement "I/O multiplexing concurrent server". Compared with multi-thread concurrency or multi-process concurrency, this kind of server has lower system overhead and is more suitable for web servers.

Blocking I/O

Blocking I/O means that when a process tries to access the I/O device and the device is not ready, the device driver will let the process trying to access enter the sleep state through the kernel. One benefit of blocking I/O is that it can greatly save CPU time, because once a process attempts to access an unprepared blocking I/O, it will enter the sleep state, and the process that enters the sleep state is not in the kernel's process scheduling list. until the target I/O is ready, wake it up and add it to the scheduling list, thus saving CPU time. Of course, blocking I/O also has its inherent shortcomings. If a process attempts to access a blocking I/O, but whether the access is successful or not does not have a decisive impact on the subsequent task, then directly entering the sleep state will obviously delay the completion of its task.
Typical default blocking IO includes standard input devices, socket devices, pipe devices, etc. When we use gets(), scanf(), read() and other operations to request these IOs and no data flows into the IO, it will cause the process to sleep. .
Suppose a process wants to read and display data through any one of the three pipes. The pseudo code is as follows

read(pipe_0,buf,sizeof(buf));       //sleepprint buf;
read(pipe_1,buf,sizeof(buf));
print buf;read(pipe_2,buf,sizeof(buf));
print buf;

Since the pipe blocks I/O, if no data flows into pipe_0, the process will be at the first read() Enter the sleep state and even if there is data flowing into pipe_1 and pipe_2, it will not be read.
If we use the following code to reset the blocking attribute of the pipe, obviously, if no data flows into the three pipes, the process will not be able to obtain the requested data and continue execution. If the data is important (that's why we use blocking I /O), the result will be very bad, changing to polling will occupy a lot of CPU time.

int fl = fcntl(pipe_fd, F_GETFL);
fcntl(pipe_fd, F_SETFL, fl | O_NONBLOCK);

How to make the process monitor three pipelines at the same time, and continue executing if one of them has data without sleeping. If there is no data flowing into all of them, it will sleep again. This is a problem that multiplexing technology needs to solve.

Non-blocking I/O

Non-blocking I/O means that when a process attempts to access an I/O device, it will return and continue to perform the next task regardless of whether the requested data is obtained from it. , but it is very suitable for I/O requests where the success of the request has little impact on the next task. But if you access a non-blocking I/O, but the failure of this request will have a fatal impact on the next task of the process, the most crude way is to use while(1){read()} polling. Obviously, this approach takes up a lot of CPU time.

select mechanism

select is a very "old" synchronous I/O interface, but it provides a good idea of ​​I/O multiplexing

Model

fd_set      //创建fd_set对象,将来从中增减需要监视的
fdFD_ZERO()   //清空fd_set对象FD_SET()    //将一个fd加入
fd_set对象中  select()    //监视
fd_set对象中的文件描述符
pselect()   //先设定信号屏蔽,再监视
FD_ISSET()  //测试fd是否属于
fd_set对象FD_CLR()    //从
fd_set对象中删除fd

Note:

select The first parameter nfds refers to the largest file descriptor in the set + 1, because select will traverse the entire file descriptor table indiscriminately until the target is found, and the file descriptors start from 0, so the total is the largest in the set file descriptor +1 times.

上一条导致了这种机制的低效,如果需要监视的文件描述符是0和100那么每一次都会遍历101次

select()每次返回都会修改fd_set,如果要循环select(),需要先对初始的fd_set进行备

例子_I/O多路复用并发服务器

关于server本身的编程模型,参见tcp/ip协议服务器模型和udp/ip协议服务器模型这里仅是使用select实现伪并行的部分模型

#define BUFSIZE 100#define MAXNFD  1024 int main(){    /***********服务器的listenfd已经准本好了**************/
    fd_set readfds;
    fd_set writefds;
    FD_ZERO(&readfds);
    FD_ZERO(&writefds);
    FD_SET(listenfd, &readfds);

    fd_set temprfds = readfds;
    fd_set tempwfds = writefds;    int maxfd = listenfd;    int nready;    char buf[MAXNFD][BUFSIZE] = {0};    while(1){
        temprfds = readfds;
        tempwfds = writefds;

        nready = select(maxfd+1, &temprfds, &tempwfds, NULL, NULL)        if(FD_ISSET(listenfd, &temprfds)){          
            //如果监听到的是listenfd就进行accept
            int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);            
            //将新accept的scokfd加入监听集合,并保持maxfd为最大fd
            FD_SET(sockfd, &readfds);
            maxfd = maxfd>sockfd?maxfd:sockfd;            
            //如果意见检查了nready个fd,就没有必要再等了,直接下一个循环
            if(--nready==0)                continue;
        }        
        int fd = 0;        //遍历文件描述符表,处理接收到的消息
        for(;fd<=maxfd; fd++){   
            if(fd == listenfd)                continue;            if(FD_ISSET(fd, &temprfds)){                int ret = read(fd, buf[fd], sizeof buf[0]);                if(0 == ret){    //客户端链接已经断开
                    close(fd);
                    FD_CLR(fd, &readfds);                    if(maxfd==fd) 
                        --maxfd;                    continue;
                }                //将fd加入监听可写的集合
                FD_SET(fd, &writefds);  
            }            //找到了接收消息的socket的fd,接下来将其加入到监视写的fd_set中
            //将在下一次while()循环开始监视
            if(FD_ISSET(fd, &tempwfds)){                int ret = write(fd, buf[fd], sizeof buf[0]);                printf("ret %d: %d\n", fd, ret);
                FD_CLR(fd, &writefds);
            }
        }
    }
    close(listenfd);
}

poll机制

poll是System V提出的一种基于select的改良机制,其针对select的诸多明显的缺陷进行了重新设计,包括只遍历被触发个数个文件描述符,不需要备份fd_set等等

模型

struct pollfd   fds     //创建一个pollfd类型的数组fds[0].
fd               //向fds[0]中放入需要监视的fdfds[0].
events           //向fds[0]中放入需要监视的fd的触发事件
    POLLIN              //I/O有输入
    POLLPRI             //有紧急数据需要读取
    POLLOUT             //I/O可写
    POLLRDHUP           //流式套接字连接断开或套接字处于半关闭状态
    POLLERR             //错误条件(仅针对输出)
    POLLHUP             //挂起(仅针对输出)
    POLLNVAL            //无效的请求:fd没有被打开(仅针对输出)

例子_I/O多路复用并发服务器

/* ... */int main(){    /* ... */
    struct pollfd myfds[MAXNFD] = {0};
    myfds[0].fd = listenfd;
    myfds[0].events = POLLIN;    int maxnum = 1;    
    int nready;    //准备二维数组buf,每个fd使用buf的一行,数据干扰
    char buf[MAXNFD][BUFSIZE] = {0};    while(1){        //poll直接返回event被触发的fd的个数
        nready = poll(myfds, maxnum, -1)        int i = 0;        for(;i<maxnum; i++){            //poll通过将相应的二进制位置一来表示已经设置
            //如果下面的条件成立,表示revent[i]里的POLLIN位已经是1了
            if(myfds[i].revents & POLLIN){                if(myfds[i].fd == listenfd){                    int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);                    //将新accept的scokfd加入监听集合
                    myfds[maxnum].fd = sockfd;
                    myfds[maxnum].events = POLLIN;
                    maxnum++;                    
                    //如果意见检查了nready个fd,就直接下一个循环
                    if(--nready==0)                        continue;
                }                else{                    int ret = read(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]);                    if(0 == ret){    //如果连接断开了
                        close(myfds[i].fd);                        
                         //初始化将文件描述符表所有的文件描述符标记为-1
                         //close的文件描述符也标记为-1
                         //打开新的描述符时从表中搜索第一个-1
                         //open()就是这样实现始终使用最小的fd
                         //这里为了演示并没有使用这种机制
                         myfds[i].fd = -1;  
                        continue;
                    }
                    myfds[i].events = POLLOUT;
                }
            }            else if(myfds[i].revents & POLLOUT){                int ret = write(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]);
                myfds[i].events = POLLIN;
            }
        }
    }
    close(listenfd);
}

epoll

epoll在poll基础上实现的更为健壮的接口,也是现在主流的web服务器使用的多路复用技术,epoll一大特色就是支持EPOLLET(边沿触发)和EPOLLLT (水平触发),前者表示如果读取之后缓冲区还有数据,那么只要读取结束,剩余的数据也会丢弃,而后者表示里面的数据不会丢弃,下次读的时候还在,默认是EPOLLLT

模型

epoll_create()          //创建epoll对象struct epoll_event      //准备事件结构体和事件结构体数组
    event.events    event.data.fd ...
epoll_ctl()             //配置epoll对象epoll_wait()            //监控epoll对象中的fd及其相应的event

例子_I/O多路复用并发服务器

/* ... */int main(){    /* ... */
    /* 创建epoll对象 */
    int epoll_fd = epoll_create(1024);    
    //准备一个事件结构体
    struct epoll_event event = {0};    event.events = EPOLLIN;    event.data.fd = listenfd;   //data是一个共用体,除了fd还可以返回其他数据
    
    //ctl是监控listenfd是否有event被触发
    //如果发生了就把event通过wait带出。
    //所以,如果event里不标明fd,我们将来获取就不知道哪个fd
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listenfd, &event);    
    struct epoll_event revents[MAXNFD] = {0};    int nready;    char buf[MAXNFD][BUFSIZE] = {0};    while(1){        //wait返回等待的event发生的数目
        //并把相应的event放到event类型的数组中
        nready = epoll_wait(epoll_fd, revents, MAXNFD, -1)        int i = 0;        for(;i<nready; i++){            //wait通过在events中设置相应的位来表示相应事件的发生
            //如果输入可用,那么下面的这个结果应该为真
            if(revents[i].events & EPOLLIN){                //如果是listenfd有数据输入
                if(revents[i].data.fd == listenfd){                    int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);                    struct epoll_event event = {0};                    event.events = EPOLLIN;                    event.data.fd = sockfd;
                    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &event);
                }                else{                    int ret = read(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]);                    if(0 == ret){
                        close(revents[i].data.fd);
                        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, revents[i].data.fd, &revents[i]);
                    }
                    
                    revents[i].events = EPOLLOUT;
                    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]);
                }
            }            else if(revents[i].events & EPOLLOUT){                int ret = write(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]);
                revents[i].events = EPOLLIN;
                epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]);
            }
        }
    }
    close(listenfd);
}


Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn