
Complete Proactive Flow: How Sage Decides to Message You

This document traces the entire journey from data scanning → analysis → decision → message delivery for Sage's proactive workflows.


🎯 Overview: The 5-Step Proactive Pipeline

1. SCHEDULER TRIGGER → 2. DATA SCAN → 3. ANALYSIS → 4. DECISION → 5. MESSAGE DELIVERY

📊 Use Case 1: Travel Brain (Flight Reminder)

Scenario: You have a flight tomorrow at 7:15pm, and Sage reminds you 4 hours before.

Step 1: Scheduler Trigger

Location: app/scheduler/background_service.py:206-226

When: Every 20 minutes from 8:00am through 11:40pm (the cron hour range 8-23 includes the whole 11pm hour)

# Line 61-66
self.scheduler.add_job(
    self.run_travel_brain,
    CronTrigger(hour='8-23', minute='*/20'),  # Every 20 min
    id='travel_brain',
    name='Travel Brain Support'
)

What Happens:

  1. APScheduler fires on a 20-minute slot (:00, :20, or :40 past the hour), for example at 3:00pm
  2. Calls run_travel_brain()
  3. Gets all active users from database
  4. For each user, calls workflow_engine.execute_workflow()

Code Path:

APScheduler → run_travel_brain() → workflow_engine.execute_workflow(
    workflow_id="proactive_travel_support",
    user_id="+16463831222",
    trigger_data={'scan_time': '2025-11-03T15:00:00', 'user_phone': '+16463831222'}
)
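To make the firing pattern concrete, here is a minimal stdlib-only sketch that computes the next slot for the `hour='8-23', minute='*/20'` schedule. `next_fire_time` is a hypothetical helper written for illustration; it is not part of APScheduler or of Sage, and it assumes the cron fields are evaluated in local wall-clock time.

```python
from datetime import datetime, timedelta


def next_fire_time(now: datetime) -> datetime:
    """Return the first slot strictly after `now` for hour 8-23, minute */20."""
    candidate = now.replace(second=0, microsecond=0) + timedelta(minutes=1)
    # Step minute by minute until both cron fields match.
    while not (8 <= candidate.hour <= 23 and candidate.minute % 20 == 0):
        candidate += timedelta(minutes=1)
    return candidate


print(next_fire_time(datetime(2025, 11, 4, 14, 47)))  # 2025-11-04 15:00:00
print(next_fire_time(datetime(2025, 11, 4, 23, 45)))  # 2025-11-05 08:00:00
```

The second call shows the overnight gap: after the 11:40pm slot, nothing fires until 8:00am the next day.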


Step 2: Enable Check

Location: app/superpowers/catalog/proactive/travel_support.py:27-44

Node: check_enabled (function node)

What It Does:

manager = get_superpower_manager()
enabled = manager.is_enabled(user_phone, 'travel_brain')
settings = manager.get_settings(user_phone, 'travel_brain')

Database Query:

SELECT enabled, settings 
FROM superpower_settings 
WHERE user_phone = '+16463831222' AND superpower_id = 'travel_brain'

Decision Point:

  • ✅ If enabled → Continue to data scan
  • ❌ If disabled → return {'enabled': False} → Workflow stops

Settings Retrieved:

{
    'departure_reminder_hours': 4,  # Default: 4h before flight
    'landing_buffer_hours': 2,
    'immigration_reminder': True
}
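The enable check can be sketched as a single lookup that merges stored settings over the defaults. This is an illustration only: an in-memory SQLite database stands in for the real one, the `settings` column is assumed to hold JSON text, a missing row is assumed to mean "disabled", and `check_enabled` here is a standalone function rather than the actual SuperpowerManager API.

```python
import json
import sqlite3

# Defaults taken from the settings shown above.
DEFAULTS = {
    "departure_reminder_hours": 4,
    "landing_buffer_hours": 2,
    "immigration_reminder": True,
}


def check_enabled(conn, user_phone, superpower_id):
    """Return {'enabled': False} or the merged settings for an enabled user."""
    row = conn.execute(
        "SELECT enabled, settings FROM superpower_settings "
        "WHERE user_phone = ? AND superpower_id = ?",
        (user_phone, superpower_id),
    ).fetchone()
    if row is None or not row[0]:
        return {"enabled": False}  # the workflow stops here
    overrides = json.loads(row[1]) if row[1] else {}
    return {"enabled": True, "settings": {**DEFAULTS, **overrides}}


# Demo data (assumed schema, for illustration only).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE superpower_settings "
    "(user_phone TEXT, superpower_id TEXT, enabled INTEGER, settings TEXT)"
)
conn.execute(
    "INSERT INTO superpower_settings VALUES (?, ?, ?, ?)",
    ("+16463831222", "travel_brain", 1, json.dumps({"departure_reminder_hours": 5})),
)
result = check_enabled(conn, "+16463831222", "travel_brain")
print(result)
```

Using a parameterized query, as here, avoids interpolating the phone number into the SQL string.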


Step 3: Data Scanning 🔍

Two parallel data sources are scanned:

3A. Gmail Scan (Flight Confirmations)

Location: app/superpowers/catalog/proactive/travel_support.py:46-56

Node: scan_flight_emails (gmail node)

What It Does:

# Node config
{
    "type": "gmail",
    "action": "search_messages",
    "query": "(flight confirmation OR boarding pass OR itinerary OR eTicket) after:-14d",
    "max_results": 15,
    "include_body": True
}

Execution Path:

WorkflowEngine.execute_node() 
  → GmailNode.execute() 
    → TokenManager.get_token() [OAuth check]
    → Build Gmail service
    → service.users().messages().list(userId='me', q=query, maxResults=15)
    → Fetch up to 15 emails matching the query

Code Location: app/superpowers/nodes/connectors/gmail.py:39-262

OAuth Check:

  • Checks oauth_tokens table for user's Gmail token
  • If missing → Returns requires_auth: true with auth URL
  • If present → Uses token to authenticate Gmail API

API Call Made:

# Google Gmail API call
service.users().messages().list(
    userId='me',
    q='(flight confirmation OR boarding pass OR itinerary OR eTicket) after:-14d',
    maxResults=15
).execute()

Data Returned:

{
    'messages': [
        {
            'id': 'msg_123',
            'from': 'united@email.united.com',
            'subject': 'Your Flight Confirmation - UA 2445',
            'snippet': 'Your flight UA 2445 from JFK to LAX...',
            'body': 'Full email HTML/text content...'
        },
        ...
    ]
}


3B. Calendar Scan (Travel Events)

Location: app/superpowers/catalog/proactive/travel_support.py:58-69

Node: scan_calendar_travel (google_calendar node)

What It Does:

# Node config
{
    "type": "google_calendar",
    "action": "list_events",
    "time_min": "now",
    "time_max": "+7d",
    "max_results": 30
}

Execution Path:

WorkflowEngine.execute_node()
  → GoogleCalendarNode.execute()
    → TokenManager.get_token() [OAuth check]
    → Build Calendar service
    → service.events().list()

Code Location: app/superpowers/nodes/connectors/google_calendar.py:47-276

API Call Made:

# Google Calendar API call
service.events().list(
    calendarId='primary',
    timeMin='2025-11-03T15:00:00Z',
    timeMax='2025-11-10T15:00:00Z',
    maxResults=30,
    singleEvents=True,
    orderBy='startTime'
).execute()

Data Returned:

{
    'events': [
        {
            'id': 'cal_event_456',
            'summary': 'Flight to LAX',
            'start': {'dateTime': '2025-11-04T19:15:00-05:00'},
            'end': {'dateTime': '2025-11-04T23:45:00-08:00'},
            'location': 'JFK Terminal 4'
        },
        ...
    ]
}
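The node config uses relative values ("now", "+7d") while the API call takes absolute timestamps. One way that translation could work is sketched below; `resolve_time` is a hypothetical helper, and the supported spec grammar ("now" or "+Nd") is an assumption based only on the two values shown in the config.

```python
import re
from datetime import datetime, timedelta, timezone


def resolve_time(spec: str, now: datetime) -> str:
    """Turn 'now' or '+Nd' into an RFC 3339 UTC timestamp string."""
    if spec == "now":
        moment = now
    else:
        match = re.fullmatch(r"\+(\d+)d", spec)
        if match is None:
            raise ValueError(f"unsupported time spec: {spec!r}")
        moment = now + timedelta(days=int(match.group(1)))
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


now = datetime(2025, 11, 3, 15, 0, tzinfo=timezone.utc)
print(resolve_time("now", now))  # 2025-11-03T15:00:00Z
print(resolve_time("+7d", now))  # 2025-11-10T15:00:00Z
```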


Step 4: Data Extraction 🔎

Location: app/superpowers/catalog/proactive/travel_support.py:71-164

Node: extract_travel_details (function node)

What It Does:

  • Parses Gmail emails with regex to extract:
      • Flight numbers (UA 2445, SQ 23, etc.)
      • Airport codes (JFK, LAX, SIN)
      • Departure times
      • Terminal numbers
      • Confirmation numbers
  • Extracts from Calendar:
      • Travel-related events
      • Flight titles
      • Dates/times

Code:

import re

# Regex patterns (body is the email's text content)
flight_match = re.search(r'\b([A-Z]{2}\s?\d{1,4})\b', body)  # UA 2445
airports = re.findall(r'\b([A-Z]{3})\b', body)  # JFK, LAX (also matches any other 3-letter uppercase word)
time_match = re.search(r'(\d{1,2}:\d{2}\s*(?:AM|PM)?)', body)

Output:

{
    'flights': [
        {
            'source': 'email',
            'flight_number': 'UA2445',
            'from_airport': 'JFK',
            'to_airport': 'LAX',
            'time_str': '7:15 PM',
            'terminal': '4',
            'confirmation': 'ABC123'
        },
        {
            'source': 'calendar',
            'event_id': 'cal_event_456',
            'title': 'Flight to LAX',
            'start': '2025-11-04T19:15:00-05:00',
            'to_airport': 'LAX'
        }
    ]
}
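Applied to a sample email body, the three patterns above could be combined as follows. This is a sketch, not the actual extract_travel_details node: `extract_flight` is a hypothetical name, and treating the first two three-letter codes as origin and destination is a simplifying assumption.

```python
import re


def extract_flight(body: str) -> dict:
    """Pull flight number, airports, and time string out of an email body."""
    flight = {"source": "email"}
    flight_match = re.search(r"\b([A-Z]{2}\s?\d{1,4})\b", body)
    if flight_match:
        # Normalize 'UA 2445' to 'UA2445'.
        flight["flight_number"] = re.sub(r"\s+", "", flight_match.group(1))
    airports = re.findall(r"\b([A-Z]{3})\b", body)
    if len(airports) >= 2:
        # Assumption: the first two codes are origin and destination.
        flight["from_airport"], flight["to_airport"] = airports[0], airports[1]
    time_match = re.search(r"(\d{1,2}:\d{2}\s*(?:AM|PM)?)", body)
    if time_match:
        flight["time_str"] = time_match.group(1).strip()
    return flight


sample = "Your flight UA 2445 from JFK to LAX departs at 7:15 PM."
print(extract_flight(sample))
```

Because the airport pattern matches any three uppercase letters, real email bodies containing words like "USD" or "PDF" would need an allow-list of known airport codes or a stricter pattern.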


Step 5: Action Analysis 🧠

Location: app/superpowers/catalog/proactive/travel_support.py:166-277

Node: determine_actions (function node)

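What It Does: For each flight, calculates:

  • Hours until departure
  • Whether it is within the reminder window (4-6h before)
  • Whether it needs an immigration reminder (12-24h before, international)
  • Whether it needs a landing buffer (creates calendar block)

A scan on the day before the flight (for example 3:00pm on Nov 3, about 28h ahead) finds the flight but produces no departure reminder. The sample output below corresponds to a scan at 3:00pm on the day of the flight, when 4.25h remain.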

Logic:

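from datetime import datetime, timezone

actions = []

# flight['start'] carries a UTC offset, so `now` must be timezone-aware too;
# subtracting a naive datetime from an aware one raises TypeError.
now = datetime.now(timezone.utc)
departure_time = datetime.fromisoformat(flight['start'])
hours_until = (departure_time - now).total_seconds() / 3600

# Departure reminder (4-6h before)
if 4 <= hours_until <= 6:
    actions.append({
        'type': 'departure_reminder',
        'flight': flight,
        'hours_until': hours_until
    })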

Output:

{
    'actions': [
        {
            'type': 'departure_reminder',
            'flight': {...},
            'departure_time': '2025-11-04T19:15:00-05:00',
            'hours_until': 4.25  # 4 hours 15 minutes
        }
    ]
}
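The window check can be exercised with fixed timestamps to see which scan triggers the reminder. The sketch below assumes timezone-aware datetimes throughout; `departure_action` and its parameter names are hypothetical, and only the 4-6h window comes from the logic above.

```python
from datetime import datetime, timedelta, timezone


def departure_action(start_iso, now, min_hours=4, max_hours=6):
    """Return a departure_reminder action if `now` falls inside the window."""
    departure = datetime.fromisoformat(start_iso)
    hours_until = (departure - now).total_seconds() / 3600
    if min_hours <= hours_until <= max_hours:
        return {
            "type": "departure_reminder",
            "departure_time": start_iso,
            "hours_until": hours_until,
        }
    return None


eastern = timezone(timedelta(hours=-5))  # fixed offset, for the demo only
start = "2025-11-04T19:15:00-05:00"
day_before = departure_action(start, datetime(2025, 11, 3, 15, 0, tzinfo=eastern))
flight_day = departure_action(start, datetime(2025, 11, 4, 15, 0, tzinfo=eastern))
print(day_before)  # None: 28.25h until departure
print(flight_day)  # action with hours_until 4.25
```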


Step 6: Message Generation ✍️

Location: app/superpowers/catalog/proactive/travel_support.py:279-342

Node: generate_messages (function node)

What It Does:

  • Builds user-friendly message from flight data
  • Uses LLM for natural language generation (optional)
  • Formats with Sage's casual tone

Code:

flight = action['flight']
hours = action['hours_until']  # e.g. 4.25

msg = "ur flight"
if flight.get('flight_number'):
    msg += f" {flight['flight_number']}"
if flight.get('to_airport'):
    msg += f" to {flight['to_airport']}"
msg += f" is in {int(hours)}h"

if flight.get('from_airport'):
    msg += f" from {flight['from_airport']}"
if flight.get('terminal'):
    msg += f" terminal {flight['terminal']}"

# hours - 1.5 is the time left before you need to leave; 4.25h renders as "2h"
msg += f". leave by {int(hours-1.5)}h before (with traffic)"

Output:

{
    'should_send': True,
    'messages': [
        {
            'text': "ur flight UA2445 to LAX is in 4h from JFK terminal 4. leave by 2h before (with traffic)",
            'type': 'departure_reminder',
            'priority': 'high'
        }
    ]
}
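The message code expresses the leave time as a relative offset. If a clock time is preferred, one possible approach is sketched below. `leave_by_clock` is a hypothetical helper, not part of the workflow; the 1.5h buffer is taken from the message code, and the result is expressed in whatever UTC offset the departure timestamp carries.

```python
from datetime import datetime, timedelta


def leave_by_clock(departure_iso: str, buffer_hours: float = 1.5) -> str:
    """Format the latest leave time as a casual clock string like '5:45pm'."""
    departure = datetime.fromisoformat(departure_iso)
    leave_at = departure - timedelta(hours=buffer_hours)
    # %I is zero-padded ('05:45PM'), so strip the leading zero and lowercase.
    return leave_at.strftime("%I:%M%p").lstrip("0").lower()


print(leave_by_clock("2025-11-04T19:15:00-05:00"))  # 5:45pm
```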


Step 7: Message Delivery 📱

Location: app/superpowers/catalog/proactive/travel_support.py:344-409

Node: send_messages (function node)

What It Does:

  1. Calls Universal Sender:

    sender = get_proactive_sender()
    success = sender.send_proactive_message(
        user_phone=user_phone,
        message_text=msg['text'],
        priority='high'
    )

  2. Platform Detection:

    # app/messaging/proactive_sender.py:47-70
    platform = self._get_user_platform(user_phone)

    if platform == 'imessage':
        return self._send_via_imessage(...)
    elif platform == 'telegram':
        return self._send_via_telegram(...)

  3. Telegram Path:

    # app/messaging/proactive_sender.py:125-142
    from app.messaging.delivery import MessageDelivery
    delivery = MessageDelivery()
    delivery.send_telegram_message(
        chat_id=user_phone,
        text=message_text,
        parse_mode=None
    )

  4. iMessage Path (if edge agent active):

    # app/messaging/proactive_sender.py:97-124
    self.edge_manager.schedule_message(
        user_phone=user_phone,
        thread_id=thread_id,
        message_text=message_text,
        send_at=send_at,  # With 1-3 min delay for natural feel
        is_group=False
    )

  5. Database Logging:

    INSERT INTO proactive_notifications
    (user_phone, notification_type, item_id, notified_at, notification_text)
    VALUES ('+16463831222', 'travel_brain', 'travel_20251103150000', NOW(), 'ur flight UA2445...')
    

Result:

  • ✅ Message sent to Telegram/iMessage
  • ✅ Logged in database
  • ✅ Workflow completes
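The platform dispatch can also be written as a lookup table, which keeps adding a new platform to a one-line change. The following is a sketch with stub senders that print instead of sending; all function names here are illustrative and do not mirror the real ProactiveSender methods. Only the 1-3 minute iMessage delay comes from the code above.

```python
import random
from datetime import datetime, timedelta
from typing import Callable, Dict


def send_via_telegram(user_phone: str, text: str) -> bool:
    print(f"[telegram] {user_phone}: {text}")  # stub: no network call
    return True


def send_via_imessage(user_phone: str, text: str) -> bool:
    # Schedule 1-3 minutes out so the message feels less automated.
    send_at = datetime.now() + timedelta(seconds=random.randint(60, 180))
    print(f"[imessage] {user_phone} at {send_at:%H:%M:%S}: {text}")  # stub
    return True


SENDERS: Dict[str, Callable[[str, str], bool]] = {
    "telegram": send_via_telegram,
    "imessage": send_via_imessage,
}


def send_proactive_message(platform: str, user_phone: str, text: str) -> bool:
    """Route to the platform's sender; report failure for unknown platforms."""
    sender = SENDERS.get(platform)
    if sender is None:
        return False
    return sender(user_phone, text)


ok = send_proactive_message("telegram", "+16463831222", "ur flight UA2445 is in 4h")
```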


📊 Use Case 2: Cancellation Fee Protector

Scenario: Sage detects a cancellation deadline in your email and warns you shortly before it passes (with the default 2-hour advance notice, once the deadline is within 4 hours).

Step 1: Scheduler Trigger

When: 4 times daily (8am, noon, 4pm, 8pm)

CronTrigger(hour='8,12,16,20', minute=0)

Step 2: Enable Check

Same as Travel Brain - checks superpower_settings table.

Step 3: Data Scanning 🔍

Location: app/superpowers/catalog/proactive/cancellation_protector.py:33-48

Gmail Scan Only:

{
    "type": "gmail",
    "query": "(cancellation policy OR cancel before OR non-refundable after OR will be charged) after:-30d",
    "max_results": 20,
    "include_body": True
}

Scans for:

  • Cancellation policies
  • Deadline language
  • Fee warnings
  • Non-refundable dates

Step 4: Deadline Extraction 🔎

Location: app/superpowers/catalog/proactive/cancellation_protector.py:50-144

Regex Patterns:

patterns = [
    r'cancel before ([\w\s,]+\d{4})',
    r'cancellation deadline:?\s*([\w\s,]+\d{4})',
    r'must cancel by ([\w\s,]+\d{4})',
    r'non-refundable after ([\w\s,]+\d{4})'
]

Parses Deadlines:

import dateparser
from datetime import datetime
deadline_date = dateparser.parse("November 5, 2025")  # returns None if the text cannot be parsed
hours_until = (deadline_date - datetime.now()).total_seconds() / 3600
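Putting the patterns and the date parsing together, a stdlib-only sketch might look like this. It swaps dateparser for `datetime.strptime` with a single "Month D, YYYY" format so that it runs without third-party packages; `find_deadline` is a hypothetical name, and the fixed format is an assumption that would not cover every date style found in real emails.

```python
import re
from datetime import datetime
from typing import Optional

PATTERNS = [
    r"cancel before ([\w\s,]+\d{4})",
    r"cancellation deadline:?\s*([\w\s,]+\d{4})",
    r"must cancel by ([\w\s,]+\d{4})",
    r"non-refundable after ([\w\s,]+\d{4})",
]


def find_deadline(body: str) -> Optional[datetime]:
    """Return the first deadline date found in the body, or None."""
    for pattern in PATTERNS:
        match = re.search(pattern, body, re.IGNORECASE)
        if match:
            try:
                return datetime.strptime(match.group(1).strip(), "%B %d, %Y")
            except ValueError:
                continue  # captured text was not in the assumed format
    return None


body = "Free cancellation: cancel before November 5, 2025 to avoid a fee."
deadline = find_deadline(body)
hours_until = (deadline - datetime(2025, 11, 4, 22, 0)).total_seconds() / 3600
print(deadline, hours_until)  # 2025-11-05 00:00:00 2.0
```

A date with no time component parses to midnight at the start of that day, so the computed deadline may fall earlier than the cutoff the vendor intended.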

Step 5: Urgency Filter 🧠

Location: app/superpowers/catalog/proactive/cancellation_protector.py:146-199

Checks:

  1. Within reminder window (0 to 4 hours before the deadline, which is twice the default advance_notice_hours of 2)
  2. Not already warned (checks proactive_notifications table)

Logic:

advance_hours = settings.get('advance_notice_hours', 2)  # Default: 2h

if 0 < hours_until <= advance_hours * 2:
    if not already_warned:
        urgent.append(deadline)
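As a self-contained illustration of the filter, the sketch below runs the same window test over a small list of deadlines. The dictionary shape (`id`, `when`) and the use of a set for already-warned IDs are assumptions made for the example; the real node checks the proactive_notifications table instead.

```python
from datetime import datetime


def filter_urgent(deadlines, now, advance_hours=2, already_warned=frozenset()):
    """Keep deadlines inside the (0, 2 * advance_hours] window not yet warned about."""
    urgent = []
    for deadline in deadlines:
        hours_until = (deadline["when"] - now).total_seconds() / 3600
        in_window = 0 < hours_until <= advance_hours * 2
        if in_window and deadline["id"] not in already_warned:
            urgent.append(deadline)
    return urgent


now = datetime(2025, 11, 4, 20, 0)
deadlines = [
    {"id": "hotel", "when": datetime(2025, 11, 4, 23, 0)},   # 3h away: urgent
    {"id": "rental", "when": datetime(2025, 11, 5, 6, 0)},   # 10h away: too early
    {"id": "dinner", "when": datetime(2025, 11, 4, 21, 0)},  # 1h away, already warned
]
urgent = filter_urgent(deadlines, now, already_warned={"dinner"})
print([d["id"] for d in urgent])  # ['hotel']
```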

Step 6: Message Generation ✍️

Location: app/superpowers/catalog/proactive/cancellation_protector.py:201-221

Message:

msg = f"heads up: last chance to cancel {reservation_name} without fee in {time_str}"
if settings.get('offer_cancellation_help', True):
    msg += ". want me to draft a cancellation email?"

Step 7: Message Delivery 📱

Same universal sender as Travel Brain → Telegram or iMessage.


📊 Use Case 3: Email Urgency Scan

Scenario: Sage detects an urgent email from your professor and notifies you.

Step 1: Scheduler Trigger

When: 3 times daily (8am, 1pm, 6pm)

CronTrigger(hour='8,13,18', minute=0)

Step 2: Enable Check

Checks whether proactive_email_urgency is enabled. This is a built-in workflow that is always enabled, so the check always passes.

Step 3: Data Scanning 🔍

Location: app/superpowers/catalog/proactive/email_urgency.py:36-47

Gmail Scan:

{
    "type": "gmail",
    "query": "is:unread OR is:important",
    "after": "{{$node.get_last_check}}",  # Last 2 hours
    "max_results": 15,
    "include_body": False
}

User Context Scan:

Location: app/superpowers/catalog/proactive/email_urgency.py:49-76

Memory/Context Retrieval:

from app.memory.mem0_service import Mem0Service

mem0 = Mem0Service()
memories = mem0.search_memories(
    query="recent searches, interests, upcoming events, stress points",
    user_id=user_phone,
    limit=5
)

Code Location: app/memory/mem0_service.py

Data Sources:

  • ✅ Gmail (unread/important emails)
  • ✅ Memories (user context from past conversations)

Step 4: Urgency Analysis 🧠

Location: app/superpowers/catalog/proactive/email_urgency.py:132-212

LLM-Powered Analysis:

llm = LLMClient(provider="openai", model="gpt-5-mini")

system_prompt = '''Analyze emails for urgency and context. Rate 0-10.

Notify if urgency>=7 OR context>=8:
- 10: ASAP/deadline today
- 7-9: This week
- 0-3: Low
'''

response = llm.generate_response(
    system_prompt=system_prompt,
    user_message=f'''User's recent context (memories):
{context_summary}

New emails to analyze:
{email_summaries}

Which emails should trigger a notification?'''
)

Decision Logic:

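import json

analysis = json.loads(response)  # response must be a JSON string

to_notify = []  # illustrative name for the selected items
if analysis['should_notify']:
    for item in analysis['notify_about']:
        if item['urgency'] >= 7 or item['context_score'] >= 8:
            # Generate notification
            to_notify.append(item)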

Output:

{
    'should_notify': True,
    'notify_about': [
        {
            'email_id': 'msg_789',
            'urgency': 8,
            'context_score': 9,
            'notification_text': 'ur prof emailed about the midterm - due tomorrow!'
        }
    ]
}
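LLM output is not guaranteed to be valid JSON or to contain every expected key, so a more defensive version of the decision logic may be worth considering. The sketch below is one possibility under that assumption; `select_notifications` is a hypothetical helper, and treating malformed output as "nothing to notify" is a design choice, not documented behavior.

```python
import json


def select_notifications(raw: str, urgency_min: int = 7, context_min: int = 8) -> list:
    """Parse the LLM response and keep items that clear either threshold."""
    try:
        analysis = json.loads(raw)
    except json.JSONDecodeError:
        return []  # malformed output: send nothing rather than crash
    if not isinstance(analysis, dict) or not analysis.get("should_notify"):
        return []
    selected = []
    for item in analysis.get("notify_about", []):
        urgent = item.get("urgency", 0) >= urgency_min
        relevant = item.get("context_score", 0) >= context_min
        if urgent or relevant:
            selected.append(item)
    return selected


raw = json.dumps({
    "should_notify": True,
    "notify_about": [
        {"email_id": "msg_789", "urgency": 8, "context_score": 9},
        {"email_id": "msg_790", "urgency": 3, "context_score": 2},
    ],
})
picked = select_notifications(raw)
print([item["email_id"] for item in picked])  # ['msg_789']
```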

Step 5: Message Delivery 📱

Same universal sender → Telegram/iMessage.


🔍 Data Source Verification

Gmail - FULLY IMPLEMENTED

  • Location: app/superpowers/nodes/connectors/gmail.py
  • OAuth: app/oauth/token_manager.py
  • Used in:
      • Travel Brain (flight confirmations)
      • Cancellation Protector (policies)
      • Email Urgency (unread emails)

Google Calendar - FULLY IMPLEMENTED

  • Location: app/superpowers/nodes/connectors/google_calendar.py
  • OAuth: Same token manager
  • Used in:
      • Travel Brain (travel events)
      • Morning Calendar Check
      • Evening Prep
      • Calendar Events Monitoring

Memories (Mem0) - FULLY IMPLEMENTED

  • Location: app/memory/mem0_service.py
  • Used in:
      • Email Urgency (user context)
      • Evening Prep (stress points)
      • All workflows can access memories

⚠️ Weather API - NOT YET IMPLEMENTED

  • Status: Planned but not built
  • Would be used in:
      • Weather alerts workflow
      • Travel Brain (traffic/weather)
  • Integration: Would add weather node type

⚠️ Traffic API - NOT YET IMPLEMENTED

  • Status: Mentioned in code but not implemented
  • Would be used in:
      • Travel Brain (departure timing)
  • Integration: Would add traffic node type

📋 Complete Flow Summary

Execution Path:

1. APScheduler (Cron)
2. SchedulerService.run_*()
3. WorkflowEngine.execute_workflow()
4. For each node in workflow:
   ├─ check_enabled → SuperpowerManager
   ├─ scan_* → GmailNode / GoogleCalendarNode
   ├─ extract_* → Function node (regex parsing)
   ├─ determine_actions → Function node (time calculations)
   ├─ generate_messages → Function node (message building)
   └─ send_messages → ProactiveSender
      ├─ Platform detection
      ├─ Telegram → MessageDelivery
      └─ iMessage → EdgeAgentManager
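The node-by-node execution with early exit can be sketched in a few lines. This is a toy runner, not the actual WorkflowEngine: nodes are modeled as plain functions that take and return dictionaries, and the stop conditions (`enabled` is False, or `requires_auth` is set) are inferred from the decision points described in this document.

```python
from typing import Callable, Dict, List

Node = Callable[[Dict], Dict]


def run_workflow(nodes: List[Node], context: Dict) -> Dict:
    """Run nodes in order, merging results, and stop early on a gate failure."""
    for node in nodes:
        result = node(context)
        context.update(result)
        if result.get("enabled") is False or result.get("requires_auth"):
            context["stopped_at"] = node.__name__
            break
    return context


def check_enabled(context: Dict) -> Dict:
    return {"enabled": context["user_phone"] in {"+16463831222"}}


def scan_flight_emails(context: Dict) -> Dict:
    return {"messages": ["msg_123"]}  # stub: no Gmail call


nodes = [check_enabled, scan_flight_emails]
enabled_run = run_workflow(nodes, {"user_phone": "+16463831222"})
disabled_run = run_workflow(nodes, {"user_phone": "+10000000000"})
print(enabled_run)
print(disabled_run)
```

For the disabled user the scan node never runs, which is what saves the Gmail and Calendar API calls.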

Decision Points:

  1. Is Superpower Enabled? (First node, before any scanning)
      • Database query: superpower_settings table
      • Early exit if disabled → saves API costs

  2. Do We Have OAuth Tokens? (During data scan)
      • Checks oauth_tokens table
      • If missing → Returns auth URL → Workflow pauses

  3. Is Data Actionable? (After scanning)
      • Time windows (e.g., 4-6h before flight)
      • Urgency thresholds (e.g., urgency >= 7)
      • Already notified (checks proactive_notifications)

  4. Should We Send? (Final check)
      • At least one actionable item found
      • Not already sent in last 7 days
      • User hasn't opted out
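The "not already sent in last 7 days" check can be illustrated against the proactive_notifications table. The sketch uses in-memory SQLite and stores `notified_at` as ISO 8601 text, both of which are assumptions made for the example; `already_notified` is a hypothetical helper, and the column names follow the INSERT statement shown in the Travel Brain delivery step.

```python
import sqlite3
from datetime import datetime, timedelta


def already_notified(conn, user_phone, item_id, now, days=7):
    """True if this item was sent to this user within the last `days` days."""
    cutoff = (now - timedelta(days=days)).isoformat()
    row = conn.execute(
        "SELECT 1 FROM proactive_notifications "
        "WHERE user_phone = ? AND item_id = ? AND notified_at >= ? LIMIT 1",
        (user_phone, item_id, cutoff),
    ).fetchone()
    return row is not None


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE proactive_notifications "
    "(user_phone TEXT, notification_type TEXT, item_id TEXT, "
    "notified_at TEXT, notification_text TEXT)"
)
sent_at = datetime(2025, 11, 3, 15, 0)
conn.execute(
    "INSERT INTO proactive_notifications VALUES (?, ?, ?, ?, ?)",
    ("+16463831222", "travel_brain", "travel_20251103150000",
     sent_at.isoformat(), "ur flight UA2445..."),
)

recent = already_notified(conn, "+16463831222", "travel_20251103150000",
                          now=sent_at + timedelta(days=2))
expired = already_notified(conn, "+16463831222", "travel_20251103150000",
                           now=sent_at + timedelta(days=10))
print(recent, expired)  # True False
```

Comparing ISO 8601 strings works here because timestamps in one consistent format sort lexicographically in chronological order.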

✅ Confirmation Checklist

Data Sources:

  • ✅ Gmail - Fully integrated (OAuth + API)
  • ✅ Calendar - Fully integrated (OAuth + API)
  • ✅ Memories - Fully integrated (Mem0 service)
  • ⚠️ Weather - Not yet (would need API key + node)
  • ⚠️ Traffic - Not yet (would need API key + node)

Analysis:

  • ✅ Regex extraction (flight numbers, deadlines)
  • ✅ Time calculations (hours until, time windows)
  • ✅ LLM analysis (email urgency, context scoring)
  • ✅ Memory context (user preferences, stress points)

Decision Making:

  • ✅ Enable/disable checks
  • ✅ Time window validation
  • ✅ Duplicate prevention
  • ✅ Priority levels

Message Delivery:

  • ✅ Universal sender (Telegram + iMessage)
  • ✅ Platform auto-detection
  • ✅ Priority-based timing
  • ✅ Database logging

🎯 Summary

All core pieces ARE implemented for:

  • ✅ Gmail scanning
  • ✅ Calendar scanning
  • ✅ Memory/context access
  • ✅ Analysis and decision-making
  • ✅ Message generation
  • ✅ Universal delivery (Telegram + iMessage)

Missing pieces (not blocking current workflows):

  • ⚠️ Weather API (future enhancement)
  • ⚠️ Traffic API (future enhancement)

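The system works end-to-end, from scheduler trigger → data scan → analysis → decision → message delivery. All three workflows follow this same overall pattern; only the analysis step differs (regex extraction and time windows for Travel Brain and Cancellation Protector, LLM scoring for Email Urgency).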