>백엔드 개발 >파이썬 튜토리얼 >셀러리를 통한 공정한 처리 보장 — 1부

셀러리를 통한 공정한 처리 보장 — 1부

Mary-Kate Olsen
Mary-Kate Olsen원래의
2024-11-16 09:10:03667검색

Ensuring Fair Processing with Celery — Part I

Python에 익숙하다면 Celery를 들어보셨을 것입니다. 이미지 처리나 이메일 전송과 같은 작업을 비동기식으로 처리하는 데 적합한 경우가 많습니다.

일부 사람들과 이야기를 나누면서 많은 개발자들이 처음에는 Celery를 인상적이라고 생각하지만 프로젝트 규모와 복잡성이 증가함에 따라 그들의 흥분은 사라지기 시작한다는 것을 알게 되었습니다. 어떤 사람들은 정당한 이유로 Celery에서 멀어지는 반면, 다른 사람들은 자신의 필요에 맞게 셀러리의 핵심을 깊이 탐구하지 않을 수도 있습니다.

이 블로그에서는 일부 개발자가 대안을 찾거나 맞춤형 백그라운드 작업자 프레임워크를 구축하는 이유 중 하나인 공정한 처리에 대해 논의하고 싶습니다. 사용자/테넌트가 다양한 크기의 작업을 제출하는 환경에서는 한 테넌트의 과도한 워크로드가 다른 테넌트에 영향을 미칠 위험이 있어 병목 현상이 발생하고 좌절감을 느낄 수 있습니다.

Celery에서 공정한 처리를 구현하여 단일 테넌트가 리소스를 독점할 수 없도록 균형 잡힌 작업 분배를 보장하는 전략을 안내하겠습니다.

문제

다중 테넌트 애플리케이션, 특히 일괄 처리를 처리하는 애플리케이션이 직면하는 일반적인 문제에 대해 자세히 살펴보겠습니다. 사용자가 이미지 처리 작업을 대기열에 추가하여 잠시 기다린 후 처리된 이미지를 받을 수 있는 시스템이 있다고 상상해 보십시오. 이 설정을 사용하면 API 응답성을 유지할 수 있을 뿐만 아니라 필요에 따라 작업자를 확장하여 로드를 효율적으로 처리할 수 있습니다.

한 세입자가 처리를 위해 엄청난 양의 이미지를 제출하기 전까지는 모든 것이 원활하게 진행됩니다. 여러 작업자가 배치되어 있고 수요 증가에 맞춰 자동 확장도 가능하므로 인프라에 대해 확신을 가질 수 있습니다. 그러나 문제는 다른 테넌트가 더 작은 배치(예: 이미지 몇 개)를 대기열에 넣으려고 시도하다가 갑자기 업데이트 없이 오랜 대기 시간에 직면하게 되면서 시작됩니다. 당신이 알기도 전에 지원 티켓이 쇄도하기 시작하고 사용자들은 서비스가 느리거나 심지어 응답이 없다고 불평합니다.

Celery는 기본적으로 작업을 수신된 순서대로 처리하기 때문에 이 시나리오는 매우 일반적입니다. 한 테넌트가 대량의 작업 유입으로 작업자를 압도하는 경우 최고의 자동 크기 조정 전략이라도 다른 테넌트의 지연을 방지하는 데 충분하지 않을 수 있습니다. 결과적으로 해당 사용자는 약속했거나 기대했던 것보다 낮은 서비스 수준을 경험할 수 있습니다.

셀러리를 사용한 속도 제한

공정한 처리를 보장하기 위한 효과적인 전략 중 하나는 비율 제한을 구현하는 것입니다. 이를 통해 각 테넌트가 특정 기간 내에 제출할 수 있는 작업 수를 제어할 수 있습니다. 이를 통해 단일 임차인이 직원을 독점하는 것을 방지하고 모든 임차인이 자신의 업무를 처리할 수 있는 공평한 기회를 갖도록 보장합니다.

Celery에는 작업 수준에서 속도를 제한하는 기능이 내장되어 있습니다.

# app.py
from celery import Celery

app = Celery("app", broker="redis://localhost:6379/0")

