AI Blog

Start typing to search...

All Articles

Python async programming patterns

# Python 异步编程模式:从回调地狱到优雅并发 ## 一、问题背景:I/O 密集型场景的痛点 在传统的同步编程模型中,当程序执行 I/O 操作(如网络请求、文件读写、数据库查询)时,CPU 会处于空闲等待状态。例如,一个 Web 爬虫需要依次请求 100 个网页,每个请求耗时 200ms,总耗时将达到 20 秒。这种阻塞等待极大地浪费了 CPU 资源,也降低了程序的吞吐能力。 Pyth

· 4 min read

Python 异步编程模式:从回调地狱到优雅并发

一、问题背景:I/O 密集型场景的痛点

在传统的同步编程模型中,当程序执行 I/O 操作(如网络请求、文件读写、数据库查询)时,CPU 会处于空闲等待状态。例如,一个 Web 爬虫需要依次请求 100 个网页,每个请求耗时 200ms,总耗时将达到 20 秒。这种阻塞等待极大地浪费了 CPU 资源,也降低了程序的吞吐能力。

Python 的 asyncio 库提供了基于事件循环的异步编程模型,允许程序在等待 I/O 操作时切换到其他任务,从而实现高效的并发处理。但异步编程的复杂性往往让开发者望而却步,尤其是回调地狱、状态管理、异常处理等问题。

本文将从实际问题出发,对比同步、多线程、异步三种方案,深入剖析 asyncio 的核心实现,并通过性能分析和最佳实践,帮助读者掌握 Python 异步编程的精髓。

二、方案对比:同步 vs 多线程 vs 异步

2.1 同步方案(最直观但最慢)

import time
import requests

def fetch_url(url):
    response = requests.get(url)
    return response.status_code

urls = ["https://httpbin.org/delay/1"] * 5
start = time.time()
results = [fetch_url(url) for url in urls]
print(f"同步耗时: {time.time() - start:.2f}s")

每个请求阻塞 1 秒,5 个请求串行执行,总耗时约 5 秒。

2.2 多线程方案(快但资源开销大)

import time
import requests
from concurrent.futures import ThreadPoolExecutor

def fetch_url(url):
    response = requests.get(url)
    return response.status_code

urls = ["https://httpbin.org/delay/1"] * 5
start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(fetch_url, urls))
print(f"多线程耗时: {time.time() - start:.2f}s")

利用 5 个线程并行执行,耗时约 1 秒。但线程切换有开销,且 Python 的 GIL 限制了 CPU 密集型任务的并行性。

2.3 异步方案(高效且轻量)

import time
import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return response.status

async def main():
    urls = ["https://httpbin.org/delay/1"] * 5
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(results)

start = time.time()
asyncio.run(main())
print(f"异步耗时: {time.time() - start:.2f}s")

核心优势:单线程内通过事件循环实现协程切换,无需线程上下文切换开销,可轻松管理数万个连接。

三、深入实现:事件循环与协程调度

3.1 async/await 语法糖

async def 定义的函数返回一个协程对象(coroutine)。await 关键字会挂起当前协程,将控制权交还给事件循环,直到等待的 awaitable 对象完成。

async def hello():
    print("Hello")
    await asyncio.sleep(1)  # 模拟 I/O 等待
    print("World")

asyncio.sleep(1) 不会阻塞线程,而是注册一个 1 秒后的回调,事件循环在此期间可以执行其他协程。

3.2 事件循环的工作机制

事件循环本质上是一个 while True 循环,不断检查就绪的任务队列:

  1. 从就绪队列取出一个协程执行,直到遇到 await
  2. 将协程挂起,注册回调到 I/O 多路复用器(如 epoll)
  3. 当 I/O 事件就绪,将协程放回就绪队列
  4. 重复上述过程
# 简化版事件循环演示
import selectors
import time

class SimpleEventLoop:
    def __init__(self):
        self._tasks = []
        self._selector = selectors.DefaultSelector()
    
    def add_task(self, coro):
        self._tasks.append(coro)
    
    def run_forever(self):
        while self._tasks:
            # 执行所有就绪任务
            current_tasks = self._tasks[:]
            self._tasks = []
            for task in current_tasks:
                try:
                    task.send(None)  # 恢复协程执行
                except StopIteration:
                    pass  # 协程完成
                else:
                    self._tasks.append(task)  # 未完成继续添加

3.3 任务调度与并发控制

asyncio.create_task() 将协程包装为 Task 对象并加入事件循环调度。asyncio.gather() 则实现并发等待多个任务完成。

async def worker(name, delay):
    await asyncio.sleep(delay)
    return f"Worker {name} done"

async def main():
    # 创建并调度三个任务
    task1 = asyncio.create_task(worker("A", 2))
    task2 = asyncio.create_task(worker("B", 1))
    task3 = asyncio.create_task(worker("C", 3))
    
    # 等待所有任务完成,按完成顺序返回结果
    results = await asyncio.gather(task1, task2, task3)
    print(results)  # ['Worker A done', 'Worker B done', 'Worker C done']

四、性能分析:异步 vs 多线程

我们通过一个更真实的基准测试来对比两种方案:

import asyncio
import time
import aiohttp
import requests
from concurrent.futures import ThreadPoolExecutor

async def async_fetch(session, url):
    async with session.get(url) as resp:
        return await resp.text()

def sync_fetch(url):
    return requests.get(url).text

async def async_main(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [async_fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

def thread_main(urls):
    with ThreadPoolExecutor(max_workers=20) as executor:
        return list(executor.map(sync_fetch, urls))

urls = ["https://httpbin.org/get"] * 100

# 异步测试
start = time.time()
asyncio.run(async_main(urls))
async_time = time.time() - start

# 多线程测试
start = time.time()
thread_main(urls)
thread_time = time.time() - start

print(f"异步耗时: {async_time:.2f}s")
print(f"多线程耗时: {thread_time:.2f}s")

典型结果

  • 异步:0.8s
  • 多线程(20线程):1.2s

分析

  • 异步方案在单线程内实现并发,避免了线程创建和上下文切换的开销
  • 当并发量达到数千时,多线程会因线程数过多导致性能急剧下降,而异步仍能保持稳定
  • 异步的缺陷:CPU 密集型任务会阻塞事件循环,不适合计算密集场景

五、最佳实践

5.1 使用信号量限制并发

import asyncio
import aiohttp

semaphore = asyncio.Semaphore(10)  # 最多10个并发

async def fetch_with_limit(session, url):
    async with semaphore:
        async with session.get(url) as resp:
            return await resp.text()

async def main():
    urls = [f"https://httpbin.org/delay/1" for _ in range(100)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

5.2 正确处理异常

async def safe_fetch(session, url):
    try:
        async with session.get(url, timeout=10) as resp:
            return resp.status
    except asyncio.TimeoutError:
        print(f"Timeout: {url}")
        return None
    except aiohttp.ClientError as e:
        print(f"Client error: {e}")
        return None

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [safe_fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # return_exceptions=True 让异常作为结果返回,而不是抛出

5.3 避免阻塞事件循环

# 错误做法:同步阻塞
async def bad_example():
    import time
    time.sleep(5)  # 阻塞整个事件循环5秒!
    return "done"

# 正确做法:使用异步sleep
async def good_example():
    await asyncio.sleep(5)  # 让出控制权
    return "done"

# 如果必须执行CPU密集型任务,使用run_in_executor
import concurrent.futures
async def cpu_bound():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        None,  # 使用默认线程池
        lambda: sum(range(10**7))  # CPU密集型计算
    )
    return result

5.4 结构化并发

使用 asyncio.TaskGroup(Python 3.11+)管理任务生命周期:

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_url("url1"))
        task2 = tg.create_task(fetch_url("url2"))
    # TaskGroup 退出时自动等待所有子任务完成

六、总结

Python 的 asyncio 为 I/O 密集型应用提供了高效、轻量的并发解决方案。核心要点:

  1. 适用场景:网络请求、数据库查询、文件读写等 I/O 密集型任务
  2. 关键概念:事件循环、协程、任务、awaitable 对象
  3. 性能优势:单线程内实现并发,无线程切换开销,可管理海量连接
  4. 注意事项:避免阻塞调用、合理控制并发数、妥善处理异常

掌握这些异步编程模式,能显著提升 Python 应用的吞吐能力和资源利用率。

参考资料

  1. Python 官方文档: asyncio — 异步 I/O
  2. aiohttp 官方文档: https://docs.aiohttp.org/
  3. Caleb Hattingh. Using Asyncio in Python. O’Reilly Media, 2020.
  4. Luciano Ramalho. Fluent Python (2nd Edition). O’Reilly Media, 2022. Chapter 21: Asynchronous Programming.
  5. Python PEP 492: Coroutines with async and await syntax