Message queues
Helps agents spell out idempotency keys, dedupe windows, and partition ordering under at-least-once delivery, and document retries, DLQ, and observability in one contract.
Version message bodies and include trace / correlation ids; avoid non-serializable handles or huge blobs in the queue—use object-storage references when needed.
Consumers must declare max concurrency, prefetch, visibility timeout, and poison handling; retry/backoff policy and DLQ human workflow must be executable.
With transactional outbox or CDC integration, the SKILL should state when a message is considered committed and how to recover from dual-write failures.
- Ordering: same entity id as partition key; when cross-entity order is not guaranteed, say so explicitly.
- Idempotency: business key + dedupe store or natural primary-key conflict handling.
- Backpressure: queue depth alerts tied to consumer scale-out/degradation playbooks.
At-least-once and idempotency
Most managed queues default to at-least-once: the same message may be delivered multiple times (network jitter, consumer crash before ack, partition rebalance, etc.). A SKILL must assume the handler can run again—use idempotency keys, dedupe tables, or natural PK conflicts—not a fantasy of exactly-once end-to-end.
In the contract, name the idempotency field, dedupe window (aligned with business SLA), and semantics on duplicate arrival (ignore / merge / fail to DLQ).
# Idempotency key design: Redis dedup + DB unique key double protection (Python)
import redis, json
from datetime import timedelta
r = redis.Redis(decode_responses=True)
DEDUP_WINDOW = timedelta(hours=24) # dedup window aligned with business SLA
def process_order_event(message: dict) -> bool:
"""
Message shape: { "idempotency_key": "evt-uuid-abc", "order_id": 123, ... }
Returns True=processed successfully, False=duplicate ignored
"""
idem_key = message.get("idempotency_key")
if not idem_key:
raise ValueError("missing idempotency_key")
redis_key = f"dedup:order_event:{idem_key}"
# 1. Redis SETNX: set on first arrival in window; already exists means duplicate
is_new = r.set(redis_key, "1", ex=int(DEDUP_WINDOW.total_seconds()), nx=True)
if not is_new:
# Duplicate message: ignore and ack, do not send to DLQ
return False
try:
# 2. DB unique key as fallback (secondary protection after Redis restart or key expiry)
db_insert_processed_event(
idempotency_key=idem_key,
payload=json.dumps(message)
)
# 3. Actual business processing
execute_order_business_logic(message)
return True
except UniqueConstraintError:
# DB unique key conflict: also a duplicate, ignore
return False
except Exception:
# Processing failed: delete Redis key so message can be retried
r.delete(redis_key)
raise
# DB unique key DDL (prevents duplicates after Redis key expiry)
# CREATE TABLE processed_events (
# idempotency_key VARCHAR(128) PRIMARY KEY,
# created_at TIMESTAMP DEFAULT NOW()
# );
# -- Periodically clean up expired records (keep only within window)
# DELETE FROM processed_events WHERE created_at < NOW() - INTERVAL '24 hours';
Retries, backoff, and DLQ
Use bounded retries with exponential backoff (plus jitter) for transient faults to avoid thundering herds and hot partitions; permanent errors (schema mismatch, invariant violation) should land in a DLQ (dead-letter queue) quickly with original headers, failure code, and attempt count for human or offline repair and replay.
- Define maxAttempts, backoff bounds, and which exceptions retry vs go straight to DLQ.
- DLQ consume permissions, alert routing, replay approval (who may requeue under what conditions).
- Poison message: isolate a repeatedly failing message so it does not block the whole consumer group.
# Exponential backoff with jitter (Python)
import time, random, math
def exponential_backoff_with_jitter(
attempt: int,
base_ms: int = 100,
max_ms: int = 30_000,
jitter_factor: float = 0.3,
) -> float:
"""
Full Jitter strategy: sleep = random(0, min(cap, base * 2^attempt))
Prevents thundering herd when multiple consumers retry simultaneously
"""
cap = min(max_ms, base_ms * (2 ** attempt))
jitter = random.uniform(0, cap * jitter_factor)
delay_ms = cap + jitter
return delay_ms / 1000 # return seconds
def process_with_retry(message: dict, handler, max_attempts: int = 5):
RETRYABLE = (ConnectionError, TimeoutError)
NON_RETRYABLE = (ValueError, KeyError) # schema errors go straight to DLQ
for attempt in range(max_attempts):
try:
handler(message)
return # success
except NON_RETRYABLE as e:
# Permanent error: send to DLQ immediately, no retry
send_to_dlq(message, error=str(e), attempt=attempt)
return
except RETRYABLE as e:
if attempt == max_attempts - 1:
send_to_dlq(message, error=str(e), attempt=attempt)
return
delay = exponential_backoff_with_jitter(attempt)
time.sleep(delay)
# Standard DLQ message format
def build_dlq_message(original: dict, error: str, attempt: int) -> dict:
return {
"dlq_metadata": {
"original_topic": original.get("_topic", "unknown"),
"original_partition": original.get("_partition"),
"original_offset": original.get("_offset"),
"failed_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"attempt_count": attempt + 1,
"failure_reason": error,
"idempotency_key": original.get("idempotency_key"),
},
"original_message": original, # full original message for replay
}
# Kafka vs RabbitMQ vs SQS selection guide
# +-------------------+--------------+--------------+--------------+
# | Dimension | Kafka | RabbitMQ | AWS SQS |
# +-------------------+--------------+--------------+--------------+
# | Delivery semantics| at-least-once| at-least-once| at-least-once|
# | | (exactly-once| (configurable) |
# | | needs txns) | | |
# | Ordering | per-partition| per-queue | none (FIFO |
# | | | | queue only) |
# | Retention | durable, | deleted on | deleted on |
# | | replayable | ack | ack |
# | Throughput | very high | high | high |
# | | (millions/s) | (10k+/s) | (elastic) |
# | Latency | milliseconds | sub-ms | milliseconds |
# | Consumer model | Consumer | Push/Pull | Long polling |
# | | Group Pull | | |
# | Ops complexity | high (needs | medium | low (managed)|
# | | ZK/KRaft) | | |
# | Best for | event | task queues | simple task |
# | | sourcing, | RPC-over-MQ | queues, |
# | | log streaming| | stateless |
# +-------------------+--------------+--------------+--------------+
# Selection rules:
# - Need message replay / event sourcing → Kafka
# - Need flexible routing (exchange/binding) → RabbitMQ
# - Fully managed, AWS ecosystem, simple use case → SQS
# Concurrency formula: target queue latency / single message processing time = needed concurrency
# Example: target 2s latency, 200ms per message → concurrency = 2000ms / 200ms = 10
Topics, payloads, and observability
Document topic/queue names, partition keys, and consumer group ids in the SKILL; headers carry version, trace, and tenant (if applicable). Monitor depth, age, DLQ rate, and consumer lag, wired to scale-out or degradation playbooks.
Consumers declare concurrency, prefetch, visibility/lock timeout; align timeouts with downstream DB or API so you do not get “message visible again while business still thinks it is processing—ambiguity.
Outbox and commit boundary
Writing an outbox row in the same DB transaction as business data, then relaying to MQ, narrows the window between “DB committed but message not sent—and the reverse. With CDC, document ordering, initial snapshot, and schema-change policy. In the SKILL, state when the business considers an event “published.—
Processing path sketch
Map the following path to your stack’s ack/nack, visibility timeout, or delay-queue terminology so docs and code use one vocabulary.
[ Producer / Outbox relay ]
│
▼
[ Topic / queue ]
│
▼
[ Consumer ]
│
┌────┴────┐
▼ ▼
Success Failure
│ │
ack / commit retries < max?
│ │ yes ──► backoff redelivery (visibility / delay)
│ │ no ──► [ DLQ ]
│ │ │
│ │ └──► alert / human fix / controlled replay
▼
(at-least-once: success path may still redeliver → rely on idempotency)
---
name: message-queue-usage
description: Define topic, idempotency key, retry backoff, and DLQ contract
---
# Idempotency design
Idempotency key field: idempotency_key (UUID, generated by producer, carried in message header)
Redis SETNX dedup window = business SLA (typically 24h)
DB unique key fallback (PRIMARY KEY on processed_events.idempotency_key)
Duplicate arrival semantics: ignore and ack, do not send to DLQ
# Retry backoff (Full Jitter)
sleep = random(0, min(30000ms, 100ms * 2^attempt))
maxAttempts = 5; retryable: ConnectionError / TimeoutError
non-retryable (straight to DLQ): ValueError / KeyError / schema mismatch
# DLQ message format
dlq_metadata: original_topic, partition, offset, failed_at, attempt_count
dlq_metadata: failure_reason, idempotency_key
original_message: full original message (for replay after fix)
# Tool selection guide
Kafka: event sourcing / log streaming / message replay; per-partition ordering; high throughput
RabbitMQ: task queues / flexible routing (exchange) / RPC-over-MQ
SQS: fully managed / AWS ecosystem / simple use case / no ordering requirement
# Concurrency formula
required concurrency = target queue latency target (ms) / single message processing time (ms)
Example: target 2s latency, processing 200ms each → concurrency = 2000/200 = 10
Implementation checklist
Selections persist in this browser (localStorage) so you can work through the list over multiple sessions.