如果您熟悉 Python,您很可能听说过 Celery。它通常是异步处理任务的首选,例如图像处理或发送电子邮件。
与一些人交谈时,我开始注意到许多开发人员一开始都觉得 Celery 令人印象深刻,但随着他们的项目规模和复杂性的增加,他们的兴奋开始消退。虽然有些人出于正当原因放弃了 Celery,但其他人可能只是没有深入探索其核心,无法根据自己的需求进行定制。
在这篇博客中,我想讨论一些开发人员开始寻找替代方案甚至构建自定义后台工作框架的原因之一:公平处理。在用户/租户提交不同规模任务的环境中,一个租户的繁重工作量影响其他租户的风险可能会造成瓶颈并导致挫败感。
我将引导您了解在 Celery 中实现公平处理的策略,确保平衡的任务分配,以便没有任何一个租户可以支配您的资源。
问题
让我们深入探讨多租户应用程序面临的常见挑战,特别是那些处理批处理的应用程序。想象一下,您有一个系统,用户可以将其图像处理任务排队,允许他们在短暂等待后收到处理后的图像。此设置不仅可以使您的 API 保持响应,还可以让您根据需要扩展工作线程以有效地处理负载。
一切都运行顺利 - 直到一个租户决定提交大量图像进行处理。您拥有多名工作人员,他们甚至可以自动扩展以满足不断增长的需求,因此您对您的基础设施充满信心。然而,当其他租户尝试对较小的批次(可能只是几张图像)进行排队并突然发现自己面临长时间的等待而没有任何更新时,麻烦就开始了。在您不知不觉中,支持请求开始涌入,用户抱怨您的服务速度缓慢甚至没有响应。
这种情况太常见了,因为 Celery 默认情况下按照接收到的顺序处理任务。当一个租户因大量涌入的任务而让您的工作人员不堪重负时,即使是最好的自动扩展策略也可能不足以防止其他租户出现延误。因此,这些用户体验到的服务水平可能达不到承诺或预期的水平。
使用 Celery 进行速率限制
确保公平处理的一个有效策略是实施速率限制。它允许您控制每个租户在特定时间范围内可以提交的任务数量。这可以防止任何单个租户垄断您的工人,并确保所有租户都有公平的机会来处理他们的任务。
Celery 具有内置的任务级别速率限制功能:
# app.py from celery import Celery app = Celery("app", broker="redis://localhost:6379/0") @app.task(rate_limit="10/m") # Limit to 10 tasks per minute def process_data(data): print(f"Processing data: {data}") # Call the task if __name__ == "__main__": for i in range(20): process_data.delay(f"data_{i}")
您可以通过执行以下命令来运行工作线程:
celery -A app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1
现在,运行app.py脚本来触发20个任务:
python app.py
如果您设法在本地运行它,您会注意到每个任务之间存在延迟,以确保执行速率限制。现在您可能认为这并不能真正帮助我们解决问题,您完全正确。 Celery 的内置速率限制对于我们的任务可能涉及调用具有严格速率限制的外部服务的场景非常有用。
这个示例强调了内置功能对于复杂场景来说可能过于简单。然而,我们可以通过更深入地探索 Celery 的框架来克服这个限制。让我们看看如何为每个租户设置适当的速率限制并自动重试。
我们将使用 Redis 来跟踪每个租户的速率限制。 Redis 是 Celery 的流行数据库和代理,因此让我们利用这个可能已经在您的堆栈中的组件。
让我们导入几个库:
import time import redis from celery import Celery, Task
现在我们将为我们的速率限制任务实现一个自定义基任务类:
# Initialize a Redis client redis_client = redis.StrictRedis(host="localhost", port=6379, db=0) class RateLimitedTask(Task): def __init__(self, *args, **kwargs): # Set default rate limit if not hasattr(self, "custom_rate_limit"): self.custom_rate_limit = 10 super().__init__(*args, **kwargs) def __call__(self, tenant_id, *args, **kwargs): # Rate limiting logic key = f"rate_limit:{tenant_id}:{self.name}" # Increment the count for this minute current_count = redis_client.incr(key) if current_count == 1: # Set expiration for the key if it's the first request redis_client.expire(key, 10) if current_count > self.custom_rate_limit: print(f"Rate limit exceeded for tenant {tenant_id}. Retrying...") raise self.retry(countdown=10) return super().__call__(tenant_id, *args, **kwargs)
这个自定义类将跟踪特定租户使用 Redis 触发的任务量,并将 TTL 设置为 10 秒。如果超出速率限制,任务将在 10 秒后重试。所以基本上我们的默认速率限制是 10 秒内完成 10 个任务。
让我们定义一个模拟处理的示例任务:
@app.task(base=RateLimitedTask, custom_rate_limit=5) def process(tenant_id: int, data): """ Mock processing task that takes 0.3 seconds to complete. """ print(f"Processing data: {data} for tenant: {tenant_id}") time.sleep(0.3)
这里我们定义了一个流程任务,你可以看到我可以在任务级别更改custom_rate_limit。如果我们不指定 custom_rate_limit,则将分配默认值 10。 现在我们的速率限制已更改为 10 秒内完成 5 个任务。
现在让我们为不同的租户触发一些任务:
if __name__ == "__main__": for i in range(20): process.apply_async(args=(1, f"data_{i}")) for i in range(10): process.apply_async(args=(2, f"data_{i}"))
我们为租户 ID 1 定义 20 个任务,为租户 ID 2 定义 10 个任务。
所以我们完整的代码将如下所示:
# app.py import time import redis from celery import Celery, Task app = Celery( "app", broker="redis://localhost:6379/0", broker_connection_retry_on_startup=False, ) # Initialize a Redis client redis_client = redis.StrictRedis(host="localhost", port=6379, db=0) class RateLimitedTask(Task): def __init__(self, *args, **kwargs): if not hasattr(self, "custom_rate_limit"): self.custom_rate_limit = 10 super().__init__(*args, **kwargs) def __call__(self, tenant_id, *args, **kwargs): # Rate limiting logic key = f"rate_limit:{tenant_id}:{self.name}" # Increment the count for this minute current_count = redis_client.incr(key) if current_count == 1: # Set expiration for the key if it's the first request redis_client.expire(key, 10) if current_count > self.custom_rate_limit: print(f"Rate limit exceeded for tenant {tenant_id}. Retrying...") raise self.retry(countdown=10) return super().__call__(tenant_id, *args, **kwargs) @app.task(base=RateLimitedTask, custom_rate_limit=5) def process(tenant_id: int, data): """ Mock processing task that takes 0.3 seconds to complete. """ print(f"Processing data: {data} for tenant: {tenant_id}") time.sleep(0.3) if __name__ == "__main__": for i in range(20): process.apply_async(args=(1, f"data_{i}")) for i in range(10): process.apply_async(args=(2, f"data_{i}"))
让我们运行我们的工作线程:
celery -A app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1
现在,运行 app.py 脚本来触发任务:
python app.py
如您所见,工作人员处理了第一个租户的 5 个任务,并为所有其他任务设置了重试。然后它会执行第二个租户的 5 个任务,并为其他任务设置重试,然后继续进行。
这种方法允许您定义每个租户的速率限制,但正如您在我们的示例中看到的,对于运行速度非常快的任务,对速率限制过于严格最终会让工作人员在一段时间内无所事事。微调速率限制参数至关重要,并且取决于具体的任务和数量。不要犹豫,不断尝试,直到找到最佳平衡。
结论
我们探讨了 Celery 的默认任务处理如何导致多租户环境中的不公平,以及速率限制如何帮助解决此问题。通过实施特定于租户的速率限制,我们可以防止任何单个租户垄断资源,并确保更公平地分配处理能力。
这种方法为在 Celery 中实现公平处理提供了坚实的基础。然而,还有其他值得探索的技术来进一步优化多租户应用程序中的任务处理。虽然我最初计划在一篇文章中涵盖所有内容,但事实证明这个主题非常广泛!为了确保清晰度并保持本文的重点,我决定将其分为两部分。
在本系列的下一部分中,我们将深入研究任务优先级作为增强公平性和效率的另一种机制。这种方法允许您根据不同的标准为任务分配不同的优先级,确保即使在高需求时期也能及时处理关键任务。
敬请期待下期!
以上是确保芹菜的公平加工——第一部分的详细内容。更多信息请关注PHP中文网其他相关文章!

