Skip to main content

Complete Walkthrough

This walkthrough demonstrates IntentusNet's core capabilities with a realistic scenario: a building management system handling a maintenance power-off request.

Scenario Overview

Intent: "Power off the system for maintenance"

Requirements:

  1. Intent resolves to multiple targets (HVAC, Lighting, CCTV, Servers)
  2. Policy filters out CCTV (security-critical, never power off)
  3. Allowed targets continue execution
  4. Simulated power cut occurs mid-execution
  5. After restart, execution is resumed deterministically
  6. CCTV is NEVER powered off
  7. Execution is replayed for verification

Setup

Create building_management.py:

import json
import time
from datetime import datetime
from typing import Dict, Any

from intentusnet import (
IntentusRuntime,
BaseAgent,
IntentEnvelope,
AgentResponse,
AgentDefinition,
AgentIdentity,
AgentEndpoint,
AgentHealth,
AgentRuntimeInfo,
Capability,
IntentRef,
IntentContext,
IntentMetadata,
RoutingOptions,
RoutingStrategy,
Priority,
ErrorInfo,
ErrorCode,
)

# =============================================================================
# AGENT DEFINITIONS
# =============================================================================

class BuildingSystemAgent(BaseAgent):
"""Base agent for building systems."""

def __init__(self, definition: AgentDefinition, router, system_name: str):
super().__init__(definition, router)
self.system_name = system_name
self.is_powered = True

def handle_intent(self, env: IntentEnvelope) -> AgentResponse:
action = env.payload.get("action", "status")

if action == "power_off":
return self._power_off(env)
elif action == "power_on":
return self._power_on(env)
else:
return self._status(env)

def _power_off(self, env: IntentEnvelope) -> AgentResponse:
print(f" [{self.system_name}] Powering off...")
time.sleep(0.5) # Simulate work
self.is_powered = False
return AgentResponse.success(
{"system": self.system_name, "status": "powered_off", "timestamp": datetime.utcnow().isoformat()},
agent=self.definition.name
)

def _power_on(self, env: IntentEnvelope) -> AgentResponse:
print(f" [{self.system_name}] Powering on...")
time.sleep(0.3)
self.is_powered = True
return AgentResponse.success(
{"system": self.system_name, "status": "powered_on"},
agent=self.definition.name
)

def _status(self, env: IntentEnvelope) -> AgentResponse:
return AgentResponse.success(
{"system": self.system_name, "is_powered": self.is_powered},
agent=self.definition.name
)


class CCTVAgent(BuildingSystemAgent):
"""CCTV system - security critical, should never be powered off."""

def _power_off(self, env: IntentEnvelope) -> AgentResponse:
# This should never be called due to policy filtering
print(f" [CCTV] WARNING: Attempt to power off CCTV!")
return AgentResponse.failure(
ErrorInfo(
code=ErrorCode.UNAUTHORIZED,
message="CCTV cannot be powered off - security policy",
retryable=False
),
agent=self.definition.name
)


def create_agent_definition(name: str, intent: str) -> AgentDefinition:
return AgentDefinition(
name=name,
version="1.0",
identity=AgentIdentity(agentId=name, tenantId="building-mgmt"),
capabilities=[
Capability(
intent=IntentRef(name=intent, version="1.0"),
inputSchema={"action": "string"},
outputSchema={"system": "string", "status": "string"}
)
],
endpoint=AgentEndpoint(type="local", address=""),
health=AgentHealth(status="healthy"),
runtime=AgentRuntimeInfo(language="python", environment="local"),
nodePriority=100
)

Part 1: Policy-Filtered Execution

Add to building_management.py:

# =============================================================================
# POLICY CONFIGURATION
# =============================================================================

from intentusnet.security import PolicyEngine, PolicyRule, PolicyAction

def create_policy_engine() -> PolicyEngine:
"""Create policy engine with security rules."""
rules = [
# CRITICAL: Never allow power-off for CCTV
PolicyRule(
id="protect-cctv",
action=PolicyAction.DENY,
intents=["PowerOffIntent"],
agents=["cctv-controller"],
roles=["*"],
tenants=["*"]
),
# Allow power-off for other systems
PolicyRule(
id="allow-power-operations",
action=PolicyAction.ALLOW,
intents=["PowerOffIntent", "PowerOnIntent"],
agents=["*"],
roles=["operator", "admin"],
tenants=["*"]
),
# Default deny for safety
PolicyRule(
id="default-deny",
action=PolicyAction.DENY,
intents=["*"],
agents=["*"],
roles=["*"],
tenants=["*"]
)
]
return PolicyEngine(rules)


