Maison >développement back-end >Tutoriel Python >Explication détaillée de l'utilisation du gestionnaire d'événements de réacteur dans le framework Twisted de Python

Explication détaillée de l'utilisation du gestionnaire d'événements de réacteur dans le framework Twisted de Python

高洛峰
高洛峰original
2017-02-03 16:28:243250parcourir

Préfiguration
Dans beaucoup de pratique, il semble que nous utilisons toujours la programmation asynchrone de la même manière :

Écouter les événements

Exécuter la fonction de rappel correspondante lorsque l'événement se produit

Rappel terminé (de nouveaux événements peuvent être générés et ajoutés à la file d'attente d'écoute)

Retour à 1, écoute des événements

Nous appelons donc ce mode asynchrone mode Réacteur, pour exemple dans le développement iOS Le concept de Run Loop in est en fait très similaire à celui de la boucle Reactor. La boucle d'exécution du thread principal surveille les événements de l'interface utilisateur. Une fois qu'un événement de l'interface utilisateur se produit, le code de traitement d'événement correspondant peut également être généré via les événements. GCD et autres méthodes vers le thread principal pour exécution.

Explication détaillée de lutilisation du gestionnaire dévénements de réacteur dans le framework Twisted de Python

L'image ci-dessus est la représentation par boost du mode Reactor. La conception de Twisted est basée sur ce mode Reactor. Le programme Twisted boucle continuellement en attendant les événements et. traitement des événements.

from twisted.internet import reactor
reactor.run()

reactor est un objet singleton dans le programme Twisted.

reactor
reactor est un gestionnaire d'événements, utilisé pour enregistrer et désenregistrer des événements, exécuter des boucles d'événements et appeler des fonctions de rappel lorsque des événements se produisent. Il y a plusieurs conclusions sur le réacteur :

Le réacteur torsadé ne peut être démarré qu'en appelant le réacteur.run().

La boucle du réacteur fonctionne dans le processus à partir duquel elle démarre, c'est-à-dire qu'elle fonctionne dans le processus principal.

Une fois démarré, il continue de fonctionner. Le réacteur sera sous le contrôle du programme (ou spécifiquement sous le contrôle d'un thread qui l'a démarré).

la boucle du réacteur ne consomme aucune ressource CPU.

Il n'est pas nécessaire de créer explicitement un réacteur, il suffit de l'importer.

Le dernier doit être expliqué clairement. Dans Twisted, le réacteur est un Singleton (c'est-à-dire un mode singleton), c'est-à-dire qu'il ne peut y avoir qu'un seul réacteur dans un programme, et tant que vous l'introduisez, un sera créé en conséquence. La méthode présentée ci-dessus est la méthode par défaut utilisée par twisted. Bien entendu, twisted a d'autres méthodes qui peuvent introduire un réacteur. Par exemple, vous pouvez utiliser l'appel système dans twisted.internet.pollreactor pour interroger au lieu de la méthode select.

Si vous utilisez d'autres réacteurs, vous devez l'installer avant d'introduire twisted.internet.reactor. Voici comment installer pollreactor :

from twisted.internet import pollreactor
pollreactor.install()

Si vous n'installez pas d'autres réacteurs spéciaux et introduisez twisted.internet.reactor, alors Twisted installera le réacteur par défaut en fonction du système d'exploitation . De ce fait, il est d'usage de ne pas introduire le réacteur dans le module de niveau supérieur pour éviter d'installer le réacteur par défaut, mais de l'installer dans la zone où l'on souhaite utiliser le réacteur.
Ce qui suit est une réécriture du programme ci-dessus à l'aide de pollreactor :

from twited.internet import pollreactor
pollreactor.install()
from twisted.internet import reactor
reactor.run()

Alors, comment le réacteur implémente-t-il un singleton ? Jetons un coup d'œil à ce que fait le réacteur d'importation twisted.internet et vous comprendrez.

Ce qui suit fait partie du code de twisted/internet/reactor.py :

# twisted/internet/reactor.py
import sys
del sys.modules['twisted.internet.reactor']
from twisted.internet import default
default.install()

Remarque : tous les modules chargés en mémoire en Python sont placés dans sys .modules , qui est un dictionnaire global. Lors de l'importation d'un module, il vérifiera d'abord si le module a été chargé dans cette liste. S'il est chargé, il ajoutera simplement le nom du module à l'espace de noms du module qui appelle l'importation. S'il n'est pas chargé, recherchez le fichier du module en fonction du nom du module dans le répertoire sys.path, après l'avoir trouvé, chargez le module en mémoire, ajoutez-le à sys.modules et importez le nom dans l'espace de noms actuel.

Si nous exécutons le réacteur d'importation twisted.internet pour la première fois, car il n'y a pas de twisted.internet.reactor dans sys.modules, le code dans réacteur.py sera exécuté et le réacteur par défaut sera installé . Après cela, s'il est importé, car le module existe déjà dans sys.modules, twisted.internet.reactor dans sys.modules sera directement importé dans l'espace de noms actuel.

installer par défaut :

# twisted/internet/default.py
def _getInstallFunction(platform):
  """
  Return a function to install the reactor most suited for the given platform.
 
  @param platform: The platform for which to select a reactor.
  @type platform: L{twisted.python.runtime.Platform}
 
  @return: A zero-argument callable which will install the selected
    reactor.
  """
  try:
    if platform.isLinux():
      try:
        from twisted.internet.epollreactor import install
      except ImportError:
        from twisted.internet.pollreactor import install
    elif platform.getType() == 'posix' and not platform.isMacOSX():
      from twisted.internet.pollreactor import install
    else:
      from twisted.internet.selectreactor import install
  except ImportError:
    from twisted.internet.selectreactor import install
  return install
 
 
install = _getInstallFunction(platform)

Évidemment, par défaut, l'installation correspondante sera obtenue en fonction de la plateforme. Sous Linux, epollreactor sera utilisé en premier. Si le noyau ne le supporte pas, pollreactor ne peut être utilisé. La plate-forme Mac utilise pollreactor et Windows utilise selectreactor. L'implémentation de chaque installation est similaire. Ici, nous extrayons l'installation dans selectreactor pour y jeter un œil.

# twisted/internet/selectreactor.py:
def install():
  """Configure the twisted mainloop to be run using the select() reactor.
  """
  # 单例
  reactor = SelectReactor()
  from twisted.internet.main import installReactor
  installReactor(reactor)
 
# twisted/internet/main.py:
def installReactor(reactor):
  """
  Install reactor C{reactor}.
 
  @param reactor: An object that provides one or more IReactor* interfaces.
  """
  # this stuff should be common to all reactors.
  import twisted.internet
  import sys
  if 'twisted.internet.reactor' in sys.modules:
    raise error.ReactorAlreadyInstalledError("reactor already installed")
  twisted.internet.reactor = reactor
  sys.modules['twisted.internet.reactor'] = reactor

Dans installReactor, ajoutez la clé twisted.internet.reactor à sys.modules, et la valeur est le réacteur singleton créé lors de l'installation. Si vous souhaitez utiliser le réacteur à l'avenir, vous importerez ce singleton.

SelectReactor
# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase)

implémenteur signifie que SelectReactor implémente la méthode de l'interface IReactorFDSet zope.interface est utilisée ici, qui est l'implémentation de l'interface en python. je peux aller voir.

L'interface IReactorFDSet fournit principalement des méthodes d'obtention, d'ajout, de suppression et d'autres opérations sur les descripteurs. Vous pouvez connaître la signification de ces méthodes simplement en regardant leurs noms, je n’ai donc pas ajouté de commentaires.

# twisted/internet/interfaces.py
class IReactorFDSet(Interface):
 
  def addReader(reader):
 
  def addWriter(writer):
 
  def removeReader(reader):
 
  def removeWriter(writer):
 
  def removeAll():
 
  def getReaders():
 
  def getWriters():
reactor.listenTCP()

reactor.listenTCP() dans l'exemple enregistre un événement d'écoute, qui est une méthode de la classe parent PosixReactorBase.

# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
            ReactorBase):
 
  def listenTCP(self, port, factory, backlog=50, interface=''):
    p = tcp.Port(port, factory, backlog, interface, self)
    p.startListening()
    return p
 
