Skip to content

FR-001: Tenant-Scoped MCP Control

Status: Proposed Priority: High (Core Differentiator) Author: Tim McCrimmon Created: 2026-01-30 Target Version: v0.4


Executive Summary

Extend SCP to provide tenant isolation context and tool schema validation to MCP servers, ensuring that multi-tenant agent deployments cannot leak data across tenant boundaries and that all tool calls conform to registered schemas.

Two layers of control: 1. Tenant Isolation - Data scoping rules injected into every MCP call 2. Schema Validation - Parameter validation against registered tool schemas (admission control)

The model: MCP servers become consumers of SCP context, just like agents. SCP governs both sides of the interaction and validates every tool call before forwarding.


Problem Statement

The Multi-Tenant MCP Risk

Organizations deploying AI agents often have: - Multiple tenants (customers, business units, departments) - Shared infrastructure (databases, MCP servers, tools) - Agents that need tenant-specific data access

Current (uncontrolled):

Tenant A Agent ──→ Shared MCP Server ──→ Multi-Tenant Database
                         ↑
                   No tenant scoping
                   Can access ANY tenant's data

Without tenant isolation: - Tenant A's agent could query Tenant B's patient records - A misconfigured query returns cross-tenant data - One compromised agent exposes all tenants - No audit trail of tenant-scoped access

Real-World Scenarios

Healthcare SaaS: - Multiple hospital systems on shared platform - Each hospital's agents must only see their patients - HIPAA violation if Hospital A sees Hospital B's data

Financial Services: - Multiple client accounts on shared infrastructure - Each client's agents access only their portfolio - Regulatory violation if Client A sees Client B's positions

Enterprise Multi-BU: - Multiple business units sharing AI platform - Each BU's agents access only their data - Competitive intelligence risk if BU data leaks internally


Proposed Solution

Core Concept

SCP becomes the context provider for MCP servers, not just agents. Every MCP tool call includes tenant context that scopes data access.

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Tenant A   │     │    SCP      │     │     MCP     │
│   Agent     │     │             │     │   Server    │
└──────┬──────┘     └──────┬──────┘     └──────┬──────┘
       │                   │                   │
       │ 1. Tool call      │                   │
       │   + agent_id      │                   │
       │   + tenant_id     │                   │
       │──────────────────→│                   │
       │                   │                   │
       │                   │ 2. Resolve tenant │
       │                   │    context from   │
       │                   │    graph/rules    │
       │                   │                   │
       │                   │ 3. Forward call   │
       │                   │   + tenant scope  │
       │                   │──────────────────→│
       │                   │                   │
       │                   │     4. MCP        │
       │                   │     executes with │
       │                   │     tenant filter │
       │                   │                   │
       │                   │ 5. Response       │
       │                   │   (scoped data)   │
       │                   │←──────────────────│
       │                   │                   │
       │                   │ 6. Log + validate │
       │                   │                   │
       │ 7. Response       │                   │
       │   (Tenant A only) │                   │
       │←──────────────────│                   │
       │                   │                   │

Tenant Context Structure

When an MCP call is made, SCP resolves and injects tenant context:

# Tenant context injected into MCP call
scp_tenant_context:
  # Identity
  tenant_id: "tenant_a"
  agent_id: "agent:tenant-a:prior-auth-001"
  agent_role: "role:prior-auth-agent"

  # Data scoping rules
  data_scope:
    # Table-level filters (applied to all queries)
    default_filter: "tenant_id = 'tenant_a'"

    # Table-specific overrides
    tables:
      patients:
        filter: "tenant_id = 'tenant_a'"
        allowed_columns: ["id", "name", "dob", "diagnosis"]
        denied_columns: ["ssn", "full_address"]

      claims:
        filter: "tenant_id = 'tenant_a' AND status != 'internal_review'"
        allowed_columns: ["*"]
        denied_columns: []

      providers:
        filter: "tenant_id = 'tenant_a' OR is_public = true"
        allowed_columns: ["*"]
        denied_columns: ["contract_terms", "negotiated_rates"]

      formulary:
        filter: "tenant_id = 'tenant_a' OR is_standard = true"
        allowed_columns: ["*"]
        denied_columns: []

  # Resource boundaries
  resource_access:
    allowed_tables:
      - patients
      - claims
      - providers
      - formulary
    denied_tables:
      - audit_logs
      - system_config
      - tenant_settings
      - billing_internal

    allowed_operations:
      - SELECT
      - INSERT (claims, auth_requests)
    denied_operations:
      - UPDATE
      - DELETE
      - DROP
      - ALTER

  # Additional constraints
  constraints:
    max_rows_per_query: 1000
    max_queries_per_minute: 100
    require_audit_log: true
    pii_masking: true

MCP Server Integration

MCP servers that integrate with SCP apply the tenant context:

# MCP Server with SCP integration
class SCPAwareMCPServer:

    async def handle_tool_call(self, request: ToolCallRequest) -> ToolCallResponse:
        """
        Handle tool call with SCP tenant scoping.
        """
        # 1. Extract SCP context from request
        tenant_context = request.scp_tenant_context
        if not tenant_context:
            raise SecurityError("Missing tenant context")

        # 2. Validate agent is allowed to call this tool
        if not self.validate_tool_access(request.tool, tenant_context):
            raise SecurityError(f"Tool {request.tool} not allowed for tenant")

        # 3. Build query with tenant scoping
        query = self.build_query(request.tool, request.parameters)
        scoped_query = self.apply_tenant_scope(query, tenant_context)

        # 4. Execute scoped query
        results = await self.execute(scoped_query)

        # 5. Apply column filtering (remove denied columns)
        filtered_results = self.filter_columns(results, tenant_context)

        # 6. Apply PII masking if required
        if tenant_context.constraints.pii_masking:
            filtered_results = self.mask_pii(filtered_results)

        # 7. Log the access
        await self.log_access(request, tenant_context, len(filtered_results))

        return ToolCallResponse(data=filtered_results)

    def apply_tenant_scope(self, query: Query, context: TenantContext) -> Query:
        """
        Inject tenant filter into query.
        """
        table = query.table

        # Get table-specific filter or default
        if table in context.data_scope.tables:
            filter_clause = context.data_scope.tables[table].filter
        else:
            filter_clause = context.data_scope.default_filter

        # Inject filter
        return query.where(filter_clause)

    def filter_columns(self, results: list, context: TenantContext) -> list:
        """
        Remove columns the tenant isn't allowed to see.
        """
        table = results.source_table
        table_config = context.data_scope.tables.get(table)

        if not table_config:
            return results

        denied = set(table_config.denied_columns)
        return [
            {k: v for k, v in row.items() if k not in denied}
            for row in results
        ]

SCP Gateway Component

SCP acts as the intermediary, resolving tenant context and forwarding calls:

# SCP MCP Gateway
class MCPGateway:
    """
    Gateway that injects tenant context into MCP calls.
    """

    async def proxy_tool_call(
        self,
        agent_id: str,
        tenant_id: str,
        mcp_server: str,
        tool: str,
        parameters: dict
    ) -> ToolCallResponse:
        """
        Proxy a tool call through SCP with tenant scoping.
        """
        # 1. Authenticate agent
        agent = await self.agent_registry.get(agent_id)
        if not agent or agent.tenant_id != tenant_id:
            raise AuthenticationError("Agent not authorized for tenant")

        # 2. Verify MCP server is registered
        mcp = await self.mcp_registry.get(mcp_server)
        if not mcp:
            raise SecurityError(f"MCP server {mcp_server} not registered")

        # 3. Verify agent can use this MCP
        if mcp_server not in agent.allowed_mcp_servers:
            raise SecurityError(f"Agent not authorized for MCP {mcp_server}")

        # 4. Resolve tenant context from graph
        tenant_context = await self.resolve_tenant_context(
            tenant_id=tenant_id,
            agent_id=agent_id,
            agent_role=agent.role,
            mcp_server=mcp_server,
            tool=tool
        )

        # 5. Build request with tenant context
        request = ToolCallRequest(
            tool=tool,
            parameters=parameters,
            scp_tenant_context=tenant_context
        )

        # 6. Forward to MCP server
        response = await self.forward_to_mcp(mcp_server, request)

        # 7. Validate response (optional trust-but-verify)
        if self.config.validate_responses:
            violations = self.check_tenant_isolation(response, tenant_id)
            if violations:
                await self.log_isolation_violation(agent_id, tenant_id, violations)
                response = self.filter_violations(response, violations)

        # 8. Log complete transaction
        await self.audit_log.record(
            timestamp=now(),
            agent_id=agent_id,
            tenant_id=tenant_id,
            mcp_server=mcp_server,
            tool=tool,
            parameters=parameters,  # or hash for PII protection
            scope_applied=tenant_context.data_scope.default_filter,
            rows_returned=len(response.data),
            response_validated=self.config.validate_responses
        )

        return response

    async def resolve_tenant_context(
        self,
        tenant_id: str,
        agent_id: str,
        agent_role: str,
        mcp_server: str,
        tool: str
    ) -> TenantContext:
        """
        Query graph to resolve tenant-specific context.
        """
        # Query graph for tenant's data access rules
        query = """
        MATCH (t:Tenant {id: $tenant_id})-[:HAS_POLICY]->(p:DataPolicy)
        MATCH (p)-[:APPLIES_TO]->(table:Table)
        OPTIONAL MATCH (r:Role {id: $role})-[:HAS_OVERRIDE]->(o:PolicyOverride)
        RETURN p, table, o
        """

        results = await self.graph.query(query, {
            "tenant_id": tenant_id,
            "role": agent_role
        })

        # Build tenant context from graph results
        return self.build_tenant_context(tenant_id, agent_id, results)

