Context window management

This page provides three practical recipes: a token budget allocation table for different models (recommended share for system prompt/tools/history/RAG), a Python rolling-window compression implementation (keep latest N messages + periodic summary injection), and the trigger logic for compression (fires when token usage exceeds 70% of the cap).

Token budget allocation table: recommended share per component for each model.

# Token budget allocation table (recommended ratios; adjust per product)
# Model                Cap (tokens)  Sys prompt  Tool output  History  RAG
# GPT-4o                   128k         10%         30%         40%    20%
# Claude 3.5 Sonnet        200k          8%         25%         45%    22%
# Gemini 1.5 Pro          1000k          5%         20%         55%    20%
# GPT-3.5-turbo             16k         15%         35%         35%    15%
#
# Hard rules: never trim the system prompt; when history overflows, trim the oldest messages first;
# compute the number of RAG chunks dynamically from the remaining space (recommended max 512 tokens/chunk)
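
To make the ratios concrete, the sketch below turns one row of the table into absolute token budgets. The names BUDGET_TABLE and allocate_budget, the lowercase model keys, and the optional reserved_output allowance are illustrative assumptions, not part of the recipe itself.

```python
# Illustrative sketch: BUDGET_TABLE and allocate_budget are hypothetical names.
# Shares are whole percentages so the arithmetic stays in integers.
BUDGET_TABLE = {
    # model key: (cap in tokens, {component: percent of cap})
    "gpt-4o": (128_000, {"system": 10, "tools": 30, "history": 40, "rag": 20}),
    "claude-3.5-sonnet": (200_000, {"system": 8, "tools": 25, "history": 45, "rag": 22}),
    "gemini-1.5-pro": (1_000_000, {"system": 5, "tools": 20, "history": 55, "rag": 20}),
    "gpt-3.5-turbo": (16_000, {"system": 15, "tools": 35, "history": 35, "rag": 15}),
}


def allocate_budget(model: str, reserved_output: int = 0) -> dict[str, int]:
    """Split the usable window (cap minus reserved output) by the table's ratios."""
    cap, shares = BUDGET_TABLE[model]
    usable = cap - reserved_output
    if usable <= 0:
        raise ValueError("reserved_output leaves no room for input tokens")
    return {name: usable * pct // 100 for name, pct in shares.items()}


if __name__ == "__main__":
    print(allocate_budget("gpt-4o"))
```

Reserving output tokens before splitting is one possible design choice; the table itself only specifies shares of the input window.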

Context pipeline and trigger logic

Data from multiple sources feeds a single budget pool. Define priorities and metering first, then assemble the window for the model; compression and trimming run after metering and before assembly.

  [ User message / multimodal attachments ]
              │
              ▼
        [ System prompt + SKILL rules ] ──────┐
              │                         │
              ▼                         ▼
        [ RAG / memory retrieval ] ───► [ Priority queue & dedupe ]
              │                         │
              ▼                         │
        [ Raw tool output ] ──► [ Summarize / structured compression ] ──┐
              │                         │           │
              ▼                         ▼           ▼
        [ Conversation history scroll ] ──► [ Token metering & budget check ]
                                          │
                                          ▼
                                   [ Assemble window → model ]
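
The "Priority queue & dedupe" stage in the diagram could look roughly like the sketch below. The Candidate record, its fields, and dedupe_and_rank are hypothetical names chosen for illustration, and matching duplicates by case- and whitespace-normalized text is only one possible rule.

```python
import hashlib
from dataclasses import dataclass


@dataclass
class Candidate:
    source: str   # e.g. "rag" or "memory" (labels are illustrative)
    text: str
    score: float  # relevance score; higher means more important


def _fingerprint(text: str) -> str:
    """Hash of the text with case and whitespace normalized."""
    normalized = " ".join(text.lower().split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def dedupe_and_rank(candidates: list[Candidate]) -> list[Candidate]:
    """Keep the best-scoring copy of each duplicate, then sort by score (descending)."""
    best: dict[str, Candidate] = {}
    for cand in candidates:
        key = _fingerprint(cand.text)
        if key not in best or cand.score > best[key].score:
            best[key] = cand
    return sorted(best.values(), key=lambda c: c.score, reverse=True)
```

Items come out in the order they should be offered to the budget check, so lower-scoring entries are the first to be left out when space runs short.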

Compression trigger logic (fires when non-system message tokens exceed 70% of the model cap):

import tiktoken

def count_tokens(text: str, model: str = "gpt-4o") -> int:
    """Count tokens using the tiktoken encoding registered for `model`."""
    try:
        enc = tiktoken.encoding_for_model(model)
    except KeyError:
        # Unknown model name: fall back to a general-purpose encoding.
        # tiktoken only ships OpenAI tokenizers, so counts for other
        # vendors' models are approximations.
        enc = tiktoken.get_encoding("cl100k_base")
    return len(enc.encode(text))

def should_compress(
    messages: list[dict],
    model_limit: int,
    threshold: float = 0.70,
    model: str = "gpt-4o",
) -> bool:
    """Return True when non-system message tokens exceed model_limit * threshold."""
    history_tokens = sum(
        count_tokens(m["content"], model)
        for m in messages
        if m["role"] != "system"
    )
    return history_tokens / model_limit > threshold

# Usage example (rolling_compress is defined under "Compression and summarization")
messages = [
    {"role": "system", "content": "You are a code review assistant."},
    {"role": "user", "content": "Please review the following PR... (long text)"},
]
if should_compress(messages, model_limit=128_000):
    messages = rolling_compress(messages, keep_recent=6)
  • Every pipeline step that discards content logs a one-line summary of what was dropped, for audit and for follow-up turns.
  • Align the RAG chunk count and the per-chunk character limit with the model window so retrieval does not fill the pool first.
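
One way to keep retrieval from filling the pool first is to derive the chunk count from whatever space is left. The sketch below is a minimal illustration: rag_chunk_count and its parameters are hypothetical, the 512-token chunk size and 20% RAG share come from the allocation table, and the 2000-token output reserve mirrors the default used by priority_prune.

```python
def rag_chunk_count(
    model_limit: int,
    used_tokens: int,
    reserved_output: int = 2000,
    max_chunk_tokens: int = 512,
    rag_share_pct: int = 20,
) -> int:
    """Number of full-size chunks that fit in the space left for retrieval.

    The result is bounded both by the remaining window and by the RAG share
    of the cap, so retrieval cannot crowd out history or tool output.
    """
    remaining = model_limit - reserved_output - used_tokens
    rag_cap = model_limit * rag_share_pct // 100
    available = max(0, min(remaining, rag_cap))
    return available // max_chunk_tokens


if __name__ == "__main__":
    # 128k window with 100k tokens already used by system prompt, tools and history
    print(rag_chunk_count(128_000, used_tokens=100_000))
```

Because the count assumes every chunk is at the maximum size, it is a conservative lower bound; shorter chunks leave spare room.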

Compression and summarization

Rolling-window compression: keep the latest keep_recent messages verbatim, call an LLM to summarize the older history as three bullets (decided / open / user hard constraints), and inject that summary ahead of the recent messages.

from openai import OpenAI, OpenAIError

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def rolling_compress(
    messages: list[dict],
    keep_recent: int = 6,
    model_limit: int = 128_000,
    summary_model: str = "gpt-4o-mini",
) -> list[dict]:
    """
    Rolling window compression strategy:
      - system messages are never deleted
      - keep the latest keep_recent user/assistant messages verbatim
      - generate a three-bullet summary from older messages and inject it
      - if the summary call fails, fall back to the recent messages only
    """
    system_msgs = [m for m in messages if m["role"] == "system"]
    history = [m for m in messages if m["role"] != "system"]

    if not should_compress(history, model_limit):
        return messages  # below threshold, return as-is

    # Split by index so keep_recent=0 works (history[-0:] would return everything).
    split = max(len(history) - keep_recent, 0)
    older, recent = history[:split], history[split:]

    if not older:
        return messages  # not enough history to compress

    conversation_text = "\n".join(
        f"[{m['role'].upper()}] {m['content']}" for m in older
    )

    try:
        resp = client.chat.completions.create(
            model=summary_model,
            messages=[{
                "role": "system",
                "content": (
                    "Compress the following conversation into three bullet points, "
                    "preserving all decision-relevant details:\n"
                    "• Decided: (list confirmed decisions and conclusions)\n"
                    "• Open: (list unresolved issues)\n"
                    "• User hard constraints: (list conditions the user explicitly said cannot change)"
                ),
            }, {
                "role": "user",
                "content": conversation_text,
            }],
            max_tokens=400,
            temperature=0,
        )
    except OpenAIError:
        # Summary failed: keep the system prompt and recent messages verbatim
        # rather than interrupting the main flow.
        return system_msgs + recent
    summary_text = resp.choices[0].message.content or ""

    summary_msg = {
        "role": "assistant",
        "content": f"[Conversation history summary ({len(older)} messages omitted)]\n{summary_text}",
    }
    return system_msgs + [summary_msg] + recent


def compress_tool_output(tool_result: dict, max_tokens: int = 1000) -> str:
    """When tool output exceeds max_tokens, extract top-level key paths + stats instead."""
    import json
    raw = json.dumps(tool_result, ensure_ascii=False)
    if count_tokens(raw) <= max_tokens:
        return raw
    summary_lines = []
    for k, v in tool_result.items():
        if isinstance(v, list):
            summary_lines.append(f"{k}: [list, {len(v)} items]")
        elif isinstance(v, dict):
            summary_lines.append(f"{k}: {{object, {len(v)} keys}}")
        else:
            summary_lines.append(f"{k}: {str(v)[:80]}")
    return "[Tool output compressed]\n" + "\n".join(summary_lines)
Debugging: log the token share of each source (system / tools / history / RAG) and fix the largest contributor first. Also log the number of dropped messages and the summary generation latency.
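
A per-turn log line along these lines is usually enough to spot the largest contributor. This is a minimal sketch: the logger name, token_share_by_source, and log_turn are hypothetical, and the token counts are assumed to be measured elsewhere (for example with count_tokens).

```python
import logging

logger = logging.getLogger("context_budget")  # logger name is illustrative


def token_share_by_source(token_counts: dict[str, int], model_limit: int) -> dict[str, float]:
    """Percentage of the model window used by each source, rounded to 0.1."""
    return {src: round(100 * n / model_limit, 1) for src, n in token_counts.items()}


def log_turn(
    token_counts: dict[str, int],
    model_limit: int,
    dropped_messages: int,
    summary_latency_s: float,
) -> str:
    """Log one line per turn and return the source with the largest share."""
    shares = token_share_by_source(token_counts, model_limit)
    largest = max(shares, key=shares.get)
    logger.info(
        "shares=%s largest=%s dropped=%d summary_latency=%.2fs",
        shares, largest, dropped_messages, summary_latency_s,
    )
    return largest
```

Returning the largest source makes it easy to alert on, for instance, tool output repeatedly dominating the window.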

Prioritization and retention

Retention priority, from trimmed last to trimmed first: system prompt (never dropped) > recent messages > older history > RAG docs. The code below implements this priority-based pruning:

def priority_prune(
    system_prompt: str,
    recent_messages: list[dict],
    older_messages: list[dict],
    retrieved_docs: list[str],
    model_limit: int = 128_000,
    reserved_output: int = 2000,
) -> dict:
    """
    Priority-based pruning (P0 trimmed last, P3 trimmed first):
      P0: system_prompt        → never dropped; raise exception if over budget
      P1: recent_messages      → trimmed last (keep newest first)
      P2: older_messages       → trimmed second
      P3: retrieved_docs       → trimmed first (add in relevance order)
    """
    budget = model_limit - reserved_output
    used = 0
    result = []
    dropped = {"older": 0, "docs": 0}

    # P0: system prompt must fit; raise on overflow
    sys_tokens = count_tokens(system_prompt)
    if sys_tokens > budget:
        # Explicit raise: a bare `assert` is stripped when Python runs with -O.
        raise AssertionError(f"System prompt exceeds budget: {sys_tokens} tokens")
    used += sys_tokens
    result.append({"role": "system", "content": system_prompt})

    # P1: keep recent messages from newest to oldest
    recent_kept = []
    for msg in reversed(recent_messages):
        t = count_tokens(msg["content"])
        if used + t <= budget:
            recent_kept.insert(0, msg)
            used += t
        else:
            break

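    # P2: fill remaining budget with older messages, newest first;
    # stop at the first overflow so the kept history stays contiguous
    older_kept = []
    for msg in reversed(older_messages):
        t = count_tokens(msg["content"])
        if used + t > budget:
            break
        older_kept.insert(0, msg)
        used += t
    dropped["older"] = len(older_messages) - len(older_kept)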

    # P3: fill remaining space with RAG docs
    docs_kept = []
    for doc in retrieved_docs:
        t = count_tokens(doc)
        if used + t <= budget:
            docs_kept.append({"role": "user", "content": f"[Reference document]\n{doc}"})
            used += t
        else:
            dropped["docs"] += 1

    result += older_kept + recent_kept + docs_kept
    return {"messages": result, "tokens_used": used, "budget": budget, "dropped": dropped}
  • Tooling: merge duplicate query results so repeated output does not amplify noise across turns.
  • Never push the same file in full multiple times; send a file summary plus a line range instead of the full text.
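
Merging duplicate tool results can be as simple as keeping the newest result per call and stubbing out the older ones. The sketch below assumes a hypothetical message shape in which each tool message carries "name" and "args" fields identifying the call; real chat APIs may identify calls differently, so treat the key construction as a placeholder.

```python
import json


def merge_repeated_tool_results(messages: list[dict]) -> list[dict]:
    """Keep the latest result per (tool name, args); stub out earlier duplicates."""
    seen: set[str] = set()
    merged: list[dict] = []
    # Walk newest to oldest so the first occurrence seen is the latest result.
    for msg in reversed(messages):
        if msg.get("role") == "tool":
            key = json.dumps([msg.get("name"), msg.get("args")], sort_keys=True)
            if key in seen:
                # Copy the message so the caller's list is not mutated.
                msg = {**msg, "content": "[merged into latest]"}
            else:
                seen.add(key)
        merged.append(msg)
    merged.reverse()
    return merged
```

The stub keeps the message in place, so the conversation order stays intact while the duplicated payload no longer consumes budget.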


    ---
    name: context-window-budget
    description: Manage context budget for dialogue and tool results; prevent critical instructions from being evicted; applies when a conversation exceeds 20 messages or token usage exceeds 70% of the model limit
    triggers:
      - Multi-turn conversation exceeds 20 messages
      - Token usage exceeds 70% of model limit
      - Single tool output exceeds 1000 tokens
    steps:
      1. Use tiktoken.encoding_for_model() to count tokens for each component
      2. Call rolling_compress() when history token share exceeds 70%
      3. Place system message at messages[0]; use assert to ensure it never exceeds budget
      4. Keep latest 6 user/assistant messages verbatim (keep_recent=6)
      5. Call gpt-4o-mini to generate three-bullet summary for older messages (decided/open/constraints)
      6. Insert summary with "[Conversation history summary]" prefix at the start of history
      7. Sort RAG docs by relevance score descending; fill remaining budget sequentially
      8. Truncate RAG chunks exceeding 512 tokens server-side with "[truncated]" annotation
      9. Call compress_tool_output() to extract key paths for tool output exceeding 1000 tokens
      10. Keep only the latest result for repeated tool calls; replace older ones with "[merged into latest]"
      11. Log per turn: token count per component, dropped message count, summary latency
      12. Compare total tokens before/after compression; verify total tokens drop by at least 30%
    constraints:
      - Raise AssertionError when system prompt exceeds budget; do NOT silently truncate
      - Fall back to latest 6 verbatim messages if summary generation fails; do not interrupt main flow
      - Do NOT log full user text; log only token counts and truncated SHA-256 hashes
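
The logging constraint and the 30% check in step 12 can be sketched as follows. redacted_log_record and compression_ok are hypothetical helpers, the 12-character hash prefix is an arbitrary choice, and the check assumes the 30% figure refers to the fraction of tokens removed by compression.

```python
import hashlib


def redacted_log_record(text: str, token_count: int, hash_chars: int = 12) -> dict:
    """Log-safe record: token count plus a truncated SHA-256 digest, never the text."""
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
    return {"tokens": token_count, "sha256_prefix": digest[:hash_chars]}


def compression_ok(tokens_before: int, tokens_after: int, min_reduction_pct: int = 30) -> bool:
    """True when compression removed at least min_reduction_pct percent of the tokens."""
    if tokens_before <= 0:
        return False
    # Integer arithmetic avoids floating-point edge cases at the threshold.
    return (tokens_before - tokens_after) * 100 >= min_reduction_pct * tokens_before
```

The truncated hash still lets two log lines be matched to the same message without exposing its content.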
