>  기사  >  백엔드 개발  >  Python+Pika+RabbitMQ 환경 배포 및 작업 대기열 구현

Python+Pika+RabbitMQ 환경 배포 및 작업 대기열 구현

高洛峰
高洛峰원래의
2017-03-01 14:04:501562검색

rabbitmq의 중국어 번역은 주로 mq: Message Queue라는 글자를 의미하며 이는 메시지 대기열을 의미합니다. 앞에 토끼를 뜻하는 "rabbit"이라는 단어도 있는데, 파이썬이라는 파이썬 언어와 똑같습니다. Rabbitmq 서비스는 mysql 및 apache 서비스와 유사하지만 제공되는 기능이 다릅니다. rabbimq는 다양한 애플리케이션 간 통신에 사용할 수 있는 메시지 전송 서비스를 제공하는 데 사용됩니다.

rabbitmq 설치
우분투 12.04에서는 apt-get을 통해 직접 설치할 수 있습니다.

sudo apt-get install rabbitmq-server

설치가 완료되면 Rabbitmq 서비스가 시작되었습니다. 다음으로 Python으로 Hello World!를 작성하는 예를 살펴보겠습니다. 예제의 내용은 send.py에서 Rabbitmq로 "Hello World!"를 보내고, receive.py는 Rabbitmq에서 send.py가 보낸 정보를 받습니다.

Python+Pika+RabbitMQ 환경 배포 및 작업 대기열 구현

P는 생산자를 의미하며, sender라고도 할 수 있습니다. 예에서는 send.py로 표시됩니다. C는 소비자를 의미합니다. , 이는 소비자를 의미하며 수신자라고도 하며, 예에서 receive.py로 표시됩니다. 가운데 빨간색은 예에서 hello 큐로 표시되는 큐의 의미를 나타냅니다.

Python은 기성 클래스 라이브러리 pika, txAMQP 또는 py-amqplib를 사용할 수 있습니다. 여기서는 pika를 선택했습니다.

pika 설치

pip를 사용하면 됩니다. pip는 Python 소프트웨어 관리 패키지입니다. 설치되지 않은 경우 apt-를 통해 설치할 수 있습니다. get

sudo apt-get install python-pip

pip를 통해 pika 설치:

sudo pip install pika

send.py code

rabbitmq 서버에 연결합니다. 로컬에서 테스트되었으므로 localhost를 사용하면 됩니다.

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()

메시지가 전달될 메시지 대기열을 선언합니다. 존재하지 않는 대기열에 메시지가 전송되면 Rabbitmq는 자동으로 이러한 메시지를 지웁니다.

channel.queue_declare(queue='hello')

위에 선언된 hello 대기열로 메시지를 보냅니다. 여기서 exchange는 메시지를 보낼 대기열을 정확하게 지정할 수 있는 exchanger를 나타내며, Routing_key는 는 대기열 이름으로 설정되고 본문은 전송될 콘텐츠이므로 지금은 구체적인 전송 세부정보에 주의를 기울이지 않겠습니다.

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

연결 종료

connection.close()

전체 코드

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

먼저 이 프로그램을 실행해 보세요. 실행이 성공하면 Rabbitmqctl이 hello 대기열을 성공적으로 추가해야 하며, Rabbitmqctl 명령을 사용하여 확인해야 합니다

rabbitmqctl list_queues

작성자의 컴퓨터에 다음 정보를 출력합니다:

Python+Pika+RabbitMQ 환경 배포 및 작업 대기열 구현


네 안녕하세요 큐가 있고, 큐에 메시지가 있습니다. 다음으로 receive.py를 사용하여 대기열에 있는 정보를 가져옵니다.

receive.py 코드

는 send.py의 이전 두 단계와 동일합니다. 두 단계 모두 먼저 서버에 연결한 다음 메시지 대기열을 선언해야 합니다. , 여기서는 논의되지 않습니다. 동일한 코드가 게시되었습니다.

메시지 수신은 더 복잡하며 이를 처리하려면 콜백 함수를 정의해야 합니다. 여기서 콜백 함수는 정보를 인쇄하는 것입니다.

def callback(ch, method, properties, body):
  print "Received %r" % (body,)

rabbitmq에게 콜백을 사용하여 정보를 수신하도록 지시합니다.

channel.basic_consume(callback, queue='hello', no_ack=True)

정보 수신을 시작합니다. Blocking 상태로 진입하면 Queue에 정보가 있는 경우에만 처리를 위해 콜백이 호출됩니다. 종료하려면 Ctrl+C를 누르세요.

channel.start_consuming()

완전한 코드

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue='hello', no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

프로그램을 실행하면 받을 수 있습니다. 대기열의 hello 메시지가 화면에 인쇄됩니다. 터미널을 변경하고 다시 send.py를 실행하면 receive.py가 다시 정보를 받는 것을 볼 수 있습니다.

작업 대기열 예시

1. 준비

예제 프로그램에서는 new_task .py를 사용하여 시뮬레이션합니다. 작업자를 시뮬레이션하기 위한 작업 할당자와 작업자.py.

send.py를 수정하여 명령줄 매개변수에서 정보를 받고

import sys
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='hello',
           body=message)
print " [x] Sent %r" % (message,)

receive.py의 콜백 함수를 수정합니다.

import time
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"

여기서 먼저 두 개의 터미널을 엽니다. 둘 다 work.py를 실행하고 수신 대기 상태입니다. 이쪽은 두 명의 작업자와 같습니다. 세 번째 터미널을 열고 new_task.py

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....

를 실행하고 작업자.py가 작업을 받고 한 작업자가 3개의 작업을 받는 것을 관찰합니다.

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'

다른 작업자가 2개의 작업을 받았습니다:

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。

2.消息确认(Message acknowledgment)

消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:

def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep(5)
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)

这边停顿5秒,可以方便ctrl+c退出。

去除no_ack=True参数或者设置为False也可以。

channel.basic_consume(callback, queue='hello', no_ack=False)

用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。

3.消息持久化存储(Message durability)

虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:

channel.queue_declare(queue='hello', durable=True)

但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:

channel.queue_declare(queue='task_queue', durable=True)

在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:

channel.basic_publish(exchange='',
           routing_key="task_queue",
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))

4.公平调度(Fair dispatch)

上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。

channel.basic_qos(prefetch_count=1)

new_task.py完整代码

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='task_queue',
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
print " [x] Sent %r" % (message,)
connection.close()
worker.py完整代码

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
           queue='task_queue')
 
channel.start_consuming()


更多Python+Pika+RabbitMQ 환경 배포 및 작업 대기열 구현相关文章请关注PHP中文网!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.