這篇文章主要介紹了Python使用 Beanstalkd 做非同步任務處理的方法,現在分享給大家,也給大家做個參考。一起來看看吧
使用Beanstalkd 作為訊息佇列服務,然後結合Python 的裝飾器語法實作一個簡單的非同步任務處理工具.
最終效果
#定義任務:
from xxxxx.job_queue import JobQueue queue = JobQueue() @queue.task('task_tube_one') def task_one(arg1, arg2, arg3): # do task
提交任務:
task_one.put(arg1="a", arg2="b", arg3="c")
然後就可以由後台的work 執行緒去執行這些任務了。
實作過程
1、了解Beanstalk Server
#Beanstalk is a simple, fast work queue. https://github.com /kr/beanstalkd
Beanstalk 是一個C 語言實作的訊息佇列服務。它提供了通用的接口,最初設計的目的是透過非同步運行耗時的任務來減少大量Web應用程式中的頁面延遲。針對不同的語言,有不同的 Beanstalkd Client 實作。 Python 裡就有 beanstalkc 等。我就是利用 beanstalkc 來作為與 beanstalkd server 溝通的工具。
2、任務非同步執行實作原理
#beanstalkd 只能進行字串的任務調度。為了讓程式支援提交函數和參數,然後由woker執行函數並攜帶參數。需要一個中間層來將函數與傳遞的參數註冊。
實作主要包含3個部分:
Subscriber: 負責將函數註冊到 beanstalk 的一個tube上,實作很簡單,註冊函數名稱和函數本身的對應關係。 (也就意味著同一個分組(tube)下不能有相同函數名存在)。資料儲存在類別變數裡。
class Subscriber(object): FUN_MAP = defaultdict(dict) def __init__(self, func, tube): logger.info('register func:{} to tube:{}.'.format(func.__name__, tube)) Subscriber.FUN_MAP[tube][func.__name__] = func
JobQueue: 方便將一個普通函數轉換為具有Putter 能力的裝飾器
class JobQueue(object): @classmethod def task(cls, tube): def wrapper(func): Subscriber(func, tube) return Putter(func, tube) return wrapper
Putter: 將函數名稱、函數參數、指定的分組組合為一個對象,然後json 序列化為字串,最後透過beanstalkc 推送到beanstalkd 佇列。
class Putter(object): def __init__(self, func, tube): self.func = func self.tube = tube # 直接调用返回 def __call__(self, *args, **kwargs): return self.func(*args, **kwargs) # 推给离线队列 def put(self, **kwargs): args = { 'func_name': self.func.__name__, 'tube': self.tube, 'kwargs': kwargs } logger.info('put job:{} to queue'.format(args)) beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port']) try: beanstalk.use(self.tube) job_id = beanstalk.put(json.dumps(args)) return job_id finally: beanstalk.close()
Worker: 從beanstalkd 佇列中取出字串,然後透過json.loads 反序列化為對象,獲得函數名稱、參數和tube 。最後從 Subscriber 得到 函數名稱對應的函數程式碼,然後傳遞參數執行函數。
class Worker(object): worker_id = 0 def __init__(self, tubes): self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port']) self.tubes = tubes self.reserve_timeout = 20 self.timeout_limit = 1000 self.kick_period = 600 self.signal_shutdown = False self.release_delay = 0 self.age = 0 self.signal_shutdown = False signal.signal(signal.SIGTERM, lambda signum, frame: self.graceful_shutdown()) Worker.worker_id += 1 import_module_by_str('pear.web.controllers.controller_crawler') def subscribe(self): if isinstance(self.tubes, list): for tube in self.tubes: if tube not in Subscriber.FUN_MAP.keys(): logger.error('tube:{} not register!'.format(tube)) continue self.beanstalk.watch(tube) else: if self.tubes not in Subscriber.FUN_MAP.keys(): logger.error('tube:{} not register!'.format(self.tubes)) return self.beanstalk.watch(self.tubes) def run(self): self.subscribe() while True: if self.signal_shutdown: break if self.signal_shutdown: logger.info("graceful shutdown") break job = self.beanstalk.reserve(timeout=self.reserve_timeout) # 阻塞获取任务,最长等待 timeout if not job: continue try: self.on_job(job) self.delete_job(job) except beanstalkc.CommandFailed as e: logger.warning(e, exc_info=1) except Exception as e: logger.error(e) kicks = job.stats()['kicks'] if kicks < 3: self.bury_job(job) else: message = json.loads(job.body) logger.error("Kicks reach max. Delete the job", extra={'body': message}) self.delete_job(job) @classmethod def on_job(cls, job): start = time.time() msg = json.loads(job.body) logger.info(msg) tube = msg.get('tube') func_name = msg.get('func_name') try: func = Subscriber.FUN_MAP[tube][func_name] kwargs = msg.get('kwargs') func(**kwargs) logger.info(u'{}-{}'.format(func, kwargs)) except Exception as e: logger.error(e.message, exc_info=True) cost = time.time() - start logger.info('{} cost {}s'.format(func_name, cost)) @classmethod def delete_job(cls, job): try: job.delete() except beanstalkc.CommandFailed as e: logger.warning(e, exc_info=1) @classmethod def bury_job(cls, job): try: job.bury() except beanstalkc.CommandFailed as e: logger.warning(e, exc_info=1) def graceful_shutdown(self): self.signal_shutdown = True
寫上面程式碼的時候,發現一個問題:
透過Subscriber 註冊函數名稱和函數本身的對應關係,是在一個Python解釋器,也就是在一個進程裡運行的,而Worker 又是異步在另外的進程運行,怎麼樣才能讓Worker 也能拿到和Putter 一樣的Subscriber。最後發現透過 Python 的裝飾器機制可以解決這個問題。
就是這句解決了Subscriber 的問題
import_module_by_str('pear.web.controllers.controller_crawler')
# import_module_by_str 的实现 def import_module_by_str(module_name): if isinstance(module_name, unicode): module_name = str(module_name) __import__(module_name)
執行import_module_by_str 時,會呼叫__import__ 動態載入類別和函數。將使用了 JobQueue 的函數所在模組載入到記憶體之後。當 執行 Woker 時,Python 解釋器就會先執行 @修飾的裝飾器程式碼,也會把 Subscriber 中的對應關係載入到記憶體。
實際使用可以看https://github.com/jiyangg/Pear/blob/master/pear/jobs/job_queue.py
相關建議:
以上是Python使用 Beanstalkd 做非同步任務處理的方法的詳細內容。更多資訊請關注PHP中文網其他相關文章!

要在有限的時間內最大化學習Python的效率,可以使用Python的datetime、time和schedule模塊。 1.datetime模塊用於記錄和規劃學習時間。 2.time模塊幫助設置學習和休息時間。 3.schedule模塊自動化安排每週學習任務。

Python在遊戲和GUI開發中表現出色。 1)遊戲開發使用Pygame,提供繪圖、音頻等功能,適合創建2D遊戲。 2)GUI開發可選擇Tkinter或PyQt,Tkinter簡單易用,PyQt功能豐富,適合專業開發。

Python适合数据科学、Web开发和自动化任务,而C 适用于系统编程、游戏开发和嵌入式系统。Python以简洁和强大的生态系统著称,C 则以高性能和底层控制能力闻名。

2小時內可以學會Python的基本編程概念和技能。 1.學習變量和數據類型,2.掌握控制流(條件語句和循環),3.理解函數的定義和使用,4.通過簡單示例和代碼片段快速上手Python編程。

Python在web開發、數據科學、機器學習、自動化和腳本編寫等領域有廣泛應用。 1)在web開發中,Django和Flask框架簡化了開發過程。 2)數據科學和機器學習領域,NumPy、Pandas、Scikit-learn和TensorFlow庫提供了強大支持。 3)自動化和腳本編寫方面,Python適用於自動化測試和系統管理等任務。

兩小時內可以學到Python的基礎知識。 1.學習變量和數據類型,2.掌握控制結構如if語句和循環,3.了解函數的定義和使用。這些將幫助你開始編寫簡單的Python程序。

如何在10小時內教計算機小白編程基礎?如果你只有10個小時來教計算機小白一些編程知識,你會選擇教些什麼�...

使用FiddlerEverywhere進行中間人讀取時如何避免被檢測到當你使用FiddlerEverywhere...


熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

AI Hentai Generator
免費產生 AI 無盡。

熱門文章

熱工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

Safe Exam Browser
Safe Exam Browser是一個安全的瀏覽器環境,安全地進行線上考試。該軟體將任何電腦變成一個安全的工作站。它控制對任何實用工具的訪問,並防止學生使用未經授權的資源。

MantisBT
Mantis是一個易於部署的基於Web的缺陷追蹤工具,用於幫助產品缺陷追蹤。它需要PHP、MySQL和一個Web伺服器。請查看我們的演示和託管服務。

SecLists
SecLists是最終安全測試人員的伙伴。它是一個包含各種類型清單的集合,這些清單在安全評估過程中經常使用,而且都在一個地方。 SecLists透過方便地提供安全測試人員可能需要的所有列表,幫助提高安全測試的效率和生產力。清單類型包括使用者名稱、密碼、URL、模糊測試有效載荷、敏感資料模式、Web shell等等。測試人員只需將此儲存庫拉到新的測試機上,他就可以存取所需的每種類型的清單。

ZendStudio 13.5.1 Mac
強大的PHP整合開發環境