ホームページ  >  記事  >  php教程  >  Linux I/O 多重化

Linux I/O 多重化

高洛峰
高洛峰オリジナル
2016-11-05 10:09:131413ブラウズ

Linux ではすべてがファイルです。ディスクに保存されている文字ファイル、実行可能ファイル、コンピューターに接続されている I/O デバイスなど、それらはすべて VFS によってファイルに抽象化されます。標準入力デバイスを操作する場合、実際にはデフォルトで開かれるファイル記述子 0 のファイルを操作することになります。すべてのソフトウェアは OS を介してハードウェアを操作する必要があり、OS はすべてを操作するための対応するドライバーを必要とします。このドライバーでは、このハードウェアの対応する構成と使用法が構成されます。 Linux I/O は、ブロッキング I/O、ノンブロッキング I/O、I/O 多重化、およびシグナル駆動型 I/O の 4 つのタイプに分類されます。 I/O デバイス ドライバーには、通常、ブロッキングとノンブロッキングの 2 つの構成が提供されます。最も一般的な I/O デバイスの 1 つであるキーボード (標準入力デバイス) のドライバーは、デフォルトでブロックされています。
多重化とは、プロセスが複数のブロッキング I/O から必要なデータを取得し、次のタスクの実行を継続できるようにすることです。主なアイデアは、複数のファイル記述子を同時に監視することです。ファイル記述子の設定状態がトリガーされた場合、プロセスはスリープ状態に入ります。パス多重化の主な用途の 1 つは、「I/O 多重化同時サーバー」を実装することです。この種のサーバーは、マルチスレッド同時実行またはマルチプロセス同時実行と比較して、システムのオーバーヘッドが低く、Web サーバーに適しています。

I/O のブロック

I/O のブロックとは、プロセスが I/O デバイスにアクセスしようとしたときにデバイスの準備ができていないときに、デバイス ドライバーがアクセスしようとしているプロセスをカーネルを通じてスリープ状態に移行させることを意味します。ブロッキング I/O の利点の 1 つは、CPU 時間を大幅に節約できることです。これは、プロセスが準備されていないブロッキング I/O にアクセスしようとするとスリープ状態になり、スリープ状態に入るプロセスはカーネル内にないためです。ターゲット I/O の準備ができるまでプロセス スケジューリング リストを起動し、スケジューリング リストに追加することで、CPU 時間を節約します。もちろん、ブロッキング I/O には固有の欠点もあります。プロセスがブロッキング I/O にアクセスしようとしても、アクセスが成功するかどうかが後続のタスクに決定的な影響を与えない場合、直接スリープ状態に入ります。明らかにタスクの完了が遅れることになります。

典型的なデフォルトのブロッキング IO には、標準入力デバイス、ソケット デバイス、パイプ デバイスなどが含まれます。gets()、scanf()、read() などの操作を使用してこれらの IO をリクエストし、IO にデータが流れないと、スリープするまでのプロセス。
プロセスが 3 つのパイプのいずれかを介してデータを読み取り、表示したいとします。疑似コードは次のとおりです。

read(pipe_0,buf,sizeof(buf));       //sleepprint buf;
read(pipe_1,buf,sizeof(buf));
print buf;read(pipe_2,buf,sizeof(buf));
print buf;

パイプが I/O をブロックするため、pipe_0 にデータが流れ込まない場合、プロセスは最初の読み取りになります。 () スリープ状態に入り、pipe_1、pipe_2 にデータが流れていても読み込まれなくなります。

次のコードを使用してパイプのブロッキング プロパティをリセットすると、明らかに、3 つのパイプにデータが流れ込まなければ、プロセスは要求されたデータを取得できず、データが重要な場合は実行を続行できません。ブロッキング I/O を使用します)、結果は非常に悪くなり、ポーリングに変更すると多くの CPU 時間が占有されます。

int fl = fcntl(pipe_fd, F_GETFL);
fcntl(pipe_fd, F_SETFL, fl | O_NONBLOCK);

プロセスが3つのパイプラインを同時に監視し、そのうちの1つにデータがあればスリープせずに実行を続ける方法。すべてのパイプラインにデータが流れていないと、再度スリープしてしまいます。これは多重化の問題です。テクノロジーが解決する必要がある。

ノンブロッキング I/O

