ホームページ  >  記事  >  バックエンド開発  >  Python select epoll Paul の解析の詳細な説明

Python select epoll Paul の解析の詳細な説明

高洛峰
高洛峰オリジナル
2017-03-15 13:20:012164ブラウズ

select、poll、epoll の違い

select

select は、1983 年に 4.2BSD で初めて登場しました。これは、select() システム コール (linux では、その中のすべてがselect() が返されると、配列内の準備完了ファイル記述子がカーネルによって変更され (準備完了状態になり)、プロセスがこれらのファイル記述子を取得して後続の処理を実行できるようになります。読み取りおよび書き込み操作 (選択は、ネットワーク インターフェイス が準備完了 ステータス のディレクトリにあるファイル記述子の数を継続的に監視します [ネットワーク インターフェイスでは、接続によって「ファイル」が作成されます]、準備ができたら、選択しますこのファイル記述子を操作できます)。 【ソケットサーバーはマルチスレッドで複数のリクエストを処理します。各接続には処理するスレッドが割り当てられますが、プロセスの実行コードはシリアルである必要がありますが、その効果はプロセスを通じて実装される必要があります。同時実行性の観点からは、プロセス内にメイン スレッドは 1 つだけあります。つまり、1 つのスレッドを使用して同時実行性の効果が得られます。複数のスレッドを使用して複数の同時実行を実現するのではなく、1 つのプロセスを使用して複数の同時実行を実現するのはなぜでしょうか?

==========答え: マルチスレッドよりも 1 つのプロセスで複数の同時実行を達成する方が効率的であるためです。マルチスレッドの開始には多くのオーバーヘッドがあり、CPU が必要とするためです。各プロセスのステータスを常にチェックして、どのスレッドが実行できるかを決定します。これはシステムにとってもストレスになります。単一プロセスを使用すると、このオーバーヘッドとシステムへの負荷を回避できます。では、単一プロセスはどのようにして複数の同時実行を実現するのでしょうか。 ? ?

======== 答え: プロデューサとコンシューマのモデル (非同期) は非常に巧みに使用されており、プロデューサとコンシューマは選択を通じて複数の接続を受信できます (以前のソケット プロセスは受信のみ可能です)。 1 つの接続。新しい接続を受信すると、ソケット プロセスが最初にクライアントと通信する必要があるため、この 2 つの接続はブロックされます (クライアントはメッセージを送信し、サーバーはそれを受信するのを待機します)。 ..サーバーは受信を待機しています...] がブロックされています。この時点で別の接続がある場合は、前の接続が切断されるまで待機する必要があります。 -----言い換えれば、基本的なソケットを使用してマルチプロセスを実装するとブロックされます。この問題を解決するために、接続ごとに 1 つのスレッドが生成され、ブロックされませんが、スレッドが多すぎると、オーバーヘッドと CPU への負荷は比較的大きくなります。) 単一ソケットの場合、ブロックされているときはほとんどの場合、IO 操作を待機しています (ネットワーク操作も IO 操作です)。この状況を回避するために、非同期 ============= クライアントは接続を開始し、サーバー上でファイル ハンドルを登録し、サーバーはこれらのファイル ハンドルのリストを継続的にポーリングします。スレッドを開始せずにクライアントとの接続を確立します。このとき、メイン プロセスはメッセージを送受信できるだけでなく、他のクライアントもメイン プロセスに接続できません。接続されたクライアントだけでなく、新しいクライアントとの接続を確立するときも、クライアントがメッセージを送信し、サーバーがメッセージを読み取る限り、クライアントによって接続されたファイル ハンドルのリストを更新するためのポーリングが非常に高速になります (デッド ループ

)。クライアントに返されるメッセージを受信するために別のリストが使用され、このリストも常にブラッシングされ、クライアントに返されます。このようにして、クライアントとの通信は完了します。クライアントとの接続はまだ切断されていません。次のポーリングに入ります。 】

select 利点

select は現在、ほぼすべてのプラットフォームでサポートされており、優れたクロスプラットフォーム機能を備えています。

select の欠点

select を呼び出すたびに、FD コレクションをユーザー モードからカーネル モードにコピーする必要があります。FD が多数ある場合、このオーバーヘッドは非常に大きくなります

数には上限があります。単一プロセスが監視できる FD の数は、Linux のデフォルトは 1024 です (この制限は、マクロ定義を変更するかカーネルを再コンパイルすることで増やすことができます)

そして、選択の fd は配列に配置されるため、配列全体がfd が多い場合、毎回線形に走査する必要があり、オーバーヘッドも非常に高くなります

python

