FR-001: Tenant-Scoped MCP Control¶
Status: Proposed Priority: High (Core Differentiator) Author: Tim McCrimmon Created: 2026-01-30 Target Version: v0.4
Executive Summary¶
Extend SCP to provide tenant isolation context and tool schema validation to MCP servers, ensuring that multi-tenant agent deployments cannot leak data across tenant boundaries and that all tool calls conform to registered schemas.
Two layers of control: 1. Tenant Isolation - Data scoping rules injected into every MCP call 2. Schema Validation - Parameter validation against registered tool schemas (admission control)
The model: MCP servers become consumers of SCP context, just like agents. SCP governs both sides of the interaction and validates every tool call before forwarding.
Problem Statement¶
The Multi-Tenant MCP Risk¶
Organizations deploying AI agents often have: - Multiple tenants (customers, business units, departments) - Shared infrastructure (databases, MCP servers, tools) - Agents that need tenant-specific data access
Current (uncontrolled):
Tenant A Agent ──→ Shared MCP Server ──→ Multi-Tenant Database
↑
No tenant scoping
Can access ANY tenant's data
Without tenant isolation: - Tenant A's agent could query Tenant B's patient records - A misconfigured query returns cross-tenant data - One compromised agent exposes all tenants - No audit trail of tenant-scoped access
Real-World Scenarios¶
Healthcare SaaS: - Multiple hospital systems on shared platform - Each hospital's agents must only see their patients - HIPAA violation if Hospital A sees Hospital B's data
Financial Services: - Multiple client accounts on shared infrastructure - Each client's agents access only their portfolio - Regulatory violation if Client A sees Client B's positions
Enterprise Multi-BU: - Multiple business units sharing AI platform - Each BU's agents access only their data - Competitive intelligence risk if BU data leaks internally
Proposed Solution¶
Core Concept¶
SCP becomes the context provider for MCP servers, not just agents. Every MCP tool call includes tenant context that scopes data access.
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Tenant A │ │ SCP │ │ MCP │
│ Agent │ │ │ │ Server │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
│ 1. Tool call │ │
│ + agent_id │ │
│ + tenant_id │ │
│──────────────────→│ │
│ │ │
│ │ 2. Resolve tenant │
│ │ context from │
│ │ graph/rules │
│ │ │
│ │ 3. Forward call │
│ │ + tenant scope │
│ │──────────────────→│
│ │ │
│ │ 4. MCP │
│ │ executes with │
│ │ tenant filter │
│ │ │
│ │ 5. Response │
│ │ (scoped data) │
│ │←──────────────────│
│ │ │
│ │ 6. Log + validate │
│ │ │
│ 7. Response │ │
│ (Tenant A only) │ │
│←──────────────────│ │
│ │ │
Tenant Context Structure¶
When an MCP call is made, SCP resolves and injects tenant context:
# Tenant context injected into MCP call
scp_tenant_context:
# Identity
tenant_id: "tenant_a"
agent_id: "agent:tenant-a:prior-auth-001"
agent_role: "role:prior-auth-agent"
# Data scoping rules
data_scope:
# Table-level filters (applied to all queries)
default_filter: "tenant_id = 'tenant_a'"
# Table-specific overrides
tables:
patients:
filter: "tenant_id = 'tenant_a'"
allowed_columns: ["id", "name", "dob", "diagnosis"]
denied_columns: ["ssn", "full_address"]
claims:
filter: "tenant_id = 'tenant_a' AND status != 'internal_review'"
allowed_columns: ["*"]
denied_columns: []
providers:
filter: "tenant_id = 'tenant_a' OR is_public = true"
allowed_columns: ["*"]
denied_columns: ["contract_terms", "negotiated_rates"]
formulary:
filter: "tenant_id = 'tenant_a' OR is_standard = true"
allowed_columns: ["*"]
denied_columns: []
# Resource boundaries
resource_access:
allowed_tables:
- patients
- claims
- providers
- formulary
denied_tables:
- audit_logs
- system_config
- tenant_settings
- billing_internal
allowed_operations:
- SELECT
- INSERT (claims, auth_requests)
denied_operations:
- UPDATE
- DELETE
- DROP
- ALTER
# Additional constraints
constraints:
max_rows_per_query: 1000
max_queries_per_minute: 100
require_audit_log: true
pii_masking: true
MCP Server Integration¶
MCP servers that integrate with SCP apply the tenant context:
# MCP Server with SCP integration
class SCPAwareMCPServer:
async def handle_tool_call(self, request: ToolCallRequest) -> ToolCallResponse:
"""
Handle tool call with SCP tenant scoping.
"""
# 1. Extract SCP context from request
tenant_context = request.scp_tenant_context
if not tenant_context:
raise SecurityError("Missing tenant context")
# 2. Validate agent is allowed to call this tool
if not self.validate_tool_access(request.tool, tenant_context):
raise SecurityError(f"Tool {request.tool} not allowed for tenant")
# 3. Build query with tenant scoping
query = self.build_query(request.tool, request.parameters)
scoped_query = self.apply_tenant_scope(query, tenant_context)
# 4. Execute scoped query
results = await self.execute(scoped_query)
# 5. Apply column filtering (remove denied columns)
filtered_results = self.filter_columns(results, tenant_context)
# 6. Apply PII masking if required
if tenant_context.constraints.pii_masking:
filtered_results = self.mask_pii(filtered_results)
# 7. Log the access
await self.log_access(request, tenant_context, len(filtered_results))
return ToolCallResponse(data=filtered_results)
def apply_tenant_scope(self, query: Query, context: TenantContext) -> Query:
"""
Inject tenant filter into query.
"""
table = query.table
# Get table-specific filter or default
if table in context.data_scope.tables:
filter_clause = context.data_scope.tables[table].filter
else:
filter_clause = context.data_scope.default_filter
# Inject filter
return query.where(filter_clause)
def filter_columns(self, results: list, context: TenantContext) -> list:
"""
Remove columns the tenant isn't allowed to see.
"""
table = results.source_table
table_config = context.data_scope.tables.get(table)
if not table_config:
return results
denied = set(table_config.denied_columns)
return [
{k: v for k, v in row.items() if k not in denied}
for row in results
]
SCP Gateway Component¶
SCP acts as the intermediary, resolving tenant context and forwarding calls:
# SCP MCP Gateway
class MCPGateway:
"""
Gateway that injects tenant context into MCP calls.
"""
async def proxy_tool_call(
self,
agent_id: str,
tenant_id: str,
mcp_server: str,
tool: str,
parameters: dict
) -> ToolCallResponse:
"""
Proxy a tool call through SCP with tenant scoping.
"""
# 1. Authenticate agent
agent = await self.agent_registry.get(agent_id)
if not agent or agent.tenant_id != tenant_id:
raise AuthenticationError("Agent not authorized for tenant")
# 2. Verify MCP server is registered
mcp = await self.mcp_registry.get(mcp_server)
if not mcp:
raise SecurityError(f"MCP server {mcp_server} not registered")
# 3. Verify agent can use this MCP
if mcp_server not in agent.allowed_mcp_servers:
raise SecurityError(f"Agent not authorized for MCP {mcp_server}")
# 4. Resolve tenant context from graph
tenant_context = await self.resolve_tenant_context(
tenant_id=tenant_id,
agent_id=agent_id,
agent_role=agent.role,
mcp_server=mcp_server,
tool=tool
)
# 5. Build request with tenant context
request = ToolCallRequest(
tool=tool,
parameters=parameters,
scp_tenant_context=tenant_context
)
# 6. Forward to MCP server
response = await self.forward_to_mcp(mcp_server, request)
# 7. Validate response (optional trust-but-verify)
if self.config.validate_responses:
violations = self.check_tenant_isolation(response, tenant_id)
if violations:
await self.log_isolation_violation(agent_id, tenant_id, violations)
response = self.filter_violations(response, violations)
# 8. Log complete transaction
await self.audit_log.record(
timestamp=now(),
agent_id=agent_id,
tenant_id=tenant_id,
mcp_server=mcp_server,
tool=tool,
parameters=parameters, # or hash for PII protection
scope_applied=tenant_context.data_scope.default_filter,
rows_returned=len(response.data),
response_validated=self.config.validate_responses
)
return response
async def resolve_tenant_context(
self,
tenant_id: str,
agent_id: str,
agent_role: str,
mcp_server: str,
tool: str
) -> TenantContext:
"""
Query graph to resolve tenant-specific context.
"""
# Query graph for tenant's data access rules
query = """
MATCH (t:Tenant {id: $tenant_id})-[:HAS_POLICY]->(p:DataPolicy)
MATCH (p)-[:APPLIES_TO]->(table:Table)
OPTIONAL MATCH (r:Role {id: $role})-[:HAS_OVERRIDE]->(o:PolicyOverride)
RETURN p, table, o
"""
results = await self.graph.query(query, {
"tenant_id": tenant_id,
"role": agent_role
})
# Build tenant context from graph results
return self.build_tenant_context(tenant_id, agent_id, results)
Database Schema Additions¶
New tables to support tenant-scoped MCP:
-- Tenant data policies
CREATE TABLE tenant_data_policies (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(255) NOT NULL,
policy_name VARCHAR(255) NOT NULL,
description TEXT,
-- Default scope applied to all queries
default_filter TEXT NOT NULL, -- e.g., "tenant_id = '{tenant_id}'"
-- Global constraints
max_rows_per_query INTEGER DEFAULT 1000,
max_queries_per_minute INTEGER DEFAULT 100,
require_audit_log BOOLEAN DEFAULT true,
pii_masking_enabled BOOLEAN DEFAULT false,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
UNIQUE(tenant_id, policy_name)
);
-- Table-specific access rules
CREATE TABLE tenant_table_policies (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
policy_id UUID REFERENCES tenant_data_policies(id),
table_name VARCHAR(255) NOT NULL,
-- Row filter for this table
row_filter TEXT NOT NULL, -- e.g., "tenant_id = 'X' AND status = 'active'"
-- Column access
allowed_columns TEXT[], -- NULL means all allowed
denied_columns TEXT[] DEFAULT '{}',
-- Operations allowed
allowed_operations TEXT[] DEFAULT '{SELECT}',
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(policy_id, table_name)
);
-- MCP server registry
CREATE TABLE mcp_servers (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
server_id VARCHAR(255) UNIQUE NOT NULL, -- e.g., "mcp:acme:patient-data"
display_name VARCHAR(255) NOT NULL,
endpoint_url TEXT NOT NULL,
-- Which tenants can use this MCP
allowed_tenants TEXT[], -- NULL means all tenants
-- Tools exposed by this MCP
tools JSONB NOT NULL, -- Tool definitions
-- SCP integration
scp_integration_enabled BOOLEAN DEFAULT true,
requires_tenant_context BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- Audit log for MCP calls
CREATE TABLE mcp_audit_log (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
timestamp TIMESTAMP NOT NULL DEFAULT NOW(),
-- Identity
agent_id VARCHAR(255) NOT NULL,
tenant_id VARCHAR(255) NOT NULL,
-- Request
mcp_server VARCHAR(255) NOT NULL,
tool VARCHAR(255) NOT NULL,
parameters_hash VARCHAR(64), -- SHA-256 of parameters (PII protection)
-- Scoping
scope_applied TEXT NOT NULL,
tables_accessed TEXT[],
-- Response
rows_returned INTEGER,
response_validated BOOLEAN,
isolation_violations INTEGER DEFAULT 0,
-- Indexing
INDEX idx_mcp_audit_tenant (tenant_id, timestamp),
INDEX idx_mcp_audit_agent (agent_id, timestamp)
);
Graph Materialization¶
Tenant policies materialize to the graph for efficient querying:
(Tenant:tenant_a)
│
├──[HAS_POLICY]──→ (DataPolicy:tenant_a_default)
│ │
│ ├──[APPLIES_TO]──→ (Table:patients)
│ │ filter: "tenant_id = 'tenant_a'"
│ │ denied_columns: ["ssn"]
│ │
│ ├──[APPLIES_TO]──→ (Table:claims)
│ │ filter: "tenant_id = 'tenant_a'"
│ │
│ └──[APPLIES_TO]──→ (Table:providers)
│ filter: "tenant_id = 'tenant_a' OR is_public"
│
└──[HAS_AGENT]──→ (Agent:agent:tenant-a:pa-001)
│
└──[CAN_USE]──→ (MCP:mcp:acme:patient-data)
Enforcement Models¶
Three levels of enforcement, deployable independently or together:
Level 1: MCP Server Enforces (Trust Model)¶
MCP server receives tenant context and applies it correctly.
Agent → SCP (inject context) → MCP (enforce) → Database
Pros: Simple, low latency, MCP has full context Cons: Trusting MCP to enforce correctly Use when: MCP server is under your control, trusted codebase
Level 2: SCP Validates Response (Trust but Verify)¶
SCP checks responses for tenant isolation violations before returning.
Agent → SCP → MCP → Database
↓
Response
↓
SCP validates
↓
Agent receives
Pros: Catches MCP bugs, defense in depth Cons: Additional latency, needs to understand data schema Use when: MCP server is third-party or less trusted
Level 3: Database-Level Enforcement (Belt + Suspenders)¶
Database itself enforces row-level security.
-- Postgres Row-Level Security
ALTER TABLE patients ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON patients
USING (tenant_id = current_setting('app.tenant_id')::text);
-- MCP server sets context before each query
SET app.tenant_id = 'tenant_a';
SELECT * FROM patients; -- RLS automatically filters
Pros: Cannot be bypassed by MCP bugs, enforcement at data layer Cons: Requires RLS-capable database, more setup Use when: Maximum security required, regulated industries
Recommended Deployment¶
| Environment | Enforcement Level |
|---|---|
| Development | Level 1 only |
| Staging | Level 1 + Level 2 |
| Production | Level 1 + Level 2 + Level 3 |
| Regulated (HIPAA, SOX) | All three required |
Tool Schema Validation (Admission Control)¶
Beyond tenant isolation, SCP validates tool parameters against registered schemas before forwarding calls. This completes the "Admission Controller" analogy from Kubernetes.
Why This Matters¶
Tenant scoping controls what data an agent can access. Schema validation controls how the agent calls tools:
| Without Schema Validation | With Schema Validation |
|---|---|
| Agent adds unexpected parameters | Unexpected parameters rejected |
| Malformed requests reach MCP | Malformed requests blocked at gateway |
| No visibility into parameter drift | All violations logged |
| MCP must handle bad input | MCP receives only valid requests |
Tool Schema Registration¶
When MCP servers register with SCP, they provide JSON schemas for each tool:
# Tool registration with schema
server_id: "mcp:acme:patient-data"
tools:
- name: get_patient_records
description: "Query patient records by criteria"
parameters_schema:
type: object
properties:
patient_id:
type: string
pattern: "^[A-Z0-9]{8,12}$"
description: "Patient identifier"
diagnosis:
type: string
maxLength: 100
limit:
type: integer
minimum: 1
maximum: 500
default: 100
include_history:
type: boolean
default: false
required:
- patient_id
additionalProperties: false # Blocks unexpected parameters
- name: submit_prior_auth
description: "Submit a prior authorization request"
parameters_schema:
type: object
properties:
patient_id:
type: string
pattern: "^[A-Z0-9]{8,12}$"
drug_ndc:
type: string
pattern: "^[0-9]{11}$"
diagnosis_code:
type: string
pattern: "^[A-Z][0-9]{2}\\.[0-9]{1,2}$"
urgency:
type: string
enum: ["routine", "urgent", "emergency"]
required:
- patient_id
- drug_ndc
- diagnosis_code
additionalProperties: false
# Tool-specific constraints
constraints:
max_calls_per_minute: 30
requires_hitl_above:
field: "urgency"
value: "emergency"
Schema Validation in Gateway¶
The gateway validates parameters before forwarding:
from jsonschema import validate, ValidationError
class MCPGateway:
async def proxy_tool_call(
self,
agent_id: str,
tenant_id: str,
mcp_server: str,
tool: str,
parameters: dict
) -> ToolCallResponse:
"""
Proxy a tool call with tenant scoping AND schema validation.
"""
# 1. Authenticate agent
agent = await self.agent_registry.get(agent_id)
if not agent or agent.tenant_id != tenant_id:
raise AuthenticationError("Agent not authorized for tenant")
# 2. Verify MCP server is registered
mcp = await self.mcp_registry.get(mcp_server)
if not mcp:
raise SecurityError(f"MCP server {mcp_server} not registered")
# 3. Get tool schema
tool_config = mcp.get_tool(tool)
if not tool_config:
await self.log_rejection(agent_id, mcp_server, tool, "unknown_tool")
raise SecurityError(f"Tool {tool} not registered for {mcp_server}")
# 4. NEW: Validate parameters against schema
schema_result = self.validate_parameters(parameters, tool_config.parameters_schema)
if not schema_result.valid:
await self.log_schema_violation(
agent_id=agent_id,
tenant_id=tenant_id,
mcp_server=mcp_server,
tool=tool,
parameters=parameters,
errors=schema_result.errors
)
raise ToolValidationError(
f"Schema violation: {schema_result.errors}",
violation_type="schema",
tool=tool,
errors=schema_result.errors
)
# 5. NEW: Check tool-specific constraints
constraint_result = self.check_tool_constraints(
agent_id, tool, parameters, tool_config.constraints
)
if constraint_result.action == "deny":
raise ToolValidationError(constraint_result.reason)
elif constraint_result.action == "hitl":
return await self.queue_for_hitl(agent_id, mcp_server, tool, parameters)
# 6. Verify agent can use this MCP
if mcp_server not in agent.allowed_mcp_servers:
raise SecurityError(f"Agent not authorized for MCP {mcp_server}")
# 7. Resolve tenant context from graph
tenant_context = await self.resolve_tenant_context(
tenant_id=tenant_id,
agent_id=agent_id,
agent_role=agent.role,
mcp_server=mcp_server,
tool=tool
)
# 8. Build request with tenant context
request = ToolCallRequest(
tool=tool,
parameters=parameters,
scp_tenant_context=tenant_context
)
# 9. Forward to MCP server
response = await self.forward_to_mcp(mcp_server, request)
# 10. Validate response (optional trust-but-verify)
if self.config.validate_responses:
violations = self.check_tenant_isolation(response, tenant_id)
if violations:
await self.log_isolation_violation(agent_id, tenant_id, violations)
response = self.filter_violations(response, violations)
# 11. Log complete transaction
await self.audit_log.record(
timestamp=now(),
agent_id=agent_id,
tenant_id=tenant_id,
mcp_server=mcp_server,
tool=tool,
parameters_hash=hash_parameters(parameters),
schema_validated=True,
scope_applied=tenant_context.data_scope.default_filter,
rows_returned=len(response.data),
response_validated=self.config.validate_responses
)
return response
def validate_parameters(self, parameters: dict, schema: dict) -> ValidationResult:
"""
Validate parameters against JSON schema.
"""
try:
validate(instance=parameters, schema=schema)
return ValidationResult(valid=True, errors=[])
except ValidationError as e:
return ValidationResult(
valid=False,
errors=[{
"path": list(e.path),
"message": e.message,
"validator": e.validator
}]
)
def check_tool_constraints(
self,
agent_id: str,
tool: str,
parameters: dict,
constraints: ToolConstraints
) -> ConstraintResult:
"""
Check tool-specific constraints (rate limits, HITL triggers).
"""
# Check rate limit
if constraints.max_calls_per_minute:
recent_calls = self.rate_counter.get(agent_id, tool, window_seconds=60)
if recent_calls >= constraints.max_calls_per_minute:
return ConstraintResult(action="deny", reason="Rate limit exceeded")
# Check HITL trigger
if constraints.requires_hitl_above:
field = constraints.requires_hitl_above["field"]
threshold = constraints.requires_hitl_above["value"]
if parameters.get(field) == threshold:
return ConstraintResult(action="hitl", reason=f"{field}={threshold} requires approval")
return ConstraintResult(action="allow")
Schema Violation Logging¶
All schema violations are logged for security monitoring:
-- Add to mcp_audit_log or create separate table
CREATE TABLE mcp_schema_violations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
timestamp TIMESTAMP NOT NULL DEFAULT NOW(),
-- Identity
agent_id VARCHAR(255) NOT NULL,
tenant_id VARCHAR(255) NOT NULL,
-- Request
mcp_server VARCHAR(255) NOT NULL,
tool VARCHAR(255) NOT NULL,
parameters JSONB, -- Full parameters for forensics
-- Violation details
violation_type VARCHAR(50) NOT NULL, -- 'schema', 'unknown_tool', 'forbidden_param'
errors JSONB NOT NULL, -- Array of validation errors
-- Indexing for security queries
INDEX idx_schema_violations_agent (agent_id, timestamp),
INDEX idx_schema_violations_tenant (tenant_id, timestamp),
INDEX idx_schema_violations_tool (mcp_server, tool, timestamp)
);
Circuit Breaker Integration¶
Repeated schema violations trigger the circuit breaker:
class CircuitBreaker:
"""
Suspend agents with repeated violations.
"""
SCHEMA_VIOLATION_THRESHOLD = 10
WINDOW_SECONDS = 300 # 5 minutes
async def record_schema_violation(self, agent_id: str, tool: str):
"""Record violation and check circuit."""
await self.violations.add(agent_id, "schema", now())
recent = await self.violations.count_since(
agent_id,
now() - self.WINDOW_SECONDS
)
if recent >= self.SCHEMA_VIOLATION_THRESHOLD:
await self.trip_circuit(
agent_id,
reason=f"Schema violations: {recent} in {self.WINDOW_SECONDS}s"
)
async def trip_circuit(self, agent_id: str, reason: str):
"""Suspend agent and alert."""
await self.agent_registry.suspend(agent_id, reason, duration_seconds=3600)
await self.alert_security_team(agent_id, reason)
The Complete Admission Control Flow¶
Agent makes tool call
↓
┌─────────────────────────────────────────────────────────┐
│ SCP MCP Gateway │
├─────────────────────────────────────────────────────────┤
│ 1. Authenticate agent │
│ 2. Look up tool in registry │
│ 3. VALIDATE PARAMETERS AGAINST SCHEMA ← NEW │
│ 4. CHECK TOOL CONSTRAINTS (rate limit, HITL) ← NEW │
│ 5. Resolve tenant context │
│ 6. Inject tenant scope into request │
│ 7. Forward to MCP server │
│ 8. Validate response (optional) │
│ 9. Log complete transaction │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────┬──────────────┬──────────────┐
│ ALLOW │ DENY │ ESCALATE │
│ Forward │ Block + │ Queue for │
│ to MCP │ Log + Alert │ HITL │
└─────────────┴──────────────┴──────────────┘
This completes the Kubernetes "Admission Controller" analogy:
| Kubernetes Admission Controller | SCP Gateway |
|---|---|
| Validates pod spec against policies | Validates tool parameters against schema |
| Mutates requests (inject sidecars) | Injects tenant context |
| Denies invalid requests | Denies malformed tool calls |
| Rate limiting via ResourceQuota | Rate limiting per tool/agent |
API Endpoints¶
MCP Gateway¶
POST /api/mcp/call
Description: Proxy a tool call through SCP with tenant scoping
Headers:
X-API-Key: <agent_api_key>
Request:
{
"mcp_server": "mcp:acme:patient-data",
"tool": "get_patient_records",
"parameters": {
"diagnosis": "diabetes",
"limit": 100
}
}
Response:
{
"data": [...],
"metadata": {
"rows_returned": 47,
"scope_applied": "tenant_id = 'tenant_a'",
"audit_id": "uuid"
}
}
Tenant Policy Management¶
POST /api/tenants/{tenant_id}/policies
Description: Create or update tenant data policy
Request:
{
"policy_name": "default",
"default_filter": "tenant_id = '{tenant_id}'",
"max_rows_per_query": 1000,
"pii_masking_enabled": true,
"tables": [
{
"table_name": "patients",
"row_filter": "tenant_id = '{tenant_id}'",
"denied_columns": ["ssn", "full_address"]
}
]
}
GET /api/tenants/{tenant_id}/policies
Description: List all policies for a tenant
DELETE /api/tenants/{tenant_id}/policies/{policy_id}
Description: Delete a tenant policy
MCP Server Registry¶
POST /api/mcp/servers
Description: Register an MCP server with tool schemas
Request:
{
"server_id": "mcp:acme:patient-data",
"display_name": "Acme Patient Data MCP",
"endpoint_url": "https://mcp.acme.internal:8080",
"allowed_tenants": ["tenant_a", "tenant_b"],
"tools": [
{
"name": "get_patient_records",
"description": "Query patient records",
"parameters_schema": {
"type": "object",
"properties": {
"patient_id": { "type": "string", "pattern": "^[A-Z0-9]{8,12}$" },
"diagnosis": { "type": "string", "maxLength": 100 },
"limit": { "type": "integer", "minimum": 1, "maximum": 500 }
},
"required": ["patient_id"],
"additionalProperties": false
},
"constraints": {
"max_calls_per_minute": 60
}
}
]
}
GET /api/mcp/servers
Description: List registered MCP servers
GET /api/mcp/servers/{server_id}
Description: Get MCP server details including tool schemas
GET /api/mcp/servers/{server_id}/tools/{tool_name}/schema
Description: Get JSON schema for a specific tool
Audit¶
GET /api/audit/mcp?tenant_id=X&start=Y&end=Z
Description: Query MCP audit log
Response:
{
"entries": [
{
"timestamp": "2026-01-30T10:15:00Z",
"agent_id": "agent:tenant-a:pa-001",
"tenant_id": "tenant_a",
"mcp_server": "mcp:acme:patient-data",
"tool": "get_patient_records",
"schema_validated": true,
"scope_applied": "tenant_id = 'tenant_a'",
"rows_returned": 47
}
]
}
GET /api/audit/violations?tenant_id=X&start=Y&end=Z
Description: Query schema violation log
Response:
{
"entries": [
{
"timestamp": "2026-01-30T10:16:00Z",
"agent_id": "agent:tenant-a:pa-001",
"tenant_id": "tenant_a",
"mcp_server": "mcp:acme:patient-data",
"tool": "get_patient_records",
"violation_type": "schema",
"errors": [
{
"path": ["patient_id"],
"message": "'invalid' does not match '^[A-Z0-9]{8,12}$'",
"validator": "pattern"
}
]
}
],
"summary": {
"total_violations": 12,
"by_type": { "schema": 10, "unknown_tool": 2 },
"by_agent": { "agent:tenant-a:pa-001": 12 }
}
}
Implementation Plan¶
Phase 1: Foundation (v0.4.0)¶
Scope: - Tenant data policy model (database tables) - MCP server registry with tool schemas - Basic gateway with context injection - Tool schema validation - Audit logging
Effort: ~2.5 weeks
Deliverables:
- /api/tenants/{id}/policies endpoints
- /api/mcp/servers endpoints (with tool schema support)
- /api/mcp/call gateway endpoint
- Schema validation in gateway
- mcp_audit_log table
- mcp_schema_violations table
Phase 2: Enforcement (v0.4.1)¶
Scope: - Response validation (Level 2) - Violation detection and logging - Column filtering - PII masking - Circuit breaker on repeated violations
Effort: ~2 weeks
Deliverables: - Response validator component - Violation alerting - Column/PII filtering in gateway - Circuit breaker logic
Phase 3: Graph Integration (v0.4.2)¶
Scope: - Materialize tenant policies to graph - Graph-based context resolution - Role-based policy overrides
Effort: ~1 week
Deliverables:
- Tenant policy graph nodes
- resolve_tenant_context() uses graph queries
Phase 4: Advanced Features (v0.5.0)¶
Scope: - Rate limiting per tenant/agent - Circuit breaker on violations - HITL for sensitive queries - Dashboard for tenant isolation monitoring
Effort: ~3 weeks
Deliverables: - Rate limiter component - Circuit breaker logic - HITL queue for flagged queries - Basic monitoring UI
Security Considerations¶
Threat Model¶
| Threat | Mitigation |
|---|---|
| Agent spoofs tenant_id | Agent's tenant_id comes from registry, not request |
| MCP ignores tenant context | Level 2 response validation catches leaks |
| SQL injection in filter | Parameterized queries, filter is template not raw SQL |
| MCP server compromised | Level 3 database RLS as final defense |
| Audit log tampering | Audit log is append-only, separate from MCP access |
Tenant ID Source of Truth¶
Critical: The tenant_id is resolved from the Agent Registry, NOT from the request:
# WRONG - tenant_id from request can be spoofed
tenant_id = request.tenant_id # DO NOT DO THIS
# RIGHT - tenant_id from authenticated agent
agent = await agent_registry.get_by_api_key(request.api_key)
tenant_id = agent.tenant_id # Source of truth
Filter Template Safety¶
Data scope filters use parameterized templates, not string concatenation:
# WRONG - SQL injection risk
filter = f"tenant_id = '{tenant_id}'"
# RIGHT - parameterized
filter_template = "tenant_id = :tenant_id"
params = {"tenant_id": tenant_id}
Compliance Mapping¶
| Regulation | Requirement | Feature |
|---|---|---|
| HIPAA | Access controls | Tenant isolation prevents cross-customer data access |
| HIPAA | Audit trail | Full logging of every MCP call with tenant scope |
| HIPAA | Minimum necessary | Column filtering limits data to what's needed |
| SOC 2 | Logical access | Tenant-scoped queries enforce logical separation |
| SOC 2 | Monitoring | Audit log enables access monitoring |
| GDPR | Data minimization | PII masking reduces exposure |
| GDPR | Access controls | Tenant isolation enforces data boundaries |
Success Metrics¶
| Metric | Target |
|---|---|
| Tenant isolation violations | 0 in production |
| Gateway latency overhead | < 50ms p99 |
| Audit log completeness | 100% of MCP calls logged |
| Policy resolution time | < 10ms from graph |
The Pitch¶
"Your AI agents are accessing shared infrastructure. Without tenant isolation, one misconfigured query leaks Customer B's data to Customer A.
SCP injects tenant context into every MCP call. The MCP server knows exactly whose data it can return. Every call is scoped, every access is logged, every violation is caught.
Multi-tenant AI agents. Zero cross-tenant risk."
References¶
- SCP v0.3.0 Architecture:
/control-plane/ARCHITECTURE.md - Agent Registry:
/control-plane/api_server/routers/agents.py - Context Orchestrator:
/control-plane/context_service/orchestrator.py - Graph Repository:
/control-plane/database/graph/
Appendix A: Example Tenant Policy¶
Complete example for a healthcare tenant:
tenant_id: "acme_health"
policy_name: "hipaa_compliant"
default_filter: "tenant_id = 'acme_health'"
constraints:
max_rows_per_query: 500
max_queries_per_minute: 60
require_audit_log: true
pii_masking_enabled: true
tables:
- name: patients
row_filter: "tenant_id = 'acme_health' AND consent_given = true"
allowed_columns:
- id
- mrn
- first_name
- last_name
- dob
- diagnosis_codes
- insurance_id
denied_columns:
- ssn
- full_address
- phone_number
- email
- name: claims
row_filter: "tenant_id = 'acme_health'"
allowed_columns: ["*"]
denied_columns:
- internal_notes
- adjudicator_id
- name: providers
row_filter: "tenant_id = 'acme_health' OR network_status = 'public'"
allowed_columns: ["*"]
denied_columns:
- negotiated_rates
- contract_terms
- name: formulary
row_filter: "tenant_id = 'acme_health' OR is_standard_formulary = true"
allowed_columns: ["*"]
denied_columns: []
allowed_mcp_servers:
- "mcp:acme:patient-data"
- "mcp:acme:claims-lookup"
- "mcp:shared:drug-database"
denied_mcp_servers:
- "mcp:internal:admin-tools"
- "mcp:internal:billing"
Appendix B: MCP Integration SDK¶
Minimal SDK for MCP servers to integrate with SCP:
# scp_mcp_sdk.py
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
@dataclass
class TenantContext:
tenant_id: str
agent_id: str
default_filter: str
tables: Dict[str, 'TablePolicy']
constraints: 'Constraints'
@dataclass
class TablePolicy:
row_filter: str
allowed_columns: Optional[List[str]]
denied_columns: List[str]
@dataclass
class Constraints:
max_rows_per_query: int
pii_masking: bool
class SCPMCPMiddleware:
"""
Middleware for MCP servers to handle SCP tenant context.
"""
def __init__(self, scp_endpoint: str, mcp_server_id: str):
self.scp_endpoint = scp_endpoint
self.mcp_server_id = mcp_server_id
def extract_context(self, request: dict) -> TenantContext:
"""Extract and validate tenant context from request."""
ctx = request.get('scp_tenant_context')
if not ctx:
raise SecurityError("Missing SCP tenant context")
return TenantContext(**ctx)
def scope_query(self, query: str, table: str, context: TenantContext) -> str:
"""Add tenant filter to query."""
table_policy = context.tables.get(table)
filter_clause = table_policy.row_filter if table_policy else context.default_filter
# Simple injection - real implementation would use SQL parser
if 'WHERE' in query.upper():
return query.replace('WHERE', f'WHERE ({filter_clause}) AND ')
else:
return f"{query} WHERE {filter_clause}"
def filter_response(self, rows: List[dict], table: str, context: TenantContext) -> List[dict]:
"""Remove denied columns from response."""
table_policy = context.tables.get(table)
if not table_policy:
return rows
denied = set(table_policy.denied_columns)
return [
{k: v for k, v in row.items() if k not in denied}
for row in rows
]
def enforce_limits(self, rows: List[dict], context: TenantContext) -> List[dict]:
"""Enforce row limits."""
max_rows = context.constraints.max_rows_per_query
if len(rows) > max_rows:
return rows[:max_rows]
return rows