이전 기사에서는 Rabbitmq 설치와 고전적인 Hello World를 소개했습니다! 예. 여기서는 작업 대기열에 대해 살펴보겠습니다. 이전 글과 이어지는 내용이기 때문에, 이전 글을 읽지 않으신 분들은 이 글을 이해하기 어려울 수도 있습니다. 이전 글의 주소는 우분투에 Rabbitmq와 Python을 설치하는 방법
메시지도 작업으로 이해할 수 있고, 메시지 보낸 사람은 작업 할당자로 이해할 수 있습니다. 작업자가 작업을 수신하고 완료하지 않은 경우 작업 할당자가 다른 작업을 보내는데 너무 바빠서 이러한 작업을 함께 처리하려면 여러 작업자가 필요합니다. 작업 대기열이라고 합니다. 구조도는 다음과 같습니다.
rabbitmq의 Python 인스턴스 작업 대기열
준비
예제 프로그램에서는 new_task.py를 사용하여 작업 할당자를 시뮬레이션하고 Worker.py를 사용하여 작업자를 시뮬레이션합니다.
send.py를 수정하고, 명령줄 매개변수에서 정보를 수신하고,
import sys message= ' '.join(sys.argv[1:])or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,)
보내기 receive.py의 콜백 함수를 수정합니다.
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done"
여기서 두 개의 터미널을 엽니다. 둘 다 Worker.py를 실행하고 수신 대기 상태입니다. 이는 두 명의 작업자와 동일합니다. 세 번째 터미널을 열고 new_task.py
$ python new_task.py First message. $ python new_task.py Second message.. $ python new_task.py Third message... $ python new_task.py Fourth message.... $ python new_task.py Fifth message.....
를 실행하여 작업자.py가 작업을 받는지 확인하세요. 한 작업자가
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
작업 2개를 받았습니다:
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
위 관점에서 볼 때, 각 작업자에게는 차례로 작업이 할당됩니다. 따라서 작업자가 작업을 처리하다가 사망하면 해당 작업은 완료되지 않고 다른 작업자에게 넘겨져야 합니다. 따라서 작업자가 작업을 완료하면 피드백을 제공하는 메커니즘이 있어야 합니다.
메시지 확인
메시지 확인은 작업자가 작업을 완료하면 Rabbitmq로 피드백됩니다. Worker.py에서 콜백 함수를 수정합니다.
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(5) print " [x] Done" ch.basic_ack(delivery_tag= method.delivery_tag)
여기에서 5초 동안 잠시 멈춰서 Ctrl+C를 누르면 쉽게 종료됩니다.
no_ack=True 매개변수를 제거하거나 False로 설정할 수도 있습니다.
channel.basic_consume(callback, queue='hello', no_ack=False)
이 코드로 실행하면 작업자 중 하나가 ctrl+c를 눌러 종료하더라도 실행 중인 작업이 손실되지 않으며 Rabbitmq는 해당 작업을 다른 작업자에게 재배포합니다.
메시지 내구성(Message Durability)
메시지 피드백 메커니즘이 있지만, Rabbitmq 자체가 정지되는 경우 떨어뜨려도 임무는 계속 사라집니다. 따라서 작업은 지속적으로 저장되어야 합니다. 영구 저장소 선언:
channel.queue_declare(queue='hello', durable=True)
그러나 hello 대기열이 이미 존재하고 비영구적이므로 RabbitMQ는 다른 매개변수를 사용하여 기존 대기열을 재정의하는 것을 허용하지 않으므로 이 프로그램은 오류를 실행합니다. 대기열 재정의:
channel.queue_declare(queue='task_queue', durable=True)
작업을 보낼 때 Delivery_mode=2를 사용하여 작업을 영구 저장소로 표시합니다.
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode= 2,# make message persistent ))
공정한 파견
위의 예에서는 각 작업자가 차례로 작업에 할당되지만 각 작업이 반드시 동일하지는 않습니다. 일부 작업은 더 무겁고 실행하는 데 더 오랜 시간이 걸릴 수 있으며, 일부 작업은 더 가벼워서 실행하는 데 더 짧은 시간이 걸릴 수 있습니다. 공정하게 예약할 수 있다면 가장 좋을 것입니다. Rabbitmq가 작업자에게 동시에 여러 작업을 할당하지 않도록 basic_qos를 사용하여 prefetch_count=1을 설정하세요. 즉, 작업자가 작업을 완료한 후에만 작업을 다시 받게 됩니다. .
channel.basic_qos(prefetch_count=1)
new_task.py 완성코드
#!/usr/bin/env python import pika import sys connection= pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel= connection.channel() channel.queue_declare(queue='task_queue', durable=True) message= ' '.join(sys.argv[1:])or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode= 2,# make message persistent )) print " [x] Sent %r" % (message,) connection.close()
worker.py 완성코드
#!/usr/bin/env python import pika import time connection= pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel= connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag= method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
위는 Python Rabbitmq를 사용한 내용입니다(2). PHP 중국어 사이트(www.php.cn)를 주목하세요!