Multi-Channel Session Memory System
Real-time event-driven conversation continuity across voice, web chat, and messaging channels
The Multi-Channel Session Memory System enables Norns to maintain conversation context across different communication channels with real-time concurrent multi-channel support. Users can have a voice conversation while simultaneously chatting on the web, with both channels receiving live updates from each other.
Table of Contents
- Overview
- The Florist Scenario
- Architecture
- Database Schema
- Configuration
- API Endpoints
- SessionManager Service
- Channel Integration
- Session Lifecycle
- Event Types
- Data Flow Diagrams
- Enabling the System
- Operational Guide
Overview
What It Does
The Multi-Channel Session Memory System provides:
- One Session Per User: Each user has a single active session at any time
- Real-Time Multi-Channel Communication: Concurrent conversations across multiple channels with live synchronization
- Event-Driven Architecture: Redis pub/sub broadcasts events to all active channels
- Cross-Channel Memory: Context persists across voice, web chat, SMS, and other channels
- Automatic Resume: Sessions automatically resume within a configurable time window (default: 24 hours)
- Live Context Updates: Changes in one channel immediately visible in all other active channels
- Channel Consent Tracking: Manages opt-in/opt-out for proactive messaging
Key Capabilities
| Feature | Description |
|---|---|
| Session Resolution | Automatically find or create sessions based on user identity |
| Real-Time Events | SSE endpoint streams events to all connected channels |
| Redis Pub/Sub | Central message bus for cross-channel communication |
| Context Preservation | Store and retrieve conversation state, preferences, and history |
| Interaction Tracking | Track individual channel interactions within a session |
| Live History Sync | Conversation history updates in real-time across channels |
| Concurrent Channels | Multiple channels can be active simultaneously on one session |
| Consent Management | Track user consent for different communication channels |
| Event Logging | Audit trail of all session events |
The Florist Scenario
NEW: Real-time concurrent multi-channel conversation
Scenario: Voice Call + Web Chat Simultaneously
8:00 AM - User calls Norns via phone:
User (voice): "Hey Norns, I need to send flowers to my mom for her birthday."
Norns (voice): "I'd be happy to help! What type of flowers does she like?"
8:01 AM - While still on the phone, user opens web chat:
User (web): "Can you show me photos of tulip arrangements?"
What happens:
- Web chat publishes message to Redis:
session:{id}:events - Voice channel receives real-time event via SSE
- Voice channel injects into LLMContext:
[Real-time update from chat] user: Can you show me photos... - Norns (voice) responds with context from both channels
8:02 AM - Voice assistant responds:
Norns (voice): "I see you're also looking at the web chat. I'll display some tulip
arrangements there for you. Which color does she prefer?"
User (voice): "Pink tulips would be perfect."
8:02 AM - Voice message published to Redis:
[Voice channel publishes to session:{id}:events]
Web chat receives event and displays:
User (voice): "Pink tulips would be perfect."
8:03 AM - Web shows search results:
Norns (web chat): Here are three florists with pink tulips:
[images of arrangements displayed]
8:05 AM - User continues on voice:
User (voice): "I'll go with Bloom & Petal."
Norns (voice): "Perfect! I've noted that. You can complete the order on the web
when you're ready."
Both channels see full conversation history in real-time. The session persists across voice and web simultaneously, with each channel receiving live updates from the other.
Architecture
Real-Time Event System
The system uses an event-driven architecture with Redis pub/sub for real-time cross-channel communication.
Key Design:
- Each session has a Redis channel:
session:{session_id}:events - All active channels subscribe to the session's event stream via SSE
- When any channel publishes a message, all other channels receive it instantly
- Messages are both broadcast via Redis AND persisted to the database
Redis Pub/Sub
Channel Format: session:{session_id}:events
Event Flow:
Web Chat publishes message
│
▼
session_manager.publish_message()
│
├─► Redis PUBLISH → session:{session_id}:events
│
└─► Database INSERT → conversation_history
│
▼
Voice channel (subscribed via SSE)
│
▼
Receives SessionEvent
│
▼
Injects into LLMContext as:
"[Real-time update from chat] user: ..."
Benefits:
- No polling: SSE provides efficient server-push
- Real-time: Sub-second latency for cross-channel updates
- Scalable: Redis pub/sub handles thousands of concurrent sessions
- Persistent: All events are also written to database for history
Component Overview
┌─────────────────────────────────────────────────────────────────────┐
│ Channel Interfaces │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Telephony │ │ Web Chat │ │ SMS │ │
│ │ Service │ │ /api/chat │ │ (future) │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
└─────────┼──────────────────┼──────────────────┼────────────────────┘
│ │ │
│ SSE subscribe │ publish │
│ GET /events │ POST /messages │
│ │ │
└──────────┬───────┴─────────┬────────┘
│ │
▼ ▼
┌─────────────────────────────────┐
│ SessionManager Service │
│ (session_manager.py) │
│ │
│ • publish_event() │
│ • publish_message() │
│ • subscribe_to_session() │
│ • create_event_listener() │
└─────────────┬───────────────────┘
│
┌─────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Redis │ │ sessions │ │ session_ │
│ pub/sub │ │ DB │ │ events │
│ │ │ │ │ DB │
└──────────┘ └──────────┘ └──────────┘
│
│ session:{id}:events
│
▼
All subscribed channels
Database Schema
Core Tables
sessions
Stores user sessions with context and resume windows.
CREATE TABLE sessions (
session_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(user_id) ON DELETE CASCADE,
status VARCHAR(20) NOT NULL DEFAULT 'active',
context JSONB NOT NULL DEFAULT '{}',
resume_window INTERVAL NOT NULL DEFAULT '24 hours',
created_at TIMESTAMPTZ DEFAULT NOW(),
last_activity TIMESTAMPTZ DEFAULT NOW(),
closed_at TIMESTAMPTZ
);
-- Indexes
CREATE INDEX idx_sessions_user_active
ON sessions(user_id)
WHERE status IN ('active', 'idle', 'resumable');
CREATE INDEX idx_sessions_last_activity
ON sessions(last_activity DESC);
-- Constraint: Only one active/idle/resumable session per user
CREATE UNIQUE INDEX one_active_session_per_user
ON sessions(user_id)
WHERE status IN ('active', 'idle', 'resumable');
Session States:
active: Has one or more active channel interactionsidle: No active interactions, but within resume windowresumable: Alias foridle(used interchangeably)closed: Explicitly closed or expired beyond resume window
Context Schema:
{
"conversation_history": [
{"role": "user", "content": "Hey Norns, I need flowers", "channel": "voice"},
{"role": "assistant", "content": "I'd be happy to help!", "channel": "voice"},
{"role": "user", "content": "Show me photos", "channel": "chat"}
],
"last_channel": "voice",
"preferences": {},
"custom_data": {}
}
Note: Messages in history now include optional channel field to track origin.
session_interactions
Tracks individual channel interactions within a session.
CREATE TABLE session_interactions (
interaction_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES sessions(session_id) ON DELETE CASCADE,
channel VARCHAR(50) NOT NULL,
channel_ref VARCHAR(255), -- External ID (CallSid, message ID)
status VARCHAR(20) NOT NULL DEFAULT 'active',
started_at TIMESTAMPTZ DEFAULT NOW(),
ended_at TIMESTAMPTZ,
legacy_interaction_id UUID REFERENCES agent_interactions(interaction_id),
metadata JSONB DEFAULT '{}'
);
-- Indexes
CREATE INDEX idx_session_interactions_session
ON session_interactions(session_id, status);
CREATE INDEX idx_session_interactions_channel_ref
ON session_interactions(channel_ref)
WHERE channel_ref IS NOT NULL;
Channels:
voice: Phone calls via telephony servicechat: Web chat interfacesms: Text messagesslack: Slack messagesemail: Email interactions
Multiple Concurrent Interactions:
A session can have multiple active interactions simultaneously (e.g., voice + chat).
session_events
Audit log for all session events.
CREATE TABLE session_events (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES sessions(session_id) ON DELETE CASCADE,
interaction_id UUID REFERENCES session_interactions(interaction_id),
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL DEFAULT '{}',
timestamp TIMESTAMPTZ DEFAULT NOW()
);
-- Indexes
CREATE INDEX idx_session_events_session
ON session_events(session_id, timestamp DESC);
CREATE INDEX idx_session_events_type
ON session_events(event_type);
Event Types: See Event Types section.
channel_consent
Tracks user consent for different communication channels.
CREATE TABLE channel_consent (
consent_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(user_id) ON DELETE CASCADE,
channel_type VARCHAR(50) NOT NULL,
opted_in BOOLEAN NOT NULL DEFAULT FALSE,
opted_in_at TIMESTAMPTZ,
opted_out_at TIMESTAMPTZ,
consent_source VARCHAR(100), -- 'web_form', 'voice_prompt', 'sms_reply'
proactive_allowed BOOLEAN NOT NULL DEFAULT FALSE,
proactive_allowed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(user_id, channel_type)
);
-- Index
CREATE INDEX idx_channel_consent_user ON channel_consent(user_id);
Configuration
Environment variables control the session system behavior.
| Variable | Default | Description |
|---|---|---|
NORNS_SESSION_SYSTEM | false | Feature flag to enable session system |
NORNS_SESSION_RESUME_WINDOW | 24 hours | How long sessions remain resumable |
NORNS_SESSION_MAX_HISTORY | 50 | Max conversation messages to keep |
REDIS_HOST | localhost | Redis server host |
REDIS_PORT | 6379 | Redis server port |
REDIS_PASSWORD | (empty) | Redis authentication password |
Example Configuration:
# In ~/ravenhelm/.env.hlidskjalf
NORNS_SESSION_SYSTEM=true
NORNS_SESSION_RESUME_WINDOW=24 hours
NORNS_SESSION_MAX_HISTORY=50
# Redis (required for real-time events)
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_PASSWORD=your_redis_password
Resume Window Formats:
- PostgreSQL interval syntax:
'24 hours','30 minutes','7 days' - Can be overridden per-session in API calls
API Endpoints
Session Management
POST /api/sessions/resolve
Resolve or create a session for a user.
curl -X POST https://norns.ravenhelm.dev/api/sessions/resolve \
-H "Content-Type: application/json" \
-d '{
"user_id": "550e8400-e29b-41d4-a716-446655440000",
"resume_window": "12 hours"
}'
Response:
{
"session_id": "7c9e6679-7425-40de-944b-e07fc1f90ae7",
"user_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "active",
"created_at": "2026-01-04T10:00:00Z",
"last_activity": "2026-01-04T10:00:00Z",
"resumed": false
}
GET /api/users/by-phone/{phone}
Resolve user by phone number.
curl https://norns.ravenhelm.dev/api/users/by-phone/+15125550123
Response:
{
"user_id": "550e8400-e29b-41d4-a716-446655440000",
"display_name": "Nate Walker",
"phone_number": "+15125550123",
"email": "nate@example.com"
}
POST /api/sessions/{session_id}/interactions
Attach a new channel interaction to a session.
curl -X POST https://norns.ravenhelm.dev/api/sessions/{session_id}/interactions \
-H "Content-Type: application/json" \
-d '{
"channel": "voice",
"channel_ref": "CA1234567890abcdef",
"metadata": {
"call_duration": 120,
"from_number": "+15125550123"
}
}'
Response:
{
"interaction_id": "a1b2c3d4-e5f6-4789-a012-345678901234",
"session_id": "7c9e6679-7425-40de-944b-e07fc1f90ae7",
"channel": "voice",
"status": "active",
"started_at": "2026-01-04T10:05:00Z"
}
POST /api/sessions/interactions/{interaction_id}/close
Close an interaction and optionally sync final conversation history.
curl -X POST https://norns.ravenhelm.dev/api/sessions/interactions/{interaction_id}/close \
-H "Content-Type: application/json" \
-d '{
"final_history": [
{"role": "user", "content": "Thanks!"},
{"role": "assistant", "content": "You're welcome!"}
]
}'
GET /api/sessions/{session_id}/context
Get the full session context.
curl https://norns.ravenhelm.dev/api/sessions/{session_id}/context
Response:
{
"conversation_history": [
{"role": "user", "content": "I need flowers", "channel": "voice"},
{"role": "assistant", "content": "I'd be happy to help!", "channel": "voice"}
],
"last_channel": "voice",
"preferences": {}
}
Real-Time Events
GET /api/sessions/{session_id}/events
Server-Sent Events (SSE) endpoint for real-time session updates.
This endpoint streams events to connected clients whenever any channel publishes to the session.
curl -N https://norns.ravenhelm.dev/api/sessions/{session_id}/events?channel=voice
Query Parameters:
channel(required): The subscribing channel name (e.g., "voice", "chat", "sms")
Event Stream Format (SSE):
data: {"event_type": "MESSAGE", "session_id": "7c9e...", "channel": "chat", "data": {"role": "user", "content": "Hello", "interaction_id": "a1b2...", "metadata": {}}}
data: {"event_type": "TYPING", "session_id": "7c9e...", "channel": "voice", "data": {"is_typing": true}}
data: {"event_type": "CONTEXT_UPDATE", "session_id": "7c9e...", "channel": "chat", "data": {"preferences": {"florist": "Bloom & Petal"}}}
Filtering:
- Events published by the subscribing channel are filtered out (you don't receive your own events)
- Only events from OTHER channels are delivered
Connection Management:
- Clients should reconnect on disconnect
- Timeouts handled by SSE keep-alive (sent every 15 seconds)
Example JavaScript Client:
const eventSource = new EventSource(
`https://norns.ravenhelm.dev/api/sessions/${sessionId}/events?channel=chat`
);
eventSource.onmessage = (event) => {
const sessionEvent = JSON.parse(event.data);
console.log(`Received from ${sessionEvent.channel}:`, sessionEvent);
if (sessionEvent.event_type === 'MESSAGE') {
displayMessage(sessionEvent.data.role, sessionEvent.data.content);
}
};
eventSource.onerror = (error) => {
console.error('SSE error:', error);
// Reconnect logic
};
Message Publishing
POST /api/sessions/{session_id}/messages
Publish a message to all channels subscribed to the session.
This is the primary API for sending messages in a multi-channel session.
curl -X POST https://norns.ravenhelm.dev/api/sessions/{session_id}/messages \
-H "Content-Type: application/json" \
-d '{
"channel": "chat",
"role": "user",
"content": "Can you show me photos of tulips?",
"interaction_id": "a1b2c3d4-e5f6-4789-a012-345678901234",
"metadata": {
"timestamp": "2026-01-04T10:05:30Z"
}
}'
Request Body:
channel(required): Source channel (e.g., "voice", "chat", "sms")role(required): Message role ("user" or "assistant")content(required): Message textinteraction_id(required): Interaction ID from the channelmetadata(optional): Additional context (timestamps, references, etc.)
Response:
{
"success": true,
"session_id": "7c9e6679-7425-40de-944b-e07fc1f90ae7",
"message_published": true
}
What Happens:
- Message is added to session's
conversation_historyin database SessionEventwith typeMESSAGEis published to Redis channelsession:{session_id}:events- All channels subscribed via
/eventsendpoint receive the event (except the publishing channel) session_eventstable logs the message
SessionManager Service
Located at /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/agent/session_manager.py
The SessionManager is the core service managing all multi-channel session logic.
Key Methods
async publish_event(event: SessionEvent) -> None
Publish a session event to Redis pub/sub.
event = SessionEvent(
event_type=SessionEventType.MESSAGE,
session_id=session_id,
channel="chat",
data={
"role": "user",
"content": "Hello",
"interaction_id": str(interaction_id),
"metadata": {}
}
)
await session_manager.publish_event(event)
Broadcast to: session:{session_id}:events
async publish_message(...) -> None
Publish a message AND persist to conversation history.
await session_manager.publish_message(
session_id=session_id,
channel="voice",
role="assistant",
content="I'd be happy to help!",
interaction_id=interaction_id,
metadata={"timestamp": "2026-01-04T10:00:00Z"}
)
Does two things:
- Calls
_append_to_history()to persist in database - Calls
publish_event()to broadcast via Redis
async subscribe_to_session(session_id, my_channel) -> AsyncGenerator
Subscribe to a session's event stream, filtering out own channel's events.
async for event in session_manager.subscribe_to_session(session_id, my_channel="voice"):
if event.event_type == SessionEventType.MESSAGE:
print(f"Received message from {event.channel}: {event.data['content']}")
Yields: SessionEvent objects from other channels
Filters: Events where event.channel == my_channel
async create_event_listener(session_id, my_channel, callback) -> asyncio.Task
Start a background task that listens for events and calls a callback.
async def on_event(event: SessionEvent):
print(f"Event from {event.channel}: {event.event_type}")
task = await session_manager.create_event_listener(
session_id=session_id,
my_channel="voice",
callback=on_event
)
Returns: asyncio.Task that runs until stopped
Used by: Telephony service to inject events into LLMContext
async stop_event_listener(session_id, channel) -> None
Stop a running event listener task.
await session_manager.stop_event_listener(session_id, channel="voice")
Cleanup: Cancels the background task and removes from tracking dict
async _append_to_history(session_id, role, content, channel) -> None
Real-time history persistence.
Immediately appends a message to the session's conversation_history in the database.
await session_manager._append_to_history(
session_id=session_id,
role="user",
content="Show me tulips",
channel="chat"
)
Database update:
UPDATE sessions
SET context = jsonb_set(
context,
'{conversation_history}',
context->'conversation_history' || '[{"role": "user", "content": "Show me tulips", "channel": "chat"}]'::jsonb
),
last_activity = NOW()
WHERE session_id = $1
History limit: Trims to NORNS_SESSION_MAX_HISTORY messages
Channel Integration
Voice Channel (Telephony)
Located in /Users/ravenhelm/ravenhelm/services/telephony/main.py
start_session_event_listener(session_id, call_sid, context)
Subscribes to the session's SSE event stream and injects events into the LLMContext.
When called: At start of voice call, after session resolution
What it does:
- Opens SSE connection to
GET /api/sessions/{session_id}/events?channel=voice - Listens for events from other channels
- Injects messages into
contextas:[Real-time update from {channel}] {role}: {content} - Runs in background task until call ends
Code:
event_listener_task = await start_session_event_listener(
session_id=session_id,
call_sid=call_sid,
context=context # LLMContext object
)
Injection format:
if event.event_type == SessionEventType.MESSAGE:
formatted_msg = f"[Real-time update from {event.channel}] {role}: {content}"
context.add_user_message(formatted_msg)
stop_session_event_listener(call_sid)
Stops the SSE listener when call ends.
await stop_session_event_listener(call_sid)
publish_voice_message(session_id, interaction_id, role, content)
Publishes voice messages to the session so other channels receive them.
await publish_voice_message(
session_id=session_id,
interaction_id=interaction_id,
role="user",
content="I need flowers for my mom"
)
Published as: SessionEvent with channel="voice"
Call flow:
1. User speaks → transcribed by Whisper
2. Transcription added to LLMContext
3. publish_voice_message() broadcasts to Redis
4. Web chat (if open) receives event via SSE
5. LLM generates response
6. Response spoken via Piper TTS
7. publish_voice_message() broadcasts assistant response
8. Web chat displays assistant response
Web Chat Channel
Located in /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/agent/main.py
On chat start:
- Resolve session via
POST /api/sessions/resolve - Attach interaction via
POST /api/sessions/{id}/interactions - Fetch history via
GET /api/sessions/{id}/context - Open SSE connection to
GET /api/sessions/{id}/events?channel=chat - Display conversation history in UI
On user message:
- User types message in web UI
- Frontend POSTs to
/api/chatendpoint - Backend calls
session_manager.publish_message()immediately - Message published to Redis (voice receives it if active)
- Message persisted to database
- LLM processes message
- Assistant response generated
- Response published via
session_manager.publish_message() - Response sent back to web client
On SSE event received:
// Frontend JavaScript
eventSource.onmessage = (event) => {
const sessionEvent = JSON.parse(event.data);
if (sessionEvent.event_type === 'MESSAGE') {
const { role, content } = sessionEvent.data;
displayMessage(role, content, sessionEvent.channel); // Show with channel badge
}
};
Example display:
User (voice): I need flowers for my mom
Assistant (voice): I'd be happy to help!
User (chat): Show me photos of tulips
Assistant (chat): [displays image gallery]
Session Lifecycle
State Transitions
┌──────────────┐
│ created │
└──────┬───────┘
│
▼
┌──────────────┐
┌─────────►│ active │◄─────────┐
│ └──────┬───────┘ │
│ │ │
│ no active │ new │
│ interactions │ interaction │
│ ▼ │
│ ┌──────────────┐ │
│ │ idle │──────────┘
│ │ (resumable) │
│ └──────┬───────┘
│ │
│ exceeded │
│ resume_window│
│ ▼
│ ┌──────────────┐
└──────────│ closed │
explicit close └──────────────┘
Multi-Channel Active State
A session is active if ANY channel has an active interaction.
Example:
Session ABC123:
- Interaction 1: voice (active)
- Interaction 2: chat (active)
Status: active
A session is idle if ALL interactions are closed but within resume window.
Example:
Session ABC123:
- Interaction 1: voice (ended 2 hours ago)
- Interaction 2: chat (ended 2 hours ago)
Resume window: 24 hours
Status: idle (resumable)
Resume Behavior
Within Resume Window:
- User starts new interaction on any channel
SessionManager.resolve_or_create_session()finds existing idle session- Session transitions to
active - Conversation history restored
- New interaction attached
- Real-time events start flowing
Outside Resume Window:
- Old session marked as
closed - New session created
- Fresh conversation starts
- Previous history archived but not restored
Event Types
Defined in SessionEventType enum:
| Event Type | Description | Data Fields |
|---|---|---|
MESSAGE | Chat message from user or assistant | role, content, interaction_id, metadata |
CONTEXT_UPDATE | Session context changed (preferences, etc.) | Updated context fields |
CHANNEL_JOINED | New channel interaction started | interaction_id, channel_ref |
CHANNEL_LEFT | Channel interaction ended | interaction_id |
TYPING | User is typing indicator | is_typing (boolean) |
TOOL_CALL | LLM tool/function call started | tool_name, arguments |
TOOL_RESULT | Tool/function call result | tool_name, result |
Example events:
# MESSAGE event
SessionEvent(
event_type=SessionEventType.MESSAGE,
session_id=session_id,
channel="chat",
data={
"role": "user",
"content": "Hello",
"interaction_id": "...",
"metadata": {}
}
)
# TYPING event
SessionEvent(
event_type=SessionEventType.TYPING,
session_id=session_id,
channel="voice",
data={"is_typing": True}
)
# TOOL_CALL event
SessionEvent(
event_type=SessionEventType.TOOL_CALL,
session_id=session_id,
channel="chat",
data={
"tool_name": "search_florists",
"arguments": {"location": "Austin, TX"}
}
)
Data Flow Diagrams
Real-Time Multi-Channel Flow
┌─────────────────┐ ┌─────────────────┐
│ Voice Channel │ │ Chat Channel │
│ (on phone) │ │ (web browser) │
└────────┬────────┘ └────────┬────────┘
│ │
│ 1. User speaks │
│ "I need flowers" │
│ │
▼ │
publish_voice_message() │
│ │
▼ │
session_manager.publish_message() │
│ │
├──► Redis PUBLISH ──────────────────►│
│ session:ABC:events │
│ ▼
│ SSE receives event
│ │
│ Display: "User (voice): I need flowers"
│ │
│ │ 2. User types in chat
│ │ "Show me photos"
│ │
│ ▼
│ POST /api/sessions/ABC/messages
│ │
│ ▼
│ session_manager.publish_message()
│ │
│◄─────────────────── Redis PUBLISH ──┤
│ session:ABC:events │
│ │
▼ │
SSE receives event │
│ │
Inject: "[Real-time update from chat] │
user: Show me photos" │
│ │
▼ │
LLM processes with context │
from BOTH channels │
│ │
▼ │
Response generated │
"I see you're looking at chat..." │
│ │
▼ │
publish_voice_message() │
│ │
├──► Redis PUBLISH ──────────────────►│
│ session:ABC:events │
│ ▼
│ Display: "Assistant (voice): I see..."
│ │
SSE Subscription Flow
Client (voice channel)
│
▼
GET /api/sessions/{session_id}/events?channel=voice
│
▼
SessionManager.subscribe_to_session(session_id, "voice")
│
▼
Redis SUBSCRIBE session:{session_id}:events
│
└─► Async generator yields events
│
├─► Filter out events where channel="voice"
│
└─► Yield events from other channels
│
▼
Client receives SSE stream
Database + Redis Flow
publish_message() called
│
├──► 1. _append_to_history()
│ │
│ ▼
│ UPDATE sessions
│ SET context = jsonb_set(...)
│ WHERE session_id = ...
│ │
│ └─► Database persistence (immediate)
│
└──► 2. publish_event()
│
▼
Redis PUBLISH
session:{id}:events
│
└─► All subscribed channels
receive event (real-time)
Enabling the System
Step 1: Database Migration
The database tables are already created if you've run recent migrations. Verify:
ssh ravenhelm@100.115.101.81 "docker exec -i postgres psql -U ravenhelm -d ravenmaskos -c '\dt sessions*'"
You should see:
sessionssession_interactionssession_eventschannel_consent
Step 2: Ensure Redis is Running
The real-time event system requires Redis.
ssh ravenhelm@100.115.101.81 "docker ps | grep redis"
If Redis is not running, start it:
ssh ravenhelm@100.115.101.81 "cd ~/ravenhelm/compose && docker compose up -d redis"
Step 3: Enable Feature Flag
Edit the Norns environment file:
ssh ravenhelm@100.115.101.81 "nano ~/ravenhelm/.env.hlidskjalf"
Add or update:
NORNS_SESSION_SYSTEM=true
NORNS_SESSION_RESUME_WINDOW=24 hours
NORNS_SESSION_MAX_HISTORY=50
# Redis configuration (required)
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_PASSWORD=your_redis_password
Step 4: Restart Services
Restart Norns and Telephony services:
ssh ravenhelm@100.115.101.81 "cd ~/ravenhelm/docs/AI-ML-Platform/norns-agent && docker compose restart"
ssh ravenhelm@100.115.101.81 "cd ~/ravenhelm/services/telephony && docker compose restart"
Step 5: Verify
Check that the feature is enabled:
ssh ravenhelm@100.115.101.81 "docker logs norns-agent 2>&1 | grep -i session"
ssh ravenhelm@100.115.101.81 "docker logs telephony 2>&1 | grep -i session"
You should see log entries indicating the session system and Redis pub/sub are active.
Step 6: Test Real-Time Events
Test concurrent multi-channel:
- Call Norns: +1 (737) 214-3330
- While on call, open web chat
- Send a message in web chat
- Observe voice assistant acknowledges the chat message
- Say something on voice
- Observe web chat displays the voice message in real-time
Test via API:
# Terminal 1: Subscribe to SSE
curl -N "https://norns.ravenhelm.dev/api/sessions/{session_id}/events?channel=voice"
# Terminal 2: Publish a message
curl -X POST "https://norns.ravenhelm.dev/api/sessions/{session_id}/messages" \
-H "Content-Type: application/json" \
-d '{
"channel": "chat",
"role": "user",
"content": "Test message",
"interaction_id": "test-123",
"metadata": {}
}'
# Terminal 1 should receive the event immediately
Operational Guide
Monitoring
Active Sessions:
SELECT COUNT(*)
FROM sessions
WHERE status = 'active';
Concurrent Multi-Channel Sessions:
SELECT
s.session_id,
s.user_id,
COUNT(si.interaction_id) as active_channels,
array_agg(si.channel) as channels
FROM sessions s
JOIN session_interactions si ON s.session_id = si.session_id
WHERE s.status = 'active' AND si.status = 'active'
GROUP BY s.session_id, s.user_id
HAVING COUNT(si.interaction_id) > 1;
Session Duration:
SELECT
session_id,
user_id,
created_at,
last_activity,
last_activity - created_at AS duration
FROM sessions
WHERE status IN ('active', 'idle')
ORDER BY last_activity DESC
LIMIT 10;
Interactions by Channel:
SELECT
channel,
COUNT(*) as total,
COUNT(*) FILTER (WHERE status = 'active') as active
FROM session_interactions
GROUP BY channel;
Redis Pub/Sub Monitoring:
ssh ravenhelm@100.115.101.81 "docker exec -it redis redis-cli"
> PUBSUB CHANNELS session:*
> PUBSUB NUMSUB session:{session_id}:events
Maintenance
Close Expired Sessions (Manual):
UPDATE sessions
SET status = 'closed', closed_at = NOW()
WHERE status IN ('active', 'idle', 'resumable')
AND last_activity <= NOW() - resume_window;
This should be automated via a background task in production.
Cleanup Old Events:
DELETE FROM session_events
WHERE timestamp < NOW() - INTERVAL '90 days';
Monitor Redis Memory:
ssh ravenhelm@100.115.101.81 "docker exec redis redis-cli INFO memory"
Troubleshooting
Session Not Resuming
-
Check if resume window has passed:
SELECT
session_id,
status,
last_activity,
resume_window,
NOW() - last_activity AS time_since_activity
FROM sessions
WHERE user_id = '<user_id>'; -
Check for multiple sessions (shouldn't happen):
SELECT session_id, status, created_at, last_activity
FROM sessions
WHERE user_id = '<user_id>'
ORDER BY last_activity DESC;
Real-Time Events Not Received
-
Check Redis connectivity:
ssh ravenhelm@100.115.101.81 "docker exec norns-agent ping -c 3 redis" -
Check Redis pub/sub:
ssh ravenhelm@100.115.101.81 "docker exec -it redis redis-cli"
> PUBSUB CHANNELS session:*
> PUBSUB NUMSUB session:{session_id}:events -
Check SSE connection:
curl -N "https://norns.ravenhelm.dev/api/sessions/{session_id}/events?channel=test" \
-H "Authorization: Bearer <token>" -
Check Norns logs:
ssh ravenhelm@100.115.101.81 "docker logs --tail 100 -f norns-agent | grep -i redis"
Messages Not Persisting
-
Check session context:
SELECT context
FROM sessions
WHERE session_id = '<session_id>'; -
Check interaction close events:
SELECT *
FROM session_events
WHERE session_id = '<session_id>'
AND event_type = 'interaction_ended'
ORDER BY timestamp DESC; -
Check for database errors in logs:
ssh ravenhelm@100.115.101.81 "docker logs norns-agent 2>&1 | grep -i 'database\|error'"
SSE Connection Drops
SSE connections can drop due to network issues or timeouts.
Client-side reconnection logic:
let eventSource;
let reconnectDelay = 1000;
const maxReconnectDelay = 30000;
function connectSSE() {
eventSource = new EventSource(`/api/sessions/${sessionId}/events?channel=chat`);
eventSource.onopen = () => {
console.log('SSE connected');
reconnectDelay = 1000; // Reset delay
};
eventSource.onerror = (error) => {
console.error('SSE error:', error);
eventSource.close();
// Exponential backoff reconnect
setTimeout(connectSSE, reconnectDelay);
reconnectDelay = Math.min(reconnectDelay * 2, maxReconnectDelay);
};
eventSource.onmessage = handleEvent;
}
Performance Considerations
Indexes: The schema includes optimized indexes for:
- User lookup by active sessions
- Session history queries
- Event log searches
- Channel-based queries
Conversation History Size:
History is limited to NORNS_SESSION_MAX_HISTORY messages (default: 50) to prevent unbounded growth. Older messages are trimmed automatically during sync.
Redis Pub/Sub Scalability:
- Redis pub/sub is highly performant (handles millions of messages/sec)
- Each session has its own channel, avoiding global bottlenecks
- No message persistence in Redis (only in PostgreSQL)
Database Load: Each interaction creates:
- 1 row in
session_interactions - 2-4 rows in
session_events(start, end, sync events) - 1 update to
sessions.contextper message
For high-volume deployments, consider:
- Partitioning
session_eventsby timestamp - Archiving closed sessions older than 90 days
- Caching active session contexts in Redis (in addition to pub/sub)
- Read replicas for analytics queries
SSE Connection Limits:
- Each SSE connection holds an open HTTP connection
- Typical server can handle 10,000+ concurrent SSE connections
- Use connection pooling and load balancing for scale
Related Documentation
Last Updated: 2026-01-04
Architecture Version: 2.0 (Event-Driven Real-Time)
Related Issues: RAV-108
WhatsApp Channel
Located in /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/agent/whatsapp/
The WhatsApp channel integrates with Twilio's WhatsApp API, supporting the same multi-channel session paradigm as voice and chat.
Key Differences from Other Channels:
| Aspect | Voice/Chat | |
|---|---|---|
| Session Windows | 24-hour rule (WhatsApp requirement) | Configurable resume_window |
| Proactive Messages | Requires approved templates | Free-form allowed |
| Rich Messaging | Buttons (max 3), Lists (max 10) | Varies by channel |
| User Resolution | By phone number (E.164) | Phone or user_id |
Architecture:
WhatsApp Message
│
▼
POST /whatsapp/webhook
│
├──► Verify Twilio signature
│
├──► resolve_user_by_phone()
│
├──► WhatsAppSessionTracker.record_inbound()
│ │
│ └──► Extends 24-hour window
│
├──► SessionManager.resolve_or_create_session()
│
├──► process_message() ──► LangGraph Agent
│
├──► SessionManager.publish_message()
│ │
│ ├──► Redis pub/sub ──► Other channels
│ │
│ └──► Database persist
│
└──► WhatsAppMessageBuilder.build_response()
│
└──► Twilio API ──► User's WhatsApp
Session Window Tracking:
WhatsApp has a 24-hour customer service window rule managed by WhatsAppSessionTracker:
# Check if user is in session window
in_window = await session_tracker.is_in_session_window("+1234567890")
if in_window:
# Can send free-form messages
await client.send_text(to_number, "Any message here")
else:
# Must use approved template
await template_manager.send_template(to_number, "task_reminder", variables)
Database Table: whatsapp_sessions
CREATE TABLE whatsapp_sessions (
session_id UUID PRIMARY KEY,
user_id UUID NOT NULL REFERENCES users(user_id),
phone_number VARCHAR(20) NOT NULL UNIQUE,
window_expires_at TIMESTAMPTZ,
last_inbound_at TIMESTAMPTZ,
created_at TIMESTAMPTZ
);
Publishing Messages:
When WhatsApp receives a message or sends a response, it publishes to the multi-channel session:
await session_manager.publish_message(
session_id=session_id,
channel="whatsapp",
role="user",
content=message_body,
interaction_id=interaction_id,
metadata={"message_sid": message_sid, "phone": from_number}
)
Cross-Channel Example:
User (voice): "Add milk to my shopping list"
→ Voice publishes to session → WhatsApp receives via SSE
User (whatsapp): "Also add bread"
→ WhatsApp publishes to session → Voice receives via SSE
Both channels now have full conversation context.
Proactive Messaging:
For scheduled reminders outside the 24-hour window, use templates via n8n or API:
curl -X POST "https://norns-pm.ravenhelm.dev/api/whatsapp/send" \
-H "X-API-Key: $NORNS_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"user_id": "...",
"template_name": "task_reminder",
"variables": {"task_name": "Buy milk", "due_time": "in 1 hour"}
}'
Available Templates:
| Template | Variables | Use Case |
|---|---|---|
task_reminder | task_name, due_time | Task due reminders |
daily_briefing | date, task_count, top_tasks | Morning briefings |
calendar_reminder | event_name, time, location | Calendar alerts |
task_complete_confirm | task_name | Completion confirmation |
weekly_review | completed_count, highlights | Weekly summary |
See [[AI-ML-Platform/Norns-WhatsApp-Channel|WhatsApp Channel Documentation]] for complete details.