ノンブロッキング I/O とは、プロセスが I/O デバイスにアクセスしようとすると、要求されたデータがそこから取得されたかどうかに関係なく、戻って次のタスクの実行を継続することを意味します。 。ただし、リクエストの成功が次のタスクにほとんど影響を与えない I/O リクエストには非常に適しています。ただし、ノンブロッキング I/O にアクセスし、このリクエストの失敗がプロセスの次のタスクに致命的な影響を与える場合、最も大雑把な方法は while(1){read()} ポーリングを使用することです。明らかに、このアプローチは多くの CPU 時間を消費します。

select メカニズム

select は非常に「古い」同期 I/O インターフェイスですが、I/O 多重化の良いアイデアを提供します

Model

fd_set      //创建fd_set对象,将来从中增减需要监视的
fdFD_ZERO()   //清空fd_set对象FD_SET()    //将一个fd加入
fd_set对象中  select()    //监视
fd_set对象中的文件描述符
pselect()   //先设定信号屏蔽,再监视
FD_ISSET()  //测试fd是否属于
fd_set对象FD_CLR()    //从
fd_set对象中删除fd

注:

select 最初のパラメータ nfds は、セット内の最大のファイル記述子 + 1。select はターゲットが見つかるまでファイル記述子テーブル全体を無差別に走査し、ファイル記述子は 0 から始まるため、合計はセット内の最大のファイル記述子 + 1 倍になります。

上一条导致了这种机制的低效,如果需要监视的文件描述符是0和100那么每一次都会遍历101次

select()每次返回都会修改fd_set,如果要循环select(),需要先对初始的fd_set进行备

例子_I/O多路复用并发服务器

关于server本身的编程模型,参见tcp/ip协议服务器模型和udp/ip协议服务器模型这里仅是使用select实现伪并行的部分模型

#define BUFSIZE 100#define MAXNFD  1024 int main(){    /***********服务器的listenfd已经准本好了**************/
    fd_set readfds;
    fd_set writefds;
    FD_ZERO(&readfds);
    FD_ZERO(&writefds);
    FD_SET(listenfd, &readfds);

    fd_set temprfds = readfds;
    fd_set tempwfds = writefds;    int maxfd = listenfd;    int nready;    char buf[MAXNFD][BUFSIZE] = {0};    while(1){
        temprfds = readfds;
        tempwfds = writefds;

        nready = select(maxfd+1, &temprfds, &tempwfds, NULL, NULL)        if(FD_ISSET(listenfd, &temprfds)){          
            //如果监听到的是listenfd就进行accept
            int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);            
            //将新accept的scokfd加入监听集合,并保持maxfd为最大fd
            FD_SET(sockfd, &readfds);
            maxfd = maxfd>sockfd?maxfd:sockfd;            
            //如果意见检查了nready个fd,就没有必要再等了,直接下一个循环
            if(--nready==0)                continue;
        }        
        int fd = 0;        //遍历文件描述符表,处理接收到的消息
        for(;fd<=maxfd; fd++){   
            if(fd == listenfd)                continue;            if(FD_ISSET(fd, &temprfds)){                int ret = read(fd, buf[fd], sizeof buf[0]);                if(0 == ret){    //客户端链接已经断开
                    close(fd);
                    FD_CLR(fd, &readfds);                    if(maxfd==fd) 
                        --maxfd;                    continue;
                }                //将fd加入监听可写的集合
                FD_SET(fd, &writefds);  
            }            //找到了接收消息的socket的fd,接下来将其加入到监视写的fd_set中
            //将在下一次while()循环开始监视
            if(FD_ISSET(fd, &tempwfds)){                int ret = write(fd, buf[fd], sizeof buf[0]);                printf("ret %d: %d\n", fd, ret);
                FD_CLR(fd, &writefds);
            }
        }
    }
    close(listenfd);
}

poll机制

poll是System V提出的一种基于select的改良机制,其针对select的诸多明显的缺陷进行了重新设计,包括只遍历被触发个数个文件描述符,不需要备份fd_set等等

模型

struct pollfd   fds     //创建一个pollfd类型的数组fds[0].
fd               //向fds[0]中放入需要监视的fdfds[0].
events           //向fds[0]中放入需要监视的fd的触发事件
    POLLIN              //I/O有输入
    POLLPRI             //有紧急数据需要读取
    POLLOUT             //I/O可写
    POLLRDHUP           //流式套接字连接断开或套接字处于半关闭状态
    POLLERR             //错误条件(仅针对输出)
    POLLHUP             //挂起(仅针对输出)
    POLLNVAL            //无效的请求:fd没有被打开(仅针对输出)