# twisted/internet/tcp.py
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
  def __init__(self, port, factory, backlog=50, interface='', reactor=None):
    """Initialize with a numeric port to listen on.
    """
    base.BasePort.__init__(self, reactor=reactor)
    self.port = port
    self.factory = factory
    self.backlog = backlog
    if abstract.isIPv6Address(interface):
      self.addressFamily = socket.AF_INET6
      self._addressType = address.IPv6Address
    self.interface = interface
  ...
 
  def startListening(self):
    """Create and bind my socket, and begin listening on it.
     创建并绑定套接字,开始监听。
 
    This is called on unserialization, and must be called after creating a
    server to begin listening on the specified port.
    """
    if self._preexistingSocket is None:
      # Create a new socket and make it listen
      try:
        # 创建套接字
        skt = self.createInternetSocket()
        if self.addressFamily == socket.AF_INET6:
          addr = _resolveIPv6(self.interface, self.port)
        else:
          addr = (self.interface, self.port)
        # 绑定
        skt.bind(addr)
      except socket.error as le:
        raise CannotListenError(self.interface, self.port, le)
      # 监听
      skt.listen(self.backlog)
    else:
      # Re-use the externally specified socket
      skt = self._preexistingSocket
      self._preexistingSocket = None
      # Avoid shutting it down at the end.
      self._shouldShutdown = False
 
    # Make sure that if we listened on port 0, we update that to
    # reflect what the OS actually assigned us.
    self._realPortNumber = skt.getsockname()[1]
 
    log.msg("%s starting on %s" % (
        self._getLogPrefix(self.factory), self._realPortNumber))
 
    # The order of the next 5 lines is kind of bizarre. If no one
    # can explain it, perhaps we should re-arrange them.
    self.factory.doStart()
    self.connected = True
    self.socket = skt
    self.fileno = self.socket.fileno
    self.numberAccepts = 100
 
    # startReading调用reactor的addReader方法将Port加入读集合
    self.startReading()

整个逻辑很简单,和正常的server端一样,创建套接字、绑定、监听。不同的是将套接字的描述符添加到了reactor的读集合。那么假如有了client连接过来的话,reactor会监控到,然后触发事件处理程序。

reacotr.run()事件主循环

# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
            ReactorBase)
 
# twisted/internet/base.py
class _SignalReactorMixin(object):
 
  def startRunning(self, installSignalHandlers=True):
    """
    PosixReactorBase的父类_SignalReactorMixin和ReactorBase都有该函数,但是
    _SignalReactorMixin在前,安装mro顺序的话,会先调用_SignalReactorMixin中的。
    """
    self._installSignalHandlers = installSignalHandlers
    ReactorBase.startRunning(self)
 
  def run(self, installSignalHandlers=True):
    self.startRunning(installSignalHandlers=installSignalHandlers)
    self.mainLoop()
 
  def mainLoop(self):
    while self._started:
      try:
        while self._started:
          # Advance simulation time in delayed event
          # processors.
          self.runUntilCurrent()
          t2 = self.timeout()
          t = self.running and t2
          # doIteration是关键,select,poll,epool实现各有不同
          self.doIteration(t)
      except:
        log.msg("Unexpected error in main loop.")
        log.err()
      else:
        log.msg('Main loop terminated.')

   

mianLoop就是最终的主循环了,在循环中,调用doIteration方法监控读写描述符的集合,一旦发现有描述符准备好读写,就会调用相应的事件处理程序。

# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase):
 
  def __init__(self):
    """
    Initialize file descriptor tracking dictionaries and the base class.
    """
    self._reads = set()
    self._writes = set()
    posixbase.PosixReactorBase.__init__(self)
 
  def doSelect(self, timeout):
    """
    Run one iteration of the I/O monitor loop.
 
    This will run all selectables who had input or output readiness
    waiting for them.
    """
    try:
      # 调用select方法监控读写集合,返回准备好读写的描述符
      r, w, ignored = _select(self._reads,
                  self._writes,
                  [], timeout)
    except ValueError:
      # Possibly a file descriptor has gone negative?
      self._preenDescriptors()
      return
    except TypeError:
      # Something *totally* invalid (object w/o fileno, non-integral
      # result) was passed
      log.err()
      self._preenDescriptors()
      return
    except (select.error, socket.error, IOError) as se:
      # select(2) encountered an error, perhaps while calling the fileno()
      # method of a socket. (Python 2.6 socket.error is an IOError
      # subclass, but on Python 2.5 and earlier it is not.)
      if se.args[0] in (0, 2):
        # windows does this if it got an empty list
        if (not self._reads) and (not self._writes):
          return
        else:
          raise
      elif se.args[0] == EINTR:
        return
      elif se.args[0] == EBADF:
        self._preenDescriptors()
        return
      else:
        # OK, I really don't know what's going on. Blow up.
        raise
 
    _drdw = self._doReadOrWrite
    _logrun = log.callWithLogger
    for selectables, method, fdset in ((r, "doRead", self._reads),
                      (w,"doWrite", self._writes)):
      for selectable in selectables:
        # if this was disconnected in another thread, kill it.
        # ^^^^ --- what the !@#*? serious! -exarkun
        if selectable not in fdset:
          continue
        # This for pausing input when we're not ready for more.
 
        # 调用_doReadOrWrite方法
        _logrun(selectable, _drdw, selectable, method)
 
  doIteration = doSelect
 
  def _doReadOrWrite(self, selectable, method):
    try:
      # 调用method,doRead或者是doWrite,
      # 这里的selectable可能是我们监听的tcp.Port
      why = getattr(selectable, method)()
    except:
      why = sys.exc_info()[1]
      log.err()
    if why:
      self._disconnectSelectable(selectable, why, method=="doRead")

   

