Home  >  Article  >  Backend Development  >  An example tutorial on building a non-blocking download program using Python's Twisted framework

An example tutorial on building a non-blocking download program using Python's Twisted framework

高洛峰
高洛峰Original
2017-02-03 16:42:581295browse

The first poetry server supported by twisted
Although Twisted is used to write server code in most cases, in order to start as simple as possible from the beginning, we first start with a simple client.
Let’s try using Twisted’s client. The source code is in twisted-client-1/get-poetry.py. First, open the three servers as before:

python blocking-server/slowpoetry.py --port 10000 poetry/ecstasy.txt --num-bytes 30
python blocking-server/slowpoetry.py --port 10001 poetry/fascination.txt
python blocking-server/slowpoetry.py --port 10002 poetry/science.txt

and run the client:

python twisted-client-1/get-poetry.py 10000 10001 10002

You will see the command line on the client Printing out:

Task 1: got 60 bytes of poetry from 127.0.0.1:10000
Task 2: got 10 bytes of poetry from 127.0.0.1:10001
Task 3: got 10 bytes of poetry from 127.0.0.1:10002
Task 1: got 30 bytes of poetry from 127.0.0.1:10000
Task 3: got 10 bytes of poetry from 127.0.0.1:10002
Task 2: got 10 bytes of poetry from 127.0.0.1:10001
...
Task 1: 3003 bytes of poetry
Task 2: 623 bytes of poetry
Task 3: 653 bytes of poetry
Got 3 poems in 0:00:10.134220

is close to what our non-blocking mode client without Twisted prints. This is not surprising since they work the same way.
Next, let’s take a closer look at its source code.
Note: When we start learning to use Twisted, we will use some low-level Twisted APIs. This is done to remove the abstraction layer of Twisted so that we can learn Tiwsted from the inside out. But this means that the APIs we use in learning may not be seen in actual applications. Just remember this: the preceding code is just an exercise, not an example of writing real software.
As you can see, a set of PoetrySocket instances are first created. When PoetrySocket is initialized, it creates a network socket as its own attribute field to connect to the server, and selects non-blocking mode:

self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect(address)
self.sock.setblocking(0)

Eventually we will improve to not using sockets On an abstract level, but we still need to use it here. After creating the socket, PoetrySocket passes itself to the reactor through the method addReader:

# tell the Twisted reactor to monitor this socket for reading
from twisted.internet import reactor
reactor.addReader(self)

This method provides Twisted with a file descriptor to monitor the data to be sent. Why don't we pass Twisted a file descriptor or callback function but an object instance? And there is no code inside Twisted related to this poetry service, so how does it know how to interact with our object instance? Trust me, I've checked it out, open the twisted.internet.interfaces module and join me in figuring out what's going on.

Twisted interface
There are many sub-modules called interfaces inside twisted. Each defines a set of interface classes. Since version 8.0, Twisted uses zope.interface as the base class for these classes. But we won't discuss the details here. We're only interested in its Twisted subclasses, the ones you see here.
One of the core purposes of using interfaces is documentation. As a python programmer, you must know Duck Typing. (Python philosophy: "If it looks like a duck, sounds like a duck, then treat it as a duck." Therefore, the interface of python objects strives to be simple and unified, similar to the interface-oriented programming ideas in other languages.) Read twisted.internet .interfaces finds the addReader definition of the method, and its definition can be found in IReactorFDSet:

def addReader(reader):
  """
  I add reader to the set of file descriptors to get read events for.
  @param reader: An L{IReadDescriptor} provider that will be checked for
          read events until it is removed from the reactor with
          L{removeReader}.
  @return: C{None}.
  """

IReactorFDSet is an interface implemented by Twisted reactor. Therefore, any Twisted reactor will have an addReader method, which works as described above. The reason why this method declaration does not have a self parameter is because it only cares about a public interface definition, and the self parameter is only part of the interface implementation (when calling it, a self parameter is not explicitly passed in). Interface classes are never instantiated or implemented as base classes.

Technically speaking, IReactorFDSet will only be used by reactor to listen to file descriptors. As far as I know, all implemented reactors now implement this interface.
Using interfaces is not just for documentation. zope.interface allows you to explicitly declare that a class implements one or more interfaces and provides a mechanism for checking these implementations at runtime. It also provides a proxy mechanism, which can dynamically provide a class that does not implement an interface directly with the interface. But we won’t do in-depth study here.
You may have noticed the similarity between interfaces and the recent additions to virtual base classes in Python. We will not analyze the similarities and differences between them here. If you are interested, you can read an article on this topic written by Glyph, the founder of the Python project.
According to the description of the document, it can be seen that the reader parameter of addReader is to implement the IReadDescriptor interface. This means our PoetrySocket must do the same.
Reading interface module we can see the following code:

class IReadDescriptor(IFileDescriptor):
  def doRead():
    """
    Some data is available for reading on your descriptor.
    """

At the same time you will see that there is a doRead method in our PoetrySocket class. When it is called by Twisted's reactor, it reads data from the socket asynchronously. Therefore, doRead is actually a callback function, but it is not passed directly to the reactor, but an object instance that implements this method is passed. This is also the convention in the Twisted framework - instead of passing the function that implements an interface directly, you pass the object that implements it. In this way we can pass a set of related callback functions through a parameter. And it is also possible to communicate between callback functions through data stored in the object.
What about implementing other callback functions in PoetrySocket? Note that IReadDescriptor is a subclass of IFileDescriptor. This means that anyone who implements IReadDescriptor must implement IFileDescriptor. If you read the code carefully you will see the following:

class IFileDescriptor(ILoggingContext):
  """
  A file descriptor.
  """
  def fileno():
    ...
  def connectionLost(reason):
    …

我将文档描述省略掉了,但这些函数的功能从字面上就可以理解:fileno返回我们想监听的文件描述符,connectionLost是当连接关闭时被调用。你也看到了,PoetrySocket实现了这些方法。
最后,IFileDescriptor继承了ILoggingContext,这里我不想再展现其源码。我想说的是,这就是为什么我们要实现一个logPrefix回调函数。你可以在interface模块中找到答案。
注意:你也许注意到了,当连接关闭时,在doRead中返回了一个特殊的值。我是如何知道的?说实话,没有它程序是无法正常工作的。我是在分析Twisted源码中发现其它相应的方法采取相同的方法。你也许想好好研究一下:但有时一些文档或书的解释是错误的或不完整的。

更多关于回调的知识
我们使用Twisted的异步客户端和前面的没有使用Twisted的异步客户非常的相似。两者都要连接它们自己的socket,并以异步的方式从中读取数据。最大的区别在于:使用Twisted的客户端并没有使用自己的select循环-而使用了Twisted的reactor。 doRead回调函数是非常重要的一个回调。Twisted调用它来告诉我们已经有数据在socket接收完毕。我可以通过图7来形象地说明这一过程:

An example tutorial on building a non-blocking download program using Pythons Twisted framework

每当回调被激活,就轮到我们的代码将所有能够读的数据读回来然后非阻塞式的停止。Twisted是不会因为什么异常状况(如没有必要的阻塞)而终止我们的代码。那么我们就故意写个会产生异常状况的客户端看看到底能发生什么事情。可以在twisted-client-1/get-poetry-broken.py中看到源代码。这个客户端与你前面看到的同样有两个异常状况出现:
这个客户端并没有选择非阻塞式的socket
doRead回调方法在socket关闭连接前一直在不停地读socket
现在让我们运行一下这个客户端:

python twisted-client-1/get-poetry-broken.py 10000 10001 10002

我们出得到如同下面一样的输出:

Task 1: got 3003 bytes of poetry from 127.0.0.1:10000
Task 3: got 653 bytes of poetry from 127.0.0.1:10002
Task 2: got 623 bytes of poetry from 127.0.0.1:10001
Task 1: 3003 bytes of poetry
Task 2: 623 bytes of poetry
Task 3: 653 bytes of poetry
Got 3 poems in 0:00:10.132753

