Multi-Channel Session Memory System

Real-time event-driven conversation continuity across voice, web chat, and messaging channels

The Multi-Channel Session Memory System lets Norns keep conversation context across communication channels and supports several channels concurrently in real time. A user can hold a voice conversation while chatting on the web, and each channel receives live updates from the other.


Overview

What It Does

The Multi-Channel Session Memory System provides:

  • One Session Per User: Each user has a single active session at any time
  • Real-Time Multi-Channel Communication: Concurrent conversations across multiple channels with live synchronization
  • Event-Driven Architecture: Redis pub/sub broadcasts events to all active channels
  • Cross-Channel Memory: Context persists across voice, web chat, SMS, and other channels
  • Automatic Resume: Sessions automatically resume within a configurable time window (default: 24 hours)
  • Live Context Updates: Changes in one channel immediately visible in all other active channels
  • Channel Consent Tracking: Manages opt-in/opt-out for proactive messaging

Key Capabilities

| Feature | Description |
|---|---|
| Session Resolution | Automatically find or create sessions based on user identity |
| Real-Time Events | SSE endpoint streams events to all connected channels |
| Redis Pub/Sub | Central message bus for cross-channel communication |
| Context Preservation | Store and retrieve conversation state, preferences, and history |
| Interaction Tracking | Track individual channel interactions within a session |
| Live History Sync | Conversation history updates in real-time across channels |
| Concurrent Channels | Multiple channels can be active simultaneously on one session |
| Consent Management | Track user consent for different communication channels |
| Event Logging | Audit trail of all session events |

The Florist Scenario

NEW: Real-time concurrent multi-channel conversation

Scenario: Voice Call + Web Chat Simultaneously

8:00 AM - User calls Norns via phone:

User (voice): "Hey Norns, I need to send flowers to my mom for her birthday."
Norns (voice): "I'd be happy to help! What type of flowers does she like?"

8:01 AM - While still on the phone, user opens web chat:

User (web): "Can you show me photos of tulip arrangements?"

What happens:

  1. Web chat publishes message to Redis: session:{id}:events
  2. Voice channel receives real-time event via SSE
  3. Voice channel injects into LLMContext: [Real-time update from chat] user: Can you show me photos...
  4. Norns (voice) responds with context from both channels

8:02 AM - Voice assistant responds:

Norns (voice): "I see you're also looking at the web chat. I'll display some tulip 
arrangements there for you. Which color does she prefer?"
User (voice): "Pink tulips would be perfect."

8:02 AM - Voice message published to Redis:

[Voice channel publishes to session:{id}:events]
Web chat receives event and displays:
User (voice): "Pink tulips would be perfect."

8:03 AM - Web shows search results:

Norns (web chat): Here are three florists with pink tulips:
[images of arrangements displayed]

8:05 AM - User continues on voice:

User (voice): "I'll go with Bloom & Petal."
Norns (voice): "Perfect! I've noted that. You can complete the order on the web
when you're ready."

Both channels see full conversation history in real-time. The session persists across voice and web simultaneously, with each channel receiving live updates from the other.


Architecture

Real-Time Event System

The system uses an event-driven architecture with Redis pub/sub for real-time cross-channel communication.

Key Design:

  • Each session has a Redis channel: session:{session_id}:events
  • All active channels subscribe to the session's event stream via SSE
  • When any channel publishes a message, all other channels receive it instantly
  • Messages are both broadcast via Redis AND persisted to the database
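
As a rough illustration of the channel naming and event payload described above, the sketch below builds the per-session Redis channel name and round-trips an event through JSON. The `SessionEvent` dataclass here is a simplified stand-in that uses plain strings for event types, and `session_channel` is a hypothetical helper; neither is confirmed to match the real definitions in session_manager.py.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any


def session_channel(session_id: str) -> str:
    """Return the Redis channel name for a session (hypothetical helper)."""
    return f"session:{session_id}:events"


@dataclass
class SessionEvent:
    # Simplified stand-in: field names follow the event payloads in this document.
    event_type: str
    session_id: str
    channel: str
    data: dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialize the event to the JSON string that would be published."""
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "SessionEvent":
        """Rebuild an event from a JSON string received by a subscriber."""
        return cls(**json.loads(raw))
```

A subscriber on any channel would decode the payload with `SessionEvent.from_json()` and inspect `event.channel` to see where the message originated.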

Redis Pub/Sub

Channel Format: session:{session_id}:events

Event Flow:

Web Chat publishes message
        │
        ▼
session_manager.publish_message()
        │
        ├─► Redis PUBLISH → session:{session_id}:events
        │
        └─► Database INSERT → conversation_history
        │
        ▼
Voice channel (subscribed via SSE)
        │
        ▼
Receives SessionEvent
        │
        ▼
Injects into LLMContext as:
"[Real-time update from chat] user: ..."

Benefits:

  • No polling: SSE provides efficient server-push
  • Real-time: Sub-second latency for cross-channel updates
  • Scalable: Redis pub/sub handles thousands of concurrent sessions
  • Persistent: All events are also written to database for history

Component Overview

┌─────────────────────────────────────────────────────────────────────┐
│                         Channel Interfaces                          │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐           │
│  │  Telephony   │    │   Web Chat   │    │     SMS      │           │
│  │   Service    │    │  /api/chat   │    │   (future)   │           │
│  └──────┬───────┘    └──────┬───────┘    └──────┬───────┘           │
└─────────┼───────────────────┼───────────────────┼───────────────────┘
          │                   │                   │
          │ SSE subscribe     │ publish           │
          │ GET /events       │ POST /messages    │
          │                   │                   │
          └─────────┬─────────┴─────────┬─────────┘
                    │                   │
                    ▼                   ▼
             ┌─────────────────────────────────┐
             │     SessionManager Service      │
             │      (session_manager.py)       │
             │                                 │
             │  • publish_event()              │
             │  • publish_message()            │
             │  • subscribe_to_session()       │
             │  • create_event_listener()      │
             └────────────────┬────────────────┘
                              │
                ┌─────────────┼─────────────┐
                │             │             │
                ▼             ▼             ▼
           ┌──────────┐  ┌──────────┐  ┌──────────┐
           │  Redis   │  │ sessions │  │ session_ │
           │ pub/sub  │  │    DB    │  │  events  │
           │          │  │          │  │    DB    │
           └──────────┘  └──────────┘  └──────────┘
                │
                │ session:{id}:events
                │
                ▼
     All subscribed channels
Database Schema

Core Tables

sessions

Stores user sessions with context and resume windows.

CREATE TABLE sessions (
session_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(user_id) ON DELETE CASCADE,
status VARCHAR(20) NOT NULL DEFAULT 'active',
context JSONB NOT NULL DEFAULT '{}',
resume_window INTERVAL NOT NULL DEFAULT '24 hours',
created_at TIMESTAMPTZ DEFAULT NOW(),
last_activity TIMESTAMPTZ DEFAULT NOW(),
closed_at TIMESTAMPTZ
);

-- Indexes
CREATE INDEX idx_sessions_user_active
ON sessions(user_id)
WHERE status IN ('active', 'idle', 'resumable');

CREATE INDEX idx_sessions_last_activity
ON sessions(last_activity DESC);

-- Constraint: Only one active/idle/resumable session per user
CREATE UNIQUE INDEX one_active_session_per_user
ON sessions(user_id)
WHERE status IN ('active', 'idle', 'resumable');

Session States:

  • active: Has one or more active channel interactions
  • idle: No active interactions, but within resume window
  • resumable: Alias for idle (used interchangeably)
  • closed: Explicitly closed or expired beyond resume window

Context Schema:

{
"conversation_history": [
{"role": "user", "content": "Hey Norns, I need flowers", "channel": "voice"},
{"role": "assistant", "content": "I'd be happy to help!", "channel": "voice"},
{"role": "user", "content": "Show me photos", "channel": "chat"}
],
"last_channel": "voice",
"preferences": {},
"custom_data": {}
}

Note: Each message in the history may include an optional channel field that records which channel it came from.

session_interactions

Tracks individual channel interactions within a session.

CREATE TABLE session_interactions (
interaction_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES sessions(session_id) ON DELETE CASCADE,
channel VARCHAR(50) NOT NULL,
channel_ref VARCHAR(255), -- External ID (CallSid, message ID)
status VARCHAR(20) NOT NULL DEFAULT 'active',
started_at TIMESTAMPTZ DEFAULT NOW(),
ended_at TIMESTAMPTZ,
legacy_interaction_id UUID REFERENCES agent_interactions(interaction_id),
metadata JSONB DEFAULT '{}'
);

-- Indexes
CREATE INDEX idx_session_interactions_session
ON session_interactions(session_id, status);

CREATE INDEX idx_session_interactions_channel_ref
ON session_interactions(channel_ref)
WHERE channel_ref IS NOT NULL;

Channels:

  • voice: Phone calls via telephony service
  • chat: Web chat interface
  • sms: Text messages
  • slack: Slack messages
  • email: Email interactions

Multiple Concurrent Interactions: A session can have multiple active interactions simultaneously (e.g., voice + chat).

session_events

Audit log for all session events.

CREATE TABLE session_events (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES sessions(session_id) ON DELETE CASCADE,
interaction_id UUID REFERENCES session_interactions(interaction_id),
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL DEFAULT '{}',
timestamp TIMESTAMPTZ DEFAULT NOW()
);

-- Indexes
CREATE INDEX idx_session_events_session
ON session_events(session_id, timestamp DESC);

CREATE INDEX idx_session_events_type
ON session_events(event_type);

Event Types: See Event Types section.

channel_consent

Tracks user consent for different communication channels.

CREATE TABLE channel_consent (
consent_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(user_id) ON DELETE CASCADE,
channel_type VARCHAR(50) NOT NULL,
opted_in BOOLEAN NOT NULL DEFAULT FALSE,
opted_in_at TIMESTAMPTZ,
opted_out_at TIMESTAMPTZ,
consent_source VARCHAR(100), -- 'web_form', 'voice_prompt', 'sms_reply'
proactive_allowed BOOLEAN NOT NULL DEFAULT FALSE,
proactive_allowed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),

UNIQUE(user_id, channel_type)
);

-- Index
CREATE INDEX idx_channel_consent_user ON channel_consent(user_id);
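
To show how the consent columns might gate proactive messaging, here is a minimal sketch over in-memory rows. The rule that both `opted_in` and `proactive_allowed` must be true is an assumption drawn from the column names, not something the schema itself enforces, and `can_send_proactive` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class ChannelConsent:
    # Mirrors a subset of the channel_consent columns above.
    channel_type: str
    opted_in: bool = False
    proactive_allowed: bool = False


def can_send_proactive(consents: Iterable[ChannelConsent], channel_type: str) -> bool:
    """Return True only when the user opted in AND allowed proactive messages.

    Assumption: a missing row means no consent, matching the FALSE defaults.
    """
    for consent in consents:
        if consent.channel_type == channel_type:
            return consent.opted_in and consent.proactive_allowed
    return False
```

Treating a missing row as "no consent" keeps the check conservative for channels the user has never interacted with.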

Configuration

Environment variables control the session system behavior.

| Variable | Default | Description |
|---|---|---|
| NORNS_SESSION_SYSTEM | false | Feature flag to enable session system |
| NORNS_SESSION_RESUME_WINDOW | 24 hours | How long sessions remain resumable |
| NORNS_SESSION_MAX_HISTORY | 50 | Max conversation messages to keep |
| REDIS_HOST | localhost | Redis server host |
| REDIS_PORT | 6379 | Redis server port |
| REDIS_PASSWORD | (empty) | Redis authentication password |

Example Configuration:

# In ~/ravenhelm/.env.hlidskjalf
NORNS_SESSION_SYSTEM=true
NORNS_SESSION_RESUME_WINDOW=24 hours
NORNS_SESSION_MAX_HISTORY=50

# Redis (required for real-time events)
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_PASSWORD=your_redis_password

Resume Window Formats:

  • PostgreSQL interval syntax: '24 hours', '30 minutes', '7 days'
  • Can be overridden per-session in API calls

API Endpoints

Session Management

POST /api/sessions/resolve

Resolve or create a session for a user.

curl -X POST https://norns.ravenhelm.dev/api/sessions/resolve \
-H "Content-Type: application/json" \
-d '{
"user_id": "550e8400-e29b-41d4-a716-446655440000",
"resume_window": "12 hours"
}'

Response:

{
"session_id": "7c9e6679-7425-40de-944b-e07fc1f90ae7",
"user_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "active",
"created_at": "2026-01-04T10:00:00Z",
"last_activity": "2026-01-04T10:00:00Z",
"resumed": false
}

GET /api/users/by-phone/{phone}

Resolve user by phone number.

curl https://norns.ravenhelm.dev/api/users/by-phone/+15125550123

Response:

{
"user_id": "550e8400-e29b-41d4-a716-446655440000",
"display_name": "Nate Walker",
"phone_number": "+15125550123",
"email": "nate@example.com"
}

POST /api/sessions/{session_id}/interactions

Attach a new channel interaction to a session.

curl -X POST https://norns.ravenhelm.dev/api/sessions/{session_id}/interactions \
-H "Content-Type: application/json" \
-d '{
"channel": "voice",
"channel_ref": "CA1234567890abcdef",
"metadata": {
"call_duration": 120,
"from_number": "+15125550123"
}
}'

Response:

{
"interaction_id": "a1b2c3d4-e5f6-4789-a012-345678901234",
"session_id": "7c9e6679-7425-40de-944b-e07fc1f90ae7",
"channel": "voice",
"status": "active",
"started_at": "2026-01-04T10:05:00Z"
}

POST /api/sessions/interactions/{interaction_id}/close

Close an interaction and optionally sync final conversation history.

curl -X POST https://norns.ravenhelm.dev/api/sessions/interactions/{interaction_id}/close \
-H "Content-Type: application/json" \
-d '{
"final_history": [
{"role": "user", "content": "Thanks!"},
{"role": "assistant", "content": "You're welcome!"}
]
}'

GET /api/sessions/{session_id}/context

Get the full session context.

curl https://norns.ravenhelm.dev/api/sessions/{session_id}/context

Response:

{
"conversation_history": [
{"role": "user", "content": "I need flowers", "channel": "voice"},
{"role": "assistant", "content": "I'd be happy to help!", "channel": "voice"}
],
"last_channel": "voice",
"preferences": {}
}

Real-Time Events

GET /api/sessions/{session_id}/events

Server-Sent Events (SSE) endpoint for real-time session updates.

This endpoint streams events to connected clients whenever any channel publishes to the session.

curl -N "https://norns.ravenhelm.dev/api/sessions/{session_id}/events?channel=voice"

Query Parameters:

  • channel (required): The subscribing channel name (e.g., "voice", "chat", "sms")

Event Stream Format (SSE):

data: {"event_type": "MESSAGE", "session_id": "7c9e...", "channel": "chat", "data": {"role": "user", "content": "Hello", "interaction_id": "a1b2...", "metadata": {}}}

data: {"event_type": "TYPING", "session_id": "7c9e...", "channel": "voice", "data": {"is_typing": true}}

data: {"event_type": "CONTEXT_UPDATE", "session_id": "7c9e...", "channel": "chat", "data": {"preferences": {"florist": "Bloom & Petal"}}}

Filtering:

  • Events published by the subscribing channel are filtered out (you don't receive your own events)
  • Only events from OTHER channels are delivered

Connection Management:

  • Clients should reconnect on disconnect
  • Timeouts handled by SSE keep-alive (sent every 15 seconds)
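
For non-browser clients, the stream format above can be consumed with a small parser. The sketch below is an assumption-laden illustration: it treats keep-alives as SSE comment lines (lines starting with `:`), which this document does not specify, and it repeats the own-channel filter on the client side as a defensive measure even though the server already applies it. `iter_sse_events` is a hypothetical name.

```python
import json
from typing import Iterable, Iterator


def iter_sse_events(lines: Iterable[str], my_channel: str) -> Iterator[dict]:
    """Yield decoded events from raw SSE lines, skipping our own channel."""
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line.startswith(":"):
            continue  # SSE comment, assumed here to be the keep-alive format
        if line.startswith("data:"):
            value = line[5:]
            # The SSE format strips one optional space after the colon.
            data_lines.append(value[1:] if value.startswith(" ") else value)
            continue
        if line == "" and data_lines:
            # A blank line terminates the event; multi-line data joins with "\n".
            event = json.loads("\n".join(data_lines))
            data_lines = []
            if event.get("channel") != my_channel:
                yield event
```

In practice `lines` would come from a streaming HTTP response; any iterable of text lines works, which also makes the parser easy to test offline.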

Example JavaScript Client:

const eventSource = new EventSource(
  `https://norns.ravenhelm.dev/api/sessions/${sessionId}/events?channel=chat`
);

eventSource.onmessage = (event) => {
  const sessionEvent = JSON.parse(event.data);
  console.log(`Received from ${sessionEvent.channel}:`, sessionEvent);

  if (sessionEvent.event_type === 'MESSAGE') {
    displayMessage(sessionEvent.data.role, sessionEvent.data.content);
  }
};

eventSource.onerror = (error) => {
  console.error('SSE error:', error);
  // Reconnect logic
};

Message Publishing

POST /api/sessions/{session_id}/messages

Publish a message to all channels subscribed to the session.

This is the primary API for sending messages in a multi-channel session.

curl -X POST https://norns.ravenhelm.dev/api/sessions/{session_id}/messages \
-H "Content-Type: application/json" \
-d '{
"channel": "chat",
"role": "user",
"content": "Can you show me photos of tulips?",
"interaction_id": "a1b2c3d4-e5f6-4789-a012-345678901234",
"metadata": {
"timestamp": "2026-01-04T10:05:30Z"
}
}'

Request Body:

  • channel (required): Source channel (e.g., "voice", "chat", "sms")
  • role (required): Message role ("user" or "assistant")
  • content (required): Message text
  • interaction_id (required): Interaction ID from the channel
  • metadata (optional): Additional context (timestamps, references, etc.)

Response:

{
"success": true,
"session_id": "7c9e6679-7425-40de-944b-e07fc1f90ae7",
"message_published": true
}

What Happens:

  1. Message is added to session's conversation_history in database
  2. SessionEvent with type MESSAGE is published to Redis channel session:{session_id}:events
  3. All channels subscribed via /events endpoint receive the event (except the publishing channel)
  4. session_events table logs the message
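
The four steps above can be sketched with an in-memory stand-in that needs neither Redis nor PostgreSQL. Everything here is hypothetical scaffolding (`InMemorySessionBus`, the queue-per-subscriber model); it only mirrors the ordering and the "everyone except the publisher" delivery rule, not the real SessionManager implementation.

```python
import asyncio
from collections import defaultdict
from typing import Any


class InMemorySessionBus:
    """Toy model of the publish flow; queues stand in for open /events streams."""

    def __init__(self) -> None:
        self.history: dict[str, list[dict[str, Any]]] = defaultdict(list)
        self.subscribers: dict[str, list[tuple[str, asyncio.Queue]]] = defaultdict(list)
        self.event_log: list[dict[str, Any]] = []

    def subscribe(self, session_id: str, channel: str) -> asyncio.Queue:
        """Register a channel and return the queue its events will arrive on."""
        queue: asyncio.Queue = asyncio.Queue()
        self.subscribers[session_id].append((channel, queue))
        return queue

    async def publish_message(self, session_id: str, channel: str,
                              role: str, content: str) -> None:
        # Step 1: append to the session's conversation history.
        self.history[session_id].append(
            {"role": role, "content": content, "channel": channel}
        )
        # Step 2: build the MESSAGE event that would go to Redis.
        event = {
            "event_type": "MESSAGE",
            "session_id": session_id,
            "channel": channel,
            "data": {"role": role, "content": content},
        }
        # Step 3: deliver to every subscriber except the publishing channel.
        for subscriber_channel, queue in self.subscribers[session_id]:
            if subscriber_channel != channel:
                await queue.put(event)
        # Step 4: record the event in the audit log.
        self.event_log.append(event)
```

With voice and chat both subscribed, a message published from chat lands on the voice queue only, while the history and the audit log each gain one entry.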

SessionManager Service

Located at /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/agent/session_manager.py

The SessionManager is the core service managing all multi-channel session logic.

Key Methods

async publish_event(event: SessionEvent) -> None

Publish a session event to Redis pub/sub.

event = SessionEvent(
event_type=SessionEventType.MESSAGE,
session_id=session_id,
channel="chat",
data={
"role": "user",
"content": "Hello",
"interaction_id": str(interaction_id),
"metadata": {}
}
)
await session_manager.publish_event(event)

Broadcast to: session:{session_id}:events

async publish_message(...) -> None

Publish a message AND persist to conversation history.

await session_manager.publish_message(
session_id=session_id,
channel="voice",
role="assistant",
content="I'd be happy to help!",
interaction_id=interaction_id,
metadata={"timestamp": "2026-01-04T10:00:00Z"}
)

Does two things:

  1. Calls _append_to_history() to persist in database
  2. Calls publish_event() to broadcast via Redis

async subscribe_to_session(session_id, my_channel) -> AsyncGenerator

Subscribe to a session's event stream, filtering out own channel's events.

async for event in session_manager.subscribe_to_session(session_id, my_channel="voice"):
    if event.event_type == SessionEventType.MESSAGE:
        print(f"Received message from {event.channel}: {event.data['content']}")

Yields: SessionEvent objects from other channels
Filters out: Events where event.channel == my_channel

async create_event_listener(session_id, my_channel, callback) -> asyncio.Task

Start a background task that listens for events and calls a callback.

async def on_event(event: SessionEvent):
    print(f"Event from {event.channel}: {event.event_type}")

task = await session_manager.create_event_listener(
    session_id=session_id,
    my_channel="voice",
    callback=on_event
)

Returns: asyncio.Task that runs until stopped
Used by: Telephony service to inject events into LLMContext

async stop_event_listener(session_id, channel) -> None

Stop a running event listener task.

await session_manager.stop_event_listener(session_id, channel="voice")

Cleanup: Cancels the background task and removes from tracking dict
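
The start/stop pairing described above, a background task tracked in a dict and cancelled on stop, might look roughly like the following. `ListenerRegistry` and its method names are hypothetical; the real SessionManager may track tasks differently.

```python
import asyncio
from typing import Any, Callable, Coroutine


class ListenerRegistry:
    """Tracks one background listener task per (session_id, channel) pair."""

    def __init__(self) -> None:
        self._tasks: dict[tuple[str, str], asyncio.Task] = {}

    def start(self, session_id: str, channel: str,
              listen: Callable[[], Coroutine[Any, Any, None]]) -> asyncio.Task:
        """Start listen() as a task, or return the task already running."""
        key = (session_id, channel)
        if key in self._tasks:
            return self._tasks[key]
        task = asyncio.create_task(listen())
        self._tasks[key] = task
        return task

    async def stop(self, session_id: str, channel: str) -> None:
        """Cancel the listener task and remove it from the tracking dict."""
        task = self._tasks.pop((session_id, channel), None)
        if task is None:
            return
        task.cancel()
        try:
            await task  # wait until the cancellation has been processed
        except asyncio.CancelledError:
            pass

    def active_count(self) -> int:
        """Number of listeners currently tracked."""
        return len(self._tasks)
```

Awaiting the task after `cancel()` ensures any cleanup inside the listener has finished before the caller proceeds, for example before closing the call.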

async _append_to_history(session_id, role, content, channel) -> None

Real-time history persistence.

Immediately appends a message to the session's conversation_history in the database.

await session_manager._append_to_history(
session_id=session_id,
role="user",
content="Show me tulips",
channel="chat"
)

Database update:

UPDATE sessions
SET context = jsonb_set(
        context,
        '{conversation_history}',
        -- COALESCE covers a context that has no conversation_history key yet
        COALESCE(context->'conversation_history', '[]'::jsonb)
            || '[{"role": "user", "content": "Show me tulips", "channel": "chat"}]'::jsonb
    ),
    last_activity = NOW()
WHERE session_id = $1

History limit: Trims to NORNS_SESSION_MAX_HISTORY messages
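
The trimming rule can be illustrated in a few lines. This sketch assumes that trimming keeps the most recent messages and drops the oldest; `append_and_trim` is a hypothetical helper, not the actual body of `_append_to_history()`.

```python
from typing import Any


def append_and_trim(history: list[dict[str, Any]], message: dict[str, Any],
                    max_history: int = 50) -> list[dict[str, Any]]:
    """Return a new history with message appended, capped at max_history items."""
    if max_history <= 0:
        raise ValueError("max_history must be positive")
    updated = history + [message]  # copy, so the caller's list is untouched
    return updated[-max_history:]  # keep only the newest entries
```

With the default limit of 50, appending to a full history drops exactly one message from the front.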


Channel Integration

Voice Channel (Telephony)

Located in /Users/ravenhelm/ravenhelm/services/telephony/main.py

start_session_event_listener(session_id, call_sid, context)

Subscribes to the session's SSE event stream and injects events into the LLMContext.

When called: At start of voice call, after session resolution

What it does:

  1. Opens SSE connection to GET /api/sessions/{session_id}/events?channel=voice
  2. Listens for events from other channels
  3. Injects messages into context as: [Real-time update from {channel}] {role}: {content}
  4. Runs in background task until call ends

Code:

event_listener_task = await start_session_event_listener(
session_id=session_id,
call_sid=call_sid,
context=context # LLMContext object
)

Injection format:

if event.event_type == SessionEventType.MESSAGE:
    role = event.data["role"]
    content = event.data["content"]
    formatted_msg = f"[Real-time update from {event.channel}] {role}: {content}"
    context.add_user_message(formatted_msg)
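
As a standalone illustration of the injection format, the function below turns a decoded event dictionary into the text that would be added to the context. It is a sketch: `format_realtime_update` is a hypothetical name, and events are modelled as plain dicts rather than SessionEvent objects.

```python
from typing import Any, Optional


def format_realtime_update(event: dict[str, Any]) -> Optional[str]:
    """Return the context line for MESSAGE events, or None for other types."""
    if event.get("event_type") != "MESSAGE":
        return None  # only messages are injected in this sketch
    data = event["data"]
    return f"[Real-time update from {event['channel']}] {data['role']}: {data['content']}"
```

Returning `None` for non-message events lets the caller skip typing indicators and similar events without special-casing each type.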

stop_session_event_listener(call_sid)

Stops the SSE listener when call ends.

await stop_session_event_listener(call_sid)

publish_voice_message(session_id, interaction_id, role, content)

Publishes voice messages to the session so other channels receive them.

await publish_voice_message(
session_id=session_id,
interaction_id=interaction_id,
role="user",
content="I need flowers for my mom"
)

Published as: SessionEvent with channel="voice"

Call flow:

1. User speaks → transcribed by Whisper
2. Transcription added to LLMContext
3. publish_voice_message() broadcasts to Redis
4. Web chat (if open) receives event via SSE
5. LLM generates response
6. Response spoken via Piper TTS
7. publish_voice_message() broadcasts assistant response
8. Web chat displays assistant response

Web Chat Channel

Located in /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/agent/main.py

On chat start:

  1. Resolve session via POST /api/sessions/resolve
  2. Attach interaction via POST /api/sessions/{id}/interactions
  3. Fetch history via GET /api/sessions/{id}/context
  4. Open SSE connection to GET /api/sessions/{id}/events?channel=chat
  5. Display conversation history in UI

On user message:

  1. User types message in web UI
  2. Frontend POSTs to /api/chat endpoint
  3. Backend calls session_manager.publish_message() immediately
  4. Message published to Redis (voice receives it if active)
  5. Message persisted to database
  6. LLM processes message
  7. Assistant response generated
  8. Response published via session_manager.publish_message()
  9. Response sent back to web client

On SSE event received:

// Frontend JavaScript
eventSource.onmessage = (event) => {
  const sessionEvent = JSON.parse(event.data);

  if (sessionEvent.event_type === 'MESSAGE') {
    const { role, content } = sessionEvent.data;
    displayMessage(role, content, sessionEvent.channel); // Show with channel badge
  }
};

Example display:

User (voice): I need flowers for my mom
Assistant (voice): I'd be happy to help!
User (chat): Show me photos of tulips
Assistant (chat): [displays image gallery]

Session Lifecycle

State Transitions

                 ┌──────────────┐
                 │   created    │
                 └──────┬───────┘
                        │
                        ▼
                 ┌──────────────┐
      ┌──────────│    active    │◄─────────┐
      │          └──────┬───────┘          │
      │                 │                  │
      │   explicit      │ no active        │ new
      │   close         │ interactions     │ interaction
      │                 ▼                  │
      │          ┌──────────────┐          │
      │          │     idle     │──────────┘
      │          │ (resumable)  │
      │          └──────┬───────┘
      │                 │
      │                 │ exceeded
      │                 │ resume_window
      │                 ▼
      │          ┌──────────────┐
      └─────────►│    closed    │
                 └──────────────┘

Multi-Channel Active State

A session is active if ANY channel has an active interaction.

Example:

Session ABC123:
- Interaction 1: voice (active)
- Interaction 2: chat (active)
Status: active

A session is idle if ALL interactions are closed but within resume window.

Example:

Session ABC123:
- Interaction 1: voice (ended 2 hours ago)
- Interaction 2: chat (ended 2 hours ago)
Resume window: 24 hours
Status: idle (resumable)
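
The active/idle rule above, together with the resume window, can be expressed as a small pure function. This is a sketch under assumptions: `derive_status` is a hypothetical helper, "ended" is an assumed name for a finished interaction's status, and the boundary (a session exactly at its resume window counts as closed) follows the manual cleanup query in the Maintenance section.

```python
from datetime import datetime, timedelta
from typing import Iterable


def derive_status(interaction_statuses: Iterable[str], last_activity: datetime,
                  resume_window: timedelta, now: datetime) -> str:
    """Compute a session's status from its interactions and last activity."""
    # Any active interaction on any channel keeps the session active.
    if any(status == "active" for status in interaction_statuses):
        return "active"
    # No active interactions: idle while still inside the resume window.
    if now - last_activity < resume_window:
        return "idle"
    return "closed"
```

Applied to the second example above (both interactions ended 2 hours ago, 24-hour window), the function returns "idle".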

Resume Behavior

Within Resume Window:

  • User starts new interaction on any channel
  • SessionManager.resolve_or_create_session() finds existing idle session
  • Session transitions to active
  • Conversation history restored
  • New interaction attached
  • Real-time events start flowing

Outside Resume Window:

  • Old session marked as closed
  • New session created
  • Fresh conversation starts
  • Previous history archived but not restored
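
The two branches above can be sketched against an in-memory store. The names here (`Session`, `resolve_or_create`, a dict keyed by user_id) are illustrative assumptions; the real `SessionManager.resolve_or_create_session()` works against PostgreSQL and its signature is not shown in this document.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta


@dataclass
class Session:
    user_id: str
    last_activity: datetime
    resume_window: timedelta = timedelta(hours=24)
    status: str = "active"
    session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    history: list = field(default_factory=list)


def resolve_or_create(sessions: dict[str, Session], user_id: str,
                      now: datetime) -> tuple[Session, bool]:
    """Return (session, resumed) for the user, creating a session if needed."""
    current = sessions.get(user_id)
    if current is not None and current.status != "closed":
        if now - current.last_activity < current.resume_window:
            # Inside the window: reactivate and keep the existing history.
            current.status = "active"
            current.last_activity = now
            return current, True
        # Outside the window: close the old session before starting fresh.
        current.status = "closed"
    fresh = Session(user_id=user_id, last_activity=now)
    sessions[user_id] = fresh
    return fresh, False
```

The boolean in the return value corresponds to the `resumed` field in the `/api/sessions/resolve` response.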

Event Types

Defined in SessionEventType enum:

| Event Type | Description | Data Fields |
|---|---|---|
| MESSAGE | Chat message from user or assistant | role, content, interaction_id, metadata |
| CONTEXT_UPDATE | Session context changed (preferences, etc.) | Updated context fields |
| CHANNEL_JOINED | New channel interaction started | interaction_id, channel_ref |
| CHANNEL_LEFT | Channel interaction ended | interaction_id |
| TYPING | User is typing indicator | is_typing (boolean) |
| TOOL_CALL | LLM tool/function call started | tool_name, arguments |
| TOOL_RESULT | Tool/function call result | tool_name, result |

Example events:

# MESSAGE event
SessionEvent(
event_type=SessionEventType.MESSAGE,
session_id=session_id,
channel="chat",
data={
"role": "user",
"content": "Hello",
"interaction_id": "...",
"metadata": {}
}
)

# TYPING event
SessionEvent(
event_type=SessionEventType.TYPING,
session_id=session_id,
channel="voice",
data={"is_typing": True}
)

# TOOL_CALL event
SessionEvent(
event_type=SessionEventType.TOOL_CALL,
session_id=session_id,
channel="chat",
data={
"tool_name": "search_florists",
"arguments": {"location": "Austin, TX"}
}
)
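
To make the table concrete, here is a sketch of how the enum and a per-type field check could be written. The enum member names come from the table above; the string values match the `event_type` strings in the SSE examples. The `REQUIRED_FIELDS` mapping and `missing_fields` helper are hypothetical, and treating `metadata` as optional follows the request-body description rather than the table.

```python
from enum import Enum
from typing import Any


class SessionEventType(str, Enum):
    MESSAGE = "MESSAGE"
    CONTEXT_UPDATE = "CONTEXT_UPDATE"
    CHANNEL_JOINED = "CHANNEL_JOINED"
    CHANNEL_LEFT = "CHANNEL_LEFT"
    TYPING = "TYPING"
    TOOL_CALL = "TOOL_CALL"
    TOOL_RESULT = "TOOL_RESULT"


# Hypothetical validation table derived from the "Data Fields" column.
REQUIRED_FIELDS: dict[SessionEventType, set[str]] = {
    SessionEventType.MESSAGE: {"role", "content", "interaction_id"},
    SessionEventType.CONTEXT_UPDATE: set(),  # any updated context fields
    SessionEventType.CHANNEL_JOINED: {"interaction_id", "channel_ref"},
    SessionEventType.CHANNEL_LEFT: {"interaction_id"},
    SessionEventType.TYPING: {"is_typing"},
    SessionEventType.TOOL_CALL: {"tool_name", "arguments"},
    SessionEventType.TOOL_RESULT: {"tool_name", "result"},
}


def missing_fields(event_type: str, data: dict[str, Any]) -> set[str]:
    """Return the required data fields that are absent for this event type."""
    return REQUIRED_FIELDS[SessionEventType(event_type)] - set(data)
```

An empty result means the payload carries every field the table lists for that event type.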

Data Flow Diagrams

Real-Time Multi-Channel Flow

┌─────────────────┐                    ┌─────────────────┐
│  Voice Channel  │                    │  Chat Channel   │
│   (on phone)    │                    │  (web browser)  │
└────────┬────────┘                    └────────┬────────┘
         │                                      │
         │ 1. User speaks                       │
         │    "I need flowers"                  │
         │                                      │
         ▼                                      │
publish_voice_message()                         │
         │                                      │
         ▼                                      │
session_manager.publish_message()               │
         │                                      │
         ├──► Redis PUBLISH ───────────────────►│
         │    session:ABC:events                │
         │                                      ▼
         │                             SSE receives event
         │                                      │
         │                   Display: "User (voice): I need flowers"
         │                                      │
         │                                      │ 2. User types in chat
         │                                      │    "Show me photos"
         │                                      │
         │                                      ▼
         │                       POST /api/sessions/ABC/messages
         │                                      │
         │                                      ▼
         │                      session_manager.publish_message()
         │                                      │
         │◄─────────────────── Redis PUBLISH ───┤
         │                session:ABC:events    │
         │                                      │
         ▼                                      │
SSE receives event                              │
         │                                      │
Inject: "[Real-time update from chat]           │
         user: Show me photos"                  │
         │                                      │
         ▼                                      │
LLM processes with context                      │
from BOTH channels                              │
         │                                      │
         ▼                                      │
Response generated                              │
"I see you're looking at chat..."               │
         │                                      │
         ▼                                      │
publish_voice_message()                         │
         │                                      │
         ├──► Redis PUBLISH ───────────────────►│
         │    session:ABC:events                │
         │                                      ▼
         │                   Display: "Assistant (voice): I see..."
         │                                      │

SSE Subscription Flow

Client (voice channel)
        │
        ▼
GET /api/sessions/{session_id}/events?channel=voice
        │
        ▼
SessionManager.subscribe_to_session(session_id, "voice")
        │
        ▼
Redis SUBSCRIBE session:{session_id}:events
        │
        └─► Async generator yields events
                │
                ├─► Filter out events where channel="voice"
                │
                └─► Yield events from other channels
                        │
                        ▼
              Client receives SSE stream

Database + Redis Flow

publish_message() called
        │
        ├──► 1. _append_to_history()
        │         │
        │         ▼
        │    UPDATE sessions
        │    SET context = jsonb_set(...)
        │    WHERE session_id = ...
        │         │
        │         └─► Database persistence (immediate)
        │
        └──► 2. publish_event()
                  │
                  ▼
             Redis PUBLISH
             session:{id}:events
                  │
                  └─► All subscribed channels
                      receive event (real-time)

Enabling the System

Step 1: Database Migration

The database tables are already created if you've run recent migrations. Verify:

ssh ravenhelm@100.115.101.81 "docker exec -i postgres psql -U ravenhelm -d ravenmaskos -c '\dt session*' -c '\dt channel_consent'"

You should see:

  • sessions
  • session_interactions
  • session_events
  • channel_consent

Step 2: Ensure Redis is Running

The real-time event system requires Redis.

ssh ravenhelm@100.115.101.81 "docker ps | grep redis"

If Redis is not running, start it:

ssh ravenhelm@100.115.101.81 "cd ~/ravenhelm/compose && docker compose up -d redis"

Step 3: Enable Feature Flag

Edit the Norns environment file:

ssh ravenhelm@100.115.101.81 "nano ~/ravenhelm/.env.hlidskjalf"

Add or update:

NORNS_SESSION_SYSTEM=true
NORNS_SESSION_RESUME_WINDOW=24 hours
NORNS_SESSION_MAX_HISTORY=50

# Redis configuration (required)
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_PASSWORD=your_redis_password

Step 4: Restart Services

Restart Norns and Telephony services:

ssh ravenhelm@100.115.101.81 "cd ~/ravenhelm/docs/AI-ML-Platform/norns-agent && docker compose restart"
ssh ravenhelm@100.115.101.81 "cd ~/ravenhelm/services/telephony && docker compose restart"

Step 5: Verify

Check that the feature is enabled:

ssh ravenhelm@100.115.101.81 "docker logs norns-agent 2>&1 | grep -i session"
ssh ravenhelm@100.115.101.81 "docker logs telephony 2>&1 | grep -i session"

You should see log entries indicating the session system and Redis pub/sub are active.

Step 6: Test Real-Time Events

Test concurrent multi-channel:

  1. Call Norns: +1 (737) 214-3330
  2. While on call, open web chat
  3. Send a message in web chat
  4. Observe voice assistant acknowledges the chat message
  5. Say something on voice
  6. Observe web chat displays the voice message in real-time

Test via API:

# Terminal 1: Subscribe to SSE
curl -N "https://norns.ravenhelm.dev/api/sessions/{session_id}/events?channel=voice"

# Terminal 2: Publish a message
curl -X POST "https://norns.ravenhelm.dev/api/sessions/{session_id}/messages" \
-H "Content-Type: application/json" \
-d '{
"channel": "chat",
"role": "user",
"content": "Test message",
"interaction_id": "{interaction_id}",
"metadata": {}
}'

# Terminal 1 should receive the event immediately

Operational Guide

Monitoring

Active Sessions:

SELECT COUNT(*) 
FROM sessions
WHERE status = 'active';

Concurrent Multi-Channel Sessions:

SELECT 
s.session_id,
s.user_id,
COUNT(si.interaction_id) as active_channels,
array_agg(si.channel) as channels
FROM sessions s
JOIN session_interactions si ON s.session_id = si.session_id
WHERE s.status = 'active' AND si.status = 'active'
GROUP BY s.session_id, s.user_id
HAVING COUNT(si.interaction_id) > 1;

Session Duration:

SELECT 
session_id,
user_id,
created_at,
last_activity,
last_activity - created_at AS duration
FROM sessions
WHERE status IN ('active', 'idle')
ORDER BY last_activity DESC
LIMIT 10;

Interactions by Channel:

SELECT 
channel,
COUNT(*) as total,
COUNT(*) FILTER (WHERE status = 'active') as active
FROM session_interactions
GROUP BY channel;

Redis Pub/Sub Monitoring:

ssh ravenhelm@100.115.101.81 "docker exec -it redis redis-cli"
> PUBSUB CHANNELS session:*
> PUBSUB NUMSUB session:{session_id}:events

Maintenance

Close Expired Sessions (Manual):

UPDATE sessions
SET status = 'closed', closed_at = NOW()
WHERE status IN ('active', 'idle', 'resumable')
AND last_activity <= NOW() - resume_window;

This should be automated via a background task in production.
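
One possible shape for that background task is a simple asyncio loop that runs a cleanup job at a fixed interval until asked to stop. This is only a sketch: `run_periodically` is a hypothetical helper, and the `job` passed in would be a coroutine function that executes the UPDATE statement above (not shown here, since the database client used by Norns is not specified).

```python
import asyncio
from typing import Awaitable, Callable


async def run_periodically(job: Callable[[], Awaitable[None]],
                           interval_seconds: float,
                           stop: asyncio.Event) -> None:
    """Run job repeatedly, sleeping interval_seconds between runs, until stop is set."""
    while not stop.is_set():
        await job()
        try:
            # Sleep for the interval, but wake up early if stop is set.
            await asyncio.wait_for(stop.wait(), timeout=interval_seconds)
        except asyncio.TimeoutError:
            pass  # interval elapsed without a stop request; run again
```

Waiting on the stop event instead of calling `asyncio.sleep()` lets the service shut down promptly rather than waiting out a full interval.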

Cleanup Old Events:

DELETE FROM session_events
WHERE timestamp < NOW() - INTERVAL '90 days';

Monitor Redis Memory:

ssh ravenhelm@100.115.101.81 "docker exec redis redis-cli INFO memory"

Troubleshooting

Session Not Resuming

  1. Check if resume window has passed:

    SELECT 
    session_id,
    status,
    last_activity,
    resume_window,
    NOW() - last_activity AS time_since_activity
    FROM sessions
    WHERE user_id = '<user_id>';
  2. Check for multiple sessions (shouldn't happen):

    SELECT session_id, status, created_at, last_activity
    FROM sessions
    WHERE user_id = '<user_id>'
    ORDER BY last_activity DESC;

Real-Time Events Not Received

  1. Check Redis connectivity:

    ssh ravenhelm@100.115.101.81 "docker exec norns-agent ping -c 3 redis"
  2. Check Redis pub/sub:

    ssh ravenhelm@100.115.101.81 "docker exec -it redis redis-cli"
    > PUBSUB CHANNELS session:*
    > PUBSUB NUMSUB session:{session_id}:events
  3. Check SSE connection:

    curl -N "https://norns.ravenhelm.dev/api/sessions/{session_id}/events?channel=test" \
    -H "Authorization: Bearer <token>"
  4. Check Norns logs:

    ssh ravenhelm@100.115.101.81 "docker logs --tail 100 -f norns-agent | grep -i redis"

Messages Not Persisting

  1. Check session context:

    SELECT context 
    FROM sessions
    WHERE session_id = '<session_id>';
  2. Check interaction close events:

    SELECT * 
    FROM session_events
    WHERE session_id = '<session_id>'
    AND event_type = 'interaction_ended'
    ORDER BY timestamp DESC;
  3. Check for database errors in logs:

    ssh ravenhelm@100.115.101.81 "docker logs norns-agent 2>&1 | grep -i 'database\|error'"

SSE Connection Drops

SSE connections can drop due to network issues or timeouts.

Client-side reconnection logic:

let eventSource;
let reconnectDelay = 1000;
const maxReconnectDelay = 30000;

function connectSSE() {
  eventSource = new EventSource(`/api/sessions/${sessionId}/events?channel=chat`);

  eventSource.onopen = () => {
    console.log('SSE connected');
    reconnectDelay = 1000; // Reset delay
  };

  eventSource.onerror = (error) => {
    console.error('SSE error:', error);
    eventSource.close();

    // Exponential backoff reconnect
    setTimeout(connectSSE, reconnectDelay);
    reconnectDelay = Math.min(reconnectDelay * 2, maxReconnectDelay);
  };

  eventSource.onmessage = handleEvent;
}
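
For server-side or non-browser clients, the same backoff schedule can be written as a Python generator. This is a sketch mirroring the JavaScript above (start at 1000 ms, double each attempt, cap at 30000 ms); `reconnect_delays` is a hypothetical name.

```python
from typing import Iterator


def reconnect_delays(initial_ms: int = 1000, max_ms: int = 30000) -> Iterator[int]:
    """Yield reconnect delays in milliseconds: doubling, capped at max_ms."""
    delay = initial_ms
    while True:
        yield delay
        delay = min(delay * 2, max_ms)
```

A client would take the next value after each failed attempt and create a fresh generator once a connection succeeds, which corresponds to the delay reset in `onopen`.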

Performance Considerations

Indexes: The schema includes optimized indexes for:

  • User lookup by active sessions
  • Session history queries
  • Event log searches
  • Channel-based queries

Conversation History Size: History is limited to NORNS_SESSION_MAX_HISTORY messages (default: 50) to prevent unbounded growth. Older messages are trimmed automatically during sync.

Redis Pub/Sub Scalability:

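  • Redis pub/sub is lightweight; the cost of a PUBLISH grows with the number of subscribers on the channel, so actual throughput depends on hardware, payload size, and fan-out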
  • Each session has its own channel, avoiding global bottlenecks
  • No message persistence in Redis (only in PostgreSQL)

Database Load: Each interaction creates:

  • 1 row in session_interactions
  • 2-4 rows in session_events (start, end, sync events)
  • 1 update to sessions.context per message

For high-volume deployments, consider:

  • Partitioning session_events by timestamp
  • Archiving closed sessions older than 90 days
  • Caching active session contexts in Redis (in addition to pub/sub)
  • Read replicas for analytics queries

SSE Connection Limits:

  • Each SSE connection holds an open HTTP connection
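  • A well-tuned server can typically hold thousands of concurrent SSE connections; the practical limit depends on file-descriptor limits, memory, and proxy configuration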
  • Use connection pooling and load balancing for scale


Last Updated: 2026-01-04
Architecture Version: 2.0 (Event-Driven Real-Time)
Related Issues: RAV-108

WhatsApp Channel

Located in /Users/ravenhelm/ravenhelm/docs/AI-ML-Platform/norns-agent/agent/whatsapp/

The WhatsApp channel integrates with Twilio's WhatsApp API, supporting the same multi-channel session paradigm as voice and chat.

Key Differences from Other Channels:

| Aspect | WhatsApp | Voice/Chat |
|---|---|---|
| Session Windows | 24-hour rule (WhatsApp requirement) | Configurable resume_window |
| Proactive Messages | Requires approved templates | Free-form allowed |
| Rich Messaging | Buttons (max 3), Lists (max 10) | Varies by channel |
| User Resolution | By phone number (E.164) | Phone or user_id |

Architecture:

WhatsApp Message
        │
        ▼
POST /whatsapp/webhook
        │
        ├──► Verify Twilio signature
        │
        ├──► resolve_user_by_phone()
        │
        ├──► WhatsAppSessionTracker.record_inbound()
        │         │
        │         └──► Extends 24-hour window
        │
        ├──► SessionManager.resolve_or_create_session()
        │
        ├──► process_message() ──► LangGraph Agent
        │
        ├──► SessionManager.publish_message()
        │         │
        │         ├──► Redis pub/sub ──► Other channels
        │         │
        │         └──► Database persist
        │
        └──► WhatsAppMessageBuilder.build_response()
                  │
                  └──► Twilio API ──► User's WhatsApp

Session Window Tracking:

WhatsApp has a 24-hour customer service window rule managed by WhatsAppSessionTracker:

# Check whether the user is inside the 24-hour session window
in_window = await session_tracker.is_in_session_window(to_number)

if in_window:
    # Free-form messages are allowed
    await client.send_text(to_number, "Any message here")
else:
    # Outside the window, only approved templates may be sent
    await template_manager.send_template(to_number, "task_reminder", variables)
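
The window check itself reduces to a timestamp comparison. The sketch below assumes the window is measured from the user's last inbound message, which is what "record_inbound() extends the 24-hour window" suggests; the function names are hypothetical and do not reflect the actual WhatsAppSessionTracker internals.

```python
from datetime import datetime, timedelta
from typing import Optional

SESSION_WINDOW = timedelta(hours=24)


def window_expires_at(last_inbound_at: datetime) -> datetime:
    """Each inbound message moves the expiry to 24 hours after it arrived."""
    return last_inbound_at + SESSION_WINDOW


def is_in_session_window(last_inbound_at: Optional[datetime], now: datetime) -> bool:
    """True while free-form replies are allowed; False if the user never wrote."""
    if last_inbound_at is None:
        return False
    return now < window_expires_at(last_inbound_at)
```

The two values map naturally onto the `last_inbound_at` and `window_expires_at` columns of the whatsapp_sessions table below.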

Database Table: whatsapp_sessions

CREATE TABLE whatsapp_sessions (
session_id UUID PRIMARY KEY,
user_id UUID NOT NULL REFERENCES users(user_id),
phone_number VARCHAR(20) NOT NULL UNIQUE,
window_expires_at TIMESTAMPTZ,
last_inbound_at TIMESTAMPTZ,
created_at TIMESTAMPTZ
);

Publishing Messages:

When WhatsApp receives a message or sends a response, it publishes to the multi-channel session:

await session_manager.publish_message(
session_id=session_id,
channel="whatsapp",
role="user",
content=message_body,
interaction_id=interaction_id,
metadata={"message_sid": message_sid, "phone": from_number}
)

Cross-Channel Example:

User (voice): "Add milk to my shopping list"
→ Voice publishes to session → WhatsApp receives via SSE

User (whatsapp): "Also add bread"
→ WhatsApp publishes to session → Voice receives via SSE

Both channels now have full conversation context.

Proactive Messaging:

For scheduled reminders outside the 24-hour window, use templates via n8n or API:

curl -X POST "https://norns-pm.ravenhelm.dev/api/whatsapp/send" \
-H "X-API-Key: $NORNS_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"user_id": "...",
"template_name": "task_reminder",
"variables": {"task_name": "Buy milk", "due_time": "in 1 hour"}
}'

Available Templates:

| Template | Variables | Use Case |
|---|---|---|
| task_reminder | task_name, due_time | Task due reminders |
| daily_briefing | date, task_count, top_tasks | Morning briefings |
| calendar_reminder | event_name, time, location | Calendar alerts |
| task_complete_confirm | task_name | Completion confirmation |
| weekly_review | completed_count, highlights | Weekly summary |

See [[AI-ML-Platform/Norns-WhatsApp-Channel|WhatsApp Channel Documentation]] for complete details.