>  기사  >  백엔드 개발  >  비동기 작업 처리를 위해 Python에서 Beanstalkd를 사용하는 방법

비동기 작업 처리를 위해 Python에서 Beanstalkd를 사용하는 방법

不言
不言원래의
2018-04-24 13:35:092919검색

이 글에서는 Python에서 비동기 작업 처리를 위해 Beanstalkd를 사용하는 방법을 주로 소개하고 참고용으로 올려드립니다. 함께 살펴보겠습니다

Beanstalkd를 메시지 큐 서비스로 사용한 다음 이를 Python의 데코레이터 구문과 결합하여 간단한 비동기 작업 처리 도구를 구현합니다.

최종 효과

작업 정의:

from xxxxx.job_queue import JobQueue

queue = JobQueue()

@queue.task('task_tube_one')
def task_one(arg1, arg2, arg3):
 # do task

작업 제출:

task_one.put(arg1="a", arg2="b", arg3="c")

그러면 백그라운드 작업 스레드가 이러한 작업을 수행할 수 있습니다.

구현 과정

1. Beanstalk 서버 이해

Beanstalk는 간단하고 빠른 작업 대기열입니다. https://github.com/kr/beanstalkd

Beanstalk는 C 언어로 구현된 메시지 대기열 서비스입니다. 이는 공통 인터페이스를 제공하며 원래 시간이 많이 걸리는 작업을 비동기식으로 실행하여 대규모 웹 애플리케이션에서 페이지 대기 시간을 줄이도록 설계되었습니다. 언어마다 다른 Beanstalkd 클라이언트 구현이 있습니다. Python에는 beanstalkc 등이 있습니다. 나는 beanstalkd 서버와 통신하기 위한 도구로 beanstalkc를 사용합니다.

2. 비동기 작업 실행 구현 원칙

beanstalkd는 문자열 작업 스케줄링만 수행할 수 있습니다. 프로그램이 함수와 매개변수 제출을 지원하기 위해 워커에 의해 함수가 실행되고 매개변수가 전달됩니다. 전달된 매개변수로 함수를 등록하려면 중간 계층이 필요합니다.

구현은 주로 세 부분으로 구성됩니다.

구독자: 콩나무 튜브에 함수를 등록하는 역할을 담당합니다. 구현은 매우 간단하며 함수 이름과 함수 자체 사이의 해당 관계를 등록합니다. (같은 그룹(튜브) 내에서는 같은 기능명이 존재할 수 없다는 뜻입니다.) 데이터는 클래스 변수에 저장됩니다.

class Subscriber(object):
 FUN_MAP = defaultdict(dict)

 def __init__(self, func, tube):
  logger.info('register func:{} to tube:{}.'.format(func.__name__, tube))
  Subscriber.FUN_MAP[tube][func.__name__] = func

JobQueue: Putter 기능을 사용하여 일반 함수를 데코레이터로 편리하게 변환합니다.

class JobQueue(object):
 @classmethod
 def task(cls, tube):
  def wrapper(func):
   Subscriber(func, tube)
   return Putter(func, tube)

  return wrapper

Putter: 함수 이름, 함수 매개변수 및 지정된 그룹을 객체로 결합한 다음 json 직렬화 문자열로 변환하고 마지막으로 beanstalkc를 통해 beanstalkd 대기열로 푸시됩니다.

class Putter(object):
 def __init__(self, func, tube):
  self.func = func
  self.tube = tube

 # 直接调用返回
 def __call__(self, *args, **kwargs):
  return self.func(*args, **kwargs)

 # 推给离线队列
 def put(self, **kwargs):
  args = {
   'func_name': self.func.__name__,
   'tube': self.tube,
   'kwargs': kwargs
  }
  logger.info('put job:{} to queue'.format(args))
  beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  try:
   beanstalk.use(self.tube)
   job_id = beanstalk.put(json.dumps(args))
   return job_id
  finally:
   beanstalk.close()

Worker: beanstalkd 대기열에서 문자열을 가져온 다음 json.loads를 통해 객체로 역직렬화하여 함수 이름, 매개변수 및 튜브를 가져옵니다. 마지막으로 구독자로부터 함수 이름에 해당하는 함수 코드를 얻은 후 매개 변수를 전달하여 함수를 실행합니다.

class Worker(object):
 worker_id = 0

 def __init__(self, tubes):
  self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  self.tubes = tubes
  self.reserve_timeout = 20
  self.timeout_limit = 1000
  self.kick_period = 600
  self.signal_shutdown = False
  self.release_delay = 0
  self.age = 0
  self.signal_shutdown = False
  signal.signal(signal.SIGTERM, lambda signum, frame: self.graceful_shutdown())
  Worker.worker_id += 1
  import_module_by_str('pear.web.controllers.controller_crawler')

 def subscribe(self):
  if isinstance(self.tubes, list):
   for tube in self.tubes:
    if tube not in Subscriber.FUN_MAP.keys():
     logger.error('tube:{} not register!'.format(tube))
     continue
    self.beanstalk.watch(tube)
  else:
   if self.tubes not in Subscriber.FUN_MAP.keys():
    logger.error('tube:{} not register!'.format(self.tubes))
    return
   self.beanstalk.watch(self.tubes)

 def run(self):
  self.subscribe()
  while True:
   if self.signal_shutdown:
    break
   if self.signal_shutdown:
    logger.info("graceful shutdown")
    break
   job = self.beanstalk.reserve(timeout=self.reserve_timeout) # 阻塞获取任务,最长等待 timeout
   if not job:
    continue
   try:
    self.on_job(job)
    self.delete_job(job)
   except beanstalkc.CommandFailed as e:
    logger.warning(e, exc_info=1)
   except Exception as e:
    logger.error(e)
    kicks = job.stats()['kicks']
    if kicks < 3:
     self.bury_job(job)
    else:
     message = json.loads(job.body)
     logger.error("Kicks reach max. Delete the job", extra={&#39;body&#39;: message})
     self.delete_job(job)

 @classmethod
 def on_job(cls, job):
  start = time.time()
  msg = json.loads(job.body)
  logger.info(msg)
  tube = msg.get(&#39;tube&#39;)
  func_name = msg.get(&#39;func_name&#39;)
  try:
   func = Subscriber.FUN_MAP[tube][func_name]
   kwargs = msg.get(&#39;kwargs&#39;)
   func(**kwargs)
   logger.info(u&#39;{}-{}&#39;.format(func, kwargs))
  except Exception as e:
   logger.error(e.message, exc_info=True)
  cost = time.time() - start
  logger.info(&#39;{} cost {}s&#39;.format(func_name, cost))

 @classmethod
 def delete_job(cls, job):
  try:
   job.delete()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 @classmethod
 def bury_job(cls, job):
  try:
   job.bury()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 def graceful_shutdown(self):
  self.signal_shutdown = True

위 코드를 작성하다가 문제를 발견했습니다.

Subscriber를 통해 등록한 함수 이름과 함수 자체의 해당 관계는 Python 인터프리터, 즉 작업자는 다른 프로세스에서 비동기적으로 실행되며 어떻게 작업자가 Putter와 동일한 구독자를 얻을 수 있습니까? 마지막으로 저는 이 문제가 Python의 데코레이터 메커니즘을 통해 해결될 수 있음을 발견했습니다.

구독자 문제를 해결한 문장입니다

import_module_by_str(&#39;pear.web.controllers.controller_crawler&#39;)

# import_module_by_str 的实现
def import_module_by_str(module_name):
 if isinstance(module_name, unicode):
  module_name = str(module_name)
 __import__(module_name)

import_module_by_str이 실행되면 __import__를 호출하여 클래스와 함수를 동적으로 로드합니다. JobQueue를 사용하여 함수가 포함된 모듈을 메모리에 로드한 후. Woker를 실행할 때 Python 인터프리터는 먼저 @ 장식 데코레이터 코드를 실행하고 Subscriber의 해당 관계를 메모리에 로드합니다.

실제 사용법은 https://github.com/jiyangg/Pear/blob/master/pear/jobs/job_queue.py

관련 권장 사항:

php-beanstalkd 메시지 큐 클래스 인스턴스 자세한 설명을 참조하세요.

위 내용은 비동기 작업 처리를 위해 Python에서 Beanstalkd를 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.