Multimodal input
This page compares two image input methods (base64 and URL), shows the audio transcription call format with format-conversion preprocessing, gives PDF-to-text and CSV-to-text code, includes an image token estimation table for GPT-4V / GPT-4o at several resolutions, and lists prompt patterns for cross-modal referencing.
Image token estimation table for GPT-4V / GPT-4o (token counts at several resolutions):
# GPT-4V / GPT-4o image token estimation (simplified tile model)
# Source: OpenAI official docs (2024)
#
# Resolution                 Tiles   Tokens   Notes
# detail="low" (any size)      1       85     fixed 85 tokens (image scaled to 512x512)
# 512x512                      1      255     detail="high", 1 tile
# 1024x1024                    4      765     2x2 tiles
# 1024x2048                    8     1445     2x4 tiles
# 2048x2048                   16     2805     4x4 tiles (cap in this model)
# Maximum (any size)         <=16   <=2805    tile count capped at 16
#
# Note: this model is an upper bound. OpenAI's documented procedure first fits
# the image within 2048x2048, then scales the shortest side down to 768px before
# counting 512px tiles, so billed counts for large images are lower
# (the docs give 1105 tokens for a 2048x4096 image).
#
# Formula (detail="high"):
#   tiles_x = ceil(width / 512)
#   tiles_y = ceil(height / 512)
#   tiles   = min(tiles_x * tiles_y, 16)
#   tokens  = 85 + 170 * tiles
#
# Example: 1280x720 image
#   tiles_x = ceil(1280/512) = 3, tiles_y = ceil(720/512) = 2
#   tiles = 6, tokens = 85 + 170*6 = 1105 tokens
def estimate_image_tokens(width: int, height: int, detail: str = "auto") -> int:
    if detail == "low":
        return 85
    # detail="high" or "auto" (auto is treated as high here, the conservative choice)
    tiles_x = -(-width // 512)   # ceil division
    tiles_y = -(-height // 512)
    tiles = min(tiles_x * tiles_y, 16)
    return 85 + 170 * tiles

print(estimate_image_tokens(512, 512))     # 255
print(estimate_image_tokens(1024, 1024))   # 765
print(estimate_image_tokens(1280, 720))    # 1105
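The tile formula above skips the resizing that OpenAI's 2024 vision documentation describes for detail="high". The sketch below adds those steps as I understand them: fit within 2048x2048, scale the shortest side down to 768px, then count 512px tiles. The function name `estimate_tokens_with_resize` is hypothetical, and the choice not to upscale small images is an assumption; check the current docs before relying on the numbers for billing.

```python
import math


def estimate_tokens_with_resize(width: int, height: int, detail: str = "high") -> int:
    """Estimate image tokens with a resize-then-tile procedure (assumed from OpenAI's 2024 docs)."""
    if detail == "low":
        return 85
    w, h = float(width), float(height)
    # Step 1: fit the image inside a 2048x2048 square, keeping the aspect ratio
    if max(w, h) > 2048:
        scale = 2048 / max(w, h)
        w, h = w * scale, h * scale
    # Step 2: scale down so the shortest side is 768px (assumption: never upscale)
    if min(w, h) > 768:
        scale = 768 / min(w, h)
        w, h = w * scale, h * scale
    # Step 3: count 512px tiles on the rounded dimensions
    tiles = math.ceil(round(w) / 512) * math.ceil(round(h) / 512)
    return 85 + 170 * tiles


print(estimate_tokens_with_resize(1024, 1024))  # 765
print(estimate_tokens_with_resize(2048, 4096))  # 1105
```

For budgeting, the simplified formula is the safer number because it never underestimates relative to this procedure; the resize-aware version is closer to what the published examples report.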
Ingestion pipeline
From upload to model-ready content, split responsibilities by stage: reject obviously invalid input first, then run security scanning and metadata scrubbing, then normalize, and only then do recognition (OCR, ASR) and structuring.
              [ Upload / URL fetch ]
                         │
                         ▼
         [ MIME · magic bytes · size cap ]
                         │
            ┌────────────┴────────────┐
            ▼                         ▼
    [ Security scan ]        [ Metadata scrub ]
 (stego/macros/sandbox)    (EXIF/sensitive fields)
            │                         │
            └────────────┬────────────┘
                         ▼
         [ Transcode / crop / normalize ]
                         │
            ┌────────────┴────────────┐
            ▼                         ▼
 [ OCR / vision tiles ]      [ ASR / segments ]
            │                         │
            └────────────┬────────────┘
                         ▼
    [ Unified message schema · token estimate ]
                         │
                         ▼
                 [ Model / Agent ]
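The staged flow in the diagram can be sketched as a list of small functions that each either pass the payload on or reject it. This is a minimal illustration, not a full pipeline: `RejectedUpload`, `check_size`, `check_magic`, `run_stages`, and the 20MB cap are all hypothetical names and values, and only the first gate (size cap plus magic bytes for PNG and JPEG) is implemented.

```python
from typing import Callable

MAX_UPLOAD_BYTES = 20 * 1024 * 1024  # hypothetical size cap for this sketch


class RejectedUpload(Exception):
    """Raised by a stage to stop the pipeline early."""


def check_size(data: bytes) -> bytes:
    # Cheapest check first: refuse oversized payloads before any parsing
    if len(data) > MAX_UPLOAD_BYTES:
        raise RejectedUpload("size cap exceeded")
    return data


def check_magic(data: bytes) -> bytes:
    # Only PNG and JPEG signatures are recognized in this sketch
    if not (data.startswith(b"\x89PNG\r\n\x1a\n") or data.startswith(b"\xff\xd8\xff")):
        raise RejectedUpload("unrecognized magic bytes")
    return data


def run_stages(data: bytes, stages: list[Callable[[bytes], bytes]]) -> bytes:
    """Run stages in order; any stage may raise RejectedUpload."""
    for stage in stages:
        data = stage(data)
    return data
```

Later stages (security scan, metadata scrub, transcode) would be appended to the same list, so the cheap rejections always run before the expensive work.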
Image input: base64 vs URL
import base64, httpx
from openai import OpenAI
from pathlib import Path
client = OpenAI()
# === Method 1: base64 encoding ===
# Use when: local files, intranet images, availability must be guaranteed, small images (<4MB)
# Pros: no public URL required, image lifecycle is controlled
# Cons: increases request body size (~1.33x, 4 characters per 3 bytes); large images may exceed the request size limit
def image_to_base64(image_path: str) -> str:
    """Convert a local image to a base64 data URL."""
    # Read the file once; the first 12 bytes are enough for magic-byte detection
    data = Path(image_path).read_bytes()
    # Validate by magic bytes rather than just extension
    mime = detect_mime_from_magic(data[:12])  # see implementation below
    if mime not in {"image/jpeg", "image/png", "image/gif", "image/webp"}:
        raise ValueError(f"Unsupported image format: {mime}")
    b64 = base64.b64encode(data).decode("ascii")
    return f"data:{mime};base64,{b64}"

def call_vision_base64(image_path: str, question: str) -> str:
    b64_url = image_to_base64(image_path)
    resp = client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": question},
                {"type": "image_url", "image_url": {"url": b64_url, "detail": "high"}},
            ],
        }],
        max_tokens=1000,
    )
    return resp.choices[0].message.content

# === Method 2: pass URL directly ===
# Use when: publicly accessible images, large images (>4MB), CDN media assets
# Pros: small request body, model fetches directly
# Cons: requires public URL; URL expiry causes call failure; model IP must reach origin
def call_vision_url(image_url: str, question: str) -> str:
    # HEAD check to catch dead links before calling (avoids a 404 at call time).
    # It only proves that this host can reach the URL; the provider's own fetch
    # can still fail, and some origins reject HEAD requests.
    r = httpx.head(image_url, timeout=5, follow_redirects=True)
    if r.status_code != 200:
        raise ValueError(f"Image URL not accessible: {r.status_code} {image_url}")
    resp = client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": question},
                {"type": "image_url", "image_url": {"url": image_url, "detail": "auto"}},
            ],
        }],
        max_tokens=1000,
    )
    return resp.choices[0].message.content

# === Multiple images with numbered captions (cross-modal referencing) ===
def call_vision_multi_image(images: list[dict], question: str) -> str:
    """
    images: [{"url": "...", "caption": "Image 1: system architecture diagram"}, ...]
    In question, you can reference "top-left of Image 1" or "red box in Image 2".
    """
    # The captions go into the text part so the model can map "Image N" to each picture
    content = [{"type": "text", "text": question + "\n\nImage captions:\n" +
                "\n".join(f"- {img['caption']}" for img in images)}]
    for img in images:
        content.append({
            "type": "image_url",
            "image_url": {"url": img["url"], "detail": "high"},
        })
    resp = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": content}],
        max_tokens=2000,
    )
    return resp.choices[0].message.content

# Usage example: cross-modal referencing
result = call_vision_multi_image(
    images=[
        {"url": "https://cdn.example.com/arch.png", "caption": "Image 1: microservices architecture"},
        {"url": "https://cdn.example.com/error.png", "caption": "Image 2: error log screenshot"},
    ],
    question="What is the relationship between the API Gateway in the top-left of Image 1 and the 504 error in Image 2?",
)

def detect_mime_from_magic(header: bytes) -> str:
    """Detect MIME type from file magic bytes (more reliable than extension)."""
    # JPEG: SOI marker FF D8 followed by the FF that starts the next marker
    if header[:3] == b"\xff\xd8\xff":
        return "image/jpeg"
    if header[:8] == b"\x89PNG\r\n\x1a\n":
        return "image/png"
    if header[:6] in (b"GIF87a", b"GIF89a"):
        return "image/gif"
    # WebP: RIFF container with "WEBP" at bytes 8..11
    if header[:4] == b"RIFF" and header[8:12] == b"WEBP":
        return "image/webp"
    return "application/octet-stream"
Audio input: format conversion + API call
import subprocess, tempfile, os
from pathlib import Path
from openai import OpenAI
client = OpenAI()
# Step 1: Format conversion preprocessing (using ffmpeg)
# Whisper API accepts: flac, mp3, mp4, mpeg, mpga, m4a, ogg, wav, webm
# Max file size: 25MB
# Recommended format: mp3 (good compression) or wav (lossless, high accuracy)
def convert_audio_for_whisper(
    input_path: str,
    target_format: str = "mp3",
    sample_rate: int = 16000,
    channels: int = 1,  # mono, reduces file size
) -> str:
    """Convert any audio to Whisper-compatible format. Requires ffmpeg installed."""
    # mkstemp creates the file safely; tempfile.mktemp is deprecated and race-prone
    fd, output_path = tempfile.mkstemp(suffix=f".{target_format}")
    os.close(fd)
    cmd = [
        "ffmpeg", "-i", input_path,
        "-ar", str(sample_rate),  # sample rate 16kHz (speech recognition standard)
        "-ac", str(channels),     # channel count
        "-b:a", "64k",            # audio bitrate (applies to lossy formats such as mp3)
        "-y",                     # overwrite the placeholder file created above
        output_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg conversion failed: {result.stderr}")
    return output_path

# Step 2: Check file size (>25MB needs chunking)
MAX_WHISPER_SIZE = 25 * 1024 * 1024 # 25MB
def chunk_audio_if_needed(audio_path: str, chunk_duration_sec: int = 600) -> list[str]:
    """Split oversized audio into fixed-duration chunks (default 10 min/chunk)."""
    file_size = os.path.getsize(audio_path)
    if file_size <= MAX_WHISPER_SIZE:
        return [audio_path]
    output_dir = tempfile.mkdtemp()
    # "-c copy" does not re-encode, so chunks must keep the source container
    suffix = Path(audio_path).suffix or ".mp3"
    cmd = [
        "ffmpeg", "-i", audio_path,
        "-f", "segment",
        "-segment_time", str(chunk_duration_sec),
        "-c", "copy",  # cuts fall on packet boundaries, so chunk lengths are approximate
        os.path.join(output_dir, f"chunk_%04d{suffix}"),
    ]
    subprocess.run(cmd, check=True, capture_output=True, timeout=300)
    chunks = sorted(Path(output_dir).glob("chunk_*"))
    return [str(c) for c in chunks]

# Step 3: Call Whisper API
def transcribe_audio(audio_path: str, language: str = "en") -> dict:
    """
    Transcribe audio; return text with timestamps (for cross-modal referencing).
    response_format="verbose_json" includes start/end timestamps per segment.
    """
    converted = convert_audio_for_whisper(audio_path)
    chunks = chunk_audio_if_needed(converted)
    segments = []
    offset = 0.0
    for chunk_path in chunks:
        with open(chunk_path, "rb") as f:
            resp = client.audio.transcriptions.create(
                model="whisper-1",
                file=f,
                language=language,
                response_format="verbose_json",  # includes timestamps
                timestamp_granularities=["segment"],
            )
        # Timestamps are relative to the chunk; shift them by the running offset
        for seg in resp.segments or []:
            segments.append({
                "start": seg.start + offset,
                "end": seg.end + offset,
                "text": seg.text.strip(),
            })
        # Advance by the duration reported in the verbose_json response rather than
        # a hard-coded 600s, since stream-copied chunks are only approximately that long
        offset += float(resp.duration)
    return {
        "full_text": " ".join(s["text"] for s in segments),
        "segments": segments,  # with timestamps for referencing "content at 2m 15s"
        "language": language,
    }

# Step 4: Cross-modal referencing: reference audio segments in a prompt
def ask_about_audio_segment(transcript: dict, question: str) -> str:
    """Build a prompt from transcript with timestamps; supports specific time references."""
    formatted = "\n".join(
        f"[{s['start']:.1f}s - {s['end']:.1f}s] {s['text']}"
        for s in transcript["segments"]
    )
    prompt = f"""The following is an audio transcript with timestamps:

{formatted}

Question: {question}

When referencing audio content in your answer, use the "[X.Xs]" format to indicate the time point."""
    resp = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=1000,
    )
    return resp.choices[0].message.content
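The 25MB limit and the 10-minute chunk length above are linked by simple bitrate arithmetic. The sketch below works it out and also renders offsets in the "2m 15s" style mentioned in the transcript comments. Both helper names (`max_seconds_under_limit`, `format_timestamp`) are hypothetical, and the calculation assumes constant-bitrate audio and ignores container overhead.

```python
MAX_WHISPER_SIZE = 25 * 1024 * 1024  # 25MB, the limit used in this section


def max_seconds_under_limit(bitrate_kbps: int, max_bytes: int = MAX_WHISPER_SIZE) -> float:
    """Longest constant-bitrate audio, in seconds, that fits in max_bytes."""
    bits_per_second = bitrate_kbps * 1000
    return max_bytes * 8 / bits_per_second


def format_timestamp(seconds: float) -> str:
    """Render a second offset as 'Xm YYs' for human-readable references."""
    minutes, secs = divmod(int(seconds), 60)
    return f"{minutes}m {secs:02d}s"


print(max_seconds_under_limit(64))   # 3276.8 (about 54 minutes at 64 kbps)
print(format_timestamp(135.4))       # 2m 15s
```

At 64 kbps a 10-minute chunk is roughly 4.8MB, so the default chunk length leaves a wide margin under the limit; longer chunks mean fewer API calls and fewer seams between segments.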
Structured payload: PDF/CSV to text
import csv, io
from pathlib import Path
# === PDF to text ===
# pip install pymupdf (imported as fitz): fast, supports text, image, and table extraction
def pdf_to_text(pdf_path: str, max_pages: int = 50) -> dict:
    """
    Convert PDF to text, preserving page numbers for citation ("page 3").
    Stops at max_pages or MAX_CHARS, whichever comes first, and flags the truncation.
    """
    import fitz  # pymupdf
    MAX_CHARS = 100_000  # character budget; roughly 25k tokens for English at ~4 chars/token, more for CJK
    pages = []
    total_chars = 0
    with fitz.open(pdf_path) as doc:  # the context manager closes the file handle
        total_pages = len(doc)
        for page_num in range(min(total_pages, max_pages)):
            text = doc[page_num].get_text("text").strip()
            if not text:
                continue  # skip pages with no text layer (likely scanned image pages)
            pages.append({
                "page": page_num + 1,
                "text": text,
                "char_count": len(text),
            })
            total_chars += len(text)
            if total_chars > MAX_CHARS:
                # Keep only the part of this page that fits in the remaining budget
                keep = MAX_CHARS - (total_chars - len(text))
                pages[-1]["text"] = text[:keep]
                pages[-1]["char_count"] = keep
                pages[-1]["truncated"] = True
                break
    return {
        "pages": pages,
        "total_pages": total_pages,
        "extracted_pages": len(pages),
        # True when either the character budget or the page cap cut the document short
        "truncated": total_chars > MAX_CHARS or total_pages > max_pages,
        "full_text": "\n\n---\n\n".join(
            f"[Page {p['page']}]\n{p['text']}" for p in pages
        ),
    }

# === CSV to text (structured summary) ===
def csv_to_text(
    csv_content: str,
    max_rows: int = 100,
    include_stats: bool = True,
) -> dict:
    """
    Convert CSV to text with statistical summary and top N rows.
    Do NOT pass full large CSVs; generate summary + sample rows instead.
    """
    reader = csv.DictReader(io.StringIO(csv_content))
    rows = list(reader)
    headers = reader.fieldnames or []
    sample_rows = rows[:max_rows]
    truncated = len(rows) > max_rows
    # Build numeric column statistics (computed over all rows, not just the sample)
    stats = {}
    if include_stats:
        for col in headers:
            values = []
            for row in rows:
                try:
                    values.append(float(row[col]))
                except (ValueError, TypeError):
                    pass  # non-numeric or missing cell
            if values:
                stats[col] = {
                    "min": min(values),
                    "max": max(values),
                    "avg": round(sum(values) / len(values), 2),
                    "count": len(values),
                }
    # Format as markdown table (top max_rows rows)
    table_text = (
        f"| {' | '.join(headers)} |\n| {' | '.join(['---'] * len(headers))} |\n"
        + "\n".join(
            f"| {' | '.join(str(row.get(h, '')) for h in headers)} |"
            for row in sample_rows
        )
    )
    summary = f"CSV file: {len(rows)} rows x {len(headers)} columns"
    if truncated:
        summary += f" (showing first {max_rows} rows only)"
    if stats:
        summary += "\nNumeric column stats:\n" + "\n".join(
            f"  {col}: min={s['min']}, max={s['max']}, avg={s['avg']}, count={s['count']}"
            for col, s in stats.items()
        )
    return {
        "summary": summary,
        "table": table_text,
        "headers": headers,
        "total_rows": len(rows),
        "stats": stats,
        "full_text": summary + "\n\n" + table_text,
    }

# Cross-modal referencing: prompt pattern for referencing "top-left of Image 2"
CROSS_MODAL_PROMPT_TEMPLATE = """
Image information:
{image_captions}
Document content (summary):
{doc_summary}
Question: {question}
Response guidelines:
- Reference image regions using "Image N [location]" (e.g. "top-left of Image 1's flowchart")
- Reference document content using "Document page N" or "Table row N"
- If image and document content corroborate each other, explicitly state the connection
"""
MIME and size estimate
The browser-reported type (File.type) may be empty or guessed from the file extension, which is unreliable; treat server-side magic-byte detection as authoritative. For rough JSON body or data URL sizing, the Base64-encoded length is about 4/3 of the raw byte count.
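The Base64 size can be computed without encoding anything. The sketch below gives the exact length of padded standard Base64 output and of a data URL built from it; the helper names `base64_encoded_length` and `data_url_length` are hypothetical.

```python
import base64


def base64_encoded_length(raw_bytes: int) -> int:
    """Exact character count of padded standard Base64 for raw_bytes of input."""
    # Every 3 input bytes become 4 output characters; the last group is padded
    return 4 * ((raw_bytes + 2) // 3)


def data_url_length(raw_bytes: int, mime: str) -> int:
    """Length of 'data:<mime>;base64,<payload>' for a payload of raw_bytes."""
    prefix = f"data:{mime};base64,"
    return len(prefix) + base64_encoded_length(raw_bytes)


# Cross-check against the real encoder for a small payload
sample = b"\x00" * 1000
assert base64_encoded_length(len(sample)) == len(base64.b64encode(sample))

print(base64_encoded_length(4 * 1024 * 1024))  # 5592408
```

A 4MB image therefore adds about 5.6 million characters to the request body, before any JSON escaping or other message content is counted.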
---
name: multimodal-input-pipeline
description: Design multimodal upload and preprocessing pipeline; input: image/audio/document files; output: normalized content + token estimate + cross-modal reference prompts; prohibit: passing unvalidated MIME or untranscoded raw files
version: "1.1.0"
triggers:
- "process.*image|image.*input|vision.*input"
- "audio.*transcri|PDF.*pars|multimodal.*upload"
steps:
1. Use detect_mime_from_magic() to detect MIME; never rely on file extension alone
2. Downsample with Pillow if image exceeds 20MB or max edge length exceeds 4096px
3. Image method: local/intranet files → base64; public CDN images → URL
4. Before base64: check file size; switch to URL method if > 4MB
5. Use estimate_image_tokens(w, h) to pre-check token count; switch to detail="low" if > 2000 tokens
6. For multiple images, use numbered image_captions list (Image 1/Image 2); reference as "Image N location"
7. Convert audio to mp3 first using ffmpeg (16kHz, 1ch, 64kbps)
8. Use chunk_audio_if_needed() to split audio > 25MB into 10-minute chunks
9. Call Whisper with response_format="verbose_json" to obtain timestamps
10. Extract PDF text with pymupdf; preserve page numbers; truncate at 100k characters with annotation
11. Use csv_to_text(): generate statistical summary + top 100 rows table; do NOT pass full CSV
12. Strip EXIF: remove GPS and device info using Pillow before passing images
13. Use CROSS_MODAL_PROMPT_TEMPLATE for cross-modal reference prompts
constraints:
- Do NOT pass SVG or macro-containing PDFs directly (parse in sandbox first)
- Do NOT log base64 image data (log file hash and size only)
- Audio transcripts must not contain un-redacted PII (run PII filter after transcription)
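Steps 2 to 5 of the skill describe a small decision procedure, which can be sketched as a pure function. This is an illustration under stated assumptions rather than the skill's implementation: `ImagePlan` and `plan_image_input` are hypothetical names, the thresholds are copied from the steps, the token estimate uses the page's simplified tile formula, and the byte size is taken as given instead of being re-measured after downsampling.

```python
import math
from dataclasses import dataclass

MAX_EDGE = 4096                 # step 2: longest edge after downsampling
BASE64_LIMIT = 4 * 1024 * 1024  # step 4: above this, prefer the URL method
TOKEN_BUDGET = 2000             # step 5: above this, fall back to detail="low"


@dataclass
class ImagePlan:
    width: int
    height: int
    method: str   # "base64" or "url"
    detail: str   # "high" or "low"
    est_tokens: int


def plan_image_input(width: int, height: int, size_bytes: int, has_public_url: bool) -> ImagePlan:
    # Step 2: shrink so the longest edge is at most MAX_EDGE, keeping aspect ratio
    longest = max(width, height)
    if longest > MAX_EDGE:
        scale = MAX_EDGE / longest
        width = max(1, round(width * scale))
        height = max(1, round(height * scale))
    # Steps 3-4: URL for public or oversized files. An oversized local file would
    # have to be hosted somewhere reachable first (not handled in this sketch).
    method = "url" if has_public_url or size_bytes > BASE64_LIMIT else "base64"
    # Step 5: simplified tile estimate, capped at 16 tiles
    tiles = min(math.ceil(width / 512) * math.ceil(height / 512), 16)
    tokens = 85 + 170 * tiles
    if tokens > TOKEN_BUDGET:
        return ImagePlan(width, height, method, "low", 85)
    return ImagePlan(width, height, method, "high", tokens)


print(plan_image_input(1024, 1024, 500_000, has_public_url=True))
```

Keeping the decision separate from the I/O makes the thresholds easy to test and adjust without touching the upload or API code.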