Rumah  >  Artikel  >  pembangunan bahagian belakang  >  Apakah prinsip penjadualan Python Asyncio

Apakah prinsip penjadualan Python Asyncio

王林
王林ke hadapan
2023-05-20 14:31:121122semak imbas

1. Pengenalan asas

Python.Asyncio ialah perpustakaan yang besar dan komprehensif yang merangkumi banyak fungsi Selain tiga objek yang boleh ditunggu, logik yang berkaitan dengan penjadualan teras juga mempunyai fungsi lain tiga fail: runners.py, base_event.py dan event.py masing-masing. Fail

runners.py mempunyai satu kelas utama - Runner Tanggungjawab utamanya ialah melengkapkan gelung acara untuk memasuki mod coroutine, tunggu permulaan dan bersihkan baki memori apabila keluar dari coroutine. mod Coroutine, penjana dan objek lain.

Mod coroutine hanya untuk kemudahan pemahaman Untuk komputer, tiada perbezaan seperti itu

event.py kecuali untuk menyimpan antara muka <.> objek dan Selain fungsi mendapatkan dan menetapkan EventLoop, terdapat juga dua EventLoop objek berjadual, iaitu EventLoop dan Handler Ia boleh dianggap sebagai bekas untuk TimerHandler untuk memanggil objek lain untuk sambungan dijadualkan. Hubungan antara objek dan gelung peristiwa, tetapi pelaksanaannya sangat mudah Untuk EvnetLoop, Handler kod sumbernya adalah seperti berikut:

# 已经移除了一些不想关的代码
class Handle:
    def __init__(self, callback, args, loop, context=None):
        # 初始化上下文,确保执行的时候能找到Handle所在的上下文
        if context is None:
            context = contextvars.copy_context()
        self._context = context
        self._loop = loop
        self._callback = callback
        self._args = args
        self._cancelled = False

    def cancel(self):
        # 设置当前Handle为取消状态
        if not self._cancelled:
            self._cancelled = True
            self._callback = None
            self._args = None
    def cancelled(self):
        return self._cancelled
    def _run(self):
        # 用于执行真正的函数,且通过context.run方法来确保在自己的上下文内执行。
        try:
            # 保持在自己持有的上下文中执行对应的回调
            self._context.run(self._callback, *self._args)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            cb = format_helpers._format_callback_source(
                self._callback, self._args)
            msg = f&#39;Exception in callback {cb}&#39;
            context = {
                &#39;message&#39;: msg,
                &#39;exception&#39;: exc,
                &#39;handle&#39;: self,
            }
            self._loop.call_exception_handler(context)

Ia boleh didapati. melalui kod sumber bahawa fungsi

sangat mudah dan menyediakan Ia mempunyai fungsi yang boleh dibatalkan dan dilaksanakan dalam konteksnya sendiri, dan Handle mewarisi daripada TimerHandle dan mempunyai beberapa lagi parameter yang berkaitan dengan masa dan pengisihan daripada Handle. Kod sumber adalah seperti berikut: Handle

class TimerHandle(Handle):
    def __init__(self, when, callback, args, loop, context=None):
        super().__init__(callback, args, loop, context)
        self._when = when
        self._scheduled = False
    def __hash__(self):
        return hash(self._when)
    def __lt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when
        return NotImplemented
    def __le__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when or self.__eq__(other)
        return NotImplemented
    def __gt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when
        return NotImplemented
    def __ge__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when or self.__eq__(other)
        return NotImplemented
    def __eq__(self, other):
        if isinstance(other, TimerHandle):
            return (self._when == other._when and
                    self._callback == other._callback and
                    self._args == other._args and
                    self._cancelled == other._cancelled)
        return NotImplemented
    def cancel(self):
        if not self._cancelled:
            # 用于通知事件循环当前Handle已经退出了
            self._loop._timer_handle_cancelled(self)
        super().cancel()
    def when(self):
        return self._when

Melalui kod tersebut, kita dapati bahawa kedua-dua objek ini sangat mudah apabila kita menggunakan

, sebaliknya kita tidak akan menggunakan kedua-dua objek ini. kami menggunakan siri Python.Asyncio kaedah untuk merangkum panggilan ke dalam objek loop.call_xxx, dan kemudian Menunggu Handle untuk dilaksanakan. Oleh itu, siri kaedah EventLoop boleh dianggap sebagai operasi pendaftaran loop.call_xxx Pada asasnya, semua operasi tak segerak IO perlu mendaftarkan panggilan mereka ke dalam EventLoop melalui kaedah loop.call_xxx > objek berada dalam Selepas permulaan, daftar ke EventLoop dengan memanggil kaedah Task Pelaksanaan loop.call_soon adalah sangat mudah Kod sumber EventLooploop.call_sonn

adalah seperti berikut:

.

class BaseEventLoop:
    ...
    def call_soon(self, callback, *args, context=None):
        # 检查是否事件循环是否关闭,如果是则直接抛出异常
        self._check_closed()
        handle = self._call_soon(callback, args, context)
        return handle

   def _call_soon(self, callback, args, context):
        # 把调用封装成一个handle,这样方便被事件循环调用
        handle = events.Handle(callback, args, self, context)
        # 添加一个handle到_ready,等待被调用
        self._ready.append(handle)
        return handle
boleh dilihat Terdapat hanya beberapa baris kod yang benar-benar berkaitan dengan

Ia bertanggungjawab untuk merangkum panggilan ke dalam

dan menambahkannya pada call_soon, dengan itu mendaftarkan. panggil ke dalam gelung acara. Sebagai tambahan kepada siri fungsi Handle, terdapat dua kaedah lain - self._reday dan

Ia serupa dengan

, tetapi mempunyai satu lagi parameter masa Beritahu loop.call_xxx selepas pukul berapa dipanggil. loop.call_soonloop.call_at

class BaseEventLoop:
    ...
    def call_later(self, delay, callback, *args, context=None):
        if delay is None:
            raise TypeError(&#39;delay must not be None&#39;)
        timer = self.call_at(self.time() + delay, callback, *args, context=context)
        return timer

    def call_at(self, when, callback, *args, context=None):
        if when is None:
            raise TypeError("when cannot be None")
        self._check_closed()
        # 创建一个timer handle,然后添加到事件循环的_scheduled中,等待被调用
        timer = events.TimerHandle(when, callback, args, self, context)
        heapq.heappush(self._scheduled, timer)
        timer._scheduled = True
        return timer
loop.call_later2 Pelaksanaan penjadualan EventLoop loop.call_soonEventLoop telah diterangkan dalam artikel "Hubungan dan peranan Coroutines, Tasks, dan objek menunggu masa hadapan dalam Python Asyncio" Ia dianalisis yang loop.call_at akan memanggil loop.call_laterTugas melalui Python untuk memulakan penjadualan headpq, jadi apabila menganalisis penjadualan self._scheduled, anda harus bermula dengan

dahulu,

Kod sumber yang sepadan adalah seperti berikut:

class BaseEventLoop:
    def run_until_complete(self, future):
        ...
        new_task = not futures.isfuture(future)
        # 把coroutine转换成task,这样事件循环就可以调度了,事件循环的最小调度单位为task
        # 需要注意的是此时事件循环并没注册到全局变量中,所以需要显示的传进去,
        # 同时Task对象注册的时候,已经通过loop.call_soon把自己注册到事件循环中,等待调度
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn&#39;t complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False
        # 当该task完成时,意味着当前事件循环失去了调度对象,无法继续调度,所以需要关闭当前事件循环,程序会由协程模式返回到线程模式
        future.add_done_callback(_run_until_complete_cb)
        try:
            # 事件循环开始运行
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn&#39;t have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError(&#39;Event loop stopped before Future completed.&#39;)

        return future.result()

    def run_forever(self):
        # 进行一些初始化工作
        self._check_closed()
        self._check_running()
        self._set_coroutine_origin_tracking(self._debug)
        self._thread_id = threading.get_ident()

        old_agen_hooks = sys.get_asyncgen_hooks()
        # 通过asyncgen钩子来自动关闭asyncgen函数,这样可以提醒用户生成器还未关闭
        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                               finalizer=self._asyncgen_finalizer_hook)
        try:
            # 设置当前在运行的事件循环到全局变量中,这样就可以在任一阶段获取到当前的事件循环了
            events._set_running_loop(self)
            while True:
                # 正真执行任务的逻辑
                self._run_once()
                if self._stopping:
                    break
        finally:
            # 关闭循环, 并且清理一些资源
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            self._set_coroutine_origin_tracking(False)
            sys.set_asyncgen_hooks(*old_agen_hooks)

Kod sumber ini tidak rumit adalah untuk menukar

menjadi objek runner, dan kemudian memanggilnya apabila objek dimulakan. Kaedah loop.run_until_complete mendaftarkan dirinya ke dalam main, dan akhirnya berjalan melalui kod gelung dalam EventLoop sehingga EventLoop ditandakan sebagai loop.run_until_complete:

while True:
    # 正真执行任务的逻辑
    self._run_once()
    if self._stopping:
        break

dilihat bahawa sekeping kod ini memastikan gelung acara boleh dilaksanakan sepanjang masa dan tamat secara automatik Inti penjadualan sebenar ialah fungsi ,

berikut: CorotinueTask

