MCP server development

This page provides a minimal Python/TypeScript implementation of an MCP tool (tool registration + input schema + return format), comparison code for API key vs Bearer token authentication, the standard format for three return states (success/error/pending), and a complete tool JSON Schema design example.

Each tool needs a stable name, an explicit parameter schema, and machine-parseable errors. Avoid huge payloads: return resource URIs or paginate when results are large. Declare capabilities at startup, and keep version upgrades backward compatible or document the breaking changes.
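The pagination advice can be sketched as a cursor-based helper. This is a minimal sketch under stated assumptions: the function names, the opaque base64 cursor format, and the next_cursor field are hypothetical choices for illustration, not part of the mcp SDK.

```python
import base64
import json
from typing import Optional

PAGE_SIZE = 2  # hypothetical page size; tune it to keep responses small


def encode_cursor(offset: int) -> str:
    """Encode an offset as an opaque cursor so clients do not depend on its shape."""
    return base64.urlsafe_b64encode(json.dumps({"offset": offset}).encode()).decode()


def decode_cursor(cursor: Optional[str]) -> int:
    """Decode a cursor back to an offset; a missing cursor means the first page."""
    if cursor is None:
        return 0
    try:
        offset = json.loads(base64.urlsafe_b64decode(cursor.encode()))["offset"]
    except (ValueError, KeyError, TypeError):
        raise ValueError("invalid cursor") from None
    if not isinstance(offset, int) or offset < 0:
        raise ValueError("invalid cursor")
    return offset


def paginate(items: list, cursor: Optional[str] = None, page_size: int = PAGE_SIZE) -> dict:
    """Return one page of items plus the cursor for the next page (None when done)."""
    offset = decode_cursor(cursor)
    page = items[offset:offset + page_size]
    next_offset = offset + len(page)
    has_more = next_offset < len(items)
    return {
        "status": "success",
        "data": {
            "results": page,
            "next_cursor": encode_cursor(next_offset) if has_more else None,
        },
    }
```

A caller passes the next_cursor value from one response back as the cursor argument of the next call, and stops when it is None.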

Observability: log tool call id, latency, and failure reason; never echo secrets or full private user fields.
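One way to apply the observability rule is a wrapper around each tool handler. The sketch below is an assumption-laden illustration: call_with_logging, the logger name, and the log line layout are hypothetical. It records the call id, latency, and failure reason, and logs only argument names, never their values.

```python
import logging
import time
import uuid

logger = logging.getLogger("mcp.tools")  # hypothetical logger name


async def call_with_logging(tool_name, handler, arguments):
    """Run a tool handler and log call id, latency, status, and failure reason."""
    call_id = uuid.uuid4().hex
    started = time.perf_counter()
    status, reason = "success", None
    try:
        return await handler(arguments)
    except Exception as exc:
        # Log only the exception type; messages may contain user data
        status, reason = "error", type(exc).__name__
        raise
    finally:
        latency_ms = (time.perf_counter() - started) * 1000
        logger.info(
            "tool=%s call_id=%s status=%s latency_ms=%.1f reason=%s arg_keys=%s",
            tool_name, call_id, status, latency_ms, reason, sorted(arguments),
        )
```

The logger emits nothing until the application configures a handler and level for it.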

  • Side-effecting tools: require idempotency keys or explicit confirmation to avoid accidental production changes.
  • Align with host permissions: files, network, subprocess—least privilege.
  • Integration checklist: sample requests, edge parameters, timeout and cancellation behavior.
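The idempotency-key requirement for side-effecting tools can be sketched as a small dedup store. This is a minimal in-memory sketch: IdempotencyStore and run_once are hypothetical names, and a real deployment would likely need shared, persistent storage with expiry.

```python
import asyncio


class IdempotencyStore:
    """Remember the result of each idempotency key so a repeated call is not re-executed."""

    def __init__(self):
        self._results = {}
        self._lock = None  # created lazily so it belongs to the running event loop

    async def run_once(self, key, operation):
        if self._lock is None:
            self._lock = asyncio.Lock()
        # A single lock serializes all writes: simple, but not built for high throughput
        async with self._lock:
            if key in self._results:
                return self._results[key]  # replay the stored result
            result = await operation()
            self._results[key] = result
            return result
```

A handler would call store.run_once(args["idempotency_key"], create_ticket_operation), so a retried request with the same key returns the original result instead of creating a second record.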

Minimal MCP tool implementation

A complete MCP tool registration example (Python, using the mcp SDK), covering tool registration, server-side input validation, and the success and error return formats (the pending format appears later on this page):

# Python — minimal MCP server using the official mcp SDK
# pip install mcp

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, CallToolResult
import json

app = Server("my-mcp-server")

# 1. Register tool list (tools/list response)
@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="search_docs",
            description="Search internal docs by keyword; returns titles and summaries. Read-only; does not modify any data.",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "minLength": 1,
                        "maxLength": 200,
                        "description": "Search keyword; do not include SQL or path-traversal characters"
                    },
                    "limit": {
                        "type": "integer",
                        "minimum": 1,
                        "maximum": 20,
                        "default": 5,
                        "description": "Maximum number of results to return"
                    },
                    "category": {
                        "type": "string",
                        "enum": ["api", "guide", "faq", "all"],
                        "default": "all",
                        "description": "Filter by document category"
                    }
                },
                "required": ["query"],
                "additionalProperties": False
            }
        )
    ]

# 2. Implement tool handler (tools/call response)
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> CallToolResult:
    if name == "search_docs":
        return await handle_search_docs(arguments)
    return CallToolResult(
        content=[TextContent(type="text", text=json.dumps({
            "status": "error",
            "error": {"code": "UNKNOWN_TOOL", "message": f"Unknown tool: {name}"}
        }))],
        isError=True
    )

async def handle_search_docs(args: dict) -> CallToolResult:
    # Server-side re-validation (do not trust model-supplied parameters)
    query = args.get("query")
    limit = args.get("limit", 5)
    category = args.get("category", "all")

    if not isinstance(query, str) or not 1 <= len(query) <= 200:
        return CallToolResult(
            content=[TextContent(type="text", text=json.dumps({
                "status": "error",
                "error": {"code": "VALIDATION_ERROR", "message": "query must be a string of 1-200 characters"}
            }))],
            isError=True
        )

    try:
        # search_engine is a placeholder for your own search backend
        results = await search_engine.query(query, limit=limit, category=category)
        # Success return format
        return CallToolResult(
            content=[TextContent(type="text", text=json.dumps({
                "status": "success",
                "data": {"results": results, "total": len(results), "query": query}
            }))]
        )
    except TimeoutError:
        # Retryable error
        return CallToolResult(
            content=[TextContent(type="text", text=json.dumps({
                "status": "error",
                "error": {"code": "TIMEOUT", "message": "Search timed out; retry later", "retryable": True}
            }))],
            isError=True
        )

# 3. Start stdio transport
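async def main():
    async with stdio_server() as (read_stream, write_stream):
        # run() also requires the initialization options (server name, version, capabilities)
        await app.run(read_stream, write_stream, app.create_initialization_options())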

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

TypeScript equivalent (using @modelcontextprotocol/sdk):

// TypeScript — minimal MCP tool implementation
// npm install @modelcontextprotocol/sdk

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

const server = new Server({ name: "my-mcp-server", version: "1.0.0" }, {
  capabilities: { tools: {} },
});

server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [{
    name: "search_docs",
    description: "Search internal docs by keyword; read-only",
    inputSchema: {
      type: "object",
      properties: {
        query: { type: "string", minLength: 1, maxLength: 200 },
        limit: { type: "integer", minimum: 1, maximum: 20, default: 5 }
      },
      required: ["query"],
    },
  }],
}));

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "search_docs") {
    const { query, limit = 5 } = request.params.arguments as { query: string; limit?: number };
    const results = await searchDocs(query, limit);
    return {
      content: [{ type: "text", text: JSON.stringify({ status: "success", data: results }) }],
    };
  }
  throw new Error(`Unknown tool: ${request.params.name}`);
});

const transport = new StdioServerTransport();
await server.connect(transport);

Transport (stdio / remote)

stdio: the server runs as a child process and talks to the host on the same machine, which suits CLI and desktop agents. Mind the working directory and inherited environment variables, and keep stdout strictly for protocol messages (send logs to stderr or a dedicated channel) so the JSON-RPC stream is not corrupted.
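Keeping logs off stdout can be done with the standard logging module. The sketch below assumes a hypothetical logger name, "mcp.server", and a hypothetical helper, configure_logging. The point is only that the handler writes to sys.stderr.

```python
import logging
import sys


def configure_logging(level: int = logging.INFO) -> logging.Logger:
    """Route server logs to stderr so stdout carries only protocol messages."""
    logger = logging.getLogger("mcp.server")  # hypothetical logger name
    logger.handlers.clear()
    handler = logging.StreamHandler(sys.stderr)  # never sys.stdout on the stdio transport
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(level)
    logger.propagate = False  # avoid duplicate output through the root logger
    return logger
```

For the same reason, stray print() calls in handlers should be avoided or redirected with print(..., file=sys.stderr).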

HTTP / SSE / streamable HTTP: suited to remote deployments and multiple clients. These transports need TLS, observable shutdown and backpressure, and URLs that match session/routing. For any transport, define timeouts, a maximum message size, and concurrency limits, and document how the host starts and tears down the process.
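The three limits named above (timeout, maximum message size, concurrency) can be sketched with asyncio alone. The names and default values below are assumptions for illustration, not SDK settings.

```python
import asyncio
import json

TOOL_TIMEOUT_S = 10.0          # hypothetical per-call timeout
MAX_RESULT_BYTES = 64 * 1024   # hypothetical response size cap
MAX_CONCURRENT_CALLS = 8       # hypothetical limit for asyncio.Semaphore(...)


def _error(code: str, message: str, retryable: bool) -> dict:
    return {"status": "error", "error": {"code": code, "message": message, "retryable": retryable}}


async def guarded_call(handler, arguments, semaphore,
                       timeout_s: float = TOOL_TIMEOUT_S,
                       max_bytes: int = MAX_RESULT_BYTES) -> dict:
    """Run a handler under a concurrency limit, a timeout, and a result size cap."""
    async with semaphore:  # at most N handlers run at once; the rest wait here
        try:
            # wait_for cancels the handler if it exceeds the timeout
            result = await asyncio.wait_for(handler(arguments), timeout=timeout_s)
        except asyncio.TimeoutError:
            return _error("TIMEOUT", "Tool call timed out; retry later", True)
    if len(json.dumps(result).encode("utf-8")) > max_bytes:
        return _error("RESULT_TOO_LARGE", "Result exceeds size limit; narrow the query", False)
    return result
```

The semaphore should be created once inside the running event loop, for example asyncio.Semaphore(MAX_CONCURRENT_CALLS) at server startup, and shared by all calls.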

Authentication: API key vs Bearer token implementation

# Method 1: API key in header (suitable for internal services, simple scenarios)
# Client request header: X-API-Key: sk-xxxx
# Server-side validation (FastAPI example, HTTP/SSE transport)

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import hmac, os

API_KEY = os.environ["MCP_API_KEY"]  # read from env var; never hardcode

class ApiKeyMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        key = request.headers.get("X-API-Key", "")
        # Use hmac.compare_digest to prevent timing attacks
        if not hmac.compare_digest(key.encode(), API_KEY.encode()):
            # Return the response directly: an HTTPException raised inside
            # middleware bypasses FastAPI's exception handlers and becomes a 500
            return JSONResponse(status_code=401, content={
                "status": "error",
                "error": {"code": "UNAUTHORIZED", "message": "Invalid API key"}
            })
        return await call_next(request)

http_app = FastAPI()
http_app.add_middleware(ApiKeyMiddleware)


# Method 2: Bearer token (OAuth 2.0; suitable for multi-tenant, short-lived authorization)
# Client request header: Authorization: Bearer eyJhbGc...

import jwt  # pip install PyJWT

JWT_SECRET = os.environ["JWT_SECRET"]
JWT_ALGORITHM = "HS256"

def verify_bearer_token(authorization: str) -> dict:
    """Parse and validate a JWT Bearer token; return payload."""
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail={
            "status": "error",
            "error": {"code": "INVALID_AUTH_SCHEME", "message": "Bearer token required"}
        })
    token = authorization[7:]
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        return payload  # contains sub (user ID), exp (expiry), scope, etc.
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail={
            "status": "error",
            "error": {"code": "TOKEN_EXPIRED", "message": "Token expired; please refresh"}
        })
    except jwt.InvalidTokenError as e:
        raise HTTPException(status_code=401, detail={
            "status": "error",
            "error": {"code": "INVALID_TOKEN", "message": str(e)}
        })

# Use token payload to isolate tenant data inside tool handler
async def handle_search_docs_with_auth(args: dict, token_payload: dict) -> dict:
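    tenant_id = token_payload["sub"]        # this example treats the subject claim as the tenant ID
    scopes = token_payload.get("scope", "").split()  # OAuth scopes are space-delimited
    if "docs:read" not in scopes:           # exact match; a substring test would accept "docs:readwrite"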
        raise HTTPException(status_code=403, detail={
            "status": "error",
            "error": {"code": "FORBIDDEN", "message": "Missing docs:read permission"}
        })
    # Filter query by tenant_id; never use a global unscoped query
    return await search_engine.query(args["query"], tenant_id=tenant_id)

Tool JSON Schema design and return formats

A complete JSON Schema should include parameter types, required fields, enum, description, and format constraints:

{
  "name": "create_ticket",
  "description": "Create a support ticket. Write operation; produces a database record. Confirm the environment parameter before calling.",
  "inputSchema": {
    "type": "object",
    "additionalProperties": false,
    "required": ["title", "priority", "environment"],
    "properties": {
      "title": {
        "type": "string",
        "minLength": 5,
        "maxLength": 200,
        "description": "Ticket title, 5-200 characters; no HTML tags"
      },
      "description": {
        "type": "string",
        "maxLength": 5000,
        "description": "Detailed description (optional); Markdown supported"
      },
      "priority": {
        "type": "string",
        "enum": ["low", "medium", "high", "critical"],
        "description": "Priority level. 'critical' triggers on-call notification"
      },
      "environment": {
        "type": "string",
        "enum": ["staging", "production"],
        "description": "Target environment. Production tickets require human confirmation"
      },
      "assignee_id": {
        "type": ["string", "null"],
        "pattern": "^usr_[a-z0-9]{8}$",
        "default": null,
        "description": "Assignee ID, format usr_xxxxxxxx; null means unassigned"
      },
      "due_date": {
        "type": ["string", "null"],
        "format": "date",
        "description": "Due date in ISO 8601 format (YYYY-MM-DD)"
      },
      "idempotency_key": {
        "type": "string",
        "minLength": 16,
        "maxLength": 64,
        "description": "Client-generated idempotency key; same key will not create duplicate tickets"
      }
    }
  }
}

Three standard return states (success / error / pending) with complete formats:

# success: operation complete; data contains the result
{
  "status": "success",
  "data": {
    "ticket_id": "tkt_a1b2c3d4",
    "url": "https://app.example.com/tickets/tkt_a1b2c3d4",
    "created_at": "2026-04-11T10:30:00Z"
  }
}

# error: operation failed; Agent decides whether to retry based on retryable
{
  "status": "error",
  "error": {
    "code": "VALIDATION_ERROR",       # machine-readable error code
    "message": "due_date format invalid; expected YYYY-MM-DD",  # human-readable
    "fields": ["due_date"],            # fields with problems (for validation errors)
    "retryable": false                 # false = non-retryable (parameter must be fixed)
  }
}

# error (retryable): temporary failure; Agent may back off and retry
{
  "status": "error",
  "error": {
    "code": "UPSTREAM_TIMEOUT",
    "message": "Downstream service timed out; retry after 5 seconds",
    "retryable": true,
    "retry_after_ms": 5000
  }
}

# pending: async operation accepted; poll or wait for callback
{
  "status": "pending",
  "data": {
    "job_id": "job_x9y8z7",
    "poll_url": "/api/jobs/job_x9y8z7",
    "estimated_seconds": 30
  }
}
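The three return states can be produced by small helper functions so that every handler emits the same shape. This is a minimal sketch. The helper names success, error, pending, and to_text are hypothetical, and the field names follow the formats shown above.

```python
import json
from typing import Any, List, Optional


def success(data: Any) -> dict:
    """Operation complete; data carries the result."""
    return {"status": "success", "data": data}


def error(code: str, message: str, *, retryable: bool = False,
          retry_after_ms: Optional[int] = None,
          fields: Optional[List[str]] = None) -> dict:
    """Operation failed; retryable tells the agent whether to back off and retry."""
    body = {"code": code, "message": message, "retryable": retryable}
    if retry_after_ms is not None:
        body["retry_after_ms"] = retry_after_ms
    if fields:
        body["fields"] = fields
    return {"status": "error", "error": body}


def pending(job_id: str, poll_url: str, estimated_seconds: int) -> dict:
    """Async operation accepted; the caller polls poll_url for completion."""
    return {"status": "pending", "data": {
        "job_id": job_id, "poll_url": poll_url, "estimated_seconds": estimated_seconds,
    }}


def to_text(envelope: dict) -> str:
    """Serialize an envelope for use as the text of a tool result."""
    return json.dumps(envelope, separators=(",", ":"))
```

A handler would then return, for example, to_text(error("UPSTREAM_TIMEOUT", "Downstream service timed out", retryable=True, retry_after_ms=5000)) as the text content of its result.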

MCP request flow

Typical message order between host and server (method names depend on your SDK and protocol version). Core idea: capability negotiation first, tool listing next, execution and resources after.

  [ Host starts MCP server process or opens remote connection ]
        │
        ▼
  ┌─────────────────┐     Exchange protocol version, capabilities, serverInfo;
  │  initialize      │     agree on features (tools, resources, sampling, …)
  └─────────────────┘
        │
        ▼
  ┌─────────────────┐     Return prompts / resources / tools subsets
  │  Capability      │     (may be one or many messages per impl)
  │  discovery       │
  └─────────────────┘
        │
        ▼
  ┌─────────────────┐     When the model needs a tool, host sends name + validated args
  │  tools/call      │──── Server: validate inputSchema → run business logic → result or error
  └─────────────────┘
        │
        ▼
  ┌─────────────────┐     Optional: read resources, fetch prompt templates, progress/cancel
  │  Resources /     │
  │  extensions      │
  └─────────────────┘
        │
        ▼
  [ Close connection or exit process; clean up handles and temp credentials ]
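The flow above can be sketched as the JSON-RPC 2.0 messages a host sends, in order. This is an illustration under stated assumptions: the helper names, the client name, and the protocol version string are example values, and a real host would wait for each response before sending the next request.

```python
import json
from itertools import count


def host_messages() -> list:
    """Build the host-side messages for one short session, in send order."""
    ids = count(1)

    def request(method, params=None):
        msg = {"jsonrpc": "2.0", "id": next(ids), "method": method}
        if params is not None:
            msg["params"] = params
        return msg

    def notification(method):
        # Notifications carry no id and expect no response
        return {"jsonrpc": "2.0", "method": method}

    return [
        request("initialize", {
            "protocolVersion": "2025-06-18",  # example value; negotiated with the server
            "capabilities": {},
            "clientInfo": {"name": "example-host", "version": "0.1.0"},  # hypothetical
        }),
        notification("notifications/initialized"),
        request("tools/list"),
        request("tools/call", {
            "name": "search_docs",
            "arguments": {"query": "rate limits", "limit": 5},
        }),
    ]


def frame(msg: dict) -> str:
    """Serialize one message as a single line, as the stdio transport expects."""
    return json.dumps(msg, separators=(",", ":")) + "\n"
```

Each framed line is written to the server's stdin, and responses with matching id values come back on its stdout.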

Tool name sanitizer

Tool names must match between the manifest and tools/call. A simple sanitizer lowercases the input, maps every character outside [a-z0-9_] to an underscore, collapses runs of underscores, and prefixes t_ if the result starts with a digit. Teaching example only: validate against team rules and host limits before release.
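The sanitizing rules described here can be sketched in a few lines of Python. The function name sanitize_tool_name is hypothetical, and the sketch follows only the rules stated in the paragraph (it does not trim leading or trailing underscores, for instance).

```python
import re


def sanitize_tool_name(raw: str) -> str:
    """Lowercase, replace characters outside [a-z0-9_], collapse underscores, guard leading digits."""
    name = raw.lower()
    name = re.sub(r"[^a-z0-9_]", "_", name)  # map every other character to an underscore
    name = re.sub(r"_+", "_", name)          # collapse runs of underscores
    if name[:1].isdigit():                   # names should not start with a digit
        name = "t_" + name
    return name
```

For example, sanitize_tool_name("Search Docs") returns "search_docs" and sanitize_tool_name("3d render") returns "t_3d_render".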


---
name: mcp-server-dev
description: Design or implement MCP server tools and resources; input: tool requirements description; output: complete tool definition + handler code; prohibit: hardcoding credentials or logging sensitive data
version: "1.1.0"
triggers:
  - "implement.*MCP|develop.*MCP server"
  - "add tool.*agent|tool.*registration"
steps:
  1. Register tools with @app.list_tools(); each tool needs name/description/inputSchema
  2. Include required, enum, minLength/maxLength, additionalProperties: false in inputSchema
  3. Re-validate parameters inside handler using explicit checks or pydantic, not assert, which is stripped under python -O (do not trust model input)
  4. Standardize return format as {status, data} or {status, error:{code,message,retryable}}
  5. Distinguish three states: success / error (non-retryable) / error (retryable + retry_after_ms)
  6. Async operations return pending + job_id + poll_url
  7. Auth: read secrets from env vars; use hmac.compare_digest to prevent timing attacks
  8. Multi-tenant: extract tenant_id from JWT payload; add tenant filter to all queries
  9. Logging: record tool_name/call_id/latency/status; never log full request payload
  10. Large results: return resource URI for responses > 10KB; do not inline in tool response
  11. Tool names use snake_case, verb-first (search_/create_/delete_/get_)
  12. Write operations must have idempotency_key param; server-side dedup check required
  13. High-risk tools (delete_/bulk_) add environment param (staging/production)
  14. State in tools/list description whether the tool is read-only and idempotent
  15. CI-validate tool definitions with yamllint and JSON Schema validator
constraints:
  - Do NOT echo API keys, connection strings, or user passwords in tool results
  - Do NOT execute model-supplied strings directly in handlers (prevents SQL/command injection)
  - Tool name must be exactly identical in list and call (case-sensitive)
