Maison  >  Article  >  développement back-end  >  Implémentation de l'exécution en boucle chronométrée des tâches via Python3

Implémentation de l'exécution en boucle chronométrée des tâches via Python3

anonymity
anonymityoriginal
2019-04-19 13:15:1612862parcourir

Dans notre développement actuel, il existe souvent une telle exigence : un certain module fonctionnel ou une certaine tâche doit être exécuté de manière cyclique dans la même période de temps. Il y a ici un concept de minuterie. Plus précisément, comment devrions-nous implémenter une minuterie ? Les timers disposent de nombreuses fonctions très pratiques, qui permettent de contrôler l'exécution des threads, de réduire la consommation du système, etc. Pratiquons maintenant à implémenter la fonction de synchronisation dans Python3.

Implémentation de l'exécution en boucle chronométrée des tâches via Python3

Par exemple, lorsque vous utilisez Python pour développer un système d'exploration, vous devrez peut-être effectuer des tâches à plusieurs reprises à intervalles réguliers pour implémenter un service de thread permettant de surveiller les données en arrière-plan. . Le statut d'exploration, la minuterie peut aider ici.

[Recommandation vidéo : Tutoriel vidéo Python3]

[Recommandation manuelle : Manuel Python chinois]

Grâce à la documentation Python, nous peut trouver threading.Timer() pour implémenter la fonction de synchronisation :

Code d'implémentation simple :

import threading
def func1(a):
    #Do something
    print('Do something')
    a+=1
    print(a)
    print('当前线程数为{}'.format(threading.activeCount()))
    if a>5:
        return
    t=threading.Timer(5,func1,(a,))
    t.start()

Rendu :

Implémentation de lexécution en boucle chronométrée des tâches via Python3

En consultant les informations, Python peut être utilisé pour implémenter trois méthodes différentes d'exécution de tâches planifiées :

1. Code de tâche planifiée

#!/user/bin/env python
#定时执行任务命令
import time,os,sched
schedule = sched.scheduler(time.time,time.sleep)
def perform_command(cmd,inc):
  os.system(cmd)
  print('task')
def timming_exe(cmd,inc=60):
  schedule.enter(inc,0,perform_command,(cmd,inc))
  schedule.run()
print('show time after 2 seconds:')
timming_exe('echo %time%',2)

Exécution périodique des tâches

<.>
#!/user/bin/env python
import time,os,sched
schedule = sched.scheduler(time.time,time.sleep)
def perform_command(cmd,inc):
  #在inc秒后再次运行自己,即周期运行
  schedule.enter(inc, 0, perform_command, (cmd, inc))
  os.system(cmd)
def timming_exe(cmd,inc=60):
  schedule.enter(inc,0,perform_command,(cmd,inc))
  schedule.run()#持续运行,直到计划时间队列变成空为止
print(&#39;show time after 2 seconds:&#39;)
timming_exe(&#39;echo %time%&#39;,2)
3. Exécution en boucle des commandes

#!/user/bin/env python
import time,os
def re_exe(cmd,inc = 60):
  while True:
    os.system(cmd)
    time.sleep(inc)
re_exe("echo %time%",5)
En résumé : les méthodes de Python pour implémenter les minuteries sont toutes basées sur la planification et le threading. L'utilisation spécifique doit être utilisée de manière flexible en fonction de la situation réelle.


Les deux modules les plus couramment utilisés : threading, Sched

Le module Threading est utilisé :


import threading ,time
from time import sleep, ctime
class Timer(threading.Thread):
        """
        very simple but useless timer.
        """
        def __init__(self, seconds):
                self.runTime = seconds
                threading.Thread.__init__(self)
        def run(self):
                time.sleep(self.runTime)
                print ("Buzzzz!! Time&#39;s up!")
class CountDownTimer(Timer):
        """
        a timer that can counts down the seconds.
        """
        def run(self):
                counter = self.runTime
                for sec in range(self.runTime):
                        print (counter)
                        time.sleep(1.0)
                        counter -= 1
                print ("Done")
 
class CountDownExec(CountDownTimer):
        """
        a timer that execute an action at the end of the timer run.
        """
        def __init__(self, seconds, action, args=[]):
                self.args = args
                self.action = action
                CountDownTimer.__init__(self, seconds)
        def run(self):
                CountDownTimer.run(self)
                self.action(self.args)
 
def myAction(args=[]):
        print ("Performing my action with args:")
        print (args)
 
if __name__ == "__main__":
        t = CountDownExec(3, myAction, ["hello", "world"])
        t.start()
        print("2333")
Le module Sched est utilisé :


Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn