>  기사  >  php教程  >  Linux I/O 멀티플렉싱

Linux I/O 멀티플렉싱

高洛峰
高洛峰원래의
2016-11-05 10:09:131379검색

Linux의 모든 것은 파일입니다. 디스크에 저장하는 문자 파일, 실행 파일, 컴퓨터에 연결된 I/O 장치 등 모두 VFS에 의해 파일로 추상화됩니다. 장치는 기본적으로 키보드입니다. 표준 입력 장치를 작동할 때 실제로는 기본적으로 열리는 파일 설명자 0을 사용하여 파일을 작동합니다. 모든 소프트웨어는 OS를 통해 하드웨어를 작동해야 하며 OS에는 해당 드라이버가 필요합니다. 모든 하드웨어를 작동하십시오. 이 드라이버는 이 하드웨어의 해당 구성과 사용법을 프로그램에서 구성합니다. Linux I/O는 블로킹 I/O, 비블로킹 I/O, I/O 멀티플렉싱, 신호 구동 I/O의 네 가지 유형으로 구분됩니다. I/O 장치 드라이버의 경우 일반적으로 차단 및 비차단이라는 두 가지 구성이 제공됩니다. 가장 일반적인 I/O 장치 중 하나인 키보드(표준 입력 장치)의 드라이버는 기본적으로 차단됩니다.
멀티플렉싱은 프로세스가 다중 블로킹 I/O에서 원하는 데이터를 얻고 다음 작업을 계속 수행할 수 있도록 하는 것입니다. 주요 아이디어는 여러 파일 설명자를 동시에 모니터링하는 것입니다. 파일 설명자의 설정 상태가 트리거되면 프로세스가 계속 실행됩니다. 파일 설명자의 설정 상태가 트리거되지 않으면 프로세스가 절전 모드로 전환됩니다.
One 멀티플렉싱의 주요 용도 중 하나는 "I/O 멀티플렉싱 동시 서버"를 구현하는 것입니다. 다중 스레드 동시성 또는 다중 프로세스 동시성에 비해 이러한 종류의 서버는 시스템 오버헤드가 낮고 웹 서버에 더 적합합니다.

I/O 차단

I/O 차단은 프로세스가 I/O 장치에 액세스하려고 시도하고 장치가 준비되지 않은 경우 장치 드라이버가 장치가 커널을 통과하도록 허용하는 것을 의미합니다. . 액세스하려는 프로세스가 절전 상태로 전환됩니다. I/O 차단의 이점 중 하나는 CPU 시간을 크게 절약할 수 있다는 것입니다. 프로세스가 준비되지 않은 차단 I/O에 액세스하려고 하면 절전 상태로 들어가고 절전 상태로 들어간 프로세스는 커널에 있지 않기 때문입니다. 대상 I/O가 준비될 때까지 프로세스 스케줄링 목록을 깨우고 스케줄링 목록에 추가하여 CPU 시간을 절약합니다. 물론, Blocking I/O에도 고유한 단점이 있습니다. 프로세스가 Blocking I/O에 접근을 시도하지만, 접근의 성공 여부가 후속 작업에 결정적인 영향을 미치지 않으면 바로 Sleep 상태로 진입하게 됩니다. 분명히 임무 완료가 지연될 것입니다.
일반적인 기본 차단 IO에는 표준 입력 장치, 소켓 장치, 파이프 장치 등이 포함됩니다. gets(), scanf(), read() 및 기타 작업을 사용하여 이러한 IO를 요청하고 IO로 데이터 흐름이 없으면 프로세스가 잠자기 상태가 됩니다.
프로세스가 세 개의 파이프 중 하나를 통해 데이터를 읽고 표시하려고 한다고 가정합니다.

read(pipe_0,buf,sizeof(buf));       //sleepprint buf;
read(pipe_1,buf,sizeof(buf));
print buf;read(pipe_2,buf,sizeof(buf));
print buf;

파이프가 I/O를 차단하므로 데이터가 유입되지 않는 경우 Pipe_0의 경우 첫 번째 read()에서 슬립 상태로 진입하며, Pipe_1, Pipe_2에 데이터가 유입되더라도 읽혀지지 않습니다.
다음 코드를 사용하여 파이프의 차단 속성을 재설정하면 당연히 세 개의 파이프에 데이터가 흐르지 않으면 프로세스가 요청한 데이터를 얻지 못하고 데이터가 중요한 경우 계속 실행될 수 있습니다. 이것이 우리가 Blocking I/O를 사용하는 이유입니다.) 결과가 매우 나쁠 것이며 폴링으로 변경하면 CPU 시간이 많이 소모됩니다.

int fl = fcntl(pipe_fd, F_GETFL);
fcntl(pipe_fd, F_SETFL, fl | O_NONBLOCK);

프로세스가 세 개의 파이프를 동시에 모니터링하고 그 중 하나에 데이터가 있으면 Sleep 없이 계속 실행하도록 하는 방법은 데이터가 유입되지 않고 Sleep 상태가 되면 다중화 기술의 문제입니다. 해결해야합니다.

비차단 I/O

비차단 I/O는 프로세스가 I/O 장치에 액세스하려고 시도할 때 반환 여부에 관계없이 연결을 계속 실행하는 것을 의미합니다. 요청된 데이터는 다운 미션에서 획득됩니다. , 그러나 요청 성공이 다음 작업에 거의 영향을 미치지 않는 I/O 요청에 매우 적합합니다. 그러나 비차단 I/O에 액세스했지만 이 요청의 실패가 프로세스의 다음 작업에 치명적인 영향을 미칠 경우 가장 조잡한 방법은 while(1){read()} 폴링을 사용하는 것입니다. 분명히 이 접근 방식은 많은 CPU 시간을 차지합니다.

select 메커니즘

select는 매우 "오래된" 동기 I/O 인터페이스이지만 우수한 I/O 다중화 아이디어를 제공합니다

모델

fd_set      //创建fd_set对象,将来从中增减需要监视的
fdFD_ZERO()   //清空fd_set对象FD_SET()    //将一个fd加入
fd_set对象中  select()    //监视
fd_set对象中的文件描述符
pselect()   //先设定信号屏蔽,再监视
FD_ISSET()  //测试fd是否属于
fd_set对象FD_CLR()    //从
fd_set对象中删除fd

참고:

select의 첫 번째 매개변수 nfds는 집합에서 가장 큰 파일 설명자 + 1을 참조합니다. 왜냐하면 select는 전체 파일 설명자를 무차별적으로 순회하기 때문입니다. 테이블은 대상을 찾을 때까지 사용되며, 파일 설명자는 0부터 시작하므로 총합은 세트 + 1번에서 가장 큰 파일 설명자가 됩니다.

上一条导致了这种机制的低效,如果需要监视的文件描述符是0和100那么每一次都会遍历101次

select()每次返回都会修改fd_set,如果要循环select(),需要先对初始的fd_set进行备

例子_I/O多路复用并发服务器

关于server本身的编程模型,参见tcp/ip协议服务器模型和udp/ip协议服务器模型这里仅是使用select实现伪并行的部分模型

#define BUFSIZE 100#define MAXNFD  1024 int main(){    /***********服务器的listenfd已经准本好了**************/
    fd_set readfds;
    fd_set writefds;
    FD_ZERO(&readfds);
    FD_ZERO(&writefds);
    FD_SET(listenfd, &readfds);

    fd_set temprfds = readfds;
    fd_set tempwfds = writefds;    int maxfd = listenfd;    int nready;    char buf[MAXNFD][BUFSIZE] = {0};    while(1){
        temprfds = readfds;
        tempwfds = writefds;

        nready = select(maxfd+1, &temprfds, &tempwfds, NULL, NULL)        if(FD_ISSET(listenfd, &temprfds)){          
            //如果监听到的是listenfd就进行accept
            int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);            
            //将新accept的scokfd加入监听集合,并保持maxfd为最大fd
            FD_SET(sockfd, &readfds);
            maxfd = maxfd>sockfd?maxfd:sockfd;            
            //如果意见检查了nready个fd,就没有必要再等了,直接下一个循环
            if(--nready==0)                continue;
        }        
        int fd = 0;        //遍历文件描述符表,处理接收到的消息
        for(;fd<=maxfd; fd++){   
            if(fd == listenfd)                continue;            if(FD_ISSET(fd, &temprfds)){                int ret = read(fd, buf[fd], sizeof buf[0]);                if(0 == ret){    //客户端链接已经断开
                    close(fd);
                    FD_CLR(fd, &readfds);                    if(maxfd==fd) 
                        --maxfd;                    continue;
                }                //将fd加入监听可写的集合
                FD_SET(fd, &writefds);  
            }            //找到了接收消息的socket的fd,接下来将其加入到监视写的fd_set中
            //将在下一次while()循环开始监视
            if(FD_ISSET(fd, &tempwfds)){                int ret = write(fd, buf[fd], sizeof buf[0]);                printf("ret %d: %d\n", fd, ret);
                FD_CLR(fd, &writefds);
            }
        }
    }
    close(listenfd);
}

poll机制

poll是System V提出的一种基于select的改良机制,其针对select的诸多明显的缺陷进行了重新设计,包括只遍历被触发个数个文件描述符,不需要备份fd_set等等

模型

