Maison >développement back-end >Tutoriel Python >Le package de file d'attente de messages de Python, SnakeMQ, est utilisé
L'utilisation de files d'attente de messages présente de nombreux avantages dans la communication de données. SnakeMQ est une bibliothèque MQ multiplateforme open source implémentée en Python. Eh bien, une étude préliminaire sur l'utilisation du package de file d'attente de messages de Python, SnakeMQ, c'est parti :
1. Introduction officielle à Snakemq
Page du projet GitHub de SnakeMQ : https://github.com/dsiroky/snakemq1 Implémentation python pure, multiplateforme
2. redémarrer la connexion
3. Envoi fiable - mode de message configurable et mode d'expiration des messages
4. Files d'attente persistantes/temporaires
Prise en charge asynchrone - sondage ()
6. symétrique -- une seule connexion TCP peut être utilisée pour la communication duplex 7. Prise en charge de plusieurs bases de données -- SQLite, MongoDB...8.brokerless - similaire. principe de mise en œuvre de ZeroMQ9. Modules d'extension : RPC, limitation de bande passanteCe qui précède sont tous des mots officiels et doivent être vérifiés par vous-même. Je l'ai emballé moi-même et c'est mignon.2. Description de plusieurs problèmes majeurs
1. Prise en charge de la reconnexion automatique, pas besoin d'écrire la logique du rythme cardiaque par vous-même, vous Concentrez-vous simplement sur l'envoi et la réception 2. Prend en charge la persistance des données Si la persistance est démarrée, les données seront envoyées automatiquement après la reconnexion. 3. Snakemq implémente la réception des données en fournissant des rappels. Il vous suffit d'écrire une méthode de réception et de l'ajouter à la liste de rappel. 4. Les données envoyées ici sont de type octets (binaires), elles doivent donc être converties. Ce que je teste dans le programme, ce sont toutes les chaînes de texte. J'utilise str.encode('utf-8') pour les convertir en octets, puis les reconvertir lors de la réception. 5. Explication de la terminologie, Connecteur : TcpClient similaire à socket, Listener : TcpServer similaire à socket. Chaque connecteur ou écouteur a une identification lors de l'envoi et de la réception de données, vous saurez de qui il s'agit. 6. Lorsque vous utilisez la persistance SQLite, vous devez modifier le code source, sqlite3.connect(filename, check_same_thread = False), pour résoudre le problème de l'accès multithread à SQLite. (Y aura-t-il une impasse ?) 7. Lors du démarrage de la persistance, si la connexion est reconnectée, elle sera envoyée automatiquement pour garantir la fiabilité. 8. Aux fins de l'encapsulation, une fois les données reçues, je les envoie par rappel.3. Code
Explication selon laquelle le module de journal personnalisé est utilisé dans le code
from common import nxlogger import snakemqlogger as loggerpeut être remplacé par la journalisation. Classe de rappel (callbacks.py) :
# -*- coding:utf-8 -*- '''synchronized callback''' class Callback(object): def __init__(self): self.callbacks = [] def add(self, func): self.callbacks.append(func) def remove(self, func): self.callbacks.remove(func) def __call__(self, *args, **kwargs): for callback in self.callbacks: callback(*args, **kwargs)Classe de connecteur (snakemqConnector.py) :
# -*- coding:utf-8 -*- import threading import snakemq import snakemq.link import snakemq.packeter import snakemq.messaging import snakemq.message from snakemq.storage.sqlite import SqliteQueuesStorage from snakemq.message import FLAG_PERSISTENT from common.callbacks import Callback from common import nxlogger import snakemqlogger as logger class SnakemqConnector(threading.Thread): def __init__(self, snakemqident = None, remoteIp = "localhost", remotePort = 9090, persistent = False): super(SnakemqConnector,self).__init__() self.messaging = None self.link = None self.snakemqident = snakemqident self.pktr = None self.remoteIp = remoteIp self.remotePort = remotePort self.persistent = persistent self.on_recv = Callback() self._initConnector() def run(self): logger.info("connector start...") if self.link != None: self.link.loop() logger.info("connector end...") def terminate(self): logger.info("connetor terminating...") if self.link != None: self.link.stop() self.link.cleanup() logger.info("connetor terminated") def on_recv_message(self, conn, ident, message): try: self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data except Exception as e: logger.error("connector recv:{0}".format(e)) print(e) '''send message to dest host named destIdent''' def sendMsg(self, destIdent, byteseq): msg = None if self.persistent: msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT) else: msg = snakemq.message.Message(byteseq, ttl=60) if self.messaging == None: logger.error("connector:messaging is not initialized, send message failed") return self.messaging.send_message(destIdent, msg) ''' ''' def _initConnector(self): try: self.link = snakemq.link.Link() self.link.add_connector((self.remoteIp, self.remotePort)) self.pktr = snakemq.packeter.Packeter(self.link) if self.persistent: storage = SqliteQueuesStorage("SnakemqStorage.db") self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage) else: self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr) self.messaging.on_message_recv.add(self.on_recv_message) except Exception as e: logger.error("connector:{0}".format(e)) finally: logger.info("connector[{0}] loop ended...".format(self.snakemqident))Classe d'écoute (snakemqListener.py) :
# -*- coding:utf-8 -*- import threading import snakemq import snakemq.link import snakemq.packeter import snakemq.messaging import snakemq.message from common import nxlogger import snakemqlogger as logger from common.callbacks import Callback class SnakemqListener(threading.Thread): def __init__(self, snakemqident = None, ip = "localhost", port = 9090, persistent = False): super(SnakemqListener,self).__init__() self.messaging = None self.link = None self.pktr = None self.snakemqident = snakemqident self.ip = ip; self.port = port self.connectors = {} self.on_recv = Callback() self.persistent = persistent self._initlistener() ''' thread run ''' def run(self): logger.info("listener start...") if self.link != None: self.link.loop() logger.info("listener end...") ''' terminate snakemq listener thread ''' def terminate(self): logger.info("listener terminating...") if self.link != None: self.link.stop() self.link.cleanup() logger.info("listener terminated") ''' receive message from host named ident ''' def on_recv_message(self, conn, ident, message): try: self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data self.sendMsg('bob','hello,{0}'.format(ident).encode('utf-8')) except Exception as e: logger.error("listener recv:{0}".format(e)) print(e) def on_drop_message(self, ident, message): print("message dropped", ident, message) logger.debug("listener:message dropped,ident:{0},message:{1}".format(ident, message)) '''client connect''' def on_connect(self, ident): logger.debug("listener:{0} connected".format(ident)) self.connectors[ident] = ident self.sendMsg(ident, "hello".encode('utf-8')) '''client disconnect''' def on_disconnect(self, ident): logger.debug("listener:{0} disconnected".format(ident)) if ident in self.connectors: self.connectors.pop(ident) ''' listen start loop ''' def _initlistener(self): try: self.link = snakemq.link.Link() self.link.add_listener((self.ip, self.port)) self.pktr = snakemq.packeter.Packeter(self.link) self.pktr.on_connect.add(self.on_connect) self.pktr.on_disconnect.add(self.on_disconnect) if self.persistent: storage = SqliteQueuesStorage("SnakemqStorage.db") self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage) else: self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr) self.messaging.on_message_recv.add(self.on_recv_message) self.messaging.on_message_drop.add(self.on_drop_message) except Exception as e: logger.error("listener:{0}".format(e)) finally: logger.info("listener:loop ended...") '''send message to dest host named destIdent''' def sendMsg(self, destIdent, byteseq): msg = None if self.persistent: msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT) else: msg = snakemq.message.Message(byteseq, ttl=60) if self.messaging == None: logger.error("listener:messaging is not initialized, send message failed") return self.messaging.send_message(destIdent, msg)Connecteur de code de test (testSnakeConnector.py ): Lisez un fichier local de 1 Mo, puis envoyez-le à l'auditeur, puis l'auditeur renvoie un message de bonjour.
from netComm.snakemq import snakemqConnector import time import sys import os def received(ident, data): print(data) if __name__ == "__main__": bob = snakemqConnector.SnakemqConnector('bob',"10.16.5.45",4002,True) bob.on_recv.add(received) bob.start() try: with open("testfile.txt",encoding='utf-8') as f: txt = f.read() for i in range(100): bob.sendMsg("niess",txt.encode('utf-8')) time.sleep(0.1) except Exception as e: print(e) time.sleep(5) bob.terminate() 测试代码listener(testSnakeListener.py): from netComm.snakemq import snakemqListener import time def received(ident, data): filename = "log/recFile{0}.txt".format(time.strftime('%S',time.localtime())) file = open(filename,'w') file.writelines(data) file.close() if __name__ == "__main__": niess = snakemqListener.SnakemqListener("niess","10.16.5.45",4002) niess.on_recv.add(received) niess.start() print("niess start...") time.sleep(60) niess.terminate() print("niess end...")