@app.task(rate_limit="10/m") # Limit to 10 tasks per minute
def process_data(data):
    print(f"Processing data: {data}")

# Call the task
if __name__ == "__main__":
    for i in range(20):
        process_data.delay(f"data_{i}")

다음을 실행하여 작업자를 실행할 수 있습니다.

celery -A app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1

이제 app.py 스크립트를 실행하여 20개의 작업을 트리거합니다.

python app.py

로컬로 실행하면 속도 제한이 적용되는지 확인하기 위해 각 작업 사이에 지연이 있다는 것을 알 수 있습니다. 이제 당신은 이것이 우리의 문제를 해결하는 데 실제로 도움이 되지 않는다고 생각하고 있을 것입니다. 당신 말이 전적으로 옳습니다. Celery에서 제공하는 이 기본 제공 속도 제한은 엄격한 속도 제한이 있는 외부 서비스 호출이 포함될 수 있는 작업에 유용합니다.

이 예에서는 복잡한 시나리오에 내장된 기능이 너무 단순할 수 있음을 강조합니다. 그러나 Celery의 프레임워크를 더 깊이 탐구함으로써 이러한 한계를 극복할 수 있습니다. 테넌트당 자동 재시도를 통해 적절한 비율 제한을 설정하는 방법을 살펴보겠습니다.

테넌트당 속도 제한을 추적하기 위해 Redis를 사용할 것입니다. Redis는 Celery의 인기 있는 데이터베이스이자 브로커이므로 이미 스택에 있을 수 있는 이 구성 요소를 활용해 보겠습니다.

몇 개의 라이브러리를 가져옵니다.

import time
import redis
from celery import Celery, Task

이제 속도 제한 작업을 위한 사용자 정의 기본 작업 클래스를 구현하겠습니다.

# Initialize a Redis client
redis_client = redis.StrictRedis(host="localhost", port=6379, db=0)

class RateLimitedTask(Task):
    def __init__(self, *args, **kwargs):
        # Set default rate limit
        if not hasattr(self, "custom_rate_limit"):
            self.custom_rate_limit = 10

        super().__init__(*args, **kwargs)

    def __call__(self, tenant_id, *args, **kwargs):
        # Rate limiting logic
        key = f"rate_limit:{tenant_id}:{self.name}"

        # Increment the count for this minute
        current_count = redis_client.incr(key)

        if current_count == 1:
            # Set expiration for the key if it's the first request
            redis_client.expire(key, 10)

        if current_count > self.custom_rate_limit:
            print(f"Rate limit exceeded for tenant {tenant_id}. Retrying...")
            raise self.retry(countdown=10)

        return super().__call__(tenant_id, *args, **kwargs)

이 사용자 정의 클래스는 Redis를 사용하여 특정 테넌트가 트리거한 작업의 양을 추적하고 TTL을 10초로 설정합니다. 속도 제한을 초과하면 10초 후에 작업이 다시 시도됩니다. 따라서 기본적으로 기본 속도 제한은 10초 내에 10개 작업입니다.

처리를 에뮬레이션하는 샘플 작업을 정의해 보겠습니다.

@app.task(base=RateLimitedTask, custom_rate_limit=5)
def process(tenant_id: int, data):
    """
    Mock processing task that takes 0.3 seconds to complete.
    """
    print(f"Processing data: {data} for tenant: {tenant_id}")
    time.sleep(0.3)

여기서 프로세스 작업을 정의했으며 작업 수준에서 custom_rate_limit를 변경할 수 있음을 알 수 있습니다. custom_rate_limit를 지정하지 않으면 기본값인 10이 할당됩니다. 이제 속도 제한이 10초 이내에 5개 작업으로 변경되었습니다.

이제 다양한 테넌트에 대해 몇 가지 작업을 실행해 보겠습니다.

if __name__ == "__main__":
    for i in range(20):
        process.apply_async(args=(1, f"data_{i}"))

    for i in range(10):
        process.apply_async(args=(2, f"data_{i}"))

테넌트 ID 1에 대해 20개의 작업을 정의하고 테넌트 ID 2에 대해 10개의 작업을 정의합니다.

따라서 전체 코드는 다음과 같습니다.

# app.py
import time
import redis
from celery import Celery, Task

app = Celery(
    "app",
    broker="redis://localhost:6379/0",
    broker_connection_retry_on_startup=False,
)

# Initialize a Redis client
redis_client = redis.StrictRedis(host="localhost", port=6379, db=0)


class RateLimitedTask(Task):
    def __init__(self, *args, **kwargs):
        if not hasattr(self, "custom_rate_limit"):
            self.custom_rate_limit = 10

        super().__init__(*args, **kwargs)

    def __call__(self, tenant_id, *args, **kwargs):
        # Rate limiting logic
        key = f"rate_limit:{tenant_id}:{self.name}"

        # Increment the count for this minute
        current_count = redis_client.incr(key)

        if current_count == 1:
            # Set expiration for the key if it's the first request
            redis_client.expire(key, 10)

        if current_count > self.custom_rate_limit:
            print(f"Rate limit exceeded for tenant {tenant_id}. Retrying...")
            raise self.retry(countdown=10)

        return super().__call__(tenant_id, *args, **kwargs)


@app.task(base=RateLimitedTask, custom_rate_limit=5)
def process(tenant_id: int, data):
    """
    Mock processing task that takes 0.3 seconds to complete.
    """
    print(f"Processing data: {data} for tenant: {tenant_id}")
    time.sleep(0.3)

if __name__ == "__main__":
    for i in range(20):
        process.apply_async(args=(1, f"data_{i}"))

    for i in range(10):
        process.apply_async(args=(2, f"data_{i}"))

작업자를 실행해 보겠습니다.

celery -A app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1

이제 app.py 스크립트를 실행하여 작업을 트리거합니다.

python app.py

보시다시피 작업자는 첫 번째 테넌트의 5개 작업을 처리하고 다른 모든 작업에 대해 재시도를 설정합니다. 그런 다음 두 번째 테넌트의 5개 작업을 가져와서 다른 작업에 대한 재시도를 설정하고 계속 진행됩니다.

이 접근 방식을 사용하면 테넌트당 비율 제한을 정의할 수 있지만 이 예에서 볼 수 있듯이 매우 빠르게 실행되는 작업의 경우 비율 제한을 너무 엄격하게 설정하면 작업자가 한동안 아무것도 하지 않게 됩니다. 속도 제한 매개변수를 미세 조정하는 것은 중요하며 특정 작업 및 볼륨에 따라 다릅니다. 최적의 균형을 찾을 때까지 주저하지 말고 실험해 보세요.

결론

우리는 Celery의 기본 작업 처리가 다중 테넌트 환경에서 어떻게 불공정으로 이어질 수 있는지, 그리고 속도 제한이 이 문제를 해결하는 데 어떻게 도움이 될 수 있는지 살펴보았습니다. 테넌트별 비율 제한을 구현함으로써 단일 테넌트가 리소스를 독점하는 것을 방지하고 처리 능력을 보다 공평하게 분배할 수 있습니다.

이러한 접근 방식은 Celery에서 공정한 처리를 달성하기 위한 견고한 기반을 제공합니다. 그러나 다중 테넌트 애플리케이션에서 작업 처리를 더욱 최적화하기 위해 살펴볼 가치가 있는 다른 기술이 있습니다. 처음에는 하나의 게시물에서 모든 내용을 다루려고 계획했지만 이 주제는 꽤 광범위한 것으로 입증되었습니다! 명확성을 보장하고 이 기사의 초점을 유지하기 위해 기사를 두 부분으로 나누기로 결정했습니다.

이 시리즈의 다음 편에서는 공정성과 효율성을 높이는 또 다른 메커니즘으로 작업 우선순위에 대해 살펴보겠습니다. 이 접근 방식을 사용하면 다양한 기준에 따라 작업에 다양한 우선순위 수준을 할당할 수 있으므로 수요가 많은 기간에도 중요한 작업이 즉시 처리될 수 있습니다.

다음편도 기대해주세요!

위 내용은 셀러리를 통한 공정한 처리 보장 — 1부의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.