Home >Backend Development >Python Tutorial >Detailed description of parsing python select epoll poll

Detailed description of parsing python select epoll poll

高洛峰
高洛峰Original
2017-03-15 13:20:012272browse

The difference between select, poll, and epoll

select

select first appeared in 4.2BSD in 1983. It monitors multiple file descriptions through a select() system call array of descriptors (everything in linux is a file, block device, socket connection, etc.), when select() returns, the ready file descriptors in the array will The flag bit is modified by the kernel (becomes ready) so that the process can obtain these file descriptors for subsequent read and write operations (select will continuously monitor how many file descriptors have changed in a certain directory of the network interface Enter the ready state [In the network interface, a 'file' will be created when a connection is made]. After it becomes the ready state, select can operate this file descriptor).

[Socketserver handles multiple requests through multi-threading. Each connection is assigned a thread to process, but select is a single process. The execution code of a process must be serial, but now it must be passed One process is used to achieve the concurrency effect. There is only one main thread under a process, which means that one thread is used to achieve the concurrency effect. Why use one process to achieve multiple concurrency instead of using multiple threads to achieve multiple concurrency?

==========Answer: Because it is more efficient to achieve multiple concurrencies in one process than in multi-threads, because there will be a lot of overhead in starting multi-threads, and the CPU requires Constantly check the status of each thread to determine whether which thread can execute. This is also stressful for the system. Using a single process can avoid this overhead and pressure on the system.

So how does a single process achieve multiple concurrencies? ? ?

========Answer: The producer and consumer model (asynchronous) is very cleverly used. The producer and consumer can achieve non-blocking. A socketserver receives multiple connections through select. Come here (the previous socket process can only receive one connection, and it will block when receiving a new connection, because the socket process needs to communicate with the client first, and the two are waiting for each other [the client sends a message, the service The client is receiving it, and the client is waiting to return.... The server is waiting to receive it...] It has been blocked. If there is another connection at this time, it will have to wait for the previous connection to be disconnected. This Only then can you connect in. -----------In other words, using basic socket to implement multi-process is blocking. In order to solve this problem, a thread is generated for each connection, which will not block. When there are too many threads, the overhead and pressure on the CPU are relatively large.) For a single socket, most of the time when it is blocked, it is waiting for IO operations (network operations are also IO operations). In order to avoid this situation, asynchronous ============= The client initiates a connection and registers a file handle on the server. The server will continuously poll the list of these file handles. The main process establishes a connection with the client without starting the thread. At this time, the main process interacts with the client, and other clients cannot connect to the main process. In order to realize that the main process can not only send and receive messages to the connected client, but also When establishing a connection with a new client, the polling becomes very fast (deadloop) to flush the list of file handles connected by the client. As long as the client sends a message, the server reads it. After the message, there is another list to receive the message returned to the client, and this list is constantly refreshed. After being refreshed, it is returned to the client. In this way, the communication with the client is completed, but the connection with the client is not yet available. interrupted, but then entered the next polling. 】

select Advantages

select is currently supported on almost all platforms and has good cross-platform functionality.

select Disadvantages

Every time select is called, the fd collection needs to be copied from user mode to kernel mode. This overhead will be very large when there are many fds

There is a maximum limit on the number of fds that a single process can monitor. The default is 1024 on Linux (this limit can be increased by modifying the macro definition or recompiling the kernel)

And because the selected fd is placed In the array, and the entire array must be traversed linearly every time. When there are many fds, the overhead is also very high

python select

The function that calls select is readable, writable, exceptional = select.select(rlist, wlist, xlist[, timeout]), the first three parameters are three lists respectively, the objects in the array are all waitable object: are all file descriptions of integers descriptor (file descriptor) or an object with a method fileno() that returns a file descriptor;

rlist: list waiting to be read

wlist: Waiting for write-ready list

errlist: Waiting for "exception" list

select method is used to monitor the file descriptor, if the file If the descriptor changes, the descriptor is obtained.

1. These three lists can be an empty list, but receiving three empty lists is system-dependent (acceptable on Linux, but not on windows).

2. When the descriptor in the rlist sequence is readable (accetp and read), the changed descriptor is obtained and added to the readable sequence

3. When the wlist sequence is When descriptors are included, all descriptors in the sequence are added to the writable sequence

4. When an error occurs in the handle in the errlist sequence, the error handle is added to the exception sequence

5. When the timeout is not set, select will block until the monitored descriptor changes.

When the timeout = 1, then if there is no change in the monitored handle, then select will block for 1 second, and then return three empty lists. If the monitored descriptor (fd) changes, it will be executed directly.

6. Ptython file objects (such as sys.stdin, or objects returned by open() and os.open()) can be accepted in the list, and the socket object will return socket.socket( ). You can also customize the class, as long as there is a suitable fileno() method (which needs to actually return a file descriptor, not a random integer).

select Example:

Python’s select() method directly calls the IO interface of the operating system, which monitors sockets, open files, and pipes (all with fileno() method's file handle) becomes readable and writeable, or a communication error occurs, select() makes it easy to monitor multiple connections at the same time, and it is more efficient than writing a long loop to wait for and monitor multiple client connections, because select Operate directly through the C network interface provided by the operating system, rather than through the Python interpreter

#coding:UTF8

import select
import socket
import sys
import Queue

#创建一个TCP/IP 进程
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(0)

#连接地址和端口
server_address = ('localhost',10000)
print >>sys.stderr,'starting up on %s prot %s' % server_address
server.bind(server_address)

#最大允许链接数
server.listen(5)

inputs = [ server ]
outputs = []

message_queues = {}

while inputs:
    print >>sys.stderr,'\nwaiting for the next event'
    readable,writable,exceptional = select.select(inputs,outputs,inputs)

    # Handle inputs
    for s in readable:
     
        if s is server:
            # A "readable" server socket is ready to accept a connection
            connection, client_address = s.accept()
            print >>sys.stderr, 'new connection from', client_address
            #connection.setblocking(0)
            inputs.append(connection)
     
            # Give the connection a queue for data we want to send
            message_queues[connection] = Queue.Queue()
        
        else:
            data = s.recv(1024)
            if data:
                # A readable client socket has data
                print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())
                message_queues[s].put(data)  #这个s相当于connection
                # Add output channel for response
                if s not in outputs:
                    outputs.append(s)

            else:
                # Interpret empty result as closed connection
                print >>sys.stderr, 'closing', client_address, 'after reading no data'
                # Stop listening for input on the connection
                if s in outputs:
                    outputs.remove(s)  #既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉
                inputs.remove(s)    #inputs中也删除掉
                s.close()           #把这个连接关闭掉
                 
                # Remove message queue
                del message_queues[s]
    
    # Handle outputs
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except Queue.Empty:
            # No messages waiting so stop checking for writability.
            print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty'
            outputs.remove(s)
        else:
            print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
            s.send(next_msg.upper())
    # Handle "exceptional conditions"
    for s in exceptional:
        print >>sys.stderr, 'handling exceptional condition for', s.getpeername()
        # Stop listening for input on the connection
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
     
        # Remove message queue
        del message_queues[s]     

server

Code analysis:

select() method receives and monitors 3 A communication list, the first one is all the input data, which refers to the data sent from the outside, the second one is to monitor and receive all the data to be sent (outgoing data), the third one is to monitor Error message, next we need to create 2 lists to contain input and output information to pass to select().

# Sockets from which we expect to readinputs = [ server ]# Sockets to which we expect to writeoutputs = [ ]

All incoming connections and data from the client will be processed by the server's main loop program in the list above. Our current server needs to wait for the connection. It can only come after it is writable, and then receive the data and return it (so it does not return immediately after receiving the data), because each connection must first cache the input or output data into the queue. Then it is taken out by select and sent out.

# Outgoing message queues (socket:Queue)message_queues = {}

The main portion of the server program loops, call ing select() to block and wait for network activity.

The following is the main loop of this program. When select() is called, it will block and wait until new connections and data come in

while inputs : # Wait for at least one of the sockets to be ready for processing print >>sys.stderr, '\nwaiting for the next event' readable, writable, exceptional = select.select(inputs, outputs, inputs)

When you pass inputs, outputs, exceptional (shared here with inputs) to select(), it returns 3 new lists. We assigned them to readable, writable, exceptional, all above. The socket connection in the readable list represents data that can be received (recv). All socket connections in the writable list store the socket connections that you can send (send) to. When an error occurs in the connection communication, the error will be written to exceptional. List.

Readable list 中的socket 可以有3种可能状态,第一种是如果这个socket是main "server" socket,它负责监听客户端的连接,如果这个main server socket出现在readable里,那代表这是server端已经ready来接收一个新的连接进来了,为了让这个main server能同时处理多个连接,在下面的代码里,我们把这个main server的socket设置为非阻塞模式。

 

第二种情况是这个socket是已经建立了的连接,它把数据发了过来,这个时候你就可以通过recv()来接收它发过来的数据,然后把接收到的数据放到queue里,这样你就可以把接收到的数据再传回给客户端了。

 

第三种情况就是这个客户端已经断开了,所以你再通过recv()接收到的数据就为空了,所以这个时候你就可以把这个跟客户端的连接关闭了。

 

对于writable list中的socket,也有几种状态,如果这个客户端连接在跟它对应的queue里有数据,就把这个数据取出来再发回给这个客户端,否则就把这个连接从output list中移除,这样下一次循环select()调用时检测到outputs list中没有这个连接,那就会认为这个连接还处于非活动状态

 

最后,如果在跟某个socket连接通信过程中出了错误,就把这个连接对象在inputs\outputs\message_queue中都删除,再把连接关闭掉

#coding:UTF8

import socket
import sys
 
messages = [ 'This is the message. ',
             'It will be sent ',
             'in parts.',
             ]
server_address = ('localhost', 10003)
 
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]
 
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
    s.connect(server_address)

