首頁  >  文章  >  後端開發  >  使用Python的Twisted框架建立非阻塞下載程式的實例教程

使用Python的Twisted框架建立非阻塞下載程式的實例教程

高洛峰
高洛峰原創
2017-02-03 16:42:581251瀏覽

第一個twisted支援的詩歌伺服器
儘管Twisted大多數情況下用來寫伺服器程式碼,但為了一開始盡量從簡單處著手,我們先從簡單的客戶端講起。
讓我們來試試使用Twisted的客戶端。原始碼在twisted-client-1/get-poetry.py。首先像前面一樣要開啟三個伺服器:

python blocking-server/slowpoetry.py --port 10000 poetry/ecstasy.txt --num-bytes 30
python blocking-server/slowpoetry.py --port 10001 poetry/fascination.txt
python blocking-server/slowpoetry.py --port 10002 poetry/science.txt

並且運行客戶端:

python twisted-client-1/get-poetry.py 10000 10001 10002

你會看到在客戶端的命令列列印出:

Task 1: got 60 bytes of poetry from 127.0.0.1:10000
Task 2: got 10 bytes of poetry from 127.0.0.1:10001
Task 3: got 10 bytes of poetry from 127.0.0.1:10002
Task 1: got 30 bytes of poetry from 127.0.0.1:10000
Task 3: got 10 bytes of poetry from 127.0.0.1:10002
Task 2: got 10 bytes of poetry from 127.0.0.1:10001
...
Task 1: 3003 bytes of poetry
Task 2: 623 bytes of poetry
Task 3: 653 bytes of poetry
Got 3 poems in 0:00:10.134220

和我們的沒有使用Twisted阻塞模式客戶端列印的內容接近。這並不奇怪,因為它們的工作方式是一樣的。
下面,我們來仔細研究它的原始碼。
注意:我們開始學習使用Twisted時會使用一些低層次Twisted的APIs。這樣做是為揭去Twisted的抽象層,這樣我們就可以從內向外的來學習Tiwsted。但這意味著,我們在學習中所使用的APIs在實際應用上可能都不會見到。記住這麼一點就好:前面這些程式碼只是用來當練習,而不是寫真實軟體的例子。
可以看到,首先創建了一組PoetrySocket的實例。當PoetrySocket初始化時,其創建了一個網路socket作為自己的屬性欄位來連接伺服器,並且選擇了非阻塞模式:

self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect(address)
self.sock.setblocking(0)

最終我們雖然會提高到不使用socket的抽象層次上,但這裡我們仍然需要使用它。在創建完socket後,PoetrySocket透過方法addReader將自己傳遞給 reactor:

# tell the Twisted reactor to monitor this socket for reading
from twisted.internet import reactor
reactor.addReader(self)

這個方法給Twisted提供了一個檔案描述符來監視要傳送來的資料。為什麼我們不傳遞給Twisted一個文件描述符或回調函數而是一個物件實例?而Twisted內部沒有任何與這個詩歌服務相關的程式碼,它怎麼知道該如何與我們的物件實例互動?相信我,我已經查看過了,打開twisted.internet.interfaces模組,和我一起來搞清楚是怎麼回事。

Twisted介面
在twisted內部有很多被稱作介面的子模組。每個都定義了一組介面類別。由於在8.0版本中,Twisted使用zope.interface作為這些類別的基底類別。但我們這裡並不來討論它其中的細節。我們只關心其在Twisted的子類,就是你看到的那些。
使用介面的核心目的之一就是文檔化。作為一個python程式設計師,你肯定知道Duck Typing。 (python哲學思想:「如果看起來像鴨子,聽起來像鴨子,就可以把它當作鴨子」。因此python 物件的介面力求簡單且統一,類似其他語言中面向介面程式設計思想。)翻閱twisted.internet .interfaces找到方法的addReader定義,它的定義在IReactorFDSet中可以找到:

def addReader(reader):
  """
  I add reader to the set of file descriptors to get read events for.
  @param reader: An L{IReadDescriptor} provider that will be checked for
          read events until it is removed from the reactor with
          L{removeReader}.
  @return: C{None}.
  """

