>백엔드 개발 >파이썬 튜토리얼 >셀러리의 공정한 처리 보장 - 2부

셀러리의 공정한 처리 보장 - 2부

Barbara Streisand
Barbara Streisand원래의
2024-12-28 14:22:10433검색

이 기사에서는 공정한 처리에 대한 이전 게시물을 바탕으로 Celery의 작업 우선순위를 살펴봅니다. 작업 우선순위는 사용자 정의 기준에 따라 작업에 다양한 우선순위 수준을 할당하여 백그라운드 처리의 공정성과 효율성을 높이는 방법을 제공합니다.

왜 작업 수준 우선순위인가?

작업 수준 우선순위를 통해 복잡한 구현 없이 작업 실행을 세밀하게 제어할 수 있습니다. 할당된 우선순위 값을 사용하여 모든 작업을 단일 대기열에 제출함으로써 작업자는 긴급성에 따라 작업을 처리할 수 있습니다. 이는 제출 시간에 관계없이 공정한 처리를 보장합니다.

예를 들어, 한 테넌트가 100개의 작업을 제출하고 다른 테넌트가 곧 5개의 작업을 제출하는 경우 작업 수준 우선 순위에 따라 두 번째 테넌트는 100개의 작업이 모두 완료될 때까지 기다리지 않습니다.

이 접근 방식은 테넌트의 작업 수에 따라 우선순위를 동적으로 할당합니다. 각 테넌트의 첫 번째 작업은 높은 우선 순위로 시작되지만 동시 작업이 10개마다 우선 순위가 감소합니다. 이를 통해 작업 수가 적은 임차인이 불필요한 지연을 경험하지 않도록 할 수 있습니다.

작업 우선순위 구현

먼저 Celery와 Redis를 설치합니다.

pip install celery redis

Redis를 브로커로 사용하고 우선순위 기반 작업 처리를 활성화하도록 Celery를 구성합니다.

from celery import Celery

app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    broker_connection_retry_on_startup=True,
)

app.conf.broker_transport_options = {
    "priority_steps": list(range(10)),
    "sep": ":",
    "queue_order_strategy": "priority",
}

Redis를 사용하여 각 테넌트의 작업 수를 캐시하는 동적 우선순위를 계산하는 방법을 정의합니다.

import redis

redis_client = redis.StrictRedis(host="localhost", port=6379, db=1)

def calculate_priority(tenant_id):
    """
    Calculate task priority based on the number of tasks for the tenant.
    """
    key = f"tenant:{tenant_id}:task_count"
    task_count = int(redis_client.get(key) or 0)
    return min(10, task_count // 10)

성공적으로 완료되면 작업 수를 줄이는 사용자 정의 작업 클래스를 만듭니다.

from celery import Task

class TenantAwareTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        tenant_id = kwargs.get("tenant_id")

        if tenant_id:
            key = f"tenant:{tenant_id}:task_count"
            redis_client.decr(key, 1)

        return super().on_success(retval, task_id, args, kwargs)

@app.task(name="tasks.send_email", base=TenantAwareTask)
def send_email(tenant_id, task_data):
    """
    Simulate sending an email.
    """
    sleep(1)
    key = f"tenant:{tenant_id}:task_count"
    task_count = int(redis_client.get(key) or 0)
    logger.info("Tenant %s tasks: %s", tenant_id, task_count)

다른 테넌트에 대한 작업을 트리거하여 테넌트_id가 작업의 키워드 인수에 포함되도록 합니다.

if __name__ == "__main__":
    tenant_id = 1
    for _ in range(100):
        priority = calculate_priority(tenant_id)
        key = f"tenant:{tenant_id}:task_count"
        redis_client.incr(key, 1)
        send_email.apply_async(
            kwargs={"tenant_id": tenant_id, "task_data": {}}, priority=priority
        )


    tenant_id = 2
    for _ in range(10):
        priority = calculate_priority(tenant_id)
        key = f"tenant:{tenant_id}:task_count"
        redis_client.incr(key, 1)
        send_email.apply_async(
            kwargs={"tenant_id": tenant_id, "task_data": {}}, priority=priority
        )

여기에서 전체 코드를 볼 수 있습니다.

Celery 작업자를 시작하고 작업을 트리거합니다.

# Run the worker
celery -A tasks worker --loglevel=info

# Trigger the tasks
python tasks.py

이 설정은 Redis와 결합된 Celery의 우선 순위 대기열이 테넌트 활동에 따라 우선 순위를 동적으로 조정하여 공정한 작업 처리를 보장하는 방법을 보여줍니다. 작업자의 단순화된 출력을 살펴보겠습니다.

Ensuring Fair Processing with Celery - Part II

결론

Celery 및 Redis의 작업 수준 우선순위는 다중 테넌트 시스템에서 공정한 처리를 보장하기 위한 강력한 솔루션을 제공합니다. 우선순위를 동적으로 할당하고 단일 대기열을 활용함으로써 비즈니스 요구 사항을 충족하면서 단순성을 유지할 수 있습니다.

작업 수준 우선순위를 구현하는 방법은 여러 가지가 있습니다. 예를 들어 RabbitMQ를 사용하는 것은 핵심에서 우선순위를 지원하므로 더 효율적이지만 작업 계산에도 Redis를 사용하므로 전체 아키텍처가 단순화됩니다.

이 내용이 도움이 되기를 바라며 다음 내용을 살펴보세요!

위 내용은 셀러리의 공정한 처리 보장 - 2부의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.