Skip to main content

Production Security

This guide covers security best practices for deploying IntentusNet in production.

Security Architecture

TLS Configuration

Minimum Requirements

  • TLS 1.2 or higher
  • Strong cipher suites (forward secrecy with AEAD encryption, e.g. ECDHE with AES-GCM)
  • Valid certificates from a trusted CA
  • Regular certificate rotation

Nginx TLS Example

# TLS-terminating reverse proxy in front of the IntentusNet service.
server {
    listen 443 ssl http2;
    server_name intentusnet.example.com;

    # Certificate and private key presented to clients.
    ssl_certificate /etc/ssl/certs/intentusnet.crt;
    ssl_certificate_key /etc/ssl/private/intentusnet.key;

    # Accept TLS 1.2 and 1.3 only, with ECDHE + AES-GCM suites chosen by the server.
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256;
    ssl_prefer_server_ciphers on;

    # Share a 10 MB session cache between workers; sessions stay valid for one day.
    ssl_session_cache shared:SSL:10m;
    ssl_session_timeout 1d;

    # HSTS: tell browsers to use HTTPS for the next year, on every response.
    add_header Strict-Transport-Security "max-age=31536000" always;

    location / {
        # TLS ends here; the upstream hop is plain HTTP on port 8080.
        proxy_pass http://intentusnet:8080;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        # Forward the client chain and original scheme so the backend can
        # audit the real caller and tell that the request arrived over HTTPS.
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}

Authentication Integration

JWT Validation Middleware

from jose import jwt
from intentusnet.middleware import RouterMiddleware

class JWTAuthMiddleware(RouterMiddleware):
    """Router middleware that authenticates a request from a JWT bearer token.

    The token is taken from the first entry of ``env.metadata.identityChain``
    and verified against the JWKS downloaded from ``jwks_url``. On success
    the ``sub`` and ``roles`` claims are copied onto ``env.context``.

    NOTE(review): ``UnauthorizedError`` is not imported in this snippet;
    import it from wherever the project defines it.
    """

    def __init__(self, jwks_url: str, audience: str):
        """Store the configuration and download the signing keys.

        Args:
            jwks_url: URL of the identity provider's JWKS document.
            audience: Expected ``aud`` claim of accepted tokens.
        """
        self.jwks_url = jwks_url
        self.audience = audience
        # Keys are fetched once, at construction time. NOTE(review): they are
        # never refreshed here, so a key rotation at the identity provider
        # requires re-creating the middleware.
        self._jwks = self._fetch_jwks()

    def _fetch_jwks(self):
        """Download and parse the JWKS document at ``self.jwks_url``."""
        import json
        from urllib.request import urlopen

        # A timeout keeps start-up from hanging on an unreachable provider.
        with urlopen(self.jwks_url, timeout=10) as response:
            return json.load(response)

    def before_route(self, env):
        """Validate the bearer token and attach the caller's identity.

        Raises:
            UnauthorizedError: if the token is missing, is not a ``Bearer``
                token, or fails JWT validation.
        """
        chain = env.metadata.identityChain
        token = chain[0] if chain else None

        if not token or not token.startswith("Bearer "):
            raise UnauthorizedError("Missing or invalid token")

        try:
            payload = jwt.decode(
                token[len("Bearer "):],
                self._jwks,
                algorithms=["RS256"],
                audience=self.audience,
            )
        except jwt.JWTError as e:
            # Chain the cause so the original validation failure stays visible.
            raise UnauthorizedError(f"Invalid token: {e}") from e

        # Add validated identity to context
        env.context.authenticated_user = payload.get("sub")
        env.context.roles = payload.get("roles", [])

Identity Chain Propagation

# Prepend the authenticated user and this hop to the existing identity chain.
# NOTE(review): `authenticated_user` is not defined in this snippet; presumably
# it is the value the auth middleware stored on env.context — confirm.
env.metadata.identityChain = [
    f"user:{authenticated_user}",
    "service:api-gateway",  # constant entry: no f-string needed
    *env.metadata.identityChain,
]

Policy Engine Configuration

Production Policy Example

from intentusnet.security import PolicyEngine, PolicyRule, PolicyAction

# Production rule set, listed in the order it is handed to the engine.
policies = PolicyEngine([
    # Deny callers that carry no roles at all.
    PolicyRule(
        id="require-auth",
        action=PolicyAction.DENY,
        intents=["*"],
        agents=["*"],
        roles=[],  # an empty list is meant to match unauthenticated callers
        tenants=["*"],
    ),

    # Keep administrative intents away from non-admin roles.
    PolicyRule(
        id="admin-only",
        action=PolicyAction.DENY,
        intents=["Admin*", "Delete*", "Config*"],
        agents=["*"],
        roles=["user", "viewer"],  # the non-admin roles
        tenants=["*"],
    ),

    # Per-tenant throttle: 1000 requests per minute, keyed on the tenant.
    PolicyRule(
        id="tenant-rate-limit",
        action=PolicyAction.ALLOW,
        intents=["*"],
        agents=["*"],
        roles=["*"],
        tenants=["*"],
        rate_limit_per_minute=1000,
        rate_limit_key_template="{tenant}",
    ),

    # Fallback: authenticated users and admins are allowed.
    PolicyRule(
        id="default-allow",
        action=PolicyAction.ALLOW,
        intents=["*"],
        agents=["*"],
        roles=["user", "admin"],
        tenants=["*"],
    ),
])

EMCL Encryption

Key Management

import os
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from intentusnet.security import AESGCMEMCLProvider

def get_encryption_key() -> bytes:
    """Return the EMCL encryption key decoded from its hex representation.

    By default the hex string is read from the
    ``INTENTUSNET_ENCRYPTION_KEY`` environment variable. For production,
    replace that lookup with one of the commented alternatives below.

    Returns:
        The raw key bytes.

    Raises:
        RuntimeError: if no key material was found.
        ValueError: if the key material is not valid hexadecimal.
    """
    # Option 1: Environment variable (development)
    key_hex = os.environ.get("INTENTUSNET_ENCRYPTION_KEY")

    # Option 2: HashiCorp Vault (production)
    # client = hvac.Client(url=os.environ["VAULT_ADDR"])
    # secret = client.secrets.kv.read_secret_version(path="intentusnet/encryption")
    # key_hex = secret["data"]["data"]["key"]

    # Option 3: AWS KMS
    # kms = boto3.client("kms")
    # response = kms.decrypt(CiphertextBlob=encrypted_key)
    # key_hex = response["Plaintext"].hex()

    # Fail loudly instead of starting up with a missing or empty key.
    if not key_hex:
        raise RuntimeError("INTENTUSNET_ENCRYPTION_KEY is not set")

    try:
        return bytes.fromhex(key_hex)
    except ValueError as exc:
        # The message deliberately omits the key material itself.
        raise ValueError(
            "INTENTUSNET_ENCRYPTION_KEY is not valid hexadecimal"
        ) from exc

# Create the EMCL provider from the loaded key and hand it to the runtime.
# NOTE(review): IntentusRuntime is not imported in this snippet; import it
# before running this code.
emcl = AESGCMEMCLProvider(key=get_encryption_key())
runtime = IntentusRuntime(emcl_provider=emcl)

Key Rotation

def rotate_encryption_key():
    """Generate a fresh EMCL key, persist it, and switch the runtime to it."""
    # Step 1: draw 32 random bytes for the replacement key.
    rotated_key = os.urandom(32)

    # Step 2: build the provider that will use the replacement key.
    rotated_provider = AESGCMEMCLProvider(key=rotated_key)

    # Step 3: re-encrypt existing records (if needed).
    # Execution records are typically left as they are, because they
    # should be immutable.

    # Step 4: persist the replacement key in the vault.
    store_key_in_vault(rotated_key)

    # Step 5: point the running runtime at the new provider.
    runtime.update_emcl_provider(rotated_provider)

Input Validation

Envelope Validation

from pydantic import BaseModel, validator
from typing import Optional, List, Dict, Any

# Keys associated with JavaScript prototype pollution; stripped from payloads.
_DANGEROUS_KEYS = frozenset({'__proto__', 'constructor', 'prototype'})

# Upper bound on the JSON-serialized payload (1MB limit).
_MAX_PAYLOAD_BYTES = 1024 * 1024


def _strip_dangerous_keys(value):
    """Return *value* with dangerous keys removed from every nested dict."""
    if isinstance(value, dict):
        return {
            key: _strip_dangerous_keys(item)
            for key, item in value.items()
            if key not in _DANGEROUS_KEYS
        }
    if isinstance(value, list):
        return [_strip_dangerous_keys(item) for item in value]
    return value


class SecureEnvelope(BaseModel):
    """Envelope model that bounds the payload size and sanitizes its keys."""

    version: str
    intent: dict
    payload: Dict[str, Any]
    context: dict
    metadata: dict
    # Explicit default keeps the field optional regardless of pydantic version.
    routing: Optional[dict] = None

    @validator('payload')
    def validate_payload_size(cls, v):
        """Reject payloads whose JSON form exceeds the size limit.

        Raises:
            ValueError: if the serialized payload is larger than 1 MB.
        """
        import json

        # json.dumps escapes non-ASCII by default, so characters == bytes.
        size = len(json.dumps(v))
        if size > _MAX_PAYLOAD_BYTES:
            raise ValueError("Payload too large")
        return v

    @validator('payload')
    def sanitize_payload(cls, v):
        """Remove potentially dangerous keys at every nesting level."""
        return _strip_dangerous_keys(v)

Secrets Management

Environment Variable Security

# Supply secrets through the environment; never hard-code them in source
# files or write them to logs. The "..." values are placeholders.
export INTENTUSNET_DB_PASSWORD="..."
export INTENTUSNET_API_KEY="..."

# In production, point the service at a secrets manager instead of
# exporting raw secret values.
export VAULT_ADDR="https://vault.example.com"
export AWS_SECRETS_MANAGER_ENDPOINT="..."

Secrets in Agents

class SecureAgent(BaseAgent):
    """Agent that looks up its external API key on every request."""

    def __init__(self, definition, router, secrets_client):
        """Keep a handle to the secrets client; no secret value is cached."""
        super().__init__(definition, router)
        self._secrets = secrets_client

    def handle_intent(self, env):
        """Call the external API with a freshly fetched key and wrap the result."""
        # Look the key up per call rather than keeping it on the instance.
        external_key = self._secrets.get("external-api-key")

        outcome = call_external_api(external_key, env.payload)

        # Only the API result goes into the response; the key is never
        # logged or returned.
        response_body = {"result": outcome}
        return AgentResponse.success(response_body, agent=self.name)

Audit Logging

Security Events

import logging

security_logger = logging.getLogger("intentusnet.security")

# Names that must not come from caller-supplied details: logging raises
# KeyError when ``extra`` tries to overwrite a LogRecord attribute, and
# ``event_type``/``timestamp`` are set by log_security_event itself.
_RESERVED_LOG_KEYS = frozenset(vars(logging.makeLogRecord({}))) | {
    "message",
    "asctime",
    "event_type",
    "timestamp",
}


def log_security_event(event_type: str, details: dict):
    """Emit a structured security event on the ``intentusnet.security`` logger.

    Args:
        event_type: Short event identifier, e.g. ``"auth_failure"``.
        details: Extra fields attached to the log record. A key that clashes
            with a reserved name is stored as ``detail_<key>`` instead, so it
            can neither crash the logging call nor replace the canonical
            ``event_type`` and ``timestamp`` fields.
    """
    from datetime import datetime, timezone

    extra = {
        (f"detail_{key}" if key in _RESERVED_LOG_KEYS else key): value
        for key, value in details.items()
    }
    extra["event_type"] = event_type
    # Timezone-aware UTC timestamp; the ISO string carries a "+00:00" offset.
    extra["timestamp"] = datetime.now(timezone.utc).isoformat()

    # Lazy %-formatting: the message is only built if the record is emitted.
    security_logger.info("Security event: %s", event_type, extra=extra)

# Example calls. user_id, client_ip, intent_name and rule_id are not defined
# in this snippet; the calling code must supply them.

# Log authentication outcomes (success and failure)
log_security_event("auth_success", {"user": user_id, "method": "jwt"})
log_security_event("auth_failure", {"reason": "invalid_token", "ip": client_ip})

# Log policy decisions, naming the intent and the rule involved
log_security_event("policy_deny", {"intent": intent_name, "rule": rule_id})

# Log sensitive operations together with their target
log_security_event("admin_action", {"action": "delete_user", "target": user_id})

Network Security

Firewall Rules

# Only allow application traffic (plain HTTP on port 8080, behind the
# TLS-terminating proxy) from the load balancer network; drop everything else.
iptables -A INPUT -p tcp --dport 8080 -s 10.0.0.0/8 -j ACCEPT
iptables -A INPUT -p tcp --dport 8080 -j DROP

# Allow metrics scraping from Prometheus only. Without the DROP rule the
# ACCEPT above it restricts nothing when the chain's default policy is ACCEPT.
iptables -A INPUT -p tcp --dport 9090 -s 10.0.1.0/24 -j ACCEPT
iptables -A INPUT -p tcp --dport 9090 -j DROP

Service Mesh (Istio)

# Allow only the api-gateway service account to POST intents to IntentusNet.
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: intentusnet-policy
spec:
  # Applies to workloads labelled app=intentusnet.
  selector:
    matchLabels:
      app: intentusnet
  # ALLOW is the default action; stated explicitly for readability.
  action: ALLOW
  rules:
    - from:
        - source:
            principals: ["cluster.local/ns/default/sa/api-gateway"]
      to:
        - operation:
            methods: ["POST"]
            paths: ["/api/v1/intent"]

Security Checklist

Pre-Deployment

  • TLS configured with strong ciphers
  • Authentication integrated
  • Policy engine configured
  • Secrets in secure storage
  • Input validation enabled
  • Audit logging configured

Ongoing

  • Regular certificate rotation
  • Key rotation schedule
  • Security patches applied
  • Penetration testing
  • Access review
  • Log monitoring

See Also