IReactorFDSet是一個Twisted的reactor實現的介面。因此任何一個Twisted的reactor都會一個 addReader的方法,如同上面所描述的一樣運作。這個方法宣告之所以沒有self參數是因為它只關心一個公共介面定義,self參數只是介面實作時的一部分(在呼叫它時,也沒有明確地傳入一個self參數)。介面類別永遠不會被實例化或作為基底類別來繼承實作。

技術上講,IReactorFDSet只會由reactor實作用來監聽檔案描述子。具我所知,現在所有已實作reactor都會實作這個介面。
使用介面並不是為了文檔化。 zope.interface允許你明確地來聲明一個類別實作一個或多個接口,並提供運行時檢查這些實作的機制。同樣也提供代理這一機制,它可以動態地為一個沒有實作某接口的類別直接提供該介面。但我們這裡就不做深入學習了。
你可能已經注意到介面與最近加入Python中虛基底類別的相似性了。這裡我們並不去分析它們之間的相似性與差異。若有興趣,可以讀讀Python計畫的創辦人Glyph寫的一篇關於這個主題的文章。
根據文檔的描述可以看出,addReader的reader參數是要實作IReadDescriptor介面的。這也就意味著我們的PoetrySocket也必須這麼做。
閱讀介面模組我們可以看到下面這段程式碼:

class IReadDescriptor(IFileDescriptor):
  def doRead():
    """
    Some data is available for reading on your descriptor.
    """

同時你會看到在我們的PoetrySocket類別中有一個doRead方法。當其被Twisted的reactor呼叫時,就會採用非同步的方式從socket讀取資料。因此,doRead其實就是一個回呼函數,只是沒有直接將其傳遞給reactor,而是傳遞實現此方法的物件實例。這也是Twisted框架中的慣例—不是直接傳遞實作某個介面的函數而是傳遞實現它的物件。這樣我們透過一個參數就可以傳遞一組相關的回呼函數。而且也可以讓回呼函數之間透過儲存在物件中的資料進行通訊。
那在PoetrySocket中實現其它的回呼函數呢?注意到IReadDescriptor是IFileDescriptor的一個子類別。這也意味著任何一個實作IReadDescriptor都必須實作IFileDescriptor。若是仔細閱讀程式碼會看到下面的內容:

class IFileDescriptor(ILoggingContext):
  """
  A file descriptor.
  """
  def fileno():
    ...
  def connectionLost(reason):
    …

我将文档描述省略掉了,但这些函数的功能从字面上就可以理解:fileno返回我们想监听的文件描述符,connectionLost是当连接关闭时被调用。你也看到了,PoetrySocket实现了这些方法。
最后,IFileDescriptor继承了ILoggingContext,这里我不想再展现其源码。我想说的是,这就是为什么我们要实现一个logPrefix回调函数。你可以在interface模块中找到答案。
注意:你也许注意到了,当连接关闭时,在doRead中返回了一个特殊的值。我是如何知道的?说实话,没有它程序是无法正常工作的。我是在分析Twisted源码中发现其它相应的方法采取相同的方法。你也许想好好研究一下:但有时一些文档或书的解释是错误的或不完整的。

更多关于回调的知识
我们使用Twisted的异步客户端和前面的没有使用Twisted的异步客户非常的相似。两者都要连接它们自己的socket,并以异步的方式从中读取数据。最大的区别在于:使用Twisted的客户端并没有使用自己的select循环-而使用了Twisted的reactor。 doRead回调函数是非常重要的一个回调。Twisted调用它来告诉我们已经有数据在socket接收完毕。我可以通过图7来形象地说明这一过程:

使用Python的Twisted框架建立非阻塞下載程式的實例教程

每当回调被激活,就轮到我们的代码将所有能够读的数据读回来然后非阻塞式的停止。Twisted是不会因为什么异常状况(如没有必要的阻塞)而终止我们的代码。那么我们就故意写个会产生异常状况的客户端看看到底能发生什么事情。可以在twisted-client-1/get-poetry-broken.py中看到源代码。这个客户端与你前面看到的同样有两个异常状况出现:
这个客户端并没有选择非阻塞式的socket
doRead回调方法在socket关闭连接前一直在不停地读socket
现在让我们运行一下这个客户端:

python twisted-client-1/get-poetry-broken.py 10000 10001 10002

我们出得到如同下面一样的输出:

Task 1: got 3003 bytes of poetry from 127.0.0.1:10000
Task 3: got 653 bytes of poetry from 127.0.0.1:10002
Task 2: got 623 bytes of poetry from 127.0.0.1:10001
Task 1: 3003 bytes of poetry
Task 2: 623 bytes of poetry
Task 3: 653 bytes of poetry
Got 3 poems in 0:00:10.132753

可能除了任务的完成顺序不太一致外,和我前面阻塞式客户端是一样的。这是因为这个客户端是一个阻塞式的。
由于使用了阻塞式的连接,就将我们的非阻塞式客户端变成了阻塞式的客户端。这样一来,我们尽管遭受了使用select的复杂但却没有享受到其带来的异步优势。
像诸如Twisted这样的事件循环所提供的多任务的能力是需要用户的合作来实现的。Twisted会告诉我们什么时候读或写一个文件描述符,但我们必须要尽可能高效而没有阻塞地完成读写工作。同样我们应该禁止使用其它各类的阻塞函数,如os.system中的函数。除此之外,当我们遇到计算型的任务(长时间占用CPU),最好是将任务切成若干个部分执行以让I/O操作尽可能地执行。
你也许已经注意到这个客户端所花费的时间少于先前那个阻塞的客户端。这是由于这个在一开始就与所有的服务建立连接,由于服务是一旦连接建立就立即发送数据,而且我们的操作系统会缓存一部分发送过来但尚读不到的数据到缓冲区中(缓冲区大小是有上限的)。因此就明白了为什么前面那个会慢了:它是在完成一个后再建立下一个连接并接收数据。
但这种小优势仅仅在小数据量的情况下才会得以体现。如果我们下载三首20M个单词的诗,那时OS的缓冲区会在瞬间填满,这样一来我们这个客户端与前面那个阻塞式客户端相比就没有什么优势可言了。

抽像地建構客戶端
首先是,這個客戶端竟然有創建網路連接埠並接收連接埠處的資料這樣枯燥的程式碼。 Twisted理應為我們實現這些例程性功能,省得我們每次寫一個新的程式時都要自己去實現。這樣做特別有用,可以將我們從非同步I/O涉及的一些棘手的異常處理中解放出來(參看前面的客戶端) , 如果要跨平台就涉及到更多更加棘手的細節。如果你哪天下午有空,可以翻Twisted的WIN32實作原始碼,看看裡面有多少小針線是來處理跨平台的。
另一問題是與錯誤處理有關。當運行版本1的Twisted客戶端從並沒有提供服務的連接埠上下載詩歌時,它就會崩潰。當然我們是可以修正這個錯誤,但透過下面我們要介紹Twisted的APIs來處理這些類型的錯誤會更簡單。
最後,那個客戶端也不能重複使用。如果有另一個模組需要透過我們的客戶端下載詩歌呢?人家怎麼知道你的詩已經下載完畢?我們不能用一個方法簡單地將一首詩下載完成後再傳給人家,而在之前讓人家處於等待狀態。這確實是一個問題,但我們不准備在這個部分解決這個問題——在未來的部分中一定會解決這個問題。
我們將會使用一些高層次的APIs和介面來解決第一、二個問題。 Twisted框架是由眾多抽象層鬆散地組合起來的。因此,學習Twisted也意味著需要學習這些層提供什麼功能,例如每層都有哪些APIs,介面和實例可供使用。接下來我們會透過剖析Twisted最最重要的部分來更好地感受一下Twisted都是怎麼組織的。一旦你對Twisted的整個結構熟悉了,學習新的部分會簡單多了。
一般來說,每個Twisted的抽像都只與一個特定的概念有關。例如,第四部分中的客戶端所使用的IReadDescriptor,它就是"一個可以讀取位元組的檔案描述符"的抽象。一個抽象往往會透過定義介面來指定那些想實現這個抽象(也就是實作這個介面)的物件的形為。在學習新的Twisted抽象概念時,最需要謹記的就是:
多數高層次抽像都是在低層次抽象的基礎上建立的,很少有另立門戶的。
因此,你在學習新的Twisted抽象概念時,總是要記住它做什麼和不做什麼。特別是,如果一個早期的抽象A實現了F特性,那麼F特性不太可能再由其它任何抽象來實現。另外,如果另外一個抽象需要F特性,那麼它就會使用A而不是自己再去實現F。 (通常的做法,B可能會透過繼承A或獲得一個指向A實例的引用)
網路非常的複雜,因此Twisted包含許多抽象的概念。透過從低層的抽象講起,我們希望能更清楚起看到在一個Twisted程式中各個部分是怎麼組織起來的。
核心的循環體

