このモジュールはスレッド間通信に適していますが、プロセス間通信には使用できません。
サンプル コード 1: [注意: 現時点ではコードにエラーがあります。 ! ! 】
import time import threading from queue import Queue def task_func(): global queue while queue.qsize() > 0: x = queue.get() print(f"num: {x}") time.sleep(0.1) def producer_data(): global queue for i in range(100): queue.put(i) time.sleep(0.1) if __name__ == '__main__': queue = Queue() producer_thread = threading.Thread(target=producer_data) producer_thread.start() thread_list = [] for i in range(5): thread = threading.Thread(target=task_func) thread.start() thread_list.append(thread) for thread in thread_list: thread.join() print("主程序执行结束!")
注: 上記の書き方:
while queue.qsize() > 0: x = queue.get()
プロデューサーの速度がコンシューマーの速度ほど速くない場合、上記のコンシューマー コードは早期に終了します。その結果、プロデューサーの速度が低下し、消費できなくなります。
while True: x = queue.get()
この書き方にも問題があり、このときコンシューマーキューはプロデューサーキューにデータがあるかどうかを常に監視するため、スレッドが常にブロックされ、プログラムがブロックされてしまいます。停止しません。これはシステム リソースの重大な無駄です。 apscheduler などのスケジュールされたタスク ライブラリを使用する場合、スケジュールされたタスクは開始されません。
実際には、タイムアウト パラメータはキュー キューの put() または get() メソッドで提供されており、このパラメータを使用すると、前述の消費不能およびスレッド ブロックの問題を効果的に解決できます。
サンプルコード 2:
import time import threading from queue import Queue def task_func(): global queue while True: x = queue.get(timeout=10) print(f"num: {x}") def producer_data(): global queue for i in range(100): queue.put(i) time.sleep(0.1) if __name__ == '__main__': queue = Queue() producer_thread = threading.Thread(target=producer_data) producer_thread.start() thread_list = [] for i in range(5): thread = threading.Thread(target=task_func) thread.start() thread_list.append(thread) for thread in thread_list: thread.join() print("主程序执行结束!")
実行結果:
さまざまな状況に応じて、タイムアウトの値を設定できます。実際の状況。スケジュールされたタスクを使用する場合は、タイムアウトを使用しても問題ありません。プログラムは例外のスローを停止しません。
このモジュールはプロセスに使用されますが、プロセス プールには使用できません
サンプルコード:
import time from multiprocessing import Process, Queue import queue def producer(queue): queue.put("a") time.sleep(2) def consumer(queue): time.sleep(2) data = queue.get() print(data) if __name__ == "__main__": # queue = queue.Queue() queue = Queue() my_producer = Process(target=producer, args=(queue, )) my_consumer = Process(target=consumer, args=(queue, )) my_producer.start() my_consumer.start() my_producer.join() my_consumer.join() # 使用queue模块的Queue()会报错 # 使用multiprocessing中的Queue(),正确输出a
実行結果:
サンプルコード:
import time from multiprocessing import Process, Queue, Pool, Manager def producer(queue): queue.put("a") time.sleep(2) def consumer(queue): time.sleep(2) data = queue.get() print(data) if __name__ == "__main__": # queue = Queue() queue = Manager().Queue() pool = Pool() # pool中的进程间通信需要使用Manager pool.apply_async(producer, args=(queue, )) pool.apply_async(consumer, args=(queue, )) pool.close() pool.join()
実行結果:
以上がPython キューの使用方法にはどのようなものがありますか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。