ホームページ  >  記事  >  バックエンド開発  >  Python Asyncio のスケジューリング原理とは何ですか

Python Asyncio のスケジューリング原理とは何ですか

王林
王林転載
2023-05-20 14:31:121120ブラウズ

1. 基本的な紹介

Python.Asyncio は、多くの関数を含む大規模で包括的なライブラリです。3 つの待機可能オブジェクトに加えて、コア スケジューリングに関連するロジックは次のとおりです。その他の関数は、runners.pybase_event.py、および event.py の 3 つのファイルにあります。

runners.pyこのファイルには 1 つのメイン クラス --Runner があります。その主な役割は、イベント ループを実行してコルーチン モードに入り、初期化を待つことです。コルーチン モードを終了するときに、メモリ内に残っているコルーチン、ジェネレーター、その他のオブジェクトをクリーンアップします。

コルーチン モードは、理解を容易にするためのものであり、コンピューターの場合、そのような区別はありません。

event.py加えて、 storage EventLoop オブジェクトのインターフェイスと EventLoop を取得および設定する関数に加えて、2 つの EventLoop スケジュール可能なオブジェクト、つまり Handler# があります。 ## と TimerHandler は、他のオブジェクトを呼び出す EvnetLoop コンテナとして考えることができ、スケジュールされるオブジェクトとイベント ループの間の関係を接続するために使用されますが、その実装はHandler, の場合は非常に単純です。そのソース コードは次のとおりです。

# 已经移除了一些不想关的代码
class Handle:
    def __init__(self, callback, args, loop, context=None):
        # 初始化上下文,确保执行的时候能找到Handle所在的上下文
        if context is None:
            context = contextvars.copy_context()
        self._context = context
        self._loop = loop
        self._callback = callback
        self._args = args
        self._cancelled = False

    def cancel(self):
        # 设置当前Handle为取消状态
        if not self._cancelled:
            self._cancelled = True
            self._callback = None
            self._args = None
    def cancelled(self):
        return self._cancelled
    def _run(self):
        # 用于执行真正的函数,且通过context.run方法来确保在自己的上下文内执行。
        try:
            # 保持在自己持有的上下文中执行对应的回调
            self._context.run(self._callback, *self._args)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            cb = format_helpers._format_callback_source(
                self._callback, self._args)
            msg = f'Exception in callback {cb}'
            context = {
                'message': msg,
                'exception': exc,
                'handle': self,
            }
            self._loop.call_exception_handler(context)

ソース コードから、

Handle 関数が次のとおりであることがわかります。は非常にシンプルで、キャンセルでき、現在のコンテキストで使用できるコンテキストを提供します。実行関数と TimerHandleHandle から継承し、関連するパラメータがさらにいくつかあります。 Handle よりも時間と並べ替えが容易です。ソース コードは次のとおりです:

class TimerHandle(Handle):
    def __init__(self, when, callback, args, loop, context=None):
        super().__init__(callback, args, loop, context)
        self._when = when
        self._scheduled = False
    def __hash__(self):
        return hash(self._when)
    def __lt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when
        return NotImplemented
    def __le__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when or self.__eq__(other)
        return NotImplemented
    def __gt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when
        return NotImplemented
    def __ge__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when or self.__eq__(other)
        return NotImplemented
    def __eq__(self, other):
        if isinstance(other, TimerHandle):
            return (self._when == other._when and
                    self._callback == other._callback and
                    self._args == other._args and
                    self._cancelled == other._cancelled)
        return NotImplemented
    def cancel(self):
        if not self._cancelled:
            # 用于通知事件循环当前Handle已经退出了
            self._loop._timer_handle_cancelled(self)
        super().cancel()
    def when(self):
        return self._when

コードを通して、これら 2 つのオブジェクトが非常に単純であることがわかります。

