Complete Proactive Flow: How Sage Decides to Message You
This document traces the entire journey from data scanning → analysis → decision → message delivery for Sage's proactive workflows.
🎯 Overview: The 5-Step Proactive Pipeline
📊 Use Case 1: Travel Brain (Flight Reminder)
Scenario: You have a flight tomorrow at 7:15pm, and Sage reminds you 4 hours before.
Step 1: Scheduler Trigger ⏰
Location: app/scheduler/background_service.py:206-226
When: Every 20 minutes (at :00, :20, and :40), from 8:00am through 11:40pm
# Line 61-66
self.scheduler.add_job(
    self.run_travel_brain,
    CronTrigger(hour='8-23', minute='*/20'),  # Every 20 min
    id='travel_brain',
    name='Travel Brain Support'
)
What Happens:
1. APScheduler fires at 3:00pm (one of the :00, :20, :40 slots in each hour)
2. Calls run_travel_brain()
3. Gets all active users from database
4. For each user, calls workflow_engine.execute_workflow()
Code Path:
APScheduler → run_travel_brain() → workflow_engine.execute_workflow(
    workflow_id="proactive_travel_support",
    user_id="+16463831222",
    trigger_data={'scan_time': '2025-11-03T15:00:00', 'user_phone': '+16463831222'}
)
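To make the trigger cadence concrete, here is a minimal standard-library sketch that enumerates when a cron expression with hour='8-23' and minute='*/20' fires on a single day. It does not use APScheduler, and `fire_times` is a hypothetical helper written only for illustration.

```python
from datetime import date, datetime, time
from typing import List


def fire_times(day: date) -> List[datetime]:
    """Enumerate the firings of hour='8-23', minute='*/20' on one calendar day."""
    return [
        datetime.combine(day, time(hour, minute))
        for hour in range(8, 24)        # hours 8 through 23, inclusive
        for minute in range(0, 60, 20)  # */20 means :00, :20, :40
    ]


times = fire_times(date(2025, 11, 4))
print(len(times), times[0].time(), times[-1].time())
```

Under this reading the job runs 48 times per day, with the first run at 08:00 and the last at 23:40.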
Step 2: Enable Check ✅
Location: app/superpowers/catalog/proactive/travel_support.py:27-44
Node: check_enabled (function node)
What It Does:
manager = get_superpower_manager()
enabled = manager.is_enabled(user_phone, 'travel_brain')
settings = manager.get_settings(user_phone, 'travel_brain')
Database Query:
SELECT enabled, settings
FROM superpower_settings
WHERE user_phone = '+16463831222' AND superpower_id = 'travel_brain'
Decision Point:
- ✅ If enabled → Continue to data scan
- ❌ If disabled → return {'enabled': False} → Workflow stops
Settings Retrieved:
{
    'departure_reminder_hours': 4,  # Default: 4h before flight
    'landing_buffer_hours': 2,
    'immigration_reminder': True
}
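The enable check can be sketched end to end with an in-memory SQLite table. This is an illustration under stated assumptions, not the actual SuperpowerManager: the column types, the JSON-encoded settings column, the merge with defaults, and the `load_superpower` helper are all hypothetical.

```python
import json
import sqlite3

# Defaults taken from the settings shown above
DEFAULTS = {
    "departure_reminder_hours": 4,
    "landing_buffer_hours": 2,
    "immigration_reminder": True,
}


def load_superpower(conn, user_phone, superpower_id):
    """Return (enabled, settings); a missing row is treated as disabled."""
    row = conn.execute(
        "SELECT enabled, settings FROM superpower_settings "
        "WHERE user_phone = ? AND superpower_id = ?",
        (user_phone, superpower_id),  # parameterized, never string-formatted
    ).fetchone()
    if row is None:
        return False, dict(DEFAULTS)
    enabled, raw_settings = row
    return bool(enabled), {**DEFAULTS, **json.loads(raw_settings or "{}")}


# Hypothetical schema and sample row, for demonstration only
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE superpower_settings "
    "(user_phone TEXT, superpower_id TEXT, enabled INTEGER, settings TEXT)"
)
conn.execute(
    "INSERT INTO superpower_settings VALUES (?, ?, ?, ?)",
    ("+15550100", "travel_brain", 1, '{"departure_reminder_hours": 5}'),
)
enabled, settings = load_superpower(conn, "+15550100", "travel_brain")
print(enabled, settings)
```

Treating a missing row as disabled keeps the early exit cheap: no Gmail or Calendar call is made for users who never turned the superpower on.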
Step 3: Data Scanning 🔍
Two parallel data sources are scanned:
3A. Gmail Scan (Flight Confirmations)
Location: app/superpowers/catalog/proactive/travel_support.py:46-56
Node: scan_flight_emails (gmail node)
What It Does:
# Node config
{
    "type": "gmail",
    "action": "search_messages",
    "query": "(flight confirmation OR boarding pass OR itinerary OR eTicket) after:-14d",
    "max_results": 15,
    "include_body": True
}
Execution Path:
WorkflowEngine.execute_node()
→ GmailNode.execute()
→ TokenManager.get_token() [OAuth check]
→ Build Gmail service
→ service.users().messages().list(userId='me', q=query)
→ Fetch up to 15 emails matching the query
Code Location: app/superpowers/nodes/connectors/gmail.py:39-262
OAuth Check:
- Checks oauth_tokens table for user's Gmail token
- If missing → Returns requires_auth: true with auth URL
- If present → Uses token to authenticate Gmail API
API Call Made:
# Google Gmail API call (list returns message IDs; bodies need a follow-up messages().get())
service.users().messages().list(
    userId='me',
    q='(flight confirmation OR boarding pass OR itinerary OR eTicket) after:-14d',
    maxResults=15
).execute()
Data Returned:
{
    'messages': [
        {
            'id': 'msg_123',
            'from': 'united@email.united.com',
            'subject': 'Your Flight Confirmation - UA 2445',
            'snippet': 'Your flight UA 2445 from JFK to LAX...',
            'body': 'Full email HTML/text content...'
        },
        ...
    ]
}
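Gmail's own search operators take absolute dates (`after:YYYY/MM/DD`) or `newer_than:14d`, so the relative `after:-14d` in the node config presumably gets rewritten before the query reaches the API. Whether GmailNode does this is an assumption; the sketch below shows one way such a rewrite could look, using a hypothetical `resolve_relative_after` helper.

```python
import re
from datetime import date, timedelta


def resolve_relative_after(query: str, today: date) -> str:
    """Rewrite 'after:-Nd' into Gmail's absolute 'after:YYYY/MM/DD' form."""

    def to_absolute(match):
        cutoff = today - timedelta(days=int(match.group(1)))
        return "after:" + cutoff.strftime("%Y/%m/%d")

    # Queries without a relative date pass through unchanged
    return re.sub(r"after:-(\d+)d\b", to_absolute, query)


resolved = resolve_relative_after(
    "(flight confirmation OR boarding pass) after:-14d",
    date(2025, 11, 3),
)
print(resolved)
```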
3B. Calendar Scan (Travel Events)
Location: app/superpowers/catalog/proactive/travel_support.py:58-69
Node: scan_calendar_travel (google_calendar node)
What It Does:
# Node config
{
    "type": "google_calendar",
    "action": "list_events",
    "time_min": "now",
    "time_max": "+7d",
    "max_results": 30
}
Execution Path:
WorkflowEngine.execute_node()
→ GoogleCalendarNode.execute()
→ TokenManager.get_token() [OAuth check]
→ Build Calendar service
→ service.events().list()
Code Location: app/superpowers/nodes/connectors/google_calendar.py:47-276
API Call Made:
# Google Calendar API call
service.events().list(
    calendarId='primary',
    timeMin='2025-11-03T15:00:00Z',
    timeMax='2025-11-10T15:00:00Z',
    maxResults=30,
    singleEvents=True,
    orderBy='startTime'
).execute()
Data Returned:
{
    'events': [
        {
            'id': 'cal_event_456',
            'summary': 'Flight to LAX',
            'start': {'dateTime': '2025-11-04T19:15:00-05:00'},
            'end': {'dateTime': '2025-11-04T23:45:00-08:00'},
            'location': 'JFK Terminal 4'
        },
        ...
    ]
}
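The node config uses "now" and "+7d", while the API call carries concrete RFC 3339 timestamps. A minimal sketch of that translation follows; `resolve_time` is a hypothetical helper, and it only understands the two spec forms that appear in the config above.

```python
import re
from datetime import datetime, timedelta, timezone


def resolve_time(spec: str, now: datetime) -> str:
    """Turn 'now' or '+Nd' into an RFC 3339 UTC timestamp."""
    if spec == "now":
        moment = now
    else:
        match = re.fullmatch(r"\+(\d+)d", spec)
        if match is None:
            raise ValueError(f"unsupported time spec: {spec!r}")
        moment = now + timedelta(days=int(match.group(1)))
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


now = datetime(2025, 11, 3, 15, 0, tzinfo=timezone.utc)
time_min = resolve_time("now", now)
time_max = resolve_time("+7d", now)
print(time_min, time_max)
```

With the scan time used in this walkthrough, the result matches the timeMin and timeMax values shown in the API call.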
Step 4: Data Extraction 🔎
Location: app/superpowers/catalog/proactive/travel_support.py:71-164
Node: extract_travel_details (function node)
What It Does:
- Parses Gmail emails with regex to extract:
  - Flight numbers (UA 2445, SQ 23, etc.)
  - Airport codes (JFK, LAX, SIN)
  - Departure times
  - Terminal numbers
  - Confirmation numbers
- Extracts from Calendar:
  - Travel-related events
  - Flight titles
  - Dates/times
Code:
import re

# Regex patterns
flight_match = re.search(r'\b([A-Z]{2}\s?\d{1,4})\b', body)  # UA 2445
airports = re.findall(r'\b([A-Z]{3})\b', body)  # JFK, LAX (also matches any other 3-letter uppercase word)
time_match = re.search(r'(\d{1,2}:\d{2}\s*(?:AM|PM)?)', body)  # 7:15 PM
Output:
{
    'flights': [
        {
            'source': 'email',
            'flight_number': 'UA2445',
            'from_airport': 'JFK',
            'to_airport': 'LAX',
            'time_str': '7:15 PM',
            'terminal': '4',
            'confirmation': 'ABC123'
        },
        {
            'source': 'calendar',
            'event_id': 'cal_event_456',
            'title': 'Flight to LAX',
            'start': '2025-11-04T19:15:00-05:00',
            'to_airport': 'LAX'
        }
    ]
}
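As a runnable illustration of the email side of this step, the sketch below extracts the same fields from one sample sentence. It is not the code in travel_support.py: the "from XXX to YYY" route pattern, the terminal pattern, and the `extract_flight` helper are assumptions chosen to avoid the false positives a bare three-letter match can produce.

```python
import re
from typing import Optional

FLIGHT_RE = re.compile(r"\b([A-Z]{2})\s?(\d{1,4})\b")
ROUTE_RE = re.compile(r"\bfrom\s+([A-Z]{3})\s+to\s+([A-Z]{3})\b")
TIME_RE = re.compile(r"\b(\d{1,2}:\d{2}(?:\s*(?:AM|PM))?)", re.IGNORECASE)
TERMINAL_RE = re.compile(r"\bterminal\s+(\w+)", re.IGNORECASE)


def extract_flight(body: str) -> Optional[dict]:
    """Pull flight details out of an email body; None if no flight number is found."""
    flight = FLIGHT_RE.search(body)
    if flight is None:
        return None
    # Normalize 'UA 2445' to 'UA2445'
    details = {"source": "email", "flight_number": flight.group(1) + flight.group(2)}
    route = ROUTE_RE.search(body)
    if route:
        details["from_airport"], details["to_airport"] = route.groups()
    time_match = TIME_RE.search(body)
    if time_match:
        details["time_str"] = time_match.group(1)
    terminal = TERMINAL_RE.search(body)
    if terminal:
        details["terminal"] = terminal.group(1)
    return details


sample = "Your flight UA 2445 from JFK to LAX departs at 7:15 PM from Terminal 4."
flight = extract_flight(sample)
print(flight)
```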
Step 5: Action Analysis 🧠
Location: app/superpowers/catalog/proactive/travel_support.py:166-277
Node: determine_actions (function node)
What It Does:
- For each flight, calculates:
  - Hours until departure
  - If within reminder window (4-6h before)
  - If needs immigration reminder (12-24h before, international)
  - If needs landing buffer (creates calendar block)
Logic:
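from datetime import datetime

departure_time = datetime.fromisoformat(flight['start'])
# Take "now" in the departure's timezone: subtracting a naive datetime
# from an offset-aware one raises TypeError
now = datetime.now(departure_time.tzinfo)
hours_until = (departure_time - now).total_seconds() / 3600
# Departure reminder (4-6h before)
if 4 <= hours_until <= 6:
    actions.append({
        'type': 'departure_reminder',
        'flight': flight,
        'hours_until': hours_until
    })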
Output (for the scan at 3:00pm on 2025-11-04, 4.25 hours before the 7:15pm departure):
{
    'actions': [
        {
            'type': 'departure_reminder',
            'flight': {...},
            'departure_time': '2025-11-04T19:15:00-05:00',
            'hours_until': 4.25  # 4 hours 15 minutes
        }
    ]
}
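The window check is easiest to verify with an explicit, timezone-aware "now". The sketch below is a hypothetical `departure_action` helper (not the determine_actions node itself) that takes the current time as a parameter so both the day-before scan and the same-day scan can be exercised.

```python
from datetime import datetime, timezone


def departure_action(flight, now, min_hours=4, max_hours=6):
    """Return a departure_reminder action if the flight is inside the window, else None."""
    departure = datetime.fromisoformat(flight["start"])
    if departure.tzinfo is None or now.tzinfo is None:
        raise ValueError("both timestamps must be timezone-aware")
    hours_until = (departure - now).total_seconds() / 3600
    if min_hours <= hours_until <= max_hours:
        return {"type": "departure_reminder", "flight": flight, "hours_until": hours_until}
    return None


flight = {"start": "2025-11-04T19:15:00-05:00", "to_airport": "LAX"}
same_day = datetime(2025, 11, 4, 20, 0, tzinfo=timezone.utc)    # 3:00pm at UTC-5
day_before = datetime(2025, 11, 3, 20, 0, tzinfo=timezone.utc)  # 28.25h before departure

print(departure_action(flight, same_day))
print(departure_action(flight, day_before))
```

Passing `now` in, rather than calling `datetime.now()` inside the function, also makes the node straightforward to unit test.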
Step 6: Message Generation ✍️
Location: app/superpowers/catalog/proactive/travel_support.py:279-342
Node: generate_messages (function node)
What It Does:
- Builds user-friendly message from flight data
- Uses LLM for natural language generation (optional)
- Formats with Sage's casual tone
Code:
msg = "ur flight"
if flight.get('flight_number'):
    msg += f" {flight['flight_number']}"
if flight.get('to_airport'):
    msg += f" to {flight['to_airport']}"
msg += f" is in {int(hours)}h"
if flight.get('from_airport'):
    msg += f" from {flight['from_airport']}"
if flight.get('terminal'):
    msg += f" terminal {flight['terminal']}"
# Fixed 1.5h buffer; no traffic API is called
msg += f". leave by {int(hours-1.5)}h before (with traffic)"
Output:
{
    'should_send': True,
    'messages': [
        {
            'text': "ur flight UA2445 to LAX is in 4h from JFK terminal 4. leave by 2h before (with traffic)",
            'type': 'departure_reminder',
            'priority': 'high'
        }
    ]
}
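Wrapping the concatenation in a function makes it easy to see how the optional fields change the text. This is a sketch that mirrors the snippet above; the `build_departure_message` name is hypothetical, while the wording and the 1.5-hour constant come from that snippet.

```python
def build_departure_message(flight: dict, hours: float) -> str:
    """Assemble the reminder text, skipping any field the flight record lacks."""
    parts = ["ur flight"]
    if flight.get("flight_number"):
        parts.append(flight["flight_number"])
    if flight.get("to_airport"):
        parts.append(f"to {flight['to_airport']}")
    parts.append(f"is in {int(hours)}h")
    if flight.get("from_airport"):
        parts.append(f"from {flight['from_airport']}")
    if flight.get("terminal"):
        parts.append(f"terminal {flight['terminal']}")
    return " ".join(parts) + f". leave by {int(hours - 1.5)}h before (with traffic)"


email_flight = {"flight_number": "UA2445", "from_airport": "JFK", "to_airport": "LAX", "terminal": "4"}
calendar_flight = {"to_airport": "LAX"}

print(build_departure_message(email_flight, 4.25))
print(build_departure_message(calendar_flight, 4.25))
```

A calendar-only record, which has no flight number or terminal, still yields a usable if less specific message.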
Step 7: Message Delivery 📱
Location: app/superpowers/catalog/proactive/travel_support.py:344-409
Node: send_messages (function node)
What It Does:
1. Calls Universal Sender:
   sender = get_proactive_sender()
   success = sender.send_proactive_message(
       user_phone=user_phone,
       message_text=msg['text'],
       priority='high'
   )
2. Platform Detection
3. Telegram Path (via MessageDelivery)
4. iMessage Path (if edge agent active, via EdgeAgentManager)
5. Database Logging
Result:
- ✅ Message sent to Telegram/iMessage
- ✅ Logged in database
- ✅ Workflow completes
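The delivery steps above can be sketched as a small class with pluggable channels. Everything here is hypothetical: the class name, the rule that iMessage wins whenever an edge agent is active, and the in-memory list standing in for the database log. It illustrates the shape of the flow, not the real ProactiveSender.

```python
from typing import Callable, Dict, List


class ProactiveSenderSketch:
    """Pick a delivery channel per user and record every attempt."""

    def __init__(self, channels: Dict[str, Callable[[str, str], bool]],
                 edge_agent_active: Callable[[str], bool]):
        self.channels = channels
        self.edge_agent_active = edge_agent_active
        self.log: List[dict] = []  # stands in for the database log

    def detect_platform(self, user_phone: str) -> str:
        # Assumption: prefer iMessage when the user's edge agent is active
        return "imessage" if self.edge_agent_active(user_phone) else "telegram"

    def send_proactive_message(self, user_phone: str, message_text: str,
                               priority: str = "normal") -> bool:
        platform = self.detect_platform(user_phone)
        success = self.channels[platform](user_phone, message_text)
        self.log.append({"user": user_phone, "platform": platform,
                         "priority": priority, "success": success})
        return success


sent = []


def fake_channel(name):
    """Build a stand-in channel that records messages instead of sending them."""
    def send(phone, text):
        sent.append((name, text))
        return True
    return send


sender = ProactiveSenderSketch(
    channels={"telegram": fake_channel("telegram"), "imessage": fake_channel("imessage")},
    edge_agent_active=lambda phone: phone == "+15550100",
)
ok = sender.send_proactive_message("+15550100", "ur flight is in 4h", priority="high")
print(ok, sent, sender.log)
```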
📊 Use Case 2: Cancellation Fee Protector
Scenario: Sage detects a cancellation deadline in your email and warns you 2 hours before the deadline.
Step 1: Scheduler Trigger ⏰
When: 4 times daily (8am, noon, 4pm, 8pm)
Step 2: Enable Check ✅
Same as Travel Brain - checks superpower_settings table.
Step 3: Data Scanning 🔍
Location: app/superpowers/catalog/proactive/cancellation_protector.py:33-48
Gmail Scan Only:
{
    "type": "gmail",
    "query": "(cancellation policy OR cancel before OR non-refundable after OR will be charged) after:-30d",
    "max_results": 20,
    "include_body": True
}
Scans for:
- Cancellation policies
- Deadline language
- Fee warnings
- Non-refundable dates
Step 4: Deadline Extraction 🔎
Location: app/superpowers/catalog/proactive/cancellation_protector.py:50-144
Regex Patterns:
patterns = [
    r'cancel before ([\w\s,]+\d{4})',
    r'cancellation deadline:?\s*([\w\s,]+\d{4})',
    r'must cancel by ([\w\s,]+\d{4})',
    r'non-refundable after ([\w\s,]+\d{4})'
]
Parses Deadlines:
import dateparser
from datetime import datetime

deadline_date = dateparser.parse("November 5, 2025")  # naive datetime at 00:00; None if unparseable
hours_until = (deadline_date - datetime.now()).total_seconds() / 3600
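The two pieces above, pattern matching and date parsing, can be combined into one runnable sketch. To stay within the standard library it swaps dateparser for `datetime.strptime`, so it only understands dates written as "Month D, YYYY"; the `find_deadline` helper and the case-insensitive matching are assumptions.

```python
import re
from datetime import datetime
from typing import Optional

PATTERNS = [
    r"cancel before ([\w\s,]+\d{4})",
    r"cancellation deadline:?\s*([\w\s,]+\d{4})",
    r"must cancel by ([\w\s,]+\d{4})",
    r"non-refundable after ([\w\s,]+\d{4})",
]


def find_deadline(body: str) -> Optional[datetime]:
    """Return the first deadline found in the body, or None."""
    for pattern in PATTERNS:
        match = re.search(pattern, body, re.IGNORECASE)
        if match is None:
            continue
        try:
            # strptime stands in for dateparser.parse in this sketch
            return datetime.strptime(match.group(1).strip(), "%B %d, %Y")
        except ValueError:
            continue  # matched text was not in 'Month D, YYYY' form
    return None


deadline = find_deadline("Free cancellation: cancel before November 5, 2025 to avoid a fee.")
hours_until = (deadline - datetime(2025, 11, 4, 21, 0)).total_seconds() / 3600
print(deadline, hours_until)
```

Because a date with no time parses to midnight, a deadline of "November 5" is treated as the very start of that day, which errs on the side of warning early.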
Step 5: Urgency Filter 🧠
Location: app/superpowers/catalog/proactive/cancellation_protector.py:146-199
Checks:
1. Within reminder window (up to twice advance_notice_hours before the deadline; 0 to 4 hours with the default of 2)
2. Not already warned (checks proactive_notifications table)
Logic:
advance_hours = settings.get('advance_notice_hours', 2)  # Default: 2h
if 0 < hours_until <= advance_hours * 2:
    if not already_warned:
        urgent.append(deadline)
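Here is the same filter as a small function that can be run against sample data. The `id` key on each deadline and the set of already-warned IDs are assumptions; in the real workflow the second check reads the proactive_notifications table.

```python
def filter_urgent(deadlines, settings, already_warned):
    """Keep deadlines inside the warning window that have not been warned about yet."""
    advance_hours = settings.get("advance_notice_hours", 2)  # default: 2h
    window = advance_hours * 2
    return [
        d for d in deadlines
        if 0 < d["hours_until"] <= window and d["id"] not in already_warned
    ]


deadlines = [
    {"id": "hotel", "hours_until": 3.0},   # inside the 4h window
    {"id": "car", "hours_until": 3.5},     # inside the window, but already warned
    {"id": "gym", "hours_until": 30.0},    # too far out
    {"id": "tour", "hours_until": -1.0},   # deadline already passed
]
urgent = filter_urgent(deadlines, settings={}, already_warned={"car"})
print(urgent)
```

With scans every four hours and a four-hour default window, each upcoming deadline should fall inside the window on at least one scan.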
Step 6: Message Generation ✍️
Location: app/superpowers/catalog/proactive/cancellation_protector.py:201-221
Message:
msg = f"heads up: last chance to cancel {reservation_name} without fee in {time_str}"
if settings.get('offer_cancellation_help', True):
    msg += ". want me to draft a cancellation email?"
Step 7: Message Delivery 📱
Same universal sender as Travel Brain → Telegram or iMessage.
📊 Use Case 3: Email Urgency Scan
Scenario: Sage detects an urgent email from your professor and notifies you.
Step 1: Scheduler Trigger ⏰
When: 3 times daily (8am, 1pm, 6pm)
Step 2: Enable Check ✅
Checks if proactive_email_urgency is enabled (this is a built-in workflow, always enabled).
Step 3: Data Scanning 🔍
Location: app/superpowers/catalog/proactive/email_urgency.py:36-47
Gmail Scan:
{
    "type": "gmail",
    "query": "is:unread OR is:important",
    "after": "{{$node.get_last_check}}",  # Last 2 hours
    "max_results": 15,
    "include_body": False
}
User Context Scan:
Location: app/superpowers/catalog/proactive/email_urgency.py:49-76
Memory/Context Retrieval:
from app.memory.mem0_service import Mem0Service
mem0 = Mem0Service()
memories = mem0.search_memories(
    query="recent searches, interests, upcoming events, stress points",
    user_id=user_phone,
    limit=5
)
Code Location: app/memory/mem0_service.py
Data Sources:
- ✅ Gmail (unread/important emails)
- ✅ Memories (user context from past conversations)
Step 4: Urgency Analysis 🧠
Location: app/superpowers/catalog/proactive/email_urgency.py:132-212
LLM-Powered Analysis:
llm = LLMClient(provider="openai", model="gpt-5-mini")
system_prompt = '''Analyze emails for urgency and context. Rate 0-10.
Notify if urgency>=7 OR context>=8:
- 10: ASAP/deadline today
- 7-9: This week
- 0-3: Low
'''
response = llm.generate_response(
    system_prompt=system_prompt,
    user_message=f'''User's recent context (memories):
{context_summary}
New emails to analyze:
{email_summaries}
Which emails should trigger a notification?'''
)
Decision Logic:
import json

analysis = json.loads(response)
if analysis['should_notify']:
    for item in analysis['notify_about']:
        if item['urgency'] >= 7 or item['context_score'] >= 8:
            pass  # Generate notification
Output:
{
    'should_notify': True,
    'notify_about': [
        {
            'email_id': 'msg_789',
            'urgency': 8,
            'context_score': 9,
            'notification_text': 'ur prof emailed about the midterm - due tomorrow!'
        }
    ]
}
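Model replies are not guaranteed to be valid JSON, so the decision logic benefits from a defensive wrapper. The sketch below applies the same thresholds (urgency >= 7 or context_score >= 8) and treats any unparseable reply as "nothing to send"; that fallback and the `select_notifications` helper are assumptions rather than the documented behavior.

```python
import json


def select_notifications(response_text: str, urgency_min: int = 7, context_min: int = 8) -> list:
    """Parse the model's JSON reply and keep items that clear either threshold."""
    try:
        analysis = json.loads(response_text)
    except json.JSONDecodeError:
        return []  # unparseable reply: send nothing rather than crash the workflow
    if not isinstance(analysis, dict) or not analysis.get("should_notify"):
        return []
    return [
        item for item in analysis.get("notify_about", [])
        if item.get("urgency", 0) >= urgency_min or item.get("context_score", 0) >= context_min
    ]


reply = json.dumps({
    "should_notify": True,
    "notify_about": [
        {"email_id": "msg_789", "urgency": 8, "context_score": 9, "notification_text": "..."},
        {"email_id": "msg_790", "urgency": 3, "context_score": 4, "notification_text": "..."},
    ],
})
selected = select_notifications(reply)
print([item["email_id"] for item in selected])
```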
Step 5: Message Delivery 📱
Same universal sender → Telegram/iMessage.
🔍 Data Source Verification
✅ Gmail - FULLY IMPLEMENTED
- Location: app/superpowers/nodes/connectors/gmail.py
- OAuth: app/oauth/token_manager.py
- Used in:
  - Travel Brain (flight confirmations)
  - Cancellation Protector (policies)
  - Email Urgency (unread emails)
✅ Google Calendar - FULLY IMPLEMENTED
- Location: app/superpowers/nodes/connectors/google_calendar.py
- OAuth: Same token manager
- Used in:
  - Travel Brain (travel events)
  - Morning Calendar Check
  - Evening Prep
  - Calendar Events Monitoring
✅ Memories (Mem0) - FULLY IMPLEMENTED
- Location: app/memory/mem0_service.py
- Used in:
  - Email Urgency (user context)
  - Evening Prep (stress points)
  - All workflows can access memories
⚠️ Weather API - NOT YET IMPLEMENTED
- Status: Planned but not built
- Would be used in:
  - Weather alerts workflow
  - Travel Brain (traffic/weather)
- Integration: Would add a weather node type
⚠️ Traffic API - NOT YET IMPLEMENTED
- Status: Mentioned in code but not implemented
- Would be used in:
  - Travel Brain (departure timing)
- Integration: Would add a traffic node type
📋 Complete Flow Summary
Execution Path:
1. APScheduler (Cron)
↓
2. SchedulerService.run_*()
↓
3. WorkflowEngine.execute_workflow()
↓
4. For each node in workflow:
├─ check_enabled → SuperpowerManager
├─ scan_* → GmailNode / GoogleCalendarNode
├─ extract_* → Function node (regex parsing)
├─ determine_actions → Function node (time calculations)
├─ generate_messages → Function node (message building)
└─ send_messages → ProactiveSender
├─ Platform detection
├─ Telegram → MessageDelivery
└─ iMessage → EdgeAgentManager
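The execution path above amounts to running nodes in order and stopping early when a gate fails. A minimal sketch of that control flow follows; the `run_workflow` function, the (name, function) node shape, and the `stop` flag are hypothetical and are not the WorkflowEngine API.

```python
def run_workflow(nodes, context):
    """Run (name, function) nodes in order; stop once a node returns {'stop': True}."""
    executed = []
    for name, node in nodes:
        result = node(context) or {}
        executed.append(name)
        context[name] = result  # later nodes can read earlier results
        if result.get("stop"):
            break
    return executed


def check_enabled(ctx):
    # Early exit: a disabled superpower stops the run before any scanning
    return {"enabled": ctx["enabled"], "stop": not ctx["enabled"]}


nodes = [
    ("check_enabled", check_enabled),
    ("scan", lambda ctx: {"items": ["flight"]}),
    ("send_messages", lambda ctx: {"sent": len(ctx["scan"]["items"])}),
]
ran_enabled = run_workflow(nodes, {"enabled": True})
ran_disabled = run_workflow(nodes, {"enabled": False})
print(ran_enabled, ran_disabled)
```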
Decision Points:
- Is Superpower Enabled? (first check, before any scanning)
  - Database query: superpower_settings table
  - Early exit if disabled → saves API costs
- Do We Have OAuth Tokens? (During data scan)
  - Checks oauth_tokens table
  - If missing → Returns auth URL → Workflow pauses
- Is Data Actionable? (After scanning)
  - Time windows (e.g., 4-6h before flight)
  - Urgency thresholds (e.g., urgency >= 7)
  - Already notified (checks proactive_notifications)
- Should We Send? (Final check)
  - At least one actionable item found
  - Not already sent in last 7 days
  - User hasn't opted out
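The final check can be expressed as a single predicate. This sketch assumes the 7-day rule is evaluated against the time the same notification was last sent; the `should_send` name and its parameters are hypothetical.

```python
from datetime import datetime, timedelta


def should_send(items, last_sent_at, now, opted_out, cooldown=timedelta(days=7)):
    """Final gate: something to say, user opted in, and no send within the cooldown."""
    if opted_out or not items:
        return False
    if last_sent_at is not None and now - last_sent_at < cooldown:
        return False
    return True


now = datetime(2025, 11, 4, 15, 0)
print(should_send(["flight"], None, now, opted_out=False))
print(should_send(["flight"], now - timedelta(days=2), now, opted_out=False))
```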
✅ Confirmation Checklist
Data Sources:
- ✅ Gmail - Fully integrated (OAuth + API)
- ✅ Calendar - Fully integrated (OAuth + API)
- ✅ Memories - Fully integrated (Mem0 service)
- ⚠️ Weather - Not yet (would need API key + node)
- ⚠️ Traffic - Not yet (would need API key + node)
Analysis:
- ✅ Regex extraction (flight numbers, deadlines)
- ✅ Time calculations (hours until, time windows)
- ✅ LLM analysis (email urgency, context scoring)
- ✅ Memory context (user preferences, stress points)
Decision Making:
- ✅ Enable/disable checks
- ✅ Time window validation
- ✅ Duplicate prevention
- ✅ Priority levels
Message Delivery:
- ✅ Universal sender (Telegram + iMessage)
- ✅ Platform auto-detection
- ✅ Priority-based timing
- ✅ Database logging
🎯 Summary
All core pieces ARE implemented for:
- ✅ Gmail scanning
- ✅ Calendar scanning
- ✅ Memory/context access
- ✅ Analysis and decision-making
- ✅ Message generation
- ✅ Universal delivery (Telegram + iMessage)
Missing pieces (not blocking current workflows):
- ⚠️ Weather API (future enhancement)
- ⚠️ Traffic API (future enhancement)
The system works end-to-end - from scheduler trigger → data scan → analysis → decision → message delivery. All workflows follow this exact pattern!