Python Core Concepts
asyncio Coroutine and Event Loop
代码:
| import asyncio
import time
from typing import Any
# 1. Asynchronous Task (Coroutine)
async def coroutine_worker(name: str, delay: float, iterations: int = 4) -> str:
    """
    Simulate a worker that repeatedly waits on non-blocking I/O.

    Each iteration prints a "Waiting" line, awaits ``asyncio.sleep(delay)``
    and then prints a "Completed" line. The await suspends only this
    coroutine; it does not block the thread, so the Event Loop is free to
    run other tasks and callbacks in the meantime.

    Args:
        name: Label printed at the start of every log line.
        delay: Seconds to sleep in each iteration.
        iterations: Number of wait/complete cycles to run. Defaults to 4,
            the value that used to be hard-coded.

    Returns:
        The fixed string "Completed all iterations".
    """
    for i in range(iterations):
        print(f"[{time.strftime('%M:%S')}] {name}: Waiting for {delay} seconds...")
        # Awaiting voluntarily yields control back to the Event Loop
        await asyncio.sleep(delay)
        print(f"[{time.strftime('%M:%S')}] {name}: Completed\tIteration: {i+1}")
    return "Completed all iterations"
# 2. Synchronous Callback Function
# Number of the timer callback about to run; shared across invocations
count = 1


def periodic_callback(loop: asyncio.AbstractEventLoop) -> None:
    """
    Plain (synchronous) timer callback driven by ``loop.call_later``.

    Every invocation prints the current value of the module-level ``count``.
    While ``count`` is below 5 the callback re-arms itself to fire again in
    2 seconds; otherwise it asks the Event Loop to stop. ``count`` is
    incremented at the end of every invocation.

    Args:
        loop: The Event Loop used both for rescheduling and for stopping.
    """
    # The counter lives at module level so it survives between invocations
    global count

    print(f"[{time.strftime('%M:%S')}] Timer callback: {count}...\n")

    if count >= 5:
        # Final tick: request that the Event Loop stop
        loop.stop()
        print(f"[{time.strftime('%M:%S')}] Event Loop stop request has been issued.")
    else:
        # Not done yet: schedule the next tick 2 seconds from now
        loop.call_later(2, periodic_callback, loop)

    count += 1
# 3. Manual Event Loop Management
def main():
    """
    Drive an Event Loop by hand instead of using ``asyncio.run``.

    Steps:
      1. Obtain an Event Loop (falling back to creating one).
      2. Schedule two ``coroutine_worker`` tasks and the first
         ``periodic_callback`` timer.
      3. Run the loop until ``periodic_callback`` calls ``loop.stop()``.
      4. Report each task's state/result, then cancel whatever is still
         pending and close the loop so nothing is leaked.
    """
    print("\nasyncio coroutine Event Loop Management Learning\n")

    # 1. Initialization
    # 1). Get the Event Loop object (AbstractEventLoop Instance)
    # NOTE: calling get_event_loop() with no current loop is deprecated in
    # newer Python versions and eventually raises RuntimeError (see the
    # asyncio docs); the except branch below covers that case.
    loop: asyncio.AbstractEventLoop
    try:
        loop = asyncio.get_event_loop()
        print("Existing Event Loop found and acquired:")
    except RuntimeError:
        # If there is no current Event Loop, create one
        loop = asyncio.new_event_loop()
        print("New Event Loop created and set as current:")
        # Set the newly created loop as the current Event Loop
        # Necessary so that asyncio.get_event_loop() can return this loop
        asyncio.set_event_loop(loop)
    print(f"Event Loop Type:\t{type(loop)}")
    print(f"Event Loop ID:\t\t{id(loop)}\n")

    # 2. Scheduling Tasks and Callbacks
    # 2). Schedule asynchronous tasks
    # i.e.: Wrap coroutines into Tasks and put them into the ready Queue
    task1: asyncio.Task[Any] = loop.create_task(coroutine_worker("Task-1", 3.0))
    task2: asyncio.Task[Any] = loop.create_task(coroutine_worker("Task-2", 1.0))
    # 3). Schedule synchronous callbacks
    # loop.call_later(delay, callback, *args)
    # Run periodic_callback after 2 seconds
    loop.call_later(2.0, periodic_callback, loop)

    def get_task_result(task: asyncio.Task[Any]):
        # Only call result() if the task is done
        if task.done():
            return task.result()
        # If the task is not done, return a placeholder instead of calling result()
        return "<Pending and No Result>"

    try:
        # 3. Running the Event Loop
        print(f"[{time.strftime('%M:%S')}] Starting Event Loop...\n")
        # 4). Run the Event Loop until loop.stop() is called
        loop.run_forever()

        # 4. Post-Loop Cleanup and Result Retrieval
        print(f"[{time.strftime('%M:%S')}] Event Loop has stopped\n")
        # 5). Retrieve task results (before cleanup, so a task that was still
        #     running when the loop stopped is reported as Pending)
        print(
            f"{task1.get_name()}: {'Done' if task1.done() else 'Pending'}, Result: {get_task_result(task1)}"
        )
        print(
            f"{task2.get_name()}: {'Done' if task2.done() else 'Pending'}, Result: {get_task_result(task2)}"
        )
    finally:
        # 6). Perform cleanup: cancel tasks that never finished, let the loop
        #     run just long enough for the cancellations to be delivered,
        #     then release the loop's resources.
        unfinished = [task for task in (task1, task2) if not task.done()]
        for task in unfinished:
            task.cancel()
        if unfinished:
            # return_exceptions=True collects the resulting CancelledError
            # instances instead of raising them here
            loop.run_until_complete(
                asyncio.gather(*unfinished, return_exceptions=True)
            )
        loop.close()


if __name__ == "__main__":
    main()
|
运行结果:
| (venv) $ python3 asyncioEventLoop_v1.py
asyncio coroutine Event Loop Management Learning
Existing Event Loop found and acquired:
Event Loop Type: <class 'asyncio.unix_events._UnixSelectorEventLoop'>
Event Loop ID: 139907526240464
[09:27] Starting Event Loop...
[09:27] Task-1: Waiting for 3.0 seconds...
[09:27] Task-2: Waiting for 1.0 seconds...
[09:28] Task-2: Completed Iteration: 1
[09:28] Task-2: Waiting for 1.0 seconds...
[09:29] Timer callback: 1...
[09:29] Task-2: Completed Iteration: 2
[09:29] Task-2: Waiting for 1.0 seconds...
[09:30] Task-1: Completed Iteration: 1
[09:30] Task-1: Waiting for 3.0 seconds...
[09:30] Task-2: Completed Iteration: 3
[09:30] Task-2: Waiting for 1.0 seconds...
[09:31] Timer callback: 2...
[09:31] Task-2: Completed Iteration: 4
[09:33] Timer callback: 3...
[09:33] Task-1: Completed Iteration: 2
[09:33] Task-1: Waiting for 3.0 seconds...
[09:35] Timer callback: 4...
[09:36] Task-1: Completed Iteration: 3
[09:36] Task-1: Waiting for 3.0 seconds...
[09:37] Timer callback: 5...
[09:37] Event Loop stop request has been issued.
[09:37] Event Loop has stopped
Task-1: Pending, Result: <Pending and No Result>
Task-2: Done, Result: Completed all iterations
(venv) $
|
contextvars 是 Python 标准库中的一个模块,它提供了用于管理、存储和访问上下文相关状态 (context-local state) 的 API。它主要用于并发 (concurrency) 和异步 (asynchronous) 编程中,例如在 asyncio 框架内。
代码:
| import asyncio
import contextvars
import threading
# 1. ContextVar holding the request ID (falls back to 0 when nothing is set);
#    this is the variable used to demonstrate isolation between async tasks
request_id_ctx: contextvars.ContextVar[int]
request_id_ctx = contextvars.ContextVar("request_id_ctx", default=0)

# 2. threading.local() storage
# - separates values per thread only, not per async task
# - under single-threaded asyncio it acts like an ordinary global variable
request_id_local = threading.local()
request_id_local.value = 0
async def process_data(task_name: str, local_id: int) -> int:
    """
    Simulate an asynchronous function that processes data.

    The function stores ``local_id`` in the shared ``threading.local()``
    object, records the current ContextVar and thread-local values, sleeps
    for one second (letting other tasks run), then reads both values again
    and prints whether each one still matches what was seen before the sleep.

    Args:
        task_name: Label printed at the start of each log line.
        local_id: Value this task writes into ``request_id_local.value``.

    Returns:
        The ContextVar value read after the sleep.
    """
    # Task sets its own threading.local() value
    request_id_local.value = local_id

    # First get and print
    ctx_initial = request_id_ctx.get()
    local_initial = request_id_local.value
    print(f"[{task_name}] Context ID: {ctx_initial} | Local ID: {local_initial}")

    # Simulate I/O operation, during which control switches to another Task
    await asyncio.sleep(1)

    # After I/O resumes, get and check again
    ctx_current = request_id_ctx.get()
    local_current = request_id_local.value

    # Check the result of ContextVar
    ctx_result = (
        "contextvars Isolation successful"
        if ctx_current == ctx_initial
        else "contextvars Isolation failed"
    )
    # Check the result of threading.local()
    # It can differ because another task running on the same thread may
    # overwrite the value while this task is paused
    local_result = (
        "threading.local Isolation successful"
        if local_current == local_initial
        else "threading.local Isolation failed (overwritten)"
    )

    # Adjacent f-string literals are used instead of backslash line
    # continuations inside one literal, so the printed text does not depend
    # on how these source lines happen to be indented.
    print(
        f"\n[{task_name}] Resume:\n "
        f"\tContext ID:\t{ctx_current} ({ctx_result})\n "
        f"\tLocal ID:\t{local_current} ({local_result})"
    )
    return ctx_current
async def main():
    """
    Run two simulated requests concurrently on the same thread.

    "Task A" uses request ID 100 and "Task B" uses request ID 200. Each one
    sets the ContextVar, awaits ``process_data`` with the same ID as its
    thread-local value, and always restores the ContextVar afterwards.
    """

    async def simulate_request(label: str, request_id: int):
        # 1. Set value for contextvars, keep the Token for the reset below
        token = request_id_ctx.set(request_id)
        try:
            # The same ID is handed to process_data as the local.value
            return await process_data(label, request_id)
        finally:
            # 2. No matter what, reset the value
            request_id_ctx.reset(token)

    print("\nStart concurrent async tasks (running on the same thread)")
    await asyncio.gather(
        simulate_request("Task A", 100),  # simulate request A
        simulate_request("Task B", 200),  # simulate request B
    )


if __name__ == "__main__":
    asyncio.run(main())
|
运行结果:
| (venv) $ python3 ctx_vars.py
Start concurrent async tasks (running on the same thread)
[Task A] Context ID: 100 | Local ID: 100
[Task B] Context ID: 200 | Local ID: 200
[Task A] Resume:
Context ID: 100 (contextvars Isolation successful)
Local ID: 200 (threading.local Isolation failed (overwritten))
[Task B] Resume:
Context ID: 200 (contextvars Isolation successful)
Local ID: 200 (threading.local Isolation successful)
(venv) $
|