Rumah >pembangunan bahagian belakang >Tutorial Python >Memastikan Pemprosesan Adil dengan Saderi — Bahagian I

Memastikan Pemprosesan Adil dengan Saderi — Bahagian I

Mary-Kate Olsen
Mary-Kate Olsenasal
2024-11-16 09:10:03683semak imbas

Ensuring Fair Processing with Celery — Part I

Jika anda biasa dengan Python, kemungkinan besar anda pernah mendengar tentang Saderi. Ia selalunya menjadi pilihan utama untuk mengendalikan tugas secara tidak segerak, seperti pemprosesan imej atau menghantar e-mel.

Bercakap dengan beberapa orang, saya mula perasan bahawa ramai pembangun mendapati Celery mengagumkan pada mulanya, tetapi apabila skala dan kerumitan projek mereka meningkat, keterujaan mereka mula pudar. Walaupun sesetengah orang berpindah dari Celery atas sebab yang sah, yang lain mungkin tidak meneroka intinya dengan cukup mendalam untuk menyesuaikannya dengan keperluan mereka.

Dalam blog ini, saya ingin membincangkan salah satu sebab mengapa sesetengah pembangun mula mencari alternatif atau membina rangka kerja latar belakang tersuai: pemprosesan yang adil. Dalam persekitaran di mana pengguna/penyewa menyerahkan tugas dengan saiz yang berbeza-beza, risiko beban kerja berat seorang penyewa menjejaskan orang lain boleh mewujudkan kesesakan dan membawa kepada kekecewaan.

Saya akan membimbing anda melalui strategi untuk melaksanakan pemprosesan adil dalam Saderi, memastikan pengagihan tugas yang seimbang supaya tiada penyewa boleh menguasai sumber anda.

Masalahnya

Mari kita selami cabaran biasa yang dihadapi oleh aplikasi berbilang penyewa, terutamanya yang mengendalikan pemprosesan kelompok. Bayangkan anda mempunyai sistem di mana pengguna boleh beratur untuk tugas pemprosesan imej mereka, membolehkan mereka menerima imej yang diproses selepas menunggu sebentar. Persediaan ini bukan sahaja memastikan API anda responsif tetapi juga membolehkan anda mengukur pekerja anda mengikut keperluan untuk mengendalikan beban dengan cekap.

Semuanya berjalan lancar—sehingga seorang penyewa memutuskan untuk menyerahkan sejumlah besar imej untuk diproses. Anda mempunyai berbilang pekerja, malah mereka boleh berskala automatik untuk menampung permintaan yang meningkat, jadi anda berasa yakin tentang infrastruktur anda. Walau bagaimanapun, masalah bermula apabila penyewa lain cuba beratur dalam kelompok yang lebih kecil—mungkin hanya beberapa imej—dan tiba-tiba mendapati diri mereka menghadapi masa menunggu yang lama tanpa sebarang kemas kini. Sebelum anda sedar, tiket sokongan mula membanjiri, dengan pengguna mengadu bahawa perkhidmatan anda lambat atau tidak bertindak balas.

Senario ini terlalu biasa kerana Celery, secara lalai, memproses tugas mengikut urutan yang diterima. Apabila seorang penyewa membebankan pekerja anda dengan kemasukan tugasan yang besar, strategi penskalaan automatik yang terbaik mungkin tidak mencukupi untuk mengelakkan kelewatan penyewa lain. Akibatnya, pengguna tersebut mungkin mengalami tahap perkhidmatan yang tidak mencapai apa yang dijanjikan atau dijangkakan.

Mengehadkan Kadar dengan Saderi

Satu strategi berkesan untuk memastikan pemprosesan yang adil adalah dengan melaksanakan had kadar. Ia membolehkan anda mengawal bilangan tugas yang boleh diserahkan oleh setiap penyewa dalam jangka masa tertentu. Ini menghalang mana-mana penyewa tunggal daripada memonopoli pekerja anda dan memastikan semua penyewa mempunyai peluang yang saksama untuk memproses tugas mereka.

Saderi mempunyai fungsi terbina dalam untuk mengehadkan kadar pada tahap tugas:

# app.py
from celery import Celery

app = Celery("app", broker="redis://localhost:6379/0")

@app.task(rate_limit="10/m") # Limit to 10 tasks per minute
def process_data(data):
    print(f"Processing data: {data}")

# Call the task
if __name__ == "__main__":
    for i in range(20):
        process_data.delay(f"data_{i}")

Anda boleh menjalankan pekerja dengan melaksanakan:

celery -A app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1

Sekarang, jalankan skrip app.py untuk mencetuskan 20 tugas:

python app.py

Jika anda berjaya menjalankannya secara tempatan, anda akan mendapati terdapat kelewatan antara setiap tugasan untuk memastikan had kadar dikuatkuasakan. Sekarang anda mungkin berfikir bahawa ini tidak benar-benar membantu kami dengan masalah kami, dan anda betul sekali. Had kadar terbina dalam oleh Celery ini berguna untuk senario di mana tugas kami mungkin melibatkan panggilan ke perkhidmatan luar yang mempunyai had kadar yang ketat.

Contoh ini menyerlahkan cara ciri terbina dalam mungkin terlalu mudah untuk senario yang rumit. Walau bagaimanapun, kita boleh mengatasi had ini dengan meneroka rangka kerja Celery dengan lebih mendalam. Mari lihat bagaimana kita boleh menetapkan had kadar yang betul dengan auto-cuba semula setiap penyewa.

Kami akan menggunakan Redis untuk menjejaki had kadar setiap penyewa. Redis ialah pangkalan data dan broker yang popular untuk Celery, jadi mari kita manfaatkan komponen ini yang mungkin sudah ada dalam timbunan anda.

Jom import beberapa perpustakaan:

import time
import redis
from celery import Celery, Task

Sekarang kami akan melaksanakan kelas tugas asas tersuai untuk tugasan terhad kadar kami:

# Initialize a Redis client
redis_client = redis.StrictRedis(host="localhost", port=6379, db=0)

class RateLimitedTask(Task):
    def __init__(self, *args, **kwargs):
        # Set default rate limit
        if not hasattr(self, "custom_rate_limit"):
            self.custom_rate_limit = 10

        super().__init__(*args, **kwargs)

    def __call__(self, tenant_id, *args, **kwargs):
        # Rate limiting logic
        key = f"rate_limit:{tenant_id}:{self.name}"

        # Increment the count for this minute
        current_count = redis_client.incr(key)

        if current_count == 1:
            # Set expiration for the key if it's the first request
            redis_client.expire(key, 10)

        if current_count > self.custom_rate_limit:
            print(f"Rate limit exceeded for tenant {tenant_id}. Retrying...")
            raise self.retry(countdown=10)

        return super().__call__(tenant_id, *args, **kwargs)

Kelas tersuai ini akan menjejaki jumlah tugas yang dicetuskan oleh penyewa tertentu menggunakan Redis dan menetapkan TTL selama 10 saat. Jika melebihi had kadar, tugasan akan dicuba semula dalam masa 10 saat. Jadi pada asasnya had kadar lalai kami ialah 10 tugasan dalam masa 10 saat.

Mari kita tentukan contoh tugasan yang mencontohi pemprosesan:

@app.task(base=RateLimitedTask, custom_rate_limit=5)
def process(tenant_id: int, data):
    """
    Mock processing task that takes 0.3 seconds to complete.
    """
    print(f"Processing data: {data} for tenant: {tenant_id}")
    time.sleep(0.3)

Di sini kami telah menentukan tugasan proses dan anda dapat melihat bahawa saya boleh menukar had_kadar_suai pada tahap tugasan. Jika kami tidak menentukan had_kadar_suai, nilai lalai 10 akan diberikan. Kini had kadar kami telah bertukar kepada 5 tugasan dalam masa 10 saat.

Mari kita cetuskan beberapa tugasan untuk penyewa yang berbeza:

if __name__ == "__main__":
    for i in range(20):
        process.apply_async(args=(1, f"data_{i}"))

    for i in range(10):
        process.apply_async(args=(2, f"data_{i}"))

Kami mentakrifkan 20 tugasan untuk ID penyewa 1 dan 10 tugasan untuk ID penyewa 2.

Jadi kod lengkap kami akan kelihatan seperti ini:

# app.py
import time
import redis
from celery import Celery, Task

app = Celery(
    "app",
    broker="redis://localhost:6379/0",
    broker_connection_retry_on_startup=False,
)

# Initialize a Redis client
redis_client = redis.StrictRedis(host="localhost", port=6379, db=0)


class RateLimitedTask(Task):
    def __init__(self, *args, **kwargs):
        if not hasattr(self, "custom_rate_limit"):
            self.custom_rate_limit = 10

        super().__init__(*args, **kwargs)

    def __call__(self, tenant_id, *args, **kwargs):
        # Rate limiting logic
        key = f"rate_limit:{tenant_id}:{self.name}"

        # Increment the count for this minute
        current_count = redis_client.incr(key)

        if current_count == 1:
            # Set expiration for the key if it's the first request
            redis_client.expire(key, 10)

        if current_count > self.custom_rate_limit:
            print(f"Rate limit exceeded for tenant {tenant_id}. Retrying...")
            raise self.retry(countdown=10)

        return super().__call__(tenant_id, *args, **kwargs)


@app.task(base=RateLimitedTask, custom_rate_limit=5)
def process(tenant_id: int, data):
    """
    Mock processing task that takes 0.3 seconds to complete.
    """
    print(f"Processing data: {data} for tenant: {tenant_id}")
    time.sleep(0.3)

if __name__ == "__main__":
    for i in range(20):
        process.apply_async(args=(1, f"data_{i}"))

    for i in range(10):
        process.apply_async(args=(2, f"data_{i}"))

Mari kita jalankan pekerja kita:

celery -A app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1

Sekarang, jalankan skrip app.py untuk mencetuskan tugas:

python app.py

Seperti yang anda lihat, pekerja memproses 5 tugasan penyewa pertama dan menyediakan percubaan semula untuk semua tugasan lain. Ia kemudian mengambil 5 tugasan penyewa kedua dan menyediakan percubaan semula untuk tugasan lain, dan tugasan itu diteruskan.

Pendekatan ini membolehkan anda mentakrifkan had kadar bagi setiap penyewa tetapi seperti yang anda lihat dalam contoh kami, untuk tugasan yang berjalan sangat pantas, terlalu ketat dengan had kadar akhirnya menyebabkan pekerja tidak melakukan apa-apa untuk seketika. Penalaan halus parameter had kadar adalah penting dan bergantung pada tugas dan volum tertentu. Jangan teragak-agak untuk mencuba sehingga anda menemui keseimbangan yang optimum.

Kesimpulan

Kami telah meneroka cara pemprosesan tugas lalai Celery boleh membawa kepada ketidakadilan dalam persekitaran berbilang penyewa dan cara pengehadan kadar boleh membantu menangani isu ini. Dengan melaksanakan had kadar khusus penyewa, kami boleh menghalang mana-mana penyewa tunggal daripada memonopoli sumber dan memastikan pengagihan kuasa pemprosesan yang lebih saksama.

Pendekatan ini menyediakan asas yang kukuh untuk mencapai pemprosesan yang adil dalam Saderi. Walau bagaimanapun, terdapat teknik lain yang patut diterokai untuk mengoptimumkan lagi pengendalian tugas dalam aplikasi berbilang penyewa. Walaupun saya pada mulanya bercadang untuk mengupas semuanya dalam satu siaran, topik ini terbukti agak meluas! Untuk memastikan kejelasan dan memastikan artikel ini fokus, saya telah memutuskan untuk membahagikannya kepada dua bahagian.

Dalam bahagian seterusnya siri ini, kami akan menyelidiki keutamaan tugas sebagai satu lagi mekanisme untuk meningkatkan keadilan dan kecekapan. Pendekatan ini membolehkan anda menetapkan tahap keutamaan yang berbeza kepada tugasan berdasarkan kriteria yang berbeza, memastikan tugas kritikal diproses dengan segera walaupun dalam tempoh permintaan tinggi.

Nantikan ansuran seterusnya!

Atas ialah kandungan terperinci Memastikan Pemprosesan Adil dengan Saderi — Bahagian I. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn