이 모듈은 스레드 간 통신에 적합하지만 프로세스 간 통신에는 사용할 수 없습니다.
샘플 코드 1: [참고: 현재 코드에 오류가 있습니다! ! ! 】
import time import threading from queue import Queue def task_func(): global queue while queue.qsize() > 0: x = queue.get() print(f"num: {x}") time.sleep(0.1) def producer_data(): global queue for i in range(100): queue.put(i) time.sleep(0.1) if __name__ == '__main__': queue = Queue() producer_thread = threading.Thread(target=producer_data) producer_thread.start() thread_list = [] for i in range(5): thread = threading.Thread(target=task_func) thread.start() thread_list.append(thread) for thread in thread_list: thread.join() print("主程序执行结束!")
참고: 위는 다음과 같이 작성됩니다.
while queue.qsize() > 0: x = queue.get()
생산자 속도가 소비자 속도만큼 빠르지 않으면 위 소비자 코드가 일찍 종료되어 생산자 속도가 소비할 수 없게 됩니다.
while True: x = queue.get()
이런 방식에도 문제가 있습니다. 이때 소비자 큐는 생산자 큐에 데이터가 있는지 항상 모니터링하므로 스레드가 항상 차단되고 프로그램이 중지되지 않습니다. 이는 시스템 리소스를 심각하게 낭비합니다. apscheduler와 같은 예약된 작업 라이브러리를 사용하는 경우 예약된 작업이 시작되지 않습니다.
실제로 timeout 매개변수는 대기열의 put() 또는 get() 메서드에 제공됩니다. 이 매개변수를 사용하면 위에서 언급한 소비 불가 및 스레드 차단 문제를 효과적으로 해결할 수 있습니다.
샘플 코드 2:
import time import threading from queue import Queue def task_func(): global queue while True: x = queue.get(timeout=10) print(f"num: {x}") def producer_data(): global queue for i in range(100): queue.put(i) time.sleep(0.1) if __name__ == '__main__': queue = Queue() producer_thread = threading.Thread(target=producer_data) producer_thread.start() thread_list = [] for i in range(5): thread = threading.Thread(target=task_func) thread.start() thread_list.append(thread) for thread in thread_list: thread.join() print("主程序执行结束!")
실행 결과:
다양한 상황에 따라 타임아웃 값은 실제 상황에 따라 설정될 수 있습니다. 예약된 작업을 사용하는 경우 시간 제한을 사용해도 괜찮으며 프로그램은 예외 발생을 멈추지 않습니다.
이 모듈은 프로세스에 사용되지만 프로세스 풀에는 사용할 수 없습니다.
샘플 코드:
import time from multiprocessing import Process, Queue import queue def producer(queue): queue.put("a") time.sleep(2) def consumer(queue): time.sleep(2) data = queue.get() print(data) if __name__ == "__main__": # queue = queue.Queue() queue = Queue() my_producer = Process(target=producer, args=(queue, )) my_consumer = Process(target=consumer, args=(queue, )) my_producer.start() my_consumer.start() my_producer.join() my_consumer.join() # 使用queue模块的Queue()会报错 # 使用multiprocessing中的Queue(),正确输出a
실행 결과:
샘플 코드 :
import time from multiprocessing import Process, Queue, Pool, Manager def producer(queue): queue.put("a") time.sleep(2) def consumer(queue): time.sleep(2) data = queue.get() print(data) if __name__ == "__main__": # queue = Queue() queue = Manager().Queue() pool = Pool() # pool中的进程间通信需要使用Manager pool.apply_async(producer, args=(queue, )) pool.apply_async(consumer, args=(queue, )) pool.close() pool.join()
실행 결과:
위 내용은 Python 대기열을 사용하는 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!