Home >Backend Development >Python Tutorial >Python's message queue package SnakeMQ is used

Python's message queue package SnakeMQ is used

高洛峰
高洛峰Original
2017-03-01 14:06:541478browse

Using message queues has many advantages in data communication. SnakeMQ is an open source cross-platform MQ library implemented in Python. Well, a preliminary study on the use of Python's message queue package SnakeMQ, here we go:

1. Official introduction about snakemq
SnakeMQ’s GitHub project page: https://github.com/dsiroky/snakemq1. Pure python implementation, cross-platform

2. Automatic restart Connection

3. Reliable sending--configurable message mode and message timeout mode

4. Persistent/temporary two queues

5. Support asynchronous--poll ()

6.symmetrical -- A single TCP connection can be used for duplex communication

7.Multiple database support--SQLite, MongoDB...

8.brokerless - similar The implementation principle of ZeroMQ

9. Extension modules: RPC, bandwidth throttling

The above are all official words. You need to verify them by yourself. I encapsulated them and it feels cute.


2. Description of several major issues

1. Supports automatic reconnection. You don’t need to write the heartbeat logic yourself. Just focus on sending and receiving

2. Support data persistence. If persistence is started, data will be sent automatically after reconnection.

3. Snakemq implements data reception by providing callbacks. You only need to write a receiving method and add it to the callback list.

4. When sending data, all data sent here are bytes type (binary), so conversion is required. What I test in the program are all text strings. I use str.encode('utf-8') to convert them into bytes, and then convert them back when receiving.

5. Terminology explanation, Connector: TcpClient similar to socket, Listener: TcpServer similar to socket. Each connector or listener has an ident identification. When sending and receiving data, you will know whose data it is. .

6. When using sqlite persistence, you need to modify the source code, sqlite3.connect(filename, check_same_thread = False), to solve the problem of multi-threaded access to sqlite. (Will there be a deadlock?)

7. When starting persistence, if the connection is reconnected, it will be sent automatically to ensure reliability.

8. For the purpose of encapsulation, after receiving the data, I send it out through callback.


3. Code

Explanation The custom log module is used in the code

from common import nxlogger

import snakemqlogger as logger

can be replaced by logging.

Callback class (callbacks.py):

# -*- coding:utf-8 -*-

'''synchronized callback'''

class Callback(object):

  def __init__(self):

    self.callbacks = []

 

  def add(self, func):

    self.callbacks.append(func)

 

  def remove(self, func):

    self.callbacks.remove(func)

 

  def __call__(self, *args, **kwargs):

    for callback in self.callbacks:

      callback(*args, **kwargs)

Connector class (snakemqConnector.py):

# -*- coding:utf-8 -*-

import threading

import snakemq

import snakemq.link

import snakemq.packeter

import snakemq.messaging

import snakemq.message

from snakemq.storage.sqlite import SqliteQueuesStorage

from snakemq.message import FLAG_PERSISTENT

from common.callbacks import Callback

 

from common import nxlogger

import snakemqlogger as logger

 

class SnakemqConnector(threading.Thread):

     def __init__(self, snakemqident = None, remoteIp = "localhost", remotePort = 9090, persistent = False):

         super(SnakemqConnector,self).__init__()

         self.messaging = None

         self.link = None

         self.snakemqident = snakemqident

         self.pktr = None

         self.remoteIp = remoteIp

         self.remotePort = remotePort

         self.persistent = persistent

         self.on_recv = Callback()

         self._initConnector()

 

     def run(self):

         logger.info("connector start...")

         

         if self.link != None:

              self.link.loop()

 

         logger.info("connector end...")

    

     def terminate(self):

         logger.info("connetor terminating...")

         if self.link != None:

              self.link.stop()

              self.link.cleanup()

         logger.info("connetor terminated")

 

     def on_recv_message(self, conn, ident, message):

         try:

              self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data

         except Exception as e:

              logger.error("connector recv:{0}".format(e))

              print(e)

 

     '''send message to dest host named destIdent'''

     def sendMsg(self, destIdent, byteseq):

         msg = None

         if self.persistent:

              msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT)

         else:

              msg = snakemq.message.Message(byteseq, ttl=60)

         if self.messaging == None:

              logger.error("connector:messaging is not initialized, send message failed")

              return

         self.messaging.send_message(destIdent, msg)

 

     '''

    

     '''

     def _initConnector(self):

         try:

              self.link = snakemq.link.Link()

              self.link.add_connector((self.remoteIp, self.remotePort))

 

              self.pktr = snakemq.packeter.Packeter(self.link)

 

              if self.persistent:

                  storage = SqliteQueuesStorage("SnakemqStorage.db")

                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage)

              else:

                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr)

             

              self.messaging.on_message_recv.add(self.on_recv_message)

             

         except Exception as e:

              logger.error("connector:{0}".format(e))

         finally:

              logger.info("connector[{0}] loop ended...".format(self.snakemqident))

Listener class (snakemqListener.py):

# -*- coding:utf-8 -*-

import threading

import snakemq

import snakemq.link

import snakemq.packeter

import snakemq.messaging

import snakemq.message

from common import nxlogger

import snakemqlogger as logger

from common.callbacks import Callback

class SnakemqListener(threading.Thread):

     def __init__(self, snakemqident = None, ip = "localhost", port = 9090, persistent = False):

         super(SnakemqListener,self).__init__()

         self.messaging = None

         self.link = None

         self.pktr = None

         self.snakemqident = snakemqident

         self.ip = ip;

         self.port = port

         self.connectors = {}

         self.on_recv = Callback()

         self.persistent = persistent

         self._initlistener()

 

     '''

     thread run

     '''

     def run(self):

         logger.info("listener start...")

         

         if self.link != None:

              self.link.loop()

 

         logger.info("listener end...")

 

     '''

     terminate snakemq listener thread

     '''

     def terminate(self):

         logger.info("listener terminating...")

         if self.link != None:

              self.link.stop()

              self.link.cleanup()

         logger.info("listener terminated")

 

     '''

     receive message from host named ident

     '''

     def on_recv_message(self, conn, ident, message):

         try:

              self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data

              self.sendMsg('bob','hello,{0}'.format(ident).encode('utf-8'))

         except Exception as e:

              logger.error("listener recv:{0}".format(e))

              print(e)

 

     def on_drop_message(self, ident, message):

         print("message dropped", ident, message)

         logger.debug("listener:message dropped,ident:{0},message:{1}".format(ident, message))

 

     '''client connect'''

     def on_connect(self, ident):

         logger.debug("listener:{0} connected".format(ident))

         self.connectors[ident] = ident

         self.sendMsg(ident, "hello".encode('utf-8'))

 

     '''client disconnect'''

     def on_disconnect(self, ident):

         logger.debug("listener:{0} disconnected".format(ident))

         if ident in self.connectors:

              self.connectors.pop(ident)

 

     '''

     listen start loop

     '''

     def _initlistener(self):

         try:

              self.link = snakemq.link.Link()

              self.link.add_listener((self.ip, self.port))

 

              self.pktr = snakemq.packeter.Packeter(self.link)

              self.pktr.on_connect.add(self.on_connect)

              self.pktr.on_disconnect.add(self.on_disconnect)

 

              if self.persistent:

                  storage = SqliteQueuesStorage("SnakemqStorage.db")

                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage)

              else:

                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr)

             

              self.messaging.on_message_recv.add(self.on_recv_message)

              self.messaging.on_message_drop.add(self.on_drop_message)

 

         except Exception as e:

              logger.error("listener:{0}".format(e))

         finally:

              logger.info("listener:loop ended...")

     '''send message to dest host named destIdent'''

     def sendMsg(self, destIdent, byteseq):

         msg = None

         if self.persistent:

              msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT)

         else:

              msg = snakemq.message.Message(byteseq, ttl=60)

         if self.messaging == None:

              logger.error("listener:messaging is not initialized, send message failed")

              return

         self.messaging.send_message(destIdent, msg)

Test code connector (testSnakeConnector.py):

Read a local 1M file, then send it to the listener, and then the listener sends back a hello message.

from netComm.snakemq import snakemqConnector

import time

import sys

import os

def received(ident, data):

     print(data)

 

if __name__ == "__main__":

     bob = snakemqConnector.SnakemqConnector('bob',"10.16.5.45",4002,True)

     bob.on_recv.add(received)

     bob.start()

     try:

         with open("testfile.txt",encoding='utf-8') as f:

              txt = f.read()

              for i in range(100):

                  bob.sendMsg("niess",txt.encode('utf-8'))

                  time.sleep(0.1)

     except Exception as e:

         print(e)

     time.sleep(5)

     bob.terminate()   

 

测试代码listener(testSnakeListener.py):

from netComm.snakemq import snakemqListener

import time

 

def received(ident, data):

     filename = "log/recFile{0}.txt".format(time.strftime('%S',time.localtime()))

     file = open(filename,'w')

     file.writelines(data)

     file.close()

 

if __name__ == "__main__":

     niess = snakemqListener.SnakemqListener("niess","10.16.5.45",4002)

     niess.on_recv.add(received)

     niess.start()

     print("niess start...")

     time.sleep(60)

     niess.terminate()  

     print("niess end...")


For more articles related to the use of Python’s message queue package SnakeMQ, please pay attention to the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn