Home  >  Article  >  Backend Development  >  Detailed explanation of how Python uses the asyncio package to handle concurrency

Detailed explanation of how Python uses the asyncio package to handle concurrency

巴扎黑
巴扎黑Original
2017-09-11 10:59:031831browse

This article mainly introduces in detail the relevant information on Python's use of the asyncio package to handle concurrency. It has certain reference value. Interested friends can refer to

Blocking I/ O and the GIL

The CPython interpreter itself is not thread-safe, so there is a Global Interpreter Lock (GIL) that only allows one thread to execute Python bytecode at a time. Therefore, a Python process typically cannot use multiple CPU cores simultaneously.

However, all functions in the standard library that perform blocking I/O operations will release the GIL while waiting for the operating system to return results. This means that multi-threading is possible at the Python language level, and I/O-intensive Python programs can benefit from this: while one Python thread waits for a network response, the blocking I/O function releases the GIL and runs another thread.

asyncio

This package uses event loop driven coroutines to achieve concurrency. asyncio makes heavy use of yield from expressions and is therefore incompatible with older versions of Python.

The "coroutine" used by the asyncio package is a stricter definition. Coroutines suitable for the asyncio API must use yield from in the definition body, but cannot use yield. In addition, coroutines suitable for asyncio should be driven by the caller and called by the caller through yield from;

Example 1


import threading
import asyncio

@asyncio.coroutine
def hello():
  print('Start Hello', threading.currentThread())
  yield from asyncio.sleep(5)
  print('End Hello', threading.currentThread())

@asyncio.coroutine
def world():
  print('Start World', threading.currentThread())
  yield from asyncio.sleep(3)
  print('End World', threading.currentThread())

# 获取EventLoop:
loop = asyncio.get_event_loop()
tasks = [hello(), world()]
# 执行coroutine
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

@asyncio.coroutine marks the generator function as a coroutine type.
asyncio.sleep(3) creates a coroutine that completes in 3 seconds.
loop.run_until_complete(future), run until the future is completed; if the parameter is a coroutine object, you need to use the ensure_future() function wrapper.
loop.close() Close the event loop

Example 2


import asyncio

@asyncio.coroutine
def worker(text):
  """
  协程运行的函数
  :param text:
  :return:
  """
  i = 0
  while True:
    print(text, i)

    try:
      yield from asyncio.sleep(.1)
    except asyncio.CancelledError:
      break

    i += 1


@asyncio.coroutine
def client(text, io_used):
  worker_fu = asyncio.ensure_future(worker(text))

  # 假装等待I/O一段时间
  yield from asyncio.sleep(io_used)

  # 结束运行协程
  worker_fu.cancel()
  return 'done'


loop = asyncio.get_event_loop()
tasks = [client('xiaozhe', 3), client('zzzz', 5)]
result = loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('Answer:', result)

Explanation:

1. asyncio.ensure_future(coro_or_future, *, loop=None): Plans to schedule the execution of a coroutine object and returns an asyncio.Task object.
2. worker_fu.cancel(): Cancel the execution of a coroutine and throw a CancelledError exception.
3. asyncio.wait(): The parameter of the coroutine is an iterable object composed of futures or coroutines; wait will wrap each coroutine into a Task object.

Comparison of asyncio.Task object and threading.Thread object

The asyncio.Task object is almost equivalent to the threading.Thread object.
Task objects are used to drive coroutines, and Thread objects are used to call callable objects.
Task objects are not instantiated by yourself, but are obtained by passing the coroutine to the asyncio.ensure_future(…) function or loop.create_task(…) method.
The obtained Task object has been scheduled to run; the Thread instance must call the start method to explicitly tell it to run.
If you want to terminate the task, you can use the Task.cancel() instance method to throw a CancelledError exception inside the coroutine.

Safety comparison between threads and coroutines

If you have done important programming using threads, the scheduler can interrupt the thread at any time. You must remember to retain locks to protect important parts of the program, prevent multi-step operations from being interrupted during execution, and prevent data from being in an invalid state.

Coroutines will be fully protected by default to prevent interruptions. We have to output explicitly in order for the rest of the program to run. For coroutines, there is no need to retain locks and synchronize operations between multiple threads. The coroutines themselves will be synchronized because only one coroutine is running at any time. When you want to hand over control, you can use yield or yield from to return control to the scheduler. This is why coroutines can be safely canceled: by definition, coroutines can only be canceled at a paused yield, so CancelledError exceptions can be handled and cleanup operations performed.

Future (future)

Normally you should not create futures yourself, but can only be instantiated by the concurrency framework (concurrent.futures or asyncio). The reason is simple: Futures represent something that will eventually happen, and the only way to know for sure that something will happen is if its execution is scheduled.

asyncio.Future

In the asyncio package, the BaseEventLoop.create_task(…) method receives a coroutine, schedules its run time, and returns an asyncio. Task instance - is also an instance of the asyncio.Future class, because Task is a subclass of Future and is used to wrap coroutines.

asyncio.ensure_future(coro_or_future, *, loop=None)

This function unifies coroutines and futures: the first parameter can be either of the two. If it is a Future or Task object, it is returned unchanged. If it is a coroutine, the async function will call the loop.create_task(...) method to create a Task object. The loop= keyword argument is optional and is used to pass in the event loop; if not passed in, the async function will obtain the loop object by calling the asyncio.get_event_loop() function.

BaseEventLoop.create_task(coro)

This method schedules the execution time of the coroutine and returns an asyncio.Task object.

There are multiple functions in the asyncio package that automatically wrap the coroutine specified by the parameter in an asyncio.Task object, such as the BaseEventLoop.run_until_complete(…) method.

asyncio.as_completed

为了集成进度条,我们可以使用的是 as_completed 生成器函数;幸好, asyncio 包提供了这个生成器函数的相应版本。

使用asyncio和aiohttp包

从 Python 3.4 起, asyncio 包只直接支持 TCP 和 UDP。如果想使用 HTTP 或其他协议,那么要借助第三方包 aiohttp 。


cc_list = ['China', 'USA']

@asyncio.coroutine
def get_flag(cc):
  url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
  resp = yield from aiohttp.request('GET', url)
  image = yield from resp.read()
  return image

@asyncio.coroutine
def download_one(name): 
  image = yield from get_flag(name) 
  save_flag(image, name.lower() + '.gif')
  return name

loop = asyncio.get_event_loop() 
wait_coro = asyncio.wait([download_one(cc) for cc in sorted(cc_list)]) 
res, _ = loop.run_until_complete(wait_coro) 
loop.close()

使用 asyncio 包时,我们编写的异步代码中包含由 asyncio 本身驱动的协程(即委派生成器),而生成器最终把职责委托给 asyncio 包或第三方库(如aiohttp)中的协程。这种处理方式相当于架起了管道,让 asyncio 事件循环(通过我们编写的协程)驱动执行低层异步 I/O 操作的库函数。

避免阻塞型调用

有两种方法能避免阻塞型调用中止整个应用程序的进程:
1. 在单独的线程中运行各个阻塞型操作
2. 把每个阻塞型操作转换成非阻塞的异步调用使用

多个线程是可以的,但是各个操作系统线程(Python 使用的是这种线程)消耗的内存达兆字节(具体的量取决于操作系统种类)。如果要处理几千个连接,而每个连接都使用一个线程的话,我们负担不起。

把生成器当作协程使用是异步编程的另一种方式。对事件循环来说,调用回调与在暂停的协程上调用 .send() 方法效果差不多。各个暂停的协程是要消耗内存,但是比线程消耗的内存数量级小。

上面的脚本为什么会很快

在上面的脚本中,调用 loop.run_until_complete 方法时,事件循环驱动各个download_one 协程,运行到第一个 yield from 表达式处时,那个表达式驱动各个get_flag 协程,然后在get_flag协程里面运行到第一个 yield from 表达式处时,调用 aiohttp.request(…)函数。这些调用都不会阻塞,因此在零点几秒内所有请求全部开始。

asyncio 的基础设施获得第一个响应后,事件循环把响应发给等待结果的 get_flag 协程。得到响应后, get_flag 向前执行到下一个 yield from 表达式处,调用resp.read() 方法,然后把控制权还给主循环。其他响应会陆续返回。所有 get_ flag 协程都获得结果后,委派生成器 download_one 恢复,保存图像文件。

async和await

为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async和await,可以让coroutine的代码更简洁易读。

async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换。
1. 把@asyncio.coroutine替换为async
2. 把yield from替换为await

例如:


@asyncio.coroutine
def hello():
  print("Hello world!")
  r = yield from asyncio.sleep(1)
  print("Hello again!")

等同于


async def hello():
  print("Hello world!")
  r = await asyncio.sleep(1)
  print("Hello again!")

网站请求实例


import asyncio
import aiohttp

urls = [
  'http://www.163.com/',
  'http://www.sina.com.cn/',
  'https://www.hupu.com/',
  'http://www.php.cn/'
]


async def get_url_data(u):
  """
  读取url的数据
  :param u:
  :return:
  """
  print('running ', u)
  async with aiohttp.ClientSession() as session:
    async with session.get(u) as resp:
      print(u, resp.status, type(resp.text()))
      # print(await resp.text())

  return resp.headers


async def request_url(u):
  """
  主调度函数
  :param u:
  :return:
  """
  res = await get_url_data(u)
  return res


loop = asyncio.get_event_loop()
task_lists = asyncio.wait([request_url(u) for u in urls])
all_res, _ = loop.run_until_complete(task_lists)
loop.close()

print(all_res)

The above is the detailed content of Detailed explanation of how Python uses the asyncio package to handle concurrency. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn