rabbitmq의 중국어 번역은 주로 mq: Message Queue라는 글자를 의미하며 이는 메시지 대기열을 의미합니다. 앞에 토끼를 뜻하는 "rabbit"이라는 단어도 있는데, 파이썬이라는 파이썬 언어와 똑같습니다. Rabbitmq 서비스는 mysql 및 apache 서비스와 유사하지만 제공되는 기능이 다릅니다. rabbimq는 다양한 애플리케이션 간 통신에 사용할 수 있는 메시지 전송 서비스를 제공하는 데 사용됩니다.
rabbitmq 설치
우분투 12.04에서는 apt-get을 통해 직접 설치할 수 있습니다.
sudo apt-get install rabbitmq-server
설치가 완료되면 Rabbitmq 서비스가 시작되었습니다. 다음으로 Python으로 Hello World!를 작성하는 예를 살펴보겠습니다. 예제의 내용은 send.py에서 Rabbitmq로 "Hello World!"를 보내고, receive.py는 Rabbitmq에서 send.py가 보낸 정보를 받습니다.
P는 생산자를 의미하며, sender라고도 할 수 있습니다. 예에서는 send.py로 표시됩니다. C는 소비자를 의미합니다. , 이는 소비자를 의미하며 수신자라고도 하며, 예에서 receive.py로 표시됩니다. 가운데 빨간색은 예에서 hello 큐로 표시되는 큐의 의미를 나타냅니다.
Python은 기성 클래스 라이브러리 pika, txAMQP 또는 py-amqplib를 사용할 수 있습니다. 여기서는 pika를 선택했습니다.
pika 설치
pip를 사용하면 됩니다. pip는 Python 소프트웨어 관리 패키지입니다. 설치되지 않은 경우 apt-를 통해 설치할 수 있습니다. get
sudo apt-get install python-pip
pip를 통해 pika 설치:
sudo pip install pika
send.py code
rabbitmq 서버에 연결합니다. 로컬에서 테스트되었으므로 localhost를 사용하면 됩니다.
connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel()
메시지가 전달될 메시지 대기열을 선언합니다. 존재하지 않는 대기열에 메시지가 전송되면 Rabbitmq는 자동으로 이러한 메시지를 지웁니다.
channel.queue_declare(queue='hello')
위에 선언된 hello 대기열로 메시지를 보냅니다. 여기서 exchange는 메시지를 보낼 대기열을 정확하게 지정할 수 있는 exchanger를 나타내며, Routing_key는 는 대기열 이름으로 설정되고 본문은 전송될 콘텐츠이므로 지금은 구체적인 전송 세부정보에 주의를 기울이지 않겠습니다.
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
연결 종료
connection.close()
전체 코드
#!/usr/bin/env python #coding=utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print " [x] Sent 'Hello World!'" connection.close()
먼저 이 프로그램을 실행해 보세요. 실행이 성공하면 Rabbitmqctl이 hello 대기열을 성공적으로 추가해야 하며, Rabbitmqctl 명령을 사용하여 확인해야 합니다
rabbitmqctl list_queues
작성자의 컴퓨터에 다음 정보를 출력합니다:
네 안녕하세요 큐가 있고, 큐에 메시지가 있습니다. 다음으로 receive.py를 사용하여 대기열에 있는 정보를 가져옵니다.
receive.py 코드
는 send.py의 이전 두 단계와 동일합니다. 두 단계 모두 먼저 서버에 연결한 다음 메시지 대기열을 선언해야 합니다. , 여기서는 논의되지 않습니다. 동일한 코드가 게시되었습니다.
메시지 수신은 더 복잡하며 이를 처리하려면 콜백 함수를 정의해야 합니다. 여기서 콜백 함수는 정보를 인쇄하는 것입니다.
def callback(ch, method, properties, body): print "Received %r" % (body,)
rabbitmq에게 콜백을 사용하여 정보를 수신하도록 지시합니다.
channel.basic_consume(callback, queue='hello', no_ack=True)
정보 수신을 시작합니다. Blocking 상태로 진입하면 Queue에 정보가 있는 경우에만 처리를 위해 콜백이 호출됩니다. 종료하려면 Ctrl+C를 누르세요.
channel.start_consuming()
완전한 코드
#!/usr/bin/env python #coding=utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, queue='hello', no_ack=True) print ' [*] Waiting for messages. To exit press CTRL+C' channel.start_consuming()
프로그램을 실행하면 받을 수 있습니다. 대기열의 hello 메시지가 화면에 인쇄됩니다. 터미널을 변경하고 다시 send.py를 실행하면 receive.py가 다시 정보를 받는 것을 볼 수 있습니다.
작업 대기열 예시
1. 준비
예제 프로그램에서는 new_task .py를 사용하여 시뮬레이션합니다. 작업자를 시뮬레이션하기 위한 작업 할당자와 작업자.py.
send.py를 수정하여 명령줄 매개변수에서 정보를 받고
import sys message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,)
receive.py의 콜백 함수를 수정합니다.
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done"
여기서 먼저 두 개의 터미널을 엽니다. 둘 다 work.py를 실행하고 수신 대기 상태입니다. 이쪽은 두 명의 작업자와 같습니다. 세 번째 터미널을 열고 new_task.py
$ python new_task.py First message. $ python new_task.py Second message.. $ python new_task.py Third message... $ python new_task.py Fourth message.... $ python new_task.py Fifth message.....
를 실행하고 작업자.py가 작업을 받고 한 작업자가 3개의 작업을 받는 것을 관찰합니다.
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
다른 작업자가 2개의 작업을 받았습니다:
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。
2.消息确认(Message acknowledgment)
消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(5) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag)
这边停顿5秒,可以方便ctrl+c退出。
去除no_ack=True参数或者设置为False也可以。
channel.basic_consume(callback, queue='hello', no_ack=False)
用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。
3.消息持久化存储(Message durability)
虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:
channel.queue_declare(queue='hello', durable=True)
但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:
channel.queue_declare(queue='task_queue', durable=True)
在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
4.公平调度(Fair dispatch)
上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。
channel.basic_qos(prefetch_count=1)
new_task.py完整代码
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) connection.close() worker.py完整代码 #!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
更多Python+Pika+RabbitMQ 환경 배포 및 작업 대기열 구현相关文章请关注PHP中文网!