이 기사에서는 공정한 처리에 대한 이전 게시물을 바탕으로 Celery의 작업 우선순위를 살펴봅니다. 작업 우선순위는 사용자 정의 기준에 따라 작업에 다양한 우선순위 수준을 할당하여 백그라운드 처리의 공정성과 효율성을 높이는 방법을 제공합니다.
작업 수준 우선순위를 통해 복잡한 구현 없이 작업 실행을 세밀하게 제어할 수 있습니다. 할당된 우선순위 값을 사용하여 모든 작업을 단일 대기열에 제출함으로써 작업자는 긴급성에 따라 작업을 처리할 수 있습니다. 이는 제출 시간에 관계없이 공정한 처리를 보장합니다.
예를 들어, 한 테넌트가 100개의 작업을 제출하고 다른 테넌트가 곧 5개의 작업을 제출하는 경우 작업 수준 우선 순위에 따라 두 번째 테넌트는 100개의 작업이 모두 완료될 때까지 기다리지 않습니다.
이 접근 방식은 테넌트의 작업 수에 따라 우선순위를 동적으로 할당합니다. 각 테넌트의 첫 번째 작업은 높은 우선 순위로 시작되지만 동시 작업이 10개마다 우선 순위가 감소합니다. 이를 통해 작업 수가 적은 임차인이 불필요한 지연을 경험하지 않도록 할 수 있습니다.
먼저 Celery와 Redis를 설치합니다.
pip install celery redis
Redis를 브로커로 사용하고 우선순위 기반 작업 처리를 활성화하도록 Celery를 구성합니다.
from celery import Celery app = Celery( "tasks", broker="redis://localhost:6379/0", broker_connection_retry_on_startup=True, ) app.conf.broker_transport_options = { "priority_steps": list(range(10)), "sep": ":", "queue_order_strategy": "priority", }
Redis를 사용하여 각 테넌트의 작업 수를 캐시하는 동적 우선순위를 계산하는 방법을 정의합니다.
import redis redis_client = redis.StrictRedis(host="localhost", port=6379, db=1) def calculate_priority(tenant_id): """ Calculate task priority based on the number of tasks for the tenant. """ key = f"tenant:{tenant_id}:task_count" task_count = int(redis_client.get(key) or 0) return min(10, task_count // 10)
성공적으로 완료되면 작업 수를 줄이는 사용자 정의 작업 클래스를 만듭니다.
from celery import Task class TenantAwareTask(Task): def on_success(self, retval, task_id, args, kwargs): tenant_id = kwargs.get("tenant_id") if tenant_id: key = f"tenant:{tenant_id}:task_count" redis_client.decr(key, 1) return super().on_success(retval, task_id, args, kwargs) @app.task(name="tasks.send_email", base=TenantAwareTask) def send_email(tenant_id, task_data): """ Simulate sending an email. """ sleep(1) key = f"tenant:{tenant_id}:task_count" task_count = int(redis_client.get(key) or 0) logger.info("Tenant %s tasks: %s", tenant_id, task_count)
다른 테넌트에 대한 작업을 트리거하여 테넌트_id가 작업의 키워드 인수에 포함되도록 합니다.
if __name__ == "__main__": tenant_id = 1 for _ in range(100): priority = calculate_priority(tenant_id) key = f"tenant:{tenant_id}:task_count" redis_client.incr(key, 1) send_email.apply_async( kwargs={"tenant_id": tenant_id, "task_data": {}}, priority=priority ) tenant_id = 2 for _ in range(10): priority = calculate_priority(tenant_id) key = f"tenant:{tenant_id}:task_count" redis_client.incr(key, 1) send_email.apply_async( kwargs={"tenant_id": tenant_id, "task_data": {}}, priority=priority )
여기에서 전체 코드를 볼 수 있습니다.
Celery 작업자를 시작하고 작업을 트리거합니다.
# Run the worker celery -A tasks worker --loglevel=info # Trigger the tasks python tasks.py
이 설정은 Redis와 결합된 Celery의 우선 순위 대기열이 테넌트 활동에 따라 우선 순위를 동적으로 조정하여 공정한 작업 처리를 보장하는 방법을 보여줍니다. 작업자의 단순화된 출력을 살펴보겠습니다.
Celery 및 Redis의 작업 수준 우선순위는 다중 테넌트 시스템에서 공정한 처리를 보장하기 위한 강력한 솔루션을 제공합니다. 우선순위를 동적으로 할당하고 단일 대기열을 활용함으로써 비즈니스 요구 사항을 충족하면서 단순성을 유지할 수 있습니다.
작업 수준 우선순위를 구현하는 방법은 여러 가지가 있습니다. 예를 들어 RabbitMQ를 사용하는 것은 핵심에서 우선순위를 지원하므로 더 효율적이지만 작업 계산에도 Redis를 사용하므로 전체 아키텍처가 단순화됩니다.
이 내용이 도움이 되기를 바라며 다음 내용을 살펴보세요!
위 내용은 셀러리의 공정한 처리 보장 - 2부의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!