最近我对一个用 async/await 语法去做异步编程的项目做贡献。 经过一段时间不停地码代码,测试及修复,我总结了如何在异步编程中正确地使用 Contextvars 的经验。
What Is contextvars
contextvars 来源于 PEP-567 ,它提供了 APIs 允许开发者去保存,访问及管理 local context。 它与 thread-local storage (TLS) 类似。但它还支持维护异步代码的 context.
import asyncio
from contextvars import ContextVar, Context
var_what = ContextVar("what")
async def hello():
try:
return f"hello {var_what.get()}"
except LookupError:
return "hello?"
# 以下代码通过 `python -m asyncio` 去执行
assert await hello() == "hello?"
task_1 = asyncio.create_task(hello())
coroutine_1 = hello()
var_what.set("world")
task_2 = asyncio.create_task(hello())
coroutine_2 = hello()
assert await task_1 == "hello?"
assert await task_2 == "hello world"
assert await coroutine_1 == "hello world"
assert await coroutine_2 == "hello world"
这个例子使用预先创造好的 ContextVar
对象去保存及访问当前 context 里的 what 变量。
这例子还暴露了一个问题,就是 coroutine
对象无法处理异步代码的 context 。
只有 asyncio.Task
在创建的时候会保存当前的 context,及自动的应用该 context 。
查看 contextvars 文档以了解更多信息及用法。
最佳示例
如果你要创建一个 context,首先要做的是创建一些资源,保存在 context 中,然后在该 context 下执行异步代码,最后清理掉 context 中不需要的资源 。
最好的方式是使用 contextlib.asynccontextmanager
函数去做装饰器。
- 能够用 async-with 语法显得更符合 Python 哲学
- 资源初始化及清理代码处于同个函数内,更具有可读性
在实际情况下,我们在 web 应用中需要保存请求相关的数据,比如安全密钥及请求参数;需要共享全局资源,比如数据库连接池。所以我码了个如何在异步编程中使用 contextvars 的例子。
from contextlib import asynccontextmanager
from contextvars import copy_context, ContextVar
var_redis_client = ContextVar("redis_client")
@asynccontextmanager
async def create_app_context(settings):
# 资源初始化过程
redis_client = object() # FIXME: 创建 Redis 客户端实例
var_redis_client.set(redis_client)
try:
# 生产当前的 context
yield copy_context()
finally:
# 资源清理过程
# FIXME: 清理 Redis 客户端实例
pass
# 以下代码通过 `python -m asyncio` 去执行
settings = {"redis_uri": "redis://..."}
async with create_app_context(settings) as app_ctx:
assert var_redis_client in copy_context()
# 用两种方式去获取被保存在当前 context 的 Redis 客户端
assert var_redis_client.get() is app_ctx[var_redis_client]
测试代码
使用 pytest fixtures 去创建 app context。这样能让测试样例很容易地访问使用
@pytest.mark.asyncio
装饰器来自 pytest-asyncio 第三方库.
import pytest
@pytest.fixture
def settings():
return {"redis_uri": "redis://..."}
@pytest.fixture
async def app_ctx(settings):
async with create_app_context(settings) as ctx:
# 使用 yield fixture 去初始化,产生 context 方便测试样例访问
yield ctx
@pytest.fixture
def redis_client(app_ctx):
return app_ctx[var_redis_client]
@pytest.mark.asyncio
async def test_without_context(app_ctx, redis_client):
assert var_redis_client not in copy_context()
# 虽然当前 context 没有 Redis 客户端实例
# 但仍然可以通过 app_ctx 去访问 Redis 客户端实例
assert redis_client is app_ctx[var_redis_client]
def apply_context(ctx):
"""
更新当前的 context
"""
for var in ctx:
var.set(ctx[var])
@pytest.mark.asyncio
async def test_within_context(app_ctx, redis_client):
# 必须更新当前的 context,这样才能在测试函数体内访问到 Redis 客户端
apply_context(app_ctx)
assert var_redis_client in copy_context()
assert redis_client is app_ctx[var_redis_client]
因为来自异步 yield_fixture 的 context 无法传递给其他 fixtures 或测试函数,所以我们需要显式地传递 app context。我还写了个简单的函数 apply_context
去把传递进来的 context 应用到当前的 context 里。如果 fixtures 或者测试函数需要 context 的话,就需要先调用这个函数。
如果测试函数被标记成 @pytest.mark.asyncio
的话,最好能隐式地把它的异步 yield_fixtures 里的 context,传递给其他 fixtures 及测试函数。所以我向 pytest-asyncio 提了个 PR ,如果能被合并就好了。