select

読み取り可能、書き込み可能、​​例外al = select.select(rlist, wlist, xlist[, timeout])の場合はselectの関数を呼び出します。最初の3つのパラメータはそれぞれ3つのリストと配列です。 objects はすべて待機可能です object: それらはすべて integers (file 記述子) のファイル記述子、またはファイル記述子を返す fileno() メソッドを持つオブジェクトです

rlist: 読み取りを待機しています。 readiness list

wlist: 書き込み待機リスト readiness

errlist: 「異常」待機リスト

selectメソッドは、ファイル記述子を監視し、ファイル記述子が変更された場合に記述子を取得するために使用されます。

1. これら 3 つのリストは空のリストにすることができますが、3 つの空のリストを受け取るかどうかはシステムに依存します (Linux では許容されますが、Windows では許容されません)。

2. rlist シーケンス内の記述子が読み取り可能になると (accetp および read)、変更された記述子が取得され、読み取り可能なシーケンスに追加されます。

3. wlist シーケンスに記述子が含まれる場合、その記述子が読み取り可能なシーケンスに追加されます。シーケンス内のすべての記述子が書き込み可能なシーケンスに追加されます

4. errlist シーケンスのハンドルでエラーが発生した場合、エラー ハンドルが例外シーケンスに追加されます

5. タイムアウトが設定されていない場合は、を選択します。監視対象の記述子が変更されるまでブロックします

タイムアウト = 1 の場合、監視対象のハンドルに変更がない場合は、1 秒間ブロックするを選択し、監視対象の記述子 (fd) が変更された場合は 3 つの空のリストを返します。直接実行されます。

6. Ptython ファイル オブジェクト (sys.stdin、または open() および os.open() によって返されるオブジェクトなど) はリストで受け入れることができ、ソケット オブジェクトはソケット.socket() を返します。適切な fileno() メソッド (ランダムな整数ではなく、実際にファイル記述子を返す必要がある) がある限り、クラスをカスタマイズすることもできます。

select 例:

Python の select() メソッドは、オペレーティング システムの IO インターフェイスを直接呼び出し、ソケット、オープン ファイル、およびパイプ (fileno() メソッドによるすべてのファイル ハンドル) が読み取り可能および書き込み可能になったことを監視します。 select() を使用すると、複数の接続を同時に監視することが簡単になります。また、select は、コード分​​析:

select() メソッドは、3 つの通信リストを受信して​​監視します。最初のデータはすべて、外部から送信されたデータを指します。 2 つ目はすべての入力データであり、外部から送信されるデータを指します。1 つ目は、すべての送信データ (発信データ) を監視して受信することです。次に、

エラー メッセージ

を監視する必要があります。 select() に渡す入出力情報を含む 2 つのリスト。# 読み取りが期待されるソケットinputs = [server ]# 書き込みが期待されるソケットoutputs = [ ]


すべての受信接続とデータクライアントからのデータは、上記のリストにあるサーバーのメイン ループ プログラムによって処理されます。現在のサーバーは、接続が書き込み可能になるまで待機してから、データを受信して​​返す必要があります (そのため、接続直後には戻りません)。これは、各接続がまず入力データまたは出力データをキューにキャッシュし、次に選択によって取り出して送信する必要があるためです。

# 送信メッセージ キュー (socket:Queue)message_queues = {}

サーバー プログラムの

main
部分はループし、select() をすべて実行してネットワーク アクティビティをブロックして待機します。

これは次のとおりです。 select() を呼び出すと、プログラムのメイン ループはブロックされ、新しい接続とデータが入ってくるまで待機します 入力中: # 少なくとも 1 つのソケットが処理の準備ができるまで待機します print >>sys. stderr、「次のイベントを待っています」 readable、writable、excellent = select.select(inputs, Outputs, inputs)

入力、出力、例外 (ここでは入力と共有) を select() に渡すと、3 が返されます。新しいリストでは、読み取り可能、書き込み可能、​​および例外として割り当てられました。読み取り可能リスト内のすべてのソケット接続は、受信できるデータ (recv) を表します。書き込み可能リスト内のすべてのソケット接続は、送信 (送信) できるデータを表します。 . ) はソケット接続を操作します。接続通信でエラーが発生した場合、エラーは例外リストに書き込まれます。

Readable list 中的socket 可以有3种可能状态,第一种是如果这个socket是main "server" socket,它负责监听客户端的连接,如果这个main server socket出现在readable里,那代表这是server端已经ready来接收一个新的连接进来了,为了让这个main server能同时处理多个连接,在下面的代码里,我们把这个main server的socket设置为非阻塞模式。

 

第二种情况是这个socket是已经建立了的连接,它把数据发了过来,这个时候你就可以通过recv()来接收它发过来的数据,然后把接收到的数据放到queue里,这样你就可以把接收到的数据再传回给客户端了。

 

第三种情况就是这个客户端已经断开了,所以你再通过recv()接收到的数据就为空了,所以这个时候你就可以把这个跟客户端的连接关闭了。

 

对于writable list中的socket,也有几种状态,如果这个客户端连接在跟它对应的queue里有数据,就把这个数据取出来再发回给这个客户端,否则就把这个连接从output list中移除,这样下一次循环select()调用时检测到outputs list中没有这个连接,那就会认为这个连接还处于非活动状态

 

最后,如果在跟某个socket连接通信过程中出了错误,就把这个连接对象在inputs\outputs\message_queue中都删除,再把连接关闭掉

#coding:UTF8

import socket
import sys
 
messages = [ 'This is the message. ',
             'It will be sent ',
             'in parts.',
             ]
server_address = ('localhost', 10003)
 
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]
 
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
    s.connect(server_address)

for message in messages:
 
    # Send messages on both sockets
    for s in socks:
        print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
        s.send(message)
 
    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
        if not data:
            print >>sys.stderr, 'closing socket', s.getsockname()

client

客户端程序展示了如何通过select()对socket进行管理并与多个连接同时进行交互,通过循环通过每个socket连接给server发送和接收数据。

server:
starting up on localhost prot 10000

waiting for the next event
new connection from ('127.0.0.1', 54812)

waiting for the next event
new connection from ('127.0.0.1', 54813)
received "This is the message. " from ('127.0.0.1', 54812)

waiting for the next event
received "This is the message. " from ('127.0.0.1', 54813)
sending "This is the message. " to ('127.0.0.1', 54812)

waiting for the next event
output queue for ('127.0.0.1', 54812) is empty
sending "This is the message. " to ('127.0.0.1', 54813)

waiting for the next event
output queue for ('127.0.0.1', 54813) is empty

waiting for the next event
received "It will be sent " from ('127.0.0.1', 54812)
received "It will be sent " from ('127.0.0.1', 54813)

waiting for the next event
sending "It will be sent " to ('127.0.0.1', 54812)
sending "It will be sent " to ('127.0.0.1', 54813)

waiting for the next event
output queue for ('127.0.0.1', 54812) is empty
output queue for ('127.0.0.1', 54813) is empty

waiting for the next event
received "in parts." from ('127.0.0.1', 54812)
received "in parts." from ('127.0.0.1', 54813)

waiting for the next event
sending "in parts." to ('127.0.0.1', 54812)
sending "in parts." to ('127.0.0.1', 54813)

waiting for the next event
output queue for ('127.0.0.1', 54812) is empty
output queue for ('127.0.0.1', 54813) is empty

waiting for the next event
closing ('127.0.0.1', 54813) after reading no data
closing ('127.0.0.1', 54813) after reading no data

waiting for the next event





client:
connecting to localhost port 10000
('127.0.0.1', 54812): sending "This is the message. "
('127.0.0.1', 54813): sending "This is the message. "
('127.0.0.1', 54812): received "THIS IS THE MESSAGE. "
('127.0.0.1', 54813): received "THIS IS THE MESSAGE. "
('127.0.0.1', 54812): sending "It will be sent "
('127.0.0.1', 54813): sending "It will be sent "
('127.0.0.1', 54812): received "IT WILL BE SENT "
('127.0.0.1', 54813): received "IT WILL BE SENT "
('127.0.0.1', 54812): sending "in parts."
('127.0.0.1', 54813): sending "in parts."
('127.0.0.1', 54812): received "IN PARTS."
('127.0.0.1', 54813): received "IN PARTS."

运行结果

poll 
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll() 的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

 

在Python中调用poll

select.poll(),返回一个poll的对象,支持注册和注销文件描述符。

poll.register(fd[, eventmask])注册一个文件描述符,注册后,可以通过poll()方法来检查是否有对应的I/O事件发生。fd可以是i 个整数,或者有返回整数的fileno()方法对象。如果File对象实现了fileno(),也可以当作参数使用。

eventmask是一个你想去检查的事件类型,它可以是常量POLLIN, POLLPRI和 POLLOUT的组合。如果缺省,默认会去检查所有的3种事件类型。

事件常量 意义

POLLIN 有数据读取

POLLPRT 有数据紧急读取

POLLOUT 准备输出:输出不会阻塞

POLLERR 某些错误情况出现

POLLHUP 挂起

