検索
ホームページバックエンド開発Python チュートリアルセロリによる公正な処理の確保 — パート I

Ensuring Fair Processing with Celery — Part I

Python に詳しい方なら、Celery について聞いたことがあるでしょう。画像処理や電子メールの送信など、非同期でタスクを処理する場合によく使用される選択肢です。

何人かと話していると、多くの開発者が最初は Celery に魅力を感じているものの、プロジェクトの規模が大きくなり複雑さが増すにつれて、その興奮は薄れ始めていることに気付き始めました。正当な理由で Celery から離れる人もいますが、単純にセロリの核心を自分のニーズに合わせて調整できるほど深く探求していない人もいます。

このブログでは、一部の開発者が代替案を探し始めたり、カスタムのバックグラウンド ワーカー フレームワークを構築したりする理由の 1 つである公平な処理について説明したいと思います。ユーザー/テナントがさまざまなサイズのタスクを送信する環境では、あるテナントの重いワークロードが他のテナントに影響を与えるリスクがボトルネックを生み出し、フラストレーションを引き起こす可能性があります。

Celery で公平な処理を実装し、単一のテナントがリソースを独占できないようにバランスのとれたタスク分散を確保するための戦略について説明します。

問題

マルチテナント アプリケーション、特にバッチ処理を処理するアプリケーションが直面する一般的な課題を詳しく見てみましょう。ユーザーが画像処理タスクをキューに入れて、少し待った後に処理済みの画像を受け取ることができるシステムがあると想像してください。この設定により、API の応答性が維持されるだけでなく、負荷を効率的に処理するために必要に応じてワーカーをスケールすることもできます。

あるテナントが処理のために画像の膨大なバッチを送信することを決定するまでは、すべてがスムーズに実行されます。複数のワーカーを配置し、需要の増加に合わせて自動スケールすることもできるため、インフラストラクチャに自信を持っています。ただし、問題は、他のテナントが小さなバッチ (おそらく数枚の画像のみ) をキューに入れようとしたときに突然始まり、更新が行われずに長い待ち時間に直面することになります。気づかないうちに、サポート チケットが殺到し始め、サービスが遅い、または応答しないというユーザーの苦情が寄せられます。

Celery はデフォルトでタスクを受け取った順に処理するため、このシナリオは非常に一般的です。 1 つのテナントが大量のタスクの流入でワーカーを圧倒している場合、最適な自動スケーリング戦略であっても、他のテナントの遅延を防ぐには十分ではない可能性があります。その結果、これらのユーザーは、約束または期待されたサービス レベルを下回る可能性があります。

セロリによるレート制限

公正な処理を確保するための効果的な戦略の 1 つは、レート制限を実装することです。これにより、各テナントが特定の時間枠内に送信できるタスクの数を制御できます。これにより、単一のテナントがワーカーを独占することがなくなり、すべてのテナントがタスクを処理する公平な機会を確保できるようになります。

Celery には、タスク レベルでレート制限を行うための機能が組み込まれています。

# app.py
from celery import Celery

app = Celery("app", broker="redis://localhost:6379/0")

@app.task(rate_limit="10/m") # Limit to 10 tasks per minute
def process_data(data):
    print(f"Processing data: {data}")

# Call the task
if __name__ == "__main__":
    for i in range(20):
        process_data.delay(f"data_{i}")

以下を実行してワーカーを実行できます:

celery -A app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1

次に、app.py スクリプトを実行して 20 のタスクをトリガーします。

python app.py

ローカルで実行できた場合は、レート制限が確実に適用されるように各タスク間に遅延があることがわかります。さて、あなたはおそらくこれが私たちの問題の解決にはあまり役に立たないと考えているでしょうが、あなたの言うことはまったく正しい。 Celery によるこの組み込みのレート制限は、タスクに厳しいレート制限がある外部サービスへの呼び出しが含まれる可能性があるシナリオに役立ちます。