# =============================================================================
# DEMO: POLICY FILTERING
# =============================================================================

def demo_policy_filtering():
print("=" * 60)
print("DEMO 1: Policy Filtering (CCTV Protected)")
print("=" * 60)

# Create runtime
runtime = IntentusRuntime(enable_recording=True)
policy_engine = create_policy_engine()

# Register all building systems
systems = [
("hvac-controller", "HVAC System", BuildingSystemAgent),
("lighting-controller", "Lighting System", BuildingSystemAgent),
("cctv-controller", "CCTV System", CCTVAgent),
("server-controller", "Server Room", BuildingSystemAgent),
]

agents = {}
for agent_name, system_name, agent_class in systems:
definition = create_agent_definition(agent_name, "PowerOffIntent")
agent = agent_class(definition, runtime.router, system_name)
runtime.registry.register(definition)
runtime.router._agents[agent_name] = agent
agents[agent_name] = agent

# Create power-off intent for all systems
envelope = IntentEnvelope(
version="1.0",
intent=IntentRef(name="PowerOffIntent", version="1.0"),
payload={
"action": "power_off",
"reason": "Scheduled maintenance",
"targets": ["hvac-controller", "lighting-controller", "cctv-controller", "server-controller"]
},
context=IntentContext(
sourceAgent="maintenance-scheduler",
timestamp=datetime.utcnow().isoformat(),
priority=Priority.HIGH,
tags=["maintenance", "scheduled"]
),
metadata=IntentMetadata(
requestId="maint-2024-01-15-001",
source="maintenance-scheduler",
createdAt=datetime.utcnow().isoformat(),
traceId="trace-maint-001",
identityChain=["user:operator@building.com"]
),
routing=RoutingOptions(strategy=RoutingStrategy.BROADCAST)
)

# Evaluate policy BEFORE execution
print("\n[Policy Evaluation]")
targets = envelope.payload["targets"]
allowed = []
filtered = []

for target in targets:
# Simulate policy check
if target == "cctv-controller":
print(f" ✗ {target}: DENIED by policy 'protect-cctv'")
filtered.append(target)
else:
print(f" ✓ {target}: ALLOWED")
allowed.append(target)

print(f"\n[Execution] Processing {len(allowed)} of {len(targets)} targets...")

# Execute only allowed targets
results = []
for target in allowed:
agent = agents[target]
response = agent.handle_intent(envelope)
results.append({
"target": target,
"status": response.status,
"payload": response.payload
})

# Summary
print("\n[Results]")
print(json.dumps({
"intent": "PowerOffIntent",
"requested_targets": targets,
"executed_targets": allowed,
"filtered_targets": filtered,
"filter_reason": {"cctv-controller": "security_policy"},
"results": results
}, indent=2))

# Verify CCTV is still powered
cctv = agents["cctv-controller"]
print(f"\n[Verification] CCTV powered: {cctv.is_powered}")
assert cctv.is_powered, "CCTV should still be powered!"
print("✓ CCTV was protected by policy")

return runtime, agents

Part 2: Crash and Recovery

Add to building_management.py:

# =============================================================================
# DEMO: CRASH RECOVERY
# =============================================================================

import os
import signal

class SimulatedCrash(Exception):
"""Simulates a system crash."""
pass


def demo_crash_recovery():
print("\n" + "=" * 60)
print("DEMO 2: Crash Recovery")
print("=" * 60)

runtime = IntentusRuntime(enable_recording=True)

# Register agents
systems = [
("hvac-controller", "HVAC System"),
("lighting-controller", "Lighting System"),
("server-controller", "Server Room"),
]

agents = {}
execution_log = []

class CrashingAgent(BuildingSystemAgent):
def __init__(self, definition, router, system_name, crash_after=None):
super().__init__(definition, router, system_name)
self.crash_after = crash_after

def _power_off(self, env):
print(f" [{self.system_name}] Powering off...")
execution_log.append({"system": self.system_name, "action": "power_off", "status": "started"})
time.sleep(0.3)

if self.crash_after == self.system_name:
print(f" [{self.system_name}] ⚡ SIMULATED POWER CUT!")
execution_log.append({"system": self.system_name, "action": "power_off", "status": "CRASH"})
raise SimulatedCrash("Power cut during execution")