for message in messages:
 
    # Send messages on both sockets
    for s in socks:
        print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
        s.send(message)
 
    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
        if not data:
            print >>sys.stderr, 'closing socket', s.getsockname()

client

客户端程序展示了如何通过select()对socket进行管理并与多个连接同时进行交互,通过循环通过每个socket连接给server发送和接收数据。

server:
starting up on localhost prot 10000

waiting for the next event
new connection from ('127.0.0.1', 54812)

waiting for the next event
new connection from ('127.0.0.1', 54813)
received "This is the message. " from ('127.0.0.1', 54812)

waiting for the next event
received "This is the message. " from ('127.0.0.1', 54813)
sending "This is the message. " to ('127.0.0.1', 54812)

waiting for the next event
output queue for ('127.0.0.1', 54812) is empty
sending "This is the message. " to ('127.0.0.1', 54813)

waiting for the next event
output queue for ('127.0.0.1', 54813) is empty

waiting for the next event
received "It will be sent " from ('127.0.0.1', 54812)
received "It will be sent " from ('127.0.0.1', 54813)

waiting for the next event
sending "It will be sent " to ('127.0.0.1', 54812)
sending "It will be sent " to ('127.0.0.1', 54813)

waiting for the next event
output queue for ('127.0.0.1', 54812) is empty
output queue for ('127.0.0.1', 54813) is empty

waiting for the next event
received "in parts." from ('127.0.0.1', 54812)
received "in parts." from ('127.0.0.1', 54813)

waiting for the next event
sending "in parts." to ('127.0.0.1', 54812)
sending "in parts." to ('127.0.0.1', 54813)

waiting for the next event
output queue for ('127.0.0.1', 54812) is empty
output queue for ('127.0.0.1', 54813) is empty

waiting for the next event
closing ('127.0.0.1', 54813) after reading no data
closing ('127.0.0.1', 54813) after reading no data

waiting for the next event





client:
connecting to localhost port 10000
('127.0.0.1', 54812): sending "This is the message. "
('127.0.0.1', 54813): sending "This is the message. "
('127.0.0.1', 54812): received "THIS IS THE MESSAGE. "
('127.0.0.1', 54813): received "THIS IS THE MESSAGE. "
('127.0.0.1', 54812): sending "It will be sent "
('127.0.0.1', 54813): sending "It will be sent "
('127.0.0.1', 54812): received "IT WILL BE SENT "
('127.0.0.1', 54813): received "IT WILL BE SENT "
('127.0.0.1', 54812): sending "in parts."
('127.0.0.1', 54813): sending "in parts."
('127.0.0.1', 54812): received "IN PARTS."
('127.0.0.1', 54813): received "IN PARTS."

运行结果

poll 
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll() 的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

 

在Python中调用poll

select.poll(),返回一个poll的对象,支持注册和注销文件描述符。

poll.register(fd[, eventmask])注册一个文件描述符,注册后,可以通过poll()方法来检查是否有对应的I/O事件发生。fd可以是i 个整数,或者有返回整数的fileno()方法对象。如果File对象实现了fileno(),也可以当作参数使用。

eventmask是一个你想去检查的事件类型,它可以是常量POLLIN, POLLPRI和 POLLOUT的组合。如果缺省,默认会去检查所有的3种事件类型。

事件常量 意义

POLLIN 有数据读取

POLLPRT 有数据紧急读取

POLLOUT 准备输出:输出不会阻塞

POLLERR 某些错误情况出现

POLLHUP 挂起

POLLNVAL 无效请求:描述无法打开

poll.modify(fd, eventmask) 修改一个已经存在的fd,和poll.register(fd, eventmask)有相同的作用。如果去尝试修改一个未经注册的fd,会引起一个errno为ENOENT的IOError。

poll.unregister(fd)从poll对象中注销一个fd。尝试去注销一个未经注册的fd,会引起KeyError。

poll.poll([timeout])去检测已经注册了的文件描述符。会返回一个可能为空的list,list中包含着(fd, event)这样的二元组。 fd是文件描述符, event是文件描述符对应的事件。如果返回的是一个空的list,则说明超时了且没有文件描述符有事件发生。timeout的单位是milliseconds,如果设置了timeout,系统将会等待对应的时间。如果timeout缺省或者是None,这个方法将会阻塞直到对应的poll对象有一个事件发生。

#coding: utf-8 

import select, socket

response = b"hello world"

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('localhost', 10000))
serversocket.listen(1)
serversocket.setblocking(0)

#
poll = select.poll()
poll.register(serversocket.fileno(), select.POLLIN)

connections = {}
while True:
    for fd, event in poll.poll():
        if event == select.POLLIN:
            if fd == serversocket.fileno():
                con, addr = serversocket.accept()
                poll.register(con.fileno(), select.POLLIN)
                connections[con.fileno()] = con
            else:
                con = connections[fd]
                data = con.recv(1024)
                if data:
                    poll.modify(con.fileno(), select.POLLOUT)
        elif event == select.POLLOUT:
            con = connections[fd]
            con.send(response)
            poll.unregister(con.fileno())
            con.close()

epoll 
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表 就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了 这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描 述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调 机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

 

在Python中调用epoll

select.epoll([sizehint=-1])返回一个epoll对象。

eventmask

事件常量 意义

EPOLLIN 读就绪

EPOLLOUT 写就绪

EPOLLPRI 有数据紧急读取

EPOLLERR assoc. fd有错误情况发生

EPOLLHUP assoc. fd发生挂起

EPOLLRT 设置边缘触发(ET)(默认的是水平触发)

EPOLLONESHOT 设置为 one-short 行为,一个事件(event)被拉出后,对应的fd在内部被禁用

EPOLLRDNORM 和 EPOLLIN 相等

EPOLLRDBAND 优先读取的数据带(data band)

EPOLLWRNORM 和 EPOLLOUT 相等

EPOLLWRBAND 优先写的数据带(data band)

EPOLLMSG 忽视

epoll.close()关闭epoll对象的文件描述符。

epoll.fileno返回control fd的文件描述符number。

epoll.fromfd(fd)用给予的fd来创建一个epoll对象。

epoll.register(fd[, eventmask])在epoll对象中注册一个文件描述符。(如果文件描述符已经存在,将会引起一个IOError)

epoll.modify(fd, eventmask)修改一个已经注册的文件描述符。

epoll.unregister(fd)注销一个文件描述符。

epoll.poll(timeout=-1[, maxevnets=-1])等待事件,timeout(float)的单位是秒(second)。

#coding:Utf8
import socket, select

EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response  = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('localhost', 10000))
serversocket.listen(1)
serversocket.setblocking(0)

epoll = select.epoll()
epoll.register(serversocket.fileno(), select.EPOLLIN)

try:
   connections = {}; requests = {}; responses = {}
   while True:
      events = epoll.poll(1)
      for fileno, event in events:
         if fileno == serversocket.fileno():
            connection, address = serversocket.accept()
            connection.setblocking(0)
            epoll.register(connection.fileno(), select.EPOLLIN)
            connections[connection.fileno()] = connection
            requests[connection.fileno()] = b''
            responses[connection.fileno()] = response
         elif event & select.EPOLLIN:
            requests[fileno] += connections[fileno].recv(1024)
            if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
               epoll.modify(fileno, select.EPOLLOUT)
               print('-'*40 + '\n' + requests[fileno].decode()[:-2])
         elif event & select.EPOLLOUT:
            byteswritten = connections[fileno].send(responses[fileno])
            responses[fileno] = responses[fileno][byteswritten:]
            if len(responses[fileno]) == 0:
               epoll.modify(fileno, 0)
               connections[fileno].shutdown(socket.SHUT_RDWR)
         elif event & select.EPOLLHUP:
            epoll.unregister(fileno)
            connections[fileno].close()
            del connections[fileno]
finally:
   epoll.unregister(serversocket.fileno())
   epoll.close()
   serversocket.close()


The above is the detailed content of Detailed description of parsing python select epoll poll. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn