Context window management
This page provides three practical recipes: a token budget allocation table for different models (recommended share for system prompt/tools/history/RAG), a Python rolling-window compression implementation (keep latest N messages + periodic summary injection), and the trigger logic for compression (fires when token usage exceeds 70% of the cap).
Token budget allocation table—recommended share per component for different models:
# Token budget allocation table (recommended ratios; adjust per product)
# Model              Cap     Sys prompt   Tool output   History   RAG
# GPT-4o             128k    10%          30%           40%       20%
# Claude 3.5 Sonnet  200k    8%           25%           45%       22%
# Gemini 1.5 Pro     1000k   5%           20%           55%       20%
# GPT-3.5-turbo      16k     15%          35%           35%       15%
#
# Hard rules: never trim system prompt; trim oldest messages first when history overflows;
# RAG count calculated dynamically from remaining space (recommended max 512 tokens/chunk)
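As a rough illustration of how the shares in the table turn into token counts, the sketch below multiplies a model cap by each percentage. The `allocate_budget` function, the `BUDGET_TABLE` dictionary, and the model keys are hypothetical names introduced for this example; only the caps and percentages come from the table, and the caps are the table's round numbers rather than exact vendor limits.

```python
# Hypothetical helper: converts the ratio table into absolute token budgets.
# Shares are stored as integer percentages to avoid float rounding.
BUDGET_TABLE = {
    "gpt-4o": (128_000, {"system": 10, "tools": 30, "history": 40, "rag": 20}),
    "claude-3.5-sonnet": (200_000, {"system": 8, "tools": 25, "history": 45, "rag": 22}),
    "gemini-1.5-pro": (1_000_000, {"system": 5, "tools": 20, "history": 55, "rag": 20}),
    "gpt-3.5-turbo": (16_000, {"system": 15, "tools": 35, "history": 35, "rag": 15}),
}


def allocate_budget(model: str, reserved_output: int = 0) -> dict[str, int]:
    """Split the usable window (cap minus reserved output) by the table's shares."""
    cap, shares = BUDGET_TABLE[model]
    usable = cap - reserved_output
    if usable <= 0:
        raise ValueError("reserved_output leaves no room for input tokens")
    # Floor division keeps the sum at or below the usable window.
    return {name: usable * pct // 100 for name, pct in shares.items()}


budget = allocate_budget("gpt-4o")
```

Passing `reserved_output` shrinks every component proportionally, which is one way to leave room for the model's reply before dividing up the rest.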
Context pipeline and trigger logic
Data from multiple sources feeds one budget pool: define priority and metering first, then assemble for the model; compression and trimming sit after metering and before assembly.
[ User message / multimodal attachments ]
              │
              ▼
[ System prompt + SKILL rules ] ──────┐
              │                       │
              ▼                       ▼
[ RAG / memory retrieval ] ───► [ Priority queue & dedupe ]
              │                       │
              ▼                       │
[ Raw tool output ] ──► [ Summarize / structured compression ] ──┐
              │                       │                          │
              ▼                       ▼                          ▼
[ Conversation history scroll ] ──► [ Token metering & budget check ]
                                      │
                                      ▼
                        [ Assemble window → model ]
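The "Token metering & budget check" box in the diagram could be sketched as a function that tallies tokens per source and reports which sources exceed their allocation. This is a sketch under assumptions: `meter_components` and `approx_tokens` are hypothetical names, and the default counter is a crude whitespace split standing in for a real tokenizer.

```python
from typing import Callable


def approx_tokens(text: str) -> int:
    """Crude stand-in for a tokenizer: counts whitespace-separated words."""
    return len(text.split())


def meter_components(
    components: dict[str, list[str]],
    allocation: dict[str, int],
    count: Callable[[str], int] = approx_tokens,
) -> dict[str, dict[str, int]]:
    """Return usage, limit, and overflow per component (overflow is 0 when within budget)."""
    report = {}
    for name, texts in components.items():
        used = sum(count(t) for t in texts)
        limit = allocation.get(name, 0)  # unlisted components get no budget
        report[name] = {"used": used, "limit": limit, "overflow": max(used - limit, 0)}
    return report


report = meter_components(
    {"system": ["You are a code review assistant."], "history": ["a b c", "d e f g"]},
    {"system": 10, "history": 5},
)
```

Any component with a non-zero `overflow` is a candidate for the compression or trimming stage that sits between metering and assembly.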
Compression trigger logic—fires when token usage exceeds 70% of the cap:
import tiktoken


def count_tokens(text: str, model: str = "gpt-4o") -> int:
    """Count tokens with the tiktoken encoding registered for `model`.

    tiktoken only ships OpenAI tokenizers, so counts for other vendors'
    models are approximations.
    """
    try:
        enc = tiktoken.encoding_for_model(model)
    except KeyError:
        # Unknown model name: fall back to a general-purpose encoding.
        enc = tiktoken.get_encoding("cl100k_base")
    return len(enc.encode(text))


def should_compress(
    messages: list[dict],
    model_limit: int,
    threshold: float = 0.70,
    model: str = "gpt-4o",
) -> bool:
    """Return True when non-system messages use more than threshold * model_limit tokens."""
    history_tokens = sum(
        count_tokens(m["content"], model)
        for m in messages
        if m["role"] != "system"
    )
    return history_tokens / model_limit > threshold


# Usage example (rolling_compress is defined in the next section)
messages = [
    {"role": "system", "content": "You are a code review assistant."},
    {"role": "user", "content": "Please review the following PR... (long text)"},
]
if should_compress(messages, model_limit=128_000):
    messages = rolling_compress(messages, keep_recent=6)
- Every pipeline step that discards content should log a one-line summary of what was dropped, for auditing and for reference in follow-up turns.
- Size the RAG chunk count and the per-chunk token cap to the model window so retrieval does not fill the pool before other components get their share.
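One way to keep retrieval from crowding out the pool is to derive the chunk count from whatever space remains and to cap each chunk individually. The sketch below assumes the 512-token cap from the hard rules above and uses word counts as a rough proxy for tokens; `max_chunks` and `truncate_chunk` are hypothetical helper names.

```python
MAX_CHUNK_TOKENS = 512  # per-chunk cap recommended in the hard rules above


def max_chunks(remaining_tokens: int, chunk_tokens: int = MAX_CHUNK_TOKENS) -> int:
    """How many full-size chunks fit in the space left after other components."""
    return max(remaining_tokens, 0) // chunk_tokens


def truncate_chunk(text: str, chunk_tokens: int = MAX_CHUNK_TOKENS) -> str:
    """Cut a chunk to chunk_tokens words (a word-level approximation of tokens)."""
    words = text.split()
    if len(words) <= chunk_tokens:
        return text  # already within the cap, return unchanged
    return " ".join(words[:chunk_tokens]) + " [truncated]"
```

In a real pipeline the word split would be replaced by the same tokenizer used for metering, so that the cap and the budget check agree.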
Compression and summarization
Rolling window compression: keep the latest keep_recent messages verbatim; call an LLM to generate a three-bullet summary (decided / open / user hard constraints) from older history and inject it.
from openai import OpenAI, OpenAIError

client = OpenAI()


def rolling_compress(
    messages: list[dict],
    keep_recent: int = 6,
    model_limit: int = 128_000,
    summary_model: str = "gpt-4o-mini",
) -> list[dict]:
    """
    Rolling window compression strategy:
    - system messages are never deleted
    - keep the latest keep_recent user/assistant messages verbatim
    - generate a three-bullet summary from older messages and inject it
    """
    system_msgs = [m for m in messages if m["role"] == "system"]
    history = [m for m in messages if m["role"] != "system"]
    if not should_compress(history, model_limit):
        return messages  # below threshold, return as-is

    # Split by index so keep_recent=0 does not turn into history[-0:] (the whole list).
    split = max(len(history) - keep_recent, 0)
    older, recent = history[:split], history[split:]
    if not older:
        return messages  # not enough history to compress

    conversation_text = "\n".join(
        f"[{m['role'].upper()}] {m['content']}" for m in older
    )
    try:
        resp = client.chat.completions.create(
            model=summary_model,
            messages=[{
                "role": "system",
                "content": (
                    "Compress the following conversation into three bullet points, "
                    "preserving all decision-relevant details:\n"
                    "• Decided: (list confirmed decisions and conclusions)\n"
                    "• Open: (list unresolved issues)\n"
                    "• User hard constraints: (list conditions the user explicitly said cannot change)"
                ),
            }, {
                "role": "user",
                "content": conversation_text,
            }],
            max_tokens=400,
            temperature=0,
        )
    except OpenAIError:
        # Summary generation failed: fall back to the recent messages verbatim
        # rather than interrupting the main flow.
        return system_msgs + recent

    summary_text = resp.choices[0].message.content or ""
    summary_msg = {
        "role": "assistant",
        "content": f"[Conversation history summary ({len(older)} messages omitted)]\n{summary_text}",
    }
    return system_msgs + [summary_msg] + recent
import json


def compress_tool_output(tool_result: dict, max_tokens: int = 1000) -> str:
    """When tool output exceeds max_tokens, extract top-level key paths + stats instead."""
    raw = json.dumps(tool_result, ensure_ascii=False)
    if count_tokens(raw) <= max_tokens:
        return raw
    summary_lines = []
    for k, v in tool_result.items():
        if isinstance(v, list):
            summary_lines.append(f"{k}: [list, {len(v)} items]")
        elif isinstance(v, dict):
            summary_lines.append(f"{k}: {{object, {len(v)} keys}}")
        else:
            # Scalars: keep at most the first 80 characters
            summary_lines.append(f"{k}: {str(v)[:80]}")
    return "[Tool output compressed]\n" + "\n".join(summary_lines)
Prioritization and retention
Retention priority, highest first: system prompt (never dropped) > recent messages > older history > RAG docs. The budget is filled in that order, so RAG docs are the first to be dropped when space runs out. The code below implements this priority-based pruning:
def priority_prune(
    system_prompt: str,
    recent_messages: list[dict],
    older_messages: list[dict],
    retrieved_docs: list[str],
    model_limit: int = 128_000,
    reserved_output: int = 2000,
) -> dict:
    """
    Priority-based pruning (P0 trimmed last, P3 trimmed first):
    P0: system_prompt → never dropped; raise exception if over budget
    P1: recent_messages → trimmed last (keep newest first)
    P2: older_messages → trimmed second
    P3: retrieved_docs → trimmed first (add in relevance order)
    """
    budget = model_limit - reserved_output
    used = 0
    result = []
    dropped = {"older": 0, "docs": 0}

    # P0: system prompt must fit; raise on overflow.
    # An explicit raise is used because `assert` statements are skipped under `python -O`.
    sys_tokens = count_tokens(system_prompt)
    if sys_tokens > budget:
        raise AssertionError(f"System prompt exceeds budget: {sys_tokens} tokens")
    used += sys_tokens
    result.append({"role": "system", "content": system_prompt})

    # P1: keep recent messages from newest to oldest; stop at the first that does not fit
    recent_kept = []
    for msg in reversed(recent_messages):
        t = count_tokens(msg["content"])
        if used + t <= budget:
            recent_kept.insert(0, msg)
            used += t
        else:
            break

    # P2: fill remaining budget with older messages, newest first.
    # A message that does not fit is skipped; smaller, older ones may still be added.
    older_kept = []
    for msg in reversed(older_messages):
        t = count_tokens(msg["content"])
        if used + t <= budget:
            older_kept.insert(0, msg)
            used += t
        else:
            dropped["older"] += 1

    # P3: fill remaining space with RAG docs (assumed already sorted by relevance)
    docs_kept = []
    for doc in retrieved_docs:
        t = count_tokens(doc)
        if used + t <= budget:
            docs_kept.append({"role": "user", "content": f"[Reference document]\n{doc}"})
            used += t
        else:
            dropped["docs"] += 1

    result += older_kept + recent_kept + docs_kept
    return {"messages": result, "tokens_used": used, "budget": budget, "dropped": dropped}
- Tooling: merge duplicate query results so repeated calls do not amplify noise in the context.
- Never send the same file in full more than once; after the first time, send a file summary plus the relevant line range instead of the full text.
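Merging duplicate tool results could look roughly like the sketch below: for each distinct call, only the newest result keeps its content and older duplicates are replaced with a short marker. The record shape (`name`, `args`, `content` keys) and the function name `merge_duplicate_tool_results` are assumptions made for illustration, not a format defined on this page.

```python
import json


def merge_duplicate_tool_results(results: list[dict]) -> list[dict]:
    """Keep the newest result per (name, args); replace older duplicates with a marker."""
    seen: set[str] = set()
    merged: list[dict] = []
    # Walk newest-to-oldest so the first occurrence encountered is the latest one.
    for item in reversed(results):
        key = json.dumps([item["name"], item["args"]], sort_keys=True)
        if key in seen:
            # Copy the record so the caller's list is not mutated.
            item = {**item, "content": "[merged into latest]"}
        else:
            seen.add(key)
        merged.append(item)
    merged.reverse()  # restore chronological order
    return merged


calls = [
    {"name": "search", "args": {"q": "x"}, "content": "old result"},
    {"name": "read", "args": {"path": "a.py"}, "content": "file body"},
    {"name": "search", "args": {"q": "x"}, "content": "new result"},
]
deduped = merge_duplicate_tool_results(calls)
```

Keeping the placeholder in position, rather than deleting the older entry, preserves the turn order of the conversation while still freeing most of the tokens.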
Budget and ordering demo
Purely front-end: drag the slider to simulate a changing total context cap; bars scale at fixed ratios; use buttons to adjust retention priority order (does not affect a real model—helps align team language).
↑ Raise retention priority (trimmed later); ↓ Lower retention priority (trimmed earlier). Use alongside the written strategy above.
---
name: context-window-budget
description: Manage context budget for dialogue and tool results; prevent critical instructions from being evicted; applies when a conversation exceeds 20 messages or token usage exceeds 70% of the model limit
triggers:
- Multi-turn conversation exceeds 20 messages
- Token usage exceeds 70% of model limit
- Single tool output exceeds 1000 tokens
steps:
1. Use tiktoken.encoding_for_model() to count tokens for each component
2. Call rolling_compress() when history token share exceeds 70%
3. Place system message at messages[0]; use assert to ensure it never exceeds budget
4. Keep latest 6 user/assistant messages verbatim (keep_recent=6)
5. Call gpt-4o-mini to generate three-bullet summary for older messages (decided/open/constraints)
6. Insert summary with "[Conversation history summary]" prefix at the start of history
7. Sort RAG docs by relevance score descending; fill remaining budget sequentially
8. Truncate RAG chunks exceeding 512 tokens server-side with "[truncated]" annotation
9. Call compress_tool_output() to extract key paths for tool output exceeding 1000 tokens
10. Keep only the latest result for repeated tool calls; replace older ones with "[merged into latest]"
11. Log per turn: token count per component, dropped message count, summary latency
12. Compare total tokens before/after compression; verify compression ratio >= 30%
constraints:
- Raise AssertionError when system prompt exceeds budget; do NOT silently truncate
- Fall back to latest 6 verbatim messages if summary generation fails; do not interrupt main flow
- Do NOT log full user text; log only token counts and truncated SHA-256 hashes
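The logging and verification items in the skill above can be sketched with the standard library alone. Two assumptions are made here and should be adjusted to the team's own definitions: "compression ratio" is read as the fraction of tokens removed, and hashes are truncated to 12 hex characters. The function names are hypothetical.

```python
import hashlib


def text_fingerprint(text: str, length: int = 12) -> str:
    """Truncated SHA-256 hex digest, so logs never contain the raw user text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:length]


def compression_ratio(tokens_before: int, tokens_after: int) -> float:
    """Fraction of tokens removed by compression (0.0 when nothing was saved)."""
    if tokens_before <= 0:
        return 0.0
    return max(tokens_before - tokens_after, 0) / tokens_before


def audit_record(user_text: str, tokens_before: int, tokens_after: int, dropped: int) -> dict:
    """Build a per-turn log entry holding only counts and a hash, never the text itself."""
    ratio = compression_ratio(tokens_before, tokens_after)
    return {
        "user_text_sha256": text_fingerprint(user_text),
        "tokens_before": tokens_before,
        "tokens_after": tokens_after,
        "dropped_messages": dropped,
        "compression_ratio": round(ratio, 3),
        "meets_target": ratio >= 0.30,  # assumed reading of the 30% target
    }


record = audit_record("please review my PR", tokens_before=1000, tokens_after=600, dropped=3)
```

A record whose `meets_target` is false is a signal to inspect the summary prompt or the `keep_recent` setting rather than a reason to fail the turn.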