python并发
一些概念:
GIL (全局解释器锁)
影响对象:多线程 (Threading)
这不是 Python 语言本身的问题,而是 CPython 解释器(官方默认版)的历史遗留问题。为了防止内存管理(垃圾回收机制)出错,它必须保证同一时刻只有一个线程在操作内存。
对于CPU密集任务,这个是不好的,因为过一段时间就会强制换出,多线程算数换入换出的时间还不如完全单线程快。
对于IO密集,是好的,因为GIL 很智能:只要你在“等”(IO 操作),你就必须换
Event Loop
主线程运行同步任务被阻塞。应该把可异步的阻塞任务都await掉,把同步的阻塞任务都扔线程池,或者通过aiosqlite换成异步驱动的相应操作
协程asyncio(协作式调度)
高并发IO密集
(爬虫,接口服务)
协程和线程的主要区别就是await,显式表明了切换是自愿的才能切.
有点像js
async def fn():定义一个协程函数await something表示这一步可能要等 I/O,等的时候我可以去干别的任务await asyncio.sleep(1)(睡觉时把执行权让给别人)await websocket.recv()(等网络数据时把执行权让给别人)await db.fetch()(异步驱动的数据库客户端会在 I/O 等待期让出执行权)
asyncio.gather(a,b,c)等大家都跑完,并把每个任务的返回值按顺序(0..9)收集成列表返回asyncio.create_task(func)把这个异步工作丢给事件循环,让它并发跑起来;把工单贴到任务墙(Task任务手柄,可以取消和检查)上,工人现在可以开始干这活了.也可以直接运行func()- asyncio.run(func)
1 | import asyncio |
线程也有自己管理生命周期的东西 Taskgroup
1 | async def main(): |
把阻塞的同步代码塞进线程池(async + thread 结合)
要注意await只在可等待的对象上让出控制权,表现出并发的样子.如果在协程里调用的是普通的阻塞函数(比如 requests.get() / sqlite3.execute() / boto3.client().put_object() 这种同步 I/O),这个函数是不会给事件循环机会切走的,
async def 只决定了这个函数本身返回的是一个协程对象。
它完全不保证你里面的每一行代码都是异步友好的。
只有 await 某个awaitable 这一瞬间,事件循环才能把控制权拿回来分配给别的协程。
如果你在 async def 里面跑的是同步的东西,它没有 await 点,loop 根本接管不了。
所以假如我的代码里全部都是同步库可以使用offload 到线程池技巧
asyncio.to_thread()(Python 3.9+)会把一个阻塞的同步函数扔进线程池跑
1 | def block(url): |
异步驱动:
和 丢进线程池(Threading) 都是为了解决同一个问题:防止数据库操作(IO)卡死主线程(Event Loop)
情况 A:同步阻塞(现在的代码)
点单员把单子递给制作员后,站在柜台前死等,眼睛死死盯着制作员,直到奶茶做好拿到手,才转身去接待下一个排队的顾客。
- 后果: 后面排队的顾客(其他请求)全部卡死,店铺吞吐量极低。
情况 B:丢进线程池(run_in_threadpool)
点单员把单子递给一个临时工(Thread),让临时工站在柜台前死等。点单员自己转身去接待下一个顾客。等奶茶做好了,临时工拍拍点单员肩膀说“好了”。
- 原理: 用操作系统层面的线程(Thread)来包裹同步代码。
- 消耗: 雇佣临时工需要开工资(内存开销),而且临时工多了,厨房会拥挤(上下文切换开销)。
- 适用: 代码不想重写,或者数据库驱动不支持异步时。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def sync_db_query():
print(f"[临时工] 开始查询数据库 (线程 ID: {asyncio.get_running_loop()})") # 模拟建立连接(阻塞)
conn = sqlite3.connect(':memory:')
cursor = conn.cursor() # 模拟一个耗时查询(强制睡2秒,模拟慢SQL)
time.sleep(2)
cursor.execute('SELECT "奶茶做好了"')
result = cursor.fetchone()[0]
conn.close()
print("[临时工] 查询结束")
return result
丢线程池
result = await asyncio.to_thread(sync_db_query)
情况 C:异步驱动(asyncpg / aiosqlite)
点单员把单子贴在厨房的“订单墙”上(发送非阻塞请求),然后立刻转身接待下一个顾客。当厨房做好奶茶后,按一下呼叫铃(Callback/Await),点单员听到铃声,把做好的奶茶递给顾客。
- 原理: 利用数据库协议的非阻塞特性,不占用额外的系统线程,纯粹利用 CPU 的空闲时间片切换。
- 消耗: 极低。不需要雇临时工,完全由点单员一人搞定。
- 适用: 追求极致性能、高并发,且愿意重写代码时。
asyncio的最佳实践是使用原生的异步驱动(如asyncpg),因为它们不需要线程上下文切换的开销,并发能力最强。感觉就是和JavaScript一模一样,核心共性就是“单线程 + 非阻塞”。但是JavaScript是原生的,python生来是同步的(为了简单易学)。1
2
3
4
5
6
7
8
9
10
11
12
13
14import asyncio
import aiosqlite # 这是一个原生支持异步的 SQLite 驱动
async def async_db_query():
print("[点单员] 开始非阻塞查询...") # 建立连接是异步的 (注意 async with)
async with aiosqlite.connect(':memory:') as db: # 甚至获取游标也是异步的(部分库如此)
async with db.execute('SELECT "奶茶做好了"') as cursor:
# --- 关键动作 --- # 这里的 sleep 是非阻塞的,模拟数据库正在忙,但 CPU 是空闲的
await asyncio.sleep(2) # 获取结果也是异步的
row = await cursor.fetchone()
print("[点单员] 查询结束")
return row[0]
挂起
result = await async_db_query() - 它的标准库(如
time.sleep,sqlite3,requests)默认全是阻塞的!如果用它们,服务员就会傻等。 - 为了实现高并发,Python 引入了
asyncio库(相当于给 Python 装了一个事件循环)。 - 但光有循环没用,你还需要把那些“会傻等”的库(驱动),换成“不会傻等”的库(异步驱动,如
asyncpg,httpx)。
但是有一些老的算法库不支持异步,只能扔线程池。
除了扔到线程池,也可以扔到进程池
线程(Thread)是假的并发。因为有 GIL,同一时刻只能有一个 CPU 核在跑 Python 代码。asyncio 线程负责网络吞吐,ProcessPool 里的子进程负责“啃” CPU,互不干扰。loop.run_in_executor:“异步”调用“同步”
1 | # 最好全局共享一个 ProcessPoolExecutor |
多线程threading(强制抢占式调度)
IO密集
(少量文件读写,等待网络磁盘数据库)
- thread = threading.Thread(target=fn, args=(…))
thread.start()开始,thread.join()等它结束- GIL:同一时间只有一个线程在跑 Python 字节码(CPU 密集时线程不会真正并行),但 I/O 密集时线程切换还是很有用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22import threading
import time
def fn(x:str):
里面所有内容都是并发
return
#threads = [threading.Thread(target=inc) for _ in range(4)]
#[t.start() for t in threads]
#[t.join() for t in threads]
tasks = []
for i in range(10):
t =threading.Thread(target=fn,args=(i,))
t.start()
tasks.append(t)
for t in tasks:
task.join()
print()
因为线程里的所有内容都在并发,所以要注重竞态
保护共享变量 :Lock加锁 在写前面加with lock:任何“读-改-写”的共享数据结构(全局计数器、缓存字典、统计信息等)
发送数据 :Queue消息队列管道.生产者和多个并行消费者从管道里存取东西.从“很多线程围着同一块内存改啊改”的混战,降级成了“把一份具体工作(消息)打包给一个明确负责人”。get还有天然的阻塞等待.并且可以有多个Queue来给消息分类.
- 消息传递:比如全局变量(例如
counter)的所有权被转移给了某一个线程,其他工作线程不能直接改这个变量,它们只能把“我要+1”这种意图作为消息丢到队列里。同时不同线程只处理自己感兴趣的任务img_q = queue.Queue() ||task = img_q.get() - 数据传递:保证同一条数据只被一个线程拿到
- 任务调度:每个任务只会被一个线程消费,多线程自动分工,无需手动加锁task_q = queue.Queue()||task_q.put(task)||task = task_q.get()
- 停机:发送停机信号(每个线程一个)for _ in range(2):q.put(None)
- 异步解耦:生产者(比如网络监听、GUI事件)不能被阻塞,它只要把数据扔进队列就可以继续工作。不用担心传给谁,收没收到 .
- 消息传递:比如全局变量(例如
发信号控制状态 :Event发送开始停止.
stop_event = threading.Event()
stop_event.set()
while not stop_event.is_set():
async可以放部分函数到线程池,那如果线程想要调async那应该怎么做
同步调用异步asyncio.run_coroutine_threadsafe
1 | # 假设在一个线程 B 中(比如 Flask 的 worker 线程) |
线程也有线程池concurrent.futures.ThreadPoolExecutor
1 | from concurrent.futures import ThreadPoolExecutor |
锁和信号量
with Lock
with RLock 可重入锁,当你的类中,一个加了锁的方法 A,内部又调用了另一个也加了锁的方法 B。如果用 Lock,B 就会死锁;用 RLock 就没问题
with threading.Semaphore(5)
1 | api_limit_sem = threading.Semaphore(5) |
进程 multiprocessing
cpu密集
(例如压缩、加密、解析 JSON 巨文件、机器学习推理)
Python 的 GIL 限制了同进程多线程的并行执行 CPU-bound 代码
进程池
1 | import multiprocessing |
pool = multiprocessing.Pool(processes = 3)提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。否则等待某个进程结束
pool.map 会把任务分散到多个进程真正并行执行。使进程阻塞直到结果返回.
也可以看出pool是一批一批的,任务导向的,不能实时控制一个长期循环的线程
注意:if __name__ == “__main__“:
python myscript.py运行脚本时,__name__的值是"__main__",所以里面的代码会执行。- 如果别的文件
import myscript,那myscript.__name__就是"myscript",这个if里面的代码就不会跑。 - 因为python在Windows/macOS上跑的时候启动子进程不用fork用spawn,子进程并不会“接着父进程的执行点往下跑”,它其实是- 启动一个全新的 Python 解释器进程(干净的)
import你的主脚本文件(比如myscript.py)。 运行这个模块的顶层代码(也就是文件最外层的所有语句)。根据父进程告诉它的“target 函数”去执行实际任务。然后就会无限运行没有main包裹的p = Process(…)创建进程.
with multiprocessing.Pool() as pool: 是一种**上下文管理器 (Context Manager)**。- 进入 with 块时: Python 会调用 Pool 对象的 __enter__ 方法,这个方法启动了进程池并返回 pool 对象,让你可以在 with 块内部使用它。 退出 with 块时 (无论正常结束还是发生异常): Python 会自动调用 Pool 对象的 __exit__ 方法。
所以那两行代码等价于
1 | pool = multiprocessing.Pool() |
进程间通信multiprocessing.Queue
1 | import multiprocessing |
没有用线程池.因为这个任务是while的常驻的需要持续读取数据监控当时状态的.
Pool更像:“帮我把这堆活做完并把结果还给我”- 手工开
Process更像:“我要一群常驻工人,主进程会源源不断地给你们推事件,你们就一直干,直到我告诉你们停”
通信是有代价的,所以“每个小步骤都发消息”会拖垮性能。进程适合“少量大块任务”,而不是“每毫秒都同步状态”。
共享状态multiprocessing.Manager
- 线程可以共享内存(在同一进程里)
- 进程不能直接共享内存(是隔离的地址空间)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23import multiprocessing
import time
import os
def worker(shared_dict, lock):
pid = os.getpid()
for _ in range(5):
time.sleep(0.1)
with lock:
shared_dict["count"] = shared_dict.get("count", 0) + 1
print(f"{pid} done")
if __name__ == "__main__":
with multiprocessing.Manager() as manager:
shared = manager.dict()
lock = manager.Lock()
procs = [multiprocessing.Process(target=worker, args=(shared, lock)) for _ in range(4)]
for p in procs: p.start()
for p in procs: p.join()
print("final shared dict:", shared)
必须dict,就像把一个变量发送到云端,给各个进程客户端读取.但是要注意也要加锁..所有进程必须用同一把锁,不然根本起不到互斥。所以使用manager.lock
1 | data = {} # 你以为共享,实际上不是 |
multiprocessing.Lock() 也能创建跨进程锁,但它跟 manager.Lock() 的传播方式、生命周期、兼容方式不太一样.
主要是manager.lock也是个代理对象,风格统一,而且也可以放到某个结构体里.manager 产生的东西(包括 manager.dict(), manager.Lock())本质上就是“可序列化的代理”,天生就是为 spawn 模型准备的——它们知道如何跨进程远程调用。
multiprocessing.Lock()
→ 这是一把“操作系统级的跨进程锁的句柄”。
→ 你手里拿着的是一个直接的同步原语。
→ 适合你手动管理、直接传给子进程。manager.Lock()
→ 这是一把“放在共享服务中心里的锁”。
→ 你拿到的是一个“远程遥控器(proxy)”,每次上锁解锁都去找那个 manager 服务协调。
→ 适合你已经把所有共享状态都托管进 manager,希望所有东西都用同一种“远程代理”语义来共享。
manager非常慢,只适合共享少量、低频更新的“状态信息”(比如一个配置字典、一个计数器)。永远只向子进程传递简单的、可序列化的数据(字符串、数字、列表、字典)。子进程自己去初始化它需要的资源(比如数据库连接)。减少“每秒很多小消息”,而是“批量发一坨,大块处理,再一次性回结果”。
高性能共享:multiprocessing.shared_memory
绕过了序列化,直接操作同一块物理内存。这很复杂(需要手动管理生命周期和锁),但性能是极致的。
profiling(找瓶颈,决定用哪种并发)
1 | import cProfile |
- 用 cProfile + pstats 定位你的瓶颈到底是 I/O 还是 CPU
1
2
3
4
5
6
7import time
t0 = time.perf_counter()
# ... run something ...
t1 = time.perf_counter()
print(f"took {t1 - t0:.4f} sec") - 用 time.perf_counter() 做微基准
“我要不要调大线程池/进程池/并发数/队列大小”常用的指标
- 当前队列长度(说明下游跟不跟得上)
- 平均任务处理耗时 / p95 耗时
- 每秒任务吞吐量
- 错误率、超时率
- 当前活跃 worker 数
示例:在你的 worker loop 里周期性打印 / 上报这些东西到日志或 Prometheus。这是容量规划的核心。
决策速查
- 这是 I/O 密集任务,有成百上千个并发?
- 首选 asyncio
- 如果有老的阻塞库,wrap 成 asyncio.to_thread
- 这是 I/O 密集任务,但数量不大(几十个以内),而且都是同步库?
- threading.Thread / ThreadPoolExecutor 很够用,简单直接
- 这是 CPU 密集任务,要吃满多核?
- multiprocessing.Pool / Process
- 我需要多个消费者一起处理任务队列
- 线程间用 queue.Queue
- 进程间用 multiprocessing.Queue
- 用 None 作为优雅退出信号
- 我担心数据竞争?
- 单进程多线程:用 threading.Lock
- 多进程:尽量别共享状态。如果必须共享,用 multiprocessing.Manager() + Lock
- 性能不行,我怎么知道瓶颈在哪?
- 用 cProfile / perf_counter 证据说话
- 不要猜
“高并发”不是指你能发很多请求,而是你能承受下游变慢、不稳定、半挂、雪崩。
必须给每个 I/O 操作加这些东西:
- 超时(timeout)
- 有限重试(retry with backoff)
- 熔断/降级(不继续打坏掉的下游,而是短路)
总结(这一段建议你背下来):
并发不是只管“跑更快”,并发是“把不同类型的等待/计算隔离开,让它们互不拖死”。
任何生产者-消费者结构都必须考虑背压(队列上限)。即限制队列大小,让生产者也被迫慢下来。
q = queue.Queue(maxsize=1000) # 限制容量
等队列满了就 等待
不做这个,线上某个慢下游(例如数据库写慢了)会把你的内存干穿。所有 I/O 都必须有 timeout + 重试策略 + 熔断/降级(不继续打坏掉的下游,而是短路)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19import asyncio
import aiohttp
from random import random
async def safe_fetch(url):
for attempt in range(3): # 最多3次
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=5) as resp:
if resp.status == 200:
return await resp.text()
else:
# 非200,不一定要重试,这取决于业务
raise RuntimeError(f"bad status {resp.status}")
except Exception as e:
await asyncio.sleep(0.1 * (2 ** attempt) + random() * 0.05)
return None # 或者返回fallbackasyncio 的并发不是无限并发:用
Semaphore管住,避免自杀。不然会- 打爆对方 API, 打爆数据库连接池,打爆你自己本机 CPU/带宽1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16import asyncio
import aiohttp
sem = asyncio.Semaphore(50) # 最多50个并发
async def bounded_fetch(session, url):
async with sem:
async with session.get(url, timeout=5) as resp:
return await resp.text()
async def main(urls):
async with aiohttp.ClientSession() as session:
tasks = [asyncio.create_task(bounded_fetch(session, u)) for u in urls]
return await asyncio.gather(*tasks)
asyncio.run(main(urls))gather 默认是“有一个挂了全军覆没”,记得
return_exceptions=True。1
2
3
4
5
6results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
if isinstance(r, Exception):
# 局部错误处理
...不要用多进程频繁互传巨对象,尽量让进程本地热身后常驻干活.还有共享内存的复用,尽量不要多线程,要把危险对象包进一个“worker单线程”,而所有对它的操作都变成“往队列里发送指令”。
指标埋点和优雅停机,是能不能上生产环境的底线。
架构模式
1 | import asyncio |


