Maison  >  Article  >  développement back-end  >  Description détaillée de l'analyse du sondage Python Select epoll

Description détaillée de l'analyse du sondage Python Select epoll

高洛峰
高洛峰original
2017-03-15 13:20:012214parcourir

La différence entre select, poll et epoll

select

select est apparu pour la première fois dans 4.2BSD en 1983. Il surveille plusieurs descriptions de fichiers via un appel système select() array de symboles (tout dans linux est un fichier, un périphérique de bloc, une connexion socket, etc.), lorsque select() revient, les descripteurs de fichiers prêts dans le tableau le seront. Le bit d'indicateur est modifié par le noyau (devient prêt) afin que le processus puisse obtenir ces descripteurs de fichiers pour les opérations de lecture et d'écriture ultérieures (select surveillera en permanence le nombre de descripteurs de fichiers qui ont changé dans un répertoire du réseau interface Devenez prêt état [Dans l'interface réseau, une connexion créera un « fichier »] Après être devenu prêt, sélectionnez peut utiliser ce descripteur de fichier).

[Socketserver gère plusieurs requêtes via multi-threading. Chaque connexion se voit attribuer un thread à traiter, mais select est un processus unique. Le code d'exécution d'un processus doit être en série, mais il doit maintenant être transmis à un seul processus. est utilisé pour obtenir l'effet de concurrence. Il n'y a qu'un seul thread principal sous un processus, ce qui signifie qu'un seul thread est utilisé pour obtenir l'effet de concurrence. Pourquoi utiliser un seul processus pour obtenir plusieurs accès simultanés au lieu d'utiliser plusieurs threads pour obtenir plusieurs accès simultanés ?

==========Réponse : Parce qu'il est plus efficace d'obtenir plusieurs simultanéités dans un processus que dans plusieurs threads, car il y a beaucoup de surcharge lors du démarrage de plusieurs threads, et le processeur nécessite Vérifiez constamment l'état de chaque thread pour déterminer si quel thread peut s'exécuter. Cela est également stressant pour le système. L'utilisation d'un seul processus peut éviter cette surcharge et cette pression sur le système

Alors, comment un seul processus permet-il d'obtenir plusieurs accès simultanés ? ? ?

========Réponse : Le modèle producteur et consommateur (asynchrone) est utilisé de manière très intelligente. Le producteur et le consommateur peuvent obtenir un non-blocage. Un serveur de sockets reçoit plusieurs connexions via select. le processus socket précédent ne peut recevoir qu'une seule connexion, et il se bloquera lors de la réception d'une nouvelle connexion, car le processus socket doit d'abord communiquer avec le client, et les deux s'attendent [le client envoie un message, le service Le le client le reçoit, et le client attend de revenir.... Le serveur attend de le recevoir...] Il a été bloqué s'il y a une autre connexion à ce moment, il devra attendre la connexion précédente. pour être déconnecté. Ce n'est qu'alors que vous pourrez vous connecter. -----------En d'autres termes, utiliser le socket de base pour implémenter le multi-processus est bloquant. Afin de résoudre ce problème, un thread est généré pour. chaque connexion, qui ne bloquera pas. Lorsqu'il y a trop de threads, la surcharge et la pression sur le CPU sont relativement importantes.) Pour un seul socket, la plupart du temps lorsqu'il est bloqué, il attend les opérations d'E/S (opérations réseau). sont également des opérations IO). Afin d'éviter cette situation, asynchrone ============= Le client initie une connexion et enregistre un descripteur de fichier sur le serveur. Le serveur interrogera en permanence la liste de ces descripteurs de fichiers. établit une connexion avec le client sans démarrer le thread. À ce stade, le processus principal interagit avec le client et les autres clients ne peuvent pas se connecter au processus principal afin de réaliser que le processus principal peut non seulement envoyer et recevoir des messages au client. client connecté, mais aussi Lors de l'établissement d'une connexion avec un nouveau client, le polling devient très rapide (mort boucle ) pour rafraîchir la liste des descripteurs de fichiers connectés par le client. Tant que le client envoie un message, le serveur le lit. Après le message, il y a une autre liste pour recevoir le message renvoyé au client, et cette liste est constamment actualisée, elle est renvoyée au client. terminé, mais la connexion avec le client n'est pas encore disponible, interrompue, puis entrée dans l'interrogation suivante. 】

select Avantages

select est actuellement pris en charge sur presque toutes les plateformes et possède de bonnes fonctionnalités multiplateformes.

select Inconvénients

Chaque fois que vous appelez select, vous devez copier la collection fd du mode utilisateur vers le mode noyau. Cette surcharge sera très importante lorsqu'il y en a plusieurs. fds

Il existe une limite maximale sur le nombre de fds qu'un seul processus peut surveiller. La valeur par défaut est de 1024 sous Linux (cette limite peut être augmentée en modifiant la définition de la macro ou en recompilant le noyau)

Et parce que le fd sélectionné est placé dans le tableau et que le tableau entier doit être parcouru linéairement à chaque fois, lorsqu'il y a beaucoup de fd, la surcharge est également très importante

python select

L'appel de la fonction de select est lisible, inscriptible, exceptional = select.select(rlist, wlist, xlist[, timeout]), les trois premiers paramètres sont respectivement trois listes, et les objets du tableau sont tous waitable objet : sont toutes des descriptions de fichiers de entiers descripteur (fichier descripteur) ou un objet avec une méthode fileno() qui renvoie un descripteur de fichier

rlist : liste en attente d'être lue prête

wlist : Liste en attente d'écriture prête

errlist : Liste d'attente "exception"

la méthode de sélection est utilisée pour surveiller le descripteur de fichier, si le fichier Si le descripteur change, le descripteur est obtenu.

1. Ces trois listes peuvent être une liste vide, mais la réception de trois listes vides dépend du système (acceptable sous Linux, mais pas sous Windows).

2. Lorsque le descripteur dans la séquence rlist est lisible (accepter et lire), le descripteur modifié est obtenu et ajouté à la séquence lisible

3. inclus, tous les descripteurs de la séquence sont ajoutés à la séquence inscriptible

4. Lorsqu'une erreur se produit dans le handle de la séquence errlist, le handle d'erreur est ajouté à la séquence d'exception

5. Lorsque le délai d'attente n'est pas défini, select bloquera jusqu'à ce que le descripteur surveillé change

Lorsque le délai d'attente = 1, alors s'il n'y a aucun changement dans le handle surveillé, alors select bloquera pendant 1 seconde, puis renverra trois listes vides. Si le descripteur surveillé (fd) change, il sera exécuté directement.

6. Les objets de fichier Ptython (tels que sys.stdin ou les objets renvoyés par open() et os.open()) peuvent être acceptés dans la liste, et l'objet socket renverra socket.socket( ) . Vous pouvez également personnaliser la classe, à condition qu'il existe une méthode fileno() appropriée (qui doit réellement renvoyer un descripteur de fichier, pas un entier aléatoire).

select Exemple :

La méthode select() de Python appelle directement l'interface IO du système d'exploitation, qui surveille les sockets, les fichiers ouverts et les tuyaux (le tout avec fileno( ) le handle de fichier de la méthode) devient lisible et inscriptible, ou une erreur de communication se produit, select() facilite la surveillance de plusieurs connexions en même temps, et c'est plus efficace que d'écrire une longue boucle pour attendre et surveiller plusieurs connexions client, car sélectionnez Opérer directement via l'interface réseau C fournie par le système d'exploitation, plutôt que via l'interpréteur Python

#coding:UTF8

import select
import socket
import sys
import Queue

#创建一个TCP/IP 进程
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(0)

#连接地址和端口
server_address = ('localhost',10000)
print >>sys.stderr,'starting up on %s prot %s' % server_address
server.bind(server_address)

#最大允许链接数
server.listen(5)

inputs = [ server ]
outputs = []

message_queues = {}

while inputs:
    print >>sys.stderr,'\nwaiting for the next event'
    readable,writable,exceptional = select.select(inputs,outputs,inputs)

    # Handle inputs
    for s in readable:
     
        if s is server:
            # A "readable" server socket is ready to accept a connection
            connection, client_address = s.accept()
            print >>sys.stderr, 'new connection from', client_address
            #connection.setblocking(0)
            inputs.append(connection)
     
            # Give the connection a queue for data we want to send
            message_queues[connection] = Queue.Queue()
        
        else:
            data = s.recv(1024)
            if data:
                # A readable client socket has data
                print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())
                message_queues[s].put(data)  #这个s相当于connection
                # Add output channel for response
                if s not in outputs:
                    outputs.append(s)

            else:
                # Interpret empty result as closed connection
                print >>sys.stderr, 'closing', client_address, 'after reading no data'
                # Stop listening for input on the connection
                if s in outputs:
                    outputs.remove(s)  #既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉
                inputs.remove(s)    #inputs中也删除掉
                s.close()           #把这个连接关闭掉
                 
                # Remove message queue
                del message_queues[s]
    
    # Handle outputs
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except Queue.Empty:
            # No messages waiting so stop checking for writability.
            print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty'
            outputs.remove(s)
        else:
            print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
            s.send(next_msg.upper())
    # Handle "exceptional conditions"
    for s in exceptional:
        print >>sys.stderr, 'handling exceptional condition for', s.getpeername()
        # Stop listening for input on the connection
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
     
        # Remove message queue
        del message_queues[s]     

server

Analyse du code :

la méthode select() reçoit et Surveiller 3 listes de communication. La première concerne toutes les données d'entrée, qui font référence aux données envoyées de l'extérieur. La seconde consiste à surveiller et à recevoir toutes les données à envoyer (données sortantes). .Informations, nous devons ensuite créer 2 listes contenant les informations d'entrée et de sortie à transmettre à select().

# Sockets à partir desquels nous nous attendons à lire les entrées = [server]# Sockets auxquels nous nous attendons. to writeoutputs = [ ]


Toutes les connexions entrantes et les données du client seront traitées par le programme de boucle principal du serveur dans la liste ci-dessus. Nos besoins actuels du serveur Attendent la connexion. être accessible en écriture avant de pouvoir arriver, puis recevoir les données et revenir (donc il ne revient pas immédiatement après avoir reçu les données), car chaque connexion doit d'abord

mettre en cache les données d'entrée ou de sortie dans la file d'attente à l'intérieur, puis retirez-le par sélection et envoyez-le.

# Files d'attente de messages sortants (socket:Queue)message_queues = {}


La partie

principale du programme serveur boucle, ctouting select() pour bloquer et attendre l'activité du réseau.

Ce qui suit est la boucle principale de ce programme. Lorsque select() est appelé, il bloquera et attendra que de nouvelles connexions et données arrivent

while inputs : # Attendre qu'au moins une des sockets soit prête pour le traitement print >>sys.stderr, 'en attente du prochain événement' lisible, inscriptible, exceptionnel = select.select(inputs, outputs , entrées)


Lorsque vous transmettez des entrées, des sorties, exceptionnelles (partagées ici avec des entrées) à select(), il renvoie 3 nouvelles listes Nous les avons assignées à lisible, inscriptible, exceptionnelle, toutes ci-dessus. La connexion socket dans la liste lisible représente les données qui peuvent être reçues (recv). Toutes les connexions socket dans la liste inscriptible stockent les connexions socket auxquelles vous pouvez envoyer (envoyer) Lorsqu'une erreur se produit dans la communication de connexion, l'erreur sera. écrit à exceptionnel dans la liste.

Readable list 中的socket 可以有3种可能状态,第一种是如果这个socket是main "server" socket,它负责监听客户端的连接,如果这个main server socket出现在readable里,那代表这是server端已经ready来接收一个新的连接进来了,为了让这个main server能同时处理多个连接,在下面的代码里,我们把这个main server的socket设置为非阻塞模式。

 

第二种情况是这个socket是已经建立了的连接,它把数据发了过来,这个时候你就可以通过recv()来接收它发过来的数据,然后把接收到的数据放到queue里,这样你就可以把接收到的数据再传回给客户端了。

 

第三种情况就是这个客户端已经断开了,所以你再通过recv()接收到的数据就为空了,所以这个时候你就可以把这个跟客户端的连接关闭了。

 

对于writable list中的socket,也有几种状态,如果这个客户端连接在跟它对应的queue里有数据,就把这个数据取出来再发回给这个客户端,否则就把这个连接从output list中移除,这样下一次循环select()调用时检测到outputs list中没有这个连接,那就会认为这个连接还处于非活动状态

 

最后,如果在跟某个socket连接通信过程中出了错误,就把这个连接对象在inputs\outputs\message_queue中都删除,再把连接关闭掉

#coding:UTF8

import socket
import sys
 
messages = [ 'This is the message. ',
             'It will be sent ',
             'in parts.',
             ]
server_address = ('localhost', 10003)
 
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]
 
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
    s.connect(server_address)

for message in messages:
 
    # Send messages on both sockets
    for s in socks:
        print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
        s.send(message)
 
    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
        if not data:
            print >>sys.stderr, 'closing socket', s.getsockname()

client

客户端程序展示了如何通过select()对socket进行管理并与多个连接同时进行交互,通过循环通过每个socket连接给server发送和接收数据。

server:
starting up on localhost prot 10000

waiting for the next event
new connection from ('127.0.0.1', 54812)

waiting for the next event
new connection from ('127.0.0.1', 54813)
received "This is the message. " from ('127.0.0.1', 54812)

waiting for the next event
received "This is the message. " from ('127.0.0.1', 54813)
sending "This is the message. " to ('127.0.0.1', 54812)

waiting for the next event
output queue for ('127.0.0.1', 54812) is empty
sending "This is the message. " to ('127.0.0.1', 54813)

waiting for the next event
output queue for ('127.0.0.1', 54813) is empty

waiting for the next event
received "It will be sent " from ('127.0.0.1', 54812)
received "It will be sent " from ('127.0.0.1', 54813)

waiting for the next event
sending "It will be sent " to ('127.0.0.1', 54812)
sending "It will be sent " to ('127.0.0.1', 54813)

waiting for the next event
output queue for ('127.0.0.1', 54812) is empty
output queue for ('127.0.0.1', 54813) is empty

waiting for the next event
received "in parts." from ('127.0.0.1', 54812)
received "in parts." from ('127.0.0.1', 54813)

waiting for the next event
sending "in parts." to ('127.0.0.1', 54812)
sending "in parts." to ('127.0.0.1', 54813)

waiting for the next event
output queue for ('127.0.0.1', 54812) is empty
output queue for ('127.0.0.1', 54813) is empty

waiting for the next event
closing ('127.0.0.1', 54813) after reading no data
closing ('127.0.0.1', 54813) after reading no data

waiting for the next event





client:
connecting to localhost port 10000
('127.0.0.1', 54812): sending "This is the message. "
('127.0.0.1', 54813): sending "This is the message. "
('127.0.0.1', 54812): received "THIS IS THE MESSAGE. "
('127.0.0.1', 54813): received "THIS IS THE MESSAGE. "
('127.0.0.1', 54812): sending "It will be sent "
('127.0.0.1', 54813): sending "It will be sent "
('127.0.0.1', 54812): received "IT WILL BE SENT "
('127.0.0.1', 54813): received "IT WILL BE SENT "
('127.0.0.1', 54812): sending "in parts."
('127.0.0.1', 54813): sending "in parts."
('127.0.0.1', 54812): received "IN PARTS."
('127.0.0.1', 54813): received "IN PARTS."

运行结果

poll 
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll() 的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

 

在Python中调用poll

select.poll(),返回一个poll的对象,支持注册和注销文件描述符。

poll.register(fd[, eventmask])注册一个文件描述符,注册后,可以通过poll()方法来检查是否有对应的I/O事件发生。fd可以是i 个整数,或者有返回整数的fileno()方法对象。如果File对象实现了fileno(),也可以当作参数使用。

eventmask是一个你想去检查的事件类型,它可以是常量POLLIN, POLLPRI和 POLLOUT的组合。如果缺省,默认会去检查所有的3种事件类型。

事件常量 意义

POLLIN 有数据读取

POLLPRT 有数据紧急读取

POLLOUT 准备输出:输出不会阻塞

POLLERR 某些错误情况出现

POLLHUP 挂起

POLLNVAL 无效请求:描述无法打开

poll.modify(fd, eventmask) 修改一个已经存在的fd,和poll.register(fd, eventmask)有相同的作用。如果去尝试修改一个未经注册的fd,会引起一个errno为ENOENT的IOError。

poll.unregister(fd)从poll对象中注销一个fd。尝试去注销一个未经注册的fd,会引起KeyError。

poll.poll([timeout])去检测已经注册了的文件描述符。会返回一个可能为空的list,list中包含着(fd, event)这样的二元组。 fd是文件描述符, event是文件描述符对应的事件。如果返回的是一个空的list,则说明超时了且没有文件描述符有事件发生。timeout的单位是milliseconds,如果设置了timeout,系统将会等待对应的时间。如果timeout缺省或者是None,这个方法将会阻塞直到对应的poll对象有一个事件发生。

#coding: utf-8 

import select, socket

response = b"hello world"

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('localhost', 10000))
serversocket.listen(1)
serversocket.setblocking(0)

