A Python-based event ingestion, classification, AI enrichment, risk scoring, and alerting pipeline for energy risk intelligence.
- RSS Feed Ingestion: Fetches events from 3 news sources (geopolitical, energy, supply chain)
- Automatic Classification: Categorizes events with keyword matching + category hints
- AI Enrichment: Processes events with GPT to generate summaries and market impact analysis
- Risk Scoring: Converts AI-enriched events into quantitative risk scores
- Rolling Risk Indices: Computes 7-day and 30-day regional risk levels (0-100)
- Asset-Level Risk: Tracks risk direction for oil, gas, fx, and freight
- Monetizable Alerts: Tiered subscription system (Free/Trader/Pro) with plan gating
- REST API: Access events, AI analysis, risk data, and alerts via FastAPI endpoints
| Feature | Free | Trader (€29-49/mo) | Pro (€99-149/mo) |
|---|---|---|---|
| Regions | Europe only | Europe | All regions |
| Alert Types | Risk Spike only | All alerts | All alerts + custom thresholds |
| Channels | Email (delayed 60m) | Email (real-time) | Email + Telegram |
| Daily Digest | No | Yes | Yes |
| Max Alerts/Day | 2 | 20 | 50 |
/src
/config
feeds.json # RSS feed configuration
/db
db.py # Database connection helper
migrations.py # Table creation and schema updates
/ingest
rss_fetcher.py # RSS feed fetching
classifier.py # Category/region/severity classification
ingest_runner.py # Orchestrates ingestion runs
/ai
ai_worker.py # AI processing worker
/risk
risk_engine.py # Risk scoring and aggregation engine
/alerts
alerts_engine.py # Alert evaluation and sending
channels.py # Email and Telegram delivery
templates.py # Alert message templates
/api
app.py # FastAPI application
routes.py # Event API endpoints
risk_routes.py # Risk API endpoints
alert_routes.py # Alert API endpoints
marketing_routes.py # Marketing copy endpoints
main.py # Main entrypoint
The DATABASE_URL is automatically provided by Replit's PostgreSQL integration.
AI Processing uses Replit AI Integrations (no API key required).
Email Sending (optional):
EMAIL_PROVIDER:resend|brevo|smtpEMAIL_FROM: Sender email addressRESEND_API_KEY: For Resend providerBREVO_API_KEY: For Brevo provider
Telegram Alerts (optional):
TELEGRAM_BOT_TOKEN: Your Telegram bot token- Users provide their
telegram_chat_id(Pro tier only)
Internal Runner (for external schedulers):
INTERNAL_RUNNER_TOKEN: Secret token to authenticate worker trigger requests
python src/main.py --mode apiThe API runs on port 5000.
python src/main.py --mode ingestpython src/main.py --mode aipython src/main.py --mode riskpython src/main.py --mode alertsFor continuous alerting (every 10 minutes):
ALERTS_LOOP=true python src/main.py --mode alertspython src/main.py --mode digestSends daily risk digests to Trader and Pro subscribers.
Scheduling suggestion (Amsterdam time): Run digest once daily at 07:30 Amsterdam time (05:30 UTC in winter, 06:30 UTC in summer).
python src/main.py --mode ingest
python src/main.py --mode ai
python src/main.py --mode risk
python src/main.py --mode alertsGET /health- Health checkGET /events- Query events with filtersGET /events/latest- Get 20 most recent eventsGET /events/{id}- Get full event with AI analysisGET /ai/stats- AI processing statisticsGET /ingestion-runs- View ingestion history
GET /risk/summary- Current risk summary for a regionGET /risk/regions- Latest risk indices for all regionsGET /risk/regions/{region}- Historical risk data for a regionGET /risk/assets- Asset-level risk by regionGET /risk/events- View scored risk events
POST /alerts/test- Create test user and preview alertsPOST /alerts/send-test-email- Send a test email to verify Brevo configurationGET /alerts/user/{user_id}- View user's alert history
POST /digest/preview- Preview daily digest without sending
GET /marketing/samples- Sample alert messagesGET /marketing/landing-copy- Landing page copy blocks
These endpoints trigger worker jobs and are protected by INTERNAL_RUNNER_TOKEN.
POST /internal/run/ingest- Run RSS ingestionPOST /internal/run/ai- Run AI processingPOST /internal/run/risk- Run risk scoringPOST /internal/run/alerts- Run alerts enginePOST /internal/run/digest- Run daily digest
Authentication: Requires header X-Runner-Token: <your-token>
Example curl commands:
# Run ingestion
curl -X POST https://your-app.replit.app/internal/run/ingest \
-H "X-Runner-Token: your-secret-token"
# Run AI processing
curl -X POST https://your-app.replit.app/internal/run/ai \
-H "X-Runner-Token: your-secret-token"
# Run risk scoring
curl -X POST https://your-app.replit.app/internal/run/risk \
-H "X-Runner-Token: your-secret-token"
# Run alerts
curl -X POST https://your-app.replit.app/internal/run/alerts \
-H "X-Runner-Token: your-secret-token"
# Run daily digest
curl -X POST https://your-app.replit.app/internal/run/digest \
-H "X-Runner-Token: your-secret-token"# Send a test email via Brevo
curl -X POST https://your-app.replit.app/alerts/send-test-email \
-H "Content-Type: application/json" \
-d '{"email":"your-email@example.com"}'Response format:
{
"status": "ok",
"job": "ingest",
"started_at": "2026-01-08T15:14:52.005173",
"finished_at": "2026-01-08T15:14:54.665580",
"duration_seconds": 2.66,
"details": {}
}Locking: Uses PostgreSQL advisory locks to prevent concurrent runs of the same job. If a job is already running, returns 409 {"status": "busy"}.
Triggers when Europe risk score:
- Crosses threshold (default 70)
- Increases by ≥20% vs previous snapshot
Triggers when asset risk score crosses threshold (default 70). Assets: oil, gas, fx, freight
Triggers for new events with:
- Severity ≥ 4
- Category: energy or geopolitical
- Region: Europe, Middle East, or Black Sea
Risk scores are relative measures from 0-100:
- 0-25: Low relative risk
- 26-50: Moderate risk
- 51-75: Elevated risk
- 76-100: High risk (near recent maximum)
Event-Level Weighted Score:
weighted_score = base_severity × ai_confidence × category_weight × recency_decay
Rolling Normalization: Scores are normalized using a rolling 90-day maximum.
For production, schedule the alerts engine to run every 10 minutes:
Option 1: Replit Scheduled Tasks
Set up a scheduled task in Replit to run python src/main.py --mode alerts
Option 2: Loop Mode
ALERTS_LOOP=true ALERTS_LOOP_INTERVAL=600 python src/main.py --mode alertsEnergyRiskIQ provides informational risk indicators only. This is not investment advice. Always conduct your own research and consult qualified professionals before making trading or business decisions.
- events: News events with classification and AI enrichment
- ingestion_runs: Ingestion run history
- risk_events: Per-event normalized risk contributions
- risk_indices: Rolling aggregate risk scores by region
- asset_risk: Asset-specific risk signals
- users: User accounts (email, telegram_chat_id)
- user_plans: Subscription tier and limits
- user_alert_prefs: Per-user alert preferences
- alerts: Alert history with status
- alert_state: State for deduplication and trend comparison
- Added monetizable alerts with Free/Trader/Pro tiers
- Regional risk spike, asset risk, and high-impact event alerts
- Email and Telegram delivery channels
- Cooldown and max alerts/day anti-spam protections
- Marketing endpoints with sample alerts and landing copy
- Rolling regional risk indices (7d, 30d)
- Asset-level risk tracking (oil, gas, fx, freight)
- AI-generated summaries and market impact analysis
- RSS ingestion from 3 feeds
- Keyword-based classification