
Crash Safety Internals

This document provides an in-depth look at how IntentusNet implements crash-safe execution through recording, checkpointing, and recovery mechanisms.

Recording Architecture

Execution Record Structure

Every execution produces an ExecutionRecord:

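from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, List, Optional

@dataclass
class ExecutionRecord:
    header: ExecutionHeader                   # see Header Details
    envelope: Dict[str, Any]                  # the original intent envelope
    routerDecision: Optional[Dict[str, Any]]
    events: List[ExecutionEvent]              # ordered by seq
    finalResponse: Optional[Dict[str, Any]]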

Header Details

from dataclasses import dataclass
from typing import Optional

@dataclass
class ExecutionHeader:
    executionId: str                  # Format: "exec-{16-hex-chars}"
    createdUtcIso: str                # ISO 8601 timestamp
    envelopeHash: str                 # Format: "sha256:{64-hex-chars}"
    replayable: bool                  # Safe to replay
    replayableReason: Optional[str]   # If not replayable, why
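A minimal sketch of the two documented string formats, assuming a hypothetical `generate_execution_id()` that draws 8 random bytes with `secrets.token_hex(8)` (8 bytes render as 16 hex characters). IntentusNet's own generator is not shown here and may differ; the regular expressions only encode the formats stated in the comments above.

```python
import re
import secrets
from datetime import datetime, timezone

# Hypothetical generator: 8 random bytes -> 16 lowercase hex characters.
def generate_execution_id() -> str:
    return f"exec-{secrets.token_hex(8)}"

# Patterns that encode the documented header formats.
EXECUTION_ID_RE = re.compile(r"exec-[0-9a-f]{16}")
ENVELOPE_HASH_RE = re.compile(r"sha256:[0-9a-f]{64}")

def utc_now_iso() -> str:
    # ISO 8601 in UTC with a trailing "Z", in the style of createdUtcIso.
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

def validate_header_fields(execution_id: str, envelope_hash: str) -> bool:
    # fullmatch rejects extra characters before or after the pattern.
    return bool(EXECUTION_ID_RE.fullmatch(execution_id)
                and ENVELOPE_HASH_RE.fullmatch(envelope_hash))

exec_id = generate_execution_id()
print(exec_id, utc_now_iso())
print(validate_header_fields(exec_id, "sha256:" + "0" * 64))  # True
```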

Envelope Hash Computation

The hash makes any change to the envelope detectable:

import hashlib
import json

def compute_envelope_hash(envelope: dict) -> str:
    # Canonical JSON: sorted keys, minimal separators, ASCII-only output
    canonical = json.dumps(
        envelope,
        sort_keys=True,
        separators=(',', ':'),
        ensure_ascii=True
    )
    digest_hex = hashlib.sha256(canonical.encode('utf-8')).hexdigest()
    return f"sha256:{digest_hex}"

Canonicalization rules:

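  • Keys sorted at every nesting level (by Unicode code point, Python's default string ordering)
  • No whitespace (separators=(',', ':'))
  • ASCII-only output: non-ASCII characters are written as \uXXXX escapes
  • Floats rendered with Python's shortest round-trip repr: deterministic within Python, though serializers in other languages may format the same value differently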
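The canonicalization rules can be checked directly. This sketch re-declares `compute_envelope_hash` so it runs on its own, then shows that key order does not affect the hash, how non-ASCII text is escaped, and one caveat worth knowing: `1` and `1.0` serialize differently, so envelopes that differ only in numeric type produce different hashes.

```python
import hashlib
import json

def compute_envelope_hash(envelope: dict) -> str:
    # Same canonicalization as compute_envelope_hash above.
    canonical = json.dumps(envelope, sort_keys=True,
                           separators=(",", ":"), ensure_ascii=True)
    return "sha256:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()

a = {"intent": "notify", "payload": {"to": "ops", "cc": "dev"}}
b = {"payload": {"cc": "dev", "to": "ops"}, "intent": "notify"}

# Key order at any nesting level does not change the hash.
assert compute_envelope_hash(a) == compute_envelope_hash(b)

# Canonical form: sorted keys, no whitespace, non-ASCII escaped.
print(json.dumps({"b": 1, "a": "é"}, sort_keys=True,
                 separators=(",", ":"), ensure_ascii=True))
# {"a":"\u00e9","b":1}

# Caveat: 1 serializes as "1" and 1.0 as "1.0", so the hashes differ.
assert compute_envelope_hash({"n": 1}) != compute_envelope_hash({"n": 1.0})
```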

Event Recording

Deterministic Clock

Events are ordered by sequence numbers, not wall-clock time:

import threading

class DeterministicClock:
    def __init__(self):
        self._seq = 0
        self._lock = threading.Lock()

    def next(self) -> int:
        # The lock makes increment-and-read atomic across threads
        with self._lock:
            self._seq += 1
            return self._seq

    def current(self) -> int:
        with self._lock:
            return self._seq

Why sequence numbers?

  • Wall-clock can drift or go backwards
  • Sequence provides total ordering
  • Deterministic on replay
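To see why the lock matters, the sketch below drives a copy of the clock from eight threads and checks that every sequence number from 1 to 8000 is handed out exactly once. Without the lock, `self._seq += 1` is a read-modify-write that threads can interleave, which could produce duplicate sequence numbers.

```python
import threading
from typing import List

class DeterministicClock:
    """Copy of the clock above: a lock-protected, monotonically increasing counter."""

    def __init__(self) -> None:
        self._seq = 0
        self._lock = threading.Lock()

    def next(self) -> int:
        with self._lock:
            self._seq += 1
            return self._seq

clock = DeterministicClock()
seen: List[int] = []
seen_lock = threading.Lock()

def worker() -> None:
    for _ in range(1000):
        n = clock.next()
        with seen_lock:
            seen.append(n)

threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every number in 1..8000 was issued exactly once: no gaps, no duplicates.
assert sorted(seen) == list(range(1, 8001))
```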

Event Types

from enum import Enum

class EventType(Enum):
    INTENT_RECEIVED = "INTENT_RECEIVED"
    AGENT_ATTEMPT_START = "AGENT_ATTEMPT_START"
    AGENT_ATTEMPT_END = "AGENT_ATTEMPT_END"
    FALLBACK_TRIGGERED = "FALLBACK_TRIGGERED"
    ROUTER_DECISION = "ROUTER_DECISION"
    FINAL_RESPONSE = "FINAL_RESPONSE"

Event Structure

from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class ExecutionEvent:
    seq: int                   # Sequence number from clock
    type: EventType            # Event type
    payload: Dict[str, Any]    # Event-specific data
    timestamp: Optional[str]   # Wall-clock for debugging (not for ordering)
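Because `type` is an `Enum`, an event needs a small conversion step before it can be written as JSON. The helpers `event_to_dict`, `event_from_dict` and `ordered` below are hypothetical (the document does not show how IntentusNet serializes events); the example also shows ordering by `seq` when the wall-clock timestamps went backwards.

```python
import json
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

class EventType(Enum):
    INTENT_RECEIVED = "INTENT_RECEIVED"
    AGENT_ATTEMPT_START = "AGENT_ATTEMPT_START"
    AGENT_ATTEMPT_END = "AGENT_ATTEMPT_END"
    FALLBACK_TRIGGERED = "FALLBACK_TRIGGERED"
    ROUTER_DECISION = "ROUTER_DECISION"
    FINAL_RESPONSE = "FINAL_RESPONSE"

@dataclass
class ExecutionEvent:
    seq: int
    type: EventType
    payload: Dict[str, Any]
    timestamp: Optional[str]

def event_to_dict(event: ExecutionEvent) -> Dict[str, Any]:
    # Hypothetical helper: store the enum by its string value so the result is plain JSON.
    data = asdict(event)
    data["type"] = event.type.value
    return data

def event_from_dict(data: Dict[str, Any]) -> ExecutionEvent:
    return ExecutionEvent(seq=data["seq"], type=EventType(data["type"]),
                          payload=data["payload"], timestamp=data.get("timestamp"))

def ordered(events: List[ExecutionEvent]) -> List[ExecutionEvent]:
    # Order by seq only; timestamps are for debugging and may go backwards.
    return sorted(events, key=lambda e: e.seq)

e1 = ExecutionEvent(1, EventType.INTENT_RECEIVED, {}, "2024-01-01T00:00:05Z")
e2 = ExecutionEvent(2, EventType.ROUTER_DECISION, {"agent": "a"}, "2024-01-01T00:00:04Z")
# The wall clock stepped back between e1 and e2, but seq still orders them correctly.
print([e.type.value for e in ordered([e2, e1])])
# ['INTENT_RECEIVED', 'ROUTER_DECISION']
print(json.dumps(event_to_dict(e2)))
```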

Flush Boundaries

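Events are flushed to the recorder at specific execution boundaries.

Flush semantics:

  • Every event is flushed to the in-memory buffer as soon as it is recorded
  • FINAL_RESPONSE additionally triggers a write of the whole record to storage
  • Because each event is flushed immediately, a crash loses at most the event being recorded at that moment from the buffer; events not yet persisted to storage are still lost if the process itself dies, which is the gap the planned WAL addresses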
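A hypothetical `BufferedRecorder` that follows these flush semantics literally: each event goes into an in-memory buffer immediately, and only FINAL_RESPONSE calls a `persist` callback. The names are illustrative, not IntentusNet's API. The assertions make the durability gap visible: until FINAL_RESPONSE, nothing has reached storage.

```python
import json
from typing import Any, Callable, Dict, List

class BufferedRecorder:
    """Hypothetical sketch: buffer each event immediately, persist on FINAL_RESPONSE."""

    def __init__(self, persist: Callable[[str], None]) -> None:
        self._persist = persist          # e.g. an atomic file write
        self._buffer: List[Dict[str, Any]] = []
        self._seq = 0

    def record(self, event_type: str, payload: Dict[str, Any]) -> None:
        self._seq += 1
        # "Flush" = append to the in-memory buffer as soon as the event happens.
        self._buffer.append({"seq": self._seq, "type": event_type, "payload": payload})
        if event_type == "FINAL_RESPONSE":
            # Only the terminal event triggers a write to storage.
            self._persist(json.dumps({"events": self._buffer}))

writes: List[str] = []
rec = BufferedRecorder(writes.append)
rec.record("INTENT_RECEIVED", {})
rec.record("AGENT_ATTEMPT_START", {"agent": "a"})
assert writes == []  # nothing durable yet: a crash here loses both events
rec.record("AGENT_ATTEMPT_END", {"status": "success"})
rec.record("FINAL_RESPONSE", {"ok": True})
assert len(writes) == 1
```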

In-Memory Recorder

from datetime import datetime, timezone
from typing import List, Optional

# generate_execution_id(), RecorderError and the _extract_router_decision()
# method are part of the recorder module but are not shown here.

def _utc_now_iso() -> str:
    # ISO 8601 UTC timestamp with a trailing "Z" (avoids deprecated utcnow())
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

class InMemoryExecutionRecorder:
    def __init__(self, clock: DeterministicClock):
        self._clock = clock
        self._events: List[ExecutionEvent] = []
        self._header: Optional[ExecutionHeader] = None
        self._envelope: Optional[dict] = None
        self._finalized = False

    def start(self, envelope: dict) -> str:
        """Start recording, return execution ID."""
        execution_id = generate_execution_id()
        self._header = ExecutionHeader(
            executionId=execution_id,
            createdUtcIso=_utc_now_iso(),
            envelopeHash=compute_envelope_hash(envelope),
            replayable=True,
            replayableReason=None
        )
        self._envelope = envelope
        return execution_id

    def record_event(self, event_type: EventType, payload: dict) -> None:
        """Record an event with next sequence number."""
        if self._finalized:
            raise RecorderError("Cannot record after finalization")

        event = ExecutionEvent(
            seq=self._clock.next(),
            type=event_type,
            payload=payload,
            timestamp=_utc_now_iso()  # debugging only; ordering uses seq
        )
        self._events.append(event)

    def finalize(self, final_response: dict) -> ExecutionRecord:
        """Finalize and return the complete record."""
        if self._header is None:
            raise RecorderError("start() must be called before finalize()")
        if self._finalized:
            raise RecorderError("Record already finalized")
        self._finalized = True
        return ExecutionRecord(
            header=self._header,
            envelope=self._envelope,
            routerDecision=self._extract_router_decision(),
            events=list(self._events),
            finalResponse=final_response
        )

    def mark_not_replayable(self, reason: str) -> None:
        """Mark record as not replayable."""
        if self._header:
            self._header.replayable = False
            self._header.replayableReason = reason

File-Based Persistence

FileExecutionStore

import json
import os
from pathlib import Path
from typing import Iterator

class FileExecutionStore:
    def __init__(self, base_path: str = ".intentusnet/records"):
        self._base_path = Path(base_path)
        self._base_path.mkdir(parents=True, exist_ok=True)

    def save(self, record: ExecutionRecord) -> str:
        """Save record atomically."""
        path = self._base_path / f"{record.header.executionId}.json"
        temp_path = path.with_suffix('.tmp')

        # Write to temp file and force it to disk before renaming
        with open(temp_path, 'w') as f:
            json.dump(record.to_dict(), f, indent=2)
            f.flush()
            os.fsync(f.fileno())

        # Atomic rename (POSIX guarantee on the same filesystem); os.replace
        # also overwrites an existing target on Windows, unlike Path.rename
        os.replace(temp_path, path)

        return str(path)

    def load(self, execution_id: str) -> ExecutionRecord:
        """Load record by ID."""
        path = self._base_path / f"{execution_id}.json"
        if not path.exists():
            raise RecordNotFoundError(execution_id)

        with open(path, 'r') as f:
            data = json.load(f)

        return ExecutionRecord.from_dict(data)

    def list_all(self) -> Iterator[str]:
        """List all execution IDs."""
        for path in self._base_path.glob("exec-*.json"):
            yield path.stem

Atomic Write Pattern

import os
import secrets
from pathlib import Path

def atomic_write(path: Path, data: bytes) -> None:
    """Write data atomically using rename."""
    # Unique temp name in the same directory, so the rename stays on one filesystem
    temp_path = path.with_suffix('.tmp.' + secrets.token_hex(4))

    try:
        # Write to temp file
        with open(temp_path, 'wb') as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # Force to disk

        # Atomic rename (os.replace also overwrites on Windows)
        os.replace(temp_path, path)

    finally:
        # Clean up temp file if rename failed
        if temp_path.exists():
            temp_path.unlink()

Why atomic writes?

  • A crash during the write cannot corrupt the existing file
  • Readers see either the complete old version or the complete new version (or no file, if none existed before)
  • No partial writes are ever visible
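Atomic rename protects the file's contents, but on POSIX systems the rename itself is recorded in the directory entry, which is only durable once the directory is fsynced. The hypothetical `atomic_write_durable` below is a variant of `atomic_write` that adds that step. It names the temporary file `<name>.tmp.<random>`, which the `exec-*.json` glob in `list_all()` does not match.

```python
import os
import secrets
import tempfile
from pathlib import Path

def atomic_write_durable(path: Path, data: bytes) -> None:
    """Hypothetical variant of atomic_write that also makes the rename durable (POSIX)."""
    temp_path = path.with_name(f"{path.name}.tmp.{secrets.token_hex(4)}")
    try:
        with open(temp_path, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())          # file contents reach the disk
        os.replace(temp_path, path)       # atomic swap of the directory entry
        if os.name == "posix":
            # fsync the directory so the new entry survives a power loss
            dir_fd = os.open(path.parent, os.O_RDONLY)
            try:
                os.fsync(dir_fd)
            finally:
                os.close(dir_fd)
    finally:
        # Only present if something failed before os.replace
        if temp_path.exists():
            temp_path.unlink()

with tempfile.TemporaryDirectory() as d:
    target = Path(d) / "exec-0000000000000000.json"
    atomic_write_durable(target, b'{"v": 1}')
    atomic_write_durable(target, b'{"v": 2}')
    print(target.read_bytes())                          # b'{"v": 2}'
    print(sorted(p.name for p in Path(d).iterdir()))    # only the final file
```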

Recovery Process

Detecting Incomplete Executions

from typing import List

def detect_incomplete(store: FileExecutionStore) -> List[ExecutionRecord]:
    """Find executions that crashed mid-way."""
    incomplete = []
    for exec_id in store.list_all():
        record = store.load(exec_id)
        if (not record.header.replayable
                and record.header.replayableReason == "execution_incomplete"):
            incomplete.append(record)
    return incomplete

Analyzing Crash Point

def analyze_crash_point(record: ExecutionRecord) -> dict:
    """Determine where execution crashed."""
    events = record.events

    if not events:
        return {"stage": "before_start", "recovery": "safe_to_retry"}

    last_event = events[-1]

    if last_event.type == EventType.INTENT_RECEIVED:
        return {
            "stage": "after_receive",
            "recovery": "safe_to_retry"
        }

    if last_event.type == EventType.AGENT_ATTEMPT_START:
        return {
            "stage": "during_agent_execution",
            "agent": last_event.payload.get("agent"),
            "recovery": "check_agent_idempotency"
        }

    if last_event.type == EventType.AGENT_ATTEMPT_END:
        status = last_event.payload.get("status")
        if status == "success":
            return {
                "stage": "after_success",
                "recovery": "response_may_be_lost"
            }
        else:
            return {
                "stage": "after_failure",
                "recovery": "retry_next_agent"
            }

    return {"stage": "unknown", "recovery": "manual_inspection"}
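One way to act on the analysis is a small policy function. The `plan_recovery` function below is a hypothetical sketch, not part of IntentusNet: it retries only when the analysis says nothing irreversible has happened, falls back to the next agent after a clean failure, and otherwise re-runs only agents the caller declares idempotent.

```python
from typing import Dict

def plan_recovery(analysis: Dict[str, str], agent_is_idempotent: bool = False) -> str:
    """Hypothetical policy mapping analyze_crash_point()'s "recovery" value to an action."""
    recovery = analysis.get("recovery", "manual_inspection")
    if recovery == "safe_to_retry":
        return "retry"        # no agent has started, so nothing irreversible happened
    if recovery == "retry_next_agent":
        return "fallback"     # the last attempt failed cleanly
    if recovery in ("check_agent_idempotency", "response_may_be_lost"):
        # An agent may have run (or finished) its side effects; re-running
        # is only harmless if the agent is idempotent.
        return "retry" if agent_is_idempotent else "manual_inspection"
    return "manual_inspection"

print(plan_recovery({"stage": "after_receive", "recovery": "safe_to_retry"}))  # retry
print(plan_recovery({"stage": "after_success", "recovery": "response_may_be_lost"}))  # manual_inspection
```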

Design Goal: WAL-Backed Recording

Future Enhancement

The following describes a planned persistence layer based on a write-ahead log (WAL), intended to provide stronger durability guarantees during execution.

WAL Concept

WAL benefits:

  • Each event is durable before it is acknowledged
  • Recovery can resume from any point in the log
  • Reduced write amplification

Planned WAL Structure

.intentusnet/
├── wal/
│   ├── 00000001.wal
│   ├── 00000002.wal
│   └── current -> 00000002.wal
├── snapshots/
│   └── exec-a1b2c3d4.json
└── checkpoint

WAL Entry Format

[length:4][type:1][sequence:8][crc32:4][payload:variable]
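The entry format above does not state byte order or what the CRC covers. The sketch below assumes big-endian fields, a `length` field that counts only the payload, and a CRC32 over type, sequence and payload; these are assumptions for illustration, not the planned design. It shows the property that makes a WAL useful after a crash: a reader can detect a torn or corrupt tail and stop there.

```python
import struct
import zlib
from typing import Iterator, Tuple

# Assumed layout: big-endian; length = payload size; CRC32 over type+sequence+payload.
HEADER = struct.Struct(">IBQI")  # length:4, type:1, sequence:8, crc32:4 (17 bytes)

def encode_entry(entry_type: int, sequence: int, payload: bytes) -> bytes:
    body = struct.pack(">BQ", entry_type, sequence) + payload
    crc = zlib.crc32(body)  # unsigned 32-bit value in Python 3
    return HEADER.pack(len(payload), entry_type, sequence, crc) + payload

def read_entries(buf: bytes) -> Iterator[Tuple[int, int, bytes]]:
    """Yield (type, sequence, payload); stop at the first torn or corrupt entry."""
    offset = 0
    while offset + HEADER.size <= len(buf):
        length, entry_type, sequence, crc = HEADER.unpack_from(buf, offset)
        start = offset + HEADER.size
        payload = buf[start:start + length]
        if len(payload) < length:
            return  # truncated append at the end of the log
        if zlib.crc32(struct.pack(">BQ", entry_type, sequence) + payload) != crc:
            return  # corrupt entry: treat it and everything after it as lost
        yield entry_type, sequence, payload
        offset = start + length

log = encode_entry(1, 1, b'{"a":1}') + encode_entry(2, 2, b'{"b":2}')
torn = log + encode_entry(3, 3, b"xyz")[:-2]      # simulate a crash mid-append
print([seq for _, seq, _ in read_entries(torn)])  # [1, 2]
```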

Best Practices

Agent Idempotency

Design agents to handle replay safely:

class IdempotentAgent(BaseAgent):
    def handle_intent(self, env: IntentEnvelope) -> AgentResponse:
        request_id = env.metadata.requestId

        # Check if already processed ("is not None" so a falsy cached value still counts)
        existing = self.cache.get(request_id)
        if existing is not None:
            return existing

        # Process
        result = self.process(env.payload)

        # Cache before returning
        response = AgentResponse.success(result, agent=self.name)
        self.cache.set(request_id, response, ttl=3600)

        return response
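The check-then-set sequence above has a race: two concurrent deliveries of the same `requestId` can both miss the cache and both call `process`. The hypothetical `IdempotencyCache` below closes that gap with a per-request lock, so duplicates wait for the first call and then reuse its result. It is an in-process sketch; a multi-process deployment would need the same guarantee from a shared store.

```python
import threading
import time
from typing import Any, Callable, Dict, List, Tuple

class IdempotencyCache:
    """Hypothetical helper: run fn at most once per request_id within ttl seconds."""

    def __init__(self, ttl: float = 3600.0) -> None:
        self._ttl = ttl
        self._results: Dict[str, Tuple[float, Any]] = {}  # request_id -> (expiry, result)
        self._locks: Dict[str, threading.Lock] = {}       # never pruned in this sketch
        self._guard = threading.Lock()

    def run_once(self, request_id: str, fn: Callable[[], Any]) -> Any:
        with self._guard:
            lock = self._locks.setdefault(request_id, threading.Lock())
        with lock:  # concurrent duplicates wait here instead of re-running fn
            hit = self._results.get(request_id)
            if hit is not None and time.monotonic() < hit[0]:
                return hit[1]
            result = fn()
            self._results[request_id] = (time.monotonic() + self._ttl, result)
            return result

calls: List[int] = []

def process() -> Dict[str, str]:
    time.sleep(0.01)  # widen the window in which a race could occur
    calls.append(1)
    return {"status": "ok"}

cache = IdempotencyCache(ttl=60)
threads = [threading.Thread(target=cache.run_once, args=("req-1", process))
           for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(calls))  # 1
```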

Handling Side Effects

Record side effects for recovery:

def process_with_side_effects(self, env: IntentEnvelope) -> AgentResponse:
    request_id = env.metadata.requestId

    # Record intention before side effect
    self.record_intention(request_id, "send_email", env.payload["email"])

    try:
        # Perform side effect
        send_email(env.payload["email"], env.payload["message"])

        # Record completion
        self.record_completion(request_id, "send_email")

    except Exception as e:
        # Side effect may have partially completed
        self.record_failure(request_id, "send_email", str(e))
        raise

    return AgentResponse.success({"sent": True}, agent=self.name)
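On recovery, the intention/completion markers show which side effects are in doubt. The hypothetical `SideEffectLog` below (simplified: it drops the payload argument that `record_intention` takes above) reports an intention with no later completion or failure as `unknown`: the email may or may not have been sent, so it should be checked against the external system rather than blindly retried.

```python
from typing import Dict, List, Tuple

class SideEffectLog:
    """Hypothetical in-memory log of intention/completion/failure markers."""

    def __init__(self) -> None:
        self._entries: List[Tuple[str, str, str]] = []  # (request_id, action, state)

    def record_intention(self, request_id: str, action: str) -> None:
        self._entries.append((request_id, action, "intended"))

    def record_completion(self, request_id: str, action: str) -> None:
        self._entries.append((request_id, action, "completed"))

    def record_failure(self, request_id: str, action: str) -> None:
        self._entries.append((request_id, action, "failed"))

    def reconcile(self) -> Dict[Tuple[str, str], str]:
        """Last marker wins; an intention with no later marker is 'unknown'."""
        state: Dict[Tuple[str, str], str] = {}
        for request_id, action, marker in self._entries:
            state[(request_id, action)] = marker
        return {k: ("unknown" if v == "intended" else v) for k, v in state.items()}

log = SideEffectLog()
log.record_intention("req-1", "send_email")
log.record_completion("req-1", "send_email")
log.record_intention("req-2", "send_email")
log.record_failure("req-2", "send_email")
log.record_intention("req-3", "send_email")  # crash before completion was recorded
print(log.reconcile())
```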

Summary

Component          Guarantee
-----------------  ------------------------------------------
Envelope hash      SHA-256 over canonical JSON
Sequence numbers   Deterministic ordering
Event flush        At each boundary
File write         Atomic via rename
Recovery           Detect incomplete executions, analyze crash point
WAL                Planned, for enhanced durability
