Python Core Concepts
asyncio Coroutine and Event Loop
示例程序
代码:
| import asyncio
import time
from typing import Any
# 1. Asynchronous Task (Coroutine)
async def coroutine_worker(name: str, delay: float, iterations: int = 4) -> str:
    """
    Simulate non-blocking I/O by repeatedly awaiting ``asyncio.sleep``.

    Each ``await asyncio.sleep(delay)`` suspends this coroutine and hands
    control back to the Event Loop instead of blocking the thread, so other
    tasks and timer callbacks can run while it waits.

    Args:
        name: Label printed with every progress message.
        delay: Seconds to wait in each iteration.
        iterations: Number of wait/complete cycles to run (default 4).

    Returns:
        The string "Completed all iterations" once every iteration finishes.
    """
    for i in range(1, iterations + 1):
        print(f"[{time.strftime('%M:%S')}] {name}: Waiting for {delay} seconds...")
        # When awaiting, voluntarily yield control to the Event Loop
        await asyncio.sleep(delay)
        print(f"[{time.strftime('%M:%S')}] {name}: Completed\tIteration: {i}")
    return "Completed all iterations"
# 2. Synchronous Callback Function
# Tick number shared with periodic_callback (module-level state)
count = 1


def periodic_callback(loop: asyncio.AbstractEventLoop) -> None:
    """
    Timer callback (a plain function, not a coroutine) driven by call_later.

    Prints the current tick number taken from the module-level ``count``.
    While ``count`` is below 5 it re-arms itself on *loop* two seconds later;
    otherwise it asks *loop* to stop. ``count`` is incremented on every call.
    """
    global count
    stamp = time.strftime('%M:%S')
    print(f"[{stamp}] Timer callback: {count}...\n")
    if count >= 5:
        # Last tick: request the Event Loop to stop (takes effect after this callback)
        loop.stop()
        print(f"[{time.strftime('%M:%S')}] Event Loop stop request has been issued.")
    else:
        # Not finished yet: schedule the next tick in 2 seconds
        loop.call_later(2, periodic_callback, loop)
    count += 1
# 3. Manual Event Loop Management
def _shutdown_loop(loop: asyncio.AbstractEventLoop) -> None:
    """Cancel unfinished tasks on *loop*, finalize async generators, close it."""
    pending = asyncio.all_tasks(loop)
    for task in pending:
        task.cancel()
    if pending:
        # Let the cancelled tasks finish unwinding so none is destroyed while
        # still pending; return_exceptions collects their CancelledError.
        loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
    loop.run_until_complete(loop.shutdown_asyncgens())
    asyncio.set_event_loop(None)
    loop.close()


def main():
    """
    Run the demo on a manually created and managed Event Loop.

    Schedules two coroutine_worker tasks and the self-rescheduling
    periodic_callback timer, runs the loop until the timer calls
    loop.stop(), and reports the state of both tasks. Finally, it cancels any
    task that is still pending and closes the loop.
    """
    print("\nasyncio coroutine Event Loop Management Learning\n")

    # 1. Initialization
    # 1). Create the Event Loop object (AbstractEventLoop instance).
    # asyncio.get_event_loop() is deprecated when no loop is running or set
    # (and raises RuntimeError on Python 3.14+), so create one explicitly.
    loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
    print("New Event Loop created and set as current:")
    # Register it as the current loop so asyncio.get_event_loop() can return it
    asyncio.set_event_loop(loop)
    print(f"Event Loop Type:\t{type(loop)}")
    print(f"Event Loop ID:\t\t{id(loop)}\n")

    try:
        # 2. Scheduling Tasks and Callbacks
        # 2). Schedule asynchronous tasks,
        # i.e. wrap coroutines into Tasks and put them into the ready queue
        task1: asyncio.Task[Any] = loop.create_task(coroutine_worker("Task-1", 3.0))
        task2: asyncio.Task[Any] = loop.create_task(coroutine_worker("Task-2", 1.0))

        # 3). Schedule a synchronous callback:
        # loop.call_later(delay, callback, *args) runs periodic_callback after 2 seconds
        loop.call_later(2.0, periodic_callback, loop)

        # 3. Running the Event Loop
        print(f"[{time.strftime('%M:%S')}] Starting Event Loop...\n")
        # 4). Run the Event Loop until loop.stop() is called
        loop.run_forever()

        # 4. Post-Loop Result Retrieval
        print(f"[{time.strftime('%M:%S')}] Event Loop has stopped\n")

        # 5). Retrieve task results without raising on unfinished/failed tasks
        def get_task_result(task: asyncio.Task[Any]):
            if not task.done():
                # result() would raise InvalidStateError on a pending task
                return "<Pending and No Result>"
            if task.cancelled():
                return "<Cancelled>"
            exc = task.exception()
            if exc is not None:
                return f"<Error: {exc!r}>"
            return task.result()

        for task in (task1, task2):
            print(
                f"{task.get_name()}: {'Done' if task.done() else 'Pending'}, Result: {get_task_result(task)}"
            )
    finally:
        # 6). Cleanup: cancel still-pending tasks (e.g. Task-1) and close the loop
        _shutdown_loop(loop)
# Entry point: run the demo only when executed as a script, not on import
if __name__ == "__main__":
    main()
|
运行结果:
| (venv) $ python3 asyncioEventLoop_v1.py
asyncio coroutine Event Loop Management Learning
Existing Event Loop found and acquired:
Event Loop Type: <class 'asyncio.unix_events._UnixSelectorEventLoop'>
Event Loop ID: 139907526240464
[09:27] Starting Event Loop...
[09:27] Task-1: Waiting for 3.0 seconds...
[09:27] Task-2: Waiting for 1.0 seconds...
[09:28] Task-2: Completed Iteration: 1
[09:28] Task-2: Waiting for 1.0 seconds...
[09:29] Timer callback: 1...
[09:29] Task-2: Completed Iteration: 2
[09:29] Task-2: Waiting for 1.0 seconds...
[09:30] Task-1: Completed Iteration: 1
[09:30] Task-1: Waiting for 3.0 seconds...
[09:30] Task-2: Completed Iteration: 3
[09:30] Task-2: Waiting for 1.0 seconds...
[09:31] Timer callback: 2...
[09:31] Task-2: Completed Iteration: 4
[09:33] Timer callback: 3...
[09:33] Task-1: Completed Iteration: 2
[09:33] Task-1: Waiting for 3.0 seconds...
[09:35] Timer callback: 4...
[09:36] Task-1: Completed Iteration: 3
[09:36] Task-1: Waiting for 3.0 seconds...
[09:37] Timer callback: 5...
[09:37] Event Loop stop request has been issued.
[09:37] Event Loop has stopped
Task-1: Pending, Result: <Pending and No Result>
Task-2: Done, Result: Completed all iterations
(venv) $
|
contextvars 是 Python 标准库中的一个模块(自 Python 3.7 起提供),它提供了用于管理、存储和访问上下文相关状态 (context-local state) 的 API。它主要用于并发 (concurrency) 和异步 (asynchronous) 编程中,例如在 asyncio 框架内。
上下文变量 (Context Variables)
ContextVar 类: 这是用于声明上下文变量的类。每个 ContextVar 实例代表一个独立的状态变量。
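上下文 (Context): 在 contextvars 中,上下文 (Context 对象) 是从各个 ContextVar 到其当前值的映射,即某个执行单元在运行时所持有的一组状态。这里的执行单元比如:
- 一个 异步任务 (async task):asyncio 创建 Task 时会复制当前上下文,任务始终在自己的这份副本中运行
- 一个 线程:每个线程都有自己独立的当前上下文
- 一个 协程链 (chain of coroutines):同一个 Task 中相互 await 的协程共享该 Task 的上下文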
隔离性: contextvars 的关键作用在于它能确保在不同的并发执行路径(例如不同的 async task 或不同的 Context 对象)中,一个上下文变量的值是独立且隔离的。
在一个任务中设置一个上下文变量的值,不会影响到另一个同时运行的任务中该变量的值。
这与线程局部存储 (thread-local storage,即 threading.local()) 类似,但 contextvars 更适用于异步代码,因为多个协程/任务可以在同一个线程上运行,而 threading.local() 无法区分它们。
异步任务中的状态隔离示例程序
contextvars vs. threading.local() 代码:
| import asyncio
import contextvars
import threading
# 1. Contextvars variable used for async task isolation
request_id_ctx: contextvars.ContextVar[int] = contextvars.ContextVar(
    "request_id_ctx", default=0
)

# 2. threading.local() variable
# - only isolates threads, not async tasks
# - In single-threaded asyncio, this behaves like a normal global variable
request_id_local = threading.local()
# Initial value, set for the thread that executes this module
request_id_local.value = 0


async def process_data(task_name: str, local_id: int, delay: float = 1.0) -> int:
    """
    Compare contextvars and threading.local() across an ``await`` point.

    Stores *local_id* in ``request_id_local.value``, reads both variables,
    suspends for *delay* seconds (letting other tasks on the loop run), then
    reads them again and prints whether each value survived unchanged.

    Args:
        task_name: Label used in the printed report.
        local_id: Value this call writes to ``request_id_local.value``.
        delay: Seconds to sleep at the simulated I/O point (default 1.0).

    Returns:
        The ``request_id_ctx`` value observed after resuming.
    """
    # Task sets its own threading.local() value
    request_id_local.value = local_id

    # First get and print
    ctx_initial = request_id_ctx.get()
    local_initial = request_id_local.value
    print(f"[{task_name}] Context ID: {ctx_initial} | Local ID: {local_initial}")

    # Simulate an I/O operation, during which control switches to another Task
    await asyncio.sleep(delay)

    # After I/O resumes, get and check again
    ctx_current = request_id_ctx.get()
    local_current = request_id_local.value

    # Check the result of ContextVar
    ctx_result = (
        "contextvars Isolation successful"
        if ctx_current == ctx_initial
        else "contextvars Isolation failed"
    )
    # Check the result of threading.local(): another task on the same thread
    # may overwrite the value while this one is paused. An unchanged value only
    # means nobody wrote in between (e.g. this task was the last writer), not
    # that threading.local isolated it.
    local_result = (
        "threading.local Isolation successful"
        if local_current == local_initial
        else "threading.local Isolation failed (overwritten)"
    )

    # Implicit concatenation keeps the report layout explicit; a backslash
    # line continuation inside the literal would copy source indentation
    # into the printed text.
    print(
        f"\n[{task_name}] Resume:\n"
        f"\tContext ID:\t{ctx_current} ({ctx_result})\n"
        f"\tLocal ID:\t{local_current} ({local_result})"
    )
    return ctx_current
async def main():
    """
    Run two simulated requests concurrently on a single thread.

    Each request binds its own ID in request_id_ctx for the duration of
    process_data and restores the previous value afterwards.
    """

    async def handle_request(label: str, request_id: int):
        # 1. Bind the request ID in this task's context and keep the Token
        token = request_id_ctx.set(request_id)
        try:
            # The same ID is handed to process_data as the threading.local value
            return await process_data(label, request_id)
        finally:
            # 2. Restore the previous value whether or not process_data raised
            request_id_ctx.reset(token)

    print("\nStart concurrent async tasks (running on the same thread)")
    # Task A simulates request A (ID 100); Task B simulates request B (ID 200)
    await asyncio.gather(
        handle_request("Task A", 100),
        handle_request("Task B", 200),
    )
# Entry point: run the async main() only when executed as a script, not on import
if __name__ == "__main__":
    asyncio.run(main())
|
运行结果:
| (venv) $ python3 ctx_vars.py
Start concurrent async tasks (running on the same thread)
[Task A] Context ID: 100 | Local ID: 100
[Task B] Context ID: 200 | Local ID: 200
[Task A] Resume:
Context ID: 100 (contextvars Isolation successful)
Local ID: 200 (threading.local Isolation failed (overwritten))
[Task B] Resume:
Context ID: 200 (contextvars Isolation successful)
Local ID: 200 (threading.local Isolation successful)
(venv) $
|
运行结果与对比分析
| Task | 步骤 | ContextVar | threading.local | 结果 |
| --- | --- | --- | --- | --- |
| A | 🟢 开始 (设置 100) | 100 | 100 | 都正确 |
| B | 🟢 开始 (设置 200) | 200 | 200 | 都正确 |
| A | 🟡 恢复 (检查) | 100 | 200 | A 的 Local ID 被 B 干扰 |
| B | 🟡 恢复 (检查) | 200 | 200 | 正确(但这只是因为 B 最后写入了 threading.local,并非真正隔离) |
为什么需要 contextvars?
在 Python 的异步编程中,传统的 threading.local() 有局限性,因为它只隔离线程,而一个线程可以运行多个异步任务。
contextvars 的优势:
- 异步兼容性 (Async Compatibility): 它是专门为异步代码设计的。asyncio 在创建一个异步任务 (Task) 时会复制当前上下文;此后该任务每次从挂起中恢复执行,都会在自己的这份上下文中运行,因此上下文变量的值会随任务一起被保存和恢复。
- 状态隔离 (State Isolation): 它防止在并发代码(尤其是异步框架中的不同任务)中使用状态时,状态意外地泄漏或干扰到其他任务。例如,在一个 Web 服务器中,你可以用它来存储当前请求的用户 ID 或事务 ID,确保每个请求都有自己的独立 ID,即使它们在同一个线程上运行。
- Context 复制: copy_context() 函数返回当前上下文的一个快照 (Context 对象),之后可以通过 ctx.run(func, *args) 在另一个地方(例如一个新的 Task 或线程)以这个上下文运行代码。asyncio 在创建 Task 时就是自动这样做的。