


Preparation
In a lot of practice, it seems that we always use asynchronous programming in a similar way:
Listen for events
The event occurs and execute the corresponding callback function
Callback completed (new events may be generated and added to the listening queue)
Return to 1, listen for events
So we call this asynchronous mode Reactor mode, for example in iOS development The concept of Run Loop in is actually very similar to Reactor loop. The Run Loop of the main thread monitors screen UI events. Once a UI event occurs, the corresponding event processing code is executed. Events can also be generated through GCD and other methods to the main thread for execution.
The above picture is boost's depiction of the Reactor mode. The design of Twisted is based on this Reactor mode. The Twisted program continuously loops in the process of waiting for events and processing events.
from twisted.internet import reactor reactor.run()
reactor is a singleton object in the Twisted program.
reactor
reactor is an event manager, used to register and unregister events, run event loops, and call callback functions when events occur. There are several conclusions about reactor:
Twisted reactor can only be started by calling reactor.run().
Thereactor loop runs in the process where it starts, that is, in the main process.
Once started, it will keep running. The reactor will be under the control of the program (or specifically under the control of a thread that started it).
reactor loop does not consume any CPU resources.
There is no need to explicitly create a reactor, just import it.
The last one needs to be explained clearly. In Twisted, reactor is a Singleton (that is, singleton mode), that is, there can only be one reactor in a program, and as long as you introduce it, one will be created accordingly. The method introduced above is the default method used by twisted. Of course, twisted has other methods that can introduce reactor. For example, you can use the system call in twisted.internet.pollreactor to poll instead of the select method.
If you use other reactors, you need to install it before introducing twisted.internet.reactor. Here is how to install pollreactor:
from twisted.internet import pollreactor pollreactor.install()
If you have introduced twisted.internet.reactor without installing any other special reactor, Twisted will install the default reactor according to the operating system. Because of this, it is customary not to introduce reactor in the top-level module to avoid installing the default reactor, but to install it in the area where you want to use reactor.
The following is a rewrite of the above program using pollreactor:
from twited.internet import pollreactor pollreactor.install() from twisted.internet import reactor reactor.run()
So how does reactor implement a singleton? Let's take a look at what from twisted.internet import reactor does and you will understand.
The following is part of the code of twisted/internet/reactor.py:
# twisted/internet/reactor.py import sys del sys.modules['twisted.internet.reactor'] from twisted.internet import default default.install()
Note: All modules loaded into memory in Python are placed in sys.modules, which is a global dictionary. When importing a module, it will first check whether the module has been loaded in this list. If it is loaded, the module name will only be added to the namespace of the module that is calling the import. If it is not loaded, search the module file according to the module name in the sys.path directory. After finding the module, load the module into memory, add it to sys.modules, and import the name into the current namespace.
If we run from twisted.internet import reactor for the first time, because there is no twisted.internet.reactor in sys.modules, the code in reactor.py will be run and the default reactor will be installed. After that, if imported, because the module already exists in sys.modules, twisted.internet.reactor in sys.modules will be directly imported into the current namespace.
Install in default:
# twisted/internet/default.py def _getInstallFunction(platform): """ Return a function to install the reactor most suited for the given platform. @param platform: The platform for which to select a reactor. @type platform: L{twisted.python.runtime.Platform} @return: A zero-argument callable which will install the selected reactor. """ try: if platform.isLinux(): try: from twisted.internet.epollreactor import install except ImportError: from twisted.internet.pollreactor import install elif platform.getType() == 'posix' and not platform.isMacOSX(): from twisted.internet.pollreactor import install else: from twisted.internet.selectreactor import install except ImportError: from twisted.internet.selectreactor import install return install install = _getInstallFunction(platform)
Obviously, default will get the corresponding install according to the platform. Under Linux, epollreactor will be used first. If the kernel does not support it, pollreactor can only be used. The Mac platform uses pollreactor, and windows uses selectreactor. The implementation of each install is similar. Here we extract the install in selectreactor to take a look.
# twisted/internet/selectreactor.py: def install(): """Configure the twisted mainloop to be run using the select() reactor. """ # 单例 reactor = SelectReactor() from twisted.internet.main import installReactor installReactor(reactor) # twisted/internet/main.py: def installReactor(reactor): """ Install reactor C{reactor}. @param reactor: An object that provides one or more IReactor* interfaces. """ # this stuff should be common to all reactors. import twisted.internet import sys if 'twisted.internet.reactor' in sys.modules: raise error.ReactorAlreadyInstalledError("reactor already installed") twisted.internet.reactor = reactor sys.modules['twisted.internet.reactor'] = reactor
In installReactor, add the twisted.internet.reactor key to sys.modules, and the value is the singleton reactor created in install. If you want to use reactor in the future, you will import this singleton.
SelectReactor # twisted/internet/selectreactor.py @implementer(IReactorFDSet) class SelectReactor(posixbase.PosixReactorBase, _extraBase)
implementer means that SelectReactor implements the method of IReactorFDSet interface. zope.interface is used here, which is the interface implementation in python. Interested students can go Take a look.
The IReactorFDSet interface mainly provides methods for obtaining, adding, deleting and other operations on descriptors. You can know the meaning of these methods just by looking at their names, so I didn’t add comments.
# twisted/internet/interfaces.py class IReactorFDSet(Interface): def addReader(reader): def addWriter(writer): def removeReader(reader): def removeWriter(writer): def removeAll(): def getReaders(): def getWriters(): reactor.listenTCP()
reactor.listenTCP() in the example registers a listening event, which is a method in the parent class PosixReactorBase.
# twisted/internet/posixbase.py @implementer(IReactorTCP, IReactorUDP, IReactorMulticast) class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin, ReactorBase): def listenTCP(self, port, factory, backlog=50, interface=''): p = tcp.Port(port, factory, backlog, interface, self) p.startListening() return p # twisted/internet/tcp.py @implementer(interfaces.IListeningPort) class Port(base.BasePort, _SocketCloser): def __init__(self, port, factory, backlog=50, interface='', reactor=None): """Initialize with a numeric port to listen on. """ base.BasePort.__init__(self, reactor=reactor) self.port = port self.factory = factory self.backlog = backlog if abstract.isIPv6Address(interface): self.addressFamily = socket.AF_INET6 self._addressType = address.IPv6Address self.interface = interface ... def startListening(self): """Create and bind my socket, and begin listening on it. 创建并绑定套接字,开始监听。 This is called on unserialization, and must be called after creating a server to begin listening on the specified port. """ if self._preexistingSocket is None: # Create a new socket and make it listen try: # 创建套接字 skt = self.createInternetSocket() if self.addressFamily == socket.AF_INET6: addr = _resolveIPv6(self.interface, self.port) else: addr = (self.interface, self.port) # 绑定 skt.bind(addr) except socket.error as le: raise CannotListenError(self.interface, self.port, le) # 监听 skt.listen(self.backlog) else: # Re-use the externally specified socket skt = self._preexistingSocket self._preexistingSocket = None # Avoid shutting it down at the end. self._shouldShutdown = False # Make sure that if we listened on port 0, we update that to # reflect what the OS actually assigned us. self._realPortNumber = skt.getsockname()[1] log.msg("%s starting on %s" % ( self._getLogPrefix(self.factory), self._realPortNumber)) # The order of the next 5 lines is kind of bizarre. If no one # can explain it, perhaps we should re-arrange them. self.factory.doStart() self.connected = True self.socket = skt self.fileno = self.socket.fileno self.numberAccepts = 100 # startReading调用reactor的addReader方法将Port加入读集合 self.startReading()
整个逻辑很简单,和正常的server端一样,创建套接字、绑定、监听。不同的是将套接字的描述符添加到了reactor的读集合。那么假如有了client连接过来的话,reactor会监控到,然后触发事件处理程序。
reacotr.run()事件主循环
# twisted/internet/posixbase.py @implementer(IReactorTCP, IReactorUDP, IReactorMulticast) class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin, ReactorBase) # twisted/internet/base.py class _SignalReactorMixin(object): def startRunning(self, installSignalHandlers=True): """ PosixReactorBase的父类_SignalReactorMixin和ReactorBase都有该函数,但是 _SignalReactorMixin在前,安装mro顺序的话,会先调用_SignalReactorMixin中的。 """ self._installSignalHandlers = installSignalHandlers ReactorBase.startRunning(self) def run(self, installSignalHandlers=True): self.startRunning(installSignalHandlers=installSignalHandlers) self.mainLoop() def mainLoop(self): while self._started: try: while self._started: # Advance simulation time in delayed event # processors. self.runUntilCurrent() t2 = self.timeout() t = self.running and t2 # doIteration是关键,select,poll,epool实现各有不同 self.doIteration(t) except: log.msg("Unexpected error in main loop.") log.err() else: log.msg('Main loop terminated.')
mianLoop就是最终的主循环了,在循环中,调用doIteration方法监控读写描述符的集合,一旦发现有描述符准备好读写,就会调用相应的事件处理程序。
# twisted/internet/selectreactor.py @implementer(IReactorFDSet) class SelectReactor(posixbase.PosixReactorBase, _extraBase): def __init__(self): """ Initialize file descriptor tracking dictionaries and the base class. """ self._reads = set() self._writes = set() posixbase.PosixReactorBase.__init__(self) def doSelect(self, timeout): """ Run one iteration of the I/O monitor loop. This will run all selectables who had input or output readiness waiting for them. """ try: # 调用select方法监控读写集合,返回准备好读写的描述符 r, w, ignored = _select(self._reads, self._writes, [], timeout) except ValueError: # Possibly a file descriptor has gone negative? self._preenDescriptors() return except TypeError: # Something *totally* invalid (object w/o fileno, non-integral # result) was passed log.err() self._preenDescriptors() return except (select.error, socket.error, IOError) as se: # select(2) encountered an error, perhaps while calling the fileno() # method of a socket. (Python 2.6 socket.error is an IOError # subclass, but on Python 2.5 and earlier it is not.) if se.args[0] in (0, 2): # windows does this if it got an empty list if (not self._reads) and (not self._writes): return else: raise elif se.args[0] == EINTR: return elif se.args[0] == EBADF: self._preenDescriptors() return else: # OK, I really don't know what's going on. Blow up. raise _drdw = self._doReadOrWrite _logrun = log.callWithLogger for selectables, method, fdset in ((r, "doRead", self._reads), (w,"doWrite", self._writes)): for selectable in selectables: # if this was disconnected in another thread, kill it. # ^^^^ --- what the !@#*? serious! -exarkun if selectable not in fdset: continue # This for pausing input when we're not ready for more. # 调用_doReadOrWrite方法 _logrun(selectable, _drdw, selectable, method) doIteration = doSelect def _doReadOrWrite(self, selectable, method): try: # 调用method,doRead或者是doWrite, # 这里的selectable可能是我们监听的tcp.Port why = getattr(selectable, method)() except: why = sys.exc_info()[1] log.err() if why: self._disconnectSelectable(selectable, why, method=="doRead")
那么假如客户端有连接请求了,就会调用读集合中tcp.Port的doRead方法。
# twisted/internet/tcp.py @implementer(interfaces.IListeningPort) class Port(base.BasePort, _SocketCloser): def doRead(self): """Called when my socket is ready for reading. 当套接字准备好读的时候调用 This accepts a connection and calls self.protocol() to handle the wire-level protocol. """ try: if platformType == "posix": numAccepts = self.numberAccepts else: numAccepts = 1 for i in range(numAccepts): if self.disconnecting: return try: # 调用accept skt, addr = self.socket.accept() except socket.error as e: if e.args[0] in (EWOULDBLOCK, EAGAIN): self.numberAccepts = i break elif e.args[0] == EPERM: continue elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED): log.msg("Could not accept new connection (%s)" % ( errorcode[e.args[0]],)) break raise fdesc._setCloseOnExec(skt.fileno()) protocol = self.factory.buildProtocol(self._buildAddr(addr)) if protocol is None: skt.close() continue s = self.sessionno self.sessionno = s+1 # transport初始化的过程中,会将自身假如到reactor的读集合中,那么当它准备 # 好读的时候,就可以调用它的doRead方法读取客户端发过来的数据了 transport = self.transport(skt, protocol, addr, self, s, self.reactor) protocol.makeConnection(transport) else: self.numberAccepts = self.numberAccepts+20 except: log.deferr()
doRead方法中,调用accept产生了用于接收客户端数据的套接字,将套接字与transport绑定,然后把transport加入到reactor的读集合。当客户端有数据到来时,就会调用transport的doRead方法进行数据读取了。
Connection是Server(transport实例的类)的父类,它实现了doRead方法。
# twisted/internet/tcp.py @implementer(interfaces.ITCPTransport, interfaces.ISystemHandle) class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser, _AbortingMixin): def doRead(self): try: # 接收数据 data = self.socket.recv(self.bufferSize) except socket.error as se: if se.args[0] == EWOULDBLOCK: return else: return main.CONNECTION_LOST return self._dataReceived(data) def _dataReceived(self, data): if not data: return main.CONNECTION_DONE # 调用我们自定义protocol的dataReceived方法处理数据 rval = self.protocol.dataReceived(data) if rval is not None: offender = self.protocol.dataReceived warningFormat = ( 'Returning a value other than None from %(fqpn)s is ' 'deprecated since %(version)s.') warningString = deprecate.getDeprecationWarningString( offender, versions.Version('Twisted', 11, 0, 0), format=warningFormat) deprecate.warnAboutFunction(offender, warningString) return rval
_dataReceived中调用了示例中我们自定义的EchoProtocol的dataReceived方法处理数据。
至此,一个简单的流程,从创建监听事件,到接收客户端数据就此结束了。
更多Detailed explanation of the usage of reactor event manager in Pythons Twisted framework相关文章请关注PHP中文网!

This article explains how to use Beautiful Soup, a Python library, to parse HTML. It details common methods like find(), find_all(), select(), and get_text() for data extraction, handling of diverse HTML structures and errors, and alternatives (Sel

Python's statistics module provides powerful data statistical analysis capabilities to help us quickly understand the overall characteristics of data, such as biostatistics and business analysis. Instead of looking at data points one by one, just look at statistics such as mean or variance to discover trends and features in the original data that may be ignored, and compare large datasets more easily and effectively. This tutorial will explain how to calculate the mean and measure the degree of dispersion of the dataset. Unless otherwise stated, all functions in this module support the calculation of the mean() function instead of simply summing the average. Floating point numbers can also be used. import random import statistics from fracti

Serialization and deserialization of Python objects are key aspects of any non-trivial program. If you save something to a Python file, you do object serialization and deserialization if you read the configuration file, or if you respond to an HTTP request. In a sense, serialization and deserialization are the most boring things in the world. Who cares about all these formats and protocols? You want to persist or stream some Python objects and retrieve them in full at a later time. This is a great way to see the world on a conceptual level. However, on a practical level, the serialization scheme, format or protocol you choose may determine the speed, security, freedom of maintenance status, and other aspects of the program

This article compares TensorFlow and PyTorch for deep learning. It details the steps involved: data preparation, model building, training, evaluation, and deployment. Key differences between the frameworks, particularly regarding computational grap

The article discusses popular Python libraries like NumPy, Pandas, Matplotlib, Scikit-learn, TensorFlow, Django, Flask, and Requests, detailing their uses in scientific computing, data analysis, visualization, machine learning, web development, and H

This article guides Python developers on building command-line interfaces (CLIs). It details using libraries like typer, click, and argparse, emphasizing input/output handling, and promoting user-friendly design patterns for improved CLI usability.

This tutorial builds upon the previous introduction to Beautiful Soup, focusing on DOM manipulation beyond simple tree navigation. We'll explore efficient search methods and techniques for modifying HTML structure. One common DOM search method is ex

Solution to permission issues when viewing Python version in Linux terminal When you try to view Python version in Linux terminal, enter python...


Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

AI Hentai Generator
Generate AI Hentai for free.

Hot Article

Hot Tools

EditPlus Chinese cracked version
Small size, syntax highlighting, does not support code prompt function

SublimeText3 English version
Recommended: Win version, supports code prompts!

MinGW - Minimalist GNU for Windows
This project is in the process of being migrated to osdn.net/projects/mingw, you can continue to follow us there. MinGW: A native Windows port of the GNU Compiler Collection (GCC), freely distributable import libraries and header files for building native Windows applications; includes extensions to the MSVC runtime to support C99 functionality. All MinGW software can run on 64-bit Windows platforms.

SublimeText3 Linux new version
SublimeText3 Linux latest version

SAP NetWeaver Server Adapter for Eclipse
Integrate Eclipse with SAP NetWeaver application server.
