Home >Backend Development >Python Tutorial >Example analysis of multiple scheduled tasks executed in a single thread in Python development
1. Initial version:
Idea: timer , to put it bluntly, it means delaying the execution of the specified program. Currently, it is not practical to reconstruct the timer in python by yourself, and the ability is not reached, so the system timer must be used for delay operation, but we can change the rules; All programs that need to perform scheduled operations are added to a specific list, take out the program with the shortest scheduled time in the list, bind threading.Timer (time, callback), wait for the time to time out, trigger a custom callback, and execute the program just removed from the list ; Then update the time, take out the program with the shortest time in the list again, continue to bind threading.Timer, and continue the iterative loop; when a new scheduled task is added to the list, cancel the current threading.Timer binding , update the time in the list, take out the shortest time again, and bind threading.Timer...
Code:
import threading import time class Timer(): '''单线程下的定时器''' def __init__(self): self.queues = [] self.timer = None self.last_time = time.time() def start(self): item = self.get() if item: self.timer = threading.Timer(item[0],self.execute) self.timer.start() def add(self,item): print('add',item) self.flush_time() self.queues.append(item) self.queues.sort(key=lambda x:x[0]) if self.timer: self.timer.cancel() self.timer = None self.start() def get(self): item = None if len(self.queues) > 0: item = self.queues[0] return item def pop(self): item = None if len(self.queues) > 0: item = self.queues.pop(0) return item def flush_time(self): curr_time = time.time() for i in self.queues: i[0] = i[0] - (curr_time - self.last_time) self.last_time = curr_time def execute(self): # if self.timer: # self.timer.cancel() # self.timer = None item = self.pop() self.flush_time() if item: callback = item[1] args = item[0] callback(args) self.start()
Execution and output:
##
if __name__ == '__main__': # 检测线程数 def func(): while True: print(threading.active_count()) time.sleep(1) f1 = threading.Thread(target=func) f1.start() import logging logging.basicConfig(level=logging.INFO,format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %H:%M:%S [%A]") def func1(*args): logging.info('func1 %s'%args) # time.sleep(5) def func2(*args): logging.info('func2 %s' % args) # time.sleep(5) def func3(*args): logging.info('func3 %s' % args) # time.sleep(5) def func4(*args): logging.info('func4 %s' % args) # time.sleep(5) def func5(*args): logging.info('func5 %s' % args) # time.sleep(5) # 测试 t1 = Timer() logging.info('start') t1.add([5,func1]) time.sleep(0.5) t1.add([4,func2]) time.sleep(0.5) t1.add([3,func3]) time.sleep(0.5) t1.add([2,func4]) time.sleep(0.5) t1.add([1,func5]) time.sleep(5) t1.add([1,func1]) t1.add([2,func2]) t1.add([3,func3]) t1.add([4,func4]) t1.add([5,func5]) # 输出 # 2 # 07/27/2017 10:36:47 [Thursday] start # add [5, <function func1 at 0x000000D79FC77E18>] # add [4, <function func2 at 0x000000D79FCA8488>] # 3 # add [3, <function func3 at 0x000000D79FCA8510>] # add [2, <function func4 at 0x000000D79FCA8598>] # 3 # add [1, <function func5 at 0x000000D79FCA8620>] # 3 # 07/27/2017 10:36:50 [Thursday] func5 1 # 07/27/2017 10:36:51 [Thursday] func4 0.498349666595459 # 3 # 07/27/2017 10:36:51 [Thursday] func3 0.49782633781433105 # 07/27/2017 10:36:52 [Thursday] func2 0.49848270416259766 # 3 # 07/27/2017 10:36:52 [Thursday] func1 0.48449039459228516 # 2 # 2 # add [1, <function func1 at 0x000000D79FC77E18>] # add [2, <function func2 at 0x000000D79FCA8488>] # add [3, <function func3 at 0x000000D79FCA8510>] # add [4, <function func4 at 0x000000D79FCA8598>] # add [5, <function func5 at 0x000000D79FCA8620>] # 3 # 07/27/2017 10:36:55 [Thursday] func1 0.9990766048431396 # 3 # 07/27/2017 10:36:56 [Thursday] func2 0.9988017082214355 # 3 # 07/27/2017 10:36:57 [Thursday] func3 0.99928879737854 # 07/27/2017 10:36:58 [Thursday] func4 0.9991350173950195 # 3 # 3 # 07/27/2017 10:36:59 [Thursday] func5 0.9988160133361816
Execution code
Note: Check the code output. All timers are executed in sequence according to the calibrated time. It is perfect. Everything looks beautiful. It just looks like, hahaha, when you put func After time.sleep(5) is enabled, the number of threads increases slowly; the reason is that the last timer callback is still being executed, and the next timer has been started, and then another thread is added. Alas, Failure
2, revised version
Idea: Use the generator-consumer model, The threading.Condition condition variable is used; a Timer is forced to be enabled forever!
Code:
import time import threading import logging class NewTimer(threading.Thread): '''单线程下的定时器''' def __init__(self): super().__init__() self.queues = [] self.timer = None self.cond = threading.Condition() def run(self): while True: # print('NewTimer',self.queues) self.cond.acquire() item = self.get() callback = None if not item: logging.info('NewTimer wait') self.cond.wait() elif item[0] <= time.time(): new_item = self.pop() callback = new_item[1] else: logging.info('NewTimer start sys timer and wait') self.timer = threading.Timer(item[0]-time.time(),self.execute) self.timer.start() self.cond.wait() self.cond.release() if callback: callback(item[0]) def add(self, item): # print('add', item) self.cond.acquire() item[0] = item[0] + time.time() self.queues.append(item) self.queues.sort(key=lambda x: x[0]) logging.info('NewTimer add notify') if self.timer: self.timer.cancel() self.timer = None self.cond.notify() self.cond.release() def pop(self): item = None if len(self.queues) > 0: item = self.queues.pop(0) return item def get(self): item = None if len(self.queues) > 0: item = self.queues[0] return item def execute(self): logging.info('NewTimer execute notify') self.cond.acquire() self.cond.notify() self.cond.release()
Execution and output:
##
if __name__ == '__main__': def func(): while True: print(threading.active_count()) time.sleep(1) f1 = threading.Thread(target=func) f1.start() logging.basicConfig(level=logging.INFO,format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %H:%M:%S [%A]") newtimer = NewTimer() newtimer.start() def func1(*args): logging.info('func1 %s'%args) time.sleep(5) def func2(*args): logging.info('func2 %s' % args) time.sleep(5) def func3(*args): logging.info('func3 %s' % args) time.sleep(5) def func4(*args): logging.info('func4 %s' % args) time.sleep(5) def func5(*args): logging.info('func5 %s' % args) time.sleep(5) newtimer.add([5,func1]) newtimer.add([4,func2]) newtimer.add([3,func3]) newtimer.add([2,func4]) newtimer.add([1,func5]) time.sleep(1) newtimer.add([1,func1]) newtimer.add([2,func2]) newtimer.add([3,func3]) newtimer.add([4,func4]) newtimer.add([5,func5])# 输出# 2# 07/27/2017 11:26:19 [Thursday] NewTimer wait# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer start sys timer and wait# 07/27/2017 11:26:20 [Thursday] NewTimer execute notify# 4# 07/27/2017 11:26:20 [Thursday] func5 1501125980.2175007# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 3# 3# 3# 3# 3# 07/27/2017 11:26:25 [Thursday] func4 1501125981.2175007# 3# 3# 3# 3# 07/27/2017 11:26:30 [Thursday] func1 1501125981.218279# 3# 3# 3# 3# 3# 3# 07/27/2017 11:26:35 [Thursday] func3 1501125982.2175007# 3# 3# 3# 3# 07/27/2017 11:26:40 [Thursday] func2 1501125982.218279# 3# 3# 3# 3# 3# 07/27/2017 11:26:45 [Thursday] func2 1501125983.2175007# 3# 3# 3# 3# 3# 07/27/2017 11:26:50 [Thursday] func3 1501125983.218279# 3# 3# 3# 3# 3# 07/27/2017 11:26:55 [Thursday] func1 1501125984.2175007# 3# 3# 3# 3# 3# 07/27/2017 11:27:00 [Thursday] func4 1501125984.218279# 3# 3# 3# 3# 3# 07/27/2017 11:27:05 [Thursday] func5 1501125985.218279# 3# 3# 3# 3# 3# 07/27/2017 11:27:10 [Thursday] NewTimer wait
Note: The number of test threads will not increase in any way this time, and multi-timer task requirements can be achieved at the same time; Disadvantages: Two threads are used, and a single thread is not used for implementation, and the second time accuracy problem , you need to wait for the last scheduled program to complete before the program can continue to run
The above is the detailed content of Example analysis of multiple scheduled tasks executed in a single thread in Python development. For more information, please follow other related articles on the PHP Chinese website!