>백엔드 개발 >파이썬 튜토리얼 >Python이 asyncio 패키지를 사용하여 동시성을 처리하는 방법에 대한 자세한 설명

Python이 asyncio 패키지를 사용하여 동시성을 처리하는 방법에 대한 자세한 설명

巴扎黑
巴扎黑원래의
2017-09-11 10:59:031902검색

이 글은 동시성을 처리하기 위해 Python의 asyncio 패키지 사용에 대한 관련 정보를 주로 소개합니다. 관심 있는 친구들이 참고할 수 있습니다.

Blocking I/O 및 GIL

CPython 설명 인터프리터 자체 스레드로부터 안전하지 않으므로 한 번에 하나의 스레드만 Python 바이트코드를 실행할 수 있도록 허용하는 GIL(Global Interpreter Lock)이 있습니다. 따라서 Python 프로세스는 일반적으로 여러 CPU 코어를 동시에 사용할 수 없습니다.

그러나 블로킹 I/O 작업을 수행하는 표준 라이브러리의 모든 함수는 운영 체제가 결과를 반환할 때까지 기다리는 동안 GIL을 해제합니다. 이는 Python 언어 수준에서 멀티스레딩이 가능하고 I/O 집약적인 Python 프로그램이 이점을 누릴 수 있음을 의미합니다. 하나의 Python 스레드가 네트워크 응답을 기다리는 동안 차단 I/O 함수는 GIL을 해제하고 다른 스레드를 실행합니다. .

asyncio

이 패키지는 이벤트 루프 기반 코루틴을 사용하여 동시성을 달성합니다. asyncio는 표현식의 항복을 많이 사용하므로 이전 버전의 Python과 호환되지 않습니다.

asyncio 패키지에서 사용되는 "코루틴"은 더 엄격한 정의입니다. asyncio API에 적합한 코루틴은 정의 본문에서 항복을 사용해야 하지만, 항복을 사용할 수는 없습니다. 또한 asyncio에 적합한 코루틴은 호출자가 구동해야 하며

예제 1


import threading
import asyncio

@asyncio.coroutine
def hello():
  print('Start Hello', threading.currentThread())
  yield from asyncio.sleep(5)
  print('End Hello', threading.currentThread())

@asyncio.coroutine
def world():
  print('Start World', threading.currentThread())
  yield from asyncio.sleep(3)
  print('End World', threading.currentThread())

# 获取EventLoop:
loop = asyncio.get_event_loop()
tasks = [hello(), world()]
# 执行coroutine
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

@asyncio.coroutine에서는 생성기 함수를 코루틴 유형으로 표시합니다.
asyncio.sleep(3)은 3초 안에 완료되는 코루틴을 생성합니다.
loop.run_until_complete(future), future가 완료될 때까지 실행합니다. 매개변수가 코루틴 객체인 경우 verify_future() 함수 래퍼를 사용해야 합니다.
loop.close()는 이벤트 루프를 닫습니다

예제 2


import asyncio

@asyncio.coroutine
def worker(text):
  """
  协程运行的函数
  :param text:
  :return:
  """
  i = 0
  while True:
    print(text, i)

    try:
      yield from asyncio.sleep(.1)
    except asyncio.CancelledError:
      break

    i += 1


@asyncio.coroutine
def client(text, io_used):
  worker_fu = asyncio.ensure_future(worker(text))

  # 假装等待I/O一段时间
  yield from asyncio.sleep(io_used)

  # 结束运行协程
  worker_fu.cancel()
  return 'done'


loop = asyncio.get_event_loop()
tasks = [client('xiaozhe', 3), client('zzzz', 5)]
result = loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('Answer:', result)

설명:

1. asyncio.ensure_future(coro_or_future, *, loop=None): 코루틴 객체, asyncio.Task 객체를 반환합니다.
2.worker_fu.cancel(): 코루틴 실행을 취소하고 CancelledError 예외를 발생시킵니다.
3. asyncio.wait(): 코루틴의 매개변수는 future 또는 코루틴으로 구성된 반복 가능한 개체입니다. wait는 각 코루틴을 Task 개체로 래핑합니다.