#
poll = select.poll()
poll.register(serversocket.fileno(), select.POLLIN)

connections = {}
while True:
    for fd, event in poll.poll():
        if event == select.POLLIN:
            if fd == serversocket.fileno():
                con, addr = serversocket.accept()
                poll.register(con.fileno(), select.POLLIN)
                connections[con.fileno()] = con
            else:
                con = connections[fd]
                data = con.recv(1024)
                if data:
                    poll.modify(con.fileno(), select.POLLOUT)
        elif event == select.POLLOUT:
            con = connections[fd]
            con.send(response)
            poll.unregister(con.fileno())
            con.close()

epoll 
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表 就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了 这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描 述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调 机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

 

在Python中调用epoll

select.epoll([sizehint=-1])返回一个epoll对象。

eventmask

事件常量 意义

EPOLLIN 读就绪

EPOLLOUT 写就绪

EPOLLPRI 有数据紧急读取

EPOLLERR assoc. fd有错误情况发生

EPOLLHUP assoc. fd发生挂起

EPOLLRT 设置边缘触发(ET)(默认的是水平触发)

EPOLLONESHOT 设置为 one-short 行为,一个事件(event)被拉出后,对应的fd在内部被禁用

EPOLLRDNORM 和 EPOLLIN 相等

EPOLLRDBAND 优先读取的数据带(data band)

EPOLLWRNORM 和 EPOLLOUT 相等

EPOLLWRBAND 优先写的数据带(data band)

EPOLLMSG 忽视

epoll.close()关闭epoll对象的文件描述符。

epoll.fileno返回control fd的文件描述符number。

epoll.fromfd(fd)用给予的fd来创建一个epoll对象。

epoll.register(fd[, eventmask])在epoll对象中注册一个文件描述符。(如果文件描述符已经存在,将会引起一个IOError)

epoll.modify(fd, eventmask)修改一个已经注册的文件描述符。

epoll.unregister(fd)注销一个文件描述符。

epoll.poll(timeout=-1[, maxevnets=-1])等待事件,timeout(float)的单位是秒(second)。

#coding:Utf8
import socket, select

EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response  = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('localhost', 10000))
serversocket.listen(1)
serversocket.setblocking(0)

epoll = select.epoll()
epoll.register(serversocket.fileno(), select.EPOLLIN)

try:
   connections = {}; requests = {}; responses = {}
   while True:
      events = epoll.poll(1)
      for fileno, event in events:
         if fileno == serversocket.fileno():
            connection, address = serversocket.accept()
            connection.setblocking(0)
            epoll.register(connection.fileno(), select.EPOLLIN)
            connections[connection.fileno()] = connection
            requests[connection.fileno()] = b''
            responses[connection.fileno()] = response
         elif event & select.EPOLLIN:
            requests[fileno] += connections[fileno].recv(1024)
            if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
               epoll.modify(fileno, select.EPOLLOUT)
               print('-'*40 + '\n' + requests[fileno].decode()[:-2])
         elif event & select.EPOLLOUT:
            byteswritten = connections[fileno].send(responses[fileno])
            responses[fileno] = responses[fileno][byteswritten:]
            if len(responses[fileno]) == 0:
               epoll.modify(fileno, 0)
               connections[fileno].shutdown(socket.SHUT_RDWR)
         elif event & select.EPOLLHUP:
            epoll.unregister(fileno)
            connections[fileno].close()
            del connections[fileno]
finally:
   epoll.unregister(serversocket.fileno())
   epoll.close()
   serversocket.close()


Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn