Maison  >  Article  >  développement back-end  >  Comment utiliser Beanstalkd en Python pour le traitement des tâches asynchrones

Comment utiliser Beanstalkd en Python pour le traitement des tâches asynchrones

不言
不言original
2018-04-24 13:35:092947parcourir

Cet article présente principalement la méthode d'utilisation de Beanstalkd pour le traitement des tâches asynchrones en Python. Maintenant, je le partage avec vous et le donne comme référence. Jetons un coup d'œil ensemble

Utilisez Beanstalkd comme service de file d'attente de messages, puis combinez-le avec la syntaxe du décorateur de Python pour implémenter un outil de traitement de tâches asynchrone simple.

Effet final

Définir la tâche :

from xxxxx.job_queue import JobQueue

queue = JobQueue()

@queue.task('task_tube_one')
def task_one(arg1, arg2, arg3):
 # do task

Soumettre la tâche :

task_one.put(arg1="a", arg2="b", arg3="c")

Ensuite, le fil de travail en arrière-plan peut effectuer ces tâches.

Processus de mise en œuvre

1. Comprendre le serveur Beanstalk

Beanstalk est une file d'attente de travail simple et rapide https://github.com. /kr/beanstalkd

Beanstalk est un service de file d'attente de messages implémenté en langage C. Il fournit une interface commune et a été initialement conçu pour réduire la latence des pages dans les applications Web à grande échelle en exécutant des tâches chronophages de manière asynchrone. Il existe différentes implémentations de Beanstalkd Client pour différentes langues. Il existe des beanstalkc et ainsi de suite en Python. J'utilise beanstalkc comme outil pour communiquer avec le serveur beanstalkd.

2. Principe de mise en œuvre de l'exécution asynchrone des tâches

beanstalkd ne peut effectuer que la planification de tâches de chaîne. Pour que le programme prenne en charge la soumission de fonctions et de paramètres, la fonction est ensuite exécutée par le travailleur et les paramètres sont transportés. Une couche intermédiaire est nécessaire pour enregistrer les fonctions avec les paramètres transmis.

L'implémentation comprend principalement 3 parties :

Abonné : responsable de l'enregistrement de la fonction sur un tube dans beanstalk. L'implémentation est très simple, enregistrant la relation correspondante entre le nom de la fonction et la fonction elle-même. . (Cela signifie que le même nom de fonction ne peut pas exister dans le même groupe (tube)). Les données sont stockées dans des variables de classe.

class Subscriber(object):
 FUN_MAP = defaultdict(dict)

 def __init__(self, func, tube):
  logger.info('register func:{} to tube:{}.'.format(func.__name__, tube))
  Subscriber.FUN_MAP[tube][func.__name__] = func

JobQueue : convertit facilement une fonction ordinaire en un décorateur avec la capacité Putter

class JobQueue(object):
 @classmethod
 def task(cls, tube):
  def wrapper(func):
   Subscriber(func, tube)
   return Putter(func, tube)

  return wrapper

Putter : combinez le nom de la fonction, les paramètres de la fonction et le regroupement spécifié dans un objet, puis sérialisez json dans une chaîne, et enfin poussez-le vers la file d'attente beanstalkd via beanstalkc.

class Putter(object):
 def __init__(self, func, tube):
  self.func = func
  self.tube = tube

 # 直接调用返回
 def __call__(self, *args, **kwargs):
  return self.func(*args, **kwargs)

 # 推给离线队列
 def put(self, **kwargs):
  args = {
   'func_name': self.func.__name__,
   'tube': self.tube,
   'kwargs': kwargs
  }
  logger.info('put job:{} to queue'.format(args))
  beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  try:
   beanstalk.use(self.tube)
   job_id = beanstalk.put(json.dumps(args))
   return job_id
  finally:
   beanstalk.close()

Worker : prenez la chaîne de la file d'attente beanstalkd, puis désérialisez-la en un objet via json.loads pour obtenir le nom de la fonction et paramètres et tube. Enfin, le code de fonction correspondant au nom de la fonction est obtenu de l'abonné, puis les paramètres sont transmis pour exécuter la fonction.

class Worker(object):
 worker_id = 0

 def __init__(self, tubes):
  self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  self.tubes = tubes
  self.reserve_timeout = 20
  self.timeout_limit = 1000
  self.kick_period = 600
  self.signal_shutdown = False
  self.release_delay = 0
  self.age = 0
  self.signal_shutdown = False
  signal.signal(signal.SIGTERM, lambda signum, frame: self.graceful_shutdown())
  Worker.worker_id += 1
  import_module_by_str('pear.web.controllers.controller_crawler')

 def subscribe(self):
  if isinstance(self.tubes, list):
   for tube in self.tubes:
    if tube not in Subscriber.FUN_MAP.keys():
     logger.error('tube:{} not register!'.format(tube))
     continue
    self.beanstalk.watch(tube)
  else:
   if self.tubes not in Subscriber.FUN_MAP.keys():
    logger.error('tube:{} not register!'.format(self.tubes))
    return
   self.beanstalk.watch(self.tubes)

 def run(self):
  self.subscribe()
  while True:
   if self.signal_shutdown:
    break
   if self.signal_shutdown:
    logger.info("graceful shutdown")
    break
   job = self.beanstalk.reserve(timeout=self.reserve_timeout) # 阻塞获取任务,最长等待 timeout
   if not job:
    continue
   try:
    self.on_job(job)
    self.delete_job(job)
   except beanstalkc.CommandFailed as e:
    logger.warning(e, exc_info=1)
   except Exception as e:
    logger.error(e)
    kicks = job.stats()['kicks']
    if kicks < 3:
     self.bury_job(job)
    else:
     message = json.loads(job.body)
     logger.error("Kicks reach max. Delete the job", extra={&#39;body&#39;: message})
     self.delete_job(job)

 @classmethod
 def on_job(cls, job):
  start = time.time()
  msg = json.loads(job.body)
  logger.info(msg)
  tube = msg.get(&#39;tube&#39;)
  func_name = msg.get(&#39;func_name&#39;)
  try:
   func = Subscriber.FUN_MAP[tube][func_name]
   kwargs = msg.get(&#39;kwargs&#39;)
   func(**kwargs)
   logger.info(u&#39;{}-{}&#39;.format(func, kwargs))
  except Exception as e:
   logger.error(e.message, exc_info=True)
  cost = time.time() - start
  logger.info(&#39;{} cost {}s&#39;.format(func_name, cost))

 @classmethod
 def delete_job(cls, job):
  try:
   job.delete()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 @classmethod
 def bury_job(cls, job):
  try:
   job.bury()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 def graceful_shutdown(self):
  self.signal_shutdown = True

Lors de l'écriture du code ci-dessus, j'ai trouvé un problème :

Enregistrer la fonction via l'abonné La relation correspondante entre le nom et la fonction elle-même est dans un interpréteur Python, c'est-à-dire qu'elle s'exécute dans un processus et que le Worker s'exécute de manière asynchrone dans un autre processus. Comment le Worker peut-il également obtenir le même abonné que Putter ? Enfin, j'ai découvert que ce problème pouvait être résolu grâce au mécanisme de décoration de Python.

Cette phrase a résolu le problème de l'abonné

import_module_by_str(&#39;pear.web.controllers.controller_crawler&#39;)

# import_module_by_str 的实现
def import_module_by_str(module_name):
 if isinstance(module_name, unicode):
  module_name = str(module_name)
 __import__(module_name)

Lorsque import_module_by_str est exécuté, __import__ est appelé pour charger dynamiquement les classes et les fonctions. Après avoir chargé le module contenant la fonction utilisant JobQueue en mémoire. Lors de l'exécution de Woker, l'interpréteur Python exécutera d'abord le code du décorateur @-decorated et chargera la relation correspondante dans Subscriber en mémoire.

Pour une utilisation réelle, veuillez consulter https://github.com/jiyangg/Pear/blob/master/pear/jobs/job_queue.py

Recommandations associées :

Explication détaillée de l'instance de classe de file d'attente de messages php-beanstalkd

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn