Mini-App Framework API Reference

Version: 1.1
Last Updated: December 2025


Overview

This document provides a complete API reference for the Event Sourcing Layer of the Mini-App Framework, including EventStore, StateCoordinator, reducers, and data structures.

For the Handler Routing Layer (message processing, session management, UI generation), see:

  • HANDLER_ROUTING.md - Handler documentation
  • UI_SCHEMA_DESIGN.md - UI configuration schemas


Core Classes

EventStore

Location: app/miniapps/event_store.py

Manages append-only event log with thread-safe sequence number generation.

Methods

append_event()

Append an event to the log with atomic sequence number generation.

def append_event(
    self,
    app_id: str,
    room_id: UUID,
    event_type: str,
    event_data: Dict[str, Any],
    user_id: Optional[UUID] = None,
) -> EventLog

Parameters:

  • app_id (str): Mini-app identifier (e.g., "bill_split")
  • room_id (UUID): Room/session identifier
  • event_type (str): Event type from schema enum
  • event_data (Dict): Event payload
  • user_id (UUID, optional): User who triggered event

Returns: EventLog object with generated sequence number

Example:

from app.miniapps.event_store import EventStore

event_store = EventStore(db_session)
event = event_store.append_event(
    app_id="todo_list",
    room_id=room_id,
    event_type="task_added",
    event_data={
        "task_id": "task-123",
        "description": "Buy milk",
        "priority": "normal"
    },
    user_id=user_id
)

print(event.sequence_number)  # e.g., 42

Thread Safety: Uses PostgreSQL advisory locks to ensure atomic sequence number generation.

get_events()

Retrieve events for a room, optionally since a sequence number.

def get_events(
    self,
    room_id: UUID,
    since_sequence: Optional[int] = None,
) -> List[EventLog]

Parameters:

  • room_id (UUID): Room identifier
  • since_sequence (int, optional): Only return events after this sequence number

Returns: List of EventLog objects, ordered by sequence number

Example:

# Get all events
all_events = event_store.get_events(room_id=room_id)

# Get events since snapshot
recent_events = event_store.get_events(
    room_id=room_id,
    since_sequence=100
)

get_latest_sequence()

Get the highest sequence number for a room.

def get_latest_sequence(self, room_id: UUID) -> int

Returns: Latest sequence number, or 0 if no events exist

Example:

latest = event_store.get_latest_sequence(room_id)
print(f"Latest sequence number: {latest}")


StateCoordinator

Location: app/miniapps/state_coordinator.py

Reconstructs current state from events using event replay with caching.

Methods

get_current_state()

Get current state by replaying events (with caching and snapshots).

def get_current_state(
    self,
    app_id: str,
    room_id: UUID,
    use_cache: bool = True,
) -> Dict[str, Any]

Parameters:

  • app_id (str): Mini-app identifier
  • room_id (UUID): Room identifier
  • use_cache (bool): Whether to use Redis cache (default: True)

Returns: Current state dictionary

Example:

from app.miniapps.state_coordinator import StateCoordinator

coordinator = StateCoordinator(db_session, redis_client)
state = coordinator.get_current_state(
    app_id="todo_list",
    room_id=room_id
)

print(state["tasks"])  # List of tasks
print(state["version"])  # State version number

Performance:

  • Cache hit: <100ms
  • Snapshot hit: ~200ms (replay since snapshot)
  • Full replay: ~500ms (replay all events)
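
The three read paths above (cache, snapshot plus partial replay, full replay) can be sketched in memory as follows. This is an illustration under stated assumptions, not the StateCoordinator implementation: a plain dict stands in for Redis, and Event, rebuild_state, and counter_reducer are hypothetical names.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple


@dataclass
class Event:
    """Hypothetical stand-in for EventLog with only the fields used here."""
    sequence_number: int
    event_type: str
    event_data: Dict[str, Any]


Reducer = Callable[[Dict[str, Any], Event], Dict[str, Any]]


def rebuild_state(
    events: List[Event],
    reducer: Reducer,
    cache: Dict[str, Dict[str, Any]],
    cache_key: str,
    snapshot: Optional[Tuple[int, Dict[str, Any]]] = None,
    use_cache: bool = True,
) -> Dict[str, Any]:
    # 1. Cache hit: return the stored state without touching the event log.
    if use_cache and cache_key in cache:
        return cache[cache_key]

    if snapshot is not None:
        # 2. Snapshot hit: start from the snapshot, replay only newer events.
        start_seq, state = snapshot[0], dict(snapshot[1])
    else:
        # 3. Full replay: start from an empty state (shape is an assumption).
        start_seq, state = 0, {"count": 0, "version": 0}

    for event in events:
        if event.sequence_number > start_seq:
            state = reducer(state, event)

    cache[cache_key] = state
    return state


def counter_reducer(state: Dict[str, Any], event: Event) -> Dict[str, Any]:
    # Hypothetical reducer: counts "item_added" events.
    new_state = dict(state)
    if event.event_type == "item_added":
        new_state["count"] += 1
    new_state["version"] += 1
    return new_state


events = [Event(i, "item_added", {}) for i in range(1, 6)]
full = rebuild_state(events, counter_reducer, {}, "my_app:room", use_cache=False)
from_snapshot = rebuild_state(
    events, counter_reducer, {}, "my_app:room",
    snapshot=(3, {"count": 3, "version": 3}),
)
```

Both calls should arrive at the same state; the snapshot path simply applies fewer events, which is where the latency difference comes from.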

register_reducer()

Register a state reducer function for a mini-app.

def register_reducer(self, app_id: str, reducer: Callable)

Parameters:

  • app_id (str): Mini-app identifier
  • reducer (Callable): Reducer function with signature: (state: Dict, event: EventLog) -> Dict

Example:

def my_app_reducer(state: Dict, event: EventLog) -> Dict:
    if event.event_type == "item_added":
        state["items"].append(event.event_data)
    state["version"] += 1  # Reducers must increment the state version
    return state

coordinator.register_reducer("my_app", my_app_reducer)

invalidate_cache()

Invalidate cached state for a room.

def invalidate_cache(self, app_id: str, room_id: UUID)

Use Case: Force state rebuild on next read (rarely needed)

Example:

coordinator.invalidate_cache("todo_list", room_id)


MiniAppDetector

Location: app/orchestrator/miniapp_detector.py

Detects mini-app triggers from natural language messages.

Methods

detect_trigger()

Analyze message and detect mini-app trigger.

def detect_trigger(
    self,
    message: str,
    user_enabled_apps: List[str],
    active_app_in_thread: Optional[str] = None,
) -> Optional[str]

Parameters:

  • message (str): User message text
  • user_enabled_apps (List[str]): Apps user has enabled
  • active_app_in_thread (str, optional): Currently active app in thread

Returns: Mini-app ID if triggered, else None

Example:

from app.orchestrator.miniapp_detector import get_detector

detector = get_detector()
triggered = detector.detect_trigger(
    message="Let's split this $45 bill",
    user_enabled_apps=["bill_split", "todo_list"]
)

print(triggered)  # "bill_split"

Pattern Matching:

  • Keywords: Individual words ("bill", "split", "owe")
  • Phrases: Multi-word phrases ("split this", "who owes what")
  • Regex: Pattern matching (r"split.*bill")
  • Priority: Higher priority patterns win on ties
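
One way the four matching rules could combine is a simple score with priority as the tie-breaker. The sketch below is hypothetical: the weights (1 per keyword, 2 per phrase, 3 per regex) and the names Pattern, score, and detect are assumptions for illustration, since this reference only states that priority decides ties.

```python
import re
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Pattern:
    """Hypothetical, trimmed-down version of TriggerPattern."""
    mini_app_id: str
    keywords: List[str] = field(default_factory=list)
    phrases: List[str] = field(default_factory=list)
    regex_patterns: List[str] = field(default_factory=list)
    priority: int = 5


def score(pattern: Pattern, message: str) -> int:
    text = message.lower()
    words = set(re.findall(r"[a-z0-9']+", text))
    # Assumed weights: keyword = 1, phrase = 2, regex = 3.
    total = sum(1 for kw in pattern.keywords if kw in words)
    total += sum(2 for phrase in pattern.phrases if phrase in text)
    total += sum(3 for rx in pattern.regex_patterns if re.search(rx, text))
    return total


def detect(message: str, patterns: List[Pattern], enabled: List[str]) -> Optional[str]:
    # Only apps the user has enabled are considered.
    candidates = [
        (score(p, message), p.priority, p.mini_app_id)
        for p in patterns
        if p.mini_app_id in enabled
    ]
    candidates = [c for c in candidates if c[0] > 0]
    if not candidates:
        return None
    # Highest score wins; priority breaks ties.
    return max(candidates, key=lambda c: (c[0], c[1]))[2]


patterns = [
    Pattern("bill_split", ["bill", "split", "owe"], ["split this"], [r"split.*bill"], 10),
    Pattern("todo_list", ["task", "todo"], ["remind me"], [], 5),
]
triggered = detect("Let's split this $45 bill", patterns, ["bill_split", "todo_list"])
```

Comparing (score, priority) tuples keeps the tie-break rule in one place, so changing the weights does not affect how ties are resolved.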

get_enabled_apps_for_user()

Get list of mini-apps user has enabled.

def get_enabled_apps_for_user(self, user_id: UUID) -> List[str]

Returns: List of enabled mini-app IDs

Example:

enabled = detector.get_enabled_apps_for_user(user_id)
print(enabled)  # ["bill_split", "todo_list", "trip_planner"]


RoomManager

Location: app/miniapps/room_manager.py

Manages multi-user rooms/sessions.

Methods

create_room()

Create a new room for a mini-app.

def create_room(
    self,
    app_id: str,
    creator_id: UUID,
    room_type: str = "group",
    initial_participants: Optional[List[UUID]] = None,
) -> Room

Parameters:

  • app_id (str): Mini-app identifier
  • creator_id (UUID): User creating the room
  • room_type (str): "group" or "direct"
  • initial_participants (List[UUID], optional): Initial members

Returns: Room object

Example:

from app.miniapps.room_manager import RoomManager

room_manager = RoomManager(db_session)
room = room_manager.create_room(
    app_id="bill_split",
    creator_id=user_id,
    room_type="group",
    initial_participants=[user_id, friend_id]
)

print(room.id)  # UUID
print(room.app_id)  # "bill_split"

add_participant()

Add a user to a room.

def add_participant(self, room_id: UUID, user_id: UUID)

Example:

room_manager.add_participant(room_id, new_user_id)

get_participants()

Get all participants in a room.

def get_participants(self, room_id: UUID) -> List[UUID]

Returns: List of user IDs

Example:

participants = room_manager.get_participants(room_id)
print(f"Room has {len(participants)} members")


Data Structures

TriggerPattern

Location: app/orchestrator/miniapp_detector.py

Defines patterns for auto-triggering a mini-app.

@dataclass
class TriggerPattern:
    mini_app_id: str                # e.g., "bill_split"
    keywords: List[str]             # ["bill", "receipt", "split"]
    phrases: List[str]              # ["split this", "split the bill"]
    regex_patterns: List[str]       # [r"split.*bill"]
    priority: int = 5               # Higher = preferred on ties

Example:

TriggerPattern(
    mini_app_id="bill_split",
    keywords=["bill", "receipt", "split", "owe"],
    phrases=["split this", "split the bill", "who owes what"],
    regex_patterns=[r"split.*bill", r"who.*owe"],
    priority=10
)


EventLog

Location: app/models/database.py

Database model for events.

class EventLog(Base):
    __tablename__ = "event_log"

    id: UUID                        # Event ID
    app_id: str                     # Mini-app ID
    room_id: UUID                   # Room ID
    event_type: str                 # Event type
    event_data: Dict[str, Any]      # Event payload (JSON)
    sequence_number: int            # Ordering within room
    user_id: Optional[UUID]         # Who triggered event
    timestamp: datetime             # When event occurred

Example Query:

events = db.query(EventLog).filter(
    EventLog.room_id == room_id,
    EventLog.app_id == "todo_list"
).order_by(EventLog.sequence_number).all()


Room

Location: app/models/database.py

Database model for rooms.

class Room(Base):
    __tablename__ = "rooms"

    id: UUID                        # Room ID
    app_id: str                     # Mini-app ID
    room_type: str                  # "group" or "direct"
    created_at: datetime
    updated_at: datetime

StateSnapshot

Location: app/models/database.py

Optimization: Store computed state snapshots to speed up reconstruction.

class StateSnapshot(Base):
    __tablename__ = "state_snapshots"

    id: UUID
    room_id: UUID                   # Room ID
    snapshot_type: str              # Mini-app ID
    snapshot_data: Dict[str, Any]   # Computed state (JSON)
    version: int                    # Last event sequence number included
    created_at: datetime

Automatic Creation: StateCoordinator creates snapshots every 100 events.
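
The effect of the snapshot interval on replay cost can be worked out with a few lines of arithmetic. This sketch assumes a snapshot is taken whenever the sequence number is a multiple of the interval; the exact trigger used by StateCoordinator is not specified here, and should_snapshot and events_to_replay are hypothetical helpers.

```python
SNAPSHOT_INTERVAL = 100  # "every 100 events", per the note above


def should_snapshot(sequence_number: int) -> bool:
    # Assumption: snapshot when the sequence number hits a multiple of the interval.
    return sequence_number > 0 and sequence_number % SNAPSHOT_INTERVAL == 0


def events_to_replay(latest_sequence: int) -> int:
    # Events written since the most recent snapshot boundary.
    return latest_sequence % SNAPSHOT_INTERVAL


replay_at_250 = events_to_replay(250)
```

Under this assumption a room with 250 events replays only the 50 events after the snapshot at sequence 200.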


Reducer Function Signature

All reducer functions must follow this signature:

def my_app_reducer(state: Dict[str, Any], event: EventLog) -> Dict[str, Any]:
    """
    Pure function: (state, event) → new_state

    Rules:
    - NO side effects (no DB writes, API calls, etc.)
    - Deterministic (same inputs → same output)
    - Always return modified state
    - Increment state["version"]
    """

    if event.event_type == "my_event":
        # Transform state based on event
        state["field"] = event.event_data["value"]

    state["version"] += 1
    return state

Required:

  • Pure function (no side effects)
  • Deterministic
  • Returns modified state
  • Increments state["version"]

Forbidden:

  • Database writes
  • API calls
  • Random values
  • System time (use event.timestamp instead)
  • Raising exceptions (return state unchanged)
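
A reducer that follows these rules might look like the sketch below. It copies the state instead of mutating it in place, which is one possible way to avoid side effects on the caller's dict rather than something this reference requires. The Event class and todo_reducer are hypothetical names.

```python
import copy
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for EventLog."""
    event_type: str
    event_data: Dict[str, Any]
    timestamp: datetime


def todo_reducer(state: Dict[str, Any], event: Event) -> Dict[str, Any]:
    # Work on a copy so the caller's dict is never mutated.
    new_state = copy.deepcopy(state)

    if event.event_type == "task_added":
        task_id = event.event_data.get("task_id")
        if task_id is None:
            # Malformed payload: return the state unchanged instead of raising.
            return state
        new_state.setdefault("tasks", []).append({
            "task_id": task_id,
            # Use the event's timestamp, never datetime.now().
            "added_at": event.timestamp.isoformat(),
        })

    new_state["version"] = new_state.get("version", 0) + 1
    return new_state


event = Event("task_added", {"task_id": "task-123"},
              datetime(2025, 12, 1, tzinfo=timezone.utc))
initial = {"tasks": [], "version": 0}
first = todo_reducer(initial, event)
second = todo_reducer(initial, event)
```

Calling the reducer twice with the same inputs yields equal results and leaves the input untouched, which is the property event replay depends on.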


Workflow Node Types

create_room

Create a new room for multi-user collaboration.

WorkflowNode(
    id="create_room",
    type="create_room",
    config={
        "app_id": "my_app",
        "room_type": "group"  # or "direct"
    }
)

Outputs:

  • {{$node.create_room.room_id}} - Created room UUID


emit_event

Emit an event to the EventStore.

WorkflowNode(
    id="init_app",
    type="emit_event",
    config={
        "app_id": "my_app",
        "event_type": "app_created",
        "payload": {
            "name": "My App",
            "created_by": "{{$context.user_id}}"
        }
    }
)

Template Variables:

  • {{$context.user_id}} - Current user ID
  • {{$trigger.text}} - Trigger message text
  • {{$node.id.field}} - Output from previous node
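
To make the template syntax concrete, here is a minimal resolver sketch. It is not the workflow engine's actual implementation: resolve, lookup, and the layout of the scope dict are assumptions chosen to mirror the three variable families listed above.

```python
import re
from typing import Any, Dict

# Matches {{$context.user_id}}, {{$node.create_room.room_id}}, and so on.
_TEMPLATE = re.compile(r"\{\{\s*\$([A-Za-z0-9_.]+)\s*\}\}")


def lookup(path: str, scope: Dict[str, Any]) -> Any:
    # Walk a dotted path such as "node.create_room.room_id".
    value: Any = scope
    for part in path.split("."):
        value = value[part]
    return value


def resolve(value: Any, scope: Dict[str, Any]) -> Any:
    if isinstance(value, dict):
        return {k: resolve(v, scope) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve(v, scope) for v in value]
    if isinstance(value, str):
        whole = _TEMPLATE.fullmatch(value)
        if whole:
            # A string that is exactly one template keeps the looked-up type.
            return lookup(whole.group(1), scope)
        # Otherwise substitute each template into the surrounding text.
        return _TEMPLATE.sub(lambda m: str(lookup(m.group(1), scope)), value)
    return value


scope = {
    "context": {"user_id": "user-1"},
    "trigger": {"text": "add milk"},
    "node": {"create_room": {"room_id": "room-9"}},
}
payload = resolve(
    {"name": "My App", "created_by": "{{$context.user_id}}",
     "note": "From: {{$trigger.text}}"},
    scope,
)
```

In this sketch a missing variable raises KeyError; a real engine may prefer to substitute an empty value or fail the node explicitly.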


get_room_state

Fetch current state from StateCoordinator.

WorkflowNode(
    id="get_state",
    type="get_room_state",
    config={
        "app_id": "my_app"
    }
)

Outputs:

  • {{$node.get_state}} - Current state dictionary


render_block

Send a response block to the user.

WorkflowNode(
    id="send_confirmation",
    type="render_block",
    config={
        "block_type": "text",
        "block_data": {
            "content": "✅ Task added to your list!"
        }
    }
)

Block Types:

  • text - Plain text message
  • poll - Interactive poll
  • list - Ordered/unordered list
  • card - Rich card with title/description/buttons


function

Execute custom Python function.

WorkflowNode(
    id="parse_task",
    type="function",
    config={
        "function_name": "parse_task_from_message",
        "input": "{{$trigger.text}}"
    }
)

Requirements:

  • Function must be registered in workflow system
  • Must return JSON-serializable output
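
The two requirements can be illustrated with a small registry sketch. How the workflow system actually registers functions is not described in this reference, so FUNCTION_REGISTRY, register_function, and run_function_node are hypothetical, as is the body of parse_task_from_message.

```python
import json
from typing import Any, Callable, Dict

# Hypothetical registry mapping function_name to a callable.
FUNCTION_REGISTRY: Dict[str, Callable[[Any], Any]] = {}


def register_function(name: str) -> Callable:
    def decorator(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
        FUNCTION_REGISTRY[name] = func
        return func
    return decorator


def run_function_node(function_name: str, node_input: Any) -> Any:
    # Requirement 1: the function must be registered.
    if function_name not in FUNCTION_REGISTRY:
        raise KeyError(f"Function not registered: {function_name}")
    output = FUNCTION_REGISTRY[function_name](node_input)
    # Requirement 2: json.dumps raises TypeError if output is not serializable.
    json.dumps(output)
    return output


@register_function("parse_task_from_message")
def parse_task_from_message(text: str) -> Dict[str, str]:
    # Illustrative parsing only: strip a leading "add " from the message.
    description = text.strip()
    if description.lower().startswith("add "):
        description = description[4:]
    return {"description": description, "priority": "normal"}


result = run_function_node("parse_task_from_message", "add Buy milk")
```

Checking serializability at the node boundary surfaces a bad return value immediately, rather than later when a downstream node tries to use it.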


Database Schema

Event Log Table

CREATE TABLE event_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    app_id VARCHAR(50) NOT NULL,
    room_id UUID NOT NULL REFERENCES rooms(id),
    event_type VARCHAR(100) NOT NULL,
    event_data JSONB NOT NULL,
    sequence_number INTEGER NOT NULL,
    user_id UUID REFERENCES users(id),
    timestamp TIMESTAMP DEFAULT NOW(),
    UNIQUE(room_id, sequence_number)
);

CREATE INDEX idx_event_log_room_sequence ON event_log(room_id, sequence_number);
CREATE INDEX idx_event_log_app_id ON event_log(app_id);

Rooms Table

CREATE TABLE rooms (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    app_id VARCHAR(50) NOT NULL,
    room_type VARCHAR(20) NOT NULL,  -- 'group' or 'direct'
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_rooms_app_id ON rooms(app_id);

User Mini-App Settings Table

CREATE TABLE user_miniapp_settings (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    mini_app_id VARCHAR(50) NOT NULL,
    enabled BOOLEAN DEFAULT TRUE,
    settings JSONB DEFAULT '{}',
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),
    UNIQUE(user_id, mini_app_id)
);

CREATE INDEX idx_user_miniapp_settings_user_id ON user_miniapp_settings(user_id);

Performance Benchmarks

Operation                 Latency (p50)   Latency (p95)   Notes
Auto-trigger detection    30ms            80ms            Pattern matching
Append event              10ms            25ms            PostgreSQL write
Get state (cached)        5ms             15ms            Redis hit
Get state (snapshot)      150ms           300ms           Replay 100 events
Get state (full replay)   400ms           800ms           Replay 500 events
Snapshot creation         50ms            100ms           PostgreSQL write

Optimization Tips:

  • Keep event payloads small (<10KB)
  • Use snapshots (automatic every 100 events)
  • Enable Redis caching (5-min TTL)
  • Batch event appends when possible
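
The payload size guideline is easy to check before appending. The helper below is a hypothetical sketch: it measures the compact JSON encoding of the payload, which approximates but may not exactly match what PostgreSQL stores for a JSONB column.

```python
import json
from typing import Any, Dict

MAX_PAYLOAD_BYTES = 10 * 1024  # The 10KB guideline above.


def payload_size_bytes(event_data: Dict[str, Any]) -> int:
    # Size of the compact JSON encoding in UTF-8 bytes.
    return len(json.dumps(event_data, separators=(",", ":")).encode("utf-8"))


def is_payload_small(event_data: Dict[str, Any]) -> bool:
    return payload_size_bytes(event_data) < MAX_PAYLOAD_BYTES


small = {"task_id": "task-123", "description": "Buy milk"}
large = {"image_data": "A" * 20_000}
```

A caller could run is_payload_small before append_event and store a reference (for example a URL) when the check fails.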


Error Handling

EventStore Errors

import logging

from sqlalchemy.exc import IntegrityError  # Assumes the SQLAlchemy models shown above

logger = logging.getLogger(__name__)

try:
    event = event_store.append_event(
        app_id="my_app",
        room_id=room_id,
        event_type="invalid_type",
        event_data={}
    )
except IntegrityError:
    # Sequence number collision (rare)
    logger.error("Event sequence collision")
except Exception as e:
    # Other errors
    logger.error(f"Failed to append event: {e}")

StateCoordinator Errors

try:
    state = coordinator.get_current_state("my_app", room_id)
except KeyError:
    # Reducer not registered
    logger.error("Reducer not found for my_app")
except Exception as e:
    # Event replay failed
    logger.error(f"State reconstruction failed: {e}")

Thread Safety

EventStore

Thread-safe: Uses PostgreSQL advisory locks for sequence number generation.

# Multiple threads can safely append events
event1 = event_store.append_event(...)  # Thread 1
event2 = event_store.append_event(...)  # Thread 2
# Guaranteed unique sequence numbers
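
The guarantee can be demonstrated with an in-memory analogue. This reference does not show how EventStore acquires its advisory lock, so the per-room lock below is an assumption, and InMemorySequencer is a hypothetical stand-in that uses threading.Lock where the real class uses PostgreSQL.

```python
import threading
from collections import defaultdict
from typing import Dict, List


class InMemorySequencer:
    """Hypothetical stand-in: one lock per room, like a per-room advisory lock."""

    def __init__(self) -> None:
        self._guard = threading.Lock()
        self._locks: Dict[str, threading.Lock] = defaultdict(threading.Lock)
        self._latest: Dict[str, int] = defaultdict(int)

    def next_sequence(self, room_id: str) -> int:
        # The guard protects creation of the per-room lock itself.
        with self._guard:
            room_lock = self._locks[room_id]
        # Read-increment-return happens atomically per room.
        with room_lock:
            self._latest[room_id] += 1
            return self._latest[room_id]


sequencer = InMemorySequencer()
results: List[int] = []
results_lock = threading.Lock()


def worker() -> None:
    for _ in range(100):
        seq = sequencer.next_sequence("room-1")
        with results_lock:
            results.append(seq)


threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With eight threads requesting 100 numbers each, the collected values should be exactly 1 through 800 with no duplicates or gaps.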

StateCoordinator

Not thread-safe: Use one instance per request.

# ✅ Good: One instance per request
coordinator = StateCoordinator(db_session, redis_client)
state = coordinator.get_current_state(...)

# ❌ Bad: Sharing instance across threads
global_coordinator = StateCoordinator(...)  # Don't do this

Migration Guide

Adding a New Event Type

  1. Add to schema enum:

    class MyAppEventType(str, Enum):
        EXISTING_EVENT = "existing_event"
        NEW_EVENT = "new_event"  # Add this
    

  2. Handle in reducer:

    def my_app_reducer(state, event):
        if event.event_type == MyAppEventType.NEW_EVENT:
            # Handle new event
            state["new_field"] = event.event_data["value"]
        state["version"] += 1
        return state
    

  3. No database migration needed! Events are stored as JSON.


Versioning

Event Schema Versioning

Best Practice: Add version field to event_data:

event_data = {
    "version": 2,  # Schema version
    "task_id": "task-123",
    "description": "Buy milk",
    "priority": "normal",  # New field in v2
}

Reducer Handling:

def my_app_reducer(state, event):
    data = event.event_data
    version = data.get("version", 1)  # Default to v1

    if version == 1:
        # Handle v1 events (no priority field)
        priority = "normal"
    else:
        # Handle v2+ events
        priority = data.get("priority", "normal")

    # Apply to state
    state["items"].append({
        "task_id": data["task_id"],
        "description": data["description"],
        "priority": priority
    })

    state["version"] += 1
    return state
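
An alternative to branching on the version inside every reducer is to normalize old payloads to the latest shape first, a step often called upcasting. The sketch below is a hypothetical illustration of that idea using the v1 and v2 shapes from the example above; upcast and CURRENT_VERSION are not part of the framework API.

```python
from typing import Any, Dict

CURRENT_VERSION = 2


def upcast(event_data: Dict[str, Any]) -> Dict[str, Any]:
    # Copy so the stored event payload is never modified.
    data = dict(event_data)
    version = data.get("version", 1)  # Missing version means v1
    if version < 2:
        data["priority"] = "normal"  # v1 events had no priority field
    data["version"] = CURRENT_VERSION
    return data


v1 = {"task_id": "task-1", "description": "Buy milk"}
v2 = {"version": 2, "task_id": "task-2", "description": "Pay rent", "priority": "high"}
upcast_v1 = upcast(v1)
upcast_v2 = upcast(v2)
```

With this step in front, the reducer only ever sees the latest payload shape, and each new schema version adds one branch to upcast instead of one branch per reducer.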

FAQ

Q: Can I query events directly from the database?
A: Yes, but prefer using EventStore methods. Direct queries:

SELECT * FROM event_log
WHERE room_id = 'xxx' AND app_id = 'my_app'
ORDER BY sequence_number;

Q: How do I handle large event payloads?
A: Store references, not content. Example:

# ❌ Bad: Store entire image in event
event_data = {"image_data": base64_image}  # Too large!

# ✅ Good: Store S3 URL
event_data = {"image_url": "s3://bucket/image.jpg"}

Q: Can I delete events?
A: Technically yes, but strongly discouraged. Events are your audit trail. Instead:

  • Emit a compensating event (e.g., "item_deleted")
  • Use soft deletes in state
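
A compensating event combined with a soft delete could look like the sketch below. The event names follow the example above; apply_event, the "deleted" flag, and the state layout are hypothetical.

```python
from typing import Any, Dict, List


def apply_event(
    state: Dict[str, Any], event_type: str, event_data: Dict[str, Any]
) -> Dict[str, Any]:
    # Copy the items so the previous state is left untouched.
    items: List[Dict[str, Any]] = [dict(item) for item in state.get("items", [])]
    if event_type == "item_added":
        items.append({"item_id": event_data["item_id"], "deleted": False})
    elif event_type == "item_deleted":
        for item in items:
            if item["item_id"] == event_data["item_id"]:
                item["deleted"] = True  # Soft delete: the item stays in state
    return {"items": items, "version": state.get("version", 0) + 1}


# The log keeps all three events; nothing is removed from it.
log = [
    ("item_added", {"item_id": "a"}),
    ("item_added", {"item_id": "b"}),
    ("item_deleted", {"item_id": "a"}),
]
state: Dict[str, Any] = {"items": [], "version": 0}
for event_type, event_data in log:
    state = apply_event(state, event_type, event_data)

visible = [item["item_id"] for item in state["items"] if not item["deleted"]]
```

The deletion is visible to users because item "a" is filtered out, while the event log still records both that it was added and that it was removed.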

Q: How do I handle schema changes?
A: Keep every change backward compatible:

  • Add new fields with defaults
  • Never rename or remove fields
  • Version your events


Support

  • Architecture Docs: docs/decisions/004-miniapp-framework-architecture.md
  • Framework Guide: docs/miniapps/MINIAPP_FRAMEWORK.md
  • Handler Routing: docs/miniapps/HANDLER_ROUTING.md
  • UI Design: docs/miniapps/UI_SCHEMA_DESIGN.md
  • Testing Guide: MINIAPP_TESTING_GUIDE.md