第一個我們要學習的抽象,也是Twisted中最重要的,就是reactor。在每個透過Twisted搭建起來的程式中心處,不管你這個程式有多少層,總會有一個reactor循環在不停止地驅動程式的運作。再也沒有比reactor提供更基礎的支援了。實際上,Twisted的其它部分(即除了reactor循環體)可以這樣理解:它們都是來輔助X來更好地使用reactor,這裡的X可以是提供Web網頁、處理一個資料庫查詢請求或其它更加具體的內容。儘管堅持像上一個客戶端一樣使用低層APIs是可能的,但如果我們執意那樣做,那麼我們必需自己來實現非常多的內容。而在更高的層次上,意味著我們可以少寫很多程式碼。
但是當在外層思考與處理問題時, 很容易就忘記了reactor的存在了。在任何一個常見大小的Twisted程式中 ,確實很少會有直接與reactor的APIs互動。低層的抽像也是一樣(即我們很少會直接與其互動)。我們在上一個客戶端中用到的文件描述符抽象,就被更高層的抽象更好的歸納以至於我們很少會在真正的Twisted程式中遇到。 (他們在內部依然在被使用,只是我們看不到而已)
至於文件描述符抽象的訊息,這並不是一個問題。讓Twisted掌舵異步I/O處理,這樣我們就可以更加關注我們實際要解決的問題。但對於reactor不一樣,它永遠不會消失。當你選擇使用Twisted,也意味著你選擇使用Reactor模式,並且意味著你需要使用回呼與多任務合作的"互動式"程式設計方式。

Transports
Transports抽象是通过Twisted中interfaces模块中ITransport接口定义的。一个Twisted的Transport代表一个可以收发字节的单条连接。对于我们的诗歌下载客户端而言,就是对一条TCP连接的抽象。但是Twisted也支持诸如Unix中管道和UDP。Transport抽象可以代表任何这样的连接并为其代表的连接处理具体的异步I/O操作细节。
如果你浏览一下ITransport中的方法,可能找不到任何接收数据的方法。这是因为Transports总是在低层完成从连接中异步读取数据的许多细节工作,然后通过回调将数据发给我们。相似的原理,Transport对象的写相关的方法为避免阻塞也不会选择立即写我们要发送的数据。告诉一个Transport要发送数据,只是意味着:尽快将这些数据发送出去,别产生阻塞就行。当然,数据会按照我们提交的顺序发送。
通常我们不会自己实现一个Transport。我们会去使用Twisted提供的实现类,即在传递给reactor时会为我们创建一个对象实例。

Protocols
Twisted的Protocols抽象由interfaces模块中的IProtocol定义。也许你已经想到,Protocol对象实现协议内容。也就是说,一个具体的Twisted的Protocol的实现应该对应一个具体网络协议的实现,像FTP、IMAP或其它我们自己制定的协议。我们的诗歌下载协议,正如它表现的那样,就是在连接建立后将所有的诗歌内容全部发送出去并且在发送完毕后关闭连接。
严格意义上讲,每一个Twisted的Protocols类实例都为一个具体的连接提供协议解析。因此我们的程序每建立一条连接(对于服务方就是每接受一条连接),都需要一个协议实例。这就意味着,Protocol实例是存储协议状态与间断性(由于我们是通过异步I/O方式以任意大小来接收数据的)接收并累积数据的地方。
因此,Protocol实例如何得知它为哪条连接服务呢?如果你阅读IProtocol定义会发现一个makeConnection函数。这是一个回调函数,Twisted会在调用它时传递给其一个也是仅有的一个参数,即Transport实例。这个Transport实例就代表Protocol将要使用的连接。
Twisted内置了很多实现了通用协议的Protocol。你可以在twisted.protocols.basic中找到一些稍微简单点的。在你尝试写新Protocol时,最好是看看Twisted源码是不是已经有现成的存在。如果没有,那实现一个自己的协议是非常好的,正如我们为诗歌下载客户端做的那样。

