Maison  >  Article  >  développement back-end  >  Comment utiliser le module contextvars en python

Comment utiliser le module contextvars en python

王林
王林avant
2023-05-13 12:04:051339parcourir

    Preface

    Après Python3.7, le module contextvars est apparu dans la bibliothèque officielle, dont la fonction principale est d'ajouter des fonctions contextuelles à l'écologie multi-threading et asyncio. Même si le programme s'exécute simultanément avec plusieurs coroutines, les variables contextuelles du programme peuvent être appelées, découplant ainsi notre logique.contextvars模块, 它的主要功能就是可以为多线程以及asyncio生态添加上下文功能,即使程序在多个协程并发运行的情况下,也能调用到程序的上下文变量, 从而使我们的逻辑解耦.

    上下文,可以理解为我们说话的语境, 在聊天的过程中, 有些话脱离了特定的语境,他的意思就变了,程序的运行也是如此.在线程中也是有他的上下文,只不过称为堆栈,如在python中就是保存在thread.local变量中,而协程也有他自己的上下文,但是没有暴露出来,不过有了contextvars模块后我们可以通过contextvars模块去保存与读取.

    使用contextvars的好处不仅可以防止’一个变量传遍天’的事情发生外,还能很好的结合TypeHint,可以让自己的代码可以被mypy以及IDE检查,让自己的代码更加适应工程化.
    不过用了contextvars后会多了一些隐性的调用, 需要解决好这些隐性的成本.

    更新说明

    • 切换web框架sanicstarlette

    • 增加一个自己编写且可用于starlette,fastapi的context说明

    • 更新fast_tools.context的最新示例以及简单的修改行文。

    1.有无上下文传变量的区别

    如果有用过Flask框架, 就知道了Flask拥有自己的上下文功能, 而contextvars跟它很像, 而且还增加了对asyncio的上下文提供支持。
    Flask的上下文是基于threading.local实现的, threading.local的隔离效果很好,但是他是只针对线程的,只隔离线程之间的数据状态, 而werkzeug为了支持在gevent中运行,自己实现了一个Local变量, 常用的Flask上下文变量request的例子如下:

    from flask import Flask, request
    app = Flask(__name__)
    @app.route('/')
    def root():
        so1n_name = request.get('so1n_name')
        return f'Name is {so1n_name}'

    与之相比的是Python的另一个经典Web框架Djano, 它没有上下文的支持, 所以只能显示的传request对象, 例子如下:

    from django.http import HttpResponse
    def root(request):
        so1n_name = request.get('so1n_name')
        return HttpResponse(f'Name is {so1n_name}')

    通过上面两者的对比可以发现, 在Django中,我们需要显示的传一个叫request的变量,而Flask则是import一个叫request的全局变量,并在视图中直接使用,达到解耦的目的.

    可能会有人说, 也就是传个变量的区别,为了省传这个变量,而花许多功夫去维护一个上下文变量,有点不值得,那可以看看下面的例子,如果层次多就会出现’一个参数传一天’的情况(不过分层做的好或者需求不坑爹一般不会出现像下面的情况,一个好的程序员能做好代码的分层, 但可能也有出现一堆烂需求的时候)

    # 伪代码,举个例子一个request传了3个函数
    from django.http import HttpResponse
    def is_allow(request, uid):
        if request.ip == '127.0.0.1' and check_permissions(uid):
            return True
        else:
            return False
    def check_permissions(request, uid):
        pass
    
    def root(request):
        user_id = request.GET.get('uid')
        if is_allow(request, id):
        	return HttpResponse('ok')
        else
            return HttpResponse('error')

    此外, 除了防止一个参数传一天这个问题外, 通过上下文, 可以进行一些解耦, 比如有一个最经典的技术业务需求就是在日志打印request_id, 从而方便链路排查, 这时候如果有上下文模块, 就可以把读写request_id给解耦出来, 比如下面这个基于Flask框架读写request_id的例子:

    import logging
    from typing import Any
    from flask import g  # type: ignore
    from flask.logging import default_handler
    # 这是一个Python logging.Filter的对象, 日志在生成之前会经过Filter步骤, 这时候我们可以为他绑定request_id变量
    class RequestIDLogFilter(logging.Filter):
        """
        Log filter to inject the current request id of the request under `log_record.request_id`
        """
        def filter(self, record: Any) -> Any:
            record.request_id = g.request_id or None
            return record
    # 配置日志的format格式, 这里多配了一个request_id变量
    format_string: str = (
        "[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d:%(funcName)s:%(request_id)s]" " %(message)s"
    )
    # 为flask的默认logger设置format和增加一个logging.Filter对象
    default_handler.setFormatter(logging.Formatter(format_string))
    default_handler.addFilter(RequestIDLogFilter())
    # 该方法用于设置request_id
    def set_request_id() -> None:
        g.request_id = request.headers.get("X-Request-Id", str(uuid4()))
    # 初始化FLask对象, 并设置before_request
    app: Flask = Flask("demo")
    app.before_request(set_request_id)

    2.如何使用contextvars模块

    这里举了一个例子, 但这个例子也有别的解决方案. 只不过通过这个例子顺便说如何使用contextvar模块

    首先看看未使用contextvars时,asyncio的web框架是如何传变量的,根据starlette的文档,在未使用contextvars时,传递Redis客户端实例的办法是通过request.stat这个变量保存Redis客户端的实例,改写代码如下:

    # demo/web_tools.py
    # 通过中间件把变量给存进去
    class RequestContextMiddleware(BaseHTTPMiddleware):
        async def dispatch(
                self, request: Request, call_next: RequestResponseEndpoint
        ) -> Response:
            request.stat.redis = REDIS_POOL
            response = await call_next(request)
            return response
    # demo/server.py
    # 调用变量
    @APP.route('/')
    async def homepage(request):
        # 伪代码,这里是执行redis命令
        await request.stat.redis.execute()
        return JSONResponse({'hello': 'world'})

    代码非常简便, 也可以正常的运行, 但你下次在重构时, 比如简单的把redis这个变量名改为new_redis, 那IDE不会识别出来, 需要一个一个改。 同时, 在写代码的时候, IDE永远不知道这个方法调用到的变量的类型是什么, IDE也无法智能的帮你检查(如输入request.stat.redis.时,IDE不会出现execute,或者出错时,IDE并不会提示). 这非常不利于项目的工程化, 而通过contextvarsTypeHints, 恰好能解决这个问题.

    说了那么多, 下面以一个Redis client为例子,展示如何在asyncio生态中使用contextvars, 并引入TypeHints# 🎜🎜 Le #Contexte peut être compris comme le contexte dans lequel nous parlons. Au cours du processus de discussion, certains mots ont des significations différentes lorsqu'ils sont séparés du contexte spécifique. Il en va de même pour le fonctionnement du programme. thread. , cela s'appelle simplement une pile. Par exemple, en Python, il est stocké dans la variable thread.local. La coroutine a également son propre contexte, mais elle n'est pas exposée avec les contextvars module, on peut passer le module <code>contextvars pour sauvegarder et lire

    #🎜🎜#L'avantage d'utiliser contextvars n'est pas seulement d'empêcher "une variable de se propager". partout dans le monde", mais cela peut également être très bien combiné avec TypeHint, afin que votre code puisse être vérifié par mypy et IDE, rendant votre code plus adapté à l'ingénierie.
    Mais après avoir utilisé contextvars, ce sera plus compliqué. Les appels cachés doivent résoudre ces coûts cachés #🎜🎜##🎜🎜#Update instructions#🎜🎜#
    • . #🎜🎜#Switch web framework sanic ajoute une version auto-écrite pour starlette#🎜🎜#
    • #🎜🎜# qui peut être utilisée dans Description du contexte de starlette,Fastapi#🎜🎜#
    • #🎜🎜# Mettez à jour le dernier exemple de fast_tools.context et modifiez simplement le texte. #🎜🎜#
    • #🎜🎜##🎜🎜#1. La différence entre passer des variables avec ou sans contexte #🎜🎜##🎜🎜#Si vous avez utilisé le framework Flask, vous saurez queFlask a sa propre fonction de contexte, et contextvars lui est très similaire et ajoute également la prise en charge du contexte asyncio. Le contexte de
      Flask est implémenté sur la base de threading.local. L'effet d'isolation de threading.local est très bon, mais il. est uniquement destiné aux threads qui isolent uniquement l'état des données entre les threads, et afin de prendre en charge l'exécution dans gevent, werkzeug a implémenté une variable Local, qui est couramment utilisé L'exemple de la variable contextuelle Flask request est le suivant : #🎜🎜#
      # demo/context.py
      # 该文件存放contextvars相关
      import contextvars
      if TYPE_CHECKING:
          from demo.redis_dal import RDS  # 这里是一个redis的封装实例
      # 初始化一个redis相关的全局context
      redis_pool_context = contextvars.ContextVar(&#39;redis_pool&#39;)
      # 通过函数调用可以获取到当前协程运行时的context上下文
      def get_redis() -> &#39;RDS&#39;:
          return redis_pool_context.get()
      # demo/web_tool.py
      # 该文件存放starlette相关模块
      from starlette.middleware.base import BaseHTTPMiddleware
      from starlette.requests import Request
      from starlette.middleware.base import RequestResponseEndpoint
      from starlette.responses import Response
      from demo.redis_dal import RDS
      # 初始化一个redis客户端变量,当前为空
      REDIS_POOL = None  # type: Optional[RDS]
      class RequestContextMiddleware(BaseHTTPMiddleware):
          async def dispatch(
                  self, request: Request, call_next: RequestResponseEndpoint
          ) -> Response:
              # 通过中间件,在进入路由之前,把redis客户端放入当前协程的上下文之中
              token = redis_pool_context.set(REDIS_POOL)
              try:
              	response = await call_next(request)
                  return response
              finally:
              	# 调用完成,回收当前请求设置的redis客户端的上下文
                  redis_pool_context.reset(token)
      async def startup_event() -> None:
          global REDIS_POOL
          REDIS_POOL = RDS() # 初始化客户端,里面通过asyncio.ensure_future逻辑延后连接
      async def shutdown_event() -> None:
          if REDIS_POOL:
              await REDIS_POOL.close() # 关闭redis客户端
      # demo/server.py
      # 该文件存放starlette main逻辑
      from starlette.applications import Starlette
      from starlette.responses import JSONResponse
      from demo.web_tool import RequestContextMiddleware
      from demo.context import get_redis
      APP = Starlette()
      APP.add_middleware(RequestContextMiddleware)
      @APP.route(&#39;/&#39;)
      async def homepage(request):
          # 伪代码,这里是执行redis命令
          # 只要验证 id(get_redis())等于demo.web_tool里REDID_POOL的id一致,那证明contextvars可以为asyncio维护一套上下文状态
          await get_redis().execute()
          return JSONResponse({&#39;hello&#39;: &#39;world&#39;})
      #🎜🎜#Par rapport à elle, c'est un autre classique de Python Le framework Web Djano n'a pas de support de contexte, il ne peut donc transmettre que l'objet request explicitement. L'exemple est le suivant : #🎜🎜#
      import asyncio
      import uuid
      from contextvars import Context, copy_context
      from functools import partial
      from typing import Optional, Set
      import httpx
      from fastapi import FastAPI, Request, Response
      from fast_tools.context import ContextBaseModel, ContextMiddleware, HeaderHelper
      app: FastAPI = FastAPI()
      check_set: Set[int] = set()
      class ContextModel(ContextBaseModel):
          """
      	通过该实例可以屏蔽大部分与contextvars相关的操作,如果要添加一个变量,则在该实例添加一个属性即可.
      	属性必须要使用Type Hints的写法,不然不会识别(强制使用Type Hints)
          """
          # 用于把自己的实例(如上文所说的redis客户端)存放于contextvars中
          http_client: httpx.AsyncClient
          # HeaderHepler用于把header的变量存放于contextvars中
          request_id: str = HeaderHelper.i("X-Request-Id", default_func=lambda request: str(uuid.uuid4()))
          ip: str = HeaderHelper.i("X-Real-IP", default_func=lambda request: request.client.host)
          user_agent: str = HeaderHelper.i("User-Agent")
      
          async def before_request(self, request: Request) -> None:
              # 请求之前的钩子, 通过该钩子可以设置自己的变量
              self.http_client = httpx.AsyncClient()
              check_set.add(id(self.http_client))
      
          async def before_reset_context(self, request: Request, response: Optional[Response]) -> None:
              # 准备退出中间件的钩子, 这步奏后会清掉上下文
              await self.http_client.aclose()
      context_model: ContextModel = ContextModel()
      app.add_middleware(ContextMiddleware, context_model=context_model)
      async def test_ensure_future() -> None:
          assert id(context_model.http_client) in check_set
      def test_run_in_executor() -> None:
          assert id(context_model.http_client) in check_set
      def test_call_soon() -> None:
          assert id(context_model.http_client) in check_set
      @app.get("/")
      async def root() -> dict:
          # 在使用asyncio.ensure_future开启另外一个子协程跑任务时, 也可以复用上下文
          asyncio.ensure_future(test_ensure_future())
          loop: "asyncio.AbstractEventLoop" = asyncio.get_event_loop()
          # 使用call_soon也能复用上下文
          loop.call_soon(test_call_soon)
          # 使用run_in_executor也能复用上下文, 但必须使用上下文的run方法, copy_context表示复制当前的上下文
          ctx: Context = copy_context()
          await loop.run_in_executor(None, partial(ctx.run, test_run_in_executor))  # type: ignore
          return {
              "message": context_model.to_dict(is_safe_return=True),  # not return CustomQuery
              "client_id": id(context_model.http_client),
          }
      if __name__ == "__main__":
          import uvicorn  # type: ignore
          uvicorn.run(app)
      #🎜 🎜#Grâce à la comparaison entre les deux ci-dessus, nous pouvons constater que dans Django, nous devons passer explicitement une variable appelée request, tandis que Flask importe une variable globale appelée request Variables et utilisée directement dans la vue pour atteindre l'objectif de découplage.#🎜🎜##🎜🎜#Certaines personnes peuvent dire que la différence est que pour éviter de transmettre cette variable, il faut beaucoup d'efforts. est dépensé pour maintenir un contexte. Les variables sont un peu indignes, vous pouvez donc jeter un œil à l'exemple suivant, s'il y a plusieurs niveaux, la situation "un paramètre est passé pour un jour" se produira (mais si la superposition est effectuée. eh bien, ou la demande n'est pas trompeuse, la situation suivante ne se produira généralement pas. Un bon programmeur peut faire du bon travail en superposant du code, mais il peut y avoir des moments où un tas de mauvaises exigences apparaissent)#🎜🎜#
      class ContextMeta(type(collections.abc.Mapping)):
          # contextvars.Context is not subclassable.
          def __new__(mcls, names, bases, dct):
              cls = super().__new__(mcls, names, bases, dct)
              if cls.__module__ != &#39;contextvars&#39; or cls.__name__ != &#39;Context&#39;:
                  raise TypeError("type &#39;Context&#39; is not an acceptable base type")
              return cls
      #🎜🎜# De plus, en plus d'éviter le problème du passage d'un paramètre pendant un jour De plus, grâce au contexte, un certain découplage peut être effectué. Par exemple, l'une des exigences techniques commerciales les plus classiques est d'imprimer request_id. dans le journal pour faciliter le dépannage du lien. À ce stade, s'il existe un module de contexte, la lecture et l'écriture de request_id peuvent être découplées. Par exemple, l'exemple suivant de lecture et d'écriture de request_id basé sur le Flask framework : #🎜🎜#<pre class="brush:py;">class Token(metaclass=TokenMeta): MISSING = object() def __init__(self, context, var, old_value): # 分别存放上下文变量, 当前set的数据以及上次set的数据 self._context = context self._var = var self._old_value = old_value self._used = False @property def var(self): return self._var @property def old_value(self): return self._old_value def __repr__(self): r = &amp;#39;&lt;Token &amp;#39; if self._used: r += &amp;#39; used&amp;#39; r += &amp;#39; var={!r} at {:0x}&gt;&amp;#39;.format(self._var, id(self)) return r</pre>#🎜🎜#2. Comment utiliser le module contextvars #🎜🎜#<blockquote>#🎜🎜# Voici un exemple , mais cet exemple a aussi d'autres solutions Mais au fait, à travers. cet exemple, je vais vous expliquer comment utiliser le module contextvar#🎜🎜#</blockquote>#🎜🎜#Tout d'abord, jetons un coup d'œil à asyncio lorsque <code>contextvars n'est pas utilisé Comment fonctionne le framework web passer des variables ? Selon la documentation de starlette, lorsque contextvars n'est pas utilisé, la façon de transmettre l'instance client Redis est via la requête . La variable stat enregistre l'instance du client Redis. Réécrivez le code comme suit : #🎜🎜#
      def copy_context():
          return _get_context().copy()
      def _get_context():
          ctx = getattr(_state, &#39;context&#39;, None)
          if ctx is None:
              ctx = Context()
              _state.context = ctx
          return ctx
      def _set_context(ctx):
          _state.context = ctx
      _state = threading.local()
      #🎜🎜#Le code est très simple et peut s'exécuter normalement, mais la prochaine fois que vous le refactoriserez, par exemple. Changez simplement le nom de la variable redis en new_redis. L'EDI ne le reconnaîtra pas et vous devrez le modifier un par un. Dans le même temps, lors de l'écriture du code, l'IDE ne connaîtra jamais le type de la variable appelée par cette méthode, et l'IDE ne pourra pas le vérifier intelligemment pour vous (par exemple, lorsque vous entrez request.stat.redis., l'EDI le fera). ne s'exécute pas ou une erreur apparaîtra, l'EDI ne vous le demandera pas). Ceci est très préjudiciable à l'ingénierie du projet. Cependant, contextvars et TypeHints peuvent résoudre ce problème. . #🎜🎜##🎜 🎜#Cela dit, prenons un client Redis comme exemple pour montrer comment utiliser les contextvars dans l'écosystème asyncio et introduisons TypeHints</code.> (détails Voir le code pour explication).#🎜🎜#
      # demo/context.py
      # 该文件存放contextvars相关
      import contextvars
      if TYPE_CHECKING:
          from demo.redis_dal import RDS  # 这里是一个redis的封装实例
      # 初始化一个redis相关的全局context
      redis_pool_context = contextvars.ContextVar(&#39;redis_pool&#39;)
      # 通过函数调用可以获取到当前协程运行时的context上下文
      def get_redis() -> &#39;RDS&#39;:
          return redis_pool_context.get()
      # demo/web_tool.py
      # 该文件存放starlette相关模块
      from starlette.middleware.base import BaseHTTPMiddleware
      from starlette.requests import Request
      from starlette.middleware.base import RequestResponseEndpoint
      from starlette.responses import Response
      from demo.redis_dal import RDS
      # 初始化一个redis客户端变量,当前为空
      REDIS_POOL = None  # type: Optional[RDS]
      class RequestContextMiddleware(BaseHTTPMiddleware):
          async def dispatch(
                  self, request: Request, call_next: RequestResponseEndpoint
          ) -> Response:
              # 通过中间件,在进入路由之前,把redis客户端放入当前协程的上下文之中
              token = redis_pool_context.set(REDIS_POOL)
              try:
              	response = await call_next(request)
                  return response
              finally:
              	# 调用完成,回收当前请求设置的redis客户端的上下文
                  redis_pool_context.reset(token)
      async def startup_event() -> None:
          global REDIS_POOL
          REDIS_POOL = RDS() # 初始化客户端,里面通过asyncio.ensure_future逻辑延后连接
      async def shutdown_event() -> None:
          if REDIS_POOL:
              await REDIS_POOL.close() # 关闭redis客户端
      # demo/server.py
      # 该文件存放starlette main逻辑
      from starlette.applications import Starlette
      from starlette.responses import JSONResponse
      from demo.web_tool import RequestContextMiddleware
      from demo.context import get_redis
      APP = Starlette()
      APP.add_middleware(RequestContextMiddleware)
      @APP.route(&#39;/&#39;)
      async def homepage(request):
          # 伪代码,这里是执行redis命令
          # 只要验证 id(get_redis())等于demo.web_tool里REDID_POOL的id一致,那证明contextvars可以为asyncio维护一套上下文状态
          await get_redis().execute()
          return JSONResponse({&#39;hello&#39;: &#39;world&#39;})

      3.如何优雅的使用contextvars

      从上面的示例代码来看, 使用contextvarTypeHint确实能让让IDE可以识别到这个变量是什么了, 但增加的代码太多了,更恐怖的是, 每多一个变量,就需要自己去写一个context,一个变量的初始化,一个变量的get函数,同时在引用时使用函数会比较别扭.

      自己在使用了contextvars一段时间后,觉得这样太麻烦了,每次都要做一堆重复的操作,且平时使用最多的就是把一个实例或者提炼出Headers的参数放入contextvars中,所以写了一个封装fast_tools.context(同时兼容fastapistarlette), 它能屏蔽所有与contextvars的相关逻辑,其中由ContextModel负责contextvars的set和get操作,ContextMiddleware管理contextvars的周期,HeaderHeader负责托管Headers相关的参数, 调用者只需要在ContextModel中写入自己需要的变量,引用时调用ContextModel的属性即可.

      以下是调用者的代码示例, 这里的实例化变量由一个http client代替, 且都会每次请求分配一个客户端实例, 但在实际使用中并不会为每一个请求都分配一个客户端实例, 很影响性能:

      import asyncio
      import uuid
      from contextvars import Context, copy_context
      from functools import partial
      from typing import Optional, Set
      import httpx
      from fastapi import FastAPI, Request, Response
      from fast_tools.context import ContextBaseModel, ContextMiddleware, HeaderHelper
      app: FastAPI = FastAPI()
      check_set: Set[int] = set()
      class ContextModel(ContextBaseModel):
          """
      	通过该实例可以屏蔽大部分与contextvars相关的操作,如果要添加一个变量,则在该实例添加一个属性即可.
      	属性必须要使用Type Hints的写法,不然不会识别(强制使用Type Hints)
          """
          # 用于把自己的实例(如上文所说的redis客户端)存放于contextvars中
          http_client: httpx.AsyncClient
          # HeaderHepler用于把header的变量存放于contextvars中
          request_id: str = HeaderHelper.i("X-Request-Id", default_func=lambda request: str(uuid.uuid4()))
          ip: str = HeaderHelper.i("X-Real-IP", default_func=lambda request: request.client.host)
          user_agent: str = HeaderHelper.i("User-Agent")
      
          async def before_request(self, request: Request) -> None:
              # 请求之前的钩子, 通过该钩子可以设置自己的变量
              self.http_client = httpx.AsyncClient()
              check_set.add(id(self.http_client))
      
          async def before_reset_context(self, request: Request, response: Optional[Response]) -> None:
              # 准备退出中间件的钩子, 这步奏后会清掉上下文
              await self.http_client.aclose()
      context_model: ContextModel = ContextModel()
      app.add_middleware(ContextMiddleware, context_model=context_model)
      async def test_ensure_future() -> None:
          assert id(context_model.http_client) in check_set
      def test_run_in_executor() -> None:
          assert id(context_model.http_client) in check_set
      def test_call_soon() -> None:
          assert id(context_model.http_client) in check_set
      @app.get("/")
      async def root() -> dict:
          # 在使用asyncio.ensure_future开启另外一个子协程跑任务时, 也可以复用上下文
          asyncio.ensure_future(test_ensure_future())
          loop: "asyncio.AbstractEventLoop" = asyncio.get_event_loop()
          # 使用call_soon也能复用上下文
          loop.call_soon(test_call_soon)
          # 使用run_in_executor也能复用上下文, 但必须使用上下文的run方法, copy_context表示复制当前的上下文
          ctx: Context = copy_context()
          await loop.run_in_executor(None, partial(ctx.run, test_run_in_executor))  # type: ignore
          return {
              "message": context_model.to_dict(is_safe_return=True),  # not return CustomQuery
              "client_id": id(context_model.http_client),
          }
      if __name__ == "__main__":
          import uvicorn  # type: ignore
          uvicorn.run(app)

      可以从例子中看到, 通过封装的上下文调用会变得非常愉快, 只要通过一两步方法就能设置好自己的上下文属性, 同时不用考虑如何编写上下文的生命周期. 另外也能通过这个例子看出, 在asyncio生态中, contextvars能运用到包括子协程, 多线程等所有的场景中.

      4.contextvars的原理

      在第一次使用时,我就很好奇contextvars是如何去维护程序的上下文的,好在contextvars的作者出了一个向下兼容的contextvars库,虽然他不支持asyncio,但我们还是可以通过代码了解到他的基本原理.

      4.1 ContextMeta,ContextVarMeta和TokenMeta

      代码仓中有ContextMeta,ContextVarMetaTokenMeta这几个对象, 它们的功能都是防止用户来继承Context,ContextVarToken,原理都是通过元类来判断类名是否是自己编写类的名称,如果不是则抛错.

      class ContextMeta(type(collections.abc.Mapping)):
          # contextvars.Context is not subclassable.
          def __new__(mcls, names, bases, dct):
              cls = super().__new__(mcls, names, bases, dct)
              if cls.__module__ != &#39;contextvars&#39; or cls.__name__ != &#39;Context&#39;:
                  raise TypeError("type &#39;Context&#39; is not an acceptable base type")
              return cls

      4.2 Token

      上下文的本质是一个堆栈, 每次set一次对象就向堆栈增加一层数据, 每次reset就是pop掉最上层的数据, 而在Contextvars中, 通过Token对象来维护堆栈之间的交互.

      class Token(metaclass=TokenMeta):
          MISSING = object()
          def __init__(self, context, var, old_value):
              # 分别存放上下文变量, 当前set的数据以及上次set的数据
              self._context = context
              self._var = var
              self._old_value = old_value
              self._used = False
          @property
          def var(self):
              return self._var
          @property
          def old_value(self):
              return self._old_value
          def __repr__(self):
              r = &#39;<Token &#39;
              if self._used:
                  r += &#39; used&#39;
              r += &#39; var={!r} at {:0x}>&#39;.format(self._var, id(self))
              return r

      可以看到Token的代码很少, 它只保存当前的context变量, 本次调用set的数据和上一次被set的旧数据. 用户只有在调用contextvar.context后才能得到Token, 返回的Token可以被用户在调用context后, 通过调用context.reset(token)来清空保存的上下文,方便本次context的变量能及时的被回收, 回到上上次的数据.

      4.3 全局唯一context

      前面说过, Python中由threading.local()负责每个线程的context, 协程属于线程的’子集’,所以contextvar直接基于threading.local()生成自己的全局context. 从他的源代码可以看到, _state就是threading.local()的引用, 并通过设置和读取_statecontext属性来写入和读取当前的上下文, copy_context调用也很简单, 同样也是调用到threading.local()API.

      def copy_context():
          return _get_context().copy()
      def _get_context():
          ctx = getattr(_state, &#39;context&#39;, None)
          if ctx is None:
              ctx = Context()
              _state.context = ctx
          return ctx
      def _set_context(ctx):
          _state.context = ctx
      _state = threading.local()

      关于threading.local(),虽然不是本文重点,但由于contextvars是基于threading.local()进行封装的,所以还是要明白threading.local()的原理,这里并不直接通过源码分析, 而是做一个简单的示例解释.

      在一个线程里面使用线程的局部变量会比直接使用全局变量的性能好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁, 性能会变得很差, 比如下面全局变量的例子:

      pet_dict = {}
      def get_pet(pet_name):
          return pet_dict[pet_name]
      def set_pet(pet_name):
          return pet_dict[pet_name]

      这份代码就是模仿一个简单的全局变量调用, 如果是多线程调用的话, 那就需要加锁啦, 每次在读写之前都要等到持有锁的线程放弃了锁后再去竞争, 而且还可能污染到了别的线程存放的数据.

      而线程的局部变量则是让每个线程有一个自己的pet_dict, 假设每个线程调用get_pet,set_pet时,都会把自己的pid传入进来, 那么就可以避免多个线程去同时竞争资源, 同时也不会污染到别的线程的数据, 那么代码可以改为这样子:

      pet_dict = {}
      def get_pet(pet_name, pid):
          return pet_dict[pid][pet_name]
      def set_pet(pet_name, pid):
          return pet_dict[pid][pet_name]

      不过这样子使用起来非常方便, 同时示例例子没有对异常检查和初始化等处理, 如果值比较复杂, 我们还要维护异常状况, 这样太麻烦了.

      这时候threading.local()就应运而生了,他负责帮我们处理这些维护的工作,我们只要对他进行一些调用即可,调用起来跟单线程调用一样简单方便, 应用threading.local()后的代码如下:

      import threading
      thread_local=threading.local()
      def get_pet(pet_name):
          return thread_local[pet_name]
      def set_pet(pet_name):
          return thread_local[pet_name]

      可以看到代码就像调用全局变量一样, 但是又不会产生竞争状态。

      4.4contextvar自己封装的Context

      contextvars自己封装的Context比较简单, 这里只展示他的两个核心方法(其他的魔术方法就像dict的魔术方法一样):

      class Context(collections.abc.Mapping, metaclass=ContextMeta):
          def __init__(self):
              self._data = immutables.Map()
              self._prev_context = None
          def run(self, callable, *args, **kwargs):
              if self._prev_context is not None:
                  raise RuntimeError(
                      &#39;cannot enter context: {} is already entered&#39;.format(self))
              self._prev_context = _get_context()
              try:
                  _set_context(self)
                  return callable(*args, **kwargs)
              finally:
                  _set_context(self._prev_context)
                  self._prev_context = None
          def copy(self):
              new = Context()
              new._data = self._data
              return new

      首先, 在__init__方法可以看到self._data,这里使用到了一个叫immutables.Map()的不可变对象,并对immutables.Map()进行一些封装,所以context可以看成一个不可变的dict。这样可以防止调用copy方法后得到的上下文的变动会影响到了原本的上下文变量。

      查看immutables.Map()的示例代码可以看到,每次对原对象的修改时,原对象并不会发生改变,并会返回一个已经发生改变的新对象.

      map2 = map.set(&#39;a&#39;, 10)
      print(map, map2)
      # will print:
      #   <immutables.Map({&#39;a&#39;: 1, &#39;b&#39;: 2})>
      #   <immutables.Map({&#39;a&#39;: 10, &#39;b&#39;: 2})>
      map3 = map2.delete(&#39;b&#39;)
      print(map, map2, map3)
      # will print:
      #   <immutables.Map({&#39;a&#39;: 1, &#39;b&#39;: 2})>
      #   <immutables.Map({&#39;a&#39;: 10, &#39;b&#39;: 2})>
      #   <immutables.Map({&#39;a&#39;: 10})>

      此外,context还有一个叫run的方法, 上面在执行loop.run_in_executor时就用过run方法, 目的就是可以产生一个新的上下文变量给另外一个线程使用, 同时这个新的上下文变量跟原来的上下文变量是一致的.
      执行run的时候,可以看出会copy一个新的上下文来调用传入的函数, 由于immutables.Map的存在, 函数中对上下文的修改并不会影响旧的上下文变量, 达到进程复制数据时的写时复制的目的. 在run方法的最后, 函数执行完了会再次set旧的上下文, 从而完成一次上下文切换.

      def run(self, callable, *args, **kwargs):
          # 已经存在旧的context,抛出异常,防止多线程循环调用
          if self._prev_context is not None:
              raise RuntimeError(
                  &#39;cannot enter context: {} is already entered&#39;.format(self))
          self._prev_context = _get_context()  # 保存当前的context
          try:
              _set_context(self) # 设置新的context
              return callable(*args, **kwargs)  # 执行函数
          finally:
              _set_context(self._prev_context)  # 设置为旧的context
              self._prev_context = None

      4.5 ContextVar

      我们一般在使用contextvars模块时,经常使用的就是ContextVar这个类了,这个类很简单,主要提供了set–设置值,get–获取值,reset–重置值三个方法, 从Context类中写入和获取值, 而set和reset的就是通过上面的token类进行交互的.

      set – 为当前上下文设置变量

      def set(self, value):
          ctx = _get_context()  # 获取当前上下文对象`Context`
          data = ctx._data
          try:
              old_value = data[self]  # 获取Context旧对象
          except KeyError:
              old_value = Token.MISSING  # 获取不到则填充一个object(全局唯一)
          updated_data = data.set(self, value) # 设置新的值
          ctx._data = updated_data
          return Token(ctx, self, old_value) # 返回带有旧值的token

      get – 从当前上下文获取变量

      def get(self, default=_NO_DEFAULT):
          ctx = _get_context()  # 获取当前上下文对象`Context`
          try:
              return ctx[self]  # 返回获取的值
          except KeyError:
              pass
          if default is not _NO_DEFAULT:
              return default    # 返回调用get时设置的值
          if self._default is not _NO_DEFAULT:
              return self._default  # 返回初始化context时设置的默认值
          raise LookupError  # 都没有则会抛错

      reset – 清理本次用到的上下文数据

      def reset(self, token):
             if token._used:
             	# 判断token是否已经被使用
                 raise RuntimeError("Token has already been used once")
             if token._var is not self:
             	# 判断token是否是当前contextvar返回的
                 raise ValueError(
                     "Token was created by a different ContextVar")
             if token._context is not _get_context():
             	# 判断token的上下文是否跟contextvar上下文一致
                 raise ValueError(
                     "Token was created in a different Context")
             ctx = token._context
             if token._old_value is Token.MISSING:
             	# 如果没有旧值则删除该值
                 ctx._data = ctx._data.delete(token._var)
             else:
             	# 有旧值则当前contextvar变为旧值
                 ctx._data = ctx._data.set(token._var, token._old_value)
             token._used = True  # 设置flag,标记token已经被使用了

      则此,contextvar的原理了解完了,接下来再看看他是如何在asyncio运行的.

      5.contextvars asyncio

      由于向下兼容的contextvars并不支持asyncio, 所以这里通过aiotask-context的源码简要的了解如何在asyncio中如何获取和设置context。

      5.1在asyncio中获取context

      相比起contextvars复杂的概念,在asyncio中,我们可以很简单的获取到当前协程的task, 然后通过task就可以很方便的获取到task的context了,由于Pyhon3.7对asyncio的高级API 重新设计,所以可以看到需要对获取当前task进行封装

      PY37 = sys.version_info >= (3, 7)
      if PY37:
          def asyncio_current_task(loop=None):
              """Return the current task or None."""
              try:
                  return asyncio.current_task(loop)
              except RuntimeError:
                  # simulate old behaviour
                  return None
      else:
          asyncio_current_task = asyncio.Task.current_task

      不同的版本有不同的获取task方法, 之后我们就可以通过调用asyncio_current_task().context即可获取到当前的上下文了…

      5.2 对上下文的操作

      同样的,在得到上下文后, 我们这里也需要set, get, reset的操作,不过十分简单, 类似dict一样的操作即可, 它没有token的逻辑:

      set

      def set(key, value):
          """
          Sets the given value inside Task.context[key]. If the key does not exist it creates it.
          :param key: identifier for accessing the context dict.
          :param value: value to store inside context[key].
          :raises
          """
          current_task = asyncio_current_task()
          if not current_task:
              raise ValueError(NO_LOOP_EXCEPTION_MSG.format(key))
      
          current_task.context[key] = value

      get

      def get(key, default=None):
          """
          Retrieves the value stored in key from the Task.context dict. If key does not exist,
          or there is no event loop running, default will be returned
          :param key: identifier for accessing the context dict.
          :param default: None by default, returned in case key is not found.
          :return: Value stored inside the dict[key].
          """
          current_task = asyncio_current_task()
          if not current_task:
              raise ValueError(NO_LOOP_EXCEPTION_MSG.format(key))
          return current_task.context.get(key, default)

      clear – 也就是contextvar.ContextVars中的reset

      def clear():
          """
          Clear the Task.context.
          :raises ValueError: if no current task.
          """
          current_task = asyncio_current_task()
          if not current_task:
              raise ValueError("No event loop found")
          current_task.context.clear()

      5.2 copying_task_factory和chainmap_task_factory

      在Python的更高级版本中,已经支持设置context了,所以这两个方法可以不再使用了.他们最后都用到了task_factory的方法.
      task_factory简单说就是创建一个新的task,再通过工厂方法合成context,最后把context设置到task

      def task_factory(loop, coro, copy_context=False, context_factory=None):
          """
          By default returns a task factory that uses a simple dict as the task context,
          but allows context creation and inheritance to be customized via ``context_factory``.
          """
          # 生成context工厂函数
          context_factory = context_factory or partial(
              dict_context_factory, copy_context=copy_context)
          # 创建task, 跟asyncio.ensure_future一样
          task = asyncio.tasks.Task(coro, loop=loop)
          if task._source_traceback:
              del [-1]
      
          # 获取task的context
          try:
              context = asyncio_current_task(loop=loop).context
          except AttributeError:
              context = None
          # 从context工厂中处理context并赋值在task
          task.context = context_factory(context)
          return task

      aiotask-context提供了两个对context处理的函数dict_context_factorychainmap_context_factory.在aiotask-context中,context是一个dict对象,dict_context_factory可以选择赋值或者设置新的context

      def dict_context_factory(parent_context=None, copy_context=False):
          """A traditional ``dict`` context to keep things simple"""
          if parent_context is None:
              # initial context
              return {}
          else:
              # inherit context
              new_context = parent_context
              if copy_context:
                  new_context = deepcopy(new_context)
              return new_context

      chainmap_context_factorydict_context_factory的区别就是在合并context而不是直接继承.同时借用ChainMap保证合并context后,还能同步context的改变

      def chainmap_context_factory(parent_context=None):
          """
          A ``ChainMap`` context, to avoid copying any data
          and yet preserve strict one-way inheritance
          (just like with dict copying)
          """
          if parent_context is None:
              # initial context
              return ChainMap()
          else:
              # inherit context
              if not isinstance(parent_context, ChainMap):
                  # if a dict context was previously used, then convert
                  # (without modifying the original dict)
                  parent_context = ChainMap(parent_context)
              return parent_context.new_child()

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Déclaration:
    Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer