Heim > Artikel > Backend-Entwicklung > Ausführliche Erklärung, wie Python das Asyncio-Paket verwendet, um Parallelität zu handhaben
Dieser Artikel stellt hauptsächlich die relevanten Informationen über die Verwendung des Asyncio-Pakets zur Handhabung der Parallelität vor. Interessierte Freunde können sich auf
Blockieren von E/A und der GIL
Der CPython-Interpreter selbst ist nicht threadsicher, daher gibt es eine Global Interpreter Lock (GIL), die es jeweils nur einem Thread erlaubt, Python-Bytecode auszuführen. Daher kann ein Python-Prozess normalerweise nicht mehrere CPU-Kerne gleichzeitig verwenden. Allerdings geben alle Funktionen in der Standardbibliothek, die blockierende E/A-Vorgänge ausführen, die GIL frei, während sie darauf warten, dass das Betriebssystem ein Ergebnis zurückgibt. Das bedeutet, dass Multithreading auf Python-Sprachebene möglich ist und I/O-intensive Python-Programme davon profitieren können: Während ein Python-Thread auf eine Netzwerkantwort wartet, gibt die blockierende I/O-Funktion die GIL frei und führt einen anderen Thread aus.asyncio
Dieses Paket verwendet ereignisschleifengesteuerte Coroutinen, um Parallelität zu implementieren. asyncio nutzt in großem Umfang Yield-from-Ausdrücke und ist daher nicht mit älteren Versionen von Python kompatibel. Die vom Asyncio-Paket verwendete „Coroutine“ ist eine strengere Definition. Coroutinen, die für die Asyncio-API geeignet sind, müssen yield from im Definitionstext verwenden, können jedoch yield nicht verwenden. Darüber hinaus sollten für Asyncio geeignete Coroutinen vom Aufrufer gesteuert und vom Aufrufer über yield from aufgerufen werden >@asyncio.coroutine markiert die Generatorfunktion als Coroutine-Typ.
asyncio.sleep(3) erstellt eine Coroutine, die in 3 Sekunden abgeschlossen ist.loop.run_until_complete(future), wird ausgeführt, bis die Zukunft abgeschlossen ist. Wenn der Parameter ein Coroutine-Objekt ist, müssen Sie den Funktionswrapper „sure_future()“ verwenden. loop.close() schließt die Ereignisschleife
Beispiel 2
import threading import asyncio @asyncio.coroutine def hello(): print('Start Hello', threading.currentThread()) yield from asyncio.sleep(5) print('End Hello', threading.currentThread()) @asyncio.coroutine def world(): print('Start World', threading.currentThread()) yield from asyncio.sleep(3) print('End World', threading.currentThread()) # 获取EventLoop: loop = asyncio.get_event_loop() tasks = [hello(), world()] # 执行coroutine loop.run_until_complete(asyncio.wait(tasks)) loop.close()
Erklärung:
1. asyncio.ensure_future(coro_or_future, *, loop=None): Planen Sie die Ausführung eines Coroutine-Objekts und geben Sie ein asyncio.Task-Objekt zurück. 2. worker_fu.cancel(): Brechen Sie die Ausführung einer Coroutine ab und lösen Sie eine CancelledError-Ausnahme aus.
3. asyncio.wait(): Der Parameter der Coroutine ist ein iterierbares Objekt, das aus Futures oder Coroutinen besteht. Wait verpackt jede Coroutine in ein Task-Objekt.import asyncio @asyncio.coroutine def worker(text): """ 协程运行的函数 :param text: :return: """ i = 0 while True: print(text, i) try: yield from asyncio.sleep(.1) except asyncio.CancelledError: break i += 1 @asyncio.coroutine def client(text, io_used): worker_fu = asyncio.ensure_future(worker(text)) # 假装等待I/O一段时间 yield from asyncio.sleep(io_used) # 结束运行协程 worker_fu.cancel() return 'done' loop = asyncio.get_event_loop() tasks = [client('xiaozhe', 3), client('zzzz', 5)] result = loop.run_until_complete(asyncio.wait(tasks)) loop.close() print('Answer:', result)
Asyncio.Task-Objekte sind fast gleichbedeutend mit threading.Thread-Objekten.
Aufgabenobjekte werden nicht von Ihnen selbst instanziiert, sondern durch Übergabe der Coroutine an die Funktion asyncio.ensure_future(…) oder die Methode loop.create_task(…) erhalten.
Die Ausführung des erhaltenen Task-Objekts wurde geplant; die Thread-Instanz muss die Startmethode aufrufen, um die Ausführung explizit anzuweisen.
Wenn Sie die Aufgabe beenden möchten, können Sie die Instanzmethode Task.cancel() verwenden, um eine CancelledError-Ausnahme innerhalb der Coroutine auszulösen.
Sicherheitsvergleich von Threads und Coroutinen
Wenn Sie wichtige Programmierungen mit Threads durchgeführt haben, kann der Scheduler den Thread jederzeit unterbrechen. Sie müssen daran denken, Sperren beizubehalten, um wichtige Teile des Programms zu schützen, zu verhindern, dass mehrstufige Vorgänge während der Ausführung unterbrochen werden, und um zu verhindern, dass sich Daten in einem ungültigen Zustand befinden.
Coroutinen werden standardmäßig vollständig geschützt, um Unterbrechungen zu verhindern. Wir müssen explizit ausgeben, damit der Rest des Programms ausgeführt werden kann. Bei Coroutinen besteht keine Notwendigkeit, Sperren beizubehalten und Vorgänge zwischen mehreren Threads zu synchronisieren. Die Coroutinen selbst werden synchronisiert, da jeweils nur eine Coroutine ausgeführt wird. Wenn Sie die Kontrolle abgeben möchten, können Sie yield oder yield from verwenden, um die Kontrolle an den Scheduler zurückzugeben. Aus diesem Grund können Coroutinen sicher abgebrochen werden: Per Definition können Coroutinen nur bei pausierter Ausbeute abgebrochen werden, sodass CancelledError-Ausnahmen behandelt und Bereinigungsvorgänge durchgeführt werden können.
Normalerweise sollte man Futures nicht selbst erstellen, sondern kann nur durch das Concurrency Framework (concurrent.futures oder asyncio) instanziiert werden. Der Grund ist einfach: Futures repräsentieren etwas, das irgendwann passieren wird, und die einzige Möglichkeit, sicher zu wissen, dass etwas passieren wird, besteht darin, seine Ausführung zu planen.
asyncio.FutureIm Asyncio-Paket empfängt die BaseEventLoop.create_task(…)-Methode eine Coroutine, plant ihre Laufzeit und gibt eine Asyncio-Task-Instanz zurück. ist auch eine Instanz der asyncio.Future-Klasse, da Task eine Unterklasse von Future ist und zum Umschließen von Coroutinen verwendet wird. asyncio.ensure_future(coro_or_future, *, loop=None)
Diese Funktion vereinheitlicht Coroutinen und Futures: Der erste Parameter kann einer von beiden sein. Wenn es sich um ein Future- oder Task-Objekt handelt, wird es unverändert zurückgegeben. Wenn es sich um eine Coroutine handelt, ruft die asynchrone Funktion die Methode loop.create_task(…) auf, um ein Task-Objekt zu erstellen. Das Schlüsselwortargument „loop=“ ist optional und wird zur Übergabe der Ereignisschleife verwendet. Wenn es nicht übergeben wird, ruft die async-Funktion das Schleifenobjekt durch Aufrufen der Funktion asyncio.get_event_loop() ab.
BaseEventLoop.create_task(coro)Diese Methode plant die Ausführungszeit der Coroutine und gibt ein asyncio.Task-Objekt zurück.
Das Asyncio-Paket enthält mehrere Funktionen, die die durch den Parameter angegebene Coroutine automatisch in ein asyncio.Task-Objekt einschließen, wie beispielsweise die Methode BaseEventLoop.run_until_complete(…).
asyncio.as_completed为了集成进度条,我们可以使用的是 as_completed 生成器函数;幸好, asyncio 包提供了这个生成器函数的相应版本。
使用asyncio和aiohttp包
从 Python 3.4 起, asyncio 包只直接支持 TCP 和 UDP。如果想使用 HTTP 或其他协议,那么要借助第三方包 aiohttp 。
cc_list = ['China', 'USA'] @asyncio.coroutine def get_flag(cc): url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower()) resp = yield from aiohttp.request('GET', url) image = yield from resp.read() return image @asyncio.coroutine def download_one(name): image = yield from get_flag(name) save_flag(image, name.lower() + '.gif') return name loop = asyncio.get_event_loop() wait_coro = asyncio.wait([download_one(cc) for cc in sorted(cc_list)]) res, _ = loop.run_until_complete(wait_coro) loop.close()
使用 asyncio 包时,我们编写的异步代码中包含由 asyncio 本身驱动的协程(即委派生成器),而生成器最终把职责委托给 asyncio 包或第三方库(如aiohttp)中的协程。这种处理方式相当于架起了管道,让 asyncio 事件循环(通过我们编写的协程)驱动执行低层异步 I/O 操作的库函数。
避免阻塞型调用
有两种方法能避免阻塞型调用中止整个应用程序的进程:
1. 在单独的线程中运行各个阻塞型操作
2. 把每个阻塞型操作转换成非阻塞的异步调用使用
多个线程是可以的,但是各个操作系统线程(Python 使用的是这种线程)消耗的内存达兆字节(具体的量取决于操作系统种类)。如果要处理几千个连接,而每个连接都使用一个线程的话,我们负担不起。
把生成器当作协程使用是异步编程的另一种方式。对事件循环来说,调用回调与在暂停的协程上调用 .send() 方法效果差不多。各个暂停的协程是要消耗内存,但是比线程消耗的内存数量级小。
上面的脚本为什么会很快
在上面的脚本中,调用 loop.run_until_complete 方法时,事件循环驱动各个download_one 协程,运行到第一个 yield from 表达式处时,那个表达式驱动各个get_flag 协程,然后在get_flag协程里面运行到第一个 yield from 表达式处时,调用 aiohttp.request(…)函数。这些调用都不会阻塞,因此在零点几秒内所有请求全部开始。
asyncio 的基础设施获得第一个响应后,事件循环把响应发给等待结果的 get_flag 协程。得到响应后, get_flag 向前执行到下一个 yield from 表达式处,调用resp.read() 方法,然后把控制权还给主循环。其他响应会陆续返回。所有 get_ flag 协程都获得结果后,委派生成器 download_one 恢复,保存图像文件。
async和await
为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async和await,可以让coroutine的代码更简洁易读。
async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换。
1. 把@asyncio.coroutine替换为async
2. 把yield from替换为await
例如:
@asyncio.coroutine def hello(): print("Hello world!") r = yield from asyncio.sleep(1) print("Hello again!")
等同于
async def hello(): print("Hello world!") r = await asyncio.sleep(1) print("Hello again!")
网站请求实例
import asyncio import aiohttp urls = [ 'http://www.163.com/', 'http://www.sina.com.cn/', 'https://www.hupu.com/', 'http://www.php.cn/' ] async def get_url_data(u): """ 读取url的数据 :param u: :return: """ print('running ', u) async with aiohttp.ClientSession() as session: async with session.get(u) as resp: print(u, resp.status, type(resp.text())) # print(await resp.text()) return resp.headers async def request_url(u): """ 主调度函数 :param u: :return: """ res = await get_url_data(u) return res loop = asyncio.get_event_loop() task_lists = asyncio.wait([request_url(u) for u in urls]) all_res, _ = loop.run_until_complete(task_lists) loop.close() print(all_res)
Das obige ist der detaillierte Inhalt vonAusführliche Erklärung, wie Python das Asyncio-Paket verwendet, um Parallelität zu handhaben. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!