例子_I/O多路复用并发服务器

/* ... */int main(){    /* ... */
    struct pollfd myfds[MAXNFD] = {0};
    myfds[0].fd = listenfd;
    myfds[0].events = POLLIN;    int maxnum = 1;    
    int nready;    //准备二维数组buf,每个fd使用buf的一行,数据干扰
    char buf[MAXNFD][BUFSIZE] = {0};    while(1){        //poll直接返回event被触发的fd的个数
        nready = poll(myfds, maxnum, -1)        int i = 0;        for(;i<maxnum; i++){            //poll通过将相应的二进制位置一来表示已经设置
            //如果下面的条件成立,表示revent[i]里的POLLIN位已经是1了
            if(myfds[i].revents & POLLIN){                if(myfds[i].fd == listenfd){                    int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);                    //将新accept的scokfd加入监听集合
                    myfds[maxnum].fd = sockfd;
                    myfds[maxnum].events = POLLIN;
                    maxnum++;                    
                    //如果意见检查了nready个fd,就直接下一个循环
                    if(--nready==0)                        continue;
                }                else{                    int ret = read(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]);                    if(0 == ret){    //如果连接断开了
                        close(myfds[i].fd);                        
                         //初始化将文件描述符表所有的文件描述符标记为-1
                         //close的文件描述符也标记为-1
                         //打开新的描述符时从表中搜索第一个-1
                         //open()就是这样实现始终使用最小的fd
                         //这里为了演示并没有使用这种机制
                         myfds[i].fd = -1;  
                        continue;
                    }
                    myfds[i].events = POLLOUT;
                }
            }            else if(myfds[i].revents & POLLOUT){                int ret = write(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]);
                myfds[i].events = POLLIN;
            }
        }
    }
    close(listenfd);
}

epoll

epoll在poll基础上实现的更为健壮的接口,也是现在主流的web服务器使用的多路复用技术,epoll一大特色就是支持EPOLLET(边沿触发)和EPOLLLT (水平触发),前者表示如果读取之后缓冲区还有数据,那么只要读取结束,剩余的数据也会丢弃,而后者表示里面的数据不会丢弃,下次读的时候还在,默认是EPOLLLT

模型

epoll_create()          //创建epoll对象struct epoll_event      //准备事件结构体和事件结构体数组
    event.events    event.data.fd ...
epoll_ctl()             //配置epoll对象epoll_wait()            //监控epoll对象中的fd及其相应的event

例子_I/O多路复用并发服务器

/* ... */int main(){    /* ... */
    /* 创建epoll对象 */
    int epoll_fd = epoll_create(1024);    
    //准备一个事件结构体
    struct epoll_event event = {0};    event.events = EPOLLIN;    event.data.fd = listenfd;   //data是一个共用体,除了fd还可以返回其他数据
    
    //ctl是监控listenfd是否有event被触发
    //如果发生了就把event通过wait带出。
    //所以,如果event里不标明fd,我们将来获取就不知道哪个fd
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listenfd, &event);    
    struct epoll_event revents[MAXNFD] = {0};    int nready;    char buf[MAXNFD][BUFSIZE] = {0};    while(1){        //wait返回等待的event发生的数目
        //并把相应的event放到event类型的数组中
        nready = epoll_wait(epoll_fd, revents, MAXNFD, -1)        int i = 0;        for(;i<nready; i++){            //wait通过在events中设置相应的位来表示相应事件的发生
            //如果输入可用,那么下面的这个结果应该为真
            if(revents[i].events & EPOLLIN){                //如果是listenfd有数据输入
                if(revents[i].data.fd == listenfd){                    int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);                    struct epoll_event event = {0};                    event.events = EPOLLIN;                    event.data.fd = sockfd;
                    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &event);
                }                else{                    int ret = read(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]);                    if(0 == ret){
                        close(revents[i].data.fd);
                        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, revents[i].data.fd, &revents[i]);
                    }
                    
                    revents[i].events = EPOLLOUT;
                    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]);
                }
            }            else if(revents[i].events & EPOLLOUT){                int ret = write(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]);
                revents[i].events = EPOLLIN;
                epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]);
            }
        }
    }
    close(listenfd);
}


声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。