一些概念:
GIL (全局解释器锁)
影响对象:多线程 (Threading)
这不是 Python 语言本身的问题,而是 CPython 解释器(官方默认版)的历史遗留问题。为了防止内存管理(垃圾回收机制)出错,它必须保证同一时刻只有一个线程在操作内存。
对于CPU密集任务,这个是不好的,因为过一段时间就会强制换出,多线程算数换入换出的时间还不如完全单线程快。
对于IO密集,是好的,因为GIL 很智能:只要你在“等”(IO 操作),你就必须换

Event Loop
主线程运行同步任务被阻塞。应该把可异步的阻塞任务都await掉,把同步的阻塞任务都扔线程池,或者通过aiosqlite换成异步驱动的相应操作

协程asyncio(协作式调度)

高并发IO密集
(爬虫,接口服务)
协程和线程的主要区别就是await,显式表明了切换是自愿的才能切.
有点像js

  • async def fn(): 定义一个协程函数
  • await something 表示这一步可能要等 I/O,等的时候我可以去干别的任务
    • await asyncio.sleep(1)(睡觉时把执行权让给别人)
    • await websocket.recv()(等网络数据时把执行权让给别人)
    • await db.fetch()(异步驱动的数据库客户端会在 I/O 等待期让出执行权)
  • asyncio.gather(a,b,c) 等大家都跑完,并把每个任务的返回值按顺序(0..9)收集成列表返回
  • asyncio.create_task(func)把这个异步工作丢给事件循环,让它并发跑起来;把工单贴到任务墙(Task任务手柄,可以取消和检查)上,工人现在可以开始干这活了.也可以直接运行func()
  • asyncio.run(func)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import random
import time

async def fn(x:str):
xxx
await 干别的事情 #只有await并发
xxx
return

async def main():
#task=[
#asyncio.create_task(fn(str))
#for i in range(10)
#]
tasks = []
for i in range(10):
t = asyncio.create_task(fake_io_task(f"task-{i}"))#永远不要“随手”创建任务。要么 `await` 它,要么用 `asyncio.gather()` 收集它。不然被遗忘了不好
tasks.append(t)

result=await asyncio.gather(*task)
print()

asyncio.run(main())

线程也有自己管理生命周期的东西 Taskgroup

1
2
3
4
5
6
7
8
9
async def main(): 
try: # 3.11+ 的 TaskGroup 会自动管理生命周期 # 只要退出 with 块,所有任务都会被自动 await 或取消 # 并且,只要有一个任务失败,它会立即取消其他所有任务并抛出异常
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(sub_task("A"))
t2 = tg.create_task(sub_task("B", fail=True))
t3 = tg.create_task(sub_task("C"))
# 如果没有失败,这里可以收集结果 # print(t1.result(), t2.result(), t3.result())
except* ValueError as e: # 3.11+ 的 ExceptionGroup
print(f"Tasks failed: {e.exceptions}")

把阻塞的同步代码塞进线程池(async + thread 结合)

要注意await只在可等待的对象上让出控制权,表现出并发的样子.如果在协程里调用的是普通的阻塞函数(比如 requests.get() / sqlite3.execute() / boto3.client().put_object() 这种同步 I/O),这个函数是不会给事件循环机会切走的,

async def 只决定了这个函数本身返回的是一个协程对象
它完全不保证你里面的每一行代码都是异步友好的。
只有 await 某个awaitable 这一瞬间,事件循环才能把控制权拿回来分配给别的协程。
如果你在 async def 里面跑的是同步的东西,它没有 await 点,loop 根本接管不了。

所以假如我的代码里全部都是同步库可以使用offload 到线程池技巧

asyncio.to_thread()(Python 3.9+)会把一个阻塞的同步函数扔进线程池跑

1
2
3
4
5
6
7
def block(url):
return requests.get(url).text # 这是阻塞的老代码

async def fetch(url):
# 关键点:把同步阻塞任务放到线程池里跑
text = await asyncio.to_thread(block, url)
return text

