02 — Complete Message Flow
Every single step from "user sends message" to "user receives response".
The Full Flow (Step By Step)
═══════════════════════════════════════════════════════════════
STEP 1: INCOMING MESSAGE
═══════════════════════════════════════════════════════════════
User sends message on Discord / Telegram / Slack / etc.
Connector receives the platform event:
- Discord: client.on('messageCreate', ...)
- Telegram: bot.on('message', ...)
- Web: WebSocket message received on port 5070
Connector normalizes to IncomingMessage:
{
id: "msg-abc123" ← unique ID
platform: "discord" ← where it came from
channelId: "1234567890" ← which channel
userId: "987654321" ← who sent it
userName: "John" ← their display name
content: "What's the price?" ← the actual message
timestamp: Date ← when
attachments: [...] ← images, files (optional)
metadata: { isDM: false, ... } ← platform-specific extras
}
Special handling before emit:
- Telegram voice: downloads OGG → Whisper transcription → content = "[Voice]: ..."
- Telegram photo: downloads → base64 → stored in attachment.data
- WhatsApp audio: downloads via Graph API → transcribes
- Discord images: CDN URLs stored as-is (downloaded later, see Step 7)
bus.emit('message', msg)
→ Core picks it up
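The normalization in Step 1 can be sketched as a plain mapping function. The IncomingMessage fields follow the listing above; DiscordLikeEvent, the Attachment fields, and the "msg-" id prefix are assumptions made for illustration, not the connector's real types.

```typescript
// Field names follow the IncomingMessage listing in Step 1.
interface Attachment {
  type: string;   // e.g. "image" (assumed field)
  url?: string;   // CDN URL when the platform only gives a link
  data?: string;  // base64 payload when the connector pre-downloads
}

interface IncomingMessage {
  id: string;
  platform: string;
  channelId: string;
  userId: string;
  userName: string;
  content: string;
  timestamp: Date;
  attachments?: Attachment[];
  metadata: Record<string, unknown>;
}

// Hypothetical, simplified stand-in for a platform event.
interface DiscordLikeEvent {
  id: string;
  channelId: string;
  author: { id: string; username: string };
  content: string;
  createdTimestamp: number; // milliseconds since epoch
  imageUrls: string[];
  isDM: boolean;
}

function normalizeDiscord(ev: DiscordLikeEvent): IncomingMessage {
  return {
    id: `msg-${ev.id}`,
    platform: "discord",
    channelId: ev.channelId,
    userId: ev.author.id,
    userName: ev.author.username,
    content: ev.content,
    timestamp: new Date(ev.createdTimestamp),
    // Discord images stay as URLs here; nothing is downloaded yet.
    attachments: ev.imageUrls.map((url) => ({ type: "image", url })),
    metadata: { isDM: ev.isDM },
  };
}
```

Every connector would end with the same step: hand the normalized object to bus.emit('message', msg).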
═══════════════════════════════════════════════════════════════
STEP 2: ROUTING
═══════════════════════════════════════════════════════════════
arvis.ts handles the 'message' event:
this.bus.on('message', async (msg) => { await this.handleMessage(msg) })
handleMessage() calls: Router.route(msg)
Router checks 6 conditions in priority order:
(See 03-routing.md for full details)
Result: Agent object OR null (drop message)
If null → silent drop, no response
If agent found → permission check: canUserMessage(userId, agent)
If no permission → silent drop
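A minimal sketch of the two silent-drop paths in Step 2. The routing rules themselves live in 03-routing.md, so the router is passed in as a function here. The allow-list permission model (allowedUserIds) is an assumption; the source only names canUserMessage(userId, agent).

```typescript
interface IncomingMessage {
  userId: string;
  channelId: string;
  content: string;
}

interface Agent {
  id: string;
  slug: string;
  // Assumed permission model: null means "anyone may message this agent".
  allowedUserIds: string[] | null;
}

type Route = (msg: IncomingMessage) => Agent | null;

function canUserMessage(userId: string, agent: Agent): boolean {
  return agent.allowedUserIds === null || agent.allowedUserIds.includes(userId);
}

// Returns the agent that should handle the message, or null for a silent drop.
function resolveAgent(msg: IncomingMessage, route: Route): Agent | null {
  const agent = route(msg);
  if (agent === null) return null;                        // no route matched
  if (!canUserMessage(msg.userId, agent)) return null;    // not permitted
  return agent;
}
```

Both failure paths return the same null, which matches the "no response" behavior described above: the caller cannot tell an unrouted message from a forbidden one.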
═══════════════════════════════════════════════════════════════
STEP 3: TYPING INDICATOR
═══════════════════════════════════════════════════════════════
IMMEDIATELY after routing (before any DB work):
bus.emit('typing', { channelId, platform })
Connector sees 'typing' event:
- Discord: channel.sendTyping() ← "Bot is typing..."
- Telegram: bot.sendChatAction(chatId, 'typing')
- Web: WebSocket broadcasts { type: 'typing' } to dashboard
This happens BEFORE the LLM call, so the user sees feedback right away.
═══════════════════════════════════════════════════════════════
STEP 4: CONVERSATION MANAGEMENT
═══════════════════════════════════════════════════════════════
ConversationManager.getOrCreate(
agentId, platform, channelId, userId, userName
)
SQL: SELECT * FROM conversations
WHERE agent_id=? AND platform=? AND channel_id=? AND status='active'
ORDER BY last_message_at DESC LIMIT 1
If found → reuse existing conversation
If not found → INSERT new conversation row, return it
Then: store the incoming user message:
ConversationManager.addMessage(conversation.id, 'user', msg.content)
This inserts into messages table + updates:
conversations.total_tokens_estimate += tokens
conversations.message_count += 1
conversations.last_message_at = now
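The lookup-then-insert behavior of Step 4 can be sketched with an in-memory store in place of the SQL tables. This mirrors the SELECT above (agent, platform, channel, active status, most recent first). The class name, the "archived" status, and the 4-characters-per-token estimate are assumptions.

```typescript
interface Conversation {
  id: number;
  agentId: string;
  platform: string;
  channelId: string;
  status: "active" | "archived";
  totalTokensEstimate: number;
  messageCount: number;
  lastMessageAt: number; // ms since epoch
}

// Assumed heuristic: roughly 4 characters per token.
function estimateTokens(content: string): number {
  return Math.ceil(content.length / 4);
}

class InMemoryConversations {
  private rows: Conversation[] = [];
  private nextId = 1;

  getOrCreate(agentId: string, platform: string, channelId: string, now: number): Conversation {
    const matches = this.rows
      .filter((c) => c.agentId === agentId && c.platform === platform &&
                     c.channelId === channelId && c.status === "active")
      .sort((a, b) => b.lastMessageAt - a.lastMessageAt); // newest first
    const found = matches[0];
    if (found !== undefined) return found;                 // reuse
    const created: Conversation = {
      id: this.nextId++, agentId, platform, channelId,
      status: "active", totalTokensEstimate: 0, messageCount: 0, lastMessageAt: now,
    };
    this.rows.push(created);                               // INSERT
    return created;
  }

  // Mirrors the three counter updates listed for addMessage.
  addMessage(conv: Conversation, content: string, now: number): void {
    conv.totalTokensEstimate += estimateTokens(content);
    conv.messageCount += 1;
    conv.lastMessageAt = now;
  }
}
```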
═══════════════════════════════════════════════════════════════
STEP 5: COMPACTION CHECK (may be skipped)
═══════════════════════════════════════════════════════════════
shouldCompact(conversationId, threshold)?
threshold = agent.model's context window × 0.75
e.g. claude-sonnet-4-6 = 200k tokens → threshold = 150k tokens
If total_tokens_estimate > threshold → COMPACT
Compaction is a two-phase process:
┌─────────────────────────────────────────────────────────┐
│ PHASE 1: Pre-compaction memory flush │
│ Take the old messages (about to be deleted) │
│ Send to LLM: "Extract key facts from this..." │
│ LLM outputs [MEMORY:*] tags │
│ MemoryManager saves them to memory_facts table │
│ → Facts survive compaction even though messages die │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ PHASE 2: Summarize + Delete │
│ Send old messages to LLM: "Summarize this convo..." │
│ LLM returns summary text │
│ DELETE old messages from messages table │
│ INSERT into compactions table: │
│ { summary, messages_before, messages_after, ... } │
│ UPDATE conversations.total_tokens_estimate = new total │
└─────────────────────────────────────────────────────────┘
After compaction: token count reset to recent messages only.
The compaction summary is injected in the next context build.
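The threshold arithmetic in Step 5 is small enough to show directly. The 0.75 ratio and the 200k example come from the text above. planCompaction and its keepRecent parameter are hypothetical, since the source does not say how many recent messages survive.

```typescript
const COMPACTION_RATIO = 0.75;

function compactionThreshold(contextWindowTokens: number): number {
  return Math.floor(contextWindowTokens * COMPACTION_RATIO);
}

function shouldCompact(totalTokensEstimate: number, contextWindowTokens: number): boolean {
  return totalTokensEstimate > compactionThreshold(contextWindowTokens);
}

interface StoredMessage {
  id: number;
  tokenEstimate: number;
}

// Splits history into the old part (flushed to memory, summarized, deleted)
// and the recent part that stays. keepRecent is an assumed tuning knob.
function planCompaction(messages: StoredMessage[], keepRecent: number) {
  const cut = Math.max(0, messages.length - keepRecent);
  const toSummarize = messages.slice(0, cut);
  const toKeep = messages.slice(cut);
  // New total_tokens_estimate: recent messages only.
  const newTotal = toKeep.reduce((sum, m) => sum + m.tokenEstimate, 0);
  return { toSummarize, toKeep, newTotal };
}
```

For a 200k-token model the threshold is 150k, so a conversation estimated at exactly 150,000 tokens is not compacted yet; the comparison is strictly greater-than.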
═══════════════════════════════════════════════════════════════
STEP 6: CONTEXT BUILDING
═══════════════════════════════════════════════════════════════
ContextBuilder.build(agent, conversation, msg)
Assembles the full prompt (system prompt + history).
See 05-context-memory.md for the 6-layer breakdown.
Returns:
{
systemPrompt: "IDENTITY: You are the Conductor...",
messages: [ {role, content, tokenEstimate}, ... ],
summaryText: "Previous conversation summary: ..."
}
═══════════════════════════════════════════════════════════════
STEP 7: IMAGE HANDLING
═══════════════════════════════════════════════════════════════
Check msg.attachments for images:
For pre-fetched images (Telegram, WhatsApp):
att.data is already base64 → push to images[]
For URL-only images (Discord CDN):
fetch(att.url) → Buffer → base64 → push to images[]
images[] is passed in the job payload so the LLM can see them.
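A sketch of the two image paths in Step 7. The downloader is injected so the example does not depend on a particular HTTP client. The Attachment shape and the DownloadBase64 type are assumptions.

```typescript
interface Attachment {
  type: string;   // "image", "file", ... (assumed field)
  url?: string;   // set for URL-only images (Discord CDN)
  data?: string;  // set for pre-fetched images (Telegram, WhatsApp)
}

// Hypothetical helper type: fetches a URL and resolves to base64 text.
type DownloadBase64 = (url: string) => Promise<string>;

async function collectImages(
  attachments: Attachment[],
  download: DownloadBase64
): Promise<string[] | undefined> {
  const images: string[] = [];
  for (const att of attachments) {
    if (att.type !== "image") continue;
    if (att.data !== undefined) {
      images.push(att.data);                 // already base64
    } else if (att.url !== undefined) {
      images.push(await download(att.url));  // fetch, then encode
    }
  }
  // The job payload carries either a non-empty list or undefined.
  return images.length > 0 ? images : undefined;
}
```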
═══════════════════════════════════════════════════════════════
STEP 8: ENQUEUE JOB
═══════════════════════════════════════════════════════════════
calculatePriority(msg, agent) → 1-10 (5 = normal)
QueueManager.enqueue({
agentId: agent.id,
type: 'message',
priority: 5, ← value returned by calculatePriority()
payload: {
conversationId: conversation.id,
systemPrompt: context.systemPrompt,
prompt: "[user]: prev msg\n[assistant]: prev response\n[user]: current",
channelId: msg.channelId,
platform: msg.platform,
messageId: msg.id,
images: [...] or undefined
}
})
INSERT INTO queue (...) VALUES (...)
setImmediate(() => processNext()) ← runs on the next event-loop iteration, no 1s poll wait
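The prompt string in the payload is the history flattened into role-prefixed lines. A minimal sketch of that rendering and of the job object, assuming history holds only earlier turns and the current message is passed separately. buildMessageJob and the clamping of priority to 1-10 are illustrative.

```typescript
interface HistoryMessage {
  role: "user" | "assistant";
  content: string;
}

function renderPrompt(history: HistoryMessage[], current: string): string {
  const lines = history.map((m) => `[${m.role}]: ${m.content}`);
  lines.push(`[user]: ${current}`);
  return lines.join("\n");
}

interface MessageJob {
  agentId: string;
  type: "message";
  priority: number;
  payload: {
    conversationId: number;
    systemPrompt: string;
    prompt: string;
    channelId: string;
    platform: string;
    messageId: string;
    images?: string[];
  };
}

function buildMessageJob(
  agentId: string,
  priority: number,
  payload: MessageJob["payload"]
): MessageJob {
  // Keep priority inside the documented 1-10 range.
  const clamped = Math.min(10, Math.max(1, Math.round(priority)));
  return { agentId, type: "message", priority: clamped, payload };
}
```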
═══════════════════════════════════════════════════════════════
STEP 9: JOB PROCESSING
═══════════════════════════════════════════════════════════════
QueueManager.processNext():
SELECT highest priority pending job
UPDATE queue SET status='running', started_at=now, attempts=attempts+1
activeJobs++
call processJob(job)
processJob():
1. Look up agent by agentId
2. Inject relevant skills into system prompt
skillInjector.getRelevantSkills(prompt, agent)
→ keyword-scored, only injects score > 0 skills
3. Filter agent tools: agent.allowedTools ∩ BUILT_IN_TOOL_NAMES
4. Create per-conversation CWD:
data/sessions/{conversationId}/ (mkdir -p)
→ Isolates Claude CLI sessions per conversation
5. Call AgentRunner.execute(request)
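Sub-steps 2 and 3 of processJob() are both simple filters. The sketch below shows a set intersection for tools and a keyword count for skills. The tool names, the Skill shape, and the substring-based scoring are assumptions; the source only says skills are "keyword-scored" and that a score of 0 is not injected.

```typescript
// Hypothetical built-in tool names, for illustration only.
const BUILT_IN_TOOL_NAMES = new Set(["web_search", "http_fetch", "calculate"]);

// agent.allowedTools ∩ BUILT_IN_TOOL_NAMES
function filterTools(allowedTools: string[]): string[] {
  return allowedTools.filter((name) => BUILT_IN_TOOL_NAMES.has(name));
}

interface Skill {
  name: string;
  keywords: string[];
}

// Score = number of skill keywords that occur in the prompt (case-insensitive).
function scoreSkill(prompt: string, skill: Skill): number {
  const text = prompt.toLowerCase();
  return skill.keywords.filter((k) => text.includes(k.toLowerCase())).length;
}

// Keep only skills with score > 0, best match first.
function getRelevantSkills(prompt: string, skills: Skill[]): Skill[] {
  return skills
    .map((skill) => ({ skill, score: scoreSkill(prompt, skill) }))
    .filter((entry) => entry.score > 0)
    .sort((a, b) => b.score - a.score)
    .map((entry) => entry.skill);
}
```

Substring matching is the crudest possible scorer ("sol" also matches "solution"); a real injector would likely tokenize first.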
═══════════════════════════════════════════════════════════════
STEP 10: LLM EXECUTION
═══════════════════════════════════════════════════════════════
AgentRunner picks account + runner (see 04-llm-providers.md)
┌─ CLI Runner (for Max subscription accounts) ─────────────┐
│ const args = ['--print', '--model', '...', '--continue'] │
│ spawn('claude', args, { cwd: data/sessions/{convId}/ }) │
│ child.stdin.write(fullPrompt) ← prompt via stdin │
│ stdout = response text │
│ stderr = logged as warnings │
│ timeout: 180 seconds │
└───────────────────────────────────────────────────────────┘
┌─ Provider Runner (for API accounts) ──────────────────────┐
│ Build messages array: [system, ...history, user] │
│ POST to provider's API endpoint │
│ Handle tool_use → execute tool → tool_result loop │
│ Up to 5 tool turns before forcing final response │
│ │
│ Anthropic: /messages endpoint, tool_use content blocks │
│ OpenAI: /chat/completions, function_call format │
│ Google: /generateContent, functionCall format │
│ Ollama: /api/chat, OpenAI-compatible format │
└───────────────────────────────────────────────────────────┘
Returns RunResult:
{
content: "The price of SOL is $142.30...",
model: "claude-sonnet-4-6",
provider: "anthropic",
inputTokens: 1240,
outputTokens: 89,
costUsd: 0.000045,
durationMs: 3420
}
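The Provider Runner's tool loop (tool_use → execute tool → tool_result, capped at 5 turns) can be sketched independently of any provider. The model and tool executor are injected functions, and the loop is synchronous to keep the example short; a real runner would await HTTP calls. All type and function names here are hypothetical.

```typescript
type ModelReply =
  | { kind: "text"; content: string }
  | { kind: "tool_use"; tool: string; input: string };

interface Turn {
  role: "user" | "assistant" | "tool";
  content: string;
}

const MAX_TOOL_TURNS = 5;

function runToolLoop(
  callModel: (history: Turn[], allowTools: boolean) => ModelReply,
  runTool: (tool: string, input: string) => string,
  userPrompt: string
): string {
  const history: Turn[] = [{ role: "user", content: userPrompt }];
  for (let turn = 0; turn < MAX_TOOL_TURNS; turn++) {
    const reply = callModel(history, true);
    if (reply.kind === "text") return reply.content;   // final answer
    // Record the tool request and its result, then ask again.
    history.push({ role: "assistant", content: `tool_use:${reply.tool}` });
    history.push({ role: "tool", content: runTool(reply.tool, reply.input) });
  }
  // Tool budget spent: one last call with tools disabled forces a text reply.
  const final = callModel(history, false);
  return final.kind === "text" ? final.content : "";
}
```

Disabling tools on the last call is one way to "force" a final response; the source does not say which mechanism is used.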
═══════════════════════════════════════════════════════════════
STEP 11: RESPONSE PROCESSING
═══════════════════════════════════════════════════════════════
Raw LLM response may contain special tags.
These are processed in order, then STRIPPED before sending to user.
┌─ Memory Tags ─────────────────────────────────────────────┐
│ [MEMORY:sticky] Never forget this fact [/MEMORY] │
│ [MEMORY:user_preference] User likes X [/MEMORY] │
│ [STATE:key] value [/STATE] │
│ │
│ MemoryManager.parseAndSave(agentId, content, convId) │
│ → Extracts all [MEMORY:*] → INSERT into memory_facts │
│ → Extracts all [STATE:*] → UPSERT into memory_state │
│ → Fuzzy dedup: skip if similar fact already exists │
└───────────────────────────────────────────────────────────┘
┌─ Conductor Tags (only if agent.role === 'conductor') ─────┐
│ ConductorParser.parse(content) → ConductorAction[] │
│ ConductorParser.execute(actions, registry, ...) │
│ │
│ [CREATE_AGENT] → INSERT into agents table │
│ [UPDATE_AGENT:slug] → UPDATE agents WHERE slug=? │
│ [CREATE_CRON] → INSERT into cron_jobs │
│ [CREATE_HEARTBEAT] → INSERT into heartbeat_configs │
│ [CREATE_CLIENT] → INSERT into clients │
└───────────────────────────────────────────────────────────┘
┌─ Delegation Tags ──────────────────────────────────────────┐
│ [DELEGATE:sol-price-monitor] │
│ Check the current SOL price │
│ [/DELEGATE] │
│ │
│ parseDelegations(content) → [{agentSlug, task}] │
│ For each: find target agent by slug │
│ queue.enqueue({ agentId: target.id, priority: 4, ... }) │
│ → Target agent picks up job asynchronously │
│ → Posts result independently to same channel │
└───────────────────────────────────────────────────────────┘
Clean response = strip all tags:
memoryManager.stripTags(
stripDelegations(
agent.role === 'conductor'
? conductorParser.stripActions(content)
: content
)
)
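A sketch of tag extraction and stripping for the paired tags shown in Step 11. It uses one regular expression for [MEMORY:*], [STATE:*] and [DELEGATE:*], whereas the flow above uses separate parsers applied in sequence; conductor tags are not covered because their syntax is not shown. parseTags and the Tag shape are hypothetical.

```typescript
interface Tag {
  kind: "MEMORY" | "STATE" | "DELEGATE";
  key: string;   // e.g. "sticky", a state key, or an agent slug
  body: string;
}

// [KIND:key] body [/KIND], with the closing tag matching the opening kind.
const TAG_RE = /\[(MEMORY|STATE|DELEGATE):([^\]]+)\]([\s\S]*?)\[\/\1\]/g;

function parseTags(content: string): { tags: Tag[]; clean: string } {
  const tags: Tag[] = [];
  const stripped = content.replace(
    TAG_RE,
    (_match: string, kind: string, key: string, body: string) => {
      tags.push({ kind: kind as Tag["kind"], key: key.trim(), body: body.trim() });
      return ""; // remove the tag from the user-visible text
    }
  );
  // Collapse the blank lines that removed tags leave behind.
  const clean = stripped.replace(/\n{3,}/g, "\n\n").trim();
  return { tags, clean };
}
```

The extracted tags would then be dispatched by kind: MEMORY and STATE to the memory tables, DELEGATE to queue.enqueue for the target agent.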
═══════════════════════════════════════════════════════════════
STEP 12: STORE ASSISTANT MESSAGE
═══════════════════════════════════════════════════════════════
ConversationManager.addMessage(
payload.conversationId,
'assistant',
cleanResponse
)
INSERT into messages table
UPDATE conversations (tokens, message_count, last_message_at)
═══════════════════════════════════════════════════════════════
STEP 13: SEND RESPONSE
═══════════════════════════════════════════════════════════════
bus.emit('send', {
channelId: payload.channelId,
platform: payload.platform,
content: cleanResponse
})
Connector receives 'send' event:
- Discord: channel.send(content)
- Telegram: bot.sendMessage(chatId, content)
- Web WS: socket.send(JSON.stringify({ type:'message', content }))
User sees the response. Only queue bookkeeping (Step 14) remains.
═══════════════════════════════════════════════════════════════
STEP 14: QUEUE CLEANUP
═══════════════════════════════════════════════════════════════
On success:
UPDATE queue SET status='completed', result=?, completed_at=now
On error (< max_attempts):
backoff = 2^attempts minutes
retryAfter = now + backoff
UPDATE queue SET status='pending', error={retryAfter, message}
→ Job becomes eligible again after backoff period
On error (>= max_attempts = 3):
UPDATE queue SET status='failed', error=message, completed_at=now
→ Shows in /queue page as failed, can retry manually
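The retry rule in Step 14 reduces to one function. This sketch assumes attempts already includes the run that just failed, since Step 9 increments it when the job starts. onJobError and the Outcome type are hypothetical names.

```typescript
const MAX_ATTEMPTS = 3;
const MINUTE_MS = 60 * 1000;

type Outcome =
  | { status: "pending"; retryAfter: number }  // eligible again at retryAfter
  | { status: "failed" };                      // no more automatic retries

function onJobError(attempts: number, now: number): Outcome {
  if (attempts >= MAX_ATTEMPTS) return { status: "failed" };
  const backoffMs = Math.pow(2, attempts) * MINUTE_MS; // 2^attempts minutes
  return { status: "pending", retryAfter: now + backoffMs };
}
```

Under that assumption the first failure waits 2 minutes, the second waits 4 minutes, and the third marks the job failed.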
Scheduled Task Flow (Different From Above)
Scheduler polls every 10 seconds
↓
For each enabled heartbeat/cron that is due:
↓
Flood guard check: pending/running job already exists for this task?
→ YES: skip (prevents duplicate jobs)
→ NO: continue
↓
QueueManager.enqueue({
agentId, type: 'heartbeat' or 'cron',
priority: 10,
payload: { prompt, channelId, platform, configId/cronId }
})
↓
(Same queue processing as above from Step 9 onwards)
↓
But: NO conversationId in payload
→ No conversation context loaded
→ Fresh message each time
→ Response sent to channel/platform in payload
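The flood guard is a check for an unfinished job belonging to the same task. A minimal in-memory sketch follows; ScheduledJob, taskId (standing in for configId/cronId), and enqueueIfIdle are hypothetical names.

```typescript
type JobStatus = "pending" | "running" | "completed" | "failed";

interface ScheduledJob {
  taskId: string;              // stands in for configId / cronId
  type: "heartbeat" | "cron";
  status: JobStatus;
  priority: number;
}

// True if the task already has a job that has not finished yet.
function hasActiveJob(queue: ScheduledJob[], taskId: string): boolean {
  return queue.some(
    (job) => job.taskId === taskId && (job.status === "pending" || job.status === "running")
  );
}

// Returns true if a job was enqueued, false if the flood guard skipped it.
function enqueueIfIdle(queue: ScheduledJob[], taskId: string, type: "heartbeat" | "cron"): boolean {
  if (hasActiveJob(queue, taskId)) return false;
  queue.push({ taskId, type, status: "pending", priority: 10 });
  return true;
}
```

With a 10-second poll, a task whose job takes longer than its schedule interval would otherwise pile up duplicate jobs; the guard caps it at one in flight.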
Error Recovery
Process crash while job is running:
On next startup → recoverStuckJobs()
→ SELECT * FROM queue WHERE status='running' AND started_at < (now - 5 minutes)
→ UPDATE SET status='failed', error='Job timed out — process likely crashed'
→ These appear in /queue as failed, can be retried
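The startup recovery pass can be sketched over an in-memory job list. The 5-minute cutoff comes from the text above. The Job shape is an assumption and the error text is passed in by the caller.

```typescript
type JobStatus = "pending" | "running" | "completed" | "failed";

interface Job {
  id: number;
  status: JobStatus;
  startedAt: number | null; // ms since epoch
  error: string | null;
}

const STUCK_AFTER_MS = 5 * 60 * 1000;

// Marks jobs that have been 'running' for more than 5 minutes as failed.
// Returns how many jobs were recovered.
function recoverStuckJobs(jobs: Job[], now: number, error: string): number {
  let recovered = 0;
  for (const job of jobs) {
    const stuck =
      job.status === "running" &&
      job.startedAt !== null &&
      job.startedAt < now - STUCK_AFTER_MS;
    if (stuck) {
      job.status = "failed";
      job.error = error;
      recovered++;
    }
  }
  return recovered;
}
```

A job that started less than 5 minutes before the restart is left as 'running' by this rule, so it is only caught by a later recovery pass.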
Rate limit during execution:
AgentRunner catches RateLimitError
→ accountManager.markRateLimited(accountId, retryAfter)
→ Recursive: execute(request, depth+1) with next account
→ User never sees "rate limited"; they just get a slightly slower response
All accounts exhausted:
throw RateLimitError('All accounts temporarily unavailable')
→ Job fails, retries with exponential backoff (2min, then 4min; with max_attempts = 3 the third failure is final)
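The account failover described above, as a synchronous sketch. The Account shape, the 60-second default retryAfterMs, and the depth guard are assumptions; the source names only RateLimitError, markRateLimited, and the recursive execute(request, depth+1).

```typescript
class RateLimitError extends Error {
  retryAfterMs: number;
  constructor(message: string, retryAfterMs: number = 60000) {
    super(message);
    // Keeps instanceof working when compiled to older JS targets.
    Object.setPrototypeOf(this, RateLimitError.prototype);
    this.name = "RateLimitError";
    this.retryAfterMs = retryAfterMs;
  }
}

interface Account {
  id: string;
  rateLimitedUntil: number; // ms since epoch; 0 = available
}

function execute(
  accounts: Account[],
  call: (account: Account) => string,
  now: number,
  depth: number = 0
): string {
  const account = accounts.find((a) => a.rateLimitedUntil <= now);
  if (depth >= accounts.length || account === undefined) {
    throw new RateLimitError("All accounts temporarily unavailable");
  }
  try {
    return call(account);
  } catch (err) {
    if (!(err instanceof RateLimitError)) throw err;         // unrelated failure
    account.rateLimitedUntil = now + err.retryAfterMs;       // markRateLimited
    return execute(accounts, call, now, depth + 1);          // try next account
  }
}
```

The final RateLimitError is what reaches the queue's error handling and triggers the backoff from Step 14.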