跳转至

Python Core Concepts

asyncio Coroutine and Event Loop

代码:

import asyncio
import time
from typing import Any


# 1. Asynchronous Task (Coroutine)
async def coroutine_worker(name: str, delay: float, *, iterations: int = 4) -> str:
    """
    Repeatedly wait ``delay`` seconds without blocking the Event Loop.

    Each iteration prints a "Waiting" line, awaits ``asyncio.sleep(delay)``
    and then prints a "Completed" line carrying the 1-based iteration number.

    Args:
        name: Label printed in front of every log line.
        delay: Seconds to sleep in each iteration.
        iterations: Number of wait/complete rounds to perform (default 4).

    Returns:
        The fixed string "Completed all iterations".
    """
    for round_no in range(1, iterations + 1):
        print(f"[{time.strftime('%M:%S')}] {name}: Waiting for {delay} seconds...")

        # Awaiting suspends only this coroutine; control goes back to the
        # Event Loop, which can run other tasks and callbacks meanwhile.
        await asyncio.sleep(delay)

        print(f"[{time.strftime('%M:%S')}] {name}: Completed\tIteration: {round_no}")

    return "Completed all iterations"


# 2. Synchronous Callback Function
count = 1


def periodic_callback(loop: asyncio.AbstractEventLoop) -> None:
    """
    Timer-style synchronous callback driven by the Event Loop via call_later.

    Prints the current tick number, then either re-arms itself to fire again
    2 seconds later (while the module-level ``count`` is below 5) or asks the
    loop to stop. ``count`` is incremented on every invocation.
    """
    # The tick counter lives at module level so it survives between calls
    global count

    def stamp() -> str:
        # Minute:second timestamp used as the prefix of every log line
        return time.strftime('%M:%S')

    print(f"[{stamp()}] Timer callback: {count}...\n")

    is_last_tick = count >= 5
    if is_last_tick:
        # Final tick: request that the Event Loop stops
        loop.stop()
        print(f"[{stamp()}] Event Loop stop request has been issued.")
    else:
        # Not finished yet: schedule the next tick 2 seconds from now
        loop.call_later(2, periodic_callback, loop)

    count = count + 1


# 3. Manual Event Loop Management
def main() -> None:
    """
    Drive two coroutines and a timer callback on a manually managed Event Loop.

    Steps:
      1. Acquire the current Event Loop, or create and register a new one.
      2. Schedule two ``coroutine_worker`` tasks and the first
         ``periodic_callback`` timer.
      3. Run the loop until ``periodic_callback`` calls ``loop.stop()``.
      4. Print the state and result of both tasks.
      5. Cancel whatever is still pending and close the loop, so neither the
         loop nor an unfinished task is leaked.
    """
    print("\nasyncio coroutine Event Loop Management Learning\n")

    # 1. Initialization
    # Annotate once; both branches below assign the same variable.
    loop: asyncio.AbstractEventLoop
    try:
        # Depending on the Python version this either returns (or implicitly
        # creates) the current loop, or raises RuntimeError when none is set.
        loop = asyncio.get_event_loop()
        print("Existing Event Loop found and acquired:")
    except RuntimeError:
        # No current Event Loop is available, so create one
        loop = asyncio.new_event_loop()
        print("New Event Loop created and set as current:")

        # Register the new loop as the current Event Loop so that
        # asyncio.get_event_loop() can return it
        asyncio.set_event_loop(loop)

    print(f"Event Loop Type:\t{type(loop)}")
    print(f"Event Loop ID:\t\t{id(loop)}\n")

    def get_task_result(task: asyncio.Task[Any]):
        # result() raises on an unfinished task, so only call it once done
        if task.done():
            return task.result()
        return "<Pending and No Result>"

    try:
        # 2. Scheduling Tasks and Callbacks
        # Wrap the coroutines into Tasks; they start once the loop runs
        task1: asyncio.Task[Any] = loop.create_task(coroutine_worker("Task-1", 3.0))
        task2: asyncio.Task[Any] = loop.create_task(coroutine_worker("Task-2", 1.0))

        # loop.call_later(delay, callback, *args):
        # run periodic_callback for the first time after 2 seconds
        loop.call_later(2.0, periodic_callback, loop)

        # 3. Running the Event Loop
        print(f"[{time.strftime('%M:%S')}] Starting Event Loop...\n")

        # Blocks here until loop.stop() is called
        loop.run_forever()

        # 4. Result Retrieval
        print(f"[{time.strftime('%M:%S')}] Event Loop has stopped\n")

        for task in (task1, task2):
            print(
                f"{task.get_name()}: {'Done' if task.done() else 'Pending'}, Result: {get_task_result(task)}"
            )
    finally:
        # 5. Cleanup
        # Stopping the loop does not finish tasks that are still waiting.
        # Cancel them and let the loop process the cancellation, otherwise
        # they are destroyed while pending when the program exits.
        pending = asyncio.all_tasks(loop)
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))

        # Release the loop's resources and unregister the now-closed loop
        loop.close()
        asyncio.set_event_loop(None)


# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

运行结果:

(venv) $ python3 asyncioEventLoop_v1.py 

asyncio coroutine Event Loop Management Learning

Existing Event Loop found and acquired:
Event Loop Type:        <class 'asyncio.unix_events._UnixSelectorEventLoop'>
Event Loop ID:          139907526240464

[09:27] Starting Event Loop...

