ホームページ >バックエンド開発 >Python チュートリアル >Python 開発における単一スレッドで実行される複数のスケジュールされたタスクの分析例
1. 初期バージョン:
アイデア: 率直に言えば、タイマーは、指定されたプログラムの実行を遅延させます。 Python のタイマーを自分で設定することはできないため、遅延操作にはシステム タイマーを使用する必要がありますが、操作をスケジュールする必要があるすべてのプログラムを特定のリストに追加して、プログラムを取り出すことができます。リスト内の最も短いタイミング時間を取得し、Bind threading.Timer (時間、コールバック) を実行し、タイムアウトになるのを待ち、カスタム コールバックをトリガーし、リストから取り出したばかりのプログラムを実行して時間を更新します。リスト内の最も短い時間のプログラムを再度取り出し、継続的な反復ループである threading.Timer のバインドを続けます。新しいスケジュールされたタスクがリストに追加されると、現在の threading.Timer バインドがキャンセルされ、リスト内の時間を更新します。 、再度最短時間を取り出し、スレッドをバインドします。タイマー...
import threading
import time
class Timer():
'''单线程下的定时器'''
def __init__(self):
self.queues = []
self.timer = None
self.last_time = time.time()
def start(self):
item = self.get()
if item:
self.timer = threading.Timer(item[0],self.execute)
self.timer.start()
def add(self,item):
print('add',item)
self.flush_time()
self.queues.append(item)
self.queues.sort(key=lambda x:x[0])
if self.timer:
self.timer.cancel()
self.timer = None
self.start()
def get(self):
item = None
if len(self.queues) > 0:
item = self.queues[0]
return item
def pop(self):
item = None
if len(self.queues) > 0:
item = self.queues.pop(0)
return item
def flush_time(self):
curr_time = time.time()
for i in self.queues:
i[0] = i[0] - (curr_time - self.last_time)
self.last_time = curr_time
def execute(self):
# if self.timer:
# self.timer.cancel()
# self.timer = None
item = self.pop()
self.flush_time()
if item:
callback = item[1]
args = item[0]
callback(args)
self.start()
注: コード出力を表示します。すべてのタイマーが調整されています。時間は順番に実行され、非常に完璧で、すべてが非常に良好に見えます。しかし、どうやら、ははは、func で time.sleep(5) を有効にすると、スレッドの数がゆっくりと増加します。理由は最後のタイマーのコールバックです。まだ実行中ですが、この時点で新しいスレッドが追加されました
2.アイデア: ジェネレーター - コンシューマー モデルを使用し、threading.Condition 条件変数を使用します。タイマーは強制的に永久に有効になります。 コード:if __name__ == '__main__': # 检测线程数
def func(): while True: print(threading.active_count())
time.sleep(1)
f1 = threading.Thread(target=func)
f1.start()
import logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %H:%M:%S [%A]") def func1(*args):
logging.info('func1 %s'%args) # time.sleep(5)
def func2(*args):
logging.info('func2 %s' % args) # time.sleep(5)
def func3(*args):
logging.info('func3 %s' % args) # time.sleep(5)
def func4(*args):
logging.info('func4 %s' % args) # time.sleep(5)
def func5(*args):
logging.info('func5 %s' % args) # time.sleep(5)
# 测试
t1 = Timer()
logging.info('start')
t1.add([5,func1])
time.sleep(0.5)
t1.add([4,func2])
time.sleep(0.5)
t1.add([3,func3])
time.sleep(0.5)
t1.add([2,func4])
time.sleep(0.5)
t1.add([1,func5])
time.sleep(5)
t1.add([1,func1])
t1.add([2,func2])
t1.add([3,func3])
t1.add([4,func4])
t1.add([5,func5])
# 输出
# 2
# 07/27/2017 10:36:47 [Thursday] start
# add [5, <function func1 at 0x000000D79FC77E18>]
# add [4, <function func2 at 0x000000D79FCA8488>]
# 3
# add [3, <function func3 at 0x000000D79FCA8510>]
# add [2, <function func4 at 0x000000D79FCA8598>]
# 3
# add [1, <function func5 at 0x000000D79FCA8620>]
# 3
# 07/27/2017 10:36:50 [Thursday] func5 1
# 07/27/2017 10:36:51 [Thursday] func4 0.498349666595459
# 3
# 07/27/2017 10:36:51 [Thursday] func3 0.49782633781433105
# 07/27/2017 10:36:52 [Thursday] func2 0.49848270416259766
# 3
# 07/27/2017 10:36:52 [Thursday] func1 0.48449039459228516
# 2
# 2
# add [1, <function func1 at 0x000000D79FC77E18>]
# add [2, <function func2 at 0x000000D79FCA8488>]
# add [3, <function func3 at 0x000000D79FCA8510>]
# add [4, <function func4 at 0x000000D79FCA8598>]
# add [5, <function func5 at 0x000000D79FCA8620>]
# 3
# 07/27/2017 10:36:55 [Thursday] func1 0.9990766048431396
# 3
# 07/27/2017 10:36:56 [Thursday] func2 0.9988017082214355
# 3
# 07/27/2017 10:36:57 [Thursday] func3 0.99928879737854
# 07/27/2017 10:36:58 [Thursday] func4 0.9991350173950195
# 3
# 3
# 07/27/2017 10:36:59 [Thursday] func5 0.9988160133361816
import time import threading import logging class NewTimer(threading.Thread): '''单线程下的定时器''' def __init__(self): super().__init__() self.queues = [] self.timer = None self.cond = threading.Condition() def run(self): while True: # print('NewTimer',self.queues) self.cond.acquire() item = self.get() callback = None if not item: logging.info('NewTimer wait') self.cond.wait() elif item[0] <= time.time(): new_item = self.pop() callback = new_item[1] else: logging.info('NewTimer start sys timer and wait') self.timer = threading.Timer(item[0]-time.time(),self.execute) self.timer.start() self.cond.wait() self.cond.release() if callback: callback(item[0]) def add(self, item): # print('add', item) self.cond.acquire() item[0] = item[0] + time.time() self.queues.append(item) self.queues.sort(key=lambda x: x[0]) logging.info('NewTimer add notify') if self.timer: self.timer.cancel() self.timer = None self.cond.notify() self.cond.release() def pop(self): item = None if len(self.queues) > 0: item = self.queues.pop(0) return item def get(self): item = None if len(self.queues) > 0: item = self.queues[0] return item def execute(self): logging.info('NewTimer execute notify') self.cond.acquire() self.cond.notify() self.cond.release()出力
注: 今回は何があってもテスト スレッドの数は継続的に増加することはなく、マルチタイマー タスクの要件は同時に達成できます。 欠点: 2 つのスレッドが使用されます。 2 回目の精度の問題では、プログラムの実行を続行するには、最後にスケジュールされたプログラムの実行が完了するまで待つ必要があります
。以上がPython 開発における単一スレッドで実行される複数のスケジュールされたタスクの分析例の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。