
Norns Streaming Architecture

Real-time event streaming architecture for the Norns agent hierarchy, enabling transparent visibility into agent execution from classification through tool execution.


Overview

The Norns streaming architecture provides real-time visibility into every phase of agent execution through structured events streamed via WebSocket. This enables rich UI experiences showing planning, reasoning, tool execution, and response generation as they happen.

| Component | Purpose |
|---|---|
| StreamEvent | Universal event type for all agent levels |
| WebSocket Transport | Real-time bidirectional communication |
| Event Forwarding | Hierarchical event propagation from workers → supervisors → conversation agent → frontend |
| Frontend Rendering | Collapsible sections for planning, tools, and response |

Architecture

Hierarchical Flow

```text
┌──────────────────────────────────────────────────────────┐
│ ConversationAgent (Entry Point)                          │
│                                                          │
│ - Classifies complexity (simple/moderate/complex)        │
│ - Handles simple requests directly                       │
│ - Delegates complex to NornsSupervisor                   │
│ - Forwards ALL events to WebSocket                       │
└──────────────────┬───────────────────────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────────────────────────┐
│ NornsSupervisor (Level 1)                                │
│                                                          │
│ execute_streaming() yields:                              │
│   - thinking (start, classification, reasoning)          │
│   - routing (domain selection)                           │
│   - tool_start/tool_end (supervisor execution)           │
│   - Events from Domain Supervisors (forwarded)           │
└──────────────────┬───────────────────────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────────────────────────┐
│ Domain Supervisors (Level 2)                             │
│ Task, Calendar, Home, Navigation, etc.                   │
│                                                          │
│ execute_streaming() yields:                              │
│   - tool_start/tool_end (domain supervisor execution)    │
│   - routing (worker selection)                           │
│   - Events from Workers (forwarded)                      │
└──────────────────┬───────────────────────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────────────────────────┐
│ Workers (Level 3)                                        │
│ Specialized agents (TaskWorker, CalendarWorker...)       │
│                                                          │
│ execute_streaming() yields:                              │
│   - tool_start (worker begins)                           │
│   - tool_end (worker completes with result)              │
│   - error (on failure)                                   │
└──────────────────────────────────────────────────────────┘
```

Event Flow Diagram

```text
             User Message
                   │
                   ▼
┌──────────────────────────────────────────────────────────┐
│ WebSocket Connection                                     │
│ /ws/chat/:session                                        │
└──────────────────┬───────────────────────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────────────────────────┐
│ ConversationAgent                                        │
│                                                          │
│ 1. classify_only()                                       │
│    StreamEvent(type="thinking",                          │
│                phase="classification")                   │
│                                                          │
│ 2. handle_message_structured()                           │
│    • Simple → _handle_simple_structured                  │
│    • Complex → _handle_complex_structured                │
│                                                          │
│ 3. Forward all events to WebSocket                       │
└──────────────────┬───────────────────────────────────────┘
                   │
                   │ [Complex path only]
                   ▼
┌──────────────────────────────────────────────────────────┐
│ NornsSupervisor                                          │
│                                                          │
│ execute_streaming():                                     │
│   • Classify intent                                      │
│   • Emit routing decision                                │
│   • Delegate to domain(s)                                │
│   • Forward domain events                                │
│   • Synthesize response                                  │
└──────────────────┬───────────────────────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────────────────────────┐
│ DomainSupervisor                                         │
│                                                          │
│ execute_streaming():                                     │
│   • Route to workers                                     │
│   • Execute workers                                      │
│   • Forward worker events                                │
│   • Synthesize results                                   │
└──────────────────┬───────────────────────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────────────────────────┐
│ Worker                                                   │
│                                                          │
│ execute_streaming():                                     │
│   • Emit tool_start                                      │
│   • Execute action                                       │
│   • Emit tool_end with result                            │
└──────────────────────────────────────────────────────────┘
```

StreamEvent Schema

Defined in /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/agent/agents/stream_events.py:

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class StreamEvent:
    """Structured event for streaming agent execution."""

    # Required
    type: str

    # Common fields
    content: Optional[str] = None

    # For thinking events
    phase: Optional[str] = None  # "start", "planning", "classification", "reasoning"

    # For tool/worker events
    id: Optional[str] = None  # Correlation ID
    name: Optional[str] = None  # Tool or worker name
    args: Optional[dict] = None  # For tool_start
    result: Optional[Any] = None  # For tool_end

    # Timing
    duration_ms: Optional[int] = None

    # Extra context
    metadata: Optional[dict] = None

    # Agent hierarchy info
    agent_name: Optional[str] = None
    agent_level: Optional[int] = None  # 1=supervisor, 2=domain, 3=worker
    domain: Optional[str] = None

    def to_dict(self) -> dict:
        """Convert to dict for JSON serialization."""
        # Converts to camelCase for frontend
        ...
```
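The body of to_dict() is elided above. One plausible shape for it, consistent with its docstring and with the wire examples under WebSocket Protocol (camelCase keys such as durationMs, unset fields absent), is sketched below. This is an assumption rather than the actual implementation: the StreamEvent here is a trimmed stand-in, to_camel is a hypothetical helper, and only top-level keys are converted.

```python
from dataclasses import dataclass, fields
from typing import Any, Optional


def to_camel(name: str) -> str:
    """Hypothetical helper: snake_case -> camelCase ("duration_ms" -> "durationMs")."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


@dataclass
class StreamEvent:
    # Trimmed stand-in for the real dataclass (same field names, fewer fields)
    type: str
    content: Optional[str] = None
    phase: Optional[str] = None
    id: Optional[str] = None
    result: Optional[Any] = None
    duration_ms: Optional[int] = None
    agent_level: Optional[int] = None

    def to_dict(self) -> dict:
        """Assumed behavior: omit unset (None) fields and camelCase top-level keys."""
        return {
            to_camel(f.name): getattr(self, f.name)
            for f in fields(self)
            if getattr(self, f.name) is not None
        }


event = StreamEvent(type="tool_end", id="tool-123", duration_ms=543, agent_level=3)
print(event.to_dict())
# {'type': 'tool_end', 'id': 'tool-123', 'durationMs': 543, 'agentLevel': 3}
```

Dropping None fields keeps each WebSocket message small; if the frontend relies on explicit nulls, emit them instead.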

Event Types

| Type | Purpose | Fields | Emitted By |
|---|---|---|---|
| thinking | Planning, classification, reasoning | phase, content, duration_ms | All levels |
| routing | Domain/worker routing decision | content, metadata | Supervisors |
| tool_start | Agent/tool execution begins | id, name, args | All levels |
| tool_end | Agent/tool execution completes | id, name, result, duration_ms | All levels |
| token | Response streaming (word by word) | content | ConversationAgent |
| done | Execution complete | duration_ms | ConversationAgent |
| error | Error occurred | content | All levels |

Thinking Phases

| Phase | When | Example Content |
|---|---|---|
| start | Beginning of processing | "Processing your request..." |
| planning | Breaking down a complex task | "Step 1: Fetch list of open PRs..." |
| classification | Intent classification complete | "Classified as TASK: create reminder" |
| reasoning | LLM reasoning/synthesis | "The user wants to..." |

Internal Event Types

These events are used for result passing between hierarchy levels and are not forwarded to the frontend:

| Type | Purpose | Contains |
|---|---|---|
| worker_result | Worker → Domain Supervisor | WorkerResult object |
| domain_result | Domain Supervisor → Main Supervisor | Synthesis + results |
| supervisor_result | Main Supervisor → Conversation Agent | Final response |

Implementation Guide

Adding Streaming to a New Worker

1. Inherit from BaseWorker:

```python
from typing import AsyncIterator

# Domain's import location is assumed here; adjust to where it is defined
from agents.base import BaseWorker, Domain, HierarchicalState, WorkerResult
from agents.stream_events import StreamEvent


class MyNewWorker(BaseWorker):
    name = "my_new_worker"
    domain = Domain.TASK
    description = "Does something useful"

    async def execute(
        self,
        state: HierarchicalState,
        action: str,
        **kwargs,
    ) -> WorkerResult:
        # Traditional non-streaming implementation (_do_work is your own helper)
        result = await self._do_work(action)
        return WorkerResult(
            worker_name=self.name,
            domain=self.domain,
            success=True,
            result=result,
        )
```
2. Override execute_streaming() for custom streaming:

```python
import time
from typing import AsyncIterator

from agents.base import BaseWorker, HierarchicalState, WorkerResult
from agents.stream_events import StreamEvent


class MyNewWorker(BaseWorker):
    # name, domain, description, and execute() as in step 1

    async def execute_streaming(
        self,
        state: HierarchicalState,
        action: str,
        **kwargs,
    ) -> AsyncIterator[StreamEvent]:
        worker_id = f"{self.name}-{int(time.time() * 1000)}"

        # Emit start
        yield StreamEvent(
            type="tool_start",
            id=worker_id,
            name=self.name,
            args={"action": action},
            agent_name=self.name,
            agent_level=3,
            domain=self.domain.value,
        )

        start_time = time.perf_counter()  # monotonic clock, safe for durations

        try:
            # Do work (can yield progress events)
            result = await self._do_work(action)
        except Exception as e:
            # Only the work is inside try, so this branch reports worker failures only
            yield StreamEvent(
                type="error",
                id=worker_id,  # lets the UI match the error to its tool_start
                name=self.name,
                content=str(e),
                duration_ms=int((time.perf_counter() - start_time) * 1000),
                agent_name=self.name,
                agent_level=3,
                domain=self.domain.value,
            )
            return

        # Emit completion
        yield StreamEvent(
            type="tool_end",
            id=worker_id,
            name=self.name,
            result={"data": result},
            duration_ms=int((time.perf_counter() - start_time) * 1000),
            agent_name=self.name,
            agent_level=3,
            domain=self.domain.value,
        )

        # Return result for parent (not shown in UI)
        yield StreamEvent(
            type="worker_result",
            result=WorkerResult(
                worker_name=self.name,
                domain=self.domain,
                success=True,
                result=result,
            ),
            metadata={"is_final": True},
        )
```
3. Default implementation: If you don't override execute_streaming(), the base class provides a default implementation that wraps your execute() method and emits basic start/end events.
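The default wrapper itself is not reproduced in this document. As a rough sketch of what "wraps execute() and emits basic start/end events" could mean, assuming the same event sequence as the custom override in step 2, the function below brackets execute() with tool_start and tool_end and then hands the parent an internal worker_result. StreamEvent, WorkerResult, EchoWorker, and default_execute_streaming are simplified, hypothetical stand-ins; the real BaseWorker method may differ.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional


@dataclass
class StreamEvent:  # stand-in with only the fields used here
    type: str
    id: Optional[str] = None
    name: Optional[str] = None
    args: Optional[dict] = None
    result: Optional[Any] = None
    duration_ms: Optional[int] = None
    content: Optional[str] = None


@dataclass
class WorkerResult:  # stand-in
    worker_name: str
    success: bool
    result: Any = None


class EchoWorker:  # hypothetical worker that only implements execute()
    name = "echo_worker"

    async def execute(self, state: dict, action: str) -> WorkerResult:
        return WorkerResult(self.name, success=True, result=action.upper())


async def default_execute_streaming(worker, state, action) -> AsyncIterator[StreamEvent]:
    """Hypothetical default: bracket execute() with tool_start/tool_end events."""
    event_id = f"{worker.name}-{int(time.time() * 1000)}"
    yield StreamEvent(type="tool_start", id=event_id, name=worker.name, args={"action": action})
    start = time.perf_counter()
    try:
        outcome = await worker.execute(state, action)
    except Exception as exc:
        yield StreamEvent(type="error", id=event_id, name=worker.name, content=str(exc))
        outcome = WorkerResult(worker.name, success=False)
    else:
        yield StreamEvent(
            type="tool_end",
            id=event_id,
            name=worker.name,
            result={"data": outcome.result},
            duration_ms=int((time.perf_counter() - start) * 1000),
        )
    # Internal result for the parent supervisor (filtered out before the UI)
    yield StreamEvent(type="worker_result", result=outcome)


async def main() -> list[StreamEvent]:
    return [e async for e in default_execute_streaming(EchoWorker(), {}, "list events")]


events = asyncio.run(main())
print([e.type for e in events])  # ['tool_start', 'tool_end', 'worker_result']
```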

Adding Streaming to a New Domain Supervisor

1. Inherit from BaseDomainSupervisor:

```python
from typing import AsyncIterator

# Domain's import location is assumed here; adjust to where it is defined
from agents.base import BaseDomainSupervisor, Domain, HierarchicalState, WorkerResult
from agents.stream_events import StreamEvent

# Worker1 and Worker2 are placeholders for your BaseWorker subclasses


class MyDomainSupervisor(BaseDomainSupervisor):
    name = "my_domain_supervisor"
    domain = Domain.TASK
    description = "Coordinates my domain workers"

    def __init__(self, model=None):
        super().__init__(model)
        self.workers = {
            "worker1": Worker1(),
            "worker2": Worker2(),
        }

    async def route_to_workers(
        self,
        state: HierarchicalState,
        action: str,
    ) -> list[str]:
        # Return the names of the workers to invoke
        return ["worker1"]

    async def synthesize_results(
        self,
        state: HierarchicalState,
        results: list[WorkerResult],
    ) -> str:
        # Combine worker results into a single response string
        return "Worker completed successfully"
```
2. The base class execute_streaming() handles:

    • Emitting supervisor start/end events
    • Routing to workers
    • Forwarding worker events
    • Synthesis
    • Error handling
3. Override execute_streaming() only if you need custom event emission.
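To make the division of labor concrete, here is a minimal sketch of the loop the base class is described as running: emit start/end events, route, stream each worker while forwarding its events upward, keep the internal worker_result events for synthesis, and pass a domain_result to the main supervisor. Events are plain dicts, and FakeWorker and SketchDomainSupervisor are hypothetical stand-ins; the real BaseDomainSupervisor.execute_streaming() may emit different or additional fields.

```python
import asyncio
from typing import AsyncIterator


class FakeWorker:  # stand-in worker that emits the documented event sequence
    def __init__(self, name: str):
        self.name = name

    async def execute_streaming(self, action: str) -> AsyncIterator[dict]:
        yield {"type": "tool_start", "id": self.name, "name": self.name}
        yield {"type": "tool_end", "id": self.name, "name": self.name,
               "result": {"data": action}}
        yield {"type": "worker_result", "result": f"{self.name} handled {action!r}"}


class SketchDomainSupervisor:
    name = "sketch_domain_supervisor"

    def __init__(self):
        self.workers = {"worker1": FakeWorker("worker1"), "worker2": FakeWorker("worker2")}

    async def route_to_workers(self, action: str) -> list[str]:
        return list(self.workers)  # a real router would pick a subset

    async def synthesize_results(self, results: list[str]) -> str:
        return "; ".join(results)

    async def execute_streaming(self, action: str) -> AsyncIterator[dict]:
        yield {"type": "tool_start", "id": self.name, "name": self.name}
        names = await self.route_to_workers(action)
        yield {"type": "routing", "content": f"Routing to {', '.join(names)}"}

        results = []
        for worker_name in names:  # sequential, so each worker's events stay contiguous
            async for event in self.workers[worker_name].execute_streaming(action):
                if event["type"] == "worker_result":
                    results.append(event["result"])  # internal: keep, don't forward
                else:
                    yield event  # forward worker events upward

        summary = await self.synthesize_results(results)
        yield {"type": "tool_end", "id": self.name, "name": self.name}
        yield {"type": "domain_result", "result": summary}  # internal, for the main supervisor


async def main() -> list[dict]:
    return [e async for e in SketchDomainSupervisor().execute_streaming("list tasks")]


events = asyncio.run(main())
print([e["type"] for e in events])
```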

Adding Streaming to the Main Supervisor

The NornsSupervisor.execute_streaming() method is already implemented. It:

  1. Classifies user intent
  2. Emits thinking and routing events
  3. Delegates to domain supervisors
  4. Forwards all domain supervisor events
  5. Synthesizes final response
  6. Streams response tokens

Frontend Integration

WebSocket Protocol

Connect: ws://norns-pm.ravenhelm.dev/ws/chat/:sessionId

Send (user message):

```json
{
  "message": "What's on my calendar today?",
  "user_id": "uuid-here"
}
```

Receive (events):

```json
{"type": "thinking", "phase": "start", "content": "Processing..."}
{"type": "thinking", "phase": "classification", "content": "Classified as CALENDAR"}
{"type": "routing", "content": "Routing to calendar domain", "metadata": {"domain": "calendar"}}
{"type": "tool_start", "id": "tool-123", "name": "CalendarWorker", "args": {"action": "list events"}}
{"type": "tool_end", "id": "tool-123", "name": "CalendarWorker", "result": {...}, "durationMs": 543}
{"type": "token", "content": "You "}
{"type": "token", "content": "have "}
{"type": "token", "content": "3 "}
{"type": "token", "content": "events "}
{"type": "done", "durationMs": 1234}
```
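A client consuming this stream mainly has to concatenate token contents into the response text and pair each tool_start with its tool_end by id. The sketch below folds already-decoded events into a summary; summarize_stream is a hypothetical helper, and the example's `{...}` result is replaced with an empty dict so the list is valid Python.

```python
def summarize_stream(events: list[dict]) -> dict:
    """Fold decoded WebSocket events into response text plus tool-call records."""
    response = ""
    tools: dict[str, dict] = {}  # keyed by correlation id
    for event in events:
        kind = event["type"]
        if kind == "token":
            response += event["content"]  # tokens carry their own trailing spaces
        elif kind == "tool_start":
            tools[event["id"]] = {"name": event["name"], "status": "running"}
        elif kind == "tool_end" and event["id"] in tools:
            tools[event["id"]].update(status="complete", duration_ms=event.get("durationMs"))
        elif kind == "done":
            return {"response": response, "tools": tools, "duration_ms": event.get("durationMs")}
    return {"response": response, "tools": tools, "duration_ms": None}  # stream ended early


events = [
    {"type": "thinking", "phase": "start", "content": "Processing..."},
    {"type": "tool_start", "id": "tool-123", "name": "CalendarWorker", "args": {"action": "list events"}},
    {"type": "tool_end", "id": "tool-123", "name": "CalendarWorker", "result": {}, "durationMs": 543},
    {"type": "token", "content": "You "},
    {"type": "token", "content": "have "},
    {"type": "token", "content": "3 "},
    {"type": "token", "content": "events "},
    {"type": "done", "durationMs": 1234},
]
summary = summarize_stream(events)
print(repr(summary["response"]))  # 'You have 3 events '
```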

React Implementation

From /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/admin/app/(authenticated)/chat/page.tsx:

Key State:

```tsx
const [thinkingPhase, setThinkingPhase] = useState<string>('');
const [thinkingContent, setThinkingContent] = useState<ThinkingContent[]>([]);
const [activeToolCalls, setActiveToolCalls] = useState<ToolCall[]>([]);
const [activeRouting, setActiveRouting] = useState<{ domain: string; action: string } | null>(null);
```

WebSocket Handler:

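```tsx
// Per-stream accumulators, declared alongside `ws` when the message is sent.
// They are mutated locally and copied into React state, because mutating
// state arrays in place (e.g. thinkingContent.push) does not trigger a re-render.
const thinking: ThinkingContent[] = [];
const toolCalls: ToolCall[] = [];
let routing: { domain: string; action: string } | null = null;
let responseContent = '';

ws.onmessage = (event: MessageEvent) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case 'thinking':
      if (data.phase === 'start') {
        setThinkingPhase(data.content);
      } else {
        // planning, classification, and reasoning all go to the Planning section
        thinking.push({ content: data.content, phase: data.phase });
        setThinkingContent([...thinking]);
        if (data.phase === 'reasoning') {
          setThinkingPhase(''); // Clear spinner
        }
      }
      break;

    case 'routing':
      routing = { domain: data.metadata?.domain, action: data.content };
      setActiveRouting(routing);
      break;

    case 'tool_start':
      toolCalls.push({
        id: data.id,
        name: data.name,
        args: data.args,
        status: 'running',
        startTime: Date.now(),
      });
      setActiveToolCalls([...toolCalls]);
      break;

    case 'tool_end': {
      // Block scope for the const; ignore tool_end events with no matching start
      const i = toolCalls.findIndex((t) => t.id === data.id);
      if (i !== -1) {
        toolCalls[i] = {
          ...toolCalls[i],
          status: 'complete',
          result: data.result,
          durationMs: data.durationMs,
        };
        setActiveToolCalls([...toolCalls]);
      }
      break;
    }

    case 'token':
      responseContent += data.content;
      break;

    case 'done':
      // Save to message trace (`message` is the assistant message being built)
      message.trace = {
        thinking,
        toolCalls,
        routing,
        duration: data.durationMs,
      };
      break;
  }
};
```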

Collapsible Sections:

```tsx
<CollapsibleSection
  title="Planning"
  icon="💭"
  summary="1.2s"
  defaultOpen={!isMobile}
>
  {/* Reasoning and planning steps */}
</CollapsibleSection>

<CollapsibleSection
  title="Tools"
  icon="🔧"
  summary="2 calls, 543ms"
  defaultOpen={!isMobile}
>
  {/* Tool execution details */}
</CollapsibleSection>
```

Event Filtering

What Gets Shown

| Event Type | Shown to User | Where |
|---|---|---|
| thinking (planning) | ✅ Yes | Planning section |
| thinking (reasoning) | ✅ Yes | Planning section |
| thinking (classification) | ✅ Yes | Planning section |
| routing | ✅ Yes | Planning section |
| tool_start | ✅ Yes | Tools section |
| tool_end | ✅ Yes | Tools section |
| token | ✅ Yes | Response content |
| done | ✅ Yes (timestamp) | Footer |
| error | ✅ Yes | Error banner |

What Gets Filtered

| Event Type | Shown to User | Purpose |
|---|---|---|
| worker_result | ❌ No | Internal: Worker → Domain Supervisor |
| domain_result | ❌ No | Internal: Domain Supervisor → Main Supervisor |
| supervisor_result | ❌ No | Internal: Main Supervisor → Conversation Agent |

These internal events carry WorkerResult objects and synthesis strings between hierarchy levels but are not meaningful to end users.

Backend filter (internal events are dropped before they reach the WebSocket):

```python
async for event in supervisor.execute_streaming(state):
    # Skip internal result events
    if event.type in ("worker_result", "domain_result", "supervisor_result"):
        continue
    yield event
```

Error Handling

Worker Level

```python
try:
    result = await self._do_work()
    yield StreamEvent(type="tool_end", ...)
except Exception as e:
    yield StreamEvent(
        type="error",
        content=f"Worker {self.name} failed: {e}",
        agent_name=self.name,
        agent_level=3,
    )
    # Re-raise or return an error WorkerResult
```

Supervisor Level

```python
try:
    async for event in worker.execute_streaming(...):
        if event.type == "error":
            # Log and continue to next worker or handle gracefully
            logger.error("Worker error: %s", event.content)
        yield event
except Exception as e:
    yield StreamEvent(
        type="error",
        content=f"Supervisor error: {e}",
        agent_name=self.name,
        agent_level=2,
    )
```
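The "continue to next worker" option in the comment above can be sketched as a loop that guards each worker separately, so one failure becomes an error event and the remaining workers still run. Workers are plain async generators of dicts here, and run_workers, ok_worker, and broken_worker are hypothetical; putting the worker's id on the error event is an assumption that lets a UI mark the matching tool call as failed.

```python
import asyncio
import logging
from typing import AsyncIterator, Callable

logger = logging.getLogger(__name__)


async def ok_worker(name: str) -> AsyncIterator[dict]:
    yield {"type": "tool_start", "id": name, "name": name}
    yield {"type": "tool_end", "id": name, "name": name}


async def broken_worker(name: str) -> AsyncIterator[dict]:
    yield {"type": "tool_start", "id": name, "name": name}
    raise RuntimeError("upstream API timed out")


async def run_workers(
    workers: dict[str, Callable[[str], AsyncIterator[dict]]],
) -> AsyncIterator[dict]:
    for name, make_stream in workers.items():
        try:
            async for event in make_stream(name):
                yield event
        except Exception as exc:  # isolate the failure to this worker
            logger.error("Worker %s failed: %s", name, exc)
            yield {"type": "error", "id": name, "content": f"Worker {name} failed: {exc}"}


async def main() -> list[dict]:
    workers = {"calendar": broken_worker, "tasks": ok_worker}
    return [e async for e in run_workers(workers)]


events = asyncio.run(main())
print([(e["type"], e["id"]) for e in events])
```

With this shape the tasks worker still reports tool_start and tool_end even though the calendar worker failed first.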

Frontend

```tsx
case 'error':
  setError(data.content);
  setIsStreaming(false);
  break;
```

Performance Considerations

Event Volume

For complex tasks with multiple workers, expect:

  • 1-2 thinking events (classification, reasoning)
  • 1 routing event per domain
  • 2 events per worker (tool_start, tool_end)
  • 1 event per response token (~20-200 tokens)
  • 1 done event

Example: a task with 3 workers and a 50-word response:

  • ~12 structured events
  • ~50 token events
  • Total: ~62 WebSocket messages

Optimization Strategies

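1. Token Batching: Instead of streaming word by word, send tokens in small batches:

```python
buffer = []
for word in response.split():
    buffer.append(word)
    if len(buffer) >= 5:
        # Keep a trailing space so consecutive batches don't run together
        yield StreamEvent(type="token", content=" ".join(buffer) + " ")
        buffer = []
if buffer:  # flush the remaining words
    yield StreamEvent(type="token", content=" ".join(buffer) + " ")
```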
2. Event Sampling: For long-running workers, emit periodic progress:

```python
for i, item in enumerate(items, start=1):
    await process(item)
    if i % 10 == 0:  # every 10th item, not immediately on the first
        yield StreamEvent(
            type="thinking",
            phase="reasoning",
            content=f"Processed {i}/{len(items)} items",
        )
```
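3. Truncation: For large result payloads, send a truncated preview in the event:

```python
result_text = str(result)
yield StreamEvent(
    type="tool_end",
    id=worker_id,  # keep the correlation id so the UI can match tool_start
    name=self.name,
    # "full" tells the UI whether the preview is the complete result
    result={"preview": result_text[:500], "full": len(result_text) <= 500},
)
```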
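The token-batching loop in strategy 1 splits a finished string. When tokens instead arrive incrementally (for example from a streaming LLM client), the same grouping can be applied to the async stream itself. A sketch, assuming each upstream token already carries its trailing space as in the protocol examples; rebatch and fake_llm are hypothetical names:

```python
import asyncio
from typing import AsyncIterator


async def rebatch(tokens: AsyncIterator[str], size: int = 5) -> AsyncIterator[str]:
    """Group every `size` upstream tokens into one outgoing token payload."""
    buffer: list[str] = []
    async for token in tokens:
        buffer.append(token)
        if len(buffer) >= size:
            yield "".join(buffer)  # tokens keep their own spacing, so join with ""
            buffer.clear()
    if buffer:  # flush the tail so the last words are not dropped
        yield "".join(buffer)


async def fake_llm(text: str) -> AsyncIterator[str]:
    for word in text.split():
        await asyncio.sleep(0)  # simulate incremental arrival
        yield word + " "


async def main() -> list[str]:
    text = "one two three four five six seven"
    return [chunk async for chunk in rebatch(fake_llm(text), size=3)]


chunks = asyncio.run(main())
print(chunks)  # ['one two three ', 'four five six ', 'seven ']
```

A size-only policy can hold back the last few words of a slow stream until it ends; pairing it with a short time-based flush is a common refinement.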

Testing

Unit Test: Worker Streaming

```python
import pytest

from agents.base import HierarchicalState

# Requires the pytest-asyncio plugin for @pytest.mark.asyncio.
# MyWorker stands for the worker under test (e.g. MyNewWorker above).


@pytest.mark.asyncio
async def test_worker_streaming():
    worker = MyWorker()
    state = HierarchicalState(...)

    events = []
    async for event in worker.execute_streaming(state, "test action"):
        events.append(event)

    # Verify event sequence
    assert events[0].type == "tool_start"
    assert events[-2].type == "tool_end"
    assert events[-1].type == "worker_result"

    # Verify timing (a fast worker can legitimately report 0 ms)
    assert events[-2].duration_ms is not None
    assert events[-2].duration_ms >= 0
```

Integration Test: Full Flow

```python
from uuid import UUID

import pytest


@pytest.mark.asyncio
async def test_full_streaming_flow():
    from agents.conversation.agent import ConversationAgent

    agent = ConversationAgent()
    events = []

    async for event in agent.handle_message_structured(
        message="What's on my calendar?",
        session_id="test",
        user_id=UUID("..."),
    ):
        events.append(event)

    # Verify we got all event types. routing and tool events are only emitted on
    # the complex (supervisor) path, so this assumes the message is routed there.
    event_types = {e.type for e in events}
    assert "thinking" in event_types
    assert "routing" in event_types
    assert "tool_start" in event_types
    assert "tool_end" in event_types
    assert "token" in event_types
    assert "done" in event_types
```

Manual Test: WebSocket Client

```python
import asyncio
import json

import websockets


async def test_websocket():
    uri = "ws://norns-pm.ravenhelm.dev/ws/chat/test-session"

    async with websockets.connect(uri) as ws:
        # Send message
        await ws.send(json.dumps({
            "message": "Turn on the living room lights",
            "user_id": "test-uuid",
        }))

        # Receive events until the stream finishes or fails
        async for message in ws:
            event = json.loads(message)
            print(f"{event['type']}: {event.get('content', '')}")

            if event["type"] in ("done", "error"):
                break


asyncio.run(test_websocket())
```

Troubleshooting

Events Not Appearing

Symptom: Frontend shows loading spinner but no events.

Check:

  1. WebSocket connection established? (Browser DevTools → Network → WS)
  2. Events being emitted? (Add logging in execute_streaming())
  3. Events filtered? (Check internal event types like worker_result)

Fix:

```python
# Add debug logging
async for event in worker.execute_streaming(...):
    logger.info("Event: %s from %s", event.type, event.agent_name)
    yield event
```

Events Out of Order

Symptom: tool_end before tool_start, or responses before thinking.

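Cause: Worker streams are not consumed in a well-defined order (for example, async generators are passed to APIs that expect awaitables, or several streams are drained concurrently), so events from different workers interleave.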

Fix:

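```python
# BAD: asyncio.as_completed() only accepts awaitables and async generators are
# not awaitable, so this raises TypeError; workarounds that drain each stream
# in its own task interleave events from different workers.
tasks = [worker1.execute_streaming(...), worker2.execute_streaming(...)]
for task in asyncio.as_completed(tasks):
    async for event in await task:
        yield event

# GOOD: sequential execution keeps each worker's events contiguous
for worker in [worker1, worker2]:
    async for event in worker.execute_streaming(...):
        yield event
```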
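If workers genuinely need to run in parallel, their streams can be merged through an asyncio.Queue instead of being run one after another. Events from different workers then interleave, but each worker's own events keep their order (tool_start still precedes its tool_end), which is enough for a frontend that pairs events by id. A sketch; merge_streams and fake_worker are hypothetical names, not part of the codebase:

```python
import asyncio
from typing import Any, AsyncIterator


async def merge_streams(*streams: AsyncIterator[Any]) -> AsyncIterator[Any]:
    """Yield items from several async iterators as they arrive, preserving per-stream order."""
    queue: asyncio.Queue = asyncio.Queue()

    async def drain(stream: AsyncIterator[Any]) -> None:
        try:
            async for item in stream:
                await queue.put(("item", item))
            await queue.put(("done", None))
        except Exception as exc:  # hand the failure to the consumer
            await queue.put(("error", exc))

    tasks = [asyncio.create_task(drain(s)) for s in streams]
    remaining = len(tasks)
    try:
        while remaining:
            kind, payload = await queue.get()
            if kind == "item":
                yield payload
            elif kind == "done":
                remaining -= 1
            else:
                raise payload
    finally:
        for task in tasks:  # stop any producers still running
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def fake_worker(name: str, delay: float) -> AsyncIterator[dict]:
    yield {"type": "tool_start", "id": name}
    await asyncio.sleep(delay)
    yield {"type": "tool_end", "id": name}


async def main() -> list[dict]:
    merged = merge_streams(fake_worker("slow", 0.02), fake_worker("fast", 0.0))
    return [event async for event in merged]


events = asyncio.run(main())
print([(e["type"], e["id"]) for e in events])
```

The finally block cancels producers that are still running if the consumer stops early, and a producer's exception is re-raised in the consumer instead of being lost.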

WebSocket Disconnects

Symptom: Connection drops mid-stream.

Check:

  1. Timeout settings (default 60s)
  2. Reverse proxy timeout (Traefik/nginx)
  3. Network stability

Fix (FastAPI):

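```python
import asyncio
import logging

from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from starlette.websockets import WebSocketState

logger = logging.getLogger(__name__)
router = APIRouter()
# `agent` is the ConversationAgent instance, created elsewhere


@router.websocket("/ws/chat/{session_id}")
async def chat_websocket(websocket: WebSocket, session_id: str):
    await websocket.accept()

    # Set keepalive; keep a handle so the task can be stopped afterwards
    keepalive = asyncio.create_task(send_keepalive(websocket))

    try:
        async for event in agent.handle_message_structured(...):
            await websocket.send_json(event.to_dict())
    except WebSocketDisconnect:
        logger.info("Client disconnected")
    finally:
        keepalive.cancel()  # otherwise it keeps pinging a finished socket
        # Only close if the client hasn't already disconnected
        if websocket.client_state == WebSocketState.CONNECTED:
            await websocket.close()


async def send_keepalive(ws: WebSocket):
    while True:
        await asyncio.sleep(30)
        await ws.send_json({"type": "ping"})
```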


File Reference

| File | Purpose |
|---|---|
| /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/agent/agents/stream_events.py | StreamEvent dataclass |
| /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/agent/agents/base.py | BaseWorker, BaseDomainSupervisor with execute_streaming() |
| /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/agent/agents/supervisor.py | NornsSupervisor with execute_streaming() |
| /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/agent/conversation/agent.py | ConversationAgent entry point, event forwarding |
| /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/admin/app/(authenticated)/chat/page.tsx | Frontend WebSocket handling and UI rendering |
| /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/admin/components/chat/CollapsibleSection.tsx | Collapsible sections for planning/tools |

Changelog

| Date | Author | Change |
|---|---|---|
| 2026-01-05 | Nate Walker | Initial documentation - streaming architecture refactor |