この例では、組み込み機能が複雑なシナリオには単純すぎる可能性があることを強調しています。ただし、Celery のフレームワークをさらに詳しく調査することで、この制限を克服できます。テナントごとに自動再試行を使用して適切なレート制限を設定する方法を見てみましょう。

Redis を使用してテナントごとのレート制限を追跡します。 Redis は Celery の人気のあるデータベースおよびブローカーであるため、おそらくすでにスタックにあるこのコンポーネントを活用しましょう。

いくつかのライブラリをインポートしましょう:

import time
import redis
from celery import Celery, Task

次に、レート制限タスクのカスタム基本タスク クラスを実装します。

# Initialize a Redis client
redis_client = redis.StrictRedis(host="localhost", port=6379, db=0)

class RateLimitedTask(Task):
    def __init__(self, *args, **kwargs):
        # Set default rate limit
        if not hasattr(self, "custom_rate_limit"):
            self.custom_rate_limit = 10

        super().__init__(*args, **kwargs)

    def __call__(self, tenant_id, *args, **kwargs):
        # Rate limiting logic
        key = f"rate_limit:{tenant_id}:{self.name}"

        # Increment the count for this minute
        current_count = redis_client.incr(key)

        if current_count == 1:
            # Set expiration for the key if it's the first request
            redis_client.expire(key, 10)

        if current_count > self.custom_rate_limit:
            print(f"Rate limit exceeded for tenant {tenant_id}. Retrying...")
            raise self.retry(countdown=10)

        return super().__call__(tenant_id, *args, **kwargs)

このカスタム クラスは、Redis を使用して特定のテナントによってトリガーされたタスクの量を追跡し、10 秒の TTL を設定します。レート制限を超えると、タスクは 10 秒後に再試行されます。したがって、基本的にデフォルトのレート制限は 10 秒以内に 10 タスクです。

処理をエミュレートするサンプル タスクを定義してみましょう:

@app.task(base=RateLimitedTask, custom_rate_limit=5)
def process(tenant_id: int, data):
    """
    Mock processing task that takes 0.3 seconds to complete.
    """
    print(f"Processing data: {data} for tenant: {tenant_id}")
    time.sleep(0.3)

ここではプロセスタスクを定義しており、タスクレベルでcustom_rate_limitを変更できることがわかります。 Custom_rate_limit を指定しない場合は、デフォルト値の 10 が割り当てられます。 これで、レート制限が 10 秒以内に 5 つのタスクに変更されました。

次に、さまざまなテナントに対していくつかのタスクをトリガーしてみましょう:

if __name__ == "__main__":
    for i in range(20):
        process.apply_async(args=(1, f"data_{i}"))

    for i in range(10):
        process.apply_async(args=(2, f"data_{i}"))

テナント ID 1 には 20 個のタスク、テナント ID 2 には 10 個のタスクを定義しています。

完全なコードは次のようになります:

# app.py
import time
import redis
from celery import Celery, Task

app = Celery(
    "app",
    broker="redis://localhost:6379/0",
    broker_connection_retry_on_startup=False,
)

# Initialize a Redis client
redis_client = redis.StrictRedis(host="localhost", port=6379, db=0)


class RateLimitedTask(Task):
    def __init__(self, *args, **kwargs):
        if not hasattr(self, "custom_rate_limit"):
            self.custom_rate_limit = 10

        super().__init__(*args, **kwargs)

    def __call__(self, tenant_id, *args, **kwargs):
        # Rate limiting logic
        key = f"rate_limit:{tenant_id}:{self.name}"

        # Increment the count for this minute
        current_count = redis_client.incr(key)

        if current_count == 1:
            # Set expiration for the key if it's the first request
            redis_client.expire(key, 10)

        if current_count > self.custom_rate_limit:
            print(f"Rate limit exceeded for tenant {tenant_id}. Retrying...")
            raise self.retry(countdown=10)

        return super().__call__(tenant_id, *args, **kwargs)


@app.task(base=RateLimitedTask, custom_rate_limit=5)
def process(tenant_id: int, data):
    """
    Mock processing task that takes 0.3 seconds to complete.
    """
    print(f"Processing data: {data} for tenant: {tenant_id}")
    time.sleep(0.3)

if __name__ == "__main__":
    for i in range(20):
        process.apply_async(args=(1, f"data_{i}"))

    for i in range(10):
        process.apply_async(args=(2, f"data_{i}"))

ワーカーを実行しましょう:

celery -A app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1

次に、app.py スクリプトを実行してタスクをトリガーします。

python app.py

ご覧のとおり、ワーカーは最初のテナントの 5 つのタスクを処理し、他のすべてのタスクに対して再試行を設定します。次に、2 番目のテナントの 5 つのタスクを取得し、他のタスクの再試行を設定し、続行します。

このアプローチでは、テナントごとにレート制限を定義できますが、この例でわかるように、非常に高速に実行されるタスクの場合、レート制限を厳密に設定しすぎると、ワーカーがしばらく何もしない状態になってしまいます。レート制限パラメーターを微調整することは非常に重要であり、特定のタスクと量に応じて異なります。最適なバランスが見つかるまで、ためらわずに試してみてください。

結論

Celery のデフォルトのタスク処理がマルチテナント環境でどのように不公平を引き起こす可能性があるか、またレート制限がこの問題の解決にどのように役立つかを調査してきました。テナント固有のレート制限を実装することで、単一のテナントによるリソースの独占を防ぎ、処理能力のより公平な配分を確保できます。

このアプローチは、Celery で公正な処理を実現するための強固な基盤を提供します。ただし、マルチテナント アプリケーションでのタスク処理をさらに最適化するために検討する価値のある手法は他にもあります。当初は 1 つの投稿ですべてをカバーする予定でしたが、このトピックは非常に広範囲にわたることが判明しました。明確さを確保し、この記事の焦点を絞るため、記事を 2 つの部分に分割することにしました。

このシリーズの次のパートでは、公平性と効率を高める別のメカニズムとして、タスクの優先順位について詳しく説明します。このアプローチにより、さまざまな基準に基づいてタスクにさまざまな優先度レベルを割り当てることができるため、需要が高い期間であっても重要なタスクが迅速に処理されるようになります。

次回もお楽しみに!

以上がセロリによる公正な処理の確保 — パート Iの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
Pythonの学習:2時間の毎日の研究で十分ですか?Pythonの学習:2時間の毎日の研究で十分ですか?Apr 18, 2025 am 12:22 AM

Pythonを1日2時間学ぶだけで十分ですか?それはあなたの目標と学習方法に依存します。 1)明確な学習計画を策定し、2)適切な学習リソースと方法を選択します。3)実践的な実践とレビューとレビューと統合を練習および統合し、統合すると、この期間中にPythonの基本的な知識と高度な機能を徐々に習得できます。

Web開発用のPython:主要なアプリケーションWeb開発用のPython:主要なアプリケーションApr 18, 2025 am 12:20 AM

Web開発におけるPythonの主要なアプリケーションには、DjangoおよびFlaskフレームワークの使用、API開発、データ分析と視覚化、機械学習とAI、およびパフォーマンスの最適化が含まれます。 1。DjangoandFlask Framework:Djangoは、複雑な用途の迅速な発展に適しており、Flaskは小規模または高度にカスタマイズされたプロジェクトに適しています。 2。API開発:フラスコまたはdjangorestFrameworkを使用して、Restfulapiを構築します。 3。データ分析と視覚化:Pythonを使用してデータを処理し、Webインターフェイスを介して表示します。 4。機械学習とAI:Pythonは、インテリジェントWebアプリケーションを構築するために使用されます。 5。パフォーマンスの最適化:非同期プログラミング、キャッシュ、コードを通じて最適化

Python vs. C:パフォーマンスと効率の探索Python vs. C:パフォーマンスと効率の探索Apr 18, 2025 am 12:20 AM

Pythonは開発効率でCよりも優れていますが、Cは実行パフォーマンスが高くなっています。 1。Pythonの簡潔な構文とリッチライブラリは、開発効率を向上させます。 2.Cのコンピレーションタイプの特性とハードウェア制御により、実行パフォーマンスが向上します。選択を行うときは、プロジェクトのニーズに基づいて開発速度と実行効率を比較検討する必要があります。