Linux终端中查看Python版本时遇到权限问题的解决方法当你在Linux终端中尝试查看Python的版本时,输入python...

本文解释了如何使用美丽的汤库来解析html。 它详细介绍了常见方法,例如find(),find_all(),select()和get_text(),以用于数据提取,处理不同的HTML结构和错误以及替代方案(SEL)

本文比较了Tensorflow和Pytorch的深度学习。 它详细介绍了所涉及的步骤:数据准备,模型构建,培训,评估和部署。 框架之间的关键差异,特别是关于计算刻度的

在使用Python的pandas库时,如何在两个结构不同的DataFrame之间进行整列复制是一个常见的问题。假设我们有两个Dat...

本文指导Python开发人员构建命令行界面(CLIS)。 它使用Typer,Click和ArgParse等库详细介绍,强调输入/输出处理,并促进用户友好的设计模式,以提高CLI可用性。

本文讨论了诸如Numpy,Pandas,Matplotlib,Scikit-Learn,Tensorflow,Tensorflow,Django,Blask和请求等流行的Python库,并详细介绍了它们在科学计算,数据分析,可视化,机器学习,网络开发和H中的用途

文章讨论了虚拟环境在Python中的作用,重点是管理项目依赖性并避免冲突。它详细介绍了他们在改善项目管理和减少依赖问题方面的创建,激活和利益。


热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

适用于 Eclipse 的 SAP NetWeaver 服务器适配器
将Eclipse与SAP NetWeaver应用服务器集成。

Dreamweaver CS6
视觉化网页开发工具

安全考试浏览器
Safe Exam Browser是一个安全的浏览器环境,用于安全地进行在线考试。该软件将任何计算机变成一个安全的工作站。它控制对任何实用工具的访问,并防止学生使用未经授权的资源。

WebStorm Mac版
好用的JavaScript开发工具

SecLists
SecLists是最终安全测试人员的伙伴。它是一个包含各种类型列表的集合,这些列表在安全评估过程中经常使用,都在一个地方。SecLists通过方便地提供安全测试人员可能需要的所有列表,帮助提高安全测试的效率和生产力。列表类型包括用户名、密码、URL、模糊测试有效载荷、敏感数据模式、Web shell等等。测试人员只需将此存储库拉到新的测试机上,他就可以访问到所需的每种类型的列表。