那么假如客户端有连接请求了,就会调用读集合中tcp.Port的doRead方法。

# twisted/internet/tcp.py
 
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
 
  def doRead(self):
    """Called when my socket is ready for reading.
    当套接字准备好读的时候调用
 
    This accepts a connection and calls self.protocol() to handle the
    wire-level protocol.
    """
    try:
      if platformType == "posix":
        numAccepts = self.numberAccepts
      else:
        numAccepts = 1
      for i in range(numAccepts):
        if self.disconnecting:
          return
        try:
          # 调用accept
          skt, addr = self.socket.accept()
        except socket.error as e:
          if e.args[0] in (EWOULDBLOCK, EAGAIN):
            self.numberAccepts = i
            break
          elif e.args[0] == EPERM:
            continue
          elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
            log.msg("Could not accept new connection (%s)" % (
              errorcode[e.args[0]],))
            break
          raise
 
        fdesc._setCloseOnExec(skt.fileno())
        protocol = self.factory.buildProtocol(self._buildAddr(addr))
        if protocol is None:
          skt.close()
          continue
        s = self.sessionno
        self.sessionno = s+1
        # transport初始化的过程中,会将自身假如到reactor的读集合中,那么当它准备
        # 好读的时候,就可以调用它的doRead方法读取客户端发过来的数据了
        transport = self.transport(skt, protocol, addr, self, s, self.reactor)
        protocol.makeConnection(transport)
      else:
        self.numberAccepts = self.numberAccepts+20
    except:
      log.deferr()

   

doRead方法中,调用accept产生了用于接收客户端数据的套接字,将套接字与transport绑定,然后把transport加入到reactor的读集合。当客户端有数据到来时,就会调用transport的doRead方法进行数据读取了。

Connection是Server(transport实例的类)的父类,它实现了doRead方法。

# twisted/internet/tcp.py
@implementer(interfaces.ITCPTransport, interfaces.ISystemHandle)
class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
         _AbortingMixin):
 
  def doRead(self):
    try:
      # 接收数据
      data = self.socket.recv(self.bufferSize)
    except socket.error as se:
      if se.args[0] == EWOULDBLOCK:
        return
      else:
        return main.CONNECTION_LOST
 
    return self._dataReceived(data)
 
  def _dataReceived(self, data):
    if not data:
      return main.CONNECTION_DONE
    # 调用我们自定义protocol的dataReceived方法处理数据
    rval = self.protocol.dataReceived(data)
    if rval is not None:
      offender = self.protocol.dataReceived
      warningFormat = (
        'Returning a value other than None from %(fqpn)s is '
        'deprecated since %(version)s.')
      warningString = deprecate.getDeprecationWarningString(
        offender, versions.Version('Twisted', 11, 0, 0),
        format=warningFormat)
      deprecate.warnAboutFunction(offender, warningString)
    return rval

   

_dataReceived中调用了示例中我们自定义的EchoProtocol的dataReceived方法处理数据。

至此,一个简单的流程,从创建监听事件,到接收客户端数据就此结束了。

更多Explication détaillée de lutilisation du gestionnaire dévénements de réacteur dans le framework Twisted de Python相关文章请关注PHP中文网!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn