>백엔드 개발 >파이썬 튜토리얼 >Python의 메시지 대기열 패키지 SnakeMQ는 다음을 사용합니다.

Python의 메시지 대기열 패키지 SnakeMQ는 다음을 사용합니다.

高洛峰
高洛峰원래의
2017-03-01 14:06:541516검색

메시지 대기열을 사용하면 데이터 통신에 많은 이점이 있습니다. SnakeMQ는 Python으로 구현된 오픈 소스 크로스 플랫폼 MQ 라이브러리입니다. Python의 메시지 대기열 패키지 SnakeMQ 사용에 대한 예비 연구는 다음과 같습니다.

1. snakemq에 대한 공식 소개
SnakeMQ의 GitHub 프로젝트 페이지: https://github.com/dsiroky/snakemq1. 순수 Python 구현, 크로스 플랫폼

2. 연결 다시 시작

3. 안정적인 전송-구성 가능한 메시지 모드 및 메시지 시간 초과 모드

4. 영구/임시 대기열

5. 비동기식 지원--폴링()

6.대칭 -- 단일 TCP 연결을 이중 통신에 사용할 수 있습니다.

7. 다중 데이터베이스 지원 -- SQLite, MongoDB...

8.brokerless - 유사 ZeroMQ 구현 원리

9. 확장 모듈: RPC, 대역폭 조절

위 내용은 모두 공식적인 단어이며 직접 확인해야 합니다.


2. 여러 주요 문제 설명

1. 자동 재접속 지원, 하트비트 로직을 직접 작성할 필요가 없습니다. 보내고 받는 데만 집중하세요

2. 데이터 지속성을 지원합니다. 지속성이 시작되면 재접속 후 자동으로 데이터가 전송됩니다.

3. Snakemq는 콜백을 제공하여 데이터 수신을 구현합니다. 수신 메소드를 작성하고 콜백 목록에 추가하기만 하면 됩니다.

4. 여기에 전송되는 데이터는 바이트형(바이너리)이므로 변환이 필요합니다. 프로그램에서 테스트하는 것은 모두 텍스트 문자열입니다. str.encode('utf-8')를 사용하여 이를 바이트로 변환한 다음 수신 시 다시 변환합니다.

5. 용어 설명, 커넥터: 소켓과 유사한 TcpClient, 소켓과 유사한 TcpServer. 각 커넥터 또는 리스너는 데이터를 보내고 받을 때 누구의 데이터인지 알 수 있습니다.

6. sqlite 지속성을 사용하는 경우 sqlite에 대한 멀티 스레드 액세스 문제를 해결하려면 소스 코드 sqlite3.connect(filename, check_same_thread = False)를 수정해야 합니다. (교착상태가 발생할까요?)

7. 지속성 시작 시 연결이 다시 연결되면 안정성을 보장하기 위해 자동으로 전송됩니다.

8. Encapsulation을 위해 데이터를 받은 후 콜백을 통해 보낸다.


3. 코드

코드에 커스텀 로그 모듈을 사용했다는 설명

from common import nxlogger

import snakemqlogger as logger

는 로깅으로 대체 가능합니다.

콜백 클래스(callbacks.py):

# -*- coding:utf-8 -*-

'''synchronized callback'''

class Callback(object):

  def __init__(self):

    self.callbacks = []

 

  def add(self, func):

    self.callbacks.append(func)

 

  def remove(self, func):

    self.callbacks.remove(func)

 

  def __call__(self, *args, **kwargs):

    for callback in self.callbacks:

      callback(*args, **kwargs)

커넥터 클래스(snakemqConnector.py):

# -*- coding:utf-8 -*-

import threading

import snakemq

import snakemq.link

import snakemq.packeter

import snakemq.messaging

import snakemq.message

from snakemq.storage.sqlite import SqliteQueuesStorage

from snakemq.message import FLAG_PERSISTENT

from common.callbacks import Callback

 

from common import nxlogger

import snakemqlogger as logger

 

class SnakemqConnector(threading.Thread):

     def __init__(self, snakemqident = None, remoteIp = "localhost", remotePort = 9090, persistent = False):

         super(SnakemqConnector,self).__init__()

         self.messaging = None

         self.link = None

         self.snakemqident = snakemqident

         self.pktr = None

         self.remoteIp = remoteIp

         self.remotePort = remotePort

         self.persistent = persistent

         self.on_recv = Callback()

         self._initConnector()

 

     def run(self):

         logger.info("connector start...")

         

         if self.link != None:

              self.link.loop()

 

         logger.info("connector end...")

    

     def terminate(self):

         logger.info("connetor terminating...")

         if self.link != None:

              self.link.stop()

              self.link.cleanup()

         logger.info("connetor terminated")

 

     def on_recv_message(self, conn, ident, message):

         try:

              self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data

         except Exception as e:

              logger.error("connector recv:{0}".format(e))

              print(e)

 

     '''send message to dest host named destIdent'''

     def sendMsg(self, destIdent, byteseq):

         msg = None

         if self.persistent:

              msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT)

         else:

              msg = snakemq.message.Message(byteseq, ttl=60)

         if self.messaging == None:

              logger.error("connector:messaging is not initialized, send message failed")

              return

         self.messaging.send_message(destIdent, msg)

 

     '''

    

     '''

     def _initConnector(self):

         try:

              self.link = snakemq.link.Link()

              self.link.add_connector((self.remoteIp, self.remotePort))

 

              self.pktr = snakemq.packeter.Packeter(self.link)

 

              if self.persistent:

                  storage = SqliteQueuesStorage("SnakemqStorage.db")

                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage)

              else:

                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr)

             

              self.messaging.on_message_recv.add(self.on_recv_message)

             

         except Exception as e:

              logger.error("connector:{0}".format(e))

         finally:

              logger.info("connector[{0}] loop ended...".format(self.snakemqident))