、これら 2 つのオブジェクトを直接使用するのではなく、loop.call_xxx シリーズ メソッドを使用して呼び出しを Handle オブジェクトにカプセル化し、EventLoop を待ちます。 が実行されます。したがって、loop.call_xxx 一連のメソッドは、EventLoop の登録操作とみなすことができます。基本的に、すべての非 IO 非同期操作は、loop.call_xxx を介して自身を渡す必要があります。 メソッド。呼び出しは EventLoop に登録されます。たとえば、Task オブジェクトは、loop.call_soon# を呼び出すことによって EventLoop に登録されます。 ## 初期化後のメソッドです。loop.call_sonn の実装は非常に簡単です。 そのソース コードは次のとおりです。

call_soon

が実際に関連していることを確認してください コードは数行しかありませんが、呼び出しを Handle にカプセル化し、それを

self._reday

に追加する役割を果たします。 、これにより、呼び出しがイベント ループに登録されます。 loop.call_xxx loop.call_soon 一連の関数に加えて、他に 2 つのメソッド (

loop.call_at

と # #) があります。 #loop.call_laterloop.call_soon に似ていますが、EventLoop を呼び出すことができる時間を指定する追加の時間パラメーターがあります。 ## を通じて #loop.call_at および loop.call_later によって登録された呼び出しは、Python のヒープ ソート モジュールを通じて self._scheduled## に登録されますheadpq #変数内の 具体的なコードは次のとおりです。

class BaseEventLoop:
    ...
    def call_soon(self, callback, *args, context=None):
        # 检查是否事件循环是否关闭,如果是则直接抛出异常
        self._check_closed()
        handle = self._call_soon(callback, args, context)
        return handle

   def _call_soon(self, callback, args, context):
        # 把调用封装成一个handle,这样方便被事件循环调用
        handle = events.Handle(callback, args, self, context)
        # 添加一个handle到_ready,等待被调用
        self._ready.append(handle)
        return handle
2.EventLoop の実装のスケジューリングIn記事「Python Asyncio のコルーチン、タスク、将来」「待機可能なオブジェクトの関係と機能」で、runner

loop.run_until_complete を通じて mainTask を呼び出すことが分析されています。

を使用して

EventLoop

Scheduling を開くため、

EventLoop のスケジューリングを分析する場合は、loop.run_until_complete から開始する必要があります。 の対応するソース コード は次のとおりです:

class BaseEventLoop:
    ...
    def call_later(self, delay, callback, *args, context=None):
        if delay is None:
            raise TypeError(&#39;delay must not be None&#39;)
        timer = self.call_at(self.time() + delay, callback, *args, context=context)
        return timer

    def call_at(self, when, callback, *args, context=None):
        if when is None:
            raise TypeError("when cannot be None")
        self._check_closed()
        # 创建一个timer handle,然后添加到事件循环的_scheduled中,等待被调用
        timer = events.TimerHandle(when, callback, args, self, context)
        heapq.heappush(self._scheduled, timer)
        timer._scheduled = True
        return timer
このソース コードは複雑ではありません。主なロジックは、CorotinueTask

オブジェクトに変換することです。

loop.call_sonn メソッドはそれ自体を EventLoop

に登録し、最後に ## のループ コードを実行します。 #loop.run_forever

until _stoppingTrue:<pre class="brush:py;">class BaseEventLoop: def run_until_complete(self, future): ... new_task = not futures.isfuture(future) # 把coroutine转换成task,这样事件循环就可以调度了,事件循环的最小调度单位为task # 需要注意的是此时事件循环并没注册到全局变量中,所以需要显示的传进去, # 同时Task对象注册的时候,已经通过loop.call_soon把自己注册到事件循环中,等待调度 future = tasks.ensure_future(future, loop=self) if new_task: # An exception is raised if the future didn&amp;#39;t complete, so there # is no need to log the &quot;destroy pending task&quot; message future._log_destroy_pending = False # 当该task完成时,意味着当前事件循环失去了调度对象,无法继续调度,所以需要关闭当前事件循环,程序会由协程模式返回到线程模式 future.add_done_callback(_run_until_complete_cb) try: # 事件循环开始运行 self.run_forever() except: if new_task and future.done() and not future.cancelled(): # The coroutine raised a BaseException. Consume the exception # to not log a warning, the caller doesn&amp;#39;t have access to the # local task. future.exception() raise finally: future.remove_done_callback(_run_until_complete_cb) if not future.done(): raise RuntimeError(&amp;#39;Event loop stopped before Future completed.&amp;#39;) return future.result() def run_forever(self): # 进行一些初始化工作 self._check_closed() self._check_running() self._set_coroutine_origin_tracking(self._debug) self._thread_id = threading.get_ident() old_agen_hooks = sys.get_asyncgen_hooks() # 通过asyncgen钩子来自动关闭asyncgen函数,这样可以提醒用户生成器还未关闭 sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, finalizer=self._asyncgen_finalizer_hook) try: # 设置当前在运行的事件循环到全局变量中,这样就可以在任一阶段获取到当前的事件循环了 events._set_running_loop(self) while True: # 正真执行任务的逻辑 self._run_once() if self._stopping: break finally: # 关闭循环, 并且清理一些资源 self._stopping = False self._thread_id = None events._set_running_loop(None) self._set_coroutine_origin_tracking(False) sys.set_asyncgen_hooks(*old_agen_hooks)</pre> としてマークされる このコードにより、イベント ループが常に実行され、ループは自動的に終了します。実際のスケジューリングの中心は _run_onceFunction、 です。そのソース コードは次のとおりです。 <pre class="brush:py;">class BaseEventLoop: ... def _run_once(self): # self._scheduled是一个列表,它只存放TimerHandle sched_count = len(self._scheduled) ############################### # 第一阶段,整理self._scheduled # ############################### if (sched_count &gt; _MIN_SCHEDULED_TIMER_HANDLES and self._timer_cancelled_count / sched_count &gt; _MIN_CANCELLED_TIMER_HANDLES_FRACTION): # 当待调度的任务数量超过100且待取消的任务占总任务的50%时,才进入这个逻辑 # 把需要取消的任务移除 new_scheduled = [] for handle in self._scheduled: if handle._cancelled: # 设置handle的_cancelled为True,并且把handle从_scheduled中移除 handle._scheduled = False else: new_scheduled.append(handle) # 重新排列堆 heapq.heapify(new_scheduled) self._scheduled = new_scheduled self._timer_cancelled_count = 0 else: # 需要取消的handle不多,则只会走这个逻辑,这里会把堆顶的handle弹出,并标记为不可调度,但不会访问整个堆 while self._scheduled and self._scheduled[0]._cancelled: self._timer_cancelled_count -= 1 handle = heapq.heappop(self._scheduled) handle._scheduled = False ################################# # 第二阶段,计算超时值以及等待事件IO # ################################# timeout = None # 当有准备调度的handle或者是正在关闭时,不等待,方便尽快的调度 if self._ready or self._stopping: timeout = 0 elif self._scheduled: # Compute the desired timeout. # 如果堆有数据时,通过堆顶的handle计算最短的超时时间,但是最多不能超过MAXIMUM_SELECT_TIMEOUT,以免超过系统限制 when = self._scheduled[0]._when timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT) # 事件循环等待事件,直到有事件或者超时 event_list = self._selector.select(timeout) ################################################## # 第三阶段,把满足条件的TimeHandle放入到self._ready中 # ################################################## # 获取得到的事件的回调,然后装填到_ready self._process_events(event_list) # 把一些在self._scheduled且满足调度条件的handle放到_ready中,比如TimerHandle。 # end_time为当前时间+一个时间单位,猜测是能多处理一些这段时间内产生的事件 end_time = self.time() + self._clock_resolution while self._scheduled: handle = self._scheduled[0] if handle._when &gt;= end_time: break handle = heapq.heappop(self._scheduled) handle._scheduled = False self._ready.append(handle) ################################################################################ # 第四阶段,遍历所有准备调度的handle,并且通过handle的context来执行handle对应的callback # ################################################################################ ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() # 如果handle已经被取消,则不调用 if handle._cancelled: continue if self._debug: try: self._current_handle = handle t0 = self.time() handle._run() dt = self.time() - t0 if dt &gt;= self.slow_callback_duration: # 执行太久的回调,记录下来,这些需要开发者自己优化 logger.warning(&amp;#39;Executing %s took %.3f seconds&amp;#39;, _format_handle(handle), dt) finally: self._current_handle = None else: handle._run() handle = None # Needed to break cycles when an exception occurs.</pre><p>通过源码分析,可以很明确的知道调度逻辑中第一步是先规整<code>self._scheduled,在规整的过程是使用堆排序来进行的,因为堆排序在调度的场景下效率是非常高的,不过这段规整代码分成两种,我猜测是当需要取消的数量过多时直接遍历的效率会更高。 在规整self._scheduled后,就进入第二步,该步骤开始等待系统事件循环返回对应的事件,如果self._ready中有数据,就不做等待了,需要马上到下一步骤,以便能赶紧安排调度。 在得到系统事件循环得到的事件后,就进入到了第三步,该步骤会通过self._process_events方法处理对应的事件,并把事件对应的回调存放到了self._ready中,最后再遍历self._ready中的所有Handle并逐一执行(执行时可以认为EventLoop把控制权返回给对应的调用逻辑),至此一个完整的调度逻辑就结束了,并进入下一个调度逻辑。

3.网络IO事件的处理

注:由于系统事件循环的限制,所以文件IO一般还是使用多线程来执行,具体见:github.com/python/asyn…

在分析EventLoop调度实现的时候忽略了self._process_events的具体实现逻辑,因为_process_events方法所在asyncio.base_event.py文件中的BaseEventLoop类并未有具体实现的,因为网络IO相关的需要系统的事件循环来帮忙处理,所以与系统事件循环相关的逻辑都在asyncio.selector_events.py中的BaseSelectorEventLoop类中。BaseSelectorEventLoop类封装了selector模块与系统事件循环交互,使调用者不需要去考虑sock的创建以及sock产生的文件描述符的监听与注销等操作,下面以BaseSelectorEventLoop中自带的pipe为例子,分析BaseSelectorEventLoop是如何进行网络IO事件处理的。

在分析之前,先看一个例子,代码如下:

import asyncio
import threading
def task():
    print("task")
def run_loop_inside_thread(loop):
    loop.run_forever()
loop = asyncio.get_event_loop()
threading.Thread(target=run_loop_inside_thread, args=(loop,)).start()
loop.call_soon(task)

如果直接运行这个例子,它并不会输出task(不过在IDE使用DEBUG模式下线程启动会慢一点,所以会输出的),因为在调用loop.run_foreverEventLoop会一直卡在这段逻辑中:

event_list = self._selector.select(timeout)

所以调用loop.call_soon并不会使EventLoop马上安排调度,而如果把call_soon换成call_soon_threadsafe则可以正常输出,这是因为call_soon_threadsafe中多了一个self._write_to_self的调用,它的源码如下:

class BaseEventLoop:
    ...
    def call_soon_threadsafe(self, callback, *args, context=None):
        """Like call_soon(), but thread-safe."""
        self._check_closed()
        handle = self._call_soon(callback, args, context)
        self._write_to_self()
        return handle

由于这个调用是涉及到IO相关的,所以需要到BaseSelectorEventLoop类查看,接下来以pipe相关的网络IO操作来分析EventLoop是如何处理IO事件的(只演示reader对象,writer对象操作与reader类似),

对应的源码如下:

class BaseSelectorEventLoop(base_events.BaseEventLoop):
    #######
    # 创建 #
    #######
    def __init__(self, selector=None):
        super().__init__()

        if selector is None:
            # 获取最优的selector
            selector = selectors.DefaultSelector()
        self._selector = selector
        # 创建pipe
        self._make_self_pipe()
        self._transports = weakref.WeakValueDictionary()
    def _make_self_pipe(self):
        # 创建Pipe对应的sock 
        self._ssock, self._csock = socket.socketpair()
        # 设置sock为非阻塞
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        # 阻塞服务端sock读事件对应的回调
        self._add_reader(self._ssock.fileno(), self._read_from_self)
    def _add_reader(self, fd, callback, *args):
        # 检查事件循环是否关闭
        self._check_closed()
        # 封装回调为handle对象
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            # 如果没有注册到系统的事件循环,则注册
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            # 如果已经注册过,则更新
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            if reader is not None:
                reader.cancel()
        return handle

    def _read_from_self(self):
        # 负责消费sock数据
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                continue
            except BlockingIOError:
                break
    #######
    # 删除 #
    #######
    def _close_self_pipe(self):
        # 注销Pipe对应的描述符 
        self._remove_reader(self._ssock.fileno())
        # 关闭sock
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _remove_reader(self, fd):
        # 如果事件循环已经关闭了,就不用操作了
        if self.is_closed():
            return False
        try:
            # 查询文件描述符是否在selector中
            key = self._selector.get_key(fd)
        except KeyError:
            # 不存在则返回
            return False
        else:
            # 存在则进入移除的工作
            mask, (reader, writer) = key.events, key.data
            # 通过事件掩码判断是否有其它事件
            mask &= ~selectors.EVENT_READ
            if not mask:
                # 移除已经注册到selector的文件描述符
                self._selector.unregister(fd)
            else:
                # 移除已经注册到selector的文件描述符,并注册新的事件
                self._selector.modify(fd, mask, (None, writer))

            # 如果reader不为空,则取消reader
            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

通过源码中的创建部分可以看到,EventLoop在启动的时候会创建一对建立通信的sock,并设置为非阻塞,然后把对应的回调封装成一个Handle对象并注册到系统事件循环中(删除则进行对应的反向操作),之后系统事件循环就会一直监听对应的事件,也就是EventLoop的执行逻辑会阻塞在下面的调用中,等待事件响应:

event_list = self._selector.select(timeout)

这时如果执行loop.call_soon_threadsafe,那么会通过write_to_self写入一点信息:

    def _write_to_self(self):
        csock = self._csock
        if csock is None:
            return
        try:
            csock.send(b&#39;\0&#39;)
        except OSError:
            if self._debug:
                logger.debug("Fail to write a null byte into the self-pipe socket", exc_info=True)

由于csock被写入了数据,那么它对应的ssock就会收到一个读事件,系统事件循环在收到这个事件通知后就会把数据返回,然后EventLoop就会获得到对应的数据,并交给process_events方法进行处理,

它的相关代码如下:

class BaseSelectorEventLoop:
    def _process_events(self, event_list):
        for key, mask in event_list:
            # 从回调事件中获取到对应的数据,key.data在注册时是一个元祖,所以这里要对元祖进行解包
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                # 得到reader handle,如果是被标记为取消,就移除对应的文件描述符
                if reader._cancelled:
                    self._remove_reader(fileobj)
                else:
                    # 如果没被标记为取消,则安排到self._ready中
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                # 对于写对象,也是同样的道理。
                if writer._cancelled:
                    self._remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _add_callback(self, handle):
        # 把回调的handle添加到_ready中
        assert isinstance(handle, events.Handle), &#39;A Handle is required here&#39;
        if handle._cancelled:
            return
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)

    def _remove_reader(self, fd):
        # 如果事件循环已经关闭了,就不用操作了
        if self.is_closed():
            return False
        try:
            # 查询文件描述符是否在selector中
            key = self._selector.get_key(fd)
        except KeyError:
            # 不存在则返回
            return False
        else:
            # 存在则进入移除的工作
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                # 移除已经注册到selector的文件描述符
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

从代码中可以看出_process_events会对事件对应的文件描述符进行处理,并从事件回调中获取到对应的Handle对象添加到self._ready中,由EventLoop在接下来遍历self._ready并执行。

可以看到网络IO事件的处理并不复杂,因为系统事件循环已经为我们做了很多工作了,但是用户所有与网络IO相关的操作都需要有一个类似的操作,这样是非常的繁琐的,幸好asyncio库已经为我们做了封装,我们只要调用就可以了,方便了很多。

以上がPython Asyncio のスケジューリング原理とは何ですかの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。