可能除了任务的完成顺序不太一致外,和我前面阻塞式客户端是一样的。这是因为这个客户端是一个阻塞式的。
由于使用了阻塞式的连接,就将我们的非阻塞式客户端变成了阻塞式的客户端。这样一来,我们尽管遭受了使用select的复杂但却没有享受到其带来的异步优势。
像诸如Twisted这样的事件循环所提供的多任务的能力是需要用户的合作来实现的。Twisted会告诉我们什么时候读或写一个文件描述符,但我们必须要尽可能高效而没有阻塞地完成读写工作。同样我们应该禁止使用其它各类的阻塞函数,如os.system中的函数。除此之外,当我们遇到计算型的任务(长时间占用CPU),最好是将任务切成若干个部分执行以让I/O操作尽可能地执行。
你也许已经注意到这个客户端所花费的时间少于先前那个阻塞的客户端。这是由于这个在一开始就与所有的服务建立连接,由于服务是一旦连接建立就立即发送数据,而且我们的操作系统会缓存一部分发送过来但尚读不到的数据到缓冲区中(缓冲区大小是有上限的)。因此就明白了为什么前面那个会慢了:它是在完成一个后再建立下一个连接并接收数据。
但这种小优势仅仅在小数据量的情况下才会得以体现。如果我们下载三首20M个单词的诗,那时OS的缓冲区会在瞬间填满,这样一来我们这个客户端与前面那个阻塞式客户端相比就没有什么优势可言了。

Building the client abstractly
First of all, this client actually has such boring code as creating a network port and receiving data at the port. Twisted is supposed to implement these routine functions for us, saving us from having to implement them ourselves every time we write a new program. This is particularly useful, as it frees us from some of the tricky exception handling involved in asynchronous I/O (see the previous client), which involves more tricky details if it is to be cross-platform. If you have free time one afternoon, you can look through Twisted's WIN32 implementation source code and see how many little threads there are to handle cross-platform.
Another problem is related to error handling. When the Twisted client running version 1 downloads poetry from a port that is not served, it crashes. Of course we can fix this error, but it's easier to handle these types of errors through Twisted's APIs, which we'll introduce below.
Finally, that client cannot be reused. What if there is another module that needs to download poetry through our client? How do people know that your poems have been downloaded? We cannot use a method to simply download a poem and then pass it on to others, but leave them in a waiting state before. This is indeed a problem, but we are not going to address it in this section—it will definitely be addressed in a future section.
We will use some high-level APIs and interfaces to solve the first and second problems. The Twisted framework is loosely composed of many abstraction layers. Therefore, learning Twisted also means learning what functions these layers provide, such as what APIs, interfaces and instances are available for each layer. Next, we will analyze the most important parts of Twisted to get a better feel for how Twisted is organized. Once you are familiar with the overall structure of Twisted, learning new parts will be much easier.
Generally speaking, each Twisted abstraction is only related to a specific concept. For example, the client in Part 4 uses IReadDescriptor, which is an abstraction of "a file descriptor that can read bytes". An abstraction often specifies the behavior of objects that want to implement the abstraction (that is, implement the interface) by defining an interface. When learning new Twisted abstract concepts, the most important thing to remember is:
Most high-level abstractions are built on the basis of low-level abstractions, and few establish separate portals.
So when you learn a new Twisted abstraction, always remember what it does and doesn't do. In particular, if an early abstraction A implements feature F, then feature F is unlikely to be implemented by any other abstraction. In addition, if another abstraction requires the F feature, it will use A instead of implementing F itself. (Usually, B may inherit A or obtain a reference to an instance of A)
The network is very complex, so Twisted contains many abstract concepts. By starting with a low-level abstraction, we hope to see more clearly how the various parts of a Twisted program are organized.
Core loop body