struct pollfd   fds     //创建一个pollfd类型的数组fds[0].
fd               //向fds[0]中放入需要监视的fdfds[0].
events           //向fds[0]中放入需要监视的fd的触发事件
    POLLIN              //I/O有输入
    POLLPRI             //有紧急数据需要读取
    POLLOUT             //I/O可写
    POLLRDHUP           //流式套接字连接断开或套接字处于半关闭状态
    POLLERR             //错误条件(仅针对输出)
    POLLHUP             //挂起(仅针对输出)
    POLLNVAL            //无效的请求:fd没有被打开(仅针对输出)

例子_I/O多路复用并发服务器

/* ... */int main(){    /* ... */
    struct pollfd myfds[MAXNFD] = {0};
    myfds[0].fd = listenfd;
    myfds[0].events = POLLIN;    int maxnum = 1;    
    int nready;    //准备二维数组buf,每个fd使用buf的一行,数据干扰
    char buf[MAXNFD][BUFSIZE] = {0};    while(1){        //poll直接返回event被触发的fd的个数
        nready = poll(myfds, maxnum, -1)        int i = 0;        for(;i<maxnum; i++){            //poll通过将相应的二进制位置一来表示已经设置
            //如果下面的条件成立,表示revent[i]里的POLLIN位已经是1了
            if(myfds[i].revents & POLLIN){                if(myfds[i].fd == listenfd){                    int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);                    //将新accept的scokfd加入监听集合
                    myfds[maxnum].fd = sockfd;
                    myfds[maxnum].events = POLLIN;
                    maxnum++;                    
                    //如果意见检查了nready个fd,就直接下一个循环
                    if(--nready==0)                        continue;
                }                else{                    int ret = read(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]);                    if(0 == ret){    //如果连接断开了
                        close(myfds[i].fd);                        
                         //初始化将文件描述符表所有的文件描述符标记为-1
                         //close的文件描述符也标记为-1
                         //打开新的描述符时从表中搜索第一个-1
                         //open()就是这样实现始终使用最小的fd
                         //这里为了演示并没有使用这种机制
                         myfds[i].fd = -1;  
                        continue;
                    }
                    myfds[i].events = POLLOUT;
                }
            }            else if(myfds[i].revents & POLLOUT){                int ret = write(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]);
                myfds[i].events = POLLIN;
            }
        }
    }
    close(listenfd);
}

epoll

epoll在poll基础上实现的更为健壮的接口,也是现在主流的web服务器使用的多路复用技术,epoll一大特色就是支持EPOLLET(边沿触发)和EPOLLLT (水平触发),前者表示如果读取之后缓冲区还有数据,那么只要读取结束,剩余的数据也会丢弃,而后者表示里面的数据不会丢弃,下次读的时候还在,默认是EPOLLLT

模型

epoll_create()          //创建epoll对象struct epoll_event      //准备事件结构体和事件结构体数组
    event.events    event.data.fd ...
epoll_ctl()             //配置epoll对象epoll_wait()            //监控epoll对象中的fd及其相应的event

例子_I/O多路复用并发服务器

/* ... */int main(){    /* ... */
    /* 创建epoll对象 */
    int epoll_fd = epoll_create(1024);    
    //准备一个事件结构体
    struct epoll_event event = {0};    event.events = EPOLLIN;    event.data.fd = listenfd;   //data是一个共用体,除了fd还可以返回其他数据
    
    //ctl是监控listenfd是否有event被触发
    //如果发生了就把event通过wait带出。
    //所以,如果event里不标明fd,我们将来获取就不知道哪个fd
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listenfd, &event);    
    struct epoll_event revents[MAXNFD] = {0};    int nready;    char buf[MAXNFD][BUFSIZE] = {0};    while(1){        //wait返回等待的event发生的数目
        //并把相应的event放到event类型的数组中
        nready = epoll_wait(epoll_fd, revents, MAXNFD, -1)        int i = 0;        for(;i<nready; i++){            //wait通过在events中设置相应的位来表示相应事件的发生
            //如果输入可用,那么下面的这个结果应该为真
            if(revents[i].events & EPOLLIN){                //如果是listenfd有数据输入
                if(revents[i].data.fd == listenfd){                    int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);                    struct epoll_event event = {0};                    event.events = EPOLLIN;                    event.data.fd = sockfd;
                    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &event);
                }                else{                    int ret = read(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]);                    if(0 == ret){
                        close(revents[i].data.fd);
                        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, revents[i].data.fd, &revents[i]);
                    }
                    
                    revents[i].events = EPOLLOUT;
                    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]);
                }
            }            else if(revents[i].events & EPOLLOUT){                int ret = write(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]);
                revents[i].events = EPOLLIN;
                epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]);
            }
        }
    }
    close(listenfd);
}


성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.