数据库迁移
本页给出可直接使用的迁移代码:三步向前兼容模式的 SQL、Flyway/Liquibase/Alembic 迁移文件格式、大表分批处理脚本、down migration 回滚实现,以及在 CI 中验证 up + down 完整流程的方法;帮助 Agent 写出零停机、可回滚、可审计的多阶段迁移计划。
大表变更优先「加列—回填—切读—切写—删旧列」的扩展收缩模式;在 SKILL 中强制列出持锁预期、预估行数与是否在维护窗执行。
每次迁移附带 down 或回滚说明:不仅是反向 SQL,还包括应用版本与特性开关如何配合回退。
与 ORM / migration 工具对齐:文件名、依赖顺序、是否在事务中执行长语句,都应在提示词里写死团队约定。
- 索引:优先 CONCURRENTLY(若引擎支持)或分阶段建索引避免阻塞写入。
- 默认值:大表加 NOT NULL 默认值时分步填充再收紧约束。
- 观测:迁移前后记录行数、采样校验与慢查询告警阈值。
扩展—收缩主流程
[ DDL:加列(可空或有安全默认)]
│
▼
[ 回填:批处理、限速、可重试 job ]
│
▼
[ 切读:双写或读新列;旧列仍保留 ]
│
▼
[ 切写:应用仅写新路径;监控错误率 ]
│
▼
[ 收紧:NOT NULL / 唯一约束 / 删旧列 ]
各阶段在 SKILL 中写清「可单独回滚到哪一步」:例如仅回滚应用读路径而保留新列,避免半状态无文档。
-- ① 第一步(Phase 1):加可空列,不锁表(PostgreSQL)
ALTER TABLE orders ADD COLUMN status_v2 VARCHAR(32) DEFAULT NULL;
-- ② 第二步(Phase 2):分批回填,避免长事务锁行
-- 每批 1000 行,WHERE 过滤未填充行,通过 LIMIT 限速
DO $$
DECLARE
batch_size INT := 1000;
updated INT;
BEGIN
LOOP
UPDATE orders
SET status_v2 = status::VARCHAR -- 旧列映射
WHERE status_v2 IS NULL
LIMIT batch_size;
GET DIAGNOSTICS updated = ROW_COUNT;
EXIT WHEN updated = 0;
PERFORM pg_sleep(0.05); -- 限速:50ms 间隔
END LOOP;
END $$;
-- 进度查询:统计未填充行数
SELECT COUNT(*) AS remaining
FROM orders
WHERE status_v2 IS NULL;
-- ③ 第三步(Phase 3):收紧约束,完成后删旧列
-- 先验证无 NULL(否则 NOT NULL 会报错)
ALTER TABLE orders ALTER COLUMN status_v2 SET NOT NULL;
-- 发布新代码读写 status_v2 后,再执行:
ALTER TABLE orders DROP COLUMN status;
ALTER TABLE orders RENAME COLUMN status_v2 TO status;
-- Flyway SQL 迁移文件:V20240315_01__add_order_status_v2.sql
-- (文件名格式:V{version}__{description}.sql)
-- up migration
ALTER TABLE orders ADD COLUMN IF NOT EXISTS status_v2 VARCHAR(32);
-- Liquibase YAML 格式示例(db/changelog/2024/03/add_status_v2.yaml)
-- databaseChangeLog:
-- - changeSet:
-- id: "2024031501"
-- author: "dev"
-- changes:
-- - addColumn:
-- tableName: orders
-- columns:
-- - column:
-- name: status_v2
-- type: VARCHAR(32)
-- rollback:
-- - dropColumn:
-- tableName: orders
-- columnName: status_v2
# Alembic 迁移文件(Python / SQLAlchemy)
# alembic/versions/2024031501_add_order_status_v2.py
from alembic import op
import sqlalchemy as sa
revision = '2024031501'
down_revision = '2024030101'
def upgrade():
op.add_column('orders',
sa.Column('status_v2', sa.String(32), nullable=True))
def downgrade():
op.drop_column('orders', 'status_v2')
回滚流水线
[ 触发:错误率、约束失败、业务熔断 ]
│
▼
[ 应用:关 feature flag / 回滚部署 ]
│
▼
[ 数据:down migration 或可逆 DDL 步骤 ]
│
▼
[ 验证:行数、采样、只读探针 / 影子读 ]
│
▼
[ 记录:事故单、是否需手工修补或重跑回填 ]
down 脚本须与 up 对称测试;若某步不可逆(如 DROP 无备份),在计划中显式标为「需快照或延迟执行」并单独审批。
-- Down migration 示例(与 up 对称)
-- 回滚顺序:先恢复旧列,再删新列
-- undo Phase 3:恢复旧列(如果已删则需要快照,此处假设尚未删)
ALTER TABLE orders ADD COLUMN IF NOT EXISTS status VARCHAR(32);
-- 回填旧列(反向映射)
UPDATE orders SET status = status_v2 WHERE status IS NULL;
ALTER TABLE orders ALTER COLUMN status SET NOT NULL;
-- 再删新列
ALTER TABLE orders DROP COLUMN status_v2;
-- CI 验证脚本(bash):测试 up + down 完整往返
#!/usr/bin/env bash
set -euo pipefail
DB_URL="${TEST_DATABASE_URL:-postgres://localhost/test_db}"
echo "=== Running up migration ==="
flyway -url="$DB_URL" migrate
echo "=== Verify row count after up ==="
ROW_COUNT=$(psql "$DB_URL" -tAc "SELECT COUNT(*) FROM orders WHERE status_v2 IS NOT NULL")
echo "Rows backfilled: $ROW_COUNT"
echo "=== Running down migration ==="
flyway -url="$DB_URL" undo
echo "=== Verify rollback ==="
COLUMN_EXISTS=$(psql "$DB_URL" -tAc \
"SELECT COUNT(*) FROM information_schema.columns \
WHERE table_name='orders' AND column_name='status_v2'")
[ "$COLUMN_EXISTS" -eq 0 ] && echo "Rollback verified: column removed" \
|| { echo "FAIL: column still exists"; exit 1; }
锁与维护窗
- 列出预期锁类型(表锁、元数据锁、行锁升级)与预估持锁时间。
- 长事务、大批量 UPDATE 与 DDL 同表并发时的死锁风险写进检查项。
- 低峰或维护窗执行时,注明通知对象与可中止条件。
-- 大表建索引:CONCURRENTLY 避免锁写入(PostgreSQL)
-- 注意:不能在事务块内执行
CREATE INDEX CONCURRENTLY idx_orders_status_v2
ON orders (status_v2)
WHERE status_v2 IS NOT NULL;
-- 验证索引创建状态(is_valid=false 表示并发创建失败需清理重建)
SELECT indexname, indexdef, pg_size_pretty(pg_relation_size(indexname::regclass)) AS size
FROM pg_indexes
WHERE tablename = 'orders' AND indexname = 'idx_orders_status_v2';
-- 检查是否有长事务阻塞迁移(超过 5 分钟)
SELECT pid, now() - xact_start AS duration, query
FROM pg_stat_activity
WHERE state != 'idle'
AND xact_start IS NOT NULL
AND now() - xact_start > interval '5 minutes'
ORDER BY duration DESC;
ORM / migration 约定
- 文件命名、版本号或时间戳顺序、依赖链(不得环形)。
- 是否在单事务中包裹整段迁移;长语句拆分为多文件时的执行顺序。
- 与 CI:迁移是否在合并前对影子库 dry-run,与生产引擎版本对齐。
# flyway.conf — 团队约定示例
flyway.url=jdbc:postgresql://localhost:5432/payments
flyway.locations=filesystem:db/migrations
flyway.validateOnMigrate=true
flyway.outOfOrder=false # 禁止乱序执行
flyway.baselineOnMigrate=false # 新库不自动 baseline
# 文件命名约定:V{YYYYMMDD}{序号}__{描述}.sql
# 例如:V20240315_01__add_order_status_v2.sql
# CI dry-run(仅校验,不执行)
flyway -url=jdbc:postgresql://shadow-db/payments \
-locations=filesystem:db/migrations \
validate
索引、默认值与 NOT NULL
- 大表建索引:CONCURRENTLY / ONLINE 或先建于副本再切换(按引擎能力)。
- 加 NOT NULL:先可空 + 回填 + 校验,再 ALTER 收紧。
- 默认值:避免一步锁全表;分阶段填充与约束生效分开记录。
-- PostgreSQL:安全添加 NOT NULL 列(三步)
-- 错误做法(锁全表重写):
-- ALTER TABLE orders ADD COLUMN priority INT NOT NULL DEFAULT 0;
-- ① 加可空列(瞬间完成,无锁)
ALTER TABLE orders ADD COLUMN priority INT;
-- ② 设置列级默认值(PG 11+ 元数据操作,无锁)
ALTER TABLE orders ALTER COLUMN priority SET DEFAULT 0;
-- ③ 回填存量数据(分批)
UPDATE orders SET priority = 0 WHERE priority IS NULL;
-- ④ 添加 NOT VALID 约束(不扫历史行,仅校验新写入)
ALTER TABLE orders ADD CONSTRAINT orders_priority_notnull
CHECK (priority IS NOT NULL) NOT VALID;
-- ⑤ 异步验证历史行(不阻塞写入)
ALTER TABLE orders VALIDATE CONSTRAINT orders_priority_notnull;
-- ⑥ 最终收紧为真正的 NOT NULL(PG 12+ 若 check constraint 验证通过,此步很快)
ALTER TABLE orders ALTER COLUMN priority SET NOT NULL;
观测与校验
- 迁移前后行数、checksum 或采样对比;关键业务查询的 p95 延迟。
- 慢查询与复制延迟阈值;超阈值时的暂停回填或扩容步骤。
- 审计日志:谁执行、哪条 migration id、是否与发版单关联。
-- 迁移前后行数与抽样校验脚本(PostgreSQL + psql)
-- 迁移前记录基线
SELECT 'pre-migration' AS phase,
COUNT(*) AS total_rows,
COUNT(status) AS old_col_rows,
NOW() AS recorded_at
FROM orders;
-- 迁移后校验
SELECT 'post-migration' AS phase,
COUNT(*) AS total_rows,
COUNT(status_v2) AS new_col_rows,
SUM(CASE WHEN status = status_v2 THEN 1 ELSE 0 END) AS matched,
NOW() AS recorded_at
FROM orders
TABLESAMPLE BERNOULLI(1); -- 1% 抽样,大表快速校验
-- 复制延迟监控(超过 30s 时暂停回填)
SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::INT
AS replication_lag_secs;
-- 慢查询检测(迁移期间监控)
SELECT query, calls, mean_exec_time, total_exec_time
FROM pg_stat_statements
WHERE mean_exec_time > 1000 -- 超过 1 秒的查询
ORDER BY mean_exec_time DESC
LIMIT 10;
---
name: database-migration-plan
description: 起草零停机、可回滚的分阶段 DB 迁移计划
---
# 向前兼容三步模式
Phase 1: ALTER TABLE ADD COLUMN(可空,无默认锁表)
Phase 2: 分批回填 UPDATE ... LIMIT 1000 WHERE new_col IS NULL + 限速
Phase 3: 验证行数 → NOT NULL 约束 → 切读切写 → DROP 旧列
# 工具文件格式
Flyway: V{YYYYMMDD}{seq}__{desc}.sql(validateOnMigrate=true)
Liquibase: changeSet id + rollback 块对称
Alembic: upgrade() / downgrade() 函数,revision 链完整
# 大表安全策略
索引: CREATE INDEX CONCURRENTLY(不在事务块内)
NOT NULL: NOT VALID CHECK → VALIDATE → SET NOT NULL
分批: pg_sleep 限速 + 复制延迟监控 + 超阈值暂停
# 回滚实现
down migration 与 up 对称测试;不可逆步骤需快照审批
CI 验证: flyway migrate → 行数检查 → flyway undo → 列存在检查
# 观测
迁移前后: COUNT(*) + TABLESAMPLE 1% 抽样对比
慢查询: pg_stat_statements mean_exec_time > 1s 告警
复制延迟: pg_last_xact_replay_timestamp() > 30s 暂停