>  기사  >  백엔드 개발  >  Python의 Twisted 프레임워크에서 리액터 이벤트 관리자 사용법에 대한 자세한 설명

Python의 Twisted 프레임워크에서 리액터 이벤트 관리자 사용법에 대한 자세한 설명

高洛峰
高洛峰원래의
2017-02-03 16:28:243148검색

예표
실습에서 우리는 항상 비슷한 방식으로 비동기 프로그래밍을 사용하는 것 같습니다.

이벤트 수신

이벤트가 발생하면 해당 콜백 함수를 실행합니다.

콜백 완료(새 이벤트가 생성되어 청취 대기열에 추가될 수 있음)

1로 돌아가서 이벤트를 수신합니다.

그래서 이러한 비동기 모드를 Reactor 모드라고 부릅니다. iOS 개발의 예제 Run Loop의 개념은 실제로 Reactor 루프와 매우 유사합니다. 메인 스레드의 Run Loop는 화면 UI 이벤트를 모니터링하며, UI 이벤트가 발생하면 해당 이벤트 처리 코드도 실행될 수 있습니다. 실행을 위해 메인 스레드에 GCD 및 기타 메서드를 추가합니다.

Python의 Twisted 프레임워크에서 리액터 이벤트 관리자 사용법에 대한 자세한 설명

위 그림은 Boost의 Reactor 모드 묘사입니다. Twisted의 디자인은 이 Reactor 모드를 기반으로 합니다. 이벤트를 처리합니다.

from twisted.internet import reactor
reactor.run()

리액터는 Twisted 프로그램의 싱글톤 객체입니다.

리액터
리액터는 이벤트 등록 및 등록 취소, 이벤트 루프 실행, 이벤트 발생 시 콜백 함수 호출에 사용되는 이벤트 관리자입니다. 리액터에 대한 몇 가지 결론이 있습니다:

Twisted Reactor는 Reactor.run()을 호출해야만 시작할 수 있습니다.

리액터 루프는 시작하는 프로세스, 즉 메인 프로세스에서 실행됩니다.

한 번 시작하면 계속 실행됩니다. 리액터는 프로그램의 제어를 받습니다(또는 특히 이를 시작한 스레드의 제어를 받습니다).

리액터 루프는 CPU 리소스를 소비하지 않습니다.

리액터를 명시적으로 생성할 필요는 없고 import만 하면 됩니다.

마지막을 명확하게 설명해야 합니다. Twisted에서 리액터는 싱글턴(즉, 싱글턴 모드)입니다. 즉, 프로그램에는 리액터가 하나만 있을 수 있으며, 도입만 하면 그에 따라 하나가 생성됩니다. 위에서 소개한 방법은 Twisted가 사용하는 기본 방법입니다. 물론 Twisted에는 리액터를 도입할 수 있는 다른 방법도 있습니다. 예를 들어, select 메소드 대신 Twisted.internet.pollreactor의 시스템 호출을 사용하여 폴링할 수 있습니다.

다른 리액터를 사용하는 경우에는 Twisted.internet.reactor를 도입하기 전에 먼저 설치해야 합니다. pollreactor 설치 방법은 다음과 같습니다.

from twisted.internet import pollreactor
pollreactor.install()

다른 특수 리액터를 설치하지 않고 Twisted.internet.reactor를 도입하면 Twisted는 운영 체제에 따라 기본 리액터를 설치합니다. 이 때문에 기본 리액터 설치를 피하기 위해 최상위 모듈에는 리액터를 도입하지 않고, 리액터를 사용하고 싶은 부위에 설치하는 것이 관례다.
다음은 pollreactor를 사용하여 위 프로그램을 다시 작성한 것입니다.

from twited.internet import pollreactor
pollreactor.install()
from twisted.internet import reactor
reactor.run()

그럼 리액터는 싱글톤을 어떻게 구현합니까? from Twisted.internet 가져오기 리액터가 수행하는 작업을 살펴보고 이해하게 될 것입니다.

다음은 Twisted/internet/reactor.py 코드의 일부입니다:

# twisted/internet/reactor.py
import sys
del sys.modules['twisted.internet.reactor']
from twisted.internet import default
default.install()

참고: Python에서 메모리에 로드된 모든 모듈은 sys.modules에 배치됩니다. , 이는 글로벌 사전입니다. 모듈을 가져올 때 먼저 이 목록에 모듈이 로드되었는지 확인합니다. 로드된 경우 가져오기를 호출하는 모듈의 네임스페이스에 모듈 이름을 추가합니다. 로드되지 않은 경우 sys.path 디렉터리에서 모듈 이름에 따라 모듈 파일을 검색한 후 모듈을 메모리에 로드하고 sys.modules에 추가한 후 현재 네임스페이스로 이름을 가져옵니다.

twisted.internet 가져오기 리액터에서 처음 실행하면 sys.modules에 Twisted.internet.reactor가 없기 때문에 Reactor.py의 코드가 실행되고 기본 리액터가 설치됩니다. . 이후 임포트하면 모듈이 이미 sys.modules에 존재하기 때문에 sys.modules에 있는 Twisted.internet.reactor를 현재 네임스페이스로 직접 임포트하게 됩니다.

기본적으로 설치:

# twisted/internet/default.py
def _getInstallFunction(platform):
  """
  Return a function to install the reactor most suited for the given platform.
 
  @param platform: The platform for which to select a reactor.
  @type platform: L{twisted.python.runtime.Platform}
 
  @return: A zero-argument callable which will install the selected
    reactor.
  """
  try:
    if platform.isLinux():
      try:
        from twisted.internet.epollreactor import install
      except ImportError:
        from twisted.internet.pollreactor import install
    elif platform.getType() == 'posix' and not platform.isMacOSX():
      from twisted.internet.pollreactor import install
    else:
      from twisted.internet.selectreactor import install
  except ImportError:
    from twisted.internet.selectreactor import install
  return install
 
 
install = _getInstallFunction(platform)

분명히 기본값은 플랫폼에 따라 해당 설치를 가져옵니다. Linux에서는 epollreactor가 먼저 사용됩니다. 커널이 이를 지원하지 않으면 pollreactor만 사용할 수 있습니다. Mac 플랫폼은 pollreactor를 사용하고 Windows는 selectreactor를 사용합니다. 각 설치의 구현은 유사합니다. 여기서는 selectreactor에서 설치를 추출하여 살펴보겠습니다.

# twisted/internet/selectreactor.py:
def install():
  """Configure the twisted mainloop to be run using the select() reactor.
  """
  # 单例
  reactor = SelectReactor()
  from twisted.internet.main import installReactor
  installReactor(reactor)
 
# twisted/internet/main.py:
def installReactor(reactor):
  """
  Install reactor C{reactor}.
 
  @param reactor: An object that provides one or more IReactor* interfaces.
  """
  # this stuff should be common to all reactors.
  import twisted.internet
  import sys
  if 'twisted.internet.reactor' in sys.modules:
    raise error.ReactorAlreadyInstalledError("reactor already installed")
  twisted.internet.reactor = reactor
  sys.modules['twisted.internet.reactor'] = reactor

installReactor에서 sys.modules에 Twisted.internet.reactor 키를 추가하고, 값은 설치 시 생성된 싱글톤 리액터입니다. 나중에 리액터를 사용하려면 이 싱글톤을 가져옵니다.

SelectReactor
# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase)

implementer는 SelectReactor가 Python으로 인터페이스를 구현한 IReactorFDSet 인터페이스의 메소드를 구현했음을 나타냅니다. 봐.

IReactorFDSet 인터페이스는 주로 설명자에 대한 획득, 추가, 삭제 및 기타 작업을 위한 메서드를 제공합니다. 이 메소드들은 이름만 봐도 그 의미를 알 수 있기 때문에 코멘트를 추가하지 않았습니다. 예제의

# twisted/internet/interfaces.py
class IReactorFDSet(Interface):
 
  def addReader(reader):
 
  def addWriter(writer):
 
  def removeReader(reader):
 
  def removeWriter(writer):
 
  def removeAll():
 
  def getReaders():
 
  def getWriters():
reactor.listenTCP()

reactor.listenTCP()는 부모 클래스 PosixReactorBase의 메서드인 수신 이벤트를 등록합니다.

아아아아

整个逻辑很简单,和正常的server端一样,创建套接字、绑定、监听。不同的是将套接字的描述符添加到了reactor的读集合。那么假如有了client连接过来的话,reactor会监控到,然后触发事件处理程序。

reacotr.run()事件主循环

# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
            ReactorBase)
 
# twisted/internet/base.py
class _SignalReactorMixin(object):
 
  def startRunning(self, installSignalHandlers=True):
    """
    PosixReactorBase的父类_SignalReactorMixin和ReactorBase都有该函数,但是
    _SignalReactorMixin在前,安装mro顺序的话,会先调用_SignalReactorMixin中的。
    """
    self._installSignalHandlers = installSignalHandlers
    ReactorBase.startRunning(self)
 
  def run(self, installSignalHandlers=True):
    self.startRunning(installSignalHandlers=installSignalHandlers)
    self.mainLoop()
 
  def mainLoop(self):
    while self._started:
      try:
        while self._started:
          # Advance simulation time in delayed event
          # processors.
          self.runUntilCurrent()
          t2 = self.timeout()
          t = self.running and t2
          # doIteration是关键,select,poll,epool实现各有不同
          self.doIteration(t)
      except:
        log.msg("Unexpected error in main loop.")
        log.err()
      else:
        log.msg('Main loop terminated.')

   

mianLoop就是最终的主循环了,在循环中,调用doIteration方法监控读写描述符的集合,一旦发现有描述符准备好读写,就会调用相应的事件处理程序。

# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase):
 
  def __init__(self):
    """
    Initialize file descriptor tracking dictionaries and the base class.
    """
    self._reads = set()
    self._writes = set()
    posixbase.PosixReactorBase.__init__(self)
 
  def doSelect(self, timeout):
    """
    Run one iteration of the I/O monitor loop.
 
    This will run all selectables who had input or output readiness
    waiting for them.
    """
    try:
      # 调用select方法监控读写集合,返回准备好读写的描述符
      r, w, ignored = _select(self._reads,
                  self._writes,
                  [], timeout)
    except ValueError:
      # Possibly a file descriptor has gone negative?
      self._preenDescriptors()
      return
    except TypeError:
      # Something *totally* invalid (object w/o fileno, non-integral
      # result) was passed
      log.err()
      self._preenDescriptors()
      return
    except (select.error, socket.error, IOError) as se:
      # select(2) encountered an error, perhaps while calling the fileno()
      # method of a socket. (Python 2.6 socket.error is an IOError
      # subclass, but on Python 2.5 and earlier it is not.)
      if se.args[0] in (0, 2):
        # windows does this if it got an empty list
        if (not self._reads) and (not self._writes):
          return
        else:
          raise
      elif se.args[0] == EINTR:
        return
      elif se.args[0] == EBADF:
        self._preenDescriptors()
        return
      else:
        # OK, I really don't know what's going on. Blow up.
        raise
 
    _drdw = self._doReadOrWrite
    _logrun = log.callWithLogger
    for selectables, method, fdset in ((r, "doRead", self._reads),
                      (w,"doWrite", self._writes)):
      for selectable in selectables:
        # if this was disconnected in another thread, kill it.
        # ^^^^ --- what the !@#*? serious! -exarkun
        if selectable not in fdset:
          continue
        # This for pausing input when we're not ready for more.
 
        # 调用_doReadOrWrite方法
        _logrun(selectable, _drdw, selectable, method)
 
  doIteration = doSelect
 
  def _doReadOrWrite(self, selectable, method):
    try:
      # 调用method,doRead或者是doWrite,
      # 这里的selectable可能是我们监听的tcp.Port
      why = getattr(selectable, method)()
    except:
      why = sys.exc_info()[1]
      log.err()
    if why:
      self._disconnectSelectable(selectable, why, method=="doRead")

   

那么假如客户端有连接请求了,就会调用读集合中tcp.Port的doRead方法。

# twisted/internet/tcp.py
 
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
 
  def doRead(self):
    """Called when my socket is ready for reading.
    当套接字准备好读的时候调用
 
    This accepts a connection and calls self.protocol() to handle the
    wire-level protocol.
    """
    try:
      if platformType == "posix":
        numAccepts = self.numberAccepts
      else:
        numAccepts = 1
      for i in range(numAccepts):
        if self.disconnecting:
          return
        try:
          # 调用accept
          skt, addr = self.socket.accept()
        except socket.error as e:
          if e.args[0] in (EWOULDBLOCK, EAGAIN):
            self.numberAccepts = i
            break
          elif e.args[0] == EPERM:
            continue
          elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
            log.msg("Could not accept new connection (%s)" % (
              errorcode[e.args[0]],))
            break
          raise
 
        fdesc._setCloseOnExec(skt.fileno())
        protocol = self.factory.buildProtocol(self._buildAddr(addr))
        if protocol is None:
          skt.close()
          continue
        s = self.sessionno
        self.sessionno = s+1
        # transport初始化的过程中,会将自身假如到reactor的读集合中,那么当它准备
        # 好读的时候,就可以调用它的doRead方法读取客户端发过来的数据了
        transport = self.transport(skt, protocol, addr, self, s, self.reactor)
        protocol.makeConnection(transport)
      else:
        self.numberAccepts = self.numberAccepts+20
    except:
      log.deferr()

   

doRead方法中,调用accept产生了用于接收客户端数据的套接字,将套接字与transport绑定,然后把transport加入到reactor的读集合。当客户端有数据到来时,就会调用transport的doRead方法进行数据读取了。

Connection是Server(transport实例的类)的父类,它实现了doRead方法。

# twisted/internet/tcp.py
@implementer(interfaces.ITCPTransport, interfaces.ISystemHandle)
class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
         _AbortingMixin):
 
  def doRead(self):
    try:
      # 接收数据
      data = self.socket.recv(self.bufferSize)
    except socket.error as se:
      if se.args[0] == EWOULDBLOCK:
        return
      else:
        return main.CONNECTION_LOST
 
    return self._dataReceived(data)
 
  def _dataReceived(self, data):
    if not data:
      return main.CONNECTION_DONE
    # 调用我们自定义protocol的dataReceived方法处理数据
    rval = self.protocol.dataReceived(data)
    if rval is not None:
      offender = self.protocol.dataReceived
      warningFormat = (
        'Returning a value other than None from %(fqpn)s is '
        'deprecated since %(version)s.')
      warningString = deprecate.getDeprecationWarningString(
        offender, versions.Version('Twisted', 11, 0, 0),
        format=warningFormat)
      deprecate.warnAboutFunction(offender, warningString)
    return rval

   

_dataReceived中调用了示例中我们自定义的EchoProtocol的dataReceived方法处理数据。

至此,一个简单的流程,从创建监听事件,到接收客户端数据就此结束了。

更多Python의 Twisted 프레임워크에서 리액터 이벤트 관리자 사용법에 대한 자세한 설명相关文章请关注PHP中文网!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.