Mini-App Framework API Reference¶
Version: 1.1 Last Updated: December 2025
Overview¶
This document provides a complete API reference for the Event Sourcing Layer of the Mini-App Framework, including EventStore, StateCoordinator, reducers, and data structures.
For the Handler Routing Layer (message processing, session management, UI generation), see: - HANDLER_ROUTING.md - Handler documentation - UI_SCHEMA_DESIGN.md - UI configuration schemas
Core Classes¶
EventStore¶
Location: app/miniapps/event_store.py
Manages append-only event log with thread-safe sequence number generation.
Methods¶
append_event()¶
Append an event to the log with atomic sequence number generation.
def append_event(
self,
app_id: str,
room_id: UUID,
event_type: str,
event_data: Dict[str, Any],
user_id: Optional[UUID] = None,
) -> EventLog
Parameters:
- app_id (str): Mini-app identifier (e.g., "bill_split")
- room_id (UUID): Room/session identifier
- event_type (str): Event type from schema enum
- event_data (Dict): Event payload
- user_id (UUID, optional): User who triggered event
Returns: EventLog object with generated sequence number
Example:
from app.miniapps.event_store import EventStore
event_store = EventStore(db_session)
event = event_store.append_event(
app_id="todo_list",
room_id=room_id,
event_type="task_added",
event_data={
"task_id": "task-123",
"description": "Buy milk",
"priority": "normal"
},
user_id=user_id
)
print(event.sequence_number) # e.g., 42
Thread Safety: Uses PostgreSQL advisory locks to ensure atomic sequence number generation.
get_events()¶
Retrieve events for a room, optionally since a sequence number.
Parameters:
- room_id (UUID): Room identifier
- since_sequence (int, optional): Only return events after this sequence number
Returns: List of EventLog objects, ordered by sequence number
Example:
# Get all events
all_events = event_store.get_events(room_id=room_id)
# Get events since snapshot
recent_events = event_store.get_events(
room_id=room_id,
since_sequence=100
)
get_latest_sequence()¶
Get the highest sequence number for a room.
Returns: Latest sequence number, or 0 if no events exist
Example:
StateCoordinator¶
Location: app/miniapps/state_coordinator.py
Reconstructs current state from events using event replay with caching.
Methods¶
get_current_state()¶
Get current state by replaying events (with caching and snapshots).
def get_current_state(
self,
app_id: str,
room_id: UUID,
use_cache: bool = True,
) -> Dict[str, Any]
Parameters:
- app_id (str): Mini-app identifier
- room_id (UUID): Room identifier
- use_cache (bool): Whether to use Redis cache (default: True)
Returns: Current state dictionary
Example:
from app.miniapps.state_coordinator import StateCoordinator
coordinator = StateCoordinator(db_session, redis_client)
state = coordinator.get_current_state(
app_id="todo_list",
room_id=room_id
)
print(state["tasks"]) # List of tasks
print(state["version"]) # State version number
Performance: - Cache hit: <100ms - Snapshot hit: ~200ms (replay since snapshot) - Full replay: ~500ms (replay all events)
register_reducer()¶
Register a state reducer function for a mini-app.
Parameters:
- app_id (str): Mini-app identifier
- reducer (Callable): Reducer function with signature: (state: Dict, event: EventLog) -> Dict
Example:
def my_app_reducer(state: Dict, event: EventLog) -> Dict:
if event.event_type == "item_added":
state["items"].append(event.event_data)
return state
coordinator.register_reducer("my_app", my_app_reducer)
invalidate_cache()¶
Invalidate cached state for a room.
Use Case: Force state rebuild on next read (rarely needed)
Example:
MiniAppDetector¶
Location: app/orchestrator/miniapp_detector.py
Detects mini-app triggers from natural language messages.
Methods¶
detect_trigger()¶
Analyze message and detect mini-app trigger.
def detect_trigger(
self,
message: str,
user_enabled_apps: List[str],
active_app_in_thread: Optional[str] = None,
) -> Optional[str]
Parameters:
- message (str): User message text
- user_enabled_apps (List[str]): Apps user has enabled
- active_app_in_thread (str, optional): Currently active app in thread
Returns: Mini-app ID if triggered, else None
Example:
from app.orchestrator.miniapp_detector import get_detector
detector = get_detector()
triggered = detector.detect_trigger(
message="Let's split this $45 bill",
user_enabled_apps=["bill_split", "todo_list"]
)
print(triggered) # "bill_split"
Pattern Matching: - Keywords: Individual words ("bill", "split", "owe") - Phrases: Multi-word phrases ("split this", "who owes what") - Regex: Pattern matching (r"split.*bill") - Priority: Higher priority patterns win on ties
get_enabled_apps_for_user()¶
Get list of mini-apps user has enabled.
Returns: List of enabled mini-app IDs
Example:
enabled = detector.get_enabled_apps_for_user(user_id)
print(enabled) # ["bill_split", "todo_list", "trip_planner"]
RoomManager¶
Location: app/miniapps/room_manager.py
Manages multi-user rooms/sessions.
Methods¶
create_room()¶
Create a new room for a mini-app.
def create_room(
self,
app_id: str,
creator_id: UUID,
room_type: str = "group",
initial_participants: Optional[List[UUID]] = None,
) -> Room
Parameters:
- app_id (str): Mini-app identifier
- creator_id (UUID): User creating the room
- room_type (str): "group" or "direct"
- initial_participants (List[UUID], optional): Initial members
Returns: Room object
Example:
from app.miniapps.room_manager import RoomManager
room_manager = RoomManager(db_session)
room = room_manager.create_room(
app_id="bill_split",
creator_id=user_id,
room_type="group",
initial_participants=[user_id, friend_id]
)
print(room.id) # UUID
print(room.app_id) # "bill_split"
add_participant()¶
Add a user to a room.
Example:
get_participants()¶
Get all participants in a room.
Returns: List of user IDs
Example:
participants = room_manager.get_participants(room_id)
print(f"Room has {len(participants)} members")
Data Structures¶
TriggerPattern¶
Location: app/orchestrator/miniapp_detector.py
Defines patterns for auto-triggering a mini-app.
@dataclass
class TriggerPattern:
mini_app_id: str # e.g., "bill_split"
keywords: List[str] # ["bill", "receipt", "split"]
phrases: List[str] # ["split this", "split the bill"]
regex_patterns: List[str] # [r"split.*bill"]
priority: int = 5 # Higher = preferred on ties
Example:
TriggerPattern(
mini_app_id="bill_split",
keywords=["bill", "receipt", "split", "owe"],
phrases=["split this", "split the bill", "who owes what"],
regex_patterns=[r"split.*bill", r"who.*owe"],
priority=10
)
EventLog¶
Location: app/models/database.py
Database model for events.
class EventLog(Base):
__tablename__ = "event_log"
id: UUID # Event ID
app_id: str # Mini-app ID
room_id: UUID # Room ID
event_type: str # Event type
event_data: Dict[str, Any] # Event payload (JSON)
sequence_number: int # Ordering within room
user_id: Optional[UUID] # Who triggered event
timestamp: datetime # When event occurred
Example Query:
events = db.query(EventLog).filter(
EventLog.room_id == room_id,
EventLog.app_id == "todo_list"
).order_by(EventLog.sequence_number).all()
Room¶
Location: app/models/database.py
Database model for rooms.
class Room(Base):
__tablename__ = "rooms"
id: UUID # Room ID
app_id: str # Mini-app ID
room_type: str # "group" or "direct"
created_at: datetime
updated_at: datetime
StateSnapshot¶
Location: app/models/database.py
Optimization: Store computed state snapshots to speed up reconstruction.
class StateSnapshot(Base):
__tablename__ = "state_snapshots"
id: UUID
room_id: UUID # Room ID
snapshot_type: str # Mini-app ID
snapshot_data: Dict[str, Any] # Computed state (JSON)
version: int # Last event sequence number included
created_at: datetime
Automatic Creation: StateCoordinator creates snapshots every 100 events.
Reducer Function Signature¶
All reducer functions must follow this signature:
def my_app_reducer(state: Dict[str, Any], event: EventLog) -> Dict[str, Any]:
"""
Pure function: (state, event) → new_state
Rules:
- NO side effects (no DB writes, API calls, etc.)
- Deterministic (same inputs → same output)
- Always return modified state
- Increment state["version"]
"""
if event.event_type == "my_event":
# Transform state based on event
state["field"] = event.event_data["value"]
state["version"] += 1
return state
Required:
- Pure function (no side effects)
- Deterministic
- Returns modified state
- Increments state["version"]
Forbidden:
- Database writes
- API calls
- Random values
- System time (use event.timestamp instead)
- Throwing exceptions (return state unchanged)
Workflow Node Types¶
create_room¶
Create a new room for multi-user collaboration.
WorkflowNode(
id="create_room",
type="create_room",
config={
"app_id": "my_app",
"room_type": "group" # or "direct"
}
)
Outputs:
- {{$node.create_room.room_id}} - Created room UUID
emit_event¶
Emit an event to the EventStore.
WorkflowNode(
id="init_app",
type="emit_event",
config={
"app_id": "my_app",
"event_type": "app_created",
"payload": {
"name": "My App",
"created_by": "{{$context.user_id}}"
}
}
)
Template Variables:
- {{$context.user_id}} - Current user ID
- {{$trigger.text}} - Trigger message text
- {{$node.id.field}} - Output from previous node
get_room_state¶
Fetch current state from StateCoordinator.
Outputs:
- {{$node.get_state}} - Current state dictionary
render_block¶
Send a response block to the user.
WorkflowNode(
id="send_confirmation",
type="render_block",
config={
"block_type": "text",
"block_data": {
"content": "✅ Task added to your list!"
}
}
)
Block Types:
- text - Plain text message
- poll - Interactive poll
- list - Ordered/unordered list
- card - Rich card with title/description/buttons
function¶
Execute custom Python function.
WorkflowNode(
id="parse_task",
type="function",
config={
"function_name": "parse_task_from_message",
"input": "{{$trigger.text}}"
}
)
Requirements: - Function must be registered in workflow system - Must return JSON-serializable output
Database Schema¶
Event Log Table¶
CREATE TABLE event_log (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
app_id VARCHAR(50) NOT NULL,
room_id UUID NOT NULL REFERENCES rooms(id),
event_type VARCHAR(100) NOT NULL,
event_data JSONB NOT NULL,
sequence_number INTEGER NOT NULL,
user_id UUID REFERENCES users(id),
timestamp TIMESTAMP DEFAULT NOW(),
UNIQUE(room_id, sequence_number)
);
CREATE INDEX idx_event_log_room_sequence ON event_log(room_id, sequence_number);
CREATE INDEX idx_event_log_app_id ON event_log(app_id);
Rooms Table¶
CREATE TABLE rooms (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
app_id VARCHAR(50) NOT NULL,
room_type VARCHAR(20) NOT NULL, -- 'group' or 'direct'
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_rooms_app_id ON rooms(app_id);
User Mini-App Settings Table¶
CREATE TABLE user_miniapp_settings (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
mini_app_id VARCHAR(50) NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
settings JSONB DEFAULT '{}',
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
UNIQUE(user_id, mini_app_id)
);
CREATE INDEX idx_user_miniapp_settings_user_id ON user_miniapp_settings(user_id);
Performance Benchmarks¶
| Operation | Latency (p50) | Latency (p95) | Notes |
|---|---|---|---|
| Auto-trigger detection | 30ms | 80ms | Pattern matching |
| Append event | 10ms | 25ms | PostgreSQL write |
| Get state (cached) | 5ms | 15ms | Redis hit |
| Get state (snapshot) | 150ms | 300ms | Replay 100 events |
| Get state (full replay) | 400ms | 800ms | Replay 500 events |
| Snapshot creation | 50ms | 100ms | PostgreSQL write |
Optimization Tips: - Keep event payloads small (<10KB) - Use snapshots (automatic every 100 events) - Enable Redis caching (5-min TTL) - Batch event appends when possible
Error Handling¶
EventStore Errors¶
try:
event = event_store.append_event(
app_id="my_app",
room_id=room_id,
event_type="invalid_type",
event_data={}
)
except IntegrityError:
# Sequence number collision (rare)
logger.error("Event sequence collision")
except Exception as e:
# Other errors
logger.error(f"Failed to append event: {e}")
StateCoordinator Errors¶
try:
state = coordinator.get_current_state("my_app", room_id)
except KeyError:
# Reducer not registered
logger.error("Reducer not found for my_app")
except Exception as e:
# Event replay failed
logger.error(f"State reconstruction failed: {e}")
Thread Safety¶
EventStore¶
Thread-safe: Uses PostgreSQL advisory locks for sequence number generation.
# Multiple threads can safely append events
event1 = event_store.append_event(...) # Thread 1
event2 = event_store.append_event(...) # Thread 2
# Guaranteed unique sequence numbers
StateCoordinator¶
Not thread-safe: Use one instance per request.
# ✅ Good: One instance per request
coordinator = StateCoordinator(db_session, redis_client)
state = coordinator.get_current_state(...)
# ❌ Bad: Sharing instance across threads
global_coordinator = StateCoordinator(...) # Don't do this
Migration Guide¶
Adding a New Event Type¶
-
Add to schema enum:
-
Handle in reducer:
-
No database migration needed! Events are stored as JSON.
Versioning¶
Event Schema Versioning¶
Best Practice: Add version field to event_data:
event_data = {
"version": 2, # Schema version
"task_id": "task-123",
"description": "Buy milk",
"priority": "normal", # New field in v2
}
Reducer Handling:
def my_app_reducer(state, event):
data = event.event_data
version = data.get("version", 1) # Default to v1
if version == 1:
# Handle v1 events (no priority field)
priority = "normal"
else:
# Handle v2+ events
priority = data.get("priority", "normal")
# Apply to state
state["items"].append({
"task_id": data["task_id"],
"description": data["description"],
"priority": priority
})
return state
FAQ¶
Q: Can I query events directly from the database? A: Yes, but prefer using EventStore methods. Direct queries:
Q: How do I handle large event payloads? A: Store references, not content. Example:
# ❌ Bad: Store entire image in event
event_data = {"image_data": base64_image} # Too large!
# ✅ Good: Store S3 URL
event_data = {"image_url": "s3://bucket/image.jpg"}
Q: Can I delete events? A: Technically yes, but strongly discouraged. Events are your audit trail. Instead: - Emit a compensating event (e.g., "item_deleted") - Use soft deletes in state
Q: How do I handle schema changes? A: Always backward compatible: - Add new fields with defaults - Never rename or remove fields - Version your events
Support¶
- Architecture Docs:
docs/decisions/004-miniapp-framework-architecture.md - Framework Guide:
docs/miniapps/MINIAPP_FRAMEWORK.md - Handler Routing:
docs/miniapps/HANDLER_ROUTING.md - UI Design:
docs/miniapps/UI_SCHEMA_DESIGN.md - Testing Guide:
MINIAPP_TESTING_GUIDE.md
Last Updated: December 2025 API Version: 1.1