메시지 데이터 전송에서 메시지 큐(MQ, Message Queue)의 보존 역할은 데이터 통신을 보장하고 실시간 처리의 편의성을 제공합니다. 여기서는 Python에서 스레드의 MQ 메시지 큐 구현과 분석을 살펴보겠습니다.
"메시지 대기열"은 전송 중에 메시지를 저장하는 컨테이너입니다. 메시지 큐 관리자는 소스에서 대상으로 메시지를 중계할 때 중개자 역할을 합니다. 대기열의 주요 목적은 라우팅을 제공하고 메시지 전달을 보장하는 것입니다. 메시지가 전송될 때 수신자를 사용할 수 없는 경우 Message Queue는 메시지가 성공적으로 전달될 때까지 메시지를 보관합니다. 저는 메시지 대기열이 모든 아키텍처나 애플리케이션에 중요한 구성 요소라고 믿습니다. 다음은 10가지 이유입니다:
Python 메시지 대기열 예:
#!/usr/bin/env python import Queue import threading import time queue = Queue.Queue() class ThreadNum(threading.Thread): """没打印一个数字等待1秒,并发打印10个数字需要多少秒?""" def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): whileTrue: #消费者端,从队列中获取num num = self.queue.get() print "i'm num %s"%(num) time.sleep(1) #在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号 self.queue.task_done() start = time.time() def main(): #产生一个 threads pool, 并把消息传递给thread函数进行处理,这里开启10个并发 for i in range(10): t = ThreadNum(queue) t.setDaemon(True) t.start() #往队列中填错数据 for num in range(10): queue.put(num) #wait on the queue until everything has been processed queue.join() main() print "Elapsed Time: %s" % (time.time() - start)
실행 결과:
i'm num 0 i'm num 1 i'm num 2 i'm num 3 i'm num 4 i'm num 5 i'm num 6 i'm num 7 i'm num 8 i'm num 9 Elapsed Time: 1.01399993896
해석:
구체적인 작업 단계는 다음과 같습니다.
1. Queue.Queue() 인스턴스를 생성한 다음 데이터로 채웁니다.
2. 채워진 데이터 인스턴스를 threading.Thread를 상속하여 생성된 스레드 클래스에 전달합니다.
3. 데몬 스레드 풀을 생성합니다.
4. 매번 대기열에서 하나의 항목을 꺼내고 이 스레드의 데이터 및 실행 메서드를 사용하여 해당 작업을 수행합니다.
5. 이 작업을 완료한 후 queue.task_done() 함수를 사용하여 작업이 완료되었다는 신호를 대기열에 보냅니다.
6. 대기열에서 조인 작업을 수행한다는 것은 실제로 기본 프로그램을 종료하기 전에 대기열이 빌 때까지 기다리는 것을 의미합니다.
이 모드를 사용할 때 주의할 점: 데몬 스레드를 true로 설정하면 프로그램이 실행 후 자동으로 종료됩니다. 장점은 대기열에서 조인 작업을 수행하거나 종료하기 전에 대기열이 빌 때까지 기다릴 수 있다는 것입니다.
다중 대기열이라고 하며, 한 대기열의 출력을 다른 대기열의 입력으로 사용할 수 있습니다.
#!/usr/bin/env python import Queue import threading import time queue = Queue.Queue() out_queue = Queue.Queue() class ThreadNum(threading.Thread): def __init__(self, queue, out_queue): threading.Thread.__init__(self) self.queue = queue self.out_queue = out_queue def run(self): whileTrue: #从队列中取消息 num = self.queue.get() bkeep = num #将bkeep放入队列中 self.out_queue.put(bkeep) #signals to queue job is done self.queue.task_done() class PrintLove(threading.Thread): def __init__(self, out_queue): threading.Thread.__init__(self) self.out_queue = out_queue def run(self): whileTrue: #从队列中获取消息并赋值给bkeep bkeep = self.out_queue.get() keke = "I love " + str(bkeep) print keke, print self.getName() time.sleep(1) #signals to queue job is done self.out_queue.task_done() start = time.time() def main(): #populate queue with data for num in range(10): queue.put(num) #spawn a pool of threads, and pass them queue instance for i in range(5): t = ThreadNum(queue, out_queue) t.setDaemon(True) t.start() for i in range(5): pl = PrintLove(out_queue) pl.setDaemon(True) pl.start() #wait on the queue until everything has been processed queue.join() out_queue.join() main() print "Elapsed Time: %s" % (time.time() - start)
실행 결과:
I love 0 Thread-6 I love 1 Thread-7 I love 2 Thread-8 I love 3 Thread-9 I love 4 Thread-10 I love 5 Thread-7 I love 6 Thread-6 I love 7 Thread-9 I love 8 Thread-8 I love 9 Thread-10 Elapsed Time: 2.00300002098
해석:
ThreadNum 클래스 작업 흐름
정의 큐- ---> 스레딩 상속----> 큐 초기화----> 실행 함수 정의 ---> 큐에 데이터 가져오기---> 데이터 처리----> 데이터 넣기 다른 대기열로---> 항목이 처리되었음을 대기열에 알리는 신호를 보냅니다
주 기능 작업 흐름:
---> for 루프가 시작되는 것으로 결정됨 스레드 수----> ThreadNum 클래스 인스턴스화----> 스레드 시작 및 데몬 설정
---> for 루프는 시작된 스레드 수를 결정합니다. ----> PrintLove 클래스 인스턴스화--- >스레드를 시작하고 가드로 설정합니다
--->큐의 메시지가 처리될 때까지 기다린 후 조인을 실행합니다. 즉, 메인 프로그램을 종료합니다.
MQ의 일반적인 구현을 이해한 후 메시지 대기열의 장점을 요약해 보겠습니다.
1. 디커플링
프로젝트 시작 시 향후 프로젝트에 어떤 요구 사항이 발생하게 될지 예측하는 것은 매우 어렵습니다. 메시지 큐는 처리 프로세스 중간에 암시적 데이터 기반 인터페이스 계층을 삽입하며 양쪽의 처리 프로세스는 이 인터페이스를 구현해야 합니다. 이를 통해 동일한 인터페이스 제약 조건을 준수하는 한 양쪽의 프로세스를 독립적으로 확장하거나 수정할 수 있습니다.
데이터 처리 시 프로세스가 실패하는 경우가 있습니다. 데이터가 지속되지 않으면 영원히 손실됩니다. 메시지 큐는 데이터가 완전히 처리될 때까지 데이터를 유지하여 데이터 손실 위험을 방지합니다. 많은 메시지 대기열에서 사용되는 "삽입-가져오기-삭제" 패러다임에서는 대기열에서 메시지를 삭제하기 전에 데이터가 안전한지 확인하기 위해 처리 프로세스에서 메시지가 처리되었음을 명확하게 표시해야 합니다. 사용하면 끝.
메시지 대기열이 처리를 분리하기 때문에 추가 처리만 추가하면 메시지 대기열 추가 및 처리 빈도를 쉽게 높일 수 있습니다. 코드를 변경하거나 매개변수를 조정할 필요가 없습니다. 확장은 전원 버튼을 켜는 것만큼 쉽습니다.
해커 뉴스 홈페이지에 애플리케이션이 표시되면 트래픽이 비정상적인 수준으로 올라간 것을 확인할 수 있습니다. 방문 횟수가 급격하게 증가하더라도 애플리케이션은 계속 작동해야 하지만 이러한 트래픽 급증은 흔하지 않습니다. 이러한 피크 방문을 처리할 수 있는 기준에 따라 대기 리소스를 투자하는 것은 큰 낭비입니다. 메시지 대기열을 사용하면 중요한 구성 요소가 과부하된 요청으로 인해 완전히 무너지지 않고 증가된 액세스 압력을 견딜 수 있습니다. 이에 대한 자세한 내용은 최대 처리 기능에 대한 블로그 게시물을 확인하세요.
시스템의 일부 구성 요소에 장애가 발생해도 전체 시스템에 영향을 미치지 않습니다. 메시지 큐는 프로세스 간의 결합을 줄여 주므로 메시지 처리 프로세스가 중단되더라도 큐에 추가된 메시지는 시스템이 복구된 후에도 계속 처리될 수 있습니다. 요청을 재시도하거나 연기하도록 허용하는 기능은 약간 불편한 사용자와 불만을 느끼는 사용자의 차이인 경우가 많습니다.
메시지 대기열에서 제공하는 중복 메커니즘은 하나의 프로세스가 대기열을 읽는 한 메시지가 실제로 처리될 수 있도록 보장합니다. 이를 바탕으로 IronMQ는 "한 번만 전달"을 보장합니다. 큐에서 데이터를 수신하는 프로세스 수에 관계없이 각 메시지는 한 번만 처리될 수 있습니다. 이는 메시지를 받는 것만으로도 메시지를 "구독"하고 일시적으로 대기열에서 제거하기 때문에 가능합니다. 클라이언트가 메시지 처리를 완료했음을 명시적으로 나타내지 않는 한 메시지는 다시 큐에 들어가고 구성 가능한 기간이 지나면 다시 처리될 수 있습니다.
많은 경우 데이터 처리 순서가 중요합니다. 메시지 큐는 본질적으로 정렬되어 있으며 데이터가 특정 순서로 처리되도록 보장할 수 있습니다. IronMO는 메시지가 FIFO(선입 선출) 순서로 처리되도록 보장하므로 대기열의 메시지 위치는 해당 메시지가 검색된 위치입니다.
모든 중요한 시스템에는 다양한 처리 시간이 필요한 요소가 있습니다. 예를 들어 이미지를 로드하는 데는 필터를 적용하는 것보다 시간이 덜 걸립니다. 메시지 대기열은 버퍼링 계층을 사용하여 작업이 가장 효율적으로 실행되도록 돕습니다. 대기열에 대한 쓰기는 대기열에서 읽기를 위한 준비 처리에 의해 제한되지 않고 가능한 한 빨리 처리됩니다. 이 버퍼링은 시스템을 통해 데이터가 흐르는 속도를 제어하고 최적화하는 데 도움이 됩니다.
분산 시스템에서는 사용자 작업에 소요되는 시간과 이유에 대한 전반적인 인상을 얻는 것이 큰 과제입니다. 메시지 시리즈는 데이터 흐름이 충분히 최적화되지 않은 메시지 처리 빈도를 기준으로 성능이 저하된 프로세스나 영역을 식별하는 데 도움이 됩니다.
메시지를 즉시 처리하는 것을 원하지 않거나 필요로 하지 않는 경우가 많습니다. 메시지 대기열은 메시지를 대기열에 넣을 수 있지만 즉시 처리할 수는 없는 비동기 처리 메커니즘을 제공합니다. 원하는 만큼 메시지를 대기열에 넣고 원할 때 처리할 수 있습니다.
위 내용은 Python의 MQ 메시지 큐 구현 및 메시지 큐의 장점 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!