asyncio.Task 개체와 threading.Thread 개체 비교

asyncio.Task 개체는 threading.Thread 개체와 거의 동일합니다.
Task 객체는 코루틴을 구동하는 데 사용되고 Thread 객체는 호출 가능한 객체를 호출하는 데 사용됩니다.
Task 객체는 스스로 인스턴스화되지 않고 코루틴을 asyncio.ensure_future(…) 함수 또는 loop.create_task(…) 메서드에 전달하여 얻습니다.
획득된 Task 객체는 실행되도록 예약되었습니다. Thread 인스턴스는 명시적으로 실행되도록 지시하기 위해 start 메서드를 호출해야 합니다.
작업을 종료하려면 Task.cancel() 인스턴스 메서드를 사용하여 코루틴 내에서 CancelledError 예외를 발생시킬 수 있습니다.

스레드와 코루틴의 안전성 비교

스레드를 사용하여 중요한 프로그래밍을 했다면 스케줄러가 언제든지 스레드를 중단할 수 있기 때문입니다. 프로그램의 중요한 부분을 보호하고, 실행 중에 다단계 작업이 중단되는 것을 방지하고, 데이터가 유효하지 않은 상태에 있는 것을 방지하려면 잠금을 유지해야 합니다.

코루틴은 중단을 방지하기 위해 기본적으로 완벽하게 보호됩니다. 프로그램의 나머지 부분을 실행하려면 명시적으로 출력해야 합니다. 코루틴의 경우 잠금을 유지하고 여러 스레드 간에 작업을 동기화할 필요가 없습니다. 언제든지 하나의 코루틴만 실행되기 때문에 코루틴 자체는 동기화됩니다. 제어권을 넘겨주고 싶을 때, 항복(yield) 또는 항복(yield from)을 사용하여 제어권을 스케줄러에 반환할 수 있습니다. 이것이 바로 코루틴을 안전하게 취소할 수 있는 이유입니다. 정의에 따라 코루틴은 일시 중지된 결과에서만 취소될 수 있으므로 CancelledError 예외를 처리하고 정리 작업을 수행할 수 있습니다.

Future(미래)

일반적으로 future를 직접 생성해서는 안 되며 동시성 프레임워크(concurrent.futures 또는 asyncio)를 통해서만 인스턴스화할 수 있습니다. 그 이유는 간단합니다. 선물은 결국 일어날 일을 나타내며, 어떤 일이 일어날지 확실히 알 수 있는 유일한 방법은 실행이 예약되어 있는지 여부입니다.

asyncio.Future

asyncio 패키지에서 BaseEventLoop.create_task(…) 메서드는 코루틴을 수신하고, 실행 시간을 예약하고, asyncio.Future 클래스의 인스턴스이기도 한 asyncio.Task 인스턴스를 반환합니다. Task는 Future의 하위 클래스이기 때문에 코루틴을 래핑하는 데 사용됩니다.

asyncio.ensure_future(coro_or_future, *, loop=None)

이 함수는 코루틴과 퓨처를 통합합니다. 첫 번째 매개변수는 둘 중 하나일 수 있습니다. Future 또는 Task 객체인 경우 변경되지 않은 채 반환됩니다. 코루틴인 경우 비동기 함수는 loop.create_task(…) 메서드를 호출하여 Task 객체를 생성합니다. loop= 키워드 인수는 선택 사항이며 이벤트 루프를 전달하는 데 사용됩니다. 전달되지 않은 경우 비동기 함수는 asyncio.get_event_loop() 함수를 호출하여 루프 객체를 얻습니다.

BaseEventLoop.create_task(coro)

이 메서드는 코루틴의 실행 시간을 예약하고 asyncio.Task 개체를 반환합니다.

asyncio 패키지에는 BaseEventLoop.run_until_complete(…) 메서드와 같이 asyncio.Task 객체의 매개변수로 지정된 코루틴을 자동으로 래핑하는 여러 함수가 있습니다.

asyncio.as_completed

为了集成进度条,我们可以使用的是 as_completed 生成器函数;幸好, asyncio 包提供了这个生成器函数的相应版本。

使用asyncio和aiohttp包

从 Python 3.4 起, asyncio 包只直接支持 TCP 和 UDP。如果想使用 HTTP 或其他协议,那么要借助第三方包 aiohttp 。


cc_list = ['China', 'USA']

@asyncio.coroutine
def get_flag(cc):
  url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
  resp = yield from aiohttp.request('GET', url)
  image = yield from resp.read()
  return image

@asyncio.coroutine
def download_one(name): 
  image = yield from get_flag(name) 
  save_flag(image, name.lower() + '.gif')
  return name

loop = asyncio.get_event_loop() 
wait_coro = asyncio.wait([download_one(cc) for cc in sorted(cc_list)]) 
res, _ = loop.run_until_complete(wait_coro) 
loop.close()

使用 asyncio 包时,我们编写的异步代码中包含由 asyncio 本身驱动的协程(即委派生成器),而生成器最终把职责委托给 asyncio 包或第三方库(如aiohttp)中的协程。这种处理方式相当于架起了管道,让 asyncio 事件循环(通过我们编写的协程)驱动执行低层异步 I/O 操作的库函数。

避免阻塞型调用

有两种方法能避免阻塞型调用中止整个应用程序的进程:
1. 在单独的线程中运行各个阻塞型操作
2. 把每个阻塞型操作转换成非阻塞的异步调用使用

多个线程是可以的,但是各个操作系统线程(Python 使用的是这种线程)消耗的内存达兆字节(具体的量取决于操作系统种类)。如果要处理几千个连接,而每个连接都使用一个线程的话,我们负担不起。

把生成器当作协程使用是异步编程的另一种方式。对事件循环来说,调用回调与在暂停的协程上调用 .send() 方法效果差不多。各个暂停的协程是要消耗内存,但是比线程消耗的内存数量级小。

上面的脚本为什么会很快

在上面的脚本中,调用 loop.run_until_complete 方法时,事件循环驱动各个download_one 协程,运行到第一个 yield from 表达式处时,那个表达式驱动各个get_flag 协程,然后在get_flag协程里面运行到第一个 yield from 表达式处时,调用 aiohttp.request(…)函数。这些调用都不会阻塞,因此在零点几秒内所有请求全部开始。

asyncio 的基础设施获得第一个响应后,事件循环把响应发给等待结果的 get_flag 协程。得到响应后, get_flag 向前执行到下一个 yield from 表达式处,调用resp.read() 方法,然后把控制权还给主循环。其他响应会陆续返回。所有 get_ flag 协程都获得结果后,委派生成器 download_one 恢复,保存图像文件。

async和await

为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async和await,可以让coroutine的代码更简洁易读。

async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换。
1. 把@asyncio.coroutine替换为async
2. 把yield from替换为await

例如:


@asyncio.coroutine
def hello():
  print("Hello world!")
  r = yield from asyncio.sleep(1)
  print("Hello again!")

等同于


async def hello():
  print("Hello world!")
  r = await asyncio.sleep(1)
  print("Hello again!")

网站请求实例


import asyncio
import aiohttp

urls = [
  'http://www.163.com/',
  'http://www.sina.com.cn/',
  'https://www.hupu.com/',
  'http://www.php.cn/'
]


async def get_url_data(u):
  """
  读取url的数据
  :param u:
  :return:
  """
  print('running ', u)
  async with aiohttp.ClientSession() as session:
    async with session.get(u) as resp:
      print(u, resp.status, type(resp.text()))
      # print(await resp.text())

  return resp.headers


async def request_url(u):
  """
  主调度函数
  :param u:
  :return:
  """
  res = await get_url_data(u)
  return res


loop = asyncio.get_event_loop()
task_lists = asyncio.wait([request_url(u) for u in urls])
all_res, _ = loop.run_until_complete(task_lists)
loop.close()

print(all_res)

위 내용은 Python이 asyncio 패키지를 사용하여 동시성을 처리하는 방법에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.