Database Schema Additions

New tables to support tenant-scoped MCP:

-- Tenant data policies
CREATE TABLE tenant_data_policies (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id VARCHAR(255) NOT NULL,
    policy_name VARCHAR(255) NOT NULL,
    description TEXT,

    -- Default scope applied to all queries
    default_filter TEXT NOT NULL,  -- e.g., "tenant_id = '{tenant_id}'"

    -- Global constraints
    max_rows_per_query INTEGER DEFAULT 1000,
    max_queries_per_minute INTEGER DEFAULT 100,
    require_audit_log BOOLEAN DEFAULT true,
    pii_masking_enabled BOOLEAN DEFAULT false,

    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),

    UNIQUE(tenant_id, policy_name)
);

-- Table-specific access rules
CREATE TABLE tenant_table_policies (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    policy_id UUID REFERENCES tenant_data_policies(id),
    table_name VARCHAR(255) NOT NULL,

    -- Row filter for this table
    row_filter TEXT NOT NULL,  -- e.g., "tenant_id = 'X' AND status = 'active'"

    -- Column access
    allowed_columns TEXT[],  -- NULL means all allowed
    denied_columns TEXT[] DEFAULT '{}',

    -- Operations allowed
    allowed_operations TEXT[] DEFAULT '{SELECT}',

    created_at TIMESTAMP DEFAULT NOW(),

    UNIQUE(policy_id, table_name)
);

-- MCP server registry
CREATE TABLE mcp_servers (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    server_id VARCHAR(255) UNIQUE NOT NULL,  -- e.g., "mcp:acme:patient-data"
    display_name VARCHAR(255) NOT NULL,
    endpoint_url TEXT NOT NULL,

    -- Which tenants can use this MCP
    allowed_tenants TEXT[],  -- NULL means all tenants

    -- Tools exposed by this MCP
    tools JSONB NOT NULL,  -- Tool definitions

    -- SCP integration
    scp_integration_enabled BOOLEAN DEFAULT true,
    requires_tenant_context BOOLEAN DEFAULT true,

    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- Audit log for MCP calls
CREATE TABLE mcp_audit_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    timestamp TIMESTAMP NOT NULL DEFAULT NOW(),

    -- Identity
    agent_id VARCHAR(255) NOT NULL,
    tenant_id VARCHAR(255) NOT NULL,

    -- Request
    mcp_server VARCHAR(255) NOT NULL,
    tool VARCHAR(255) NOT NULL,
    parameters_hash VARCHAR(64),  -- SHA-256 of parameters (PII protection)

    -- Scoping
    scope_applied TEXT NOT NULL,
    tables_accessed TEXT[],

    -- Response
    rows_returned INTEGER,
    response_validated BOOLEAN,
    isolation_violations INTEGER DEFAULT 0,

    -- Indexing
    INDEX idx_mcp_audit_tenant (tenant_id, timestamp),
    INDEX idx_mcp_audit_agent (agent_id, timestamp)
);

Graph Materialization

Tenant policies materialize to the graph for efficient querying:

(Tenant:tenant_a)
    │
    ├──[HAS_POLICY]──→ (DataPolicy:tenant_a_default)
    │                        │
    │                        ├──[APPLIES_TO]──→ (Table:patients)
    │                        │                      filter: "tenant_id = 'tenant_a'"
    │                        │                      denied_columns: ["ssn"]
    │                        │
    │                        ├──[APPLIES_TO]──→ (Table:claims)
    │                        │                      filter: "tenant_id = 'tenant_a'"
    │                        │
    │                        └──[APPLIES_TO]──→ (Table:providers)
    │                                               filter: "tenant_id = 'tenant_a' OR is_public"
    │
    └──[HAS_AGENT]──→ (Agent:agent:tenant-a:pa-001)
                          │
                          └──[CAN_USE]──→ (MCP:mcp:acme:patient-data)

Enforcement Models

Three levels of enforcement, deployable independently or together:

Level 1: MCP Server Enforces (Trust Model)

MCP server receives tenant context and applies it correctly.

Agent → SCP (inject context) → MCP (enforce) → Database

Pros: Simple, low latency, MCP has full context Cons: Trusting MCP to enforce correctly Use when: MCP server is under your control, trusted codebase

Level 2: SCP Validates Response (Trust but Verify)

SCP checks responses for tenant isolation violations before returning.

Agent → SCP → MCP → Database
               ↓
           Response
               ↓
         SCP validates
               ↓
         Agent receives

Pros: Catches MCP bugs, defense in depth Cons: Additional latency, needs to understand data schema Use when: MCP server is third-party or less trusted

Level 3: Database-Level Enforcement (Belt + Suspenders)

Database itself enforces row-level security.

-- Postgres Row-Level Security
ALTER TABLE patients ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON patients
    USING (tenant_id = current_setting('app.tenant_id')::text);

-- MCP server sets context before each query
SET app.tenant_id = 'tenant_a';
SELECT * FROM patients;  -- RLS automatically filters

Pros: Cannot be bypassed by MCP bugs, enforcement at data layer Cons: Requires RLS-capable database, more setup Use when: Maximum security required, regulated industries

Environment Enforcement Level
Development Level 1 only
Staging Level 1 + Level 2
Production Level 1 + Level 2 + Level 3
Regulated (HIPAA, SOX) All three required

Tool Schema Validation (Admission Control)

Beyond tenant isolation, SCP validates tool parameters against registered schemas before forwarding calls. This completes the "Admission Controller" analogy from Kubernetes.

Why This Matters

Tenant scoping controls what data an agent can access. Schema validation controls how the agent calls tools:

Without Schema Validation With Schema Validation
Agent adds unexpected parameters Unexpected parameters rejected
Malformed requests reach MCP Malformed requests blocked at gateway
No visibility into parameter drift All violations logged
MCP must handle bad input MCP receives only valid requests

Tool Schema Registration

When MCP servers register with SCP, they provide JSON schemas for each tool:

# Tool registration with schema
server_id: "mcp:acme:patient-data"
tools:
  - name: get_patient_records
    description: "Query patient records by criteria"
    parameters_schema:
      type: object
      properties:
        patient_id:
          type: string
          pattern: "^[A-Z0-9]{8,12}$"
          description: "Patient identifier"
        diagnosis:
          type: string
          maxLength: 100
        limit:
          type: integer
          minimum: 1
          maximum: 500
          default: 100
        include_history:
          type: boolean
          default: false
      required:
        - patient_id
      additionalProperties: false  # Blocks unexpected parameters

  - name: submit_prior_auth
    description: "Submit a prior authorization request"
    parameters_schema:
      type: object
      properties:
        patient_id:
          type: string
          pattern: "^[A-Z0-9]{8,12}$"
        drug_ndc:
          type: string
          pattern: "^[0-9]{11}$"
        diagnosis_code:
          type: string
          pattern: "^[A-Z][0-9]{2}\\.[0-9]{1,2}$"
        urgency:
          type: string
          enum: ["routine", "urgent", "emergency"]
      required:
        - patient_id
        - drug_ndc
        - diagnosis_code
      additionalProperties: false

    # Tool-specific constraints
    constraints:
      max_calls_per_minute: 30
      requires_hitl_above:
        field: "urgency"
        value: "emergency"

Schema Validation in Gateway

The gateway validates parameters before forwarding:

from jsonschema import validate, ValidationError