Protocol Factories
因此每个连接需要一个自己的Protocol,而且这个Protocol是我们自己定义的类的实例。由于我们会将创建连接的工作交给Twisted来完成,Twisted需要一种方式来为一个新的连接创建一个合适的协议。创建协议就是Protocol Factories的工作了。
也许你已经猜到了,Protocol Factory的API由IProtocolFactory来定义,同样在interfaces模块中。Protocol Factory就是Factory模式的一个具体实现。buildProtocol方法在每次被调用时返回一个新Protocol实例,它就是Twisted用来为新连接创建新Protocol实例的方法。

诗歌下载客户端2.0:第一滴心血
好吧,让我们来看看由Twisted支持的诗歌下载客户端2.0。源码可以在这里twisted-client-2/get-poetry.py。你可以像前面一样运行它,并得到相同的输出。这也是最后一个在接收到数据时打印其任务的客户端版本了。到现在为止,对于所有Twisted程序都是交替执行任务并处理相对较少数量数据的,应该很清晰了。我们依然通过print函数来展示在关键时刻在进行什么内容,但将来客户端不会在这样繁锁。
在第二个版本中,sockets不会再出现了。我们甚至不需要引入socket模块也不用引用socket对象和文件描述符。取而代之的是,我们告诉reactor来创建到诗歌服务器的连接,代码如下面所示:

factory = PoetryClientFactory(len(addresses))
 
from twisted.internet import reactor
 
for address in addresses:
  host, port = address
  reactor.connectTCP(host, port, factory)

我们需要关注的是connectTCP这个函数。前两个参数的含义很明显,不解释了。第三个参数是我们自定义的PoetryClientFactory类的实例对象。这是一个专门针对诗歌下载客户端的Protocol Factory,将它传递给reactor可以让Twisted为我们创建一个PoetryProtocol实例。
值得注意的是,从一开始我们既没有实现Factory也没有去实现Protocol,不像在前面那个客户端中我们去实例化我们PoetrySocket类。我们只是继承了Twisted在twisted.internet.protocol 中提供的基类。Factory的基类是twisted.internet.protocol.Factory,但我们使用客户端专用(即不像服务器端那样监听一个连接,而是主动创建一个连接)的ClientFactory子类来继承。
我们同样利用了Twisted的Factory已经实现了buildProtocol方法这一优势来为我们所用。我们要在子类中调用基类中的实现:

def buildProtocol(self, address):
  proto = ClientFactory.buildProtocol(self, address)
  proto.task_num = self.task_num
  self.task_num += 1
  return proto

基类怎么会知道我们要创建什么样的Protocol呢?注意,我们的PoetryClientFactory中有一个protocol类变量:

class PoetryClientFactory(ClientFactory):
 
  task_num = 1
 
  protocol = PoetryProtocol # tell base class what proto to build

基类Factory实现buildProtocol的过程是:安装(创建一个实例)我们设置在protocol变量上的Protocol类与在这个实例(此处即PoetryProtocol的实例)的factory属性上设置一个产生它的Factory的引用(此处即实例化PoetryProtocol的PoetryClientFactory)。这个过程如图

正如我们提到的那样,位于Protocol对象内的factory属性字段允许在都由同一个factory产生的Protocol之间共享数据。由于Factories都是由用户代码来创建的(即在用户的控制中),因此这个属性也可以实现Protocol对象将数据传递回一开始初始化请求的代码中来,这将在第六部分看到。
值得注意的是,虽然在Protocol中有一个属性指向生成其的Protocol Factory,在Factory中也有一个变量指向一个Protocol类,但通常来说,一个Factory可以生成多个Protocol。
在Protocol创立的第二步便是通过makeConnection与一个Transport联系起来。我们无需自己来实现这个函数而使用Twisted提供的默认实现。默认情况是,makeConnection将Transport的一个引用赋给(Protocol的)transport属性,同时置(同样是Protocol的)connected属性为True

使用Python的Twisted框架建立非阻塞下載程式的實例教程

一旦初始化到这一步后,Protocol开始其真正的工作—将低层的数据流翻译成高层的协议规定格式的消息。处理接收到数据的主要方法是dataReceived,我们的客户端是这样实现的:

def dataReceived(self, data):
  self.poem += data
  msg = 'Task %d: got %d bytes of poetry from %s'
  print msg % (self.task_num, len(data), self.transport.getHost())

每次dateReceved被调用就意味着我们得到一个新字符串。由于与异步I/O交互,我们不知道能接收到多少数据,因此将接收到的数据缓存下来直到完成一个完整的协议规定格式的消息。在我们的例子中,诗歌只有在连接关闭时才下载完毕,因此我们只是不断地将接收到的数据添加到我们的.poem属性字段中。
注意我们使用了Transport的getHost方法来取得数据来自的服务器信息。我们这样做只是与前面的客户端保持一致。相反,我们的代码没有必要这样做,因为我们没有向服务器发送任何消息,也就没有必要知道服务器的信息了。
我们来看一下dataReceved运行时的快照。在2.0版本相同的目录下有一个twisted-client-2/get-poetry-stack.py。它与2.0版本的不同之处只在于:

def dataReceived(self, data):
  traceback.print_stack()
  os._exit(0)

这样一改,我们就能打印出跟踪堆栈的信息,然后离开程序,可以用下面的命令来运行它:
python twisted-client-2/get-poetry-stack.py 10000
你会得到内容如下的跟踪堆栈:

File "twisted-client-2/get-poetry-stack.py", line 125, in
  poetry_main()
 
... # I removed a bunch of lines here
 
File ".../twisted/internet/tcp.py", line 463, in doRead # Note the doRead callback
  return self.protocol.dataReceived(data)
File "twisted-client-2/get-poetry-stack.py", line 58, in dataReceived
  traceback.print_stack()

   

看见没,有我们在1.0版本客户端的doRead回调函数。我们前面也提到过,Twisted在建立新抽象层会使用已有的实现而不是另起炉灶。因此必然会有一个IReadDescriptor的实例在辛苦的工作,它是由Twisted代码而非我们自己的代码来实现。如果你表示怀疑,那么就看看twisted.internet.tcp中的实现吧。如果你浏览代码会发现,由同一个类实现了IWriteDescriptor与ITransport。因此 IReadDescriptor实际上就是变相的Transport类。可以用图10来形象地说明dateReceived的回调过程:

使用Python的Twisted框架建立非阻塞下載程式的實例教程

一旦诗歌下载完成,PoetryProtocol就会通知它的PooetryClientFactory:

def connectionLost(self, reason):  
 self.poemReceived(self.poem)
def poemReceived(self, poem): 
 self.factory.poem_finished(self.task_num, poem)

当transport的连接关闭时,conncetionLost回调会被激活。reason参数是一个twisted.python.failure.Failure的实例对象,其携带的信息能够说明连接是被安全的关闭还是由于出错被关闭的。我们的客户端因认为总是能完整地下载完诗歌而忽略了这一参数。
工厂会在所有的诗歌都下载完毕后关闭reactor。再次重申:我们代码的工作就是用来下载诗歌-这意味我们的PoetryClientFactory缺少复用性。我们将在下一部分修正这一缺陷。值得注意的是,poem_finish回调函数是如何通过跟踪剩余诗歌数的:

...
self.poetry_count -= 1
 
if self.poetry_count == 0:
...

如果我们采用多线程以让每个线程分别下载诗歌,这样我们就必须使用一把锁来管理这段代码以免多个线程在同一时间调用poem_finish。但是在交互式体系下就不必担心了。由于reactor只能一次启用一个回调。
新的客户端实现在处理错误上也比先前的优雅的多,下面是PoetryClientFactory处理错误连接的回调实现代码:

def clientConnectionFailed(self, connector, reason):
  print 'Failed to connect to:', connector.getDestination()
  self.poem_finished()

   

注意,回调是在工厂内部而不是协议内部实现。由于协议是在连接建立后才创建的,而工厂能够在连接未能成功建立时捕获消息。

更多使用Python的Twisted框架建立非阻塞下載程式的實例教程相关文章请关注PHP中文网!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn