上下文窗口管理
本页给出三件实用配方:不同模型的 token 预算分配表(系统提示/工具/历史/RAG 推荐占比)、Python 滚动窗口压缩实现(保留最新 N 条 + 定期摘要注入),以及触发压缩的判断逻辑(token 占用超 70% 上限时执行)。
Token 预算分配表——各模型上限与推荐各部分占比:
# Token 预算分配表(推荐比例,可按产品调整)
# 模型 上限 系统提示 工具输出 对话历史 RAG检索
# GPT-4o 128k 10% 30% 40% 20%
# Claude 3.5 Sonnet 200k 8% 25% 45% 22%
# Gemini 1.5 Pro 1000k 5% 20% 55% 20%
# GPT-3.5-turbo 16k 15% 35% 35% 15%
#
# 硬规则:系统提示永不裁剪;历史超限时先裁最旧消息;
# RAG 条数按剩余空间动态计算(每条建议限 512 token)
上下文管线与触发逻辑
数据从多条来源汇入同一预算池:先定优先级与计量,再拼装进模型;压缩与裁剪发生在「计量之后、拼装之前」。
[ 用户消息 / 多模态附件 ]
│
▼
[ 系统提示 + SKILL 规则 ] ──────┐
│ │
▼ ▼
[ RAG / 记忆检索 ] ───► [ 优先级队列 & 去重 ]
│ │
▼ │
[ 工具原始输出 ] ──► [ 摘要 / 结构化压缩 ] ──┐
│ │ │
▼ ▼ ▼
[ 对话历史滚动区 ] ──────► [ Token 计量与预算检查 ]
│
▼
[ 窗口拼装 → 模型 ]
触发摘要注入的判断逻辑——token 占用超过 70% 时触发压缩:
import tiktoken
def count_tokens(text: str, model: str = "gpt-4o") -> int:
enc = tiktoken.encoding_for_model(model)
return len(enc.encode(text))
def should_compress(
messages: list[dict],
model_limit: int,
threshold: float = 0.70,
model: str = "gpt-4o",
) -> bool:
"""历史消息 token 占比超过 model_limit * threshold 时返回 True。"""
history_tokens = sum(
count_tokens(m["content"], model)
for m in messages
if m["role"] != "system"
)
return history_tokens / model_limit > threshold
# 使用示例
messages = [
{"role": "system", "content": "你是代码审查助手。"},
{"role": "user", "content": "请审查以下 PR...(长文本)"},
]
if should_compress(messages, model_limit=128_000):
messages = rolling_compress(messages, keep_recent=6)
- 管线中每一步记录「被丢弃内容的摘要句」,便于审计与续聊。
- RAG 检索条数与每条 max 字符与模型窗口对齐,避免检索层先把池子占满。
压缩与摘要
滚动窗口压缩实现:保留最新 keep_recent 条消息原文,对更旧的历史调用 LLM 生成「已决/未决/用户硬约束」三段摘要并注入。
from openai import OpenAI
client = OpenAI()
def rolling_compress(
messages: list[dict],
keep_recent: int = 6,
model_limit: int = 128_000,
summary_model: str = "gpt-4o-mini",
) -> list[dict]:
"""
滚动窗口压缩策略:
- system 消息永不删除
- 保留最新 keep_recent 条用户/助手消息原文
- 对更旧的消息生成三段 bullet 摘要注入
"""
system_msgs = [m for m in messages if m["role"] == "system"]
history = [m for m in messages if m["role"] != "system"]
if not should_compress(history, model_limit):
return messages # 未超阈值,直接返回
older = history[:-keep_recent] if len(history) > keep_recent else []
recent = history[-keep_recent:]
if not older:
return messages # 历史不够多,不压缩
conversation_text = "\n".join(
f"[{m['role'].upper()}] {m['content']}" for m in older
)
resp = client.chat.completions.create(
model=summary_model,
messages=[{
"role": "system",
"content": (
"将以下对话压缩为三段 bullet,保留所有决策相关细节:\n"
"• 已决事项:(列出已确认的决策和结论)\n"
"• 未决事项:(列出待解决的问题)\n"
"• 用户硬约束:(列出用户明确要求不可更改的条件)"
),
}, {
"role": "user",
"content": conversation_text,
}],
max_tokens=400,
temperature=0,
)
summary_text = resp.choices[0].message.content
summary_msg = {
"role": "assistant",
"content": f"[历史对话摘要(已省略 {len(older)} 条)]\n{summary_text}",
}
return system_msgs + [summary_msg] + recent
def compress_tool_output(tool_result: dict, max_tokens: int = 1000) -> str:
"""工具输出超 max_tokens 时提取顶层键路径+统计值代替全文。"""
import json
raw = json.dumps(tool_result, ensure_ascii=False)
if count_tokens(raw) <= max_tokens:
return raw
summary_lines = []
for k, v in tool_result.items():
if isinstance(v, list):
summary_lines.append(f"{k}: [列表, {len(v)} 项]")
elif isinstance(v, dict):
summary_lines.append(f"{k}: {{对象, {len(v)} 键}}")
else:
summary_lines.append(f"{k}: {str(v)[:80]}")
return "[工具输出已压缩]\n" + "\n".join(summary_lines)
调试:记录各来源 token 占比(系统 / 工具 / 历史 / RAG),优先治占比最大的那一类膨胀。日志中写入被丢弃消息数和摘要生成耗时。
优先级与保留策略
裁剪顺序(从最后裁剪到最先裁剪):系统提示永不删 > 最近消息 > 较旧历史 > RAG 文档。下列代码实现该优先级裁剪逻辑:
def priority_prune(
system_prompt: str,
recent_messages: list[dict],
older_messages: list[dict],
retrieved_docs: list[str],
model_limit: int = 128_000,
reserved_output: int = 2000,
) -> dict:
"""
优先级裁剪(P0 最晚裁,P3 最先裁):
P0: system_prompt → 永不删除,超限则抛出异常
P1: recent_messages → 最后裁剪(从最新往旧保留)
P2: older_messages → 次先裁剪
P3: retrieved_docs → 最先裁剪(按相关性排序后逐条加入)
"""
budget = model_limit - reserved_output
used = 0
result = []
dropped = {"older": 0, "docs": 0}
# P0:系统提示必须进入,超限则报错
sys_tokens = count_tokens(system_prompt)
assert sys_tokens <= budget, f"系统提示超出预算: {sys_tokens} tokens"
used += sys_tokens
result.append({"role": "system", "content": system_prompt})
# P1:从最新往旧保留近期消息
recent_kept = []
for msg in reversed(recent_messages):
t = count_tokens(msg["content"])
if used + t <= budget:
recent_kept.insert(0, msg)
used += t
else:
break
# P2:剩余预算填较旧历史
older_kept = []
for msg in reversed(older_messages):
t = count_tokens(msg["content"])
if used + t <= budget:
older_kept.insert(0, msg)
used += t
else:
dropped["older"] += 1
# P3:最后用剩余空间填 RAG 文档
docs_kept = []
for doc in retrieved_docs:
t = count_tokens(doc)
if used + t <= budget:
docs_kept.append({"role": "user", "content": f"[参考文档]\n{doc}"})
used += t
else:
dropped["docs"] += 1
result += older_kept + recent_kept + docs_kept
return {"messages": result, "tokens_used": used, "budget": budget, "dropped": dropped}
- 工具链:合并重复查询结果,避免噪声循环放大。
- 同一文件禁止多次全文进入上下文——用文件摘要 + 行号范围代替全文。
预算与排序演示
下方为纯前端示意:拖动滑块模拟「总上下文上限」变化时,各块按固定比例伸缩;列表用按钮调整保留优先级顺序(不影响真实模型,仅帮助对齐团队口径)。
↑ 提高保留优先级(更晚被裁);↓ 降低保留优先级(更早被裁)。与上一节文字策略对照使用。
---
name: context-window-budget
description: 管理对话与工具结果的上下文预算,防止关键指令被挤出;适用于多轮对话超 20 条或 token 占用超 70% 的场景
triggers:
- 多轮对话消息数超过 20 条
- token 占用超过模型上限的 70%
- 工具原始输出单次超过 1000 token
steps:
1. 用 tiktoken.encoding_for_model() 计算各部分 token 数
2. 历史 token 占比超 70% 时调用 rolling_compress()
3. system 消息放入 messages[0],用 assert 确保永不超预算
4. 保留最新 6 条用户/助手消息原文(keep_recent=6)
5. 对更旧消息调用 gpt-4o-mini 生成三段摘要(已决/未决/约束)
6. 摘要以 "[历史对话摘要]" 前缀插入历史消息开头
7. RAG 文档按相关性分数降序排列后逐条填入剩余预算
8. 每条 RAG 片段超 512 token 时在服务端截断并注明 "[已截断]"
9. 工具输出超 1000 token 时调用 compress_tool_output() 提取键路径
10. 重复工具调用结果仅保留最新一条,旧的替换为 "[已合并到最新结果]"
11. 每轮记录日志:各部分 token 数、被丢弃消息数、摘要生成耗时
12. 对比压缩前后总 token 数,验证压缩率不低于 30%
constraints:
- 系统提示超出预算时抛出 AssertionError,禁止静默截断
- 摘要生成失败时退回最近 6 条原文,不中断主流程
- 日志禁止记录完整用户原文,只记录 token 数和 SHA256 截断哈希