Norns Streaming Architecture
Real-time event streaming architecture for the Norns agent hierarchy, enabling transparent visibility into agent execution from classification through tool execution.
Overview
The Norns streaming architecture provides real-time visibility into every phase of agent execution through structured events streamed via WebSocket. This enables rich UI experiences showing planning, reasoning, tool execution, and response generation as they happen.
| Component | Purpose |
|---|---|
| StreamEvent | Universal event type for all agent levels |
| WebSocket Transport | Real-time bidirectional communication |
| Event Forwarding | Hierarchical event propagation from workers → supervisors → conversation agent → frontend |
| Frontend Rendering | Collapsible sections for planning, tools, and response |
Architecture
Hierarchical Flow
┌────────────────────────────────────────────────────────────┐
│  ConversationAgent                                         │
│  (Entry Point)                                             │
│                                                            │
│  - Classifies complexity (simple/moderate/complex)         │
│  - Handles simple requests directly                        │
│  - Delegates complex requests to NornsSupervisor           │
│  - Forwards ALL events to WebSocket                        │
└──────────────────┬─────────────────────────────────────────┘
                   │
                   ▼
┌────────────────────────────────────────────────────────────┐
│  NornsSupervisor (Level 1)                                 │
│                                                            │
│  execute_streaming() yields:                               │
│    - thinking (start, classification, reasoning)           │
│    - routing (domain selection)                            │
│    - tool_start/tool_end (supervisor execution)            │
│    - Events from Domain Supervisors (forwarded)            │
└──────────────────┬─────────────────────────────────────────┘
                   │
                   ▼
┌────────────────────────────────────────────────────────────┐
│  Domain Supervisors (Level 2)                              │
│  Task, Calendar, Home, Navigation, etc.                    │
│                                                            │
│  execute_streaming() yields:                               │
│    - tool_start/tool_end (domain supervisor execution)     │
│    - routing (worker selection)                            │
│    - Events from Workers (forwarded)                       │
└──────────────────┬─────────────────────────────────────────┘
                   │
                   ▼
┌────────────────────────────────────────────────────────────┐
│  Workers (Level 3)                                         │
│  Specialized agents (TaskWorker, CalendarWorker...)        │
│                                                            │
│  execute_streaming() yields:                               │
│    - tool_start (worker begins)                            │
│    - tool_end (worker completes with result)               │
│    - error (on failure)                                    │
└────────────────────────────────────────────────────────────┘
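Because each level re-yields its children's events from inside its own execute_streaming(), the hierarchy behaves like a chain of nested async generators. The self-contained sketch below reproduces that shape; worker, domain_supervisor, and norns_supervisor are illustrative stand-ins rather than the Norns classes, and plain dicts replace StreamEvent:

```python
import asyncio
from typing import AsyncIterator


# Illustrative stand-ins for the three levels; events are plain dicts here.
async def worker(name: str) -> AsyncIterator[dict]:
    yield {"type": "tool_start", "name": name, "agent_level": 3}
    await asyncio.sleep(0)  # pretend to do some work
    yield {"type": "tool_end", "name": name, "agent_level": 3}


async def domain_supervisor(domain: str, workers: list[str]) -> AsyncIterator[dict]:
    yield {"type": "routing", "content": f"{domain} -> {workers}", "agent_level": 2}
    for w in workers:
        # Forward each worker event unchanged, preserving order
        async for event in worker(w):
            yield event


async def norns_supervisor() -> AsyncIterator[dict]:
    yield {"type": "thinking", "phase": "classification", "agent_level": 1}
    async for event in domain_supervisor("calendar", ["CalendarWorker"]):
        yield event


async def collect() -> list[dict]:
    # Roughly what the ConversationAgent would forward to the WebSocket
    return [event async for event in norns_supervisor()]


events = asyncio.run(collect())
for e in events:
    print(e["agent_level"], e["type"])
```

Since every parent simply iterates its child with `async for` and re-yields, events reach the top in exactly the order the leaves produced them.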
Event Flow Diagram
      User Message
            │
            ▼
┌────────────────────────┐
│  WebSocket Connection  │
│  /ws/chat/:sessionId   │
└───────────┬────────────┘
            │
            ▼
┌──────────────────────────────────────────────┐
│  ConversationAgent                           │
│                                              │
│  1. classify_only()                          │
│     StreamEvent(type="thinking",             │
│                 phase="classification")      │
│                                              │
│  2. handle_message_structured()              │
│     • Simple  → _handle_simple_structured    │
│     • Complex → _handle_complex_structured   │
│                                              │
│  3. Forward all events to WebSocket          │
└───────────┬──────────────────────────────────┘
            │
            ▼
   [Complex path only]
            │
            ▼
┌──────────────────────────────────────────────┐
│  NornsSupervisor                             │
│                                              │
│  execute_streaming():                        │
│    • Classify intent                         │
│    • Emit routing decision                   │
│    • Delegate to domain(s)                   │
│    • Forward domain events                   │
│    • Synthesize response                     │
└───────────┬──────────────────────────────────┘
            │
            ▼
┌──────────────────────────────────────────────┐
│  DomainSupervisor                            │
│                                              │
│  execute_streaming():                        │
│    • Route to workers                        │
│    • Execute workers                         │
│    • Forward worker events                   │
│    • Synthesize results                      │
└───────────┬──────────────────────────────────┘
            │
            ▼
┌──────────────────────────────────────────────┐
│  Worker                                      │
│                                              │
│  execute_streaming():                        │
│    • Emit tool_start                         │
│    • Execute action                          │
│    • Emit tool_end with result               │
└──────────────────────────────────────────────┘
StreamEvent Schema
Defined in /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/agent/agents/stream_events.py:
from dataclasses import dataclass
from typing import Any, Optional
@dataclass
class StreamEvent:
    """Structured event for streaming agent execution."""
    # Required
    type: str
    # Common fields
    content: Optional[str] = None
    # For thinking events
    phase: Optional[str] = None  # "start", "planning", "classification", "reasoning"
    # For tool/worker events
    id: Optional[str] = None  # Correlation ID
    name: Optional[str] = None  # Tool or worker name
    args: Optional[dict] = None  # For tool_start
    result: Optional[Any] = None  # For tool_end
    # Timing
    duration_ms: Optional[int] = None
    # Extra context
    metadata: Optional[dict] = None
    # Agent hierarchy info
    agent_name: Optional[str] = None
    agent_level: Optional[int] = None  # 1=supervisor, 2=domain, 3=worker
    domain: Optional[str] = None
    def to_dict(self) -> dict:
        """Convert to dict for JSON serialization."""
        # Converts keys to camelCase for the frontend
        ...
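The body of to_dict() is elided above. The sample messages under WebSocket Protocol use camelCase keys (durationMs) and leave out unset fields, so one plausible implementation, assuming None-valued fields are simply dropped, looks like this (to_camel is a hypothetical helper):

```python
from dataclasses import dataclass, fields
from typing import Any, Optional


def to_camel(name: str) -> str:
    """snake_case -> camelCase, e.g. duration_ms -> durationMs."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


@dataclass
class StreamEvent:
    type: str
    content: Optional[str] = None
    phase: Optional[str] = None
    id: Optional[str] = None
    name: Optional[str] = None
    args: Optional[dict] = None
    result: Optional[Any] = None
    duration_ms: Optional[int] = None
    metadata: Optional[dict] = None
    agent_name: Optional[str] = None
    agent_level: Optional[int] = None
    domain: Optional[str] = None

    def to_dict(self) -> dict:
        # Assumption: unset (None) fields are omitted so each message stays small
        return {
            to_camel(f.name): getattr(self, f.name)
            for f in fields(self)
            if getattr(self, f.name) is not None
        }


print(StreamEvent(type="done", duration_ms=1234).to_dict())
# {'type': 'done', 'durationMs': 1234}
```

Dropping None fields keeps a `token` message down to two keys; the trade-off is that consumers must treat every field except `type` as optional.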
Event Types
| Type | Purpose | Fields | Emitted By |
|---|---|---|---|
| thinking | Planning, classification, reasoning | phase, content, duration_ms | All levels |
| routing | Domain/worker routing decision | content, metadata | Supervisors |
| tool_start | Agent/tool execution begins | id, name, args | All levels |
| tool_end | Agent/tool execution completes | id, name, result, duration_ms | All levels |
| token | Response streaming (word by word) | content | ConversationAgent |
| done | Execution complete | duration_ms | ConversationAgent |
| error | Error occurred | content | All levels |
Thinking Phases
| Phase | When | Example Content |
|---|---|---|
| start | Beginning of processing | "Processing your request..." |
| planning | Breaking down complex task | "Step 1: Fetch list of open PRs..." |
| classification | Intent classification complete | "Classified as TASK: create reminder" |
| reasoning | LLM reasoning/synthesis | "The user wants to..." |
Internal Event Types
These events are used for result passing between hierarchy levels and are not forwarded to the frontend:
| Type | Purpose | Contains |
|---|---|---|
| worker_result | Worker → Domain Supervisor | WorkerResult object |
| domain_result | Domain Supervisor → Main Supervisor | Synthesis + results |
| supervisor_result | Main Supervisor → Conversation Agent | Final response |
Implementation Guide
Adding Streaming to a New Worker
- Inherit from BaseWorker:
from typing import AsyncIterator
from agents.base import BaseWorker, Domain, HierarchicalState, WorkerResult  # assumes Domain is exported by agents.base
from agents.stream_events import StreamEvent
class MyNewWorker(BaseWorker):
    name = "my_new_worker"
    domain = Domain.TASK
    description = "Does something useful"
    async def execute(
        self,
        state: HierarchicalState,
        action: str,
        **kwargs,
    ) -> WorkerResult:
        # Traditional non-streaming implementation; _do_work() is your worker's own helper
        result = await self._do_work(action)
        return WorkerResult(
            worker_name=self.name,
            domain=self.domain,
            success=True,
            result=result,
        )
- Override execute_streaming() for custom streaming:
    async def execute_streaming(
        self,
        state: HierarchicalState,
        action: str,
        **kwargs,
    ) -> AsyncIterator[StreamEvent]:
        import time  # usually a module-level import
        # Correlation ID shared by tool_start and tool_end
        worker_id = f"{self.name}-{int(time.time() * 1000)}"
        # Emit start
        yield StreamEvent(
            type="tool_start",
            id=worker_id,
            name=self.name,
            args={"action": action},
            agent_name=self.name,
            agent_level=3,
            domain=self.domain.value,
        )
        start_time = time.perf_counter()  # monotonic clock for durations
        try:
            # Do work (can yield progress events)
            result = await self._do_work(action)
            # Emit completion
            duration_ms = int((time.perf_counter() - start_time) * 1000)
            yield StreamEvent(
                type="tool_end",
                id=worker_id,
                name=self.name,
                result={"data": result},
                duration_ms=duration_ms,
                agent_name=self.name,
                agent_level=3,
                domain=self.domain.value,
            )
            # Return result for parent (filtered out before the UI)
            yield StreamEvent(
                type="worker_result",
                result=WorkerResult(
                    worker_name=self.name,
                    domain=self.domain,
                    success=True,
                    result=result,
                ),
                metadata={"is_final": True},
            )
        except Exception as e:
            duration_ms = int((time.perf_counter() - start_time) * 1000)
            yield StreamEvent(
                type="error",
                id=worker_id,
                name=self.name,
                content=str(e),
                duration_ms=duration_ms,
                agent_name=self.name,
                agent_level=3,
                domain=self.domain.value,
            )
            # Still hand the parent a failed result so it can synthesize a response
            yield StreamEvent(
                type="worker_result",
                result=WorkerResult(
                    worker_name=self.name,
                    domain=self.domain,
                    success=False,
                    result=None,
                ),
                metadata={"is_final": True},
            )
- Default Implementation: If you don't override execute_streaming(), the base class provides a default implementation that wraps your execute() method and emits basic start/end events.
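The default wrapper lives in base.py, which is not reproduced here. Assuming it brackets execute() with tool_start/tool_end and then hands the WorkerResult up as an internal worker_result event, it might look like the sketch below. StreamEvent and WorkerResult are trimmed stand-ins, EchoWorker is hypothetical, and in this sketch an exception from execute() simply propagates to the parent:

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional


@dataclass
class StreamEvent:  # trimmed stand-in for agents.stream_events.StreamEvent
    type: str
    content: Optional[str] = None
    id: Optional[str] = None
    name: Optional[str] = None
    args: Optional[dict] = None
    result: Optional[Any] = None
    duration_ms: Optional[int] = None
    metadata: Optional[dict] = None
    agent_level: Optional[int] = None


@dataclass
class WorkerResult:  # trimmed stand-in; the real class also carries a domain
    worker_name: str
    success: bool
    result: Any = None


class BaseWorker:
    name = "base_worker"

    async def execute(self, state: Any, action: str, **kwargs) -> WorkerResult:
        raise NotImplementedError

    async def execute_streaming(
        self, state: Any, action: str, **kwargs
    ) -> AsyncIterator[StreamEvent]:
        # Hypothetical default: bracket execute() with tool_start/tool_end
        worker_id = f"{self.name}-{time.time_ns()}"
        yield StreamEvent(type="tool_start", id=worker_id, name=self.name,
                          args={"action": action}, agent_level=3)
        started = time.perf_counter()
        result = await self.execute(state, action, **kwargs)
        yield StreamEvent(type="tool_end", id=worker_id, name=self.name,
                          result={"data": result.result},
                          duration_ms=int((time.perf_counter() - started) * 1000),
                          agent_level=3)
        # Internal hand-off to the parent; filtered before reaching the UI
        yield StreamEvent(type="worker_result", result=result,
                          metadata={"is_final": True})


class EchoWorker(BaseWorker):  # illustrative worker that only implements execute()
    name = "echo_worker"

    async def execute(self, state: Any, action: str, **kwargs) -> WorkerResult:
        return WorkerResult(worker_name=self.name, success=True, result=action.upper())


async def main() -> list[StreamEvent]:
    return [e async for e in EchoWorker().execute_streaming(None, "ping")]


events = asyncio.run(main())
print([e.type for e in events])  # ['tool_start', 'tool_end', 'worker_result']
```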
Adding Streaming to a New Domain Supervisor
- Inherit from BaseDomainSupervisor:
from agents.base import BaseDomainSupervisor, Domain, HierarchicalState, WorkerResult  # assumes Domain is exported by agents.base
from agents.stream_events import StreamEvent
from typing import AsyncIterator
class MyDomainSupervisor(BaseDomainSupervisor):
    name = "my_domain_supervisor"
    domain = Domain.TASK
    description = "Coordinates my domain workers"
    def __init__(self, model=None):
        super().__init__(model)
        # Worker1/Worker2 are placeholders for your own BaseWorker subclasses
        self.workers = {
            "worker1": Worker1(),
            "worker2": Worker2(),
        }
    async def route_to_workers(
        self,
        state: HierarchicalState,
        action: str,
    ) -> list[str]:
        # Return the names (keys of self.workers) of the workers to invoke
        return ["worker1"]
    async def synthesize_results(
        self,
        state: HierarchicalState,
        results: list[WorkerResult],
    ) -> str:
        # Combine worker results into a response
        return "Worker completed successfully"
- The base class execute_streaming() handles:
  - Emitting supervisor start/end events
  - Routing to workers
  - Forwarding worker events
  - Synthesis
  - Error handling
- Override execute_streaming() only if you need custom event emission.
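A rough sketch of how a base-class execute_streaming() could cover those responsibilities, assuming workers run sequentially and each one ends with an internal worker_result event. FakeWorker and SketchDomainSupervisor are illustrative stand-ins, error handling is left out for brevity, and the real BaseDomainSupervisor may differ:

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional


@dataclass
class StreamEvent:  # trimmed stand-in
    type: str
    content: Optional[str] = None
    name: Optional[str] = None
    result: Optional[Any] = None


class FakeWorker:  # hypothetical worker: two public events, then an internal result
    def __init__(self, name: str):
        self.name = name

    async def execute_streaming(self, state, action) -> AsyncIterator[StreamEvent]:
        yield StreamEvent(type="tool_start", name=self.name)
        yield StreamEvent(type="tool_end", name=self.name)
        yield StreamEvent(type="worker_result", result=f"{self.name} did {action}")


class SketchDomainSupervisor:
    name = "sketch_domain_supervisor"

    def __init__(self):
        self.workers = {"worker1": FakeWorker("worker1"), "worker2": FakeWorker("worker2")}

    async def route_to_workers(self, state, action) -> list[str]:
        return list(self.workers)

    async def synthesize_results(self, state, results: list) -> str:
        return "; ".join(results)

    async def execute_streaming(self, state, action) -> AsyncIterator[StreamEvent]:
        yield StreamEvent(type="tool_start", name=self.name)
        chosen = await self.route_to_workers(state, action)
        yield StreamEvent(type="routing", content=f"Routing to {', '.join(chosen)}")
        results = []
        for worker_name in chosen:  # sequential, so each worker's events stay grouped
            async for event in self.workers[worker_name].execute_streaming(state, action):
                if event.type == "worker_result":
                    results.append(event.result)  # keep internal; do not forward
                else:
                    yield event
        synthesis = await self.synthesize_results(state, results)
        yield StreamEvent(type="tool_end", name=self.name)
        # Internal hand-off to the main supervisor
        yield StreamEvent(type="domain_result", result=synthesis)


async def main() -> list[StreamEvent]:
    return [e async for e in SketchDomainSupervisor().execute_streaming(None, "sync")]


events = asyncio.run(main())
print([e.type for e in events])
```

Intercepting worker_result inside the loop is what keeps internal results out of the UI stream while still giving synthesize_results() everything it needs.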
Adding Streaming to Main Supervisor
The NornsSupervisor.execute_streaming() method is already implemented. It:
- Classifies user intent
- Emits thinking and routing events
- Delegates to domain supervisors
- Forwards all domain supervisor events
- Synthesizes final response
- Streams response tokens
Frontend Integration
WebSocket Protocol
Connect: ws://norns-pm.ravenhelm.dev/ws/chat/:sessionId
Send (user message):
{
"message": "What's on my calendar today?",
"user_id": "uuid-here"
}
Receive (events):
{"type": "thinking", "phase": "start", "content": "Processing..."}
{"type": "thinking", "phase": "classification", "content": "Classified as CALENDAR"}
{"type": "routing", "content": "Routing to calendar domain", "metadata": {"domain": "calendar"}}
{"type": "tool_start", "id": "tool-123", "name": "CalendarWorker", "args": {"action": "list events"}}
{"type": "tool_end", "id": "tool-123", "name": "CalendarWorker", "result": {...}, "durationMs": 543}
{"type": "token", "content": "You "}
{"type": "token", "content": "have "}
{"type": "token", "content": "3 "}
{"type": "token", "content": "events "}
{"type": "done", "durationMs": 1234}
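Clients other than the chat page (CLI tools, tests) can fold the same message stream into a per-turn trace. A sketch of such a reducer, assuming the message shapes listed above; the Trace class and apply_event function are hypothetical and roughly mirror the state the React page keeps:

```python
import json
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Trace:  # hypothetical client-side view of one assistant turn
    thinking: list[str] = field(default_factory=list)
    routing: Optional[dict] = None
    tool_calls: dict[str, dict] = field(default_factory=dict)  # keyed by correlation id
    response: str = ""
    duration_ms: Optional[int] = None
    error: Optional[str] = None


def apply_event(trace: Trace, data: dict[str, Any]) -> Trace:
    kind = data["type"]
    if kind == "thinking":
        trace.thinking.append(data.get("content", ""))
    elif kind == "routing":
        trace.routing = {"domain": (data.get("metadata") or {}).get("domain"),
                         "action": data.get("content")}
    elif kind == "tool_start":
        trace.tool_calls[data["id"]] = {"name": data["name"], "status": "running"}
    elif kind == "tool_end":
        call = trace.tool_calls.setdefault(data["id"], {"name": data["name"]})
        call.update(status="complete", result=data.get("result"),
                    duration_ms=data.get("durationMs"))
    elif kind == "token":
        trace.response += data["content"]
    elif kind == "done":
        trace.duration_ms = data.get("durationMs")
    elif kind == "error":
        trace.error = data.get("content")
    # Unknown types (e.g. a keepalive "ping") are ignored
    return trace


messages = [
    '{"type": "thinking", "phase": "start", "content": "Processing..."}',
    '{"type": "routing", "content": "Routing to calendar domain", "metadata": {"domain": "calendar"}}',
    '{"type": "tool_start", "id": "tool-123", "name": "CalendarWorker", "args": {"action": "list events"}}',
    '{"type": "tool_end", "id": "tool-123", "name": "CalendarWorker", "result": {}, "durationMs": 543}',
    '{"type": "token", "content": "You "}',
    '{"type": "token", "content": "have "}',
    '{"type": "done", "durationMs": 1234}',
]
trace = Trace()
for raw in messages:
    apply_event(trace, json.loads(raw))
print(trace.response, trace.tool_calls["tool-123"]["duration_ms"], trace.duration_ms)
```

Keying tool calls by their correlation `id` is what lets a `tool_end` find its `tool_start`, even when several tools run within one turn.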
React Implementation
From /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/admin/app/(authenticated)/chat/page.tsx:
Key State:
const [thinkingPhase, setThinkingPhase] = useState<string>('');
const [thinkingContent, setThinkingContent] = useState<ThinkingContent[]>([]);
const [activeToolCalls, setActiveToolCalls] = useState<ToolCall[]>([]);
const [activeRouting, setActiveRouting] = useState<{domain: string; action: string} | null>(null);
WebSocket Handler:
ws.onmessage = (event: MessageEvent) => {
  const data = JSON.parse(event.data);
  switch (data.type) {
    case 'thinking':
      if (data.phase === 'start') {
        setThinkingPhase(data.content);
      } else {
        // Planning, classification, and reasoning steps go in the collapsible Planning section.
        // Append immutably so React re-renders (pushing onto state does not).
        setThinkingContent(prev => [...prev, {content: data.content, phase: data.phase}]);
        if (data.phase === 'reasoning') {
          setThinkingPhase(''); // Clear spinner
        }
      }
      break;
    case 'routing':
      setActiveRouting({
        domain: data.metadata?.domain,
        action: data.content,
      });
      break;
    case 'tool_start':
      setActiveToolCalls(prev => [...prev, {
        id: data.id,
        name: data.name,
        args: data.args,
        status: 'running',
        startTime: Date.now(),
      }]);
      break;
    case 'tool_end':
      // Match on the correlation id and replace the entry instead of mutating it
      setActiveToolCalls(prev => prev.map(t =>
        t.id === data.id
          ? {...t, status: 'complete', result: data.result, durationMs: data.durationMs}
          : t,
      ));
      break;
    case 'token':
      responseContent += data.content;
      break;
    case 'done':
      // Save to message trace. State captured by this closure can be stale,
      // so read the latest thinking/tool/routing values (e.g. via refs) here.
      message.trace = {
        thinking: thinkingContent,
        toolCalls: activeToolCalls,
        routing: activeRouting,
        duration: data.durationMs,
      };
      break;
  }
};
Collapsible Sections:
<CollapsibleSection
title="Planning"
icon="💭"
summary="1.2s"
defaultOpen={!isMobile}
>
{/* Reasoning and planning steps */}
</CollapsibleSection>
<CollapsibleSection
title="Tools"
icon="🔧"
summary="2 calls, 543ms"
defaultOpen={!isMobile}
>
{/* Tool execution details */}
</CollapsibleSection>
Event Filtering
What Gets Shown
| Event Type | Shown to User | Where |
|---|---|---|
| thinking (planning) | ✅ Yes | Planning section |
| thinking (reasoning) | ✅ Yes | Planning section |
| thinking (classification) | ✅ Yes | Planning section |
| routing | ✅ Yes | Planning section |
| tool_start | ✅ Yes | Tools section |
| tool_end | ✅ Yes | Tools section |
| token | ✅ Yes | Response content |
| done | ✅ Yes (timestamp) | Footer |
| error | ✅ Yes | Error banner |
What Gets Filtered
| Event Type | Shown to User | Purpose |
|---|---|---|
| worker_result | ❌ No | Internal: Worker → Domain Supervisor |
| domain_result | ❌ No | Internal: Domain Supervisor → Main Supervisor |
| supervisor_result | ❌ No | Internal: Main Supervisor → Conversation Agent |
These internal events carry WorkerResult objects and synthesis strings between hierarchy levels but are not meaningful to end users.
Server-side filter (applied before events are forwarded to the frontend):
async for event in supervisor.execute_streaming(state):
    # Skip internal result events
    if event.type in ("worker_result", "domain_result", "supervisor_result"):
        continue
    yield event
Error Handling
Worker Level
try:
    result = await self._do_work()
    yield StreamEvent(type="tool_end", ...)
except Exception as e:
    yield StreamEvent(
        type="error",
        content=f"Worker {self.name} failed: {e}",
        agent_name=self.name,
        agent_level=3,
    )
    # Re-raise or return error WorkerResult
Supervisor Level
try:
    async for event in worker.execute_streaming(...):
        if event.type == "error":
            # Log and continue to next worker or handle gracefully
            logger.error(f"Worker error: {event.content}")
        yield event
except Exception as e:
    yield StreamEvent(
        type="error",
        content=f"Supervisor error: {e}",
        agent_name=self.name,
        agent_level=2,
    )
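One way to keep a single failing worker from aborting its siblings is to wrap each child stream so that an uncaught exception becomes an error event and the supervisor moves on. A minimal sketch; guard_stream, flaky_worker, and steady_worker are hypothetical names, and StreamEvent is a trimmed stand-in:

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional


@dataclass
class StreamEvent:  # trimmed stand-in
    type: str
    content: Optional[str] = None
    agent_name: Optional[str] = None
    agent_level: Optional[int] = None


async def guard_stream(
    stream: AsyncIterator[StreamEvent], agent_name: str, agent_level: int
) -> AsyncIterator[StreamEvent]:
    """Forward a child stream; turn an uncaught exception into an error event."""
    try:
        async for event in stream:
            yield event
    except Exception as exc:  # the child stream is finished once it raises
        yield StreamEvent(type="error", content=f"{agent_name} failed: {exc}",
                          agent_name=agent_name, agent_level=agent_level)


async def flaky_worker() -> AsyncIterator[StreamEvent]:  # hypothetical failing worker
    yield StreamEvent(type="tool_start", agent_name="flaky_worker", agent_level=3)
    raise RuntimeError("calendar API timed out")


async def steady_worker() -> AsyncIterator[StreamEvent]:  # hypothetical healthy worker
    yield StreamEvent(type="tool_start", agent_name="steady_worker", agent_level=3)
    yield StreamEvent(type="tool_end", agent_name="steady_worker", agent_level=3)


async def supervise() -> list[StreamEvent]:
    collected = []
    for name, worker in [("flaky_worker", flaky_worker), ("steady_worker", steady_worker)]:
        # One failing worker does not stop the remaining workers
        async for event in guard_stream(worker(), name, agent_level=3):
            collected.append(event)
    return collected


events = asyncio.run(supervise())
print([e.type for e in events])  # ['tool_start', 'error', 'tool_start', 'tool_end']
```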
Frontend
case 'error':
setError(data.content);
setIsStreaming(false);
break;
Performance Considerations
Event Volume
For complex tasks with multiple workers, expect:
- 1-2 thinking events (classification, reasoning)
- 1 routing event per domain
- 2 events per domain supervisor (tool_start, tool_end)
- 2 events per worker (tool_start, tool_end)
- 1 event per response token (~20-200 tokens)
- 1 done event
Example: A task with 3 workers and 50-word response:
- ~12 structured events
- ~50 token events
- Total: ~62 WebSocket messages
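The arithmetic can be written out as a small estimator. Counting the domain supervisor's own tool_start/tool_end pair (the hierarchy diagram shows domain supervisors emitting them) alongside the per-worker events reproduces the ~12 figure; the function name, its parameters, and the default of two thinking events are assumptions for illustration:

```python
def estimate_messages(workers: int, tokens: int, domains: int = 1,
                      thinking: int = 2) -> dict[str, int]:
    """Rough count of WebSocket messages for one complex request."""
    structured = (
        thinking         # classification + reasoning
        + domains        # one routing event per domain
        + 2 * domains    # domain supervisor tool_start/tool_end
        + 2 * workers    # worker tool_start/tool_end
        + 1              # done
    )
    return {"structured": structured, "tokens": tokens, "total": structured + tokens}


print(estimate_messages(workers=3, tokens=50))
# {'structured': 12, 'tokens': 50, 'total': 62}
```

Token events dominate once responses grow past a few dozen words, which is why batching tokens is the first optimization below.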
Optimization Strategies
- Token Batching: Instead of word-by-word, batch tokens:
buffer = []
for word in response.split():
    buffer.append(word)
    if len(buffer) >= 5:
        # Trailing space keeps batches separated when the frontend concatenates them
        yield StreamEvent(type="token", content=" ".join(buffer) + " ")
        buffer = []
if buffer:  # Flush the final partial batch
    yield StreamEvent(type="token", content=" ".join(buffer))
- Event Sampling: For long-running workers, emit periodic progress:
for i, item in enumerate(items, start=1):
    await process(item)
    if i % 10 == 0:  # Every 10 processed items
        yield StreamEvent(
            type="thinking",
            phase="reasoning",
            content=f"Processed {i}/{len(items)} items",
        )
- Compression: For large result payloads, truncate in events:
result_text = str(result)
yield StreamEvent(
    type="tool_end",
    id=worker_id,  # same correlation ID as the matching tool_start
    name=self.name,
    result={"preview": result_text[:500], "full": len(result_text) <= 500},
)
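A runnable version of the batching idea, assuming the full response text is already available as a string. It yields plain strings to stay self-contained; in a worker each chunk would become `StreamEvent(type="token", content=chunk)`. batch_tokens is a hypothetical helper name:

```python
import asyncio
from typing import AsyncIterator


async def batch_tokens(text: str, size: int = 5) -> AsyncIterator[str]:
    """Yield the response in chunks of `size` words instead of one word per event."""
    buffer: list[str] = []
    for word in text.split():
        buffer.append(word)
        if len(buffer) >= size:
            yield " ".join(buffer) + " "  # trailing space keeps chunks concatenable
            buffer = []
    if buffer:  # flush the final partial batch
        yield " ".join(buffer)


async def main() -> list[str]:
    return [chunk async for chunk in batch_tokens("one two three four five six seven", size=3)]


chunks = asyncio.run(main())
print(chunks)  # ['one two three ', 'four five six ', 'seven']
```

Because the frontend appends each `token` payload verbatim, the trailing space on full batches is what keeps "three" and "four" from running together.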
Testing
Unit Test: Worker Streaming
import pytest
from agents.base import HierarchicalState
from agents.stream_events import StreamEvent
@pytest.mark.asyncio
async def test_worker_streaming():
    worker = MyWorker()  # the worker under test
    state = HierarchicalState(...)
    events = []
    async for event in worker.execute_streaming(state, "test action"):
        events.append(event)
    # Verify event sequence
    assert events[0].type == "tool_start"
    assert events[-2].type == "tool_end"
    assert events[-1].type == "worker_result"
    # Verify timing (a fast worker can legitimately report 0 ms)
    assert events[-2].duration_ms >= 0
Integration Test: Full Flow
from uuid import UUID
import pytest
@pytest.mark.asyncio
async def test_full_streaming_flow():
    from agents.conversation.agent import ConversationAgent
    agent = ConversationAgent()
    events = []
    async for event in agent.handle_message_structured(
        message="What's on my calendar?",
        session_id="test",
        user_id=UUID("..."),
    ):
        events.append(event)
    # Verify we got all event types
    # (routing and tool events only appear if the message takes the complex path)
    event_types = {e.type for e in events}
    assert "thinking" in event_types
    assert "routing" in event_types
    assert "tool_start" in event_types
    assert "tool_end" in event_types
    assert "token" in event_types
    assert "done" in event_types
Manual Test: WebSocket Client
import asyncio
import json
import websockets
async def test_websocket():
    uri = "ws://norns-pm.ravenhelm.dev/ws/chat/test-session"
    async with websockets.connect(uri) as ws:
        # Send message
        await ws.send(json.dumps({
            "message": "Turn on the living room lights",
            "user_id": "test-uuid",
        }))
        # Receive events
        async for message in ws:
            event = json.loads(message)
            print(f"{event['type']}: {event.get('content', '')}")
            if event['type'] == 'done':
                break
asyncio.run(test_websocket())
Troubleshooting
Events Not Appearing
Symptom: Frontend shows loading spinner but no events.
Check:
- WebSocket connection established? (Browser DevTools → Network → WS)
- Events being emitted? (Add logging in execute_streaming())
- Events filtered? (Check internal event types like worker_result)
Fix:
# Add debug logging
async for event in worker.execute_streaming(...):
    logger.info(f"Event: {event.type} from {event.agent_name}")
    yield event
Events Out of Order
Symptom: tool_end before tool_start, or responses before thinking.
Cause: Worker streams are consumed concurrently instead of in sequence, so events from different agents interleave.
Fix:
# BAD: running worker streams concurrently (asyncio.as_completed() also expects
# awaitables, so passing async generators like this raises TypeError)
tasks = [worker1.execute_streaming(...), worker2.execute_streaming(...)]
for task in asyncio.as_completed(tasks):
    async for event in await task:
        yield event
# GOOD: Sequential execution preserves each worker's event order
for worker in [worker1, worker2]:
    async for event in worker.execute_streaming(...):
        yield event
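When chasing ordering bugs, it helps to check a captured stream (for example the parsed messages printed by the WebSocket client, or to_dict() output) for unmatched correlation IDs. A minimal checker sketch; check_tool_pairs is a hypothetical helper, and a worker that fails after tool_start will legitimately be reported as never finished:

```python
def check_tool_pairs(events: list[dict]) -> list[str]:
    """Return problems: tool_end without an earlier tool_start, or unfinished tools."""
    open_ids: set[str] = set()
    problems: list[str] = []
    for index, event in enumerate(events):
        if event["type"] == "tool_start":
            open_ids.add(event["id"])
        elif event["type"] == "tool_end":
            if event["id"] not in open_ids:
                problems.append(f"event {index}: tool_end {event['id']} before its tool_start")
            open_ids.discard(event["id"])
    problems.extend(f"tool {tool_id} never finished" for tool_id in sorted(open_ids))
    return problems


good = [{"type": "tool_start", "id": "a"}, {"type": "tool_end", "id": "a"}]
bad = [{"type": "tool_end", "id": "a"}, {"type": "tool_start", "id": "b"}]
print(check_tool_pairs(good))  # []
print(check_tool_pairs(bad))
```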
WebSocket Disconnects
Symptom: Connection drops mid-stream.
Check:
- Timeout settings (default 60s)
- Reverse proxy timeout (Traefik/nginx)
- Network stability
Fix (FastAPI):
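import asyncio
import logging
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from starlette.websockets import WebSocketState
logger = logging.getLogger(__name__)
router = APIRouter()
@router.websocket("/ws/chat/{session_id}")
async def chat_websocket(websocket: WebSocket, session_id: str):
    await websocket.accept()
    # Ping periodically so idle proxies don't drop the connection
    keepalive = asyncio.create_task(send_keepalive(websocket))
    try:
        # agent: the ConversationAgent instance
        async for event in agent.handle_message_structured(...):
            await websocket.send_json(event.to_dict())
    except WebSocketDisconnect:
        logger.info("Client disconnected")
    finally:
        keepalive.cancel()  # stop pinging once the stream ends
        if websocket.client_state == WebSocketState.CONNECTED:
            await websocket.close()
async def send_keepalive(ws: WebSocket):
    while True:
        await asyncio.sleep(30)
        await ws.send_json({"type": "ping"})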
Related Documentation
- Norns Agent - Core agent architecture
- Norns Memory System - Memory and context
- Norns Admin - Web UI implementation
- LangGraph - Graph-based agent orchestration
- Langfuse - Observability and tracing
File Reference
| File | Purpose |
|---|---|
| /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/agent/agents/stream_events.py | StreamEvent dataclass |
| /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/agent/agents/base.py | BaseWorker, BaseDomainSupervisor with execute_streaming() |
| /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/agent/agents/supervisor.py | NornsSupervisor with execute_streaming() |
| /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/agent/conversation/agent.py | ConversationAgent entry point, event forwarding |
| /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/admin/app/(authenticated)/chat/page.tsx | Frontend WebSocket handling and UI rendering |
| /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/admin/components/chat/CollapsibleSection.tsx | Collapsible sections for planning/tools |
Changelog
| Date | Author | Change |
|---|---|---|
| 2026-01-05 | Nate Walker | Initial documentation - streaming architecture refactor |