리스너 클래스(snakemqListener.py):

# -*- coding:utf-8 -*-

import threading

import snakemq

import snakemq.link

import snakemq.packeter

import snakemq.messaging

import snakemq.message

from common import nxlogger

import snakemqlogger as logger

from common.callbacks import Callback

class SnakemqListener(threading.Thread):

     def __init__(self, snakemqident = None, ip = "localhost", port = 9090, persistent = False):

         super(SnakemqListener,self).__init__()

         self.messaging = None

         self.link = None

         self.pktr = None

         self.snakemqident = snakemqident

         self.ip = ip;

         self.port = port

         self.connectors = {}

         self.on_recv = Callback()

         self.persistent = persistent

         self._initlistener()

 

     '''

     thread run

     '''

     def run(self):

         logger.info("listener start...")

         

         if self.link != None:

              self.link.loop()

 

         logger.info("listener end...")

 

     '''

     terminate snakemq listener thread

     '''

     def terminate(self):

         logger.info("listener terminating...")

         if self.link != None:

              self.link.stop()

              self.link.cleanup()

         logger.info("listener terminated")

 

     '''

     receive message from host named ident

     '''

     def on_recv_message(self, conn, ident, message):

         try:

              self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data

              self.sendMsg('bob','hello,{0}'.format(ident).encode('utf-8'))

         except Exception as e:

              logger.error("listener recv:{0}".format(e))

              print(e)

 

     def on_drop_message(self, ident, message):

         print("message dropped", ident, message)

         logger.debug("listener:message dropped,ident:{0},message:{1}".format(ident, message))

 

     '''client connect'''

     def on_connect(self, ident):

         logger.debug("listener:{0} connected".format(ident))

         self.connectors[ident] = ident

         self.sendMsg(ident, "hello".encode('utf-8'))

 

     '''client disconnect'''

     def on_disconnect(self, ident):

         logger.debug("listener:{0} disconnected".format(ident))

         if ident in self.connectors:

              self.connectors.pop(ident)

 

     '''

     listen start loop

     '''

     def _initlistener(self):

         try:

              self.link = snakemq.link.Link()

              self.link.add_listener((self.ip, self.port))

 

              self.pktr = snakemq.packeter.Packeter(self.link)

              self.pktr.on_connect.add(self.on_connect)

              self.pktr.on_disconnect.add(self.on_disconnect)

 

              if self.persistent:

                  storage = SqliteQueuesStorage("SnakemqStorage.db")

                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage)

              else:

                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr)

             

              self.messaging.on_message_recv.add(self.on_recv_message)

              self.messaging.on_message_drop.add(self.on_drop_message)

 

         except Exception as e:

              logger.error("listener:{0}".format(e))

         finally:

              logger.info("listener:loop ended...")

     '''send message to dest host named destIdent'''

     def sendMsg(self, destIdent, byteseq):

         msg = None

         if self.persistent:

              msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT)

         else:

              msg = snakemq.message.Message(byteseq, ttl=60)

         if self.messaging == None:

              logger.error("listener:messaging is not initialized, send message failed")

              return

         self.messaging.send_message(destIdent, msg)

테스트 코드 커넥터(testSnakeConnector.py):

로컬 1M 파일을 읽은 다음 이를 리스너에게 보내고 리스너는 다시 hello 메시지를 보냅니다.

from netComm.snakemq import snakemqConnector

import time

import sys

import os

def received(ident, data):

     print(data)

 

if __name__ == "__main__":

     bob = snakemqConnector.SnakemqConnector('bob',"10.16.5.45",4002,True)

     bob.on_recv.add(received)

     bob.start()

     try:

         with open("testfile.txt",encoding='utf-8') as f:

              txt = f.read()

              for i in range(100):

                  bob.sendMsg("niess",txt.encode('utf-8'))

                  time.sleep(0.1)

     except Exception as e:

         print(e)

     time.sleep(5)

     bob.terminate()   

 

测试代码listener(testSnakeListener.py):

from netComm.snakemq import snakemqListener

import time

 

def received(ident, data):

     filename = "log/recFile{0}.txt".format(time.strftime('%S',time.localtime()))

     file = open(filename,'w')

     file.writelines(data)

     file.close()

 

if __name__ == "__main__":

     niess = snakemqListener.SnakemqListener("niess","10.16.5.45",4002)

     niess.on_recv.add(received)

     niess.start()

     print("niess start...")

     time.sleep(60)

     niess.terminate()  

     print("niess end...")


Python의 메시지 대기열 패키지 SnakeMQ 사용과 관련된 더 많은 기사를 보려면 PHP 중국어 웹사이트를 참고하세요. !

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.