Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 171 additions & 111 deletions services/volunteers_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Dict, List, Optional, Union, Any, Tuple
import uuid
import time
import threading
from datetime import datetime
import pytz
from functools import lru_cache
Expand Down Expand Up @@ -2451,7 +2452,8 @@ def send_volunteer_message(

# Invalidate Resend email list cache so next fetch picks up the new email
if delivery_status['email_sent']:
delete_cached("resend:all_emails_index")
delete_cached(_RESEND_INDEX_KEY)
delete_cached(_RESEND_FRESH_KEY)

result = {
'success': success,
Expand Down Expand Up @@ -2621,7 +2623,8 @@ def send_email_to_address(

# Invalidate Resend email list cache so next fetch picks up the new email
if email_success:
delete_cached("resend:all_emails_index")
delete_cached(_RESEND_INDEX_KEY)
delete_cached(_RESEND_FRESH_KEY)

result = {
'success': email_success,
Expand Down Expand Up @@ -2689,11 +2692,142 @@ def get_resend_email_statuses(email_ids: list) -> Dict[str, Any]:
return {'success': False, 'error': str(e)}


# Cache keys for stale-while-revalidate pattern
_RESEND_INDEX_KEY = "resend:all_emails_index" # full data, long TTL (stale)
_RESEND_FRESH_KEY = "resend:all_emails_fresh" # freshness flag, short TTL
_RESEND_LOCK_KEY = "resend:all_emails_refreshing" # background-refresh lock

_RESEND_STALE_TTL = 3600 # keep data for 1 hour
_RESEND_FRESH_TTL = 300 # re-fetch after 5 minutes
_RESEND_LOCK_TTL = 120 # lock expires after 2 minutes (prevents stampede)


def _fetch_and_cache_resend_emails() -> Optional[Dict[str, Any]]:
"""
Fetch all pages from Resend and update both cache keys.
Returns the full payload dict on success, None on failure.
Called either inline (first load) or from a background thread.
"""
resend.api_key = os.environ.get('RESEND_EMAIL_STATUS_KEY')
if not resend.api_key:
return None

all_emails = []
max_pages = 100
max_retries = 2
params = {"limit": 100}
page_error_occurred = False
truncated = False

for page in range(max_pages):
retries = 0
while True:
try:
response = resend.Emails.list(params)
email_list = response.get('data', []) if isinstance(response, dict) else getattr(response, 'data', [])
all_emails.extend(email_list)
info(logger, "Fetched Resend emails page",
page=page, count=len(email_list), total_so_far=len(all_emails))

has_more = response.get('has_more', False) if isinstance(response, dict) else getattr(response, 'has_more', False)
if not has_more or len(email_list) == 0:
break

last_item = email_list[-1]
last_id = last_item.get('id', '') if isinstance(last_item, dict) else getattr(last_item, 'id', '')
if not last_id:
break
params = {"limit": 100, "after": last_id}
break # success — move to next page

except Exception as page_error:
retries += 1
if retries <= max_retries:
warning(logger, "Retrying Resend emails page fetch",
page=page, retry=retries, exc_info=page_error)
time.sleep(1 * retries)
continue
error(logger, "Error fetching Resend emails page after retries",
page=page, exc_info=page_error)
page_error_occurred = True
break
if page_error_occurred:
break
else:
truncated = True
warning(logger, "Resend email pagination hit safety cap",
max_pages=max_pages, total_so_far=len(all_emails))

if page_error_occurred:
# Release lock so the next request can retry
delete_cached(_RESEND_LOCK_KEY)
return None

# Build index by recipient email
index: Dict[str, List[Dict]] = {}
for email_data in all_emails:
if isinstance(email_data, dict):
recipients = email_data.get('to', [])
email_id = email_data.get('id', '')
subject = email_data.get('subject', '')
created_at = email_data.get('created_at', '')
last_event = email_data.get('last_event', '')
else:
recipients = getattr(email_data, 'to', [])
email_id = getattr(email_data, 'id', '')
subject = getattr(email_data, 'subject', '')
created_at = getattr(email_data, 'created_at', '')
last_event = getattr(email_data, 'last_event', '')

if isinstance(recipients, str):
recipients = [recipients]

entry = {'id': email_id, 'subject': subject,
'created_at': created_at, 'last_event': last_event}

for recipient in recipients:
key = recipient.lower().strip()
if key not in index:
index[key] = []
index[key].append(entry)

total_fetched = len(all_emails)
info(logger, "Built Resend email index",
total_fetched=total_fetched, unique_recipients=len(index),
truncated=truncated)

payload = {
'emails_by_recipient': index,
'total_fetched': total_fetched,
'truncated': truncated
}
# Store data with long TTL; store freshness flag with short TTL
set_cached(_RESEND_INDEX_KEY, payload, ttl=_RESEND_STALE_TTL)
set_cached(_RESEND_FRESH_KEY, True, ttl=_RESEND_FRESH_TTL)
delete_cached(_RESEND_LOCK_KEY)
return payload


def _background_refresh_resend_emails() -> None:
"""Fire-and-forget background refresh wrapped in try/except."""
try:
_fetch_and_cache_resend_emails()
info(logger, "Background Resend email cache refresh completed")
except Exception as bg_err:
error(logger, "Background Resend email cache refresh failed", exc_info=bg_err)
delete_cached(_RESEND_LOCK_KEY)


def list_all_resend_emails(filter_emails=None):
"""
Fetch all sent emails from Resend's List Emails API, cached in Redis.
Returns an index of {recipient_email: [{id, subject, created_at, last_event}, ...]}.

Uses a stale-while-revalidate strategy:
- Fresh cache hit → return immediately (fast path)
- Stale cache hit → return stale data immediately + refresh in background
- No cache at all → block on first fetch then return

Args:
filter_emails: Optional list of email addresses to filter results for.

Expand All @@ -2705,11 +2839,28 @@ def list_all_resend_emails(filter_emails=None):
if not resend.api_key:
return {'success': False, 'error': 'Resend API key not configured'}

cache_key = "resend:all_emails_index"
cached = get_cached(cache_key)
is_fresh = get_cached(_RESEND_FRESH_KEY) is not None
cached = get_cached(_RESEND_INDEX_KEY)

if cached is not None:
info(logger, "Using cached Resend email index",
total_emails=cached.get('total_fetched', 0))
if not is_fresh:
# Stale — trigger a background refresh if one isn't already running
if get_cached(_RESEND_LOCK_KEY) is None:
set_cached(_RESEND_LOCK_KEY, True, ttl=_RESEND_LOCK_TTL)
t = threading.Thread(
target=_background_refresh_resend_emails,
daemon=True
)
t.start()
info(logger, "Stale Resend email cache — background refresh triggered",
total_fetched=cached.get('total_fetched', 0))
else:
info(logger, "Stale Resend email cache — refresh already in progress",
total_fetched=cached.get('total_fetched', 0))
else:
info(logger, "Using fresh cached Resend email index",
total_emails=cached.get('total_fetched', 0))

index = cached['emails_by_recipient']
if filter_emails:
filter_set = {e.lower() for e in filter_emails}
Expand All @@ -2719,125 +2870,34 @@ def list_all_resend_emails(filter_emails=None):
'emails_by_recipient': index,
'total_fetched': cached['total_fetched'],
'truncated': cached.get('truncated', False),
'from_cache': True
'from_cache': True,
'stale': not is_fresh
}

# Paginate through resend.Emails.list() (requires resend >= 2.5)
# Safety cap of 100 pages (~10,000 emails); for-else detects if we hit it.
all_emails = []
max_pages = 100
max_retries = 2
params = {"limit": 100}
page_error_occurred = False
truncated = False

for page in range(max_pages):
retries = 0
while True:
try:
response = resend.Emails.list(params)
email_list = response.get('data', []) if isinstance(response, dict) else getattr(response, 'data', [])
all_emails.extend(email_list)
info(logger, "Fetched Resend emails page",
page=page, count=len(email_list), total_so_far=len(all_emails))

has_more = response.get('has_more', False) if isinstance(response, dict) else getattr(response, 'has_more', False)
if not has_more or len(email_list) == 0:
break

# Use last email ID as cursor for next page
last_item = email_list[-1]
last_id = last_item.get('id', '') if isinstance(last_item, dict) else getattr(last_item, 'id', '')
if not last_id:
break
params = {"limit": 100, "after": last_id}
break # success — move to next page

except Exception as page_error:
retries += 1
if retries <= max_retries:
warning(logger, "Retrying Resend emails page fetch",
page=page, retry=retries, exc_info=page_error)
time.sleep(1 * retries)
continue
error(logger, "Error fetching Resend emails page after retries",
page=page, exc_info=page_error)
page_error_occurred = True
break
if page_error_occurred:
break
else:
# Loop exhausted max_pages without a natural break — results are truncated.
truncated = True
warning(logger, "Resend email pagination hit safety cap",
max_pages=max_pages, total_so_far=len(all_emails))

# If a page fetch failed, return an error rather than caching partial data.
if page_error_occurred:
# No cached data at all — block on initial fetch
info(logger, "No Resend email cache found — fetching synchronously")
set_cached(_RESEND_LOCK_KEY, True, ttl=_RESEND_LOCK_TTL)
payload = _fetch_and_cache_resend_emails()
if payload is None:
return {
'success': False,
'error': 'Failed to fetch all email pages from Resend',
'error': 'Failed to fetch email pages from Resend',
'emails_by_recipient': {},
'total_fetched': len(all_emails)
}

# Build index by recipient email
index = {}
for email_data in all_emails:
if isinstance(email_data, dict):
recipients = email_data.get('to', [])
email_id = email_data.get('id', '')
subject = email_data.get('subject', '')
created_at = email_data.get('created_at', '')
last_event = email_data.get('last_event', '')
else:
recipients = getattr(email_data, 'to', [])
email_id = getattr(email_data, 'id', '')
subject = getattr(email_data, 'subject', '')
created_at = getattr(email_data, 'created_at', '')
last_event = getattr(email_data, 'last_event', '')

if isinstance(recipients, str):
recipients = [recipients]

entry = {
'id': email_id,
'subject': subject,
'created_at': created_at,
'last_event': last_event,
'total_fetched': 0
}

for recipient in recipients:
key = recipient.lower().strip()
if key not in index:
index[key] = []
index[key].append(entry)

total_fetched = len(all_emails)

info(logger, "Built Resend email index",
total_fetched=total_fetched, unique_recipients=len(index),
truncated=truncated)

# Cache the full index with 300s TTL, including empty results so we
# don't hammer the Resend API on every request when there are no emails.
set_cached(cache_key, {
'emails_by_recipient': index,
'total_fetched': total_fetched,
'truncated': truncated
}, ttl=300)

# Filter if requested
index = payload['emails_by_recipient']
if filter_emails:
filter_set = {e.lower() for e in filter_emails}
index = {k: v for k, v in index.items() if k in filter_set}

return {
'success': True,
'emails_by_recipient': index,
'total_fetched': total_fetched,
'truncated': truncated,
'from_cache': False
'total_fetched': payload['total_fetched'],
'truncated': payload['truncated'],
'from_cache': False,
'stale': False
}

except Exception as e:
Expand Down
Loading