Python FastAPI 服务

让 Agent 用路由模块、Depends 与 Pydantic v2 模型组织 API,并生成一致的 OpenAPI 与错误体;lifespan 管理连接池与启动清理,异步边界与测试入口写进 SKILL。

本页为 Agent 提供 FastAPI 服务的完整实施参考:依赖注入(数据库连接/认证/权限检查)、Pydantic v2 模型(validators/computed_fields)、异步路由与后台任务、中间件实现(请求日志/CORS/gzip),以及 TestClient 测试用法。

SKILL 写明 APIRouter 拆分规则、全局异常处理器与 HTTPException 使用边界;异步 I/O 选用准则:阻塞库放线程池(run_in_executor);SQLAlchemy 2.0 async session 工厂在 lifespan 中初始化。

  • 安全:OAuth2/JWT Bearer + Depends(get_current_user)
  • 测试:TestClient + dependency_overrides 注入 fake DB。
  • 与 API 契约技能联动:FastAPI 自动生成 /openapi.json,导出后做 schema diff。

Pydantic v2 模型(validators 与 computed_fields)

# models/item.py — Pydantic v2 完整模型定义
from pydantic import BaseModel, Field, field_validator, computed_field, model_validator
from decimal import Decimal
from typing import Optional
import re

class ItemCreate(BaseModel):
    name: str = Field(min_length=1, max_length=200, description="商品名称")
    price: Decimal = Field(gt=0, decimal_places=2, description="价格(正数)")
    sku: str = Field(description="库存单位,格式 XXX-000")
    discount_pct: Optional[float] = Field(default=None, ge=0, le=100)

    @field_validator('sku')
    @classmethod
    def validate_sku(cls, v: str) -> str:
        if not re.match(r'^[A-Z]{3}-\d{3}$', v):
            raise ValueError('SKU must match pattern XXX-000 (e.g. WGT-001)')
        return v

    @model_validator(mode='after')
    def check_discount_requires_price(self) -> 'ItemCreate':
        if self.discount_pct is not None and self.price < 10:
            raise ValueError('Discount only available for items priced ≥ 10')
        return self

class ItemResponse(ItemCreate):
    id: str
    created_at: str

    @computed_field  # type: ignore[misc]
    @property
    def final_price(self) -> Decimal:
        if self.discount_pct:
            return self.price * Decimal(1 - self.discount_pct / 100)
        return self.price

请求主流程(Middleware → Depends → 端点)

下列为单次 HTTP 调用在 FastAPI 中的典型解析顺序;具体中间件栈与异常处理器以项目注册顺序为准。重点是:依赖在端点入参前解析完毕,且子依赖可嵌套并支持缓存(同一请求内同一 Depends 可复用结果)。

  [ 客户端 HTTP 请求 ]
        │
        ▼
  ┌─────────────┐     CORS、可信代理、request_id、超时(若自定义)
  │  Middleware  │
  └─────────────┘
        │
        ▼
  ┌─────────────┐     匹配 method + path;聚合各层 APIRouter prefix
  │   路由匹配   │
  └─────────────┘
        │
        ▼
  ┌─────────────┐     按签名顺序执行 Depends(含子依赖);可 yield 清理
  │  Depends 链  │──── 常见:Settings、DB session、CurrentUser、权限
  └─────────────┘
        │
        ▼
  ┌─────────────┐     Query/Path/Body → Pydantic;返回 response_model
  │   端点函数   │──── 未捕获异常 → exception_handler / HTTPException
  └─────────────┘
        │
        ▼
  [ JSON / HTML / Stream / File / Redirect ]

注册顺序:后添加的中间件更靠近端点;全局 exception_handler 应覆盖业务异常类型并统一错误体字段,避免在 handler 内再次抛出未映射类型。

依赖注入:数据库连接 / 认证 / 权限

# dependencies/database.py — DB session 依赖
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from typing import AsyncGenerator, Annotated
from fastapi import Depends

engine = create_async_engine(settings.DATABASE_URL, pool_size=10, max_overflow=5)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# 类型别名(推荐 Annotated 写法)
DbSession = Annotated[AsyncSession, Depends(get_db)]

# dependencies/auth.py — JWT 认证依赖
from fastapi import HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt

bearer_scheme = HTTPBearer()

async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(bearer_scheme)],
    db: DbSession,
) -> User:
    try:
        payload = jwt.decode(credentials.credentials, settings.JWT_SECRET, algorithms=['HS256'])
        user = await UserRepo(db).get_by_id(payload['sub'])
        if not user:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='User not found')
        return user
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail='Token expired')
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail='Invalid token')

# 权限检查依赖(工厂函数)
def require_role(*roles: str):
    async def check(current_user: Annotated[User, Depends(get_current_user)]):
        if current_user.role not in roles:
            raise HTTPException(status_code=403, detail='Insufficient permissions')
        return current_user
    return check

CurrentUser = Annotated[User, Depends(get_current_user)]
AdminUser   = Annotated[User, Depends(require_role('admin'))]

