伏線
多くの実践では、常に同じような方法で非同期プログラミングを使用しているようです:
イベントをリッスンする
イベントが発生したときに対応するコールバック関数を実行する
コールバックが完了する(新しいイベントが生成される可能性がある)
1 に戻り、イベントをリッスンします
そこで、iOS 開発における Run Loop の概念など、このような非同期モードを Reactor モードと呼びます。これは、実際には Reactor ループに非常に似ています。メイン スレッドのループは、画面 UI イベントをリッスンします。UI イベントが発生すると、対応するイベント処理コードが GCD およびその他のメソッドを通じて実行のために生成されます。
上の写真は、Boost による Reactor モードの図です。Twisted の設計は、この Reactor モードに基づいており、イベントの待機と処理のプロセスで継続的にループします。
from twisted.internet import reactor reactor.run()
reactor は、Twisted プログラムのシングルトン オブジェクトです。
reactor
reactor はイベント マネージャーであり、イベントの登録と登録解除、イベント ループの実行、イベント発生時のコールバック関数の呼び出しに使用されます。リアクターについては、いくつかの結論があります:
Twisted リアクターは、reactor.run() を呼び出すことによってのみ開始できます。
リアクター ループは、開始プロセスで実行されます。つまり、メイン プロセスで実行されます。
一度開始すると、実行し続けます。リアクターはプログラムの制御下にあります (または、具体的にはそれを開始したスレッドの制御下にあります)。
reactor ループは CPU リソースを消費しません。
リアクターを明示的に作成する必要はなく、インポートするだけで済みます。
最後の点は明確に説明する必要があります。 Twisted では、リアクターはシングルトン (つまり、シングルトン モード) です。つまり、プログラム内にリアクターは 1 つだけ存在でき、それを導入する限り、それに応じてリアクターが 1 つ作成されます。上記で紹介したメソッドはTwistedで使用されるデフォルトのメソッドです。もちろん、Twistedには他にもreactorを導入できるメソッドがあります。たとえば、select メソッドの代わりに、Twisted.internet.pollreactor のシステム コールを使用してポーリングできます。
他のリアクターを使用する場合は、Twisted.internet.reactor を導入する前にインストールする必要があります。 pollreactor をインストールする方法は次のとおりです:
from twisted.internet import pollreactor pollreactor.install()
他の特別なリアクターをインストールせずにTwisted.internet.reactorを導入した場合、Twistedはオペレーティングシステムに従ってデフォルトのリアクターをインストールします。このため、デフォルトのリアクターのインストールを避けるために最上位モジュールにリアクターを導入せず、リアクターを使用したい領域にリアクターをインストールするのが一般的です。
以下は、polreactor を使用して上記のプログラムを書き直したものです:
from twited.internet import pollreactor pollreactor.install() from twisted.internet import reactor reactor.run()
それでは、reactor はどのようにシングルトンを実装するのでしょうか? fromTwisted.internet import リアクターが何をするのか見てみましょう。そうすれば理解できるでしょう。
以下は、Twisted/internet/reactor.py のコードの一部です:
# twisted/internet/reactor.py import sys del sys.modules['twisted.internet.reactor'] from twisted.internet import default default.install()
注: Python でメモリにロードされるすべてのモジュールは、グローバル ディクショナリである sys.modules に配置されます。モジュールをインポートするとき、モジュールがこのリストにロードされているかどうかが最初にチェックされ、ロードされている場合、モジュール名はインポートを呼び出しているモジュールの名前空間にのみ追加されます。ロードされていない場合は、sys.path ディレクトリ内のモジュール名に基づいてモジュール ファイルを検索し、モジュールを見つけたら、そのモジュールをメモリにロードし、sys.modules に追加し、その名前を現在の名前空間にインポートします。
初めてTwisted.internetインポートリアクターから実行する場合、sys.modulesにTwisted.internet.reactorがないため、reactor.py内のコードが実行され、デフォルトのリアクターがインストールされます。その後、インポートすると、sys.modules にモジュールが既に存在するため、sys.modules 内のTwisted.internet.reactor が現在の名前空間に直接インポートされます。
デフォルトでインストール:
# twisted/internet/default.py def _getInstallFunction(platform): """ Return a function to install the reactor most suited for the given platform. @param platform: The platform for which to select a reactor. @type platform: L{twisted.python.runtime.Platform} @return: A zero-argument callable which will install the selected reactor. """ try: if platform.isLinux(): try: from twisted.internet.epollreactor import install except ImportError: from twisted.internet.pollreactor import install elif platform.getType() == 'posix' and not platform.isMacOSX(): from twisted.internet.pollreactor import install else: from twisted.internet.selectreactor import install except ImportError: from twisted.internet.selectreactor import install return install install = _getInstallFunction(platform)
明らかに、デフォルトではプラットフォームに応じて対応するインストールが取得されます。 Linux では、カーネルが epollreactor をサポートしていない場合、最初に epollreactor が使用されます。 Mac プラットフォームは Paulreactor を使用し、Windows は selectreactor を使用します。各インストールの実装は似ています。ここでは、selectreactor でインストールを抽出して確認します。
# twisted/internet/selectreactor.py: def install(): """Configure the twisted mainloop to be run using the select() reactor. """ # 单例 reactor = SelectReactor() from twisted.internet.main import installReactor installReactor(reactor) # twisted/internet/main.py: def installReactor(reactor): """ Install reactor C{reactor}. @param reactor: An object that provides one or more IReactor* interfaces. """ # this stuff should be common to all reactors. import twisted.internet import sys if 'twisted.internet.reactor' in sys.modules: raise error.ReactorAlreadyInstalledError("reactor already installed") twisted.internet.reactor = reactor sys.modules['twisted.internet.reactor'] = reactor
installReactor で、Twisted.internet.reactor キーを sys.modules に追加します。その値は、インストールで作成されたシングルトン リアクターです。将来的にreactorを使用したい場合は、このシングルトンをインポートします。
SelectReactor # twisted/internet/selectreactor.py @implementer(IReactorFDSet) class SelectReactor(posixbase.PosixReactorBase, _extraBase)
implementer は、SelectReactor が IReactorFDSet インターフェースのメソッドを実装することを意味します。これは、Python でのインターフェース実装です。興味のある学生は参照してください。
IReactorFDSet インターフェイスは主に、記述子の取得、追加、削除、その他の操作のためのメソッドを提供します。これらのメソッドの意味は名前を見れば分かるのでコメントは省略しました。この例の
# twisted/internet/interfaces.py class IReactorFDSet(Interface): def addReader(reader): def addWriter(writer): def removeReader(reader): def removeWriter(writer): def removeAll(): def getReaders(): def getWriters(): reactor.listenTCP()
reactor.listenTCP() は、親クラス PosixReactorBase のメソッドである listen イベントを登録します。
りー
整个逻辑很简单,和正常的server端一样,创建套接字、绑定、监听。不同的是将套接字的描述符添加到了reactor的读集合。那么假如有了client连接过来的话,reactor会监控到,然后触发事件处理程序。
reacotr.run()事件主循环
# twisted/internet/posixbase.py @implementer(IReactorTCP, IReactorUDP, IReactorMulticast) class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin, ReactorBase) # twisted/internet/base.py class _SignalReactorMixin(object): def startRunning(self, installSignalHandlers=True): """ PosixReactorBase的父类_SignalReactorMixin和ReactorBase都有该函数,但是 _SignalReactorMixin在前,安装mro顺序的话,会先调用_SignalReactorMixin中的。 """ self._installSignalHandlers = installSignalHandlers ReactorBase.startRunning(self) def run(self, installSignalHandlers=True): self.startRunning(installSignalHandlers=installSignalHandlers) self.mainLoop() def mainLoop(self): while self._started: try: while self._started: # Advance simulation time in delayed event # processors. self.runUntilCurrent() t2 = self.timeout() t = self.running and t2 # doIteration是关键,select,poll,epool实现各有不同 self.doIteration(t) except: log.msg("Unexpected error in main loop.") log.err() else: log.msg('Main loop terminated.')
mianLoop就是最终的主循环了,在循环中,调用doIteration方法监控读写描述符的集合,一旦发现有描述符准备好读写,就会调用相应的事件处理程序。
# twisted/internet/selectreactor.py @implementer(IReactorFDSet) class SelectReactor(posixbase.PosixReactorBase, _extraBase): def __init__(self): """ Initialize file descriptor tracking dictionaries and the base class. """ self._reads = set() self._writes = set() posixbase.PosixReactorBase.__init__(self) def doSelect(self, timeout): """ Run one iteration of the I/O monitor loop. This will run all selectables who had input or output readiness waiting for them. """ try: # 调用select方法监控读写集合,返回准备好读写的描述符 r, w, ignored = _select(self._reads, self._writes, [], timeout) except ValueError: # Possibly a file descriptor has gone negative? self._preenDescriptors() return except TypeError: # Something *totally* invalid (object w/o fileno, non-integral # result) was passed log.err() self._preenDescriptors() return except (select.error, socket.error, IOError) as se: # select(2) encountered an error, perhaps while calling the fileno() # method of a socket. (Python 2.6 socket.error is an IOError # subclass, but on Python 2.5 and earlier it is not.) if se.args[0] in (0, 2): # windows does this if it got an empty list if (not self._reads) and (not self._writes): return else: raise elif se.args[0] == EINTR: return elif se.args[0] == EBADF: self._preenDescriptors() return else: # OK, I really don't know what's going on. Blow up. raise _drdw = self._doReadOrWrite _logrun = log.callWithLogger for selectables, method, fdset in ((r, "doRead", self._reads), (w,"doWrite", self._writes)): for selectable in selectables: # if this was disconnected in another thread, kill it. # ^^^^ --- what the !@#*? serious! -exarkun if selectable not in fdset: continue # This for pausing input when we're not ready for more. # 调用_doReadOrWrite方法 _logrun(selectable, _drdw, selectable, method) doIteration = doSelect def _doReadOrWrite(self, selectable, method): try: # 调用method,doRead或者是doWrite, # 这里的selectable可能是我们监听的tcp.Port why = getattr(selectable, method)() except: why = sys.exc_info()[1] log.err() if why: self._disconnectSelectable(selectable, why, method=="doRead")
那么假如客户端有连接请求了,就会调用读集合中tcp.Port的doRead方法。
# twisted/internet/tcp.py @implementer(interfaces.IListeningPort) class Port(base.BasePort, _SocketCloser): def doRead(self): """Called when my socket is ready for reading. 当套接字准备好读的时候调用 This accepts a connection and calls self.protocol() to handle the wire-level protocol. """ try: if platformType == "posix": numAccepts = self.numberAccepts else: numAccepts = 1 for i in range(numAccepts): if self.disconnecting: return try: # 调用accept skt, addr = self.socket.accept() except socket.error as e: if e.args[0] in (EWOULDBLOCK, EAGAIN): self.numberAccepts = i break elif e.args[0] == EPERM: continue elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED): log.msg("Could not accept new connection (%s)" % ( errorcode[e.args[0]],)) break raise fdesc._setCloseOnExec(skt.fileno()) protocol = self.factory.buildProtocol(self._buildAddr(addr)) if protocol is None: skt.close() continue s = self.sessionno self.sessionno = s+1 # transport初始化的过程中,会将自身假如到reactor的读集合中,那么当它准备 # 好读的时候,就可以调用它的doRead方法读取客户端发过来的数据了 transport = self.transport(skt, protocol, addr, self, s, self.reactor) protocol.makeConnection(transport) else: self.numberAccepts = self.numberAccepts+20 except: log.deferr()
doRead方法中,调用accept产生了用于接收客户端数据的套接字,将套接字与transport绑定,然后把transport加入到reactor的读集合。当客户端有数据到来时,就会调用transport的doRead方法进行数据读取了。
Connection是Server(transport实例的类)的父类,它实现了doRead方法。
# twisted/internet/tcp.py @implementer(interfaces.ITCPTransport, interfaces.ISystemHandle) class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser, _AbortingMixin): def doRead(self): try: # 接收数据 data = self.socket.recv(self.bufferSize) except socket.error as se: if se.args[0] == EWOULDBLOCK: return else: return main.CONNECTION_LOST return self._dataReceived(data) def _dataReceived(self, data): if not data: return main.CONNECTION_DONE # 调用我们自定义protocol的dataReceived方法处理数据 rval = self.protocol.dataReceived(data) if rval is not None: offender = self.protocol.dataReceived warningFormat = ( 'Returning a value other than None from %(fqpn)s is ' 'deprecated since %(version)s.') warningString = deprecate.getDeprecationWarningString( offender, versions.Version('Twisted', 11, 0, 0), format=warningFormat) deprecate.warnAboutFunction(offender, warningString) return rval
_dataReceived中调用了示例中我们自定义的EchoProtocol的dataReceived方法处理数据。
至此,一个简单的流程,从创建监听事件,到接收客户端数据就此结束了。
更多Python の Twisted フレームワークにおけるリアクター イベント マネージャーの使用法の詳細な説明相关文章请关注PHP中文网!