The first abstraction we need to learn, and the most important one in Twisted, is reactor. At the center of every program built through Twisted, no matter how many layers your program has, there will always be a reactor loop that drives the program without stopping. There is no more basic support than reactor. In fact, other parts of Twisted (that is, except for the reactor loop) can be understood this way: they are all here to assist X to better use reactor. content. Although it is possible to insist on using the low-level APIs like the previous client, if we insist on doing that, then we will have to implement a lot of content ourselves. And at a higher level, it means we can write a lot less code.
But when thinking and dealing with problems from the outside, it is easy to forget the existence of reactor. In any Twisted program of any common size, there will indeed be very little direct interaction with reactor APIs. The same goes for low-level abstractions (i.e. we rarely interact with them directly). The file descriptor abstraction we used in the previous client is so well summarized by higher-level abstractions that we rarely encounter it in real Twisted programs. (They are still used internally, we just can't see it)
As for the file descriptor abstraction message, this is not a problem. Let Twisted take the helm of asynchronous I/O processing so we can focus more on the problem we're actually trying to solve. But it's different with reactor, it will never go away. When you choose to use Twisted, it means you choose to use the Reactor pattern, and it means you need "interactive" programming using callbacks and multitasking.

Transports
Transports抽象是通过Twisted中interfaces模块中ITransport接口定义的。一个Twisted的Transport代表一个可以收发字节的单条连接。对于我们的诗歌下载客户端而言,就是对一条TCP连接的抽象。但是Twisted也支持诸如Unix中管道和UDP。Transport抽象可以代表任何这样的连接并为其代表的连接处理具体的异步I/O操作细节。
如果你浏览一下ITransport中的方法,可能找不到任何接收数据的方法。这是因为Transports总是在低层完成从连接中异步读取数据的许多细节工作,然后通过回调将数据发给我们。相似的原理,Transport对象的写相关的方法为避免阻塞也不会选择立即写我们要发送的数据。告诉一个Transport要发送数据,只是意味着:尽快将这些数据发送出去,别产生阻塞就行。当然,数据会按照我们提交的顺序发送。
通常我们不会自己实现一个Transport。我们会去使用Twisted提供的实现类,即在传递给reactor时会为我们创建一个对象实例。

Protocols
Twisted的Protocols抽象由interfaces模块中的IProtocol定义。也许你已经想到,Protocol对象实现协议内容。也就是说,一个具体的Twisted的Protocol的实现应该对应一个具体网络协议的实现,像FTP、IMAP或其它我们自己制定的协议。我们的诗歌下载协议,正如它表现的那样,就是在连接建立后将所有的诗歌内容全部发送出去并且在发送完毕后关闭连接。
严格意义上讲,每一个Twisted的Protocols类实例都为一个具体的连接提供协议解析。因此我们的程序每建立一条连接(对于服务方就是每接受一条连接),都需要一个协议实例。这就意味着,Protocol实例是存储协议状态与间断性(由于我们是通过异步I/O方式以任意大小来接收数据的)接收并累积数据的地方。
因此,Protocol实例如何得知它为哪条连接服务呢?如果你阅读IProtocol定义会发现一个makeConnection函数。这是一个回调函数,Twisted会在调用它时传递给其一个也是仅有的一个参数,即Transport实例。这个Transport实例就代表Protocol将要使用的连接。
Twisted内置了很多实现了通用协议的Protocol。你可以在twisted.protocols.basic中找到一些稍微简单点的。在你尝试写新Protocol时,最好是看看Twisted源码是不是已经有现成的存在。如果没有,那实现一个自己的协议是非常好的,正如我们为诗歌下载客户端做的那样。

Protocol Factories
因此每个连接需要一个自己的Protocol,而且这个Protocol是我们自己定义的类的实例。由于我们会将创建连接的工作交给Twisted来完成,Twisted需要一种方式来为一个新的连接创建一个合适的协议。创建协议就是Protocol Factories的工作了。
也许你已经猜到了,Protocol Factory的API由IProtocolFactory来定义,同样在interfaces模块中。Protocol Factory就是Factory模式的一个具体实现。buildProtocol方法在每次被调用时返回一个新Protocol实例,它就是Twisted用来为新连接创建新Protocol实例的方法。

诗歌下载客户端2.0:第一滴心血
好吧,让我们来看看由Twisted支持的诗歌下载客户端2.0。源码可以在这里twisted-client-2/get-poetry.py。你可以像前面一样运行它,并得到相同的输出。这也是最后一个在接收到数据时打印其任务的客户端版本了。到现在为止,对于所有Twisted程序都是交替执行任务并处理相对较少数量数据的,应该很清晰了。我们依然通过print函数来展示在关键时刻在进行什么内容,但将来客户端不会在这样繁锁。
在第二个版本中,sockets不会再出现了。我们甚至不需要引入socket模块也不用引用socket对象和文件描述符。取而代之的是,我们告诉reactor来创建到诗歌服务器的连接,代码如下面所示:

factory = PoetryClientFactory(len(addresses))
 
from twisted.internet import reactor
 
for address in addresses:
  host, port = address
  reactor.connectTCP(host, port, factory)

我们需要关注的是connectTCP这个函数。前两个参数的含义很明显,不解释了。第三个参数是我们自定义的PoetryClientFactory类的实例对象。这是一个专门针对诗歌下载客户端的Protocol Factory,将它传递给reactor可以让Twisted为我们创建一个PoetryProtocol实例。
值得注意的是,从一开始我们既没有实现Factory也没有去实现Protocol,不像在前面那个客户端中我们去实例化我们PoetrySocket类。我们只是继承了Twisted在twisted.internet.protocol 中提供的基类。Factory的基类是twisted.internet.protocol.Factory,但我们使用客户端专用(即不像服务器端那样监听一个连接,而是主动创建一个连接)的ClientFactory子类来继承。
我们同样利用了Twisted的Factory已经实现了buildProtocol方法这一优势来为我们所用。我们要在子类中调用基类中的实现:

def buildProtocol(self, address):
  proto = ClientFactory.buildProtocol(self, address)
  proto.task_num = self.task_num
  self.task_num += 1
  return proto

基类怎么会知道我们要创建什么样的Protocol呢?注意,我们的PoetryClientFactory中有一个protocol类变量:

class PoetryClientFactory(ClientFactory):
 
  task_num = 1
 
  protocol = PoetryProtocol # tell base class what proto to build

基类Factory实现buildProtocol的过程是:安装(创建一个实例)我们设置在protocol变量上的Protocol类与在这个实例(此处即PoetryProtocol的实例)的factory属性上设置一个产生它的Factory的引用(此处即实例化PoetryProtocol的PoetryClientFactory)。这个过程如图

正如我们提到的那样,位于Protocol对象内的factory属性字段允许在都由同一个factory产生的Protocol之间共享数据。由于Factories都是由用户代码来创建的(即在用户的控制中),因此这个属性也可以实现Protocol对象将数据传递回一开始初始化请求的代码中来,这将在第六部分看到。
值得注意的是,虽然在Protocol中有一个属性指向生成其的Protocol Factory,在Factory中也有一个变量指向一个Protocol类,但通常来说,一个Factory可以生成多个Protocol。
在Protocol创立的第二步便是通过makeConnection与一个Transport联系起来。我们无需自己来实现这个函数而使用Twisted提供的默认实现。默认情况是,makeConnection将Transport的一个引用赋给(Protocol的)transport属性,同时置(同样是Protocol的)connected属性为True

An example tutorial on building a non-blocking download program using Pythons Twisted framework

一旦初始化到这一步后,Protocol开始其真正的工作—将低层的数据流翻译成高层的协议规定格式的消息。处理接收到数据的主要方法是dataReceived,我们的客户端是这样实现的:

def dataReceived(self, data):
  self.poem += data
  msg = 'Task %d: got %d bytes of poetry from %s'
  print msg % (self.task_num, len(data), self.transport.getHost())

每次dateReceved被调用就意味着我们得到一个新字符串。由于与异步I/O交互,我们不知道能接收到多少数据,因此将接收到的数据缓存下来直到完成一个完整的协议规定格式的消息。在我们的例子中,诗歌只有在连接关闭时才下载完毕,因此我们只是不断地将接收到的数据添加到我们的.poem属性字段中。
注意我们使用了Transport的getHost方法来取得数据来自的服务器信息。我们这样做只是与前面的客户端保持一致。相反,我们的代码没有必要这样做,因为我们没有向服务器发送任何消息,也就没有必要知道服务器的信息了。
我们来看一下dataReceved运行时的快照。在2.0版本相同的目录下有一个twisted-client-2/get-poetry-stack.py。它与2.0版本的不同之处只在于:

def dataReceived(self, data):
  traceback.print_stack()
  os._exit(0)

这样一改,我们就能打印出跟踪堆栈的信息,然后离开程序,可以用下面的命令来运行它:
python twisted-client-2/get-poetry-stack.py 10000
你会得到内容如下的跟踪堆栈:

File "twisted-client-2/get-poetry-stack.py", line 125, in
  poetry_main()
 
... # I removed a bunch of lines here
 
File ".../twisted/internet/tcp.py", line 463, in doRead # Note the doRead callback
  return self.protocol.dataReceived(data)
File "twisted-client-2/get-poetry-stack.py", line 58, in dataReceived
  traceback.print_stack()

   

看见没,有我们在1.0版本客户端的doRead回调函数。我们前面也提到过,Twisted在建立新抽象层会使用已有的实现而不是另起炉灶。因此必然会有一个IReadDescriptor的实例在辛苦的工作,它是由Twisted代码而非我们自己的代码来实现。如果你表示怀疑,那么就看看twisted.internet.tcp中的实现吧。如果你浏览代码会发现,由同一个类实现了IWriteDescriptor与ITransport。因此 IReadDescriptor实际上就是变相的Transport类。可以用图10来形象地说明dateReceived的回调过程:

An example tutorial on building a non-blocking download program using Pythons Twisted framework

一旦诗歌下载完成,PoetryProtocol就会通知它的PooetryClientFactory:

def connectionLost(self, reason):  
 self.poemReceived(self.poem)
def poemReceived(self, poem): 
 self.factory.poem_finished(self.task_num, poem)

当transport的连接关闭时,conncetionLost回调会被激活。reason参数是一个twisted.python.failure.Failure的实例对象,其携带的信息能够说明连接是被安全的关闭还是由于出错被关闭的。我们的客户端因认为总是能完整地下载完诗歌而忽略了这一参数。
工厂会在所有的诗歌都下载完毕后关闭reactor。再次重申:我们代码的工作就是用来下载诗歌-这意味我们的PoetryClientFactory缺少复用性。我们将在下一部分修正这一缺陷。值得注意的是,poem_finish回调函数是如何通过跟踪剩余诗歌数的:

...
self.poetry_count -= 1
 
if self.poetry_count == 0:
...

如果我们采用多线程以让每个线程分别下载诗歌,这样我们就必须使用一把锁来管理这段代码以免多个线程在同一时间调用poem_finish。但是在交互式体系下就不必担心了。由于reactor只能一次启用一个回调。
新的客户端实现在处理错误上也比先前的优雅的多,下面是PoetryClientFactory处理错误连接的回调实现代码:

def clientConnectionFailed(self, connector, reason):
  print 'Failed to connect to:', connector.getDestination()
  self.poem_finished()

   

注意,回调是在工厂内部而不是协议内部实现。由于协议是在连接建立后才创建的,而工厂能够在连接未能成功建立时捕获消息。

更多An example tutorial on building a non-blocking download program using Pythons Twisted framework相关文章请关注PHP中文网!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn