ホームページ  >  記事  >  バックエンド開発  >  Python コルーチンをマスターする: コードの効率とパフォーマンスを向上させる

Python コルーチンをマスターする: コードの効率とパフォーマンスを向上させる

Mary-Kate Olsen
Mary-Kate Olsenオリジナル
2024-11-23 09:21:22839ブラウズ

Mastering Python

Python のコルーチンと構造化同時実行のエキサイティングな世界を探索してみましょう。これらの強力な機能は、同時実行コードの記述方法に革命をもたらし、コードの効率性と管理を容易にしました。

コルーチンは、実行を一時停止して他のコルーチンに制御を渡すことができる特別な関数です。これらは async キーワードを使用して定義され、await キーワードを使用して待機できます。簡単な例を次に示します:

async def greet(name):
    print(f"Hello, {name}!")
    await asyncio.sleep(1)
    print(f"Goodbye, {name}!")

async def main():
    await greet("Alice")
    await greet("Bob")

asyncio.run(main())

このコードでは、greet 関数は挨拶を出力し、少し待ってから別れを告げるコルーチンです。 main 関数は、greet を 2 回呼び出し、asyncio.run を使用してメイン コルーチンを実行します。

しかし、コルーチンはなぜ特別なのでしょうか?これらを使用すると、同期コードのように見え、動作する同時コードを作成できますが、実際には複数の操作を同時に実行できます。これは、ネットワーク操作やファイル処理など、I/O バウンドのタスクに特に役立ちます。

Python での非同期プログラミングの基盤を提供する asyncio ライブラリについて詳しく見てみましょう。その中心となるのは、コルーチンの実行を管理するイベント ループです。これは、次にどのコルーチンを実行するかを決定するスケジューラと考えることができます。

asyncio でタスクを作成して使用する方法は次のとおりです。

import asyncio

async def fetch_data(url):
    print(f"Fetching data from {url}")
    await asyncio.sleep(2)  # Simulating network delay
    return f"Data from {url}"

async def main():
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    tasks = [asyncio.create_task(fetch_data(url)) for url in urls]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)

asyncio.run(main())

この例では、複数の URL から同時にデータを取得することをシミュレートしています。 asyncio.create_task 関数はコルーチンをタスクに変換し、asyncio.gather.

を使用して同時に実行します。

ここで、構造化された同時実行性について話しましょう。これは、同時実行コードをより予測可能にし、推論しやすくすることを目的としたパラダイムです。 Python 3.11 では、タスク グループなど、構造化された同時実行をサポートするためのいくつかの新機能が導入されました。

タスク グループの使用方法は次のとおりです:

import asyncio

async def process_item(item):
    await asyncio.sleep(1)
    return f"Processed {item}"

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(process_item("A"))
        task2 = tg.create_task(process_item("B"))
        task3 = tg.create_task(process_item("C"))

    print(task1.result())
    print(task2.result())
    print(task3.result())

asyncio.run(main())

TaskGroup は、次に進む前にすべてのタスクが完了 (またはキャンセル) されていることを確認します。これにより、忘れられたタスクや同時操作間の予期せぬ相互作用などの問題を防ぐことができます。

コルーチンの最も強力な側面の 1 つは、I/O 操作を効率的に処理できることです。単純な非同期 Web サーバーの例を見てみましょう:

import asyncio
from aiohttp import web

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = f"Hello, {name}!"
    return web.Response(text=text)

async def main():
    app = web.Application()
    app.add_routes([web.get('/', handle),
                    web.get('/{name}', handle)])

    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()

    print("Server started at http://localhost:8080")
    await asyncio.Event().wait()

asyncio.run(main())

このサーバーは、コルーチンの力のおかげで、複数の接続を同時に処理できます。各リクエストは独自のコルーチンで処理されるため、高負荷時でもサーバーの応答性を維持できます。

さらに高度な概念をいくつか見てみましょう。キャンセルは、同時操作を処理する場合に重要な機能です。場合によっては、タスクが完了する前にタスクを停止する必要があります。その方法は次のとおりです:

async def greet(name):
    print(f"Hello, {name}!")
    await asyncio.sleep(1)
    print(f"Goodbye, {name}!")

async def main():
    await greet("Alice")
    await greet("Bob")

asyncio.run(main())

この例では、長時間実行タスクを作成し、5 秒後にキャンセルします。タスクは CancelledError をキャッチし、終了する前に必要なクリーンアップを実行します。

もう 1 つの強力な機能は、カスタム イベント ループを作成する機能です。ほとんどの場合、デフォルトのイベント ループで十分ですが、場合によっては、より詳細な制御が必要になります。カスタム イベント ループの簡単な例を次に示します。

import asyncio

async def fetch_data(url):
    print(f"Fetching data from {url}")
    await asyncio.sleep(2)  # Simulating network delay
    return f"Data from {url}"

async def main():
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    tasks = [asyncio.create_task(fetch_data(url)) for url in urls]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)

asyncio.run(main())

これは非常に基本的なカスタム イベント ループですが、原理を示しています。これを拡張して、より適切なスケジューリング、監視、他のシステムとの統合などの機能を追加できます。

コルーチンと構造化された同時実行性を使用する場合のベスト プラクティスについて話しましょう。まず、非同期コンテキスト マネージャーを管理するには、常に async with を使用します。これにより、例外が発生した場合でも、適切なセットアップと破棄が保証されます:

import asyncio

async def process_item(item):
    await asyncio.sleep(1)
    return f"Processed {item}"

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(process_item("A"))
        task2 = tg.create_task(process_item("B"))
        task3 = tg.create_task(process_item("C"))

    print(task1.result())
    print(task2.result())
    print(task3.result())

asyncio.run(main())

2 番目に、ブロック操作には注意してください。 CPU バウンドのタスクを実行する必要がある場合は、asyncio.to_thread を使用して別のスレッドで実行することを検討してください。

import asyncio
from aiohttp import web

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = f"Hello, {name}!"
    return web.Response(text=text)

async def main():
    app = web.Application()
    app.add_routes([web.get('/', handle),
                    web.get('/{name}', handle)])

    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()

    print("Server started at http://localhost:8080")
    await asyncio.Event().wait()

asyncio.run(main())

3 番目に、タスクのグループをさらに制御する必要がある場合は、asyncio.wait を使用します。最初のタスクが完了するまで待つことも、タイムアウトを設定することもできます:

import asyncio

async def long_running_task():
    try:
        while True:
            print("Working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task was cancelled")

async def main():
    task = asyncio.create_task(long_running_task())
    await asyncio.sleep(5)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Main: task was cancelled")

asyncio.run(main())

同時実行コードのデバッグは困難な場合があります。 Python の asyncio には、いくつかの便利なツールが付属しています。デバッグ モードを有効にして、より詳細なログを取得できます:

import asyncio

class MyEventLoop(asyncio.BaseEventLoop):
    def __init__(self):
        self._running = False
        self._ready = asyncio.Queue()

    def run_forever(self):
        self._running = True
        while self._running:
            coro = self._ready.get_nowait()
            if coro:
                coro.send(None)

    def stop(self):
        self._running = False

    def call_soon(self, callback, *args):
        self._ready.put_nowait(callback(*args))

# Usage
loop = MyEventLoop()
asyncio.set_event_loop(loop)

async def my_coroutine():
    print("Hello from my coroutine!")

loop.call_soon(my_coroutine)
loop.run_forever()

さらに高度なデバッグ機能として aiodebug ライブラリを使用することもできます。

より複雑な例、並列データ処理パイプラインを見てみましょう。これは、大規模なデータセットの処理やストリーミング データの処理などのタスクに役立つ可能性があります:

async with aiohttp.ClientSession() as session:
    async with session.get('http://example.com') as response:
        html = await response.text()

このパイプラインは、キューを使用して、すべてを同時に実行するさまざまな処理段階間でデータを渡す方法を示しています。

コルーチンと構造化された同時実行性は、Python プログラミングに新たな可能性をもたらしました。これらにより、推論と保守が容易な、効率的で同時実行のコードを作成できるようになります。 Web サーバー、データ処理パイプライン、応答性の高い GUI のいずれを構築している場合でも、これらのツールは堅牢で高性能なアプリケーションの作成に役立ちます。

これらの概念を習得する鍵は練習であることを忘れないでください。単純な例から始めて、徐々により複雑な使用例に増やしていきます。エラー処理とキャンセルは信頼性の高い非同期システムを構築するために重要であるため、注意してください。そして、asyncio のソース コードに飛び込むことを恐れないでください。これは、これらの強力な機能が内部でどのように機能するかについての理解を深めるための優れた方法です。

コルーチンと構造化された同時実行性の探索を続けると、コードをより効率的で表現力豊かにする新しいパターンやテクニックが見つかるでしょう。これは Python 開発のエキサイティングな分野であり、継続的に進化しています。学習を続け、実験を続け、非同期プログラミングの世界への旅を楽しんでください!


私たちの作品

私たちの作品をぜひチェックしてください:

インベスターセントラル | スマートな暮らし | エポックとエコー | 不可解な謎 | ヒンドゥーヴァ | エリート開発者 | JS スクール


私たちは中程度です

Tech Koala Insights | エポックズ&エコーズワールド | インベスター・セントラル・メディア | 不可解な謎 中 | 科学とエポックミディアム | 現代ヒンドゥーヴァ

以上がPython コルーチンをマスターする: コードの効率とパフォーマンスを向上させるの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。