Mini-App Framework Documentation¶
Status: Production Ready Version: 1.1 Last Updated: December 2025
Overview¶
The Mini-App Framework enables developers to quickly build collaborative mini-applications that integrate seamlessly into conversations. Mini-apps are automatically triggered from natural language and support multi-user collaboration.
Key Capabilities:
- Auto-trigger from natural language ("split this bill" → Bill Split app)
- Conversational message handling via handlers
- Web UI generation via config-driven system
- Event-sourced state management (full audit trail)
- Multi-user collaboration with conflict resolution
- Pattern-based development (~2 hours per new app)
Architecture Overview¶
The framework has two complementary layers:
1. Handler Routing Layer (Chat Interface)¶
Processes messages and generates responses + web UI configs.
- Handlers - Process messages, return responses
- SessionManager - Quick session state persistence
- UI Configs - JSON configurations for web frontend
Documentation: HANDLER_ROUTING.md
2. Event Sourcing Layer (Persistent State)¶
Provides audit trail and multiplayer state reconstruction.
- EventStore - Append-only event log
- StateCoordinator - State reconstruction from events
- Reducers - Pure functions: (state, event) → new_state
Documentation: API_REFERENCE.md
┌─────────────────────────────────────────────────────────────┐
│ User Message │
└──────────────────┬──────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ HANDLER LAYER │
│ • Parse message commands │
│ • Update session state │
│ • Generate chat response │
│ • Generate web UI config │
└──────────────────┬──────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ EVENT SOURCING LAYER (Optional) │
│ • Append events for audit trail │
│ • Reconstruct state from events │
│ • Handle multiplayer conflict resolution │
└─────────────────────────────────────────────────────────────┘
Quick Start: Building a Mini-App¶
For most mini-apps, start with the Handler Routing approach:
- Create a handler class extending BaseMiniAppHandler
- Implement handle() for message processing
- Implement get_ui_config() for web UI
- Register in MiniAppRegistry
See: HANDLER_ROUTING.md for complete guide.
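As a rough illustration of the four steps above, the sketch below uses stand-in definitions for `BaseMiniAppHandler` and the registry. The real base class and `MiniAppRegistry` are part of the framework, and their actual signatures are described in HANDLER_ROUTING.md, so every signature, the `CounterHandler` class, and the `REGISTRY` dict here are assumptions made for illustration only.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


# Stand-in for the framework's base class (assumed interface, not the real one).
class BaseMiniAppHandler(ABC):
    app_id: str = ""

    @abstractmethod
    def handle(self, message: str, session: Dict[str, Any]) -> str: ...

    @abstractmethod
    def get_ui_config(self, session: Dict[str, Any]) -> Dict[str, Any]: ...


class CounterHandler(BaseMiniAppHandler):
    """Hypothetical handler that collects the items users mention."""

    app_id = "counter"

    def handle(self, message: str, session: Dict[str, Any]) -> str:
        # Parse a trivial "add <name>" command and update session state.
        if message.lower().startswith("add "):
            session.setdefault("items", []).append(message[4:].strip())
            return f"Added. {len(session['items'])} item(s) so far."
        return "Say 'add <name>' to add an item."

    def get_ui_config(self, session: Dict[str, Any]) -> Dict[str, Any]:
        # JSON-serializable config that a web frontend could render.
        return {"type": "list", "title": "Counter", "items": session.get("items", [])}


# Stand-in registry: a plain dict keyed by app_id (hypothetical).
REGISTRY: Dict[str, BaseMiniAppHandler] = {}
handler = CounterHandler()
REGISTRY[handler.app_id] = handler

session: Dict[str, Any] = {}
reply = REGISTRY["counter"].handle("add milk", session)
ui_config = REGISTRY["counter"].get_ui_config(session)
```

The handler returns both a chat reply and a UI config built from the same session state, which mirrors the responsibilities listed for the handler layer.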
For apps requiring full audit trail or complex multiplayer, also add:
- Define event types in schema.py
- Implement reducer in reducer.py
- Register reducer in StateCoordinator
See below for event sourcing details.
Architecture¶
Core Components¶
┌─────────────────────────────────────────────────────────────┐
│ User Message │
│ "Let's split this $45 bill" │
└──────────────────┬──────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ MiniAppDetector │
│ • Pattern matching (keywords, phrases, regex) │
│ • Confidence scoring │
│ • User preference filtering │
└──────────────────┬──────────────────────────────────────────┘
│
▼ Trigger detected: "bill_split"
┌─────────────────────────────────────────────────────────────┐
│ Workflow System │
│ • Create room (multi-user session) │
│ • Initialize state (emit events) │
│ • Execute workflow nodes │
└──────────────────┬──────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ EventStore │
│ • Append-only event log │
│ • PostgreSQL with advisory locks │
│ • Sequence number generation │
└──────────────────┬──────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ StateCoordinator │
│ • Event replay: (events) → current state │
│ • Redis caching (5-min TTL) │
│ • Snapshot optimization (every 100 events) │
└──────────────────┬──────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Reducer (Pure Function) │
│ reducer(state, event) → new_state │
│ • No side effects │
│ • Deterministic │
│ • Last-write-wins conflict resolution │
└─────────────────────────────────────────────────────────────┘
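The flow in the diagram (append events, then replay them through a reducer) can be sketched in a few lines. This is a minimal in-memory model, not the framework's code: `InMemoryEventStore`, `Event`, `total_reducer`, and `replay` are hypothetical names, and the real EventStore is backed by PostgreSQL rather than a Python list.

```python
from dataclasses import dataclass
from functools import reduce
from typing import Any, Callable, Dict, List


@dataclass(frozen=True)
class Event:
    sequence: int
    event_type: str
    event_data: Dict[str, Any]


class InMemoryEventStore:
    """Hypothetical stand-in for EventStore: append-only, sequence-numbered."""

    def __init__(self) -> None:
        self._events: List[Event] = []

    def append(self, event_type: str, event_data: Dict[str, Any]) -> Event:
        # Sequence numbers start at 1 and grow by one per appended event.
        event = Event(len(self._events) + 1, event_type, event_data)
        self._events.append(event)
        return event

    def read(self, after_sequence: int = 0) -> List[Event]:
        return [e for e in self._events if e.sequence > after_sequence]


def total_reducer(state: Dict[str, Any], event: Event) -> Dict[str, Any]:
    # Builds a new dict for every event instead of mutating the input.
    if event.event_type == "amount_added":
        return {"total": state["total"] + event.event_data["amount"],
                "version": event.sequence}
    return {**state, "version": event.sequence}


def replay(reducer: Callable[[Dict[str, Any], Event], Dict[str, Any]],
           initial: Dict[str, Any], events: List[Event]) -> Dict[str, Any]:
    # State reconstruction is a left fold of the reducer over the event log.
    return reduce(reducer, events, initial)


store = InMemoryEventStore()
store.append("bill_created", {"name": "Dinner"})
store.append("amount_added", {"amount": 30})
store.append("amount_added", {"amount": 15})
state = replay(total_reducer, {"total": 0, "version": 0}, store.read())
```

Because the reducer is deterministic, replaying the same log always yields the same state, which is what makes caching and snapshotting safe.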
Quick Start: Building Your First Mini-App¶
Step 1: Create Directory Structure¶
mkdir -p app/miniapps/apps/my_app
touch app/miniapps/apps/my_app/__init__.py
touch app/miniapps/apps/my_app/schema.py
touch app/miniapps/apps/my_app/reducer.py
Step 2: Define Data Models & Events (schema.py)¶
from enum import Enum
from dataclasses import dataclass, field
from typing import List, Dict
from datetime import datetime


class MyAppEventType(str, Enum):
    APP_CREATED = "app_created"
    ITEM_ADDED = "item_added"
    ITEM_UPDATED = "item_updated"
    ITEM_DELETED = "item_deleted"


@dataclass
class Item:
    item_id: str
    name: str
    created_at: str
    updated_at: str


@dataclass
class MyAppState:
    app_id: str
    name: str = "My App"
    items: List[Item] = field(default_factory=list)
    version: int = 0
    created_at: str = ""
    updated_at: str = ""
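The schema declares dataclasses, while reducers in this guide operate on plain dicts. One way to keep the two shapes aligned, sketched here as an assumption rather than as something the framework requires, is to derive the dict form from the dataclass with the standard library's `dataclasses.asdict`. The class definitions are repeated so the snippet runs on its own.

```python
from dataclasses import asdict, dataclass, field
from typing import List


@dataclass
class Item:
    item_id: str
    name: str
    created_at: str
    updated_at: str


@dataclass
class MyAppState:
    app_id: str
    name: str = "My App"
    items: List[Item] = field(default_factory=list)
    version: int = 0
    created_at: str = ""
    updated_at: str = ""


# asdict() recursively converts nested dataclasses (here, each Item) to dicts.
state = MyAppState(app_id="room-42")
state.items.append(
    Item("item-1", "Milk", "2025-01-01T10:00:00", "2025-01-01T10:00:00")
)
state_dict = asdict(state)
```

The resulting dict has the same keys a dict-based reducer reads and writes, so a typo in a field name shows up as a dataclass error instead of a silent extra key.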
Step 3: Implement State Reducer (reducer.py)¶
from typing import Dict, Any
from datetime import datetime

from .schema import MyAppEventType


def my_app_reducer(state: Dict[str, Any], event: Any) -> Dict[str, Any]:
    """
    Pure function: (state, event) → new_state

    NO side effects allowed:
    - No database writes
    - No API calls
    - No random values
    - Deterministic only

    Note: the state dict is updated in place and then returned, so callers
    should always use the return value.
    """
    if event.event_type == MyAppEventType.APP_CREATED:
        state["name"] = event.event_data.get("name", "My App")
        state["created_at"] = event.timestamp.isoformat()
    elif event.event_type == MyAppEventType.ITEM_ADDED:
        state["items"].append({
            "item_id": event.event_data["item_id"],
            "name": event.event_data["name"],
            "created_at": event.timestamp.isoformat(),
            "updated_at": event.timestamp.isoformat()
        })
    elif event.event_type == MyAppEventType.ITEM_UPDATED:
        for item in state["items"]:
            if item["item_id"] == event.event_data["item_id"]:
                item["name"] = event.event_data["name"]
                item["updated_at"] = event.timestamp.isoformat()
    elif event.event_type == MyAppEventType.ITEM_DELETED:
        state["items"] = [
            item for item in state["items"]
            if item["item_id"] != event.event_data["item_id"]
        ]

    # Always increment version
    state["version"] += 1
    state["updated_at"] = event.timestamp.isoformat()
    return state


def get_initial_state(app_id: str) -> Dict[str, Any]:
    """Initial state for new app instance"""
    return {
        "app_id": app_id,
        "name": "My App",
        "items": [],
        "version": 0,
        "created_at": datetime.utcnow().isoformat(),
        "updated_at": datetime.utcnow().isoformat()
    }
Step 4: Create Workflow Definition¶
# app/superpowers/catalog/multiplayer/my_app.py
from app.superpowers.workflow import (
    Workflow, WorkflowNode, WorkflowTrigger, register_workflow
)

my_app_workflow = Workflow(
    id="my_app",
    name="My App",
    category="multiplayer",
    description="My collaborative mini-app",
    trigger=WorkflowTrigger(
        type="manual",
        config={
            "keywords": ["my app", "start app", "use app"]
        }
    ),
    nodes=[
        # 1. Create room for collaboration
        WorkflowNode(
            id="create_room",
            type="create_room",
            config={
                "app_id": "my_app",
                "room_type": "group"
            }
        ),
        # 2. Initialize app state
        WorkflowNode(
            id="init_app",
            type="emit_event",
            config={
                "app_id": "my_app",
                "event_type": "app_created",
                "payload": {
                    "name": "My App",
                    "created_by": "{{$context.user_id}}"
                }
            }
        ),
        # 3. Send confirmation
        WorkflowNode(
            id="send_confirmation",
            type="render_block",
            config={
                "block_type": "text",
                "block_data": {
                    "content": "✅ My App is ready! What would you like to add?"
                }
            }
        )
    ],
    enabled=True,
    version="1.0"
)

# Register workflow
register_workflow(my_app_workflow)
Step 5: Register Reducer¶
Edit app/miniapps/state_coordinator.py:
# Add import at top
try:
    from app.miniapps.apps.my_app.reducer import (
        my_app_reducer, get_initial_state as my_app_initial_state
    )
except ImportError:
    logger.warning("My App reducer not available")
    my_app_reducer = None
    my_app_initial_state = None

# In __init__ method, add to registry
if my_app_reducer:
    self._reducers["my_app"] = my_app_reducer

# In _get_initial_state method, add case
elif app_id == "my_app" and my_app_initial_state:
    return my_app_initial_state(app_id="")
Step 6: Register Workflow¶
Edit app/superpowers/catalog/multiplayer/__init__.py:
# Import all workflows to register them
from . import trip_planner
from . import bill_split
from . import todo_list
from . import my_app # Add this line
__all__ = ["trip_planner", "bill_split", "todo_list", "my_app"]
Step 7: Add Trigger Patterns¶
Edit app/orchestrator/miniapp_detector.py:
PATTERNS: List[TriggerPattern] = [
    # ... existing patterns ...
    TriggerPattern(
        mini_app_id="my_app",
        keywords=["my app", "start app"],
        phrases=["use my app", "open my app"],
        regex_patterns=[r"start.*app", r"use.*my.*app"],
        priority=5
    ),
]
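To make the roles of `keywords`, `phrases`, `regex_patterns`, and `priority` more concrete, here is a minimal sketch of how a detector might combine them. The scoring weights, the threshold, and the `score` and `detect` functions are assumptions for illustration; the actual MiniAppDetector may rank matches differently. `TriggerPattern` is redefined as a simplified stand-in so the snippet is self-contained.

```python
import re
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class TriggerPattern:  # simplified stand-in for the framework class
    mini_app_id: str
    keywords: List[str] = field(default_factory=list)
    phrases: List[str] = field(default_factory=list)
    regex_patterns: List[str] = field(default_factory=list)
    priority: int = 0


def score(pattern: TriggerPattern, text: str) -> float:
    """Hypothetical confidence: exact phrase > keyword > regex match."""
    lowered = text.lower()
    if any(p in lowered for p in pattern.phrases):
        return 1.0
    if any(k in lowered for k in pattern.keywords):
        return 0.8
    if any(re.search(rx, lowered) for rx in pattern.regex_patterns):
        return 0.5
    return 0.0


def detect(text: str, patterns: List[TriggerPattern], enabled: List[str],
           threshold: float = 0.5) -> Optional[str]:
    # Only consider apps the user has enabled (user preference filtering).
    candidates = [
        (score(p, text), p.priority, p.mini_app_id)
        for p in patterns if p.mini_app_id in enabled
    ]
    candidates = [c for c in candidates if c[0] >= threshold]
    # Highest confidence wins; priority breaks ties.
    return max(candidates)[2] if candidates else None


patterns = [
    TriggerPattern(
        mini_app_id="my_app",
        keywords=["my app", "start app"],
        phrases=["use my app", "open my app"],
        regex_patterns=[r"start.*app", r"use.*my.*app"],
        priority=5,
    )
]
hit = detect("Can we use my app?", patterns, enabled=["my_app"])
disabled = detect("use my app", patterns, enabled=[])
miss = detect("what's for lunch", patterns, enabled=["my_app"])
```

Broad regexes such as `start.*app` match many unrelated messages, so it can help to try a handful of negative examples like the last call before shipping a pattern.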
Step 8: Enable for Users¶
Add migration to pre-populate user preferences:
# alembic/versions/XXXXX_add_my_app_to_user_settings.py
import sqlalchemy as sa
from alembic import op


def upgrade():
    # Enable my_app for all existing users that have no setting for it yet
    connection = op.get_bind()
    # Raw SQL is wrapped in sa.text(); SQLAlchemy 2.x rejects plain strings
    connection.execute(sa.text("""
        INSERT INTO user_miniapp_settings (user_id, mini_app_id, enabled)
        SELECT id, 'my_app', true
        FROM users
        WHERE id NOT IN (
            SELECT user_id FROM user_miniapp_settings WHERE mini_app_id = 'my_app'
        )
    """))
Step 9: Test It!¶
# Run the server
./run.sh

# Test via API
curl -X POST http://localhost:8000/orchestrator/message \
  -H "Content-Type: application/json" \
  -d '{
    "chat_guid": "test-123",
    "mode": "direct",
    "sender": "+15551234567",
    "text": "use my app",
    "timestamp": 1699123456,
    "participants": ["+15551234567"]
  }'
Expected Response:
Pattern Reference¶
Event Naming Conventions¶
# ✅ Good: noun followed by a past-tense verb
"item_added"
"item_updated"
"item_deleted"
"bill_created"
"payment_marked"
# ❌ Bad: present tense or nouns
"add_item"
"update"
"item"
State Structure Best Practices¶
@dataclass
class MyAppState:
    # Required fields
    app_id: str              # Unique identifier
    version: int = 0         # Incremented on each event
    created_at: str = ""     # ISO 8601 timestamp
    updated_at: str = ""     # ISO 8601 timestamp

    # App-specific fields
    name: str = ""
    items: List[Item] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
Reducer Best Practices¶
✅ DO:
- Keep reducers pure (no side effects)
- Always increment state["version"]
- Update state["updated_at"] timestamp
- Use Last-Write-Wins (LWW) for conflicts
- Return the modified state
❌ DON'T:
- Make API calls in reducers
- Write to database in reducers
- Use random values or current time (use event.timestamp)
- Mutate state without returning it
- Raise exceptions (return the state unchanged if an event cannot be applied)
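The last item (return the state unchanged on error) can be enforced once, outside the individual reducers. The wrapper below is a sketch of that idea; `make_safe` and `fragile` are hypothetical names, and the framework may already provide its own guard. It applies the event to a deep copy, so a reducer that fails halfway cannot leave the original state partially modified.

```python
import copy
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger(__name__)
Reducer = Callable[[Dict[str, Any], Any], Dict[str, Any]]


def make_safe(reducer: Reducer) -> Reducer:
    """Wrap a reducer so a failing event leaves the state unchanged."""

    def safe_reducer(state: Dict[str, Any], event: Any) -> Dict[str, Any]:
        # Work on a copy so a half-applied event cannot corrupt the input.
        working = copy.deepcopy(state)
        try:
            return reducer(working, event)
        except Exception:
            logger.exception("Reducer failed; returning state unchanged")
            return state

    return safe_reducer


def fragile(state: Dict[str, Any], event: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical reducer that bumps the version before it can fail.
    state["version"] += 1
    state["items"].append(event["name"])  # KeyError when "name" is missing
    return state


safe = make_safe(fragile)
before = {"items": [], "version": 0}
after_bad = safe(before, {})                  # malformed event
after_good = safe(before, {"name": "milk"})   # well-formed event
```

The deep copy costs time proportional to the state size, so this trade-off suits small mini-app states better than large ones.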
Conflict Resolution¶
Default is Last-Write-Wins (LWW) based on event.timestamp:
def my_app_reducer(state: Dict[str, Any], event: Any) -> Dict[str, Any]:
    # For updates, apply the event only if it is newer than the stored value
    if event.event_type == MyAppEventType.ITEM_UPDATED:
        for item in state["items"]:
            if item["item_id"] == event.event_data["item_id"]:
                # LWW: timestamp determines winner. Comparing ISO 8601 strings
                # is only reliable when all timestamps share one timezone and format.
                if event.timestamp.isoformat() > item["updated_at"]:
                    item["name"] = event.event_data["name"]
                    item["updated_at"] = event.timestamp.isoformat()
    return state
Custom Resolution:
For append-only operations (e.g., both users add different items), merge both:
if event.event_type == MyAppEventType.ITEM_ADDED:
    # Check for duplicate item_id (unlikely but possible)
    existing = next((item for item in state["items"]
                     if item["item_id"] == event.event_data["item_id"]), None)
    if not existing:
        # No conflict, just add
        state["items"].append({
            "item_id": event.event_data["item_id"],
            "name": event.event_data["name"],
            "created_at": event.timestamp.isoformat(),
            # Later ITEM_UPDATED events compare against updated_at
            "updated_at": event.timestamp.isoformat()
        })
    else:
        # Conflict: use LWW
        if event.timestamp.isoformat() > existing["created_at"]:
            existing["name"] = event.event_data["name"]
            existing["updated_at"] = event.timestamp.isoformat()
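The effect of the LWW guard is easiest to see when two updates arrive out of order. The sketch below is a self-contained toy, not the framework's reducer: `apply_update` and `update` are hypothetical helpers, events are modelled with `SimpleNamespace`, and timestamps are assumed to be naive datetimes in one timezone. It parses the stored timestamp back into a `datetime` before comparing, which avoids depending on string ordering.

```python
from datetime import datetime
from types import SimpleNamespace
from typing import Any, Dict


def apply_update(state: Dict[str, Any], event: Any) -> Dict[str, Any]:
    """Minimal LWW update: only newer timestamps overwrite an item."""
    for item in state["items"]:
        if item["item_id"] == event.event_data["item_id"]:
            if event.timestamp > datetime.fromisoformat(item["updated_at"]):
                item["name"] = event.event_data["name"]
                item["updated_at"] = event.timestamp.isoformat()
    return state


def update(name: str, minute: int) -> SimpleNamespace:
    # Build a fake ITEM_UPDATED event stamped at 10:<minute> on 2025-01-01.
    return SimpleNamespace(
        event_data={"item_id": "item-1", "name": name},
        timestamp=datetime(2025, 1, 1, 10, minute, 0),
    )


state = {"items": [{"item_id": "item-1", "name": "Original",
                    "updated_at": "2025-01-01T10:00:00"}]}
state = apply_update(state, update("From Bob", 5))    # newer: applied
state = apply_update(state, update("From Alice", 3))  # older: ignored
```

The update stamped 10:03 is processed after the one stamped 10:05 and is dropped, so the final name is the one with the latest timestamp regardless of arrival order.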
Advanced Topics¶
Snapshotting¶
StateCoordinator automatically saves snapshots every 100 events to optimize state reconstruction:
# Happens automatically in StateCoordinator
if (latest_seq - snapshot.version) >= 100:
    self._save_snapshot(app_id, room_id, state, latest_seq)
You don't need to implement this - it's handled by the framework.
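For intuition about why snapshots help, the following sketch compares a full replay with a replay that starts from a snapshot. It is a simplified model under stated assumptions: `rebuild` and `counter_reducer` are hypothetical, events are plain `(sequence, payload)` tuples, and the real StateCoordinator stores snapshots in its own format.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

SNAPSHOT_INTERVAL = 100  # same interval as described above

State = Dict[str, Any]
SequencedEvent = Tuple[int, Dict[str, Any]]


def counter_reducer(state: State, event: Dict[str, Any]) -> State:
    return {"count": state["count"] + event["delta"]}


def rebuild(events: List[SequencedEvent],
            snapshot: Optional[Tuple[int, State]],
            reducer: Callable[[State, Dict[str, Any]], State],
            initial: State) -> Tuple[State, int]:
    """Return (state, number of events replayed)."""
    version, state = snapshot if snapshot else (0, initial)
    replayed = 0
    for sequence, event in events:
        # Events at or below the snapshot version are already folded in.
        if sequence > version:
            state = reducer(state, event)
            replayed += 1
    return state, replayed


events = [(seq, {"delta": 1}) for seq in range(1, 251)]
full_state, full_replayed = rebuild(events, None, counter_reducer, {"count": 0})

# The latest snapshot boundary for 250 events is at sequence 200.
snapshot_version = (len(events) // SNAPSHOT_INTERVAL) * SNAPSHOT_INTERVAL
snapshot_state, _ = rebuild(events[:snapshot_version], None,
                            counter_reducer, {"count": 0})
snap_state, snap_replayed = rebuild(events, (snapshot_version, snapshot_state),
                                    counter_reducer, {"count": 0})
```

Both paths end in the same state, but the snapshot path replays 50 events instead of 250.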
Caching¶
Redis caches state with 5-minute TTL:
# Automatic in StateCoordinator.get_current_state()
cached = self._get_cached_state(app_id, room_id)
if cached:
    return cached  # Fast path: ~1ms

# Rebuild from events if cache miss
state = self._rebuild_from_events(app_id, room_id)
self._cache_state(app_id, room_id, state)  # Cache for next time
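The pattern above is a cache-aside read with expiry. The sketch below reproduces it with an in-memory dictionary standing in for Redis, so it can be run without a server; `TTLCache`, the key format, and the injectable clock are assumptions for illustration. One deliberate difference is that it checks `is not None` rather than truthiness, so an empty cached state still counts as a hit.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLCache:
    """In-memory stand-in for Redis with per-key expiry (hypothetical)."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._data: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None or entry[0] <= self._clock():
            self._data.pop(key, None)  # drop expired entries lazily
            return None
        return entry[1]

    def set(self, key: str, value: Any) -> None:
        self._data[key] = (self._clock() + self._ttl, value)


def get_current_state(cache: TTLCache, key: str,
                      rebuild: Callable[[], Dict[str, Any]]) -> Dict[str, Any]:
    cached = cache.get(key)
    if cached is not None:
        return cached          # fast path: cache hit
    state = rebuild()          # slow path: replay events
    cache.set(key, state)
    return state


# A fake clock makes the 5-minute expiry testable without sleeping.
now = [0.0]
cache = TTLCache(ttl_seconds=300, clock=lambda: now[0])
rebuild_calls = []


def rebuild_from_events() -> Dict[str, Any]:
    rebuild_calls.append(1)
    return {"version": len(rebuild_calls)}


first = get_current_state(cache, "my_app:room-1", rebuild_from_events)
second = get_current_state(cache, "my_app:room-1", rebuild_from_events)
now[0] = 301.0  # advance past the TTL
third = get_current_state(cache, "my_app:room-1", rebuild_from_events)
```

Only the first and third calls rebuild the state; the second is served from the cache.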
Vision API Integration (Receipt Analyzer Example)¶
For mini-apps that need image analysis:
# app/miniapps/apps/my_app/image_analyzer.py
import base64

from openai import OpenAI
from app.config import get_config


class ImageAnalyzer:
    def __init__(self):
        self.client = OpenAI(api_key=get_config().openai_api_key)

    def analyze_image(self, image_data: bytes) -> str:
        """Analyze a JPEG image with an OpenAI vision-capable model and return its text reply"""
        base64_image = base64.b64encode(image_data).decode('utf-8')
        response = self.client.chat.completions.create(
            model="gpt-4o",  # vision-capable chat model
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": "Analyze this image and extract relevant information."
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{base64_image}"
                            }
                        }
                    ]
                }
            ],
            max_tokens=1000
        )
        # message.content may be None, so fall back to an empty string
        return response.choices[0].message.content or ""
See app/miniapps/apps/bill_split/receipt_analyzer.py for full example.
Testing¶
Unit Tests (Reducer)¶
# tests/test_my_app_reducer.py
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict

from app.miniapps.apps.my_app.reducer import my_app_reducer, get_initial_state
from app.miniapps.apps.my_app.schema import MyAppEventType


@dataclass
class MockEvent:
    """Minimal stand-in for a stored event, shared by all tests"""
    event_type: MyAppEventType
    event_data: Dict[str, Any]
    timestamp: datetime


def test_initial_state():
    state = get_initial_state("test-app-123")
    assert state["app_id"] == "test-app-123"
    assert state["version"] == 0
    assert len(state["items"]) == 0


def test_item_added():
    state = get_initial_state("test-app-123")
    event = MockEvent(
        event_type=MyAppEventType.ITEM_ADDED,
        event_data={"item_id": "item-1", "name": "Test Item"},
        timestamp=datetime(2025, 1, 1, 10, 0, 0)
    )
    new_state = my_app_reducer(state, event)
    assert len(new_state["items"]) == 1
    assert new_state["items"][0]["name"] == "Test Item"
    assert new_state["version"] == 1


def test_conflict_resolution_lww():
    """Test Last-Write-Wins conflict resolution"""
    state = get_initial_state("test-app-123")
    # Add item
    event1 = MockEvent(
        event_type=MyAppEventType.ITEM_ADDED,
        event_data={"item_id": "item-1", "name": "Original"},
        timestamp=datetime(2025, 1, 1, 10, 0, 0)
    )
    state = my_app_reducer(state, event1)
    # Update item (later timestamp wins)
    event2 = MockEvent(
        event_type=MyAppEventType.ITEM_UPDATED,
        event_data={"item_id": "item-1", "name": "Updated"},
        timestamp=datetime(2025, 1, 1, 10, 5, 0)
    )
    state = my_app_reducer(state, event2)
    assert state["items"][0]["name"] == "Updated"
Integration Tests¶
See MINIAPP_TESTING_GUIDE.md for comprehensive testing examples.
Performance Optimization¶
State Reconstruction Performance¶
- Cached: <100ms (Redis hit)
- With Snapshot: ~200ms (replay 100 events)
- Full Replay: ~500ms (replay 500+ events)
Optimization Tips:
1. Use snapshots (automatic every 100 events)
2. Keep events lean (don't store large payloads)
3. Use Redis caching (5-min TTL)
4. Limit event history (consider TTL on old events)
Deployment¶
Database Migration¶
Environment Variables¶
No new environment variables needed - mini-apps use existing infrastructure.
Monitoring¶
Key metrics to track:
- Auto-trigger detection latency
- State reconstruction time
- Event throughput (writes/sec)
- Cache hit rate
- Reducer execution time
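If no metrics library is wired in yet, timings and hit rates like these can be collected with a few lines of standard-library code. The `Metrics` class below is a hypothetical sketch, not part of the framework; in production these values would more likely be exported to an existing monitoring system.

```python
import time
from collections import defaultdict
from contextlib import contextmanager
from typing import Dict, Iterator, List


class Metrics:
    """Tiny in-process collector for timings and counters (hypothetical)."""

    def __init__(self) -> None:
        self.timings: Dict[str, List[float]] = defaultdict(list)
        self.counters: Dict[str, int] = defaultdict(int)

    @contextmanager
    def timer(self, name: str) -> Iterator[None]:
        # Records elapsed wall-clock seconds even if the block raises.
        start = time.perf_counter()
        try:
            yield
        finally:
            self.timings[name].append(time.perf_counter() - start)

    def incr(self, name: str) -> None:
        self.counters[name] += 1

    def cache_hit_rate(self) -> float:
        hits = self.counters["cache_hit"]
        total = hits + self.counters["cache_miss"]
        return hits / total if total else 0.0


metrics = Metrics()
with metrics.timer("reducer_execution"):
    sum(range(1000))  # placeholder for a reducer call

for outcome in ["cache_hit", "cache_hit", "cache_hit", "cache_miss"]:
    metrics.incr(outcome)
hit_rate = metrics.cache_hit_rate()
```

Wrapping `get_current_state` and each reducer call in `timer(...)` would cover state reconstruction time and reducer execution time from the list above.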
Troubleshooting¶
Mini-App Not Triggering¶
Problem: User says trigger phrase but mini-app doesn't start
Solutions:
1. Check pattern registration in miniapp_detector.py
2. Verify user has app enabled in user_miniapp_settings
3. Check logs for "Mini-app triggered: my_app"
4. Test pattern matching manually:
from app.orchestrator.miniapp_detector import get_detector
detector = get_detector()
result = detector.detect_trigger("use my app", ["my_app"])
print(result) # Should print "my_app"
State Not Updating¶
Problem: Events emitted but state not changing
Solutions:
1. Check reducer registration in state_coordinator.py
2. Verify reducer returns modified state
3. Check event_type matches enum value exactly
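4. Clear the Redis cache with redis-cli FLUSHALL (development only: this deletes every key in every database on that Redis instance, not just mini-app state)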
5. Check EventLog table for events:
Slow State Reconstruction¶
Problem: State takes >1 second to load
Solutions:
1. Check number of events: SELECT COUNT(*) FROM event_log WHERE room_id = 'xxx'
2. Verify snapshots are being created
3. Check Redis is running and accessible
4. Consider compaction (merge old events)
FAQ¶
Q: Can I use external APIs in my reducer?
A: No. Reducers must be pure functions. Put API calls in workflow nodes or separate services, then emit events with the results.
Q: How do I handle user input mid-workflow?
A: Use the wait_for_input workflow node type. It pauses the workflow until the user responds.
Q: Can multiple users edit the same item simultaneously?
A: Yes. Last-Write-Wins (LWW) conflict resolution applies. The event with the latest timestamp wins.
Q: Do I need a custom database table?
A: No! All state is derived from events in event_log. This is the beauty of event sourcing.
Q: How do I delete old data?
A: Emit a deletion event (e.g., ITEM_DELETED). Don't delete events themselves - they're your audit trail.
Q: Can I change my event schema later?
A: Yes, but carefully. Add new fields with defaults. Never remove or rename existing fields (events are immutable).
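A small sketch of the "add new fields with defaults" advice: the reducer reads optional fields with `dict.get`, so events written before the field existed still replay cleanly. The `priority` field and the `read_item_added` helper are hypothetical and only illustrate the approach.

```python
from typing import Any, Dict


def read_item_added(event_data: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize an item_added payload across schema versions."""
    return {
        "item_id": event_data["item_id"],
        "name": event_data["name"],
        # "priority" is a hypothetical field added later; old events lack it.
        "priority": event_data.get("priority", "normal"),
    }


old_event = {"item_id": "item-1", "name": "Milk"}
new_event = {"item_id": "item-2", "name": "Eggs", "priority": "high"}

old_item = read_item_added(old_event)
new_item = read_item_added(new_event)
```

Keeping the default in the reducer, rather than rewriting stored events, leaves the event log untouched and the audit trail intact.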
Examples¶
See these production mini-apps for reference:
- Trip Planner: app/miniapps/apps/trip_planner/ (activity voting, expenses)
- Bill Split: app/miniapps/apps/bill_split/ (receipt analysis, payment tracking)
- Todo List: app/miniapps/apps/todo_list/ (task management, assignments)
Support¶
- Architecture Questions: See docs/decisions/004-miniapp-framework-architecture.md
- Event Sourcing: See docs/decisions/001-event-sourcing-vs-traditional-state.md
- Testing Guide: See MINIAPP_TESTING_GUIDE.md
- API Documentation: (coming soon)
Last Updated: November 17, 2025 Framework Version: 1.0 Status: Production Ready ✅