首頁  >  文章  >  後端開發  >  詳解Python使用asyncio包處理併發的方法

詳解Python使用asyncio包處理併發的方法

巴扎黑
巴扎黑原創
2017-09-11 10:59:031831瀏覽

這篇文章主要為大家詳細介紹了Python使用asyncio包處理並發的相關資料,具有一定的參考價值,有興趣的小伙伴們可以參考一下

阻塞型I/ O和GIL

CPython 解釋器本身就不是線程安全的,因此有全域解釋器鎖定(GIL),一次只允許使用一個執行緒執行Python 字節碼。因此,一個 Python 進程通常不能同時使用多個 CPU 核心。

然而,標準函式庫中所有執行阻塞型 I/O 操作的函數,在等待作業系統回傳結果時都會釋放GIL。這意味著在 Python 語言這個層次上可以使用多線程,而 I/O 密集型 Python 程式能從中受益:當一個 Python 線程等待網路回應時,阻塞型 I/O 函數會釋放 GIL,再運行一個線程。

asyncio

這個套件使用事件循環驅動的協程實作並發。 asyncio 大量使用 yield from 表達式,因此與Python 舊版不相容。

asyncio 套件使用的「協程」是較嚴格的定義。適合asyncio API 的協程在定義體中必須使用 yield from,而不能使用 yield。此外,適合asyncio 的協程要由呼叫方驅動,並由呼叫方透過yield from 呼叫;

範例1


##

import threading
import asyncio

@asyncio.coroutine
def hello():
  print('Start Hello', threading.currentThread())
  yield from asyncio.sleep(5)
  print('End Hello', threading.currentThread())

@asyncio.coroutine
def world():
  print('Start World', threading.currentThread())
  yield from asyncio.sleep(3)
  print('End World', threading.currentThread())

# 获取EventLoop:
loop = asyncio.get_event_loop()
tasks = [hello(), world()]
# 执行coroutine
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

@asyncio.coroutine把生成器函數標記為協程型別。

asyncio.sleep(3) 建立一個3秒後完成的協程。
loop.run_until_complete(future),執行直到future完成;如果參數是 coroutine object,則需要使用 ensure_future()函式包裝。
loop.close() 關閉事件循環

範例2


#

import asyncio

@asyncio.coroutine
def worker(text):
  """
  协程运行的函数
  :param text:
  :return:
  """
  i = 0
  while True:
    print(text, i)

    try:
      yield from asyncio.sleep(.1)
    except asyncio.CancelledError:
      break

    i += 1


@asyncio.coroutine
def client(text, io_used):
  worker_fu = asyncio.ensure_future(worker(text))

  # 假装等待I/O一段时间
  yield from asyncio.sleep(io_used)

  # 结束运行协程
  worker_fu.cancel()
  return 'done'


loop = asyncio.get_event_loop()
tasks = [client('xiaozhe', 3), client('zzzz', 5)]
result = loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('Answer:', result)

解釋:


1. asyncio.ensure_future(coro_or_future, *, loop=None):計畫安排一個coroutine object的執行,回傳一個asyncio.Task object。

2. worker_fu.cancel(): 取消一個協程的執行,拋出CancelledError例外。
3. asyncio.wait():協程的參數是一個由期物或協程構成的可迭代物件; wait 會分別把各個協程包裝進一個 Task 物件。

asyncio.Task 物件與threading.Thread物件的比較

asyncio.Task 物件差不多與 threading.Thread 物件等效。

Task 物件用於驅動協程, Thread 物件用於呼叫可呼叫的物件。
Task 物件不是由自己動手實例化,而是透過把協程傳給 asyncio.ensure_future(…) 函數或loop.create_task(…) 方法來取得。
取得的 Task 物件已經排定了運行時間;Thread 實例則必須呼叫 start 方法,明確告知讓它運行。
如果想要終止任務,可以使用 Task.cancel() 實例方法,在協程內部拋出CancelledError 例外。

執行緒與協程的安全性比較

如果使用執行緒做過重要的編程,因為排程器任何時候都能中斷執行緒。必須記住保留鎖,去保護程式中的重要部分,防止多步驟操作在執行的過程中中斷,防止資料處於無效狀態。

協程預設會做好全方位保護,以防止中斷。我們必須顯式產出才能讓程式的餘下部分運作。對協程來說,無須保留鎖,在多個執行緒之間同步操作,協程本身就會同步,因為在任意時刻只有一個協程運行。想交出控制權時,可以使用 yield 或 yield from 把控制權交還調度程式。這就是能夠安全地取消協程的原因:按照定義,協程只能在暫停的 yield處取消,因此可以處理 CancelledError 異常,執行清理操作。

Future(期物)

通常自己不應該建立期物,而只能由並發框架(concurrent.futures 或 asyncio)實例化。原因很簡單:期物表示終將發生的事情,而決定某件事會發生的唯一方式是執行的時間已經排定。

asyncio.Future

在asyncio 套件中, BaseEventLoop.create_task(…) 方法接收一個協程,排定它的運行時間,然後傳回一個asyncio. Task 實例-也是asyncio.Future 類別的實例,因為Task 是Future 的子類,用來包裝協程。

asyncio.ensure_future(coro_or_future, *, loop=None)

這個函數統一了協程和期物:第一個參數可以是二者中的任何一個。如果是 Future 或 Task 對象,那就原封不動地回傳。如果是協程,那麼 async 函數會呼叫loop.create_task(…) 方法建立 Task 物件。 loop= 關鍵字參數是可選的,用於傳入事件循環;如果沒有傳入,那麼 async 函數會透過呼叫 asyncio.get_event_loop() 函數來取得循環物件。

BaseEventLoop.create_task(coro)

這個方法排定協程的執行時間,傳回一個 asyncio.Task 物件。

asyncio 套件中有多個函式會自動把參數指定的協程包裝在 asyncio.Task 物件中,例如 BaseEventLoop.run_until_complete(…) 方法。

asyncio.as_completed#

为了集成进度条,我们可以使用的是 as_completed 生成器函数;幸好, asyncio 包提供了这个生成器函数的相应版本。

使用asyncio和aiohttp包

从 Python 3.4 起, asyncio 包只直接支持 TCP 和 UDP。如果想使用 HTTP 或其他协议,那么要借助第三方包 aiohttp 。


cc_list = ['China', 'USA']

@asyncio.coroutine
def get_flag(cc):
  url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
  resp = yield from aiohttp.request('GET', url)
  image = yield from resp.read()
  return image

@asyncio.coroutine
def download_one(name): 
  image = yield from get_flag(name) 
  save_flag(image, name.lower() + '.gif')
  return name

loop = asyncio.get_event_loop() 
wait_coro = asyncio.wait([download_one(cc) for cc in sorted(cc_list)]) 
res, _ = loop.run_until_complete(wait_coro) 
loop.close()

使用 asyncio 包时,我们编写的异步代码中包含由 asyncio 本身驱动的协程(即委派生成器),而生成器最终把职责委托给 asyncio 包或第三方库(如aiohttp)中的协程。这种处理方式相当于架起了管道,让 asyncio 事件循环(通过我们编写的协程)驱动执行低层异步 I/O 操作的库函数。

避免阻塞型调用

有两种方法能避免阻塞型调用中止整个应用程序的进程:
1. 在单独的线程中运行各个阻塞型操作
2. 把每个阻塞型操作转换成非阻塞的异步调用使用

多个线程是可以的,但是各个操作系统线程(Python 使用的是这种线程)消耗的内存达兆字节(具体的量取决于操作系统种类)。如果要处理几千个连接,而每个连接都使用一个线程的话,我们负担不起。

把生成器当作协程使用是异步编程的另一种方式。对事件循环来说,调用回调与在暂停的协程上调用 .send() 方法效果差不多。各个暂停的协程是要消耗内存,但是比线程消耗的内存数量级小。

上面的脚本为什么会很快

在上面的脚本中,调用 loop.run_until_complete 方法时,事件循环驱动各个download_one 协程,运行到第一个 yield from 表达式处时,那个表达式驱动各个get_flag 协程,然后在get_flag协程里面运行到第一个 yield from 表达式处时,调用 aiohttp.request(…)函数。这些调用都不会阻塞,因此在零点几秒内所有请求全部开始。

asyncio 的基础设施获得第一个响应后,事件循环把响应发给等待结果的 get_flag 协程。得到响应后, get_flag 向前执行到下一个 yield from 表达式处,调用resp.read() 方法,然后把控制权还给主循环。其他响应会陆续返回。所有 get_ flag 协程都获得结果后,委派生成器 download_one 恢复,保存图像文件。

async和await

为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async和await,可以让coroutine的代码更简洁易读。

async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换。
1. 把@asyncio.coroutine替换为async
2. 把yield from替换为await

例如:


@asyncio.coroutine
def hello():
  print("Hello world!")
  r = yield from asyncio.sleep(1)
  print("Hello again!")

等同于


async def hello():
  print("Hello world!")
  r = await asyncio.sleep(1)
  print("Hello again!")

网站请求实例


import asyncio
import aiohttp

urls = [
  'http://www.163.com/',
  'http://www.sina.com.cn/',
  'https://www.hupu.com/',
  'http://www.php.cn/'
]


async def get_url_data(u):
  """
  读取url的数据
  :param u:
  :return:
  """
  print('running ', u)
  async with aiohttp.ClientSession() as session:
    async with session.get(u) as resp:
      print(u, resp.status, type(resp.text()))
      # print(await resp.text())

  return resp.headers


async def request_url(u):
  """
  主调度函数
  :param u:
  :return:
  """
  res = await get_url_data(u)
  return res


loop = asyncio.get_event_loop()
task_lists = asyncio.wait([request_url(u) for u in urls])
all_res, _ = loop.run_until_complete(task_lists)
loop.close()

print(all_res)

以上是詳解Python使用asyncio包處理併發的方法的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn