FastAPI里玩转Redis和数据库的正确姿势,别让异步任务把你坑哭了!

张开发
2026/4/6 11:05:27 15 分钟阅读

分享文章

FastAPI里玩转Redis和数据库的正确姿势,别让异步任务把你坑哭了!
1. 一个让人抓狂的“小问题”之前有个项目需要在上传用户头像后异步生成几种不同尺寸的缩略图并把处理结果和状态存到MySQL同时把用户ID和任务ID塞到Redis里做状态追踪。一切看起来都很美好代码也跑通了。但上线后噩梦开始了应用跑了一两天就开始随机报错有的说MySQL连接已关闭有的说Redis连接数超限。最离谱的是有时候任务执行到一半数据库连接突然断开了导致部分数据写入失败状态成了“薛定谔的完成”。后来debug了好几天才发现问题的根源我把数据库和Redis的会话Session/Connection直接传递到了异步任务函数里但生命周期完全错乱了 2. 问题到底出在哪FastAPI的BackgroundTasks虽然用起来简单但它本质上是在响应返回之后在同一个进程中“偷偷”执行的一个函数。问题是你在请求生命周期内创建的数据库会话比如通过依赖项注入的db: Session在请求结束后通常会被框架自动关闭。但你的后台任务还在用这个已经被关闭的会话不出错才怪Redis连接也是类似如果你把连接池里“借”出来的连接直接传进去一旦主请求结束连接被归还或关闭后台任务再用的时候就会直接GG。这里我要特别强调一点千万不要在异步任务里直接复用请求生命周期内的资源对象这是新手最容易踩的坑也是我当初血泪教训的核心。⚙️ 3. 核心原理各管各的生命周期要分离那正确的姿势是什么呢核心思想就是“谁用谁创建用完自己关”。异步任务函数内部不应该依赖外部传递进来的“活”连接而是应该拥有自己独立的资源管理逻辑。具体来说我们需要在异步任务函数内部重新创建所需的资源比如新的数据库Session新的Redis连接并在任务执行完毕后确保这些资源被正确关闭或归还。这听起来像是个体力活但其实我们可以通过一些好的代码组织模式让它变得优雅且可维护。️ 4. 实战生产级别的组织方案好理论说完了咱们直接上代码。 第一步目录结构project/ ├── app/ │ ├── api/ # 路由层 │ ├── core/ # 核心配置数据库、Redis等 │ │ ├── database.py │ │ └── redis_client.py │ ├── models/ # 数据库模型 │ ├── schemas/ # Pydantic模型 │ └── tasks/ # 异步任务模块✨ │ ├── __init__.py │ ├── user_tasks.py │ └── worker.py # 任务入口 └── ... 第二步核心资源管理重点在tasks/worker.py里我们定义一个基类或辅助函数专门负责在每个任务中初始化和管理这些资源。# tasks/worker.py from sqlalchemy.orm import sessionmaker from app.core.database import engine from app.core.redis_client import get_redis_client from contextlib import contextmanager # 注意这里是在模块级别创建SessionLocal它是一个工厂不是具体的会话 SessionLocal sessionmaker(autocommitFalse, autoflushFalse, bindengine) contextmanager def get_db_session(): 每个任务自己独立创建一个数据库会话用完即关 db SessionLocal() try: yield db db.commit() except Exception: db.rollback() raise finally: db.close() contextmanager def get_redis_conn(): 每个任务自己独立创建一个Redis连接用完即关 # 这里假设你的redis_client是连接池这个函数返回一个连接实例 redis_client get_redis_client() try: yield redis_client finally: # 注意如果用的是连接池归还连接的操作通常是在你调close()时内部处理的 # 这里只是示意具体看你的Redis库实现 redis_client.close()看到没这里的关键就是get_db_session和get_redis_conn这两个上下文管理器。它们确保了每一个独立的异步任务都拥有一个属于自己的、生命周期完整的资源。 第三步编写具体的异步任务现在我们可以在tasks/user_tasks.py里编写具体的业务逻辑了。# tasks/user_tasks.py from celery import Task from app.tasks.worker import get_db_session, get_redis_conn from app.models.user import User import asyncio # 如果任务里需要异步IO class ProcessAvatarTask(Task): name process_avatar def run(self, user_id: int, image_path: str): # 这里我们自己来创建和管理资源 with get_db_session() as db: user db.query(User).filter(User.id user_id).first() # 1. 更新数据库状态 user.avatar_status processing db.commit() # 2. 处理图片耗时操作可以是同步的也可以跑在异步线程池 # 这里为了简单用同步模拟 # ... # 3. 再次更新状态并写入Redis with get_redis_conn() as redis: redis.set(fuser:{user_id}:avatar:status, completed) with get_db_session() as db: user.avatar_status completed db.commit() return {status: success, user_id: user_id} 第四步在API路由中调用# api/user.py from fastapi import APIRouter, BackgroundTasks from app.tasks.user_tasks import ProcessAvatarTask router APIRouter() router.post(/upload_avatar) async def upload_avatar(user_id: int, background_tasks: BackgroundTasks): # 这里不要传数据库session或redis连接进去 # 只传必要的业务数据比如user_id和文件路径 background_tasks.add_task(ProcessAvatarTask().run, user_id, /tmp/avatar.jpg) return {msg: 任务已添加} 5. 进阶思考与踩坑预警-关于连接池大小别以为资源独立了就万事大吉。如果任务并发太高每个任务都独立创建一个数据库连接很容易把连接池撑爆。所以你的数据库连接池比如SQLAlchemy的pool_size要设置得合理一些比如pool_size20, max_overflow10然后根据你的任务并发量去调整。-关于Redis连接池上面的get_redis_conn最好是返回一个从连接池中获取的连接而不是每次都新建一个TCP连接。这点很重要-关于错误重试

更多文章