优化后的当前架构

TD
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
User((用户)) -->|1. 搜索请求 (等待)| API[FastAPI Server]

subgraph "FastAPI 进程内 (Asyncio)"
API -->|并发| Search[向量搜索]
API -->|并发| Expand[Query扩展]
Search & Expand -->|合并结果| Candidates[候选论文]
Candidates -->|并发| Rerank[LLM 重排序 (httpx)]
end

Rerank -->|2. 直接返回结果 (2-3s)| User

User -->|3. 生成详细报告 (不等待)| API
API -->|4. 丢入队列| Redis[(Redis)]

subgraph "后台 Worker"
Redis -->|5. 领取任务| Worker[Worker 进程]
Worker -->|6. 下载PDF/精读| Task[耗时任务]
end

[[python并发]]

明显慢的是生成论文实现路径,要怎么提高并发性:await异步+同步操作丢进程池

目前是串行的,论文一下载分析,再2下载分析
直接改并行,下载 PDF、请求 AI 分析)全是 IO 密集型(大部分时间在等网络响应),所以选 asyncio实现并行。
但是requests 下载或本地读文件,这些是同步库,所以会阻塞任务,对数据库的操作也是同步操作
1.将 for paper in papers 循环内的逻辑提取为一个独立的 async 函数,然后
2.把下载pdf这样的同步任务扔到线程池里。
3.用列表推导式生成 tasks 列表,最后用 await asyncio.gather(*tasks) 一起跑。

1
2
3
4
5
6
7
8
9
10
11
        tasks = [
            process_paper(
                pdf_service=pdf_service,
                llm_service=llm_service,
                paper=paper,
                user_requirement=user_requirement,
            max_pages_per_paper=max_pages_per_paper,
            )
            for paper in papers
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

return_exceptions=True这样不会因为一篇论文的 PDF 损坏或下载失败,就导致整个请求崩溃。
4.考虑最大并发量,设置信号量sem=asyncio.Semaphore(5)
async with sem

1
2
3
4
5
async with self.sem: # 并发控制
                content = await self._call_deepseek([
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ])

如果我们在生产环境用 Gunicorn 启动了 4 个 Worker 进程,现在的并发限制是多少?” 你的回答: “是 4 * 5 = 20 个。因为信号量只在当前进程内存中有效。如果 DeepSeek 的全局限流很严格(比如每分钟只能 60 次),多进程部署时可能还是会超限。” 解决方案(口述即可): “如果需要严格的全局限流,我会引入 Redis 来做分布式限流(Token Bucket 算法),而不是依赖单机的内存信号量。”
5.被说httpx太耗费时间

1
2
3
4
5
6
7
8
9
async with httpx.AsyncClient(timeout=60.0) as client:  
response = await client.post(
                self.api_base,
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json"
                },
                json=request_body
            )

建立 HTTP 连接(DNS 解析 + TCP 握手 + SSL 握手)是非常耗时的。如果并发高,你大部分时间都在做握手,而不是传输数据。 修复方案:LLMService 生命周期内复用同一个 client
在init里初始化一个持久化的 client self.client = httpx.AsyncClient( timeout=60.0, headers={“Authorization”: f”Bearer {self.api_key}”} )
再写一个close函数清理资源
async def close(self): await self.client.aclose()

即使没有明显慢,能并发的还是应该尽可能并发,因为多个用户操作的时候,主线程应该尽可能空着。并发是对服务器整体来说的

async里的都应该是极快的操作,不是很快的和不和CPU相关的都丢到线程池,这个不占据其他用户时间;可异步进行的重复操作,可以丢到另一个async操作里并发,然后await gather收集,但是这样还是会占据其他用户CPU操作最慢任务的时间,注意当另一个async操作在进行IO操作的时候,CPU是空闲去服务其他用户的,所以其实其他用户不会等待太多。很慢的CPU操作可以扔进程池

pdf_service 解析 PDF 其实不仅仅是 IO,还有大量的 CPU 计算(解压、渲染文字)。Python 的线程受 GIL 限制,跑在线程池里其实并不能利用多核 CPU。这里是不是用 ProcessPoolExecutor(进程池)更好

也放到Redis里了,但是同时也要放到进程池里

  • Redis + 进程池 = 完美:Redis 负责水平扩展(加机器),进程池负责垂直扩展(吃满 CPU)。当 Worker 遇到 PDF 解析这种硬骨头时,它不自己啃,而是丢给内部的进程池去啃,自己继续去 Redis 接单或处理网络 IO。
  • 实现关键:把解析代码剥离成独立的纯函数(不读库、不联网),通过 run_in_executor(process_pool, ...) 调用。

然后就发现我之前直接把整个处理pdf的函数扔到线程池里并不好。因为一个函数体里既有同步阻塞操作比如下载PDF,又有CPU密集操作比如解析pdf。to_thread虽然解决了下载(I/O)阻塞问题,但解析(CPU)部分会被 GIL(全局解释器锁) 卡死,导致多核 CPU 围观单核干活,性能无法最大化。

修改

需要一个全局的进程池(不要在函数内部反复创建进程池,开销极大),和一个默认的线程池asyncio 自带)。
进程池初始化,然后把CPU密集函数扔进去。要注意的是CPU密集函数必须定义在顶层而且可以序列化,不要依赖任何外部的东西,就一个纯函数。也不要全局变量

不错,其实挺好改的

生成的结果其实并不需要马上知道并导出,可以回头再看

[[任务队列]]
任务耗时极长,很多网关(Nginx)或浏览器默认超时时间是 60秒。如果 API 响应慢,连接会被掐断,前端报错,后端还在白跑。
资源消耗不可控,如果把这些压力放在 API 服务器上,只要并发一高,API 接口就会变慢,连“登录”、“搜索”这种简单请求都会被拖累。
引入 Redis 任务队列(通常配合 CeleryRQ)不仅是“更好”,而且在多用户生产环境中几乎是必须的。

HTTP 连接池未复用(握手风暴)

数据库连接“裸奔”(缺乏连接池)

位置: backend/services/matching_service.py

现状:

1
2
3
4
5
def fetch_papers_from_db(paper_ids: List[str]):
conn = get_db_connection() # 每次都由 OS 分配一个新的文件句柄/TCP连接
cursor = conn.cursor()
# ...
conn.close()

面试官点评: “你虽然把 SQL 查询放到了 run_in_threadpool 里,避免了阻塞主线程,但这治标不治本。

  1. SQLite 锁竞争:SQLite 是文件锁,高并发写入时会报错 database is locked
  2. 连接开销:如果换成 MySQL/PostgreSQL,频繁创建销毁连接会把数据库打挂。”
    优化方案:
  • 短期(MVP):引入 SQLAlchemyQueuePool(连接池),即使是 SQLite 也能管理连接复用。
  • 长期(生产):换掉 SQLite,上 PostgreSQL,并配合 asyncpg 驱动。这是 Python 异步后端的黄金标准。