Python in Action:実世界の例Python in Action:実世界の例Apr 18, 2025 am 12:18 AM

Pythonの実際のアプリケーションには、データ分析、Web開発、人工知能、自動化が含まれます。 1)データ分析では、PythonはPandasとMatplotlibを使用してデータを処理および視覚化します。 2)Web開発では、DjangoおよびFlask FrameworksがWebアプリケーションの作成を簡素化します。 3)人工知能の分野では、TensorflowとPytorchがモデルの構築と訓練に使用されます。 4)自動化に関しては、ファイルのコピーなどのタスクにPythonスクリプトを使用できます。

Pythonの主な用途:包括的な概要Pythonの主な用途:包括的な概要Apr 18, 2025 am 12:18 AM

Pythonは、データサイエンス、Web開発、自動化スクリプトフィールドで広く使用されています。 1)データサイエンスでは、PythonはNumpyやPandasなどのライブラリを介してデータ処理と分析を簡素化します。 2)Web開発では、DjangoおよびFlask Frameworksにより、開発者はアプリケーションを迅速に構築できます。 3)自動化されたスクリプトでは、Pythonのシンプルさと標準ライブラリが理想的になります。

Pythonの主な目的:柔軟性と使いやすさPythonの主な目的:柔軟性と使いやすさApr 17, 2025 am 12:14 AM

Pythonの柔軟性は、マルチパラダイムサポートと動的タイプシステムに反映されていますが、使いやすさはシンプルな構文とリッチ標準ライブラリに由来しています。 1。柔軟性:オブジェクト指向、機能的および手続き的プログラミングをサポートし、動的タイプシステムは開発効率を向上させます。 2。使いやすさ:文法は自然言語に近く、標準的なライブラリは幅広い機能をカバーし、開発プロセスを簡素化します。

Python:汎用性の高いプログラミングの力Python:汎用性の高いプログラミングの力Apr 17, 2025 am 12:09 AM

Pythonは、初心者から上級開発者までのすべてのニーズに適した、そのシンプルさとパワーに非常に好まれています。その汎用性は、次のことに反映されています。1)学習と使用が簡単、シンプルな構文。 2)Numpy、Pandasなどの豊富なライブラリとフレームワーク。 3)さまざまなオペレーティングシステムで実行できるクロスプラットフォームサポート。 4)作業効率を向上させるためのスクリプトおよび自動化タスクに適しています。

1日2時間でPythonを学ぶ:実用的なガイド1日2時間でPythonを学ぶ:実用的なガイドApr 17, 2025 am 12:05 AM

はい、1日2時間でPythonを学びます。 1.合理的な学習計画を作成します。2。適切な学習リソースを選択します。3。実践を通じて学んだ知識を統合します。これらの手順は、短時間でPythonをマスターするのに役立ちます。

See all articles

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

MinGW - Minimalist GNU for Windows

MinGW - Minimalist GNU for Windows

このプロジェクトは osdn.net/projects/mingw に移行中です。引き続きそこでフォローしていただけます。 MinGW: GNU Compiler Collection (GCC) のネイティブ Windows ポートであり、ネイティブ Windows アプリケーションを構築するための自由に配布可能なインポート ライブラリとヘッダー ファイルであり、C99 機能をサポートする MSVC ランタイムの拡張機能が含まれています。すべての MinGW ソフトウェアは 64 ビット Windows プラットフォームで実行できます。

SublimeText3 英語版

SublimeText3 英語版

推奨: Win バージョン、コードプロンプトをサポート!

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

SAP NetWeaver Server Adapter for Eclipse

SAP NetWeaver Server Adapter for Eclipse

Eclipse を SAP NetWeaver アプリケーション サーバーと統合します。

PhpStorm Mac バージョン

PhpStorm Mac バージョン

最新(2018.2.1)のプロフェッショナル向けPHP統合開発ツール