class MCPGateway:

    async def proxy_tool_call(
        self,
        agent_id: str,
        tenant_id: str,
        mcp_server: str,
        tool: str,
        parameters: dict
    ) -> ToolCallResponse:
        """
        Proxy a tool call with tenant scoping AND schema validation.
        """
        # 1. Authenticate agent
        agent = await self.agent_registry.get(agent_id)
        if not agent or agent.tenant_id != tenant_id:
            raise AuthenticationError("Agent not authorized for tenant")

        # 2. Verify MCP server is registered
        mcp = await self.mcp_registry.get(mcp_server)
        if not mcp:
            raise SecurityError(f"MCP server {mcp_server} not registered")

        # 3. Get tool schema
        tool_config = mcp.get_tool(tool)
        if not tool_config:
            await self.log_rejection(agent_id, mcp_server, tool, "unknown_tool")
            raise SecurityError(f"Tool {tool} not registered for {mcp_server}")

        # 4. NEW: Validate parameters against schema
        schema_result = self.validate_parameters(parameters, tool_config.parameters_schema)
        if not schema_result.valid:
            await self.log_schema_violation(
                agent_id=agent_id,
                tenant_id=tenant_id,
                mcp_server=mcp_server,
                tool=tool,
                parameters=parameters,
                errors=schema_result.errors
            )
            raise ToolValidationError(
                f"Schema violation: {schema_result.errors}",
                violation_type="schema",
                tool=tool,
                errors=schema_result.errors
            )

        # 5. NEW: Check tool-specific constraints
        constraint_result = self.check_tool_constraints(
            agent_id, tool, parameters, tool_config.constraints
        )
        if constraint_result.action == "deny":
            raise ToolValidationError(constraint_result.reason)
        elif constraint_result.action == "hitl":
            return await self.queue_for_hitl(agent_id, mcp_server, tool, parameters)

        # 6. Verify agent can use this MCP
        if mcp_server not in agent.allowed_mcp_servers:
            raise SecurityError(f"Agent not authorized for MCP {mcp_server}")

        # 7. Resolve tenant context from graph
        tenant_context = await self.resolve_tenant_context(
            tenant_id=tenant_id,
            agent_id=agent_id,
            agent_role=agent.role,
            mcp_server=mcp_server,
            tool=tool
        )

        # 8. Build request with tenant context
        request = ToolCallRequest(
            tool=tool,
            parameters=parameters,
            scp_tenant_context=tenant_context
        )

        # 9. Forward to MCP server
        response = await self.forward_to_mcp(mcp_server, request)

        # 10. Validate response (optional trust-but-verify)
        if self.config.validate_responses:
            violations = self.check_tenant_isolation(response, tenant_id)
            if violations:
                await self.log_isolation_violation(agent_id, tenant_id, violations)
                response = self.filter_violations(response, violations)

        # 11. Log complete transaction
        await self.audit_log.record(
            timestamp=now(),
            agent_id=agent_id,
            tenant_id=tenant_id,
            mcp_server=mcp_server,
            tool=tool,
            parameters_hash=hash_parameters(parameters),
            schema_validated=True,
            scope_applied=tenant_context.data_scope.default_filter,
            rows_returned=len(response.data),
            response_validated=self.config.validate_responses
        )

        return response

    def validate_parameters(self, parameters: dict, schema: dict) -> ValidationResult:
        """
        Validate parameters against JSON schema.
        """
        try:
            validate(instance=parameters, schema=schema)
            return ValidationResult(valid=True, errors=[])
        except ValidationError as e:
            return ValidationResult(
                valid=False,
                errors=[{
                    "path": list(e.path),
                    "message": e.message,
                    "validator": e.validator
                }]
            )

    def check_tool_constraints(
        self,
        agent_id: str,
        tool: str,
        parameters: dict,
        constraints: ToolConstraints
    ) -> ConstraintResult:
        """
        Check tool-specific constraints (rate limits, HITL triggers).
        """
        # Check rate limit
        if constraints.max_calls_per_minute:
            recent_calls = self.rate_counter.get(agent_id, tool, window_seconds=60)
            if recent_calls >= constraints.max_calls_per_minute:
                return ConstraintResult(action="deny", reason="Rate limit exceeded")

        # Check HITL trigger
        if constraints.requires_hitl_above:
            field = constraints.requires_hitl_above["field"]
            threshold = constraints.requires_hitl_above["value"]
            if parameters.get(field) == threshold:
                return ConstraintResult(action="hitl", reason=f"{field}={threshold} requires approval")

        return ConstraintResult(action="allow")

Schema Violation Logging

All schema violations are logged for security monitoring:

-- Add to mcp_audit_log or create separate table
CREATE TABLE mcp_schema_violations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    timestamp TIMESTAMP NOT NULL DEFAULT NOW(),

    -- Identity
    agent_id VARCHAR(255) NOT NULL,
    tenant_id VARCHAR(255) NOT NULL,

    -- Request
    mcp_server VARCHAR(255) NOT NULL,
    tool VARCHAR(255) NOT NULL,
    parameters JSONB,  -- Full parameters for forensics

    -- Violation details
    violation_type VARCHAR(50) NOT NULL,  -- 'schema', 'unknown_tool', 'forbidden_param'
    errors JSONB NOT NULL,  -- Array of validation errors

    -- Indexing for security queries
    INDEX idx_schema_violations_agent (agent_id, timestamp),
    INDEX idx_schema_violations_tenant (tenant_id, timestamp),
    INDEX idx_schema_violations_tool (mcp_server, tool, timestamp)
);

Circuit Breaker Integration

Repeated schema violations trigger the circuit breaker:

class CircuitBreaker:
    """
    Suspend agents with repeated violations.
    """
    SCHEMA_VIOLATION_THRESHOLD = 10
    WINDOW_SECONDS = 300  # 5 minutes

    async def record_schema_violation(self, agent_id: str, tool: str):
        """Record violation and check circuit."""
        await self.violations.add(agent_id, "schema", now())

        recent = await self.violations.count_since(
            agent_id,
            now() - self.WINDOW_SECONDS
        )

        if recent >= self.SCHEMA_VIOLATION_THRESHOLD:
            await self.trip_circuit(
                agent_id,
                reason=f"Schema violations: {recent} in {self.WINDOW_SECONDS}s"
            )

    async def trip_circuit(self, agent_id: str, reason: str):
        """Suspend agent and alert."""
        await self.agent_registry.suspend(agent_id, reason, duration_seconds=3600)
        await self.alert_security_team(agent_id, reason)

The Complete Admission Control Flow

Agent makes tool call
        ↓
┌─────────────────────────────────────────────────────────┐
│                    SCP MCP Gateway                      │
├─────────────────────────────────────────────────────────┤
│  1. Authenticate agent                                  │
│  2. Look up tool in registry                            │
│  3. VALIDATE PARAMETERS AGAINST SCHEMA ← NEW            │
│  4. CHECK TOOL CONSTRAINTS (rate limit, HITL) ← NEW     │
│  5. Resolve tenant context                              │
│  6. Inject tenant scope into request                    │
│  7. Forward to MCP server                               │
│  8. Validate response (optional)                        │
│  9. Log complete transaction                            │
└─────────────────────────────────────────────────────────┘
        ↓
┌─────────────┬──────────────┬──────────────┐
│   ALLOW     │    DENY      │   ESCALATE   │
│  Forward    │  Block +     │  Queue for   │
│  to MCP     │  Log + Alert │  HITL        │
└─────────────┴──────────────┴──────────────┘

This completes the Kubernetes "Admission Controller" analogy:

Kubernetes Admission Controller SCP Gateway
Validates pod spec against policies Validates tool parameters against schema
Mutates requests (inject sidecars) Injects tenant context
Denies invalid requests Denies malformed tool calls
Rate limiting via ResourceQuota Rate limiting per tool/agent

API Endpoints

MCP Gateway

POST /api/mcp/call
  Description: Proxy a tool call through SCP with tenant scoping
  Headers:
    X-API-Key: <agent_api_key>
  Request:
    {
      "mcp_server": "mcp:acme:patient-data",
      "tool": "get_patient_records",
      "parameters": {
        "diagnosis": "diabetes",
        "limit": 100
      }
    }
  Response:
    {
      "data": [...],
      "metadata": {
        "rows_returned": 47,
        "scope_applied": "tenant_id = 'tenant_a'",
        "audit_id": "uuid"
      }
    }

Tenant Policy Management

POST /api/tenants/{tenant_id}/policies
  Description: Create or update tenant data policy
  Request:
    {
      "policy_name": "default",
      "default_filter": "tenant_id = '{tenant_id}'",
      "max_rows_per_query": 1000,
      "pii_masking_enabled": true,
      "tables": [
        {
          "table_name": "patients",
          "row_filter": "tenant_id = '{tenant_id}'",
          "denied_columns": ["ssn", "full_address"]
        }
      ]
    }

GET /api/tenants/{tenant_id}/policies
  Description: List all policies for a tenant

DELETE /api/tenants/{tenant_id}/policies/{policy_id}
  Description: Delete a tenant policy

MCP Server Registry

POST /api/mcp/servers
  Description: Register an MCP server with tool schemas
  Request:
    {
      "server_id": "mcp:acme:patient-data",
      "display_name": "Acme Patient Data MCP",
      "endpoint_url": "https://mcp.acme.internal:8080",
      "allowed_tenants": ["tenant_a", "tenant_b"],
      "tools": [
        {
          "name": "get_patient_records",
          "description": "Query patient records",
          "parameters_schema": {
            "type": "object",
            "properties": {
              "patient_id": { "type": "string", "pattern": "^[A-Z0-9]{8,12}$" },
              "diagnosis": { "type": "string", "maxLength": 100 },
              "limit": { "type": "integer", "minimum": 1, "maximum": 500 }
            },
            "required": ["patient_id"],
            "additionalProperties": false
          },
          "constraints": {
            "max_calls_per_minute": 60
          }
        }
      ]
    }

GET /api/mcp/servers
  Description: List registered MCP servers

GET /api/mcp/servers/{server_id}
  Description: Get MCP server details including tool schemas

GET /api/mcp/servers/{server_id}/tools/{tool_name}/schema
  Description: Get JSON schema for a specific tool

Audit

GET /api/audit/mcp?tenant_id=X&start=Y&end=Z
  Description: Query MCP audit log
  Response:
    {
      "entries": [
        {
          "timestamp": "2026-01-30T10:15:00Z",
          "agent_id": "agent:tenant-a:pa-001",
          "tenant_id": "tenant_a",
          "mcp_server": "mcp:acme:patient-data",
          "tool": "get_patient_records",
          "schema_validated": true,
          "scope_applied": "tenant_id = 'tenant_a'",
          "rows_returned": 47
        }
      ]
    }

GET /api/audit/violations?tenant_id=X&start=Y&end=Z
  Description: Query schema violation log
  Response:
    {
      "entries": [
        {
          "timestamp": "2026-01-30T10:16:00Z",
          "agent_id": "agent:tenant-a:pa-001",
          "tenant_id": "tenant_a",
          "mcp_server": "mcp:acme:patient-data",
          "tool": "get_patient_records",
          "violation_type": "schema",
          "errors": [
            {
              "path": ["patient_id"],
              "message": "'invalid' does not match '^[A-Z0-9]{8,12}$'",
              "validator": "pattern"
            }
          ]
        }
      ],
      "summary": {
        "total_violations": 12,
        "by_type": { "schema": 10, "unknown_tool": 2 },
        "by_agent": { "agent:tenant-a:pa-001": 12 }
      }
    }

Implementation Plan

Phase 1: Foundation (v0.4.0)

Scope: - Tenant data policy model (database tables) - MCP server registry with tool schemas - Basic gateway with context injection - Tool schema validation - Audit logging

Effort: ~2.5 weeks

Deliverables: - /api/tenants/{id}/policies endpoints - /api/mcp/servers endpoints (with tool schema support) - /api/mcp/call gateway endpoint - Schema validation in gateway - mcp_audit_log table - mcp_schema_violations table

Phase 2: Enforcement (v0.4.1)

Scope: - Response validation (Level 2) - Violation detection and logging - Column filtering - PII masking - Circuit breaker on repeated violations

Effort: ~2 weeks

Deliverables: - Response validator component - Violation alerting - Column/PII filtering in gateway - Circuit breaker logic

Phase 3: Graph Integration (v0.4.2)

Scope: - Materialize tenant policies to graph - Graph-based context resolution - Role-based policy overrides

Effort: ~1 week

Deliverables: - Tenant policy graph nodes - resolve_tenant_context() uses graph queries

Phase 4: Advanced Features (v0.5.0)

Scope: - Rate limiting per tenant/agent - Circuit breaker on violations - HITL for sensitive queries - Dashboard for tenant isolation monitoring

Effort: ~3 weeks

Deliverables: - Rate limiter component - Circuit breaker logic - HITL queue for flagged queries - Basic monitoring UI


Security Considerations

Threat Model

Threat Mitigation
Agent spoofs tenant_id Agent's tenant_id comes from registry, not request
MCP ignores tenant context Level 2 response validation catches leaks
SQL injection in filter Parameterized queries, filter is template not raw SQL
MCP server compromised Level 3 database RLS as final defense
Audit log tampering Audit log is append-only, separate from MCP access

Tenant ID Source of Truth

Critical: The tenant_id is resolved from the Agent Registry, NOT from the request:

# WRONG - tenant_id from request can be spoofed
tenant_id = request.tenant_id  # DO NOT DO THIS

# RIGHT - tenant_id from authenticated agent
agent = await agent_registry.get_by_api_key(request.api_key)
tenant_id = agent.tenant_id  # Source of truth

Filter Template Safety

Data scope filters use parameterized templates, not string concatenation:

# WRONG - SQL injection risk
filter = f"tenant_id = '{tenant_id}'"

# RIGHT - parameterized
filter_template = "tenant_id = :tenant_id"
params = {"tenant_id": tenant_id}

Compliance Mapping

Regulation Requirement Feature
HIPAA Access controls Tenant isolation prevents cross-customer data access
HIPAA Audit trail Full logging of every MCP call with tenant scope
HIPAA Minimum necessary Column filtering limits data to what's needed
SOC 2 Logical access Tenant-scoped queries enforce logical separation
SOC 2 Monitoring Audit log enables access monitoring
GDPR Data minimization PII masking reduces exposure
GDPR Access controls Tenant isolation enforces data boundaries

