Heim > Artikel > Backend-Entwicklung > Es wird das Nachrichtenwarteschlangenpaket SnakeMQ von Python verwendet
Die Verwendung von Nachrichtenwarteschlangen bietet viele Vorteile bei der Datenkommunikation. Nun, eine vorläufige Studie zur Verwendung des Nachrichtenwarteschlangenpakets SnakeMQ von Python:
1. Offizielle Einführung in SnakeMQ
SnakeMQs GitHub-Projektseite: https://github.com/dsiroky/snakemq1. Reine Python-Implementierung, plattformübergreifend
2 Verbindung neu starten
Zuverlässiges Senden – konfigurierbarer Nachrichtenmodus und Nachrichten-Timeout-Modus
4. Unterstützt asynchrone Abfrage ()
6.symmetrisch – eine einzelne TCP-Verbindung kann für Duplex-Kommunikation verwendet werden
7. Unterstützung mehrerer Datenbanken – SQLite, MongoDB...
8.Brokerless – ähnlich Implementierungsprinzip von ZeroMQ
9. Erweiterungsmodule: RPC, Bandbreitendrosselung
Die oben genannten sind alles offizielle Worte und müssen von Ihnen selbst überprüft werden, und es fühlt sich süß an.
1. Unterstützt die automatische Wiederverbindung, Sie müssen keine Heartbeat-Logik schreiben Konzentrieren Sie sich einfach auf das Senden und Empfangen
2. Unterstützt die Datenpersistenz. Wenn die Persistenz gestartet ist, werden die Daten nach der erneuten Verbindung automatisch gesendet.
3. Snakemq implementiert den Datenempfang durch die Bereitstellung von Rückrufen. Sie müssen lediglich eine Empfangsmethode schreiben und diese zur Rückrufliste hinzufügen.
4. Die hier gesendeten Daten sind vom Typ Bytes (binär) und müssen daher konvertiert werden. Was ich im Programm teste, sind alle Textzeichenfolgen, die ich mit str.encode('utf-8') in Bytes konvertiere und beim Empfang wieder zurückkonvertiere.
5. Terminologieerklärung, Connector: TcpClient ähnlich wie Socket. Jeder Connector oder Listener verfügt über eine Identität. Beim Senden und Empfangen von Daten wissen Sie, um wessen Daten es sich handelt.
6. Wenn Sie SQLite-Persistenz verwenden, müssen Sie den Quellcode sqlite3.connect(filename, check_same_thread = False) ändern, um das Problem des Multithread-Zugriffs auf SQLite zu lösen. (Wird es zu einem Deadlock kommen?)
7. Wenn die Verbindung wieder hergestellt wird, wird sie automatisch gesendet, um die Zuverlässigkeit sicherzustellen.
8. Zum Zweck der Kapselung sende ich die Daten nach Erhalt per Rückruf aus.
Erklärung, dass das benutzerdefinierte Protokollmodul im Code verwendet wird
from common import nxlogger import snakemqlogger as logger
kann durch Protokollierung ersetzt werden.
Callback-Klasse (callbacks.py):
# -*- coding:utf-8 -*- '''synchronized callback''' class Callback(object): def __init__(self): self.callbacks = [] def add(self, func): self.callbacks.append(func) def remove(self, func): self.callbacks.remove(func) def __call__(self, *args, **kwargs): for callback in self.callbacks: callback(*args, **kwargs)
Connector-Klasse (snakemqConnector.py):
# -*- coding:utf-8 -*- import threading import snakemq import snakemq.link import snakemq.packeter import snakemq.messaging import snakemq.message from snakemq.storage.sqlite import SqliteQueuesStorage from snakemq.message import FLAG_PERSISTENT from common.callbacks import Callback from common import nxlogger import snakemqlogger as logger class SnakemqConnector(threading.Thread): def __init__(self, snakemqident = None, remoteIp = "localhost", remotePort = 9090, persistent = False): super(SnakemqConnector,self).__init__() self.messaging = None self.link = None self.snakemqident = snakemqident self.pktr = None self.remoteIp = remoteIp self.remotePort = remotePort self.persistent = persistent self.on_recv = Callback() self._initConnector() def run(self): logger.info("connector start...") if self.link != None: self.link.loop() logger.info("connector end...") def terminate(self): logger.info("connetor terminating...") if self.link != None: self.link.stop() self.link.cleanup() logger.info("connetor terminated") def on_recv_message(self, conn, ident, message): try: self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data except Exception as e: logger.error("connector recv:{0}".format(e)) print(e) '''send message to dest host named destIdent''' def sendMsg(self, destIdent, byteseq): msg = None if self.persistent: msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT) else: msg = snakemq.message.Message(byteseq, ttl=60) if self.messaging == None: logger.error("connector:messaging is not initialized, send message failed") return self.messaging.send_message(destIdent, msg) ''' ''' def _initConnector(self): try: self.link = snakemq.link.Link() self.link.add_connector((self.remoteIp, self.remotePort)) self.pktr = snakemq.packeter.Packeter(self.link) if self.persistent: storage = SqliteQueuesStorage("SnakemqStorage.db") self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage) else: self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr) self.messaging.on_message_recv.add(self.on_recv_message) except Exception as e: logger.error("connector:{0}".format(e)) finally: logger.info("connector[{0}] loop ended...".format(self.snakemqident))
Listener-Klasse (snakemqListener.py):
# -*- coding:utf-8 -*- import threading import snakemq import snakemq.link import snakemq.packeter import snakemq.messaging import snakemq.message from common import nxlogger import snakemqlogger as logger from common.callbacks import Callback class SnakemqListener(threading.Thread): def __init__(self, snakemqident = None, ip = "localhost", port = 9090, persistent = False): super(SnakemqListener,self).__init__() self.messaging = None self.link = None self.pktr = None self.snakemqident = snakemqident self.ip = ip; self.port = port self.connectors = {} self.on_recv = Callback() self.persistent = persistent self._initlistener() ''' thread run ''' def run(self): logger.info("listener start...") if self.link != None: self.link.loop() logger.info("listener end...") ''' terminate snakemq listener thread ''' def terminate(self): logger.info("listener terminating...") if self.link != None: self.link.stop() self.link.cleanup() logger.info("listener terminated") ''' receive message from host named ident ''' def on_recv_message(self, conn, ident, message): try: self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data self.sendMsg('bob','hello,{0}'.format(ident).encode('utf-8')) except Exception as e: logger.error("listener recv:{0}".format(e)) print(e) def on_drop_message(self, ident, message): print("message dropped", ident, message) logger.debug("listener:message dropped,ident:{0},message:{1}".format(ident, message)) '''client connect''' def on_connect(self, ident): logger.debug("listener:{0} connected".format(ident)) self.connectors[ident] = ident self.sendMsg(ident, "hello".encode('utf-8')) '''client disconnect''' def on_disconnect(self, ident): logger.debug("listener:{0} disconnected".format(ident)) if ident in self.connectors: self.connectors.pop(ident) ''' listen start loop ''' def _initlistener(self): try: self.link = snakemq.link.Link() self.link.add_listener((self.ip, self.port)) self.pktr = snakemq.packeter.Packeter(self.link) self.pktr.on_connect.add(self.on_connect) self.pktr.on_disconnect.add(self.on_disconnect) if self.persistent: storage = SqliteQueuesStorage("SnakemqStorage.db") self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage) else: self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr) self.messaging.on_message_recv.add(self.on_recv_message) self.messaging.on_message_drop.add(self.on_drop_message) except Exception as e: logger.error("listener:{0}".format(e)) finally: logger.info("listener:loop ended...") '''send message to dest host named destIdent''' def sendMsg(self, destIdent, byteseq): msg = None if self.persistent: msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT) else: msg = snakemq.message.Message(byteseq, ttl=60) if self.messaging == None: logger.error("listener:messaging is not initialized, send message failed") return self.messaging.send_message(destIdent, msg)
Testcode-Connector (testSnakeConnector.py ):
Lesen Sie eine lokale 1M-Datei, senden Sie sie dann an den Listener, und dann sendet der Listener eine Hallo-Nachricht zurück.
from netComm.snakemq import snakemqConnector import time import sys import os def received(ident, data): print(data) if __name__ == "__main__": bob = snakemqConnector.SnakemqConnector('bob',"10.16.5.45",4002,True) bob.on_recv.add(received) bob.start() try: with open("testfile.txt",encoding='utf-8') as f: txt = f.read() for i in range(100): bob.sendMsg("niess",txt.encode('utf-8')) time.sleep(0.1) except Exception as e: print(e) time.sleep(5) bob.terminate() 测试代码listener(testSnakeListener.py): from netComm.snakemq import snakemqListener import time def received(ident, data): filename = "log/recFile{0}.txt".format(time.strftime('%S',time.localtime())) file = open(filename,'w') file.writelines(data) file.close() if __name__ == "__main__": niess = snakemqListener.SnakemqListener("niess","10.16.5.45",4002) niess.on_recv.add(received) niess.start() print("niess start...") time.sleep(60) niess.terminate() print("niess end...")
Weitere Artikel zur Verwendung des Python-Nachrichtenwarteschlangenpakets SnakeMQ finden Sie unter PHP Chinesische Website!