# 路由使用
@router.delete('/{item_id}')
async def delete_item(item_id: str, db: DbSession, _: AdminUser):
    ...

异步路由、后台任务与 TestClient

异步路由 + 后台任务

# routers/orders.py — 异步路由 + BackgroundTasks
from fastapi import APIRouter, BackgroundTasks
import asyncio

router = APIRouter(prefix='/orders', tags=['Orders'])

async def send_confirmation_email(order_id: str, email: str):
    """后台任务:不阻塞请求响应"""
    await asyncio.sleep(0)  # 真实场景接入邮件服务
    print(f'Sending confirmation for order {order_id} to {email}')

@router.post('/', response_model=OrderResponse, status_code=201)
async def create_order(
    body: OrderCreate,
    background_tasks: BackgroundTasks,
    db: DbSession,
    current_user: CurrentUser,
):
    order = await OrderService(db).create(body, user_id=current_user.id)
    # 注册后台任务(响应返回后异步执行)
    background_tasks.add_task(send_confirmation_email, str(order.id), current_user.email)
    return order

# 中间件示例:请求日志 + gzip + CORS
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware

app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(CORSMiddleware,
    allow_origins=settings.ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)

# 请求日志中间件
from starlette.middleware.base import BaseHTTPMiddleware
import time, uuid

class RequestLogMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        req_id = str(uuid.uuid4())
        request.state.req_id = req_id
        start = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - start
        print(f'{req_id} {request.method} {request.url.path} {response.status_code} {duration:.3f}s')
        response.headers['X-Request-Id'] = req_id
        return response

app.add_middleware(RequestLogMiddleware)

TestClient 测试(含 dependency_overrides)

# tests/test_orders.py
import pytest
from fastapi.testclient import TestClient
from unittest.mock import AsyncMock
from app.main import app
from app.dependencies.auth import get_current_user
from app.models.user import User

@pytest.fixture
def mock_user():
    return User(id='test-user-id', email='test@example.com', role='admin')

@pytest.fixture
def client(mock_user: User):
    # 覆盖认证依赖,无需真实 JWT
    app.dependency_overrides[get_current_user] = lambda: mock_user
    with TestClient(app) as c:
        yield c
    app.dependency_overrides.clear()

def test_create_order(client: TestClient):
    resp = client.post('/api/v1/orders', json={
        'items': [{'sku': 'WGT-001', 'quantity': 2}]
    })
    assert resp.status_code == 201
    data = resp.json()
    assert data['userId'] == 'test-user-id'
    assert 'id' in data

def test_create_order_validation_error(client: TestClient):
    resp = client.post('/api/v1/orders', json={'items': []})
    assert resp.status_code == 422
    assert resp.json()['detail'][0]['loc'] == ['body', 'items']
  • 将池与客户端挂到 app.state 或通过依赖工厂读取,保持单一创建点。
  • lifespan 中创建连接池;测试时用 dependency_overrides 注入 in-memory fake。
  • 与 ASGI 服务器(Uvicorn)的 worker 数对齐:每进程一套连接池。

路由前缀规范化预览

团队内常见草稿写法混用反斜杠、缺省前导 / 或多余尾部 /。下方将输入整理为适合 APIRouter(prefix=...)include_router(..., prefix=...) 的字符串:单一前导斜杠、合并重复斜杠、去掉末尾斜杠(根路径保留为 /。规则为教学用约定,发布前以网关与 OpenAPI 基路径为准。

规范化结果与粘贴片段


              

---
name: python-fastapi-service
description: 用 FastAPI 依赖注入与 Pydantic 实现 REST API
---
# 规则
- APIRouter 按功能域拆分(routers/items.py, routers/orders.py)
- 依赖注入:DB session 用 yield 依赖;认证用 Annotated[User, Depends(get_current_user)]
- Pydantic v2:field_validator + model_validator + computed_field
- 异步路由:所有 I/O 用 async def;阻塞库用 asyncio.to_thread() 或 run_in_executor
- 后台任务:轻量任务用 BackgroundTasks;耗时任务用 Celery/RQ
- 错误处理:已知错误 raise HTTPException;全局异常用 @app.exception_handler
- 测试:TestClient + dependency_overrides 覆盖 DB/认证依赖

# 步骤
1. 在 lifespan 中初始化 DB 引擎和连接池(避免 import 时创建)
2. 实现 get_db() yield 依赖 + get_current_user() + require_role() 工厂
3. 定义 Pydantic 模型:CreateSchema / ResponseSchema / UpdateSchema 分开
4. 路由中使用 Annotated 类型别名(DbSession, CurrentUser, AdminUser)
5. 后台任务用 BackgroundTasks.add_task();不得在端点 await 邮件/通知
6. 中间件:GZip + CORS + RequestLogMiddleware(注册顺序最后添加最先执行)
7. 测试:pytest fixture 覆盖 dependency_overrides,断言 status_code + 响应体

返回技能库 更多技能入口