Maison > Article > développement back-end > Analyser les principales fonctionnalités du framework Twisted de Python
1. réacteur
Le cœur du réacteur torsadé, et le réacteur est inévitablement synchrone/asynchrone, bloquant/non bloquant. Dans l'introduction conceptuelle de Dave dans le premier chapitre, il y a un peu de frontière entre synchrone/asynchrone. Fuzzy, concernant la synchronisation/asynchrone, le blocage/non-blocage, veuillez vous référer à la discussion sur Zhihu. Quant au proactor (activateur) et au réacteur (réacteur), voici un blog recommandé avec une introduction plus détaillée.
En ce qui concerne les IO réseau en mode réacteur, il doit s'agir d'IO synchrones plutôt que d'IO asynchrones. Le cœur de l'asynchronisme mentionné dans le premier chapitre de Dave est le suivant : abandonner explicitement le contrôle de la tâche au lieu d'être arrêté de manière aléatoire par le système d'exploitation. Le programmeur doit organiser la tâche en une séquence à réaliser en alternant de petites étapes. Par conséquent, si l’une des tâches utilise le résultat d’une autre tâche, la tâche dépendante (c’est-à-dire la tâche qui reçoit le résultat) doit être conçue pour recevoir une série de bits ou de fragments plutôt que tous en même temps.
Abandonner explicitement et de manière proactive le contrôle d'une tâche est quelque peu similaire à la façon de penser les coroutines. Reactor peut être considéré comme le planificateur de coroutines. Le réacteur est une boucle d'événements. Nous pouvons enregistrer les événements qui nous intéressent (comme le socket étant lisible/inscriptible) et les processeurs (comme effectuer des opérations de lecture et d'écriture) avec le réacteur. Le réacteur rappellera notre processeur lorsque l'événement se produit. se produit. Une fois l'exécution du processeur terminée, cela équivaut à la suspension de la coroutine (rendement), retournant à la boucle d'événements du réacteur, attendant que l'événement suivant vienne et rappelant. Le réacteur lui-même dispose d'un démultiplexeur d'événements synchrones, qui peut être implémenté par select/epoll et d'autres mécanismes. Bien entendu, le déclenchement d'événements du réacteur torsadé n'est pas nécessairement basé sur les IO, mais peut également être déclenché par d'autres mécanismes tels que des minuteries.
Le réacteur torsadé ne nous oblige pas à enregistrer activement les événements et les fonctions de rappel, mais est implémenté par polymorphisme (hériter d'une classe spécifique, implémenter l'interface d'événement concernée, puis la transmettre au réacteur torsadé). Concernant le réacteur twisté, il y a plusieurs choses à noter :
twisted.internet.reactor est un mode singleton, et chaque programme ne peut avoir qu'un seul réacteur
Essayez de terminer l'opération le plus tôt possible dans la fonction de rappel du réacteur ; et ne bloquez pas Les tâches et les réacteurs sont essentiellement à thread unique. Le code de rappel utilisateur et le code tordu exécuté dans le même contexte entraîneront le blocage de toute la boucle d'événements du réacteur
; à moins que réacteur.stop() ne soit utilisé explicitement, mais en général, appeler réacteur.stop() signifie que l'application se termine
2. Twisted est simple à utiliser
L'essence de twisted est le réacteur ; nous pouvons utiliser l'API sous-jacente de twisted (pour éviter d'activer l'abstraction de haut niveau pratique de twisted) pour utiliser le réacteur :
# 示例一 twisted底层API的使用 from twisted.internet import reacto from twisted.internet import main from twisted.internet.interfaces import IReadDescriptor import socket class MySocket(IReadDescriptor): def __init__(self, address): # 连接服务器 self.address = address self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect(address) self.sock.setblocking(0) # tell the Twisted reactor to monitor this socket for reading reactor.addReader(self) # 接口: 告诉reactor 监听的套接字描述符 def fileno(self): try: return self.sock.fileno() except socket.error: return -1 # 接口: 在连接断开时的回调 def connectionLost(self, reason): self.sock.close() reactor.removeReader(self) # 当应用程序需要终止时 调用: # reactor.stop() # 接口: 当套接字描述符有数据可读时 def doRead(self): bytes = '' # 尽可能多的读取数据 while True: try: bytesread = self.sock.recv(1024) if not bytesread: break else: bytes += bytesread except socket.error, e: if e.args[0] == errno.EWOULDBLOCK: break return main.CONNECTION_LOST if not bytes: return main.CONNECTION_DONE else: # 在这里解析协议并处理数据 print bytes
L'exemple 1 permet de voir clairement l'essence du réacteur tordu : ajoutez un descripteur d'écoute, écoutez pour les événements lisibles/inscriptibles, et lorsque l'événement arrive, fonction de rappel, continuez à écouter les événements une fois le rappel terminé.
Remarque :
Le socket est non bloquant. S'il est bloquant, la signification de réacteur est perdue
Nous fournissons l'interface requise par réacteur en héritant de IReadDescriptor
Ajoutez le socket à réacteur.addReader Le la classe est ajoutée à l'objet d'écoute du réacteur
main.CONNECTION_LOST est une valeur prédéfinie de twisted Grâce à ces valeurs, nous pouvons contrôler le prochain rappel dans une certaine mesure (semblable à la simulation d'un événement)
Mais la classe MySocket ci-dessus. n'est pas suffisant. Les principaux inconvénients sont :
Nous devons lire les données nous-mêmes, plutôt que le framework les lit pour nous et gère les exceptions
Les E/S réseau et le traitement des données sont mélangés et non séparés
Trois. abstraction torsadée
twisted a établi une abstraction supérieure basée sur le réacteur Pour une connexion réseau, twisted a établi les trois concepts suivants :
Transports : couche de connexion réseau, uniquement responsable de la connexion réseau et de la lecture/écriture. données d'octets
Protocoles : couche de protocole, service des protocoles réseau liés à l'entreprise, convertit le flux d'octets en données requises par l'application
Usines de protocoles : usine de protocoles, responsable de la création des protocoles, chaque connexion réseau a un objet Protocoles (car il est nécessaire de sauvegarder l'état d'analyse du protocole)
Ces concepts de twisted sont très similaires au framework de réseau ranch d'Erlang. Le framework ranch résume également les concepts de transports et de protocoles. Lorsqu'il y a une nouvelle connexion réseau, ranch automatiquement. crée des transports et des protocoles, où les protocoles sont transmis par l'utilisateur lors du démarrage du ranch, est un module qui implémente le comportement ranch_protocol Lorsque les protocoles sont initialisés, les transports correspondant à la connexion seront reçus, afin que nous puissions traiter les données du flux d'octets. dans Protocoles, selon notre Le protocole analyse et traite les données. Dans le même temps, les données peuvent être envoyées via les transports (le ranch a déjà lu les données du flux d'octets pour vous).
Semblable à Ranch, Twisted créera également des protocoles et passera dans le transport lorsqu'une nouvelle connexion arrive. Twisted nous aidera à lire les données du flux d'octets. Il nous suffit de traiter le flux d'octets dans l'interface dataReceived (self, data). .Juste des données. À l'heure actuelle, twisted peut être considéré comme véritablement asynchrone dans les E/S du réseau. Il nous aide à gérer les E/S du réseau et les exceptions possibles, et sépare les E/S du réseau et le traitement des données, en les abstraits dans les transports et les protocoles, ce qui améliore l'efficacité du programme. et robustesse.
# 示例二 twisted抽象的使用 from twisted.internet import reactor from twisted.internet.protocol import Protocol, ClientFactory class MyProtocol(Protocol): # 接口: Protocols初始化时调用,并传入Transports # 另外 twisted会自动将Protocols的factory对象成员设为ProtocolsFactory实例的引用 # 如此就可以通过factory来与MyProtocolFactory交互 def makeConnection(self,trans): print 'make connection: get transport: ', trans print 'my factory is: ', self.factory # 接口: 有数据到达 def dataReceived(self, data): self.poem += data msg = 'Task %d: got %d bytes of poetry from %s' print msg % (self.task_num, len(data), self.transport.getPeer()) # 接口: 连接断开 def connectionLost(self, reason): # 连接断开的处理 class MyProtocolFactory(ClientFactory): # 接口: 通过protocol类成员指出需要创建的Protocols protocol = PoetryProtocol # tell base class what proto to build def __init__(self, address): self.poetry_count = poetry_count self.poems = {} # task num -> poem # 接口: 在创建Protocols的回调 def buildProtocol(self, address): proto = ClientFactory.buildProtocol(self, address) # 在这里对proto做一些初始化.... return proto # 接口: 连接Server失败时的回调 def clientConnectionFailed(self, connector, reason): print 'Failed to connect to:', connector.getDestination() def main(address): factory = MyClientFactory(address) host, port = address # 连接服务端时传入ProtocolsFactory reactor.connectTCP(host, port, factory) reactor.run()
L'exemple 2 est beaucoup plus simple et plus clair que l'exemple 1, car il n'a pas besoin de gérer les E/S réseau et la logique est plus claire. En fait, ClientFactory et Protocol fournissent plus d'interfaces pour une implémentation plus flexible. . Contrôle logique puissant, veuillez vous référer au code source torsadé pour les interfaces spécifiques.
四. twisted Deferred
twisted Deferred对象用于解决这样的问题:有时候我们需要在ProtocolsFactory中嵌入自己的回调,以便Protocols中发生某个事件(如所有Protocols都处理完成)时,回调我们指定的函数(如TaskFinished)。如果我们自己来实现回调,需要处理几个问题:
如何区分回调的正确返回和错误返回?(我们在使用异步调用时,要尤其注意错误返回的重要性)
如果我们的正确返回和错误返回都需要执行一个公共函数(如关闭连接)呢?
如果保证该回调只被调用一次?
Deferred对象便用于解决这种问题,它提供两个回调链,分别对应于正确返回和错误返回,在正确返回或错误返回时,它会依次调用对应链中的函数,并且保证回调的唯一性。
d = Deferred() # 添加正确回调和错误回调 d.addCallbacks(your_ok_callback, your_err_callback) # 添加公共回调函数 d.addBoth(your_common_callback) # 正确返回 将依次调用 your_ok_callback(Res) -> common_callback(Res) d.callback(Res) # 错误返回 将依次调用 your_err_callback(Err) -> common_callback(Err) d.errback(Err) # 注意,对同一个Defered对象,只能返回一次,尝试多次返回将会报错
twisted的defer是异步的一种变现方式,可以这么理解,他和thread的区别是,他是基于时间event的。
有了deferred,即可对任务的执行进行管理控制。防止程序的运行,由于等待某项任务的完成而陷入阻塞停滞,提高整体运行的效率。
Deferred能帮助你编写异步代码,但并不是为自动生成异步或无阻塞的代码!要想将一个同步函数编程异步函数,必须在函数中返回Deferred并正确注册回调。
五.综合示例
下面的例子,你们自己跑跑,我上面说的都是一些个零散的例子,大家对照下面完整的,走一遍。 twisted理解其实却是有点麻烦,大家只要知道他是基于事件的后,慢慢理解就行了。
#coding:utf-8 #xiaorui.cc from twisted.internet import reactor, defer from twisted.internet.threads import deferToThread import os,sys from twisted.python import threadable; threadable.init(1) deferred =deferToThread.__get__ import time def todoprint_(result): print result def running(): "Prints a few dots on stdout while the reactor is running." # sys.stdout.write("."); sys.stdout.flush() print '.' reactor.callLater(.1, running) @deferred def sleep(sec): "A blocking function magically converted in a non-blocking one." print 'start sleep %s'%sec time.sleep(sec) print '\nend sleep %s'%sec return "ok" def test(n,m): print "fun test() is start" m=m vals = [] keys = [] for i in xrange(m): vals.append(i) keys.append('a%s'%i) d = None for i in xrange(n): d = dict(zip(keys, vals)) print "fun test() is end" return d if __name__== "__main__": #one sleep(10).addBoth(todoprint_) reactor.callLater(.1, running) reactor.callLater(3, reactor.stop) print "go go !!!" reactor.run() #two aa=time.time() de = defer.Deferred() de.addCallback(test) reactor.callInThread(de.callback,10000000,100 ) print time.time()-aa print "我这里先做别的事情" print de print "go go end"
更多剖析Python的Twisted框架的核心特性相关文章请关注PHP中文网!