异步驱动:

丢进线程池(Threading) 都是为了解决同一个问题:防止数据库操作(IO)卡死主线程(Event Loop)

情况 A:同步阻塞(现在的代码)

点单员把单子递给制作员后,站在柜台前死等,眼睛死死盯着制作员,直到奶茶做好拿到手,才转身去接待下一个排队的顾客。

  • 后果: 后面排队的顾客(其他请求)全部卡死,店铺吞吐量极低。
情况 B:丢进线程池(run_in_threadpool

点单员把单子递给一个临时工(Thread),让临时工站在柜台前死等。点单员自己转身去接待下一个顾客。等奶茶做好了,临时工拍拍点单员肩膀说“好了”。

  • 原理: 用操作系统层面的线程(Thread)来包裹同步代码。
  • 消耗: 雇佣临时工需要开工资(内存开销),而且临时工多了,厨房会拥挤(上下文切换开销)。
  • 适用: 代码不想重写,或者数据库驱动不支持异步时。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14

    def sync_db_query():
    print(f"[临时工] 开始查询数据库 (线程 ID: {asyncio.get_running_loop()})") # 模拟建立连接(阻塞)
    conn = sqlite3.connect(':memory:')
    cursor = conn.cursor() # 模拟一个耗时查询(强制睡2秒,模拟慢SQL)
    time.sleep(2)
    cursor.execute('SELECT "奶茶做好了"')
    result = cursor.fetchone()[0]
    conn.close()
    print("[临时工] 查询结束")
    return result

    丢线程池
    result = await asyncio.to_thread(sync_db_query)
情况 C:异步驱动(asyncpg / aiosqlite

点单员把单子贴在厨房的“订单墙”上(发送非阻塞请求),然后立刻转身接待下一个顾客。当厨房做好奶茶后,按一下呼叫铃(Callback/Await),点单员听到铃声,把做好的奶茶递给顾客。

  • 原理: 利用数据库协议的非阻塞特性,不占用额外的系统线程,纯粹利用 CPU 的空闲时间片切换。
  • 消耗: 极低。不需要雇临时工,完全由点单员一人搞定。
  • 适用: 追求极致性能、高并发,且愿意重写代码时。
    asyncio 的最佳实践是使用原生的异步驱动(如 asyncpg),因为它们不需要线程上下文切换的开销,并发能力最强。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    import asyncio 
    import aiosqlite # 这是一个原生支持异步的 SQLite 驱动
    async def async_db_query():
    print("[点单员] 开始非阻塞查询...") # 建立连接是异步的 (注意 async with)
    async with aiosqlite.connect(':memory:') as db: # 甚至获取游标也是异步的(部分库如此)
    async with db.execute('SELECT "奶茶做好了"') as cursor:
    # --- 关键动作 --- # 这里的 sleep 是非阻塞的,模拟数据库正在忙,但 CPU 是空闲的
    await asyncio.sleep(2) # 获取结果也是异步的
    row = await cursor.fetchone()
    print("[点单员] 查询结束")
    return row[0]

    挂起
    result = await async_db_query()
    感觉就是和JavaScript一模一样,核心共性就是“单线程 + 非阻塞”。但是JavaScript是原生的,python生来是同步的(为了简单易学)。
  • 它的标准库(如 time.sleepsqlite3requests默认全是阻塞的!如果用它们,服务员就会傻等。
  • 为了实现高并发,Python 引入了 asyncio 库(相当于给 Python 装了一个事件循环)。
  • 但光有循环没用,你还需要把那些“会傻等”的库(驱动),换成“不会傻等”的库(异步驱动,如 asyncpg, httpx)。

但是有一些老的算法库不支持异步,只能扔线程池。

除了扔到线程池,也可以扔到进程池

线程(Thread)是假的并发。因为有 GIL,同一时刻只能有一个 CPU 核在跑 Python 代码。
asyncio 线程负责网络吞吐,ProcessPool 里的子进程负责“啃” CPU,互不干扰。
loop.run_in_executor“异步”调用“同步”

1
2
3
4
5
6
7
8
9
10
11
12
13
# 最好全局共享一个 ProcessPoolExecutor
cpu_executor = ProcessPoolExecutor()

@app.post("/process")
async def process_data(data):
loop = asyncio.get_running_loop()
# 把 CPU 密集任务扔到进程池
result = await loop.run_in_executor(
cpu_executor,
heavy_cpu_function, # 这是一个普通的 sync 函数
data
)
return {"result": result}

多线程threading(强制抢占式调度)

IO密集
(少量文件读写,等待网络磁盘数据库)

  • thread = threading.Thread(target=fn, args=(…))
  • thread.start() 开始,thread.join() 等它结束
  • GIL:同一时间只有一个线程在跑 Python 字节码(CPU 密集时线程不会真正并行),但 I/O 密集时线程切换还是很有用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    import threading
    import time

    def fn(x:str):
    里面所有内容都是并发
    return

    #threads = [threading.Thread(target=inc) for _ in range(4)]
    #[t.start() for t in threads]
    #[t.join() for t in threads]
    tasks = []
    for i in range(10):
    t =threading.Thread(target=fn,args=(i,))
    t.start()
    tasks.append(t)

    for t in tasks:
    task.join()

    print()


因为线程里的所有内容都在并发,所以要注重竞态

  • 保护共享变量 :Lock加锁 在写前面加with lock:任何“读-改-写”的共享数据结构(全局计数器、缓存字典、统计信息等)

  • 发送数据 :Queue消息队列管道.生产者和多个并行消费者从管道里存取东西.从“很多线程围着同一块内存改啊改”的混战,降级成了“把一份具体工作(消息)打包给一个明确负责人”。get还有天然的阻塞等待.并且可以有多个Queue来给消息分类.

    • 消息传递:比如全局变量(例如 counter)的所有权被转移给了某一个线程,其他工作线程不能直接改这个变量,它们只能把“我要+1”这种意图作为消息丢到队列里。同时不同线程只处理自己感兴趣的任务img_q = queue.Queue() ||task = img_q.get()
    • 数据传递:保证同一条数据只被一个线程拿到
    • 任务调度:每个任务只会被一个线程消费,多线程自动分工,无需手动加锁task_q = queue.Queue()||task_q.put(task)||task = task_q.get()
    • 停机:发送停机信号(每个线程一个)for _ in range(2):q.put(None)
    • 异步解耦:生产者(比如网络监听、GUI事件)不能被阻塞,它只要把数据扔进队列就可以继续工作。不用担心传给谁,收没收到 .
  • 发信号控制状态 :Event发送开始停止.
    stop_event = threading.Event()
    stop_event.set()
    while not stop_event.is_set():

async可以放部分函数到线程池,那如果线程想要调async那应该怎么做
同步调用异步asyncio.run_coroutine_threadsafe

1
2
3
4
5
6
7
8
9
10
11
12
13
# 假设在一个线程 B 中(比如 Flask 的 worker 线程)
# 而 asyncio 循环在线程 A 中
import asyncio
loop = asyncio.get_running_loop() # 假设能拿到在 A 线程的 loop

# 提交任务到 A 线程的 loop
future = asyncio.run_coroutine_threadsafe(my_async_func(), loop)
#不能用 `asyncio.run()`,因为它会创建_新_的循环。而你的 `async` 任务可能需要跑在_那个已经存在的_、位于线程 A 的循环上(比如那个循环上跑着 `asyncpg` 的连接池或 `httpx` 的客户端)。
- 我(线程 B)这里有一个 `async` 任务 `my_async_func`。”
- “请把它**线程安全地**‘扔’到那个正在线程 A 运行的 `loop` 的任务队列里。

# 在 B 线程中阻塞等待结果
result = future.result(timeout=10)

线程也有线程池concurrent.futures.ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from concurrent.futures import ThreadPoolExecutor
import time

def blocking_io(url):
print(f"fetching {url} in {threading.current_thread().name}")
time.sleep(1) # 模拟 requests.get()
return f"{url} data"

urls = ["a", "b", "c", "d", "e"]

# 使用 ThreadPoolExecutor,自动管理 3 个 worker 线程
with ThreadPoolExecutor(max_workers=3) as executor:
# 1. 批量提交,保持顺序
results = executor.map(blocking_io, urls)

# 2. 逐个提交,不保顺序(更灵活)
# futures = [executor.submit(blocking_io, url) for url in urls]
# results = [f.result() for f in futures]

print(list(results))

锁和信号量

with Lock
with RLock 可重入锁,当你的类中,一个加了锁的方法 A,内部又调用了另一个也加了锁的方法 B。如果用 Lock,B 就会死锁;用 RLock 就没问题
with threading.Semaphore(5)

1
2
3
4
5
api_limit_sem = threading.Semaphore(5)
def call_api():
with api_limit_sem:
# with 块保证了同一时间最多只有 5 个线程能进入这里
requests.get(...)

进程 multiprocessing

cpu密集
(例如压缩、加密、解析 JSON 巨文件、机器学习推理)
Python 的 GIL 限制了同进程多线程的并行执行 CPU-bound 代码

进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import multiprocessing
import math
import time

def cpu_heavy(n: int):
# 人为制造CPU负载:计算很多素数
total = 0
for x in range(2, n):
prime = True
for y in range(2, int(math.sqrt(x)) + 1):
if x % y == 0:
prime = False
break
if prime:
total += 1
return total

if __name__ == "__main__":
numbers = [50_000, 60_000, 70_000, 80_000]
# - 把一批离散任务[50_000, 60_000, 70_000, 80_000]交给池子。池子会自动把任务分发给进程,并收集返回值。 进程池负责调度、发活、回收。
t0 = time.perf_counter()
with multiprocessing.Pool() as pool:
results = pool.map(cpu_heavy, numbers)
t1 = time.perf_counter()

print("results:", results)
print(f"total time: {t1 - t0:.2f}s")

pool = multiprocessing.Pool(processes = 3)提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。否则等待某个进程结束

pool.map 会把任务分散到多个进程真正并行执行。使进程阻塞直到结果返回.
也可以看出pool是一批一批的,任务导向的,不能实时控制一个长期循环的线程

注意:if __name__ == “__main__“:

  • python myscript.py 运行脚本时,__name__ 的值是 "__main__",所以里面的代码会执行。
  • 如果别的文件 import myscript,那 myscript.__name__ 就是 "myscript",这个 if 里面的代码就不会跑。
  • 因为python在Windows/macOS上跑的时候启动子进程不用fork用spawn,子进程并不会“接着父进程的执行点往下跑”,它其实是- 启动一个全新的 Python 解释器进程(干净的)import 你的主脚本文件(比如 myscript.py)。 运行这个模块的顶层代码(也就是文件最外层的所有语句)。根据父进程告诉它的“target 函数”去执行实际任务。然后就会无限运行没有main包裹的p = Process(…)创建进程.

with multiprocessing.Pool() as pool: 是一种**上下文管理器 (Context Manager)**。- 进入 with 块时: Python 会调用 Pool 对象的 __enter__ 方法,这个方法启动了进程池并返回 pool 对象,让你可以在 with 块内部使用它。 退出 with 块时 (无论正常结束还是发生异常): Python 会自动调用 Pool 对象的 __exit__ 方法。
所以那两行代码等价于

1
2
3
4
5
6
7
pool = multiprocessing.Pool()
try:
    results = pool.map(cpu_heavy, numbers)
finally:
# 这两步是 "with" 语句自动帮你做的
    pool.close() # 1. 告诉池子:我没有新任务了
    pool.join() # 2. 告诉池子:请等待所有已提交的任务执行完毕

进程间通信multiprocessing.Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import multiprocessing
import time
import os

def worker(input_q: multiprocessing.Queue, output_q: multiprocessing.Queue):
pid = os.getpid()
while True:
item = input_q.get()
if item is None:
output_q.put(f"worker {pid} exit")
break
# 假装这里是重CPU的工作
time.sleep(0.5)
output_q.put((pid, item * item))

if __name__ == "__main__":
in_q = multiprocessing.Queue()
out_q = multiprocessing.Queue()

procs = [multiprocessing.Process(target=worker, args=(in_q, out_q)) for _ in range(3)]
for p in procs: p.start()

for x in range(10):
in_q.put(x)

# 发送停机信号
for _ in procs:
in_q.put(None)

# 收集结果
finished = 0
while finished < len(procs):
msg = out_q.get()
print("got:", msg)
if isinstance(msg, str) and "exit" in msg:
finished += 1

for p in procs: p.join()
print("all done")

没有用线程池.因为这个任务是while的常驻的需要持续读取数据监控当时状态的.

  • Pool 更像:“帮我把这堆活做完并把结果还给我”
  • 手工开 Process 更像:“我要一群常驻工人,主进程会源源不断地给你们推事件,你们就一直干,直到我告诉你们停”

通信是有代价的,所以“每个小步骤都发消息”会拖垮性能。进程适合“少量大块任务”,而不是“每毫秒都同步状态”。

共享状态multiprocessing.Manager

  • 线程可以共享内存(在同一进程里)
  • 进程不能直接共享内存(是隔离的地址空间)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    import multiprocessing
    import time
    import os

    def worker(shared_dict, lock):
    pid = os.getpid()
    for _ in range(5):
    time.sleep(0.1)
    with lock:
    shared_dict["count"] = shared_dict.get("count", 0) + 1
    print(f"{pid} done")

    if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
    shared = manager.dict()
    lock = manager.Lock()

    procs = [multiprocessing.Process(target=worker, args=(shared, lock)) for _ in range(4)]
    for p in procs: p.start()
    for p in procs: p.join()

    print("final shared dict:", shared)

必须dict,就像把一个变量发送到云端,给各个进程客户端读取.但是要注意也要加锁..所有进程必须用同一把锁,不然根本起不到互斥。所以使用manager.lock

1
2
3
4
5
6
7
8
9
data = {}  # 你以为共享,实际上不是

def worker():
data["x"] = 42 #data不会更新.在多进程模型里,每个进程都有自己独立的内存副本。子进程修改的是它自己的 `data` 拷贝,不会回写到父进程的 `data`。

p = multiprocessing.Process(target=worker)
p.start(); p.join()
print(data)

multiprocessing.Lock() 也能创建跨进程锁,但它跟 manager.Lock() 的传播方式、生命周期、兼容方式不太一样.
主要是manager.lock也是个代理对象,风格统一,而且也可以放到某个结构体里.
manager 产生的东西(包括 manager.dict(), manager.Lock())本质上就是“可序列化的代理”,天生就是为 spawn 模型准备的——它们知道如何跨进程远程调用。

  • multiprocessing.Lock()
    → 这是一把“操作系统级的跨进程锁的句柄”。
    → 你手里拿着的是一个直接的同步原语。
    → 适合你手动管理、直接传给子进程。

  • manager.Lock()
    → 这是一把“放在共享服务中心里的锁”。
    → 你拿到的是一个“远程遥控器(proxy)”,每次上锁解锁都去找那个 manager 服务协调。
    → 适合你已经把所有共享状态都托管进 manager,希望所有东西都用同一种“远程代理”语义来共享。

manager非常慢,只适合共享少量、低频更新的“状态信息”(比如一个配置字典、一个计数器)。永远只向子进程传递简单的、可序列化的数据(字符串、数字、列表、字典)。子进程自己去初始化它需要的资源(比如数据库连接)。减少“每秒很多小消息”,而是“批量发一坨,大块处理,再一次性回结果”。

高性能共享:multiprocessing.shared_memory

绕过了序列化,直接操作同一块物理内存。这很复杂(需要手动管理生命周期和锁),但性能是极致的。

profiling(找瓶颈,决定用哪种并发)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import cProfile
import pstats
from io import StringIO

def slow_function():
s = 0
for i in range(10_0000):
for j in range(200):
s += (i*j) % 7
return s

pr = cProfile.Profile()
pr.enable()

slow_function()

pr.disable()
s = StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats("cumtime")
ps.print_stats(20) # 只看前20条最慢的
print(s.getvalue())

  • 用 cProfile + pstats 定位你的瓶颈到底是 I/O 还是 CPU
    1
    2
    3
    4
    5
    6
    7
    import time

    t0 = time.perf_counter()
    # ... run something ...
    t1 = time.perf_counter()
    print(f"took {t1 - t0:.4f} sec")

  • 用 time.perf_counter() 做微基准

“我要不要调大线程池/进程池/并发数/队列大小”常用的指标

  • 当前队列长度(说明下游跟不跟得上)
  • 平均任务处理耗时 / p95 耗时
  • 每秒任务吞吐量
  • 错误率、超时率
  • 当前活跃 worker 数

示例:在你的 worker loop 里周期性打印 / 上报这些东西到日志或 Prometheus。这是容量规划的核心。

决策速查

  • 这是 I/O 密集任务,有成百上千个并发?
    • 首选 asyncio
    • 如果有老的阻塞库,wrap 成 asyncio.to_thread
  • 这是 I/O 密集任务,但数量不大(几十个以内),而且都是同步库?
    • threading.Thread / ThreadPoolExecutor 很够用,简单直接
  • 这是 CPU 密集任务,要吃满多核?
    • multiprocessing.Pool / Process
  • 我需要多个消费者一起处理任务队列
    • 线程间用 queue.Queue
    • 进程间用 multiprocessing.Queue
    • 用 None 作为优雅退出信号
  • 我担心数据竞争?
    • 单进程多线程:用 threading.Lock
    • 多进程:尽量别共享状态。如果必须共享,用 multiprocessing.Manager() + Lock
  • 性能不行,我怎么知道瓶颈在哪?
    • 用 cProfile / perf_counter 证据说话
    • 不要猜

“高并发”不是指你能发很多请求,而是你能承受下游变慢、不稳定、半挂、雪崩。
必须给每个 I/O 操作加这些东西:

  1. 超时(timeout)
  2. 有限重试(retry with backoff)
  3. 熔断/降级(不继续打坏掉的下游,而是短路)

总结(这一段建议你背下来):

  1. 并发不是只管“跑更快”,并发是“把不同类型的等待/计算隔离开,让它们互不拖死”。

  2. 任何生产者-消费者结构都必须考虑背压(队列上限)。即限制队列大小,让生产者也被迫慢下来。
    q = queue.Queue(maxsize=1000) # 限制容量
    等队列满了就 等待
    不做这个,线上某个慢下游(例如数据库写慢了)会把你的内存干穿。

  3. 所有 I/O 都必须有 timeout + 重试策略 + 熔断/降级(不继续打坏掉的下游,而是短路)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    import asyncio
    import aiohttp
    from random import random

    async def safe_fetch(url):
    for attempt in range(3): # 最多3次
    try:
    async with aiohttp.ClientSession() as session:
    async with session.get(url, timeout=5) as resp:
    if resp.status == 200:
    return await resp.text()
    else:
    # 非200,不一定要重试,这取决于业务
    raise RuntimeError(f"bad status {resp.status}")
    except Exception as e:
    await asyncio.sleep(0.1 * (2 ** attempt) + random() * 0.05)
    return None # 或者返回fallback


  4. asyncio 的并发不是无限并发:用 Semaphore 管住,避免自杀。不然会- 打爆对方 API, 打爆数据库连接池,打爆你自己本机 CPU/带宽

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    import asyncio
    import aiohttp

    sem = asyncio.Semaphore(50) # 最多50个并发

    async def bounded_fetch(session, url):
    async with sem:
    async with session.get(url, timeout=5) as resp:
    return await resp.text()

    async def main(urls):
    async with aiohttp.ClientSession() as session:
    tasks = [asyncio.create_task(bounded_fetch(session, u)) for u in urls]
    return await asyncio.gather(*tasks)

    asyncio.run(main(urls))
  5. gather 默认是“有一个挂了全军覆没”,记得 return_exceptions=True

    1
    2
    3
    4
    5
    6
       results = await asyncio.gather(*tasks, return_exceptions=True)
    for r in results:
    if isinstance(r, Exception):
    # 局部错误处理
    ...

  6. 不要用多进程频繁互传巨对象,尽量让进程本地热身后常驻干活.还有共享内存的复用,尽量不要多线程,要把危险对象包进一个“worker单线程”,而所有对它的操作都变成“往队列里发送指令”。

  7. 指标埋点和优雅停机,是能不能上生产环境的底线。

架构模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import asyncio
import aiohttp
import logging
import time
from asyncio import Queue, Semaphore
from concurrent.futures import ThreadPoolExecutor
from threading import Event
from random import random

# --- 配置部分 ---
# 1. 设置日志
logging.basicConfig(level=logging.INFO)

# 2. 限制并发数(Semaphore)
semaphore = Semaphore(50) # 最多允许50个并发请求

# 3. 异步HTTP请求客户端
async def fetch(session, url):
async with semaphore: # 限制并发数
try:
async with session.get(url, timeout=5) as resp:
if resp.status == 200:
return await resp.text()
else:
logging.error(f"Failed to fetch {url} with status {resp.status}")
return None
except Exception as e:
logging.error(f"Error fetching {url}: {e}")
return None

# --- 任务队列处理 ---
# 4. 生产者-消费者模型
task_queue = Queue(maxsize=100) # 限制队列大小

async def producer(urls):
"""生产者:将 URLs 放入队列"""
for url in urls:
await task_queue.put(url)
# 发送停止信号
for _ in range(3): # 假设有3个消费者
await task_queue.put(None)

async def consumer():
"""消费者:从队列取 URL 并处理"""
while True:
url = await task_queue.get()
if url is None: # 停机信号
break
content = await fetch(session, url)
if content:
# 假设我们进行一些处理并打印结果
logging.info(f"Processed content of {url}")
task_queue.task_done()

# --- 使用线程池处理阻塞任务 ---
# 5. 阻塞 I/O 任务通过线程池 offload
def blocking_task(url):
"""例如,旧的阻塞代码,比如 requests.get()"""
import requests
return requests.get(url).text

async def fetch_blocking(url):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(executor, blocking_task, url)

# --- 数据采集 & 任务调度 ---
# 6. 数据处理任务,异步和阻塞混合模式
async def process_data(urls):
async with aiohttp.ClientSession() as session:
# 启动生产者-消费者模式
await asyncio.gather(
producer(urls),
*[consumer() for _ in range(3)] # 启动3个消费者
)

# --- 进程池与 CPU 密集任务 ---
# 7. CPU 密集型任务(进程池):
def cpu_heavy_task(n):
"""CPU 密集型任务示例,例如计算素数"""
total = 0
for x in range(2, n):
prime = True
for y in range(2, int(x**0.5) + 1):
if x % y == 0:
prime = False
break
if prime:
total += 1
return total

async def run_cpu_tasks():
# 用进程池处理 CPU 密集任务
loop = asyncio.get_event_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_heavy_task, 100000)
logging.info(f"Processed CPU task result: {result}")

# --- 主程序入口 ---
# 8. 启动服务与监控
async def main():
urls = ["https://example.com", "https://example.org", "https://example.net"]

# 线程池初始化,用于 offload 阻塞 I/O
global executor
executor = ThreadPoolExecutor(max_workers=10)

# 数据采集与任务处理
await process_data(urls)

# 同时处理 CPU 密集任务
await run_cpu_tasks()

# --- 程序启动 ---
if __name__ == "__main__":
# 设置停机信号
stop_event = Event()

try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("Gracefully shutting down...")
stop_event.set() # 设置停机信号