[09:27] Task-1: Waiting for 3.0 seconds...
[09:27] Task-2: Waiting for 1.0 seconds...
[09:28] Task-2: Completed       Iteration: 1
[09:28] Task-2: Waiting for 1.0 seconds...
[09:29] Timer callback: 1...

[09:29] Task-2: Completed       Iteration: 2
[09:29] Task-2: Waiting for 1.0 seconds...
[09:30] Task-1: Completed       Iteration: 1
[09:30] Task-1: Waiting for 3.0 seconds...
[09:30] Task-2: Completed       Iteration: 3
[09:30] Task-2: Waiting for 1.0 seconds...
[09:31] Timer callback: 2...

[09:31] Task-2: Completed       Iteration: 4
[09:33] Timer callback: 3...

[09:33] Task-1: Completed       Iteration: 2
[09:33] Task-1: Waiting for 3.0 seconds...
[09:35] Timer callback: 4...

[09:36] Task-1: Completed       Iteration: 3
[09:36] Task-1: Waiting for 3.0 seconds...
[09:37] Timer callback: 5...

[09:37] Event Loop stop request has been issued.
[09:37] Event Loop has stopped

Task-1: Pending, Result: <Pending and No Result>
Task-2: Done, Result: Completed all iterations
(venv) $ 

contextvars

contextvars 是 Python 标准库中的一个模块,它提供了用于管理、存储和访问上下文相关状态 (context-local state) 的 API。它主要用于并发 (concurrency) 和异步 (asynchronous) 编程中,例如在 asyncio 框架内。

代码:

import asyncio
import contextvars
import threading

# 1. ContextVar used for per-task isolation in async code
request_id_ctx: contextvars.ContextVar[int] = contextvars.ContextVar(
    "request_id_ctx", default=0
)

# 2. threading.local() variable
#   - isolates values per thread only, not per async task
#   - in single-threaded asyncio it behaves like an ordinary global variable
request_id_local = threading.local()
request_id_local.value = 0


async def process_data(task_name: str, local_id: int, *, io_delay: float = 1) -> int:
    """
    Simulate an asynchronous data-processing step and report value isolation.

    Stores ``local_id`` in the thread-local, reads both the ContextVar and the
    thread-local, sleeps to let other tasks run, then reads both again and
    prints whether each value is unchanged across the suspension.

    Args:
        task_name: Label printed in front of the log lines.
        local_id: Value written to ``request_id_local.value`` by this call.
        io_delay: Seconds to sleep while simulating I/O (default 1).

    Returns:
        The value of ``request_id_ctx`` read after the simulated I/O.
    """
    # Each task writes its own value into the shared thread-local
    request_id_local.value = local_id

    # Snapshot both values before yielding control
    ctx_initial = request_id_ctx.get()
    local_initial = request_id_local.value

    print(f"[{task_name}] Context ID: {ctx_initial} | Local ID: {local_initial}")

    # Simulated I/O: while this task is suspended, other tasks get to run
    await asyncio.sleep(io_delay)

    # Read both values again after resuming
    ctx_current = request_id_ctx.get()
    local_current = request_id_local.value

    if ctx_current == ctx_initial:
        ctx_result = "contextvars Isolation successful"
    else:
        ctx_result = "contextvars Isolation failed"

    # Another task on the same thread may have overwritten the thread-local
    # while this task was suspended
    if local_current == local_initial:
        local_result = "threading.local Isolation successful"
    else:
        local_result = "threading.local Isolation failed (overwritten)"

    # Spell the indentation out explicitly instead of relying on backslash
    # continuations inside the string, which pull source indentation into
    # the output. The resulting text is unchanged: 11 spaces and a tab.
    indent = " " * 11 + "\t"
    print(
        f"\n[{task_name}] Resume:\n"
        f"{indent}Context ID:\t{ctx_current} ({ctx_result})\n"
        f"{indent}Local ID:\t{local_current} ({local_result})"
    )

    return ctx_current


async def main():
    """
    Run two simulated requests concurrently on the same thread.

    Request "Task A" uses ID 100 and request "Task B" uses ID 200. Each one
    binds its ID to ``request_id_ctx``, runs ``process_data`` and restores
    the previous ContextVar value afterwards.
    """

    async def handle_request(label: str, request_id: int):
        # 1. Bind the request ID and keep the Token needed to undo it
        marker = request_id_ctx.set(request_id)
        try:
            # The same ID is handed to process_data as the thread-local value
            return await process_data(label, request_id)
        finally:
            # 2. No matter what, restore the previous ContextVar value
            request_id_ctx.reset(marker)

    print("\nStart concurrent async tasks (running on the same thread)")

    simulated_requests = (("Task A", 100), ("Task B", 200))
    await asyncio.gather(
        *(handle_request(label, request_id) for label, request_id in simulated_requests)
    )


# Start the asyncio demo only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

运行结果:

(venv) $ python3 ctx_vars.py 

Start concurrent async tasks (running on the same thread)
[Task A] Context ID: 100 | Local ID: 100
[Task B] Context ID: 200 | Local ID: 200

[Task A] Resume:
                Context ID:     100 (contextvars Isolation successful)
                Local ID:       200 (threading.local Isolation failed (overwritten))

[Task B] Resume:
                Context ID:     200 (contextvars Isolation successful)
                Local ID:       200 (threading.local Isolation successful)
(venv) $