class BaseEventLoop:
    ...
    def _run_once(self):
        # self._scheduled是一个列表,它只存放TimerHandle
        sched_count = len(self._scheduled)
        ###############################
        # 第一阶段,整理self._scheduled #
        ###############################
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count > _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # 当待调度的任务数量超过100且待取消的任务占总任务的50%时,才进入这个逻辑
            # 把需要取消的任务移除
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    # 设置handle的_cancelled为True,并且把handle从_scheduled中移除
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            # 重新排列堆
            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # 需要取消的handle不多,则只会走这个逻辑,这里会把堆顶的handle弹出,并标记为不可调度,但不会访问整个堆
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        #################################
        # 第二阶段,计算超时值以及等待事件IO #
        #################################
        timeout = None
        # 当有准备调度的handle或者是正在关闭时,不等待,方便尽快的调度
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            # 如果堆有数据时,通过堆顶的handle计算最短的超时时间,但是最多不能超过MAXIMUM_SELECT_TIMEOUT,以免超过系统限制
            when = self._scheduled[0]._when
            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

        # 事件循环等待事件,直到有事件或者超时
        event_list = self._selector.select(timeout)

        ##################################################
        # 第三阶段,把满足条件的TimeHandle放入到self._ready中 #
        ##################################################
        # 获取得到的事件的回调,然后装填到_ready
        self._process_events(event_list)

        # 把一些在self._scheduled且满足调度条件的handle放到_ready中,比如TimerHandle。
        # end_time为当前时间+一个时间单位,猜测是能多处理一些这段时间内产生的事件
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        ################################################################################
        # 第四阶段,遍历所有准备调度的handle,并且通过handle的context来执行handle对应的callback #
        ################################################################################
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            # 如果handle已经被取消,则不调用
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        # 执行太久的回调,记录下来,这些需要开发者自己优化
                        logger.warning(&#39;Executing %s took %.3f seconds&#39;,
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

通过源码分析,可以很明确的知道调度逻辑中第一步是先规整self._scheduled,在规整的过程是使用堆排序来进行的,因为堆排序在调度的场景下效率是非常高的,不过这段规整代码分成两种,我猜测是当需要取消的数量过多时直接遍历的效率会更高。 在规整self._scheduled后,就进入第二步,该步骤开始等待系统事件循环返回对应的事件,如果self._ready中有数据,就不做等待了,需要马上到下一步骤,以便能赶紧安排调度。 在得到系统事件循环得到的事件后,就进入到了第三步,该步骤会通过self._process_events方法处理对应的事件,并把事件对应的回调存放到了self._ready中,最后再遍历self._ready中的所有Handle并逐一执行(执行时可以认为EventLoop把控制权返回给对应的调用逻辑),至此一个完整的调度逻辑就结束了,并进入下一个调度逻辑。

3.网络IO事件的处理

注:由于系统事件循环的限制,所以文件IO一般还是使用多线程来执行,具体见:github.com/python/asyn…

在分析EventLoop调度实现的时候忽略了self._process_events的具体实现逻辑,因为_process_events方法所在asyncio.base_event.py文件中的BaseEventLoop类并未有具体实现的,因为网络IO相关的需要系统的事件循环来帮忙处理,所以与系统事件循环相关的逻辑都在asyncio.selector_events.py中的BaseSelectorEventLoop类中。BaseSelectorEventLoop类封装了selector模块与系统事件循环交互,使调用者不需要去考虑sock的创建以及sock产生的文件描述符的监听与注销等操作,下面以BaseSelectorEventLoop中自带的pipe为例子,分析BaseSelectorEventLoop是如何进行网络IO事件处理的。

在分析之前,先看一个例子,代码如下:

import asyncio
import threading
def task():
    print("task")
def run_loop_inside_thread(loop):
    loop.run_forever()
loop = asyncio.get_event_loop()
threading.Thread(target=run_loop_inside_thread, args=(loop,)).start()
loop.call_soon(task)

如果直接运行这个例子,它并不会输出task(不过在IDE使用DEBUG模式下线程启动会慢一点,所以会输出的),因为在调用loop.run_foreverEventLoop会一直卡在这段逻辑中:

event_list = self._selector.select(timeout)

所以调用loop.call_soon并不会使EventLoop马上安排调度,而如果把call_soon换成call_soon_threadsafe则可以正常输出,这是因为call_soon_threadsafe中多了一个self._write_to_self的调用,它的源码如下:

class BaseEventLoop:
    ...
    def call_soon_threadsafe(self, callback, *args, context=None):
        """Like call_soon(), but thread-safe."""
        self._check_closed()
        handle = self._call_soon(callback, args, context)
        self._write_to_self()
        return handle

由于这个调用是涉及到IO相关的,所以需要到BaseSelectorEventLoop类查看,接下来以pipe相关的网络IO操作来分析EventLoop是如何处理IO事件的(只演示reader对象,writer对象操作与reader类似),

对应的源码如下:

class BaseSelectorEventLoop(base_events.BaseEventLoop):
    #######
    # 创建 #
    #######
    def __init__(self, selector=None):
        super().__init__()

        if selector is None:
            # 获取最优的selector
            selector = selectors.DefaultSelector()
        self._selector = selector
        # 创建pipe
        self._make_self_pipe()
        self._transports = weakref.WeakValueDictionary()
    def _make_self_pipe(self):
        # 创建Pipe对应的sock 
        self._ssock, self._csock = socket.socketpair()
        # 设置sock为非阻塞
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        # 阻塞服务端sock读事件对应的回调
        self._add_reader(self._ssock.fileno(), self._read_from_self)
    def _add_reader(self, fd, callback, *args):
        # 检查事件循环是否关闭
        self._check_closed()
        # 封装回调为handle对象
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            # 如果没有注册到系统的事件循环,则注册
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            # 如果已经注册过,则更新
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            if reader is not None:
                reader.cancel()
        return handle

    def _read_from_self(self):
        # 负责消费sock数据
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                continue
            except BlockingIOError:
                break
    #######
    # 删除 #
    #######
    def _close_self_pipe(self):
        # 注销Pipe对应的描述符 
        self._remove_reader(self._ssock.fileno())
        # 关闭sock
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _remove_reader(self, fd):
        # 如果事件循环已经关闭了,就不用操作了
        if self.is_closed():
            return False
        try:
            # 查询文件描述符是否在selector中
            key = self._selector.get_key(fd)
        except KeyError:
            # 不存在则返回
            return False
        else:
            # 存在则进入移除的工作
            mask, (reader, writer) = key.events, key.data
            # 通过事件掩码判断是否有其它事件
            mask &= ~selectors.EVENT_READ
            if not mask:
                # 移除已经注册到selector的文件描述符
                self._selector.unregister(fd)
            else:
                # 移除已经注册到selector的文件描述符,并注册新的事件
                self._selector.modify(fd, mask, (None, writer))

            # 如果reader不为空,则取消reader
            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

通过源码中的创建部分可以看到,EventLoop在启动的时候会创建一对建立通信的sock,并设置为非阻塞,然后把对应的回调封装成一个Handle对象并注册到系统事件循环中(删除则进行对应的反向操作),之后系统事件循环就会一直监听对应的事件,也就是EventLoop的执行逻辑会阻塞在下面的调用中,等待事件响应:

event_list = self._selector.select(timeout)

这时如果执行loop.call_soon_threadsafe,那么会通过write_to_self写入一点信息:

    def _write_to_self(self):
        csock = self._csock
        if csock is None:
            return
        try:
            csock.send(b&#39;\0&#39;)
        except OSError:
            if self._debug:
                logger.debug("Fail to write a null byte into the self-pipe socket", exc_info=True)

由于csock被写入了数据,那么它对应的ssock就会收到一个读事件,系统事件循环在收到这个事件通知后就会把数据返回,然后EventLoop就会获得到对应的数据,并交给process_events方法进行处理,

它的相关代码如下:

class BaseSelectorEventLoop:
    def _process_events(self, event_list):
        for key, mask in event_list:
            # 从回调事件中获取到对应的数据,key.data在注册时是一个元祖,所以这里要对元祖进行解包
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                # 得到reader handle,如果是被标记为取消,就移除对应的文件描述符
                if reader._cancelled:
                    self._remove_reader(fileobj)
                else:
                    # 如果没被标记为取消,则安排到self._ready中
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                # 对于写对象,也是同样的道理。
                if writer._cancelled:
                    self._remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _add_callback(self, handle):
        # 把回调的handle添加到_ready中
        assert isinstance(handle, events.Handle), &#39;A Handle is required here&#39;
        if handle._cancelled:
            return
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)

    def _remove_reader(self, fd):
        # 如果事件循环已经关闭了,就不用操作了
        if self.is_closed():
            return False
        try:
            # 查询文件描述符是否在selector中
            key = self._selector.get_key(fd)
        except KeyError:
            # 不存在则返回
            return False
        else:
            # 存在则进入移除的工作
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                # 移除已经注册到selector的文件描述符
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

从代码中可以看出_process_events会对事件对应的文件描述符进行处理,并从事件回调中获取到对应的Handle对象添加到self._ready中,由EventLoop在接下来遍历self._ready并执行。

可以看到网络IO事件的处理并不复杂,因为系统事件循环已经为我们做了很多工作了,但是用户所有与网络IO相关的操作都需要有一个类似的操作,这样是非常的繁琐的,幸好asyncio库已经为我们做了封装,我们只要调用就可以了,方便了很多。

Atas ialah kandungan terperinci Apakah prinsip penjadualan Python Asyncio. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam