Complete Walkthrough
This walkthrough demonstrates IntentusNet's core capabilities with a realistic scenario: a building management system handling a maintenance power-off request.
Scenario Overview
Intent: "Power off the system for maintenance"
Requirements:
- Intent resolves to multiple targets (HVAC, Lighting, CCTV, Servers)
- Policy filters out CCTV (security-critical, never power off)
- Allowed targets continue execution
- Simulated power cut occurs mid-execution
- After restart, execution is resumed deterministically
- CCTV is NEVER powered off
- Execution is replayed for verification
Setup
Create building_management.py:
import json
import time
from datetime import datetime
from typing import Dict, Any
from intentusnet import (
    IntentusRuntime,
    BaseAgent,
    IntentEnvelope,
    AgentResponse,
    AgentDefinition,
    AgentIdentity,
    AgentEndpoint,
    AgentHealth,
    AgentRuntimeInfo,
    Capability,
    IntentRef,
    IntentContext,
    IntentMetadata,
    RoutingOptions,
    RoutingStrategy,
    Priority,
    ErrorInfo,
    ErrorCode,
)
# =============================================================================
# AGENT DEFINITIONS
# =============================================================================
class BuildingSystemAgent(BaseAgent):
    """Base agent for building systems."""

    def __init__(self, definition: AgentDefinition, router, system_name: str):
        super().__init__(definition, router)
        self.system_name = system_name
        self.is_powered = True

    def handle_intent(self, env: IntentEnvelope) -> AgentResponse:
        action = env.payload.get("action", "status")
        if action == "power_off":
            return self._power_off(env)
        elif action == "power_on":
            return self._power_on(env)
        else:
            return self._status(env)

    def _power_off(self, env: IntentEnvelope) -> AgentResponse:
        print(f" [{self.system_name}] Powering off...")
        time.sleep(0.5)  # Simulate work
        self.is_powered = False
        return AgentResponse.success(
            {"system": self.system_name, "status": "powered_off", "timestamp": datetime.utcnow().isoformat()},
            agent=self.definition.name
        )

    def _power_on(self, env: IntentEnvelope) -> AgentResponse:
        print(f" [{self.system_name}] Powering on...")
        time.sleep(0.3)
        self.is_powered = True
        return AgentResponse.success(
            {"system": self.system_name, "status": "powered_on"},
            agent=self.definition.name
        )

    def _status(self, env: IntentEnvelope) -> AgentResponse:
        return AgentResponse.success(
            {"system": self.system_name, "is_powered": self.is_powered},
            agent=self.definition.name
        )


class CCTVAgent(BuildingSystemAgent):
    """CCTV system - security critical, should never be powered off."""

    def _power_off(self, env: IntentEnvelope) -> AgentResponse:
        # This should never be called due to policy filtering
        print(" [CCTV] WARNING: Attempt to power off CCTV!")
        return AgentResponse.failure(
            ErrorInfo(
                code=ErrorCode.UNAUTHORIZED,
                message="CCTV cannot be powered off - security policy",
                retryable=False
            ),
            agent=self.definition.name
        )


def create_agent_definition(name: str, intent: str) -> AgentDefinition:
    return AgentDefinition(
        name=name,
        version="1.0",
        identity=AgentIdentity(agentId=name, tenantId="building-mgmt"),
        capabilities=[
            Capability(
                intent=IntentRef(name=intent, version="1.0"),
                inputSchema={"action": "string"},
                outputSchema={"system": "string", "status": "string"}
            )
        ],
        endpoint=AgentEndpoint(type="local", address=""),
        health=AgentHealth(status="healthy"),
        runtime=AgentRuntimeInfo(language="python", environment="local"),
        nodePriority=100
    )
Part 1: Policy-Filtered Execution
Add to building_management.py:
# =============================================================================
# POLICY CONFIGURATION
# =============================================================================
from intentusnet.security import PolicyEngine, PolicyRule, PolicyAction
def create_policy_engine() -> PolicyEngine:
    """Create policy engine with security rules."""
    rules = [
        # CRITICAL: Never allow power-off for CCTV
        PolicyRule(
            id="protect-cctv",
            action=PolicyAction.DENY,
            intents=["PowerOffIntent"],
            agents=["cctv-controller"],
            roles=["*"],
            tenants=["*"]
        ),
        # Allow power-off for other systems
        PolicyRule(
            id="allow-power-operations",
            action=PolicyAction.ALLOW,
            intents=["PowerOffIntent", "PowerOnIntent"],
            agents=["*"],
            roles=["operator", "admin"],
            tenants=["*"]
        ),
        # Default deny for safety
        PolicyRule(
            id="default-deny",
            action=PolicyAction.DENY,
            intents=["*"],
            agents=["*"],
            roles=["*"],
            tenants=["*"]
        )
    ]
    return PolicyEngine(rules)
# =============================================================================
# DEMO: POLICY FILTERING
# =============================================================================
def demo_policy_filtering():
    print("=" * 60)
    print("DEMO 1: Policy Filtering (CCTV Protected)")
    print("=" * 60)

    # Create runtime
    runtime = IntentusRuntime(enable_recording=True)
    # The engine is built here to show the rule set; the check further down
    # is simulated by hand and does not call into it.
    policy_engine = create_policy_engine()

    # Register all building systems
    systems = [
        ("hvac-controller", "HVAC System", BuildingSystemAgent),
        ("lighting-controller", "Lighting System", BuildingSystemAgent),
        ("cctv-controller", "CCTV System", CCTVAgent),
        ("server-controller", "Server Room", BuildingSystemAgent),
    ]
    agents = {}
    for agent_name, system_name, agent_class in systems:
        definition = create_agent_definition(agent_name, "PowerOffIntent")
        agent = agent_class(definition, runtime.router, system_name)
        runtime.registry.register(definition)
        runtime.router._agents[agent_name] = agent
        agents[agent_name] = agent

    # Create power-off intent for all systems
    envelope = IntentEnvelope(
        version="1.0",
        intent=IntentRef(name="PowerOffIntent", version="1.0"),
        payload={
            "action": "power_off",
            "reason": "Scheduled maintenance",
            "targets": ["hvac-controller", "lighting-controller", "cctv-controller", "server-controller"]
        },
        context=IntentContext(
            sourceAgent="maintenance-scheduler",
            timestamp=datetime.utcnow().isoformat(),
            priority=Priority.HIGH,
            tags=["maintenance", "scheduled"]
        ),
        metadata=IntentMetadata(
            requestId="maint-2024-01-15-001",
            source="maintenance-scheduler",
            createdAt=datetime.utcnow().isoformat(),
            traceId="trace-maint-001",
            identityChain=["user:operator@building.com"]
        ),
        routing=RoutingOptions(strategy=RoutingStrategy.BROADCAST)
    )

    # Evaluate policy BEFORE execution
    print("\n[Policy Evaluation]")
    targets = envelope.payload["targets"]
    allowed = []
    filtered = []
    for target in targets:
        # Simulated policy check: mirrors the outcome of the 'protect-cctv' rule
        if target == "cctv-controller":
            print(f" ✗ {target}: DENIED by policy 'protect-cctv'")
            filtered.append(target)
        else:
            print(f" ✓ {target}: ALLOWED")
            allowed.append(target)

    print(f"\n[Execution] Processing {len(allowed)} of {len(targets)} targets...")

    # Execute only allowed targets
    results = []
    for target in allowed:
        agent = agents[target]
        response = agent.handle_intent(envelope)
        results.append({
            "target": target,
            "status": response.status,
            "payload": response.payload
        })

    # Summary
    print("\n[Results]")
    print(json.dumps({
        "intent": "PowerOffIntent",
        "requested_targets": targets,
        "executed_targets": allowed,
        "filtered_targets": filtered,
        "filter_reason": {target: "security_policy" for target in filtered},
        "results": results
    }, indent=2))

    # Verify CCTV is still powered
    cctv = agents["cctv-controller"]
    print(f"\n[Verification] CCTV powered: {cctv.is_powered}")
    assert cctv.is_powered, "CCTV should still be powered!"
    print("✓ CCTV was protected by policy")
    return runtime, agents
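The demo above hard-codes the CCTV check instead of asking the policy engine. As a rough illustration of how an ordered rule list like the one in create_policy_engine() could produce the same split, here is a minimal standalone sketch. It assumes first-match-wins evaluation and shell-style wildcards, neither of which this walkthrough confirms for IntentusNet; `Rule` and `evaluate` are hypothetical names, not part of the library, and the tenant field is left out for brevity.

```python
from dataclasses import dataclass
from fnmatch import fnmatchcase
from typing import Dict, List, Optional, Tuple


@dataclass
class Rule:
    """Hypothetical stand-in for a policy rule; not IntentusNet's PolicyRule."""
    id: str
    action: str  # "allow" or "deny"
    intents: List[str]
    agents: List[str]
    roles: List[str]


def _matches(patterns: List[str], value: str) -> bool:
    # "*" matches anything; other patterns use case-sensitive shell-style matching.
    return any(fnmatchcase(value, pattern) for pattern in patterns)


def evaluate(rules: List[Rule], intent: str, agent: str, role: str) -> Tuple[str, Optional[str]]:
    """Return (action, rule_id) for the first rule that matches all three fields."""
    for rule in rules:
        if (_matches(rule.intents, intent)
                and _matches(rule.agents, agent)
                and _matches(rule.roles, role)):
            return rule.action, rule.id
    return "deny", None  # Fail closed when no rule matches (assumed default)


# Same ordering as the walkthrough: specific deny, then allow, then catch-all deny.
RULES = [
    Rule("protect-cctv", "deny", ["PowerOffIntent"], ["cctv-controller"], ["*"]),
    Rule("allow-power-operations", "allow", ["PowerOffIntent", "PowerOnIntent"], ["*"], ["operator", "admin"]),
    Rule("default-deny", "deny", ["*"], ["*"], ["*"]),
]

targets = ["hvac-controller", "lighting-controller", "cctv-controller", "server-controller"]
decisions: Dict[str, Tuple[str, Optional[str]]] = {
    target: evaluate(RULES, "PowerOffIntent", target, "operator") for target in targets
}
allowed = [t for t in targets if decisions[t][0] == "allow"]
filtered = [t for t in targets if decisions[t][0] == "deny"]
print("allowed:", allowed)
print("filtered:", filtered)
```

Under these assumptions the order of the rules matters: the CCTV deny has to come before the broad allow, and the catch-all deny has to come last.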
Part 2: Crash and Recovery
Add to building_management.py:
# =============================================================================
# DEMO: CRASH RECOVERY
# =============================================================================
class SimulatedCrash(Exception):
    """Simulates a system crash."""
    pass


def demo_crash_recovery():
    print("\n" + "=" * 60)
    print("DEMO 2: Crash Recovery")
    print("=" * 60)

    runtime = IntentusRuntime(enable_recording=True)

    # Register agents
    systems = [
        ("hvac-controller", "HVAC System"),
        ("lighting-controller", "Lighting System"),
        ("server-controller", "Server Room"),
    ]
    agents = {}
    execution_log = []

    class CrashingAgent(BuildingSystemAgent):
        def __init__(self, definition, router, system_name, crash_after=None):
            super().__init__(definition, router, system_name)
            self.crash_after = crash_after

        def _power_off(self, env):
            print(f" [{self.system_name}] Powering off...")
            execution_log.append({"system": self.system_name, "action": "power_off", "status": "started"})
            time.sleep(0.3)
            if self.crash_after == self.system_name:
                print(f" [{self.system_name}] ⚡ SIMULATED POWER CUT!")
                execution_log.append({"system": self.system_name, "action": "power_off", "status": "CRASH"})
                raise SimulatedCrash("Power cut during execution")
            self.is_powered = False
            execution_log.append({"system": self.system_name, "action": "power_off", "status": "completed"})
            return AgentResponse.success(
                {"system": self.system_name, "status": "powered_off"},
                agent=self.definition.name
            )

    for agent_name, system_name in systems:
        # Crash while lighting-controller is powering off
        crash_after = "Lighting System" if agent_name == "lighting-controller" else None
        definition = create_agent_definition(agent_name, "PowerOffIntent")
        agent = CrashingAgent(definition, runtime.router, system_name, crash_after)
        runtime.registry.register(definition)
        runtime.router._agents[agent_name] = agent
        agents[agent_name] = agent

    envelope = IntentEnvelope(
        version="1.0",
        intent=IntentRef(name="PowerOffIntent", version="1.0"),
        payload={"action": "power_off"},
        context=IntentContext(
            sourceAgent="maintenance",
            timestamp=datetime.utcnow().isoformat(),
            priority=Priority.HIGH,
        ),
        metadata=IntentMetadata(
            requestId="maint-crash-demo",
            source="demo",
            createdAt=datetime.utcnow().isoformat(),
            traceId="trace-crash-demo",
        ),
        routing=RoutingOptions(strategy=RoutingStrategy.BROADCAST)
    )

    print("\n[Execution - First Attempt]")
    print("Processing systems in order: HVAC → Lighting → Server Room")
    print()

    # Execute with simulated crash
    try:
        for agent_name in ["hvac-controller", "lighting-controller", "server-controller"]:
            agent = agents[agent_name]
            response = agent.handle_intent(envelope)
    except SimulatedCrash as e:
        print(f"\n[CRASH] System crash: {e}")

    print("\n[Execution Log Before Recovery]")
    print(json.dumps(execution_log, indent=2))

    # Show state after crash
    print("\n[System State After Crash]")
    for name, agent in agents.items():
        status = "OFF" if not agent.is_powered else "ON"
        print(f" {agent.system_name}: {status}")

    # RECOVERY
    print("\n" + "-" * 40)
    print("[RECOVERY] System restarting...")
    print("-" * 40)

    # Identify incomplete execution from log
    incomplete = [e for e in execution_log if e["status"] == "CRASH"]
    completed = [e for e in execution_log if e["status"] == "completed"]
    started = {e["system"] for e in execution_log}
    not_started = [a.system_name for a in agents.values() if a.system_name not in started]
    print("\n[Recovery Analysis]")
    print(f" Completed: {[e['system'] for e in completed]}")
    print(f" Crashed: {[e['system'] for e in incomplete]}")
    print(f" Not started: {', '.join(not_started)}")

    # Resume from crash point
    print("\n[Resuming Execution]")
    # Skip systems the log marks as completed; keep the log as an audit trail
    completed_systems = {e["system"] for e in completed}
    for agent in agents.values():
        if agent.system_name in completed_systems:
            continue
        agent.crash_after = None  # Disable crash for recovery
        print(f" [{agent.system_name}] Powering off (recovery)...")
        # Demo shortcut: set the state directly instead of re-invoking the agent
        agent.is_powered = False
        execution_log.append({"system": agent.system_name, "action": "power_off", "status": "completed"})

    print("\n[Final System State]")
    for name, agent in agents.items():
        status = "OFF" if not agent.is_powered else "ON"
        print(f" {agent.system_name}: {status}")
    print("\n✓ Execution recovered deterministically")
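The recovery step boils down to one question: given the execution log, which systems still need to run, and in what order? The sketch below isolates that decision as a pure function so it can be tested on its own. `plan_resume` is a hypothetical helper written for this walkthrough, not an IntentusNet API, and it assumes that re-running an interrupted power-off is safe (idempotent).

```python
from typing import Dict, List, Tuple


def plan_resume(order: List[str], log: List[Dict[str, str]]) -> Tuple[List[str], List[str]]:
    """Split `order` into (completed, remaining) using a started/completed log.

    A system counts as done only if the log holds a "completed" entry for it.
    Anything that merely started, crashed, or never ran is scheduled again in
    the original order, so the same log always yields the same plan.
    """
    done = {entry["system"] for entry in log if entry["status"] == "completed"}
    completed = [system for system in order if system in done]
    remaining = [system for system in order if system not in done]
    return completed, remaining


order = ["HVAC System", "Lighting System", "Server Room"]
# Log contents as they would look right after the simulated power cut.
log = [
    {"system": "HVAC System", "action": "power_off", "status": "started"},
    {"system": "HVAC System", "action": "power_off", "status": "completed"},
    {"system": "Lighting System", "action": "power_off", "status": "started"},
    {"system": "Lighting System", "action": "power_off", "status": "CRASH"},
]

completed, remaining = plan_resume(order, log)
print("completed:", completed)
print("remaining:", remaining)
```

Because the plan depends only on the log and the fixed order, running it twice against the same log gives the same result, which is what "deterministic" means in this demo.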
Part 3: Replay Demonstration
Add to building_management.py:
# =============================================================================
# DEMO: REPLAY
# =============================================================================
def demo_replay():
    print("\n" + "=" * 60)
    print("DEMO 3: Deterministic Replay")
    print("=" * 60)

    runtime = IntentusRuntime(enable_recording=True)

    # Simple agent for replay demo
    definition = create_agent_definition("processor", "ProcessIntent")
    agent = BuildingSystemAgent(definition, runtime.router, "Processor")
    runtime.registry.register(definition)
    runtime.router._agents["processor"] = agent

    envelope = IntentEnvelope(
        version="1.0",
        intent=IntentRef(name="ProcessIntent", version="1.0"),
        payload={"action": "status", "data": "test-data"},
        context=IntentContext(
            sourceAgent="demo",
            timestamp=datetime.utcnow().isoformat(),
            priority=Priority.NORMAL,
        ),
        metadata=IntentMetadata(
            requestId="replay-demo-001",
            source="demo",
            createdAt=datetime.utcnow().isoformat(),
            traceId="trace-replay",
        ),
        routing=RoutingOptions()
    )

    # Execute and record
    print("\n[Original Execution]")
    response = runtime.router.route_intent(envelope)
    execution_id = response.metadata.get("execution_id", "unknown")
    print(f" Execution ID: {execution_id}")
    print(f" Status: {response.status}")
    print(f" Payload: {json.dumps(response.payload, indent=4)}")

    # Simulate time passing
    print("\n[Time passes... model updates deployed... system restarted...]")
    time.sleep(0.5)

    # Replay
    print("\n[Replay Execution]")
    print(" Loading execution record...")
    from intentusnet import FileExecutionStore, ReplayEngine
    store = FileExecutionStore(".intentusnet/records")

    # Find the record. This demo takes the last listed record; in real usage,
    # look it up by execution_id instead.
    records = list(store.list_all())
    if records:
        record = store.load(records[-1])
        engine = ReplayEngine(record)
        is_ok, reason = engine.is_replayable()
        print(f" Replayable: {is_ok}")
        if is_ok:
            replay_result = engine.replay()
            print(f" Replayed Payload: {json.dumps(replay_result.payload, indent=4)}")
            # Verify identical
            if replay_result.payload == response.payload:
                print("\n✓ Replay returned identical output (no model re-execution)")
            else:
                print("\n✗ Replay mismatch!")
        else:
            print(f" Not replayable: {reason}")
    else:
        print(" No records found (recording may be disabled)")
# =============================================================================
# MAIN
# =============================================================================
def main():
    print("\n" + "=" * 60)
    print("IntentusNet Complete Walkthrough")
    print("=" * 60)
    print("""
Scenario: Building Management System
- Intent: "Power off for maintenance"
- CCTV must never be powered off (security critical)
- System should handle crash and recover
- Execution should be replayable
""")

    # Run demos
    demo_policy_filtering()
    demo_crash_recovery()
    demo_replay()

    print("\n" + "=" * 60)
    print("Walkthrough Complete!")
    print("=" * 60)
    print("""
Key Takeaways:
1. Policy filtering protected CCTV while allowing other systems
2. Crash recovery resumed from the crash point and skipped completed work
3. Replay returned recorded output without re-execution
""")


if __name__ == "__main__":
    main()
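The replay demo in Part 3 rests on one idea: the response is written to storage at execution time, and a replay reads that stored response back instead of calling the agent again. The following standalone sketch shows that idea with a small JSON-file store. `RecordingStore`, `execute_and_record`, and `replay` are hypothetical names for illustration only; they are not the FileExecutionStore or ReplayEngine APIs, whose record format this walkthrough does not describe.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Callable, Dict


class RecordingStore:
    """Hypothetical file-backed store: one JSON file per execution ID."""

    def __init__(self, root: Path) -> None:
        self.root = root
        self.root.mkdir(parents=True, exist_ok=True)

    def save(self, execution_id: str, payload: Dict[str, Any]) -> None:
        path = self.root / f"{execution_id}.json"
        path.write_text(json.dumps(payload, sort_keys=True), encoding="utf-8")

    def load(self, execution_id: str) -> Dict[str, Any]:
        path = self.root / f"{execution_id}.json"
        return json.loads(path.read_text(encoding="utf-8"))


def execute_and_record(
    store: RecordingStore,
    execution_id: str,
    handler: Callable[[Dict[str, Any]], Dict[str, Any]],
    request: Dict[str, Any],
) -> Dict[str, Any]:
    # Run the handler once and persist its output under the execution ID.
    payload = handler(request)
    store.save(execution_id, payload)
    return payload


def replay(store: RecordingStore, execution_id: str) -> Dict[str, Any]:
    # Replay only reads the stored payload; the handler is never invoked.
    return store.load(execution_id)


call_count = {"n": 0}


def status_handler(request: Dict[str, Any]) -> Dict[str, Any]:
    call_count["n"] += 1  # Track invocations to show replay does not re-execute
    return {"system": "Processor", "is_powered": True, "data": request["data"]}


with tempfile.TemporaryDirectory() as tmp:
    store = RecordingStore(Path(tmp) / "records")
    original = execute_and_record(store, "exec-demo", status_handler, {"data": "test-data"})
    replayed = replay(store, "exec-demo")

print("identical:", replayed == original)
print("handler calls:", call_count["n"])
```

Looking records up by execution ID, as this sketch does, avoids relying on listing order when several executions have been recorded.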
Running the Walkthrough
python building_management.py
Expected Output
============================================================
IntentusNet Complete Walkthrough
============================================================
Scenario: Building Management System
- Intent: "Power off for maintenance"
- CCTV must never be powered off (security critical)
- System should handle crash and recover
- Execution should be replayable
============================================================
DEMO 1: Policy Filtering (CCTV Protected)
============================================================
[Policy Evaluation]
✓ hvac-controller: ALLOWED
✓ lighting-controller: ALLOWED
✗ cctv-controller: DENIED by policy 'protect-cctv'
✓ server-controller: ALLOWED
[Execution] Processing 3 of 4 targets...
[HVAC System] Powering off...
[Lighting System] Powering off...
[Server Room] Powering off...
[Results]
{
"intent": "PowerOffIntent",
"requested_targets": ["hvac-controller", "lighting-controller", "cctv-controller", "server-controller"],
"executed_targets": ["hvac-controller", "lighting-controller", "server-controller"],
"filtered_targets": ["cctv-controller"],
"filter_reason": {"cctv-controller": "security_policy"},
...
}
[Verification] CCTV powered: True
✓ CCTV was protected by policy
============================================================
DEMO 2: Crash Recovery
============================================================
[Execution - First Attempt]
Processing systems in order: HVAC → Lighting → Server Room
[HVAC System] Powering off...
[Lighting System] Powering off...
[Lighting System] ⚡ SIMULATED POWER CUT!
[CRASH] System crash: Power cut during execution
[Execution Log Before Recovery]
[...]
[System State After Crash]
HVAC System: OFF
Lighting System: ON
Server Room: ON
----------------------------------------
[RECOVERY] System restarting...
----------------------------------------
[Recovery Analysis]
Completed: ['HVAC System']
Crashed: ['Lighting System']
Not started: Server Room
[Resuming Execution]
[Lighting System] Powering off (recovery)...
[Server Room] Powering off (recovery)...
[Final System State]
HVAC System: OFF
Lighting System: OFF
Server Room: OFF
✓ Execution recovered deterministically
============================================================
DEMO 3: Deterministic Replay
============================================================
[Original Execution]
Execution ID: exec-a1b2c3d4
Status: success
Payload: {...}
[Time passes... model updates deployed... system restarted...]
[Replay Execution]
Loading execution record...
Replayable: True
Replayed Payload: {...}
✓ Replay returned identical output (no model re-execution)
============================================================
Walkthrough Complete!
============================================================
Key Observations
| Feature | What Happened |
|---|---|
| Policy Filtering | CCTV was protected; 3 of 4 targets executed |
| Partial Continuation | Intent succeeded for allowed targets |
| Crash Detection | Execution log showed incomplete state |
| Recovery | Resumed from crash point, skipped completed |
| Replay | Returned exact recorded output |
Next Steps
- CLI Reference: Use the CLI for inspection and replay
- Demos: More detailed demo scenarios
- Production Readiness: Production deployment guide