
Detailed explanation of the usage of reactor event manager in Python's Twisted framework

高洛峰
2017-02-03 16:28:24

Preparation
In practice, asynchronous programs tend to follow the same cycle:

1. Listen for events.

2. When an event occurs, run the corresponding callback function.

3. The callback completes (it may generate new events, which are added to the listening queue).

4. Return to step 1 and listen for events again.

We call this asynchronous style the Reactor pattern. For example, the Run Loop concept in iOS development is very similar to the Reactor loop: the main thread's Run Loop monitors screen UI events, and once a UI event occurs, the corresponding event-handling code runs. Events can also be dispatched to the main thread for execution through GCD and other mechanisms.
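
The cycle described above can be sketched with nothing but the standard library. This is a toy illustration, not Twisted's implementation: the MiniReactor class and its method names are hypothetical, and a socket pair stands in for a real event source.

```python
import select
import socket


class MiniReactor:
    """Toy reactor: maps readable sockets to callbacks (illustrative only)."""

    def __init__(self):
        self._readers = {}      # socket -> callback
        self._running = False

    def add_reader(self, sock, callback):
        self._readers[sock] = callback

    def remove_reader(self, sock):
        self._readers.pop(sock, None)

    def stop(self):
        self._running = False

    def run(self):
        self._running = True
        while self._running and self._readers:
            # Listen for events; select blocks until a socket is readable
            # or the one-second timeout expires.
            readable, _, _ = select.select(list(self._readers), [], [], 1.0)
            # An event occurred: run the callback registered for it.
            for sock in readable:
                callback = self._readers.get(sock)
                if callback is not None:
                    callback(sock)
            # The callbacks are done; loop back and listen again.


received = []
loop = MiniReactor()
left, right = socket.socketpair()


def on_readable(sock):
    received.append(sock.recv(1024))
    loop.remove_reader(sock)
    loop.stop()


loop.add_reader(right, on_readable)
left.sendall(b"hello")
loop.run()
left.close()
right.close()
print(received)
```

The callback itself decides when to stop the loop, which is the same division of labour a real reactor uses: the loop only waits and dispatches, and all application logic lives in callbacks.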

[Figure: the Reactor pattern]

The figure above is Boost's depiction of the Reactor pattern. Twisted's design is based on this pattern: a Twisted program loops continuously, waiting for events and handling them.

from twisted.internet import reactor
reactor.run()

reactor is a singleton object in the Twisted program.

reactor
The reactor is an event manager: it registers and unregisters events, runs the event loop, and calls callback functions when events occur. A few facts about the reactor:

The Twisted reactor is started only by calling reactor.run().

The reactor loop runs in the thread that started it, which is normally the main thread.

Once started, it keeps running. The reactor takes control of the program (or, more precisely, of the thread that started it).

When it has nothing to do, the reactor loop consumes no CPU.

There is no need to explicitly create a reactor, just import it.

The last point deserves explanation. In Twisted, the reactor is a singleton: a program can have only one reactor, and importing it creates one. The import shown above uses Twisted's default reactor. Twisted also provides other reactors; for example, twisted.internet.pollreactor uses the poll system call instead of select.

To use another reactor, you must install it before importing twisted.internet.reactor. Here is how to install pollreactor:

from twisted.internet import pollreactor
pollreactor.install()

If you import twisted.internet.reactor without first installing another reactor, Twisted installs the default reactor for your operating system. For this reason, it is customary not to import reactor at the top level of a module, which would install the default reactor as a side effect, but to import it in the scope where it is used.
The following rewrites the program above to use pollreactor:

from twisted.internet import pollreactor
pollreactor.install()
from twisted.internet import reactor
reactor.run()

So how is the reactor made a singleton? Looking at what from twisted.internet import reactor actually does makes this clear.

The following is part of the code of twisted/internet/reactor.py:

# twisted/internet/reactor.py
import sys
del sys.modules['twisted.internet.reactor']
from twisted.internet import default
default.install()

Note: every module Python has loaded into memory is stored in sys.modules, a global dictionary. When a module is imported, Python first checks whether it is already in this dictionary. If it is, the name is simply bound in the namespace of the importing module. If not, Python searches the directories in sys.path for the module by name, loads it into memory, adds it to sys.modules, and binds the name in the current namespace.

The first time from twisted.internet import reactor runs, sys.modules contains no twisted.internet.reactor entry, so the code in reactor.py runs and installs the default reactor. On later imports, the entry already exists in sys.modules, so it is bound directly into the current namespace.
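
The caching behaviour of sys.modules can be observed directly. The following sketch uses only the standard library; the module name demo_once_module is hypothetical and the module file is written to a temporary directory just for the demonstration.

```python
import importlib
import os
import sys
import tempfile

MODULE_NAME = "demo_once_module"  # hypothetical name, used only in this sketch

with tempfile.TemporaryDirectory() as tmp:
    # Write a tiny module to disk so that the import system has to find it.
    with open(os.path.join(tmp, MODULE_NAME + ".py"), "w") as f:
        f.write("VALUE = 42\n")
    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    try:
        cached_before = MODULE_NAME in sys.modules
        # First import: located through sys.path, executed, then cached.
        first = importlib.import_module(MODULE_NAME)
        # Second import: served straight from sys.modules.
        second = importlib.import_module(MODULE_NAME)
    finally:
        sys.path.remove(tmp)

print(cached_before, first is second, sys.modules[MODULE_NAME] is first)
```

Both imports yield the same object, and that object is exactly the value stored under the module's name in sys.modules.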

The install function in default:

# twisted/internet/default.py
def _getInstallFunction(platform):
  """
  Return a function to install the reactor most suited for the given platform.
 
  @param platform: The platform for which to select a reactor.
  @type platform: L{twisted.python.runtime.Platform}
 
  @return: A zero-argument callable which will install the selected
    reactor.
  """
  try:
    if platform.isLinux():
      try:
        from twisted.internet.epollreactor import install
      except ImportError:
        from twisted.internet.pollreactor import install
    elif platform.getType() == 'posix' and not platform.isMacOSX():
      from twisted.internet.pollreactor import install
    else:
      from twisted.internet.selectreactor import install
  except ImportError:
    from twisted.internet.selectreactor import install
  return install
 
 
install = _getInstallFunction(platform)
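
The branch order of _getInstallFunction can be summarised as a small pure function. This is only a sketch of the decision logic shown above: pick_reactor_name and its boolean parameters are hypothetical, it returns a module name rather than an install function, and it leaves out the outer ImportError fallback to selectreactor.

```python
def pick_reactor_name(is_linux, is_posix, is_macos, has_epoll=True):
    """Mirror the branch order of _getInstallFunction, returning a name only."""
    if is_linux:
        # Prefer epoll; fall back to poll when epoll cannot be imported.
        return "epollreactor" if has_epoll else "pollreactor"
    if is_posix and not is_macos:
        return "pollreactor"
    # Everything else, including macOS and Windows.
    return "selectreactor"


choices = {
    "linux": pick_reactor_name(True, True, False),
    "linux without epoll": pick_reactor_name(True, True, False, has_epoll=False),
    "other posix": pick_reactor_name(False, True, False),
    "macos": pick_reactor_name(False, True, True),
    "windows": pick_reactor_name(False, False, False),
}
for platform_name, reactor_name in choices.items():
    print(platform_name, "->", reactor_name)
```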

As the code shows, default picks the install function according to the platform. On Linux, epollreactor is preferred; if it cannot be imported, pollreactor is used instead. Other POSIX platforms except macOS use pollreactor, while macOS and Windows fall back to selectreactor. The install functions are all similar, so here we look at the one in selectreactor.

# twisted/internet/selectreactor.py:
def install():
  """Configure the twisted mainloop to be run using the select() reactor.
  """
  # Singleton
  reactor = SelectReactor()
  from twisted.internet.main import installReactor
  installReactor(reactor)
 
# twisted/internet/main.py:
def installReactor(reactor):
  """
  Install reactor C{reactor}.
 
  @param reactor: An object that provides one or more IReactor* interfaces.
  """
  # this stuff should be common to all reactors.
  import twisted.internet
  import sys
  if 'twisted.internet.reactor' in sys.modules:
    raise error.ReactorAlreadyInstalledError("reactor already installed")
  twisted.internet.reactor = reactor
  sys.modules['twisted.internet.reactor'] = reactor

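installReactor adds the key twisted.internet.reactor to sys.modules, with the singleton reactor created in install as its value. From then on, any code that imports reactor gets this singleton. This is also why reactor.py first deletes its own entry from sys.modules: otherwise installReactor would raise ReactorAlreadyInstalledError.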
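
The same trick can be reproduced without Twisted. The sketch below publishes an ordinary object under a module path so that an import statement returns it; demo_pkg, DemoReactor, and install_reactor are hypothetical names chosen for this illustration, and RuntimeError stands in for ReactorAlreadyInstalledError.

```python
import sys
import types

PACKAGE = "demo_pkg"              # hypothetical package name
KEY = PACKAGE + ".reactor"


class DemoReactor:
    def run(self):
        return "running"


def install_reactor(reactor):
    """Publish `reactor` under the module path demo_pkg.reactor."""
    if KEY in sys.modules:
        raise RuntimeError("reactor already installed")
    package = sys.modules[PACKAGE]
    package.reactor = reactor          # attribute on the package
    sys.modules[KEY] = reactor         # entry in the module cache


# Register an empty stand-in package so the import statement can resolve it.
sys.modules[PACKAGE] = types.ModuleType(PACKAGE)

install_reactor(DemoReactor())

from demo_pkg import reactor as first
from demo_pkg import reactor as second

try:
    install_reactor(DemoReactor())
    second_install_failed = False
except RuntimeError:
    second_install_failed = True

print(first is second, second_install_failed)
```

Every import yields the one installed instance, and a second installation is refused, which is the whole of the singleton guarantee.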

SelectReactor
# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase):
  ...

The implementer decorator declares that SelectReactor implements the methods of the IReactorFDSet interface. This uses zope.interface, an interface implementation for Python; interested readers can look into it.

The IReactorFDSet interface mainly provides methods for getting, adding, and removing descriptors. The method names are self-explanatory, so no comments are added here.

# twisted/internet/interfaces.py
class IReactorFDSet(Interface):

  def addReader(reader):
    ...

  def addWriter(writer):
    ...

  def removeReader(reader):
    ...

  def removeWriter(writer):
    ...

  def removeAll():
    ...

  def getReaders():
    ...

  def getWriters():
    ...
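
To make the bookkeeping concrete, here is a minimal sketch of a class offering the same seven methods on top of two sets, similar in spirit to the _reads and _writes sets that SelectReactor keeps. FDSetSketch is a hypothetical name, the strings stand in for real descriptor objects, and having removeAll return the removed items is an assumption of this sketch.

```python
class FDSetSketch:
    """Bookkeeping half of IReactorFDSet, modelled with two sets."""

    def __init__(self):
        self._reads = set()
        self._writes = set()

    def addReader(self, reader):
        self._reads.add(reader)

    def addWriter(self, writer):
        self._writes.add(writer)

    def removeReader(self, reader):
        self._reads.discard(reader)

    def removeWriter(self, writer):
        self._writes.discard(writer)

    def removeAll(self):
        # Assumption: report what was removed so callers can clean up.
        removed = list(self._reads | self._writes)
        self._reads.clear()
        self._writes.clear()
        return removed

    def getReaders(self):
        return list(self._reads)

    def getWriters(self):
        return list(self._writes)


fds = FDSetSketch()
fds.addReader("port")
fds.addReader("port")       # adding the same reader twice has no extra effect
fds.addWriter("conn")
readers_before = fds.getReaders()
removed = fds.removeAll()
print(readers_before, sorted(removed), fds.getReaders())
```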

reactor.listenTCP()

In the example, reactor.listenTCP() registers a listening event. It is a method of the parent class PosixReactorBase.

# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
            ReactorBase):
 
  def listenTCP(self, port, factory, backlog=50, interface=''):
    p = tcp.Port(port, factory, backlog, interface, self)
    p.startListening()
    return p
 
# twisted/internet/tcp.py
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
  def __init__(self, port, factory, backlog=50, interface='', reactor=None):
    """Initialize with a numeric port to listen on.
    """
    base.BasePort.__init__(self, reactor=reactor)
    self.port = port
    self.factory = factory
    self.backlog = backlog
    if abstract.isIPv6Address(interface):
      self.addressFamily = socket.AF_INET6
      self._addressType = address.IPv6Address
    self.interface = interface
  ...
 
  def startListening(self):
    """Create and bind my socket, and begin listening on it.
 
    This is called on unserialization, and must be called after creating a
    server to begin listening on the specified port.
    """
    if self._preexistingSocket is None:
      # Create a new socket and make it listen
      try:
        # Create the socket
        skt = self.createInternetSocket()
        if self.addressFamily == socket.AF_INET6:
          addr = _resolveIPv6(self.interface, self.port)
        else:
          addr = (self.interface, self.port)
        # Bind
        skt.bind(addr)
      except socket.error as le:
        raise CannotListenError(self.interface, self.port, le)
      # Listen
      skt.listen(self.backlog)
    else:
      # Re-use the externally specified socket
      skt = self._preexistingSocket
      self._preexistingSocket = None
      # Avoid shutting it down at the end.
      self._shouldShutdown = False
 
    # Make sure that if we listened on port 0, we update that to
    # reflect what the OS actually assigned us.
    self._realPortNumber = skt.getsockname()[1]
 
    log.msg("%s starting on %s" % (
        self._getLogPrefix(self.factory), self._realPortNumber))
 
    # The order of the next 5 lines is kind of bizarre. If no one
    # can explain it, perhaps we should re-arrange them.
    self.factory.doStart()
    self.connected = True
    self.socket = skt
    self.fileno = self.socket.fileno
    self.numberAccepts = 100
 
    # startReading calls the reactor's addReader method to add this Port to the read set
    self.startReading()

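The overall logic is simple and the same as an ordinary server: create a socket, bind it, and listen. The difference is that the socket's descriptor is added to the reactor's read set. When a client connects, the reactor detects it and triggers the event handler.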
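
The effect of putting a listening socket into a read set can be reproduced with the standard library alone. In this sketch a plain Python set stands in for the reactor's read set, and the loopback address and variable names are assumptions made for the demonstration.

```python
import select
import socket

# Create, bind, and listen, in the same order as Port.startListening.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))      # port 0: let the OS choose a free port
listener.listen(50)
real_port = listener.getsockname()[1]

read_set = {listener}                # stands in for the reactor's read set

# Before any client connects, the listening socket is not readable.
idle, _, _ = select.select(list(read_set), [], [], 0)
was_idle = not idle

client = socket.create_connection(("127.0.0.1", real_port))

# A pending connection makes the listening socket readable.
ready, _, _ = select.select(list(read_set), [], [], 5.0)
became_ready = listener in ready

conn = None
if became_ready:
    conn, addr = listener.accept()   # will not block: a connection is waiting

accepted = conn is not None
for sock in (client, conn, listener):
    if sock is not None:
        sock.close()
print(was_idle, became_ready, accepted)
```

For a listening socket, "readable" means that a connection is waiting to be accepted, which is why a Port can be treated like any other reader.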

reactor.run() main event loop

# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
            ReactorBase):
  ...
 
# twisted/internet/base.py
class _SignalReactorMixin(object):
 
  def startRunning(self, installSignalHandlers=True):
    """
    Both parent classes of PosixReactorBase, _SignalReactorMixin and ReactorBase, define this
    function, but _SignalReactorMixin comes first, so by MRO order its version is called first.
    """
    self._installSignalHandlers = installSignalHandlers
    ReactorBase.startRunning(self)
 
  def run(self, installSignalHandlers=True):
    self.startRunning(installSignalHandlers=installSignalHandlers)
    self.mainLoop()
 
  def mainLoop(self):
    while self._started:
      try:
        while self._started:
          # Advance simulation time in delayed event
          # processors.
          self.runUntilCurrent()
          t2 = self.timeout()
          t = self.running and t2
          # doIteration is the key step; the select, poll, and epoll implementations differ
          self.doIteration(t)
      except:
        log.msg("Unexpected error in main loop.")
        log.err()
      else:
        log.msg('Main loop terminated.')

   

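mainLoop is the final main loop. Inside the loop, the doIteration method monitors the sets of read and write descriptors; as soon as a descriptor is ready for reading or writing, the corresponding event handler is called.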
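
The interplay of runUntilCurrent, timeout, and doIteration can be sketched as follows. This is a simplified stand-in, not Twisted's code: TimerLoop and its method names are hypothetical, and because no descriptors are monitored, do_iteration merely sleeps where a real reactor would call select, poll, or epoll with the same timeout.

```python
import heapq
import time


class TimerLoop:
    """Sketch of a run loop driven by scheduled calls (hypothetical names)."""

    def __init__(self):
        self._calls = []        # heap of (due_time, sequence, function)
        self._sequence = 0
        self._started = False

    def call_later(self, delay, function):
        self._sequence += 1
        due = time.monotonic() + delay
        heapq.heappush(self._calls, (due, self._sequence, function))

    def stop(self):
        self._started = False

    def run_until_current(self):
        """Run every scheduled call whose time has come."""
        now = time.monotonic()
        while self._calls and self._calls[0][0] <= now:
            _, _, function = heapq.heappop(self._calls)
            function()

    def timeout(self):
        """Seconds until the next scheduled call, or None if there is none."""
        if not self._calls:
            return None
        return max(0.0, self._calls[0][0] - time.monotonic())

    def do_iteration(self, timeout):
        # Stand-in for select/poll/epoll: with no descriptors to watch,
        # waiting for I/O reduces to sleeping until the next call is due.
        time.sleep(0.01 if timeout is None else timeout)

    def run(self):
        self._started = True
        while self._started:
            self.run_until_current()
            if self._started:
                self.do_iteration(self.timeout())


order = []
loop = TimerLoop()
loop.call_later(0.02, lambda: order.append("second"))
loop.call_later(0.01, lambda: order.append("first"))
loop.call_later(0.03, loop.stop)
loop.run()
print(order)
```

Passing the time until the next scheduled call as the I/O timeout is what lets a single loop serve both timers and descriptors without busy-waiting.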

# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase):
 
  def __init__(self):
    """
    Initialize file descriptor tracking dictionaries and the base class.
    """
    self._reads = set()
    self._writes = set()
    posixbase.PosixReactorBase.__init__(self)
 
  def doSelect(self, timeout):
    """
    Run one iteration of the I/O monitor loop.
 
    This will run all selectables who had input or output readiness
    waiting for them.
    """
    try:
      # Call select to monitor the read and write sets; it returns the descriptors that are ready
      r, w, ignored = _select(self._reads,
                  self._writes,
                  [], timeout)
    except ValueError:
      # Possibly a file descriptor has gone negative?
      self._preenDescriptors()
      return
    except TypeError:
      # Something *totally* invalid (object w/o fileno, non-integral
      # result) was passed
      log.err()
      self._preenDescriptors()
      return
    except (select.error, socket.error, IOError) as se:
      # select(2) encountered an error, perhaps while calling the fileno()
      # method of a socket. (Python 2.6 socket.error is an IOError
      # subclass, but on Python 2.5 and earlier it is not.)
      if se.args[0] in (0, 2):
        # windows does this if it got an empty list
        if (not self._reads) and (not self._writes):
          return
        else:
          raise
      elif se.args[0] == EINTR:
        return
      elif se.args[0] == EBADF:
        self._preenDescriptors()
        return
      else:
        # OK, I really don't know what's going on. Blow up.
        raise
 
    _drdw = self._doReadOrWrite
    _logrun = log.callWithLogger
    for selectables, method, fdset in ((r, "doRead", self._reads),
                      (w,"doWrite", self._writes)):
      for selectable in selectables:
        # if this was disconnected in another thread, kill it.
        # ^^^^ --- what the !@#*? serious! -exarkun
        if selectable not in fdset:
          continue
        # This for pausing input when we're not ready for more.
 
        # Call _doReadOrWrite
        _logrun(selectable, _drdw, selectable, method)
 
  doIteration = doSelect
 
  def _doReadOrWrite(self, selectable, method):
    try:
      # Call method, which is either doRead or doWrite;
      # selectable here may be the tcp.Port we are listening on
      why = getattr(selectable, method)()
    except:
      why = sys.exc_info()[1]
      log.err()
    if why:
      self._disconnectSelectable(selectable, why, method=="doRead")
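
The dispatch in _doReadOrWrite relies on looking up a method by name and treating any truthy return value, or any exception, as a reason to disconnect. A minimal sketch of that idea follows; the three classes, the do_read_or_write function, and the disconnected list are hypothetical and exist only for this illustration.

```python
disconnected = []    # records (class name, reason) for every disconnect


class Healthy:
    def doRead(self):
        return None                 # None means "keep monitoring me"


class Closed:
    def doRead(self):
        return "connection done"    # truthy value: a reason to disconnect


class Broken:
    def doRead(self):
        raise OSError("boom")


def do_read_or_write(selectable, method):
    try:
        # Look up doRead or doWrite by name and call it.
        why = getattr(selectable, method)()
    except Exception as exc:
        why = exc                   # an exception is also a reason
    if why:
        disconnected.append((type(selectable).__name__, str(why)))


for selectable in (Healthy(), Closed(), Broken()):
    do_read_or_write(selectable, "doRead")
print(disconnected)
```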

   

So when a client sends a connection request, the doRead method of the tcp.Port in the read set is called.

# twisted/internet/tcp.py
 
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
 
  def doRead(self):
    """Called when my socket is ready for reading.
 
    This accepts a connection and calls self.protocol() to handle the
    wire-level protocol.
    """
    try:
      if platformType == "posix":
        numAccepts = self.numberAccepts
      else:
        numAccepts = 1
      for i in range(numAccepts):
        if self.disconnecting:
          return
        try:
          # Call accept
          skt, addr = self.socket.accept()
        except socket.error as e:
          if e.args[0] in (EWOULDBLOCK, EAGAIN):
            self.numberAccepts = i
            break
          elif e.args[0] == EPERM:
            continue
          elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
            log.msg("Could not accept new connection (%s)" % (
              errorcode[e.args[0]],))
            break
          raise
 
        fdesc._setCloseOnExec(skt.fileno())
        protocol = self.factory.buildProtocol(self._buildAddr(addr))
        if protocol is None:
          skt.close()
          continue
        s = self.sessionno
        self.sessionno = s+1
        # While being initialized, the transport adds itself to the reactor's read set, so when
        # it becomes readable, its doRead method can be called to read the data sent by the client
        transport = self.transport(skt, protocol, addr, self, s, self.reactor)
        protocol.makeConnection(transport)
      else:
        self.numberAccepts = self.numberAccepts+20
    except:
      log.deferr()

   

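In doRead, accept creates the socket used to receive the client's data, the socket is bound to a transport, and the transport is added to the reactor's read set. When data arrives from the client, the transport's doRead method is called to read it.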
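
The batch-accept idea in Port.doRead, accepting several pending connections per readiness notification and stopping when the queue is empty, can be sketched with a non-blocking listening socket. The accept_batch helper and the client count are assumptions of this sketch; the adaptive adjustment of numberAccepts is left out.

```python
import select
import socket


def accept_batch(listener, max_accepts):
    """Accept up to max_accepts pending connections without blocking."""
    accepted = []
    for _ in range(max_accepts):
        try:
            conn, _addr = listener.accept()
        except BlockingIOError:
            break              # queue drained (EWOULDBLOCK or EAGAIN)
        accepted.append(conn)
    return accepted


listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(50)
listener.setblocking(False)
port = listener.getsockname()[1]

clients = [socket.create_connection(("127.0.0.1", port)) for _ in range(3)]

connections = []
while len(connections) < len(clients):
    # Wait until the listening socket is readable, then drain the queue.
    ready, _, _ = select.select([listener], [], [], 5.0)
    if not ready:
        break                  # give up rather than wait forever
    connections.extend(accept_batch(listener, 100))

accepted_count = len(connections)
for sock in clients + connections + [listener]:
    sock.close()
print(accepted_count)
```

Draining the queue in one pass saves a trip through the event loop for every extra connection, which is the motivation for accepting in batches.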

Connection is the parent class of Server (the class of the transport instance), and it implements the doRead method.

# twisted/internet/tcp.py
@implementer(interfaces.ITCPTransport, interfaces.ISystemHandle)
class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
         _AbortingMixin):
 
  def doRead(self):
    try:
      # Receive data
      data = self.socket.recv(self.bufferSize)
    except socket.error as se:
      if se.args[0] == EWOULDBLOCK:
        return
      else:
        return main.CONNECTION_LOST
 
    return self._dataReceived(data)
 
  def _dataReceived(self, data):
    if not data:
      return main.CONNECTION_DONE
    # Call the dataReceived method of our custom protocol to process the data
    rval = self.protocol.dataReceived(data)
    if rval is not None:
      offender = self.protocol.dataReceived
      warningFormat = (
        'Returning a value other than None from %(fqpn)s is '
        'deprecated since %(version)s.')
      warningString = deprecate.getDeprecationWarningString(
        offender, versions.Version('Twisted', 11, 0, 0),
        format=warningFormat)
      deprecate.warnAboutFunction(offender, warningString)
    return rval

   

_dataReceived calls the dataReceived method of the EchoProtocol we defined in the example to process the data.
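
The hand-off from the socket to the protocol can be sketched without Twisted. Here the EchoProtocol class is a hypothetical stand-in for the one in the example, do_read condenses Connection.doRead and _dataReceived into one function, and CONNECTION_DONE is a placeholder string rather than Twisted's main.CONNECTION_DONE.

```python
import socket


class EchoProtocol:
    """Hypothetical stand-in for the example's EchoProtocol."""

    def __init__(self, sock):
        self.sock = sock

    def dataReceived(self, data):
        self.sock.sendall(data)       # echo the bytes back to the peer


CONNECTION_DONE = "connection done"   # placeholder for main.CONNECTION_DONE


def do_read(sock, protocol, buffer_size=65536):
    """Read once and hand the bytes to the protocol."""
    data = sock.recv(buffer_size)
    if not data:
        return CONNECTION_DONE        # empty read: the peer closed
    return protocol.dataReceived(data)


client, server = socket.socketpair()
protocol = EchoProtocol(server)

client.sendall(b"ping")
first_result = do_read(server, protocol)
echoed = client.recv(1024)

client.close()
second_result = do_read(server, protocol)   # peer closed: recv returns b""
server.close()
print(first_result, echoed, second_result)
```

A return value of None tells the caller to keep the connection in the read set, while any other value is treated as the reason to drop it.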

This completes a simple walk through the flow, from creating the listening event to receiving data from the client.