POLLNVAL 无效请求:描述无法打开

poll.modify(fd, eventmask) 修改一个已经存在的fd,和poll.register(fd, eventmask)有相同的作用。如果去尝试修改一个未经注册的fd,会引起一个errno为ENOENT的IOError。

poll.unregister(fd)从poll对象中注销一个fd。尝试去注销一个未经注册的fd,会引起KeyError。

poll.poll([timeout])去检测已经注册了的文件描述符。会返回一个可能为空的list,list中包含着(fd, event)这样的二元组。 fd是文件描述符, event是文件描述符对应的事件。如果返回的是一个空的list,则说明超时了且没有文件描述符有事件发生。timeout的单位是milliseconds,如果设置了timeout,系统将会等待对应的时间。如果timeout缺省或者是None,这个方法将会阻塞直到对应的poll对象有一个事件发生。

#coding: utf-8 

import select, socket

response = b"hello world"

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('localhost', 10000))
serversocket.listen(1)
serversocket.setblocking(0)

#
poll = select.poll()
poll.register(serversocket.fileno(), select.POLLIN)

connections = {}
while True:
    for fd, event in poll.poll():
        if event == select.POLLIN:
            if fd == serversocket.fileno():
                con, addr = serversocket.accept()
                poll.register(con.fileno(), select.POLLIN)
                connections[con.fileno()] = con
            else:
                con = connections[fd]
                data = con.recv(1024)
                if data:
                    poll.modify(con.fileno(), select.POLLOUT)
        elif event == select.POLLOUT:
            con = connections[fd]
            con.send(response)
            poll.unregister(con.fileno())
            con.close()

epoll 
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表 就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了 这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描 述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调 机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

 

在Python中调用epoll

select.epoll([sizehint=-1])返回一个epoll对象。

eventmask

事件常量 意义

EPOLLIN 读就绪

EPOLLOUT 写就绪

EPOLLPRI 有数据紧急读取

EPOLLERR assoc. fd有错误情况发生

EPOLLHUP assoc. fd发生挂起

EPOLLRT 设置边缘触发(ET)(默认的是水平触发)

EPOLLONESHOT 设置为 one-short 行为,一个事件(event)被拉出后,对应的fd在内部被禁用

EPOLLRDNORM 和 EPOLLIN 相等

EPOLLRDBAND 优先读取的数据带(data band)

EPOLLWRNORM 和 EPOLLOUT 相等

EPOLLWRBAND 优先写的数据带(data band)

EPOLLMSG 忽视

epoll.close()关闭epoll对象的文件描述符。

epoll.fileno返回control fd的文件描述符number。

epoll.fromfd(fd)用给予的fd来创建一个epoll对象。

epoll.register(fd[, eventmask])在epoll对象中注册一个文件描述符。(如果文件描述符已经存在,将会引起一个IOError)

epoll.modify(fd, eventmask)修改一个已经注册的文件描述符。

epoll.unregister(fd)注销一个文件描述符。

epoll.poll(timeout=-1[, maxevnets=-1])等待事件,timeout(float)的单位是秒(second)。

#coding:Utf8
import socket, select

EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response  = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('localhost', 10000))
serversocket.listen(1)
serversocket.setblocking(0)

epoll = select.epoll()
epoll.register(serversocket.fileno(), select.EPOLLIN)

try:
   connections = {}; requests = {}; responses = {}
   while True:
      events = epoll.poll(1)
      for fileno, event in events:
         if fileno == serversocket.fileno():
            connection, address = serversocket.accept()
            connection.setblocking(0)
            epoll.register(connection.fileno(), select.EPOLLIN)
            connections[connection.fileno()] = connection
            requests[connection.fileno()] = b''
            responses[connection.fileno()] = response
         elif event & select.EPOLLIN:
            requests[fileno] += connections[fileno].recv(1024)
            if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
               epoll.modify(fileno, select.EPOLLOUT)
               print('-'*40 + '\n' + requests[fileno].decode()[:-2])
         elif event & select.EPOLLOUT:
            byteswritten = connections[fileno].send(responses[fileno])
            responses[fileno] = responses[fileno][byteswritten:]
            if len(responses[fileno]) == 0:
               epoll.modify(fileno, 0)
               connections[fileno].shutdown(socket.SHUT_RDWR)
         elif event & select.EPOLLHUP:
            epoll.unregister(fileno)
            connections[fileno].close()
            del connections[fileno]
finally:
   epoll.unregister(serversocket.fileno())
   epoll.close()
   serversocket.close()


以上がPython select epoll Paul の解析の詳細な説明の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。