Success Metrics

Metric Target
Tenant isolation violations 0 in production
Gateway latency overhead < 50ms p99
Audit log completeness 100% of MCP calls logged
Policy resolution time < 10ms from graph

The Pitch

"Your AI agents are accessing shared infrastructure. Without tenant isolation, one misconfigured query leaks Customer B's data to Customer A.

SCP injects tenant context into every MCP call. The MCP server knows exactly whose data it can return. Every call is scoped, every access is logged, every violation is caught.

Multi-tenant AI agents. Zero cross-tenant risk."


References

  • SCP v0.3.0 Architecture: /control-plane/ARCHITECTURE.md
  • Agent Registry: /control-plane/api_server/routers/agents.py
  • Context Orchestrator: /control-plane/context_service/orchestrator.py
  • Graph Repository: /control-plane/database/graph/

Appendix A: Example Tenant Policy

Complete example for a healthcare tenant:

tenant_id: "acme_health"
policy_name: "hipaa_compliant"

default_filter: "tenant_id = 'acme_health'"

constraints:
  max_rows_per_query: 500
  max_queries_per_minute: 60
  require_audit_log: true
  pii_masking_enabled: true

tables:
  - name: patients
    row_filter: "tenant_id = 'acme_health' AND consent_given = true"
    allowed_columns:
      - id
      - mrn
      - first_name
      - last_name
      - dob
      - diagnosis_codes
      - insurance_id
    denied_columns:
      - ssn
      - full_address
      - phone_number
      - email

  - name: claims
    row_filter: "tenant_id = 'acme_health'"
    allowed_columns: ["*"]
    denied_columns:
      - internal_notes
      - adjudicator_id

  - name: providers
    row_filter: "tenant_id = 'acme_health' OR network_status = 'public'"
    allowed_columns: ["*"]
    denied_columns:
      - negotiated_rates
      - contract_terms

  - name: formulary
    row_filter: "tenant_id = 'acme_health' OR is_standard_formulary = true"
    allowed_columns: ["*"]
    denied_columns: []

allowed_mcp_servers:
  - "mcp:acme:patient-data"
  - "mcp:acme:claims-lookup"
  - "mcp:shared:drug-database"

denied_mcp_servers:
  - "mcp:internal:admin-tools"
  - "mcp:internal:billing"

Appendix B: MCP Integration SDK

Minimal SDK for MCP servers to integrate with SCP:

# scp_mcp_sdk.py

from dataclasses import dataclass
from typing import Optional, List, Dict, Any

@dataclass
class TenantContext:
    tenant_id: str
    agent_id: str
    default_filter: str
    tables: Dict[str, 'TablePolicy']
    constraints: 'Constraints'

@dataclass
class TablePolicy:
    row_filter: str
    allowed_columns: Optional[List[str]]
    denied_columns: List[str]

@dataclass
class Constraints:
    max_rows_per_query: int
    pii_masking: bool

class SCPMCPMiddleware:
    """
    Middleware for MCP servers to handle SCP tenant context.
    """

    def __init__(self, scp_endpoint: str, mcp_server_id: str):
        self.scp_endpoint = scp_endpoint
        self.mcp_server_id = mcp_server_id

    def extract_context(self, request: dict) -> TenantContext:
        """Extract and validate tenant context from request."""
        ctx = request.get('scp_tenant_context')
        if not ctx:
            raise SecurityError("Missing SCP tenant context")
        return TenantContext(**ctx)

    def scope_query(self, query: str, table: str, context: TenantContext) -> str:
        """Add tenant filter to query."""
        table_policy = context.tables.get(table)
        filter_clause = table_policy.row_filter if table_policy else context.default_filter

        # Simple injection - real implementation would use SQL parser
        if 'WHERE' in query.upper():
            return query.replace('WHERE', f'WHERE ({filter_clause}) AND ')
        else:
            return f"{query} WHERE {filter_clause}"

    def filter_response(self, rows: List[dict], table: str, context: TenantContext) -> List[dict]:
        """Remove denied columns from response."""
        table_policy = context.tables.get(table)
        if not table_policy:
            return rows

        denied = set(table_policy.denied_columns)
        return [
            {k: v for k, v in row.items() if k not in denied}
            for row in rows
        ]

    def enforce_limits(self, rows: List[dict], context: TenantContext) -> List[dict]:
        """Enforce row limits."""
        max_rows = context.constraints.max_rows_per_query
        if len(rows) > max_rows:
            return rows[:max_rows]
        return rows