此模組適用於執行緒間通信,但不能用於進程間通訊。
範例程式碼1: 【注意:此時程式碼有錯誤! ! ! 】
import time import threading from queue import Queue def task_func(): global queue while queue.qsize() > 0: x = queue.get() print(f"num: {x}") time.sleep(0.1) def producer_data(): global queue for i in range(100): queue.put(i) time.sleep(0.1) if __name__ == '__main__': queue = Queue() producer_thread = threading.Thread(target=producer_data) producer_thread.start() thread_list = [] for i in range(5): thread = threading.Thread(target=task_func) thread.start() thread_list.append(thread) for thread in thread_list: thread.join() print("主程序执行结束!")
注意:上述寫法:
while queue.qsize() > 0: x = queue.get()
當生產者速度沒有消費者速度快時,上述消費者程式碼會提前結束,導致生產者的速度不能消費。
while True: x = queue.get()
這種寫法也存在問題,此時消費者佇列會一直監聽生產者佇列是否有數據,導致執行緒一直處於阻塞狀態,程式會一直阻塞不會停止,嚴重浪費系統資源。如果使用apscheduler等定時任務的函式庫的話,會導致定時任務無法啟動。
其實queue佇列中的put()或是get()方法都提供了timeout參數,利用這個參數可以有效解決上述消除不能消費和執行緒一直阻塞問題。
範例程式碼2:
import time import threading from queue import Queue def task_func(): global queue while True: x = queue.get(timeout=10) print(f"num: {x}") def producer_data(): global queue for i in range(100): queue.put(i) time.sleep(0.1) if __name__ == '__main__': queue = Queue() producer_thread = threading.Thread(target=producer_data) producer_thread.start() thread_list = [] for i in range(5): thread = threading.Thread(target=task_func) thread.start() thread_list.append(thread) for thread in thread_list: thread.join() print("主程序执行结束!")
運行結果:
#根據不同的情境,可以根據實際情況設定timeout的值。如果使用定時任務,使用timeout是可以的,不會使程式拋異常停止的。
此模組用於對進程,但不能用於進程池
範例程式碼:
import time from multiprocessing import Process, Queue import queue def producer(queue): queue.put("a") time.sleep(2) def consumer(queue): time.sleep(2) data = queue.get() print(data) if __name__ == "__main__": # queue = queue.Queue() queue = Queue() my_producer = Process(target=producer, args=(queue, )) my_consumer = Process(target=consumer, args=(queue, )) my_producer.start() my_consumer.start() my_producer.join() my_consumer.join() # 使用queue模块的Queue()会报错 # 使用multiprocessing中的Queue(),正确输出a
執行結果:
範例程式碼:
import time from multiprocessing import Process, Queue, Pool, Manager def producer(queue): queue.put("a") time.sleep(2) def consumer(queue): time.sleep(2) data = queue.get() print(data) if __name__ == "__main__": # queue = Queue() queue = Manager().Queue() pool = Pool() # pool中的进程间通信需要使用Manager pool.apply_async(producer, args=(queue, )) pool.apply_async(consumer, args=(queue, )) pool.close() pool.join()
執行結果:
#以上是Python佇列的使用方法有哪些的詳細內容。更多資訊請關注PHP中文網其他相關文章!