self.is_powered = False
execution_log.append({"system": self.system_name, "action": "power_off", "status": "completed"})
return AgentResponse.success(
{"system": self.system_name, "status": "powered_off"},
agent=self.definition.name
)

for agent_name, system_name in systems:
# Crash after lighting-controller
crash_after = "Lighting System" if agent_name == "lighting-controller" else None
definition = create_agent_definition(agent_name, "PowerOffIntent")
agent = CrashingAgent(definition, runtime.router, system_name, crash_after)
runtime.registry.register(definition)
runtime.router._agents[agent_name] = agent
agents[agent_name] = agent

envelope = IntentEnvelope(
version="1.0",
intent=IntentRef(name="PowerOffIntent", version="1.0"),
payload={"action": "power_off"},
context=IntentContext(
sourceAgent="maintenance",
timestamp=datetime.utcnow().isoformat(),
priority=Priority.HIGH,
),
metadata=IntentMetadata(
requestId="maint-crash-demo",
source="demo",
createdAt=datetime.utcnow().isoformat(),
traceId="trace-crash-demo",
),
routing=RoutingOptions(strategy=RoutingStrategy.BROADCAST)
)

print("\n[Execution - First Attempt]")
print("Processing systems in order: HVAC → Lighting → Server Room")
print()

# Execute with simulated crash
try:
for agent_name in ["hvac-controller", "lighting-controller", "server-controller"]:
agent = agents[agent_name]
response = agent.handle_intent(envelope)
except SimulatedCrash as e:
print(f"\n[CRASH] System crash: {e}")

print("\n[Execution Log Before Recovery]")
print(json.dumps(execution_log, indent=2))

# Show state after crash
print("\n[System State After Crash]")
for name, agent in agents.items():
status = "OFF" if not agent.is_powered else "ON"
print(f" {agent.system_name}: {status}")

# RECOVERY
print("\n" + "-" * 40)
print("[RECOVERY] System restarting...")
print("-" * 40)

# Identify incomplete execution from log
incomplete = [e for e in execution_log if e["status"] == "CRASH"]
completed = [e for e in execution_log if e["status"] == "completed"]

print(f"\n[Recovery Analysis]")
print(f" Completed: {[e['system'] for e in completed]}")
print(f" Crashed: {[e['system'] for e in incomplete]}")
print(f" Not started: Server Room")

# Resume from crash point
print("\n[Resuming Execution]")
execution_log.clear()

# Skip completed, resume from crash point
for agent_name in ["lighting-controller", "server-controller"]:
agent = agents[agent_name]
agent.crash_after = None # Disable crash for recovery
print(f" [{agent.system_name}] Powering off (recovery)...")
agent.is_powered = False
execution_log.append({"system": agent.system_name, "action": "power_off", "status": "completed"})

print("\n[Final System State]")
for name, agent in agents.items():
status = "OFF" if not agent.is_powered else "ON"
print(f" {agent.system_name}: {status}")

print("\n✓ Execution recovered deterministically")

Part 3: Replay Demonstration

Add to building_management.py:

# =============================================================================
# DEMO: REPLAY
# =============================================================================

def demo_replay():
print("\n" + "=" * 60)
print("DEMO 3: Deterministic Replay")
print("=" * 60)

runtime = IntentusRuntime(enable_recording=True)

# Simple agent for replay demo
definition = create_agent_definition("processor", "ProcessIntent")
agent = BuildingSystemAgent(definition, runtime.router, "Processor")
runtime.registry.register(definition)
runtime.router._agents["processor"] = agent

envelope = IntentEnvelope(
version="1.0",
intent=IntentRef(name="ProcessIntent", version="1.0"),
payload={"action": "status", "data": "test-data"},
context=IntentContext(
sourceAgent="demo",
timestamp=datetime.utcnow().isoformat(),
priority=Priority.NORMAL,
),
metadata=IntentMetadata(
requestId="replay-demo-001",
source="demo",
createdAt=datetime.utcnow().isoformat(),
traceId="trace-replay",
),
routing=RoutingOptions()
)

# Execute and record
print("\n[Original Execution]")
response = runtime.router.route_intent(envelope)
execution_id = response.metadata.get("execution_id", "unknown")
print(f" Execution ID: {execution_id}")
print(f" Status: {response.status}")
print(f" Payload: {json.dumps(response.payload, indent=4)}")

# Simulate time passing
print("\n[Time passes... model updates deployed... system restarted...]")
time.sleep(0.5)

# Replay
print("\n[Replay Execution]")
print(" Loading execution record...")

from intentusnet import FileExecutionStore, ReplayEngine

store = FileExecutionStore(".intentusnet/records")

# Find the record (in real usage, use execution_id)
records = list(store.list_all())
if records:
record = store.load(records[-1])
engine = ReplayEngine(record)

is_ok, reason = engine.is_replayable()
print(f" Replayable: {is_ok}")

if is_ok:
replay_result = engine.replay()
print(f" Replayed Payload: {json.dumps(replay_result.payload, indent=4)}")

# Verify identical
if replay_result.payload == response.payload:
print("\n✓ Replay returned identical output (no model re-execution)")
else:
print("\n✗ Replay mismatch!")
else:
print(" No records found (recording may be disabled)")


# =============================================================================
# MAIN
# =============================================================================

def main():
print("\n" + "=" * 60)
print("IntentusNet Complete Walkthrough")
print("=" * 60)
print("""
Scenario: Building Management System
- Intent: "Power off for maintenance"
- CCTV must never be powered off (security critical)
- System should handle crash and recover
- Execution should be replayable
""")

# Run demos
demo_policy_filtering()
demo_crash_recovery()
demo_replay()

print("\n" + "=" * 60)
print("Walkthrough Complete!")
print("=" * 60)
print("""
Key Takeaways:
1. Policy filtering protected CCTV while allowing other systems
2. Crash recovery resumed from last checkpoint
3. Replay returned recorded output without re-execution
""")


if __name__ == "__main__":
main()

Running the Walkthrough

python building_management.py

Expected Output

============================================================
IntentusNet Complete Walkthrough
============================================================

Scenario: Building Management System
- Intent: "Power off for maintenance"
- CCTV must never be powered off (security critical)
- System should handle crash and recover
- Execution should be replayable

============================================================
DEMO 1: Policy Filtering (CCTV Protected)
============================================================

[Policy Evaluation]
✗ cctv-controller: DENIED by policy 'protect-cctv'
✓ hvac-controller: ALLOWED
✓ lighting-controller: ALLOWED
✓ server-controller: ALLOWED

[Execution] Processing 3 of 4 targets...
[HVAC System] Powering off...
[Lighting System] Powering off...
[Server Room] Powering off...

[Results]
{
"intent": "PowerOffIntent",
"requested_targets": ["hvac-controller", "lighting-controller", "cctv-controller", "server-controller"],
"executed_targets": ["hvac-controller", "lighting-controller", "server-controller"],
"filtered_targets": ["cctv-controller"],
"filter_reason": {"cctv-controller": "security_policy"},
...
}

[Verification] CCTV powered: True
✓ CCTV was protected by policy

============================================================
DEMO 2: Crash Recovery
============================================================

[Execution - First Attempt]
Processing systems in order: HVAC → Lighting → Server Room

[HVAC System] Powering off...
[Lighting System] Powering off...
[Lighting System] ⚡ SIMULATED POWER CUT!

[CRASH] System crash: Power cut during execution

[System State After Crash]
HVAC System: OFF
Lighting System: ON
Server Room: ON

----------------------------------------
[RECOVERY] System restarting...
----------------------------------------

[Recovery Analysis]
Completed: ['HVAC System']
Crashed: ['Lighting System']
Not started: Server Room

[Resuming Execution]
[Lighting System] Powering off (recovery)...
[Server Room] Powering off (recovery)...

[Final System State]
HVAC System: OFF
Lighting System: OFF
Server Room: OFF

✓ Execution recovered deterministically

============================================================
DEMO 3: Deterministic Replay
============================================================

[Original Execution]
Execution ID: exec-a1b2c3d4
Status: success
Payload: {...}

[Time passes... model updates deployed... system restarted...]

[Replay Execution]
Loading execution record...
Replayable: True
Replayed Payload: {...}

✓ Replay returned identical output (no model re-execution)

============================================================
Walkthrough Complete!
============================================================

Key Observations

FeatureWhat Happened
Policy FilteringCCTV was protected; 3 of 4 targets executed
Partial ContinuationIntent succeeded for allowed targets
Crash DetectionExecution log showed incomplete state
RecoveryResumed from crash point, skipped completed
ReplayReturned exact recorded output

Next Steps