Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 9 additions & 29 deletions backend/ml/churnModel.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import math
import random
from typing import Dict, List, Optional
from typing import Dict, List

class ChurnPredictionModel:
def __init__(self):
Expand All @@ -13,35 +12,10 @@ def __init__(self):
"price_sensitivity": 0.1
}

def _extract_features(self, user_data: Dict) -> Dict:
"""
Extract normalized features from raw user data.
"""
features = {}
# Normalize payment failures (0 to 1)
features["payment_failures"] = min(user_data.get("recent_payment_failures", 0) / 3.0, 1.0)

# Normalize login frequency drop (e.g., 50% drop -> 0.5)
baseline_logins = max(user_data.get("baseline_logins_per_month", 1), 1)
recent_logins = user_data.get("recent_logins", baseline_logins)
drop = max(0, (baseline_logins - recent_logins) / baseline_logins)
features["login_frequency_drop"] = drop

# Normalize support tickets
features["support_tickets"] = min(user_data.get("open_support_tickets", 0) / 2.0, 1.0)

# Add random noise for simulation
features["app_crashes"] = random.uniform(0, 0.2)
features["price_sensitivity"] = user_data.get("price_sensitivity_index", 0.5)

return features

def predict_churn(self, subscriber_address: str, user_data: Dict) -> Dict:
def predict_churn(self, subscriber_address: str, features: Dict) -> Dict:
"""
Predict churn probability and return risk scoring.
"""
features = self._extract_features(user_data)

# Calculate risk score (0.0 to 1.0)
risk_score = 0.0
for feature, value in features.items():
Expand Down Expand Up @@ -123,5 +97,11 @@ def forecast(self, observations: List[Dict], horizon: int = 3) -> List[Dict]:
"open_support_tickets": 1,
"price_sensitivity_index": 0.8
}
prediction = model.predict_churn("0xDEF456", test_data)
prediction = model.predict_churn("0xDEF456", {
"payment_failures": 0.67,
"login_frequency_drop": 0.75,
"support_tickets": 0.5,
"app_crashes": 0.0,
"price_sensitivity": test_data["price_sensitivity_index"],
})
print(f"Churn Prediction: {prediction}")
63 changes: 24 additions & 39 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,45 +1,30 @@
# Local development services for SubTrackr backend.
#
# Usage:
# docker compose up -d
# npm run server:start
#
# Environment (optional .env):
# DB_HOST=localhost DB_PORT=5432 DB_NAME=subtrackr DB_USER=postgres DB_PASSWORD=postgres
# REDIS_HOST=localhost REDIS_PORT=6379

services:
redis:
feature-store:
image: redis:7-alpine
container_name: subtrackr-redis
command: ["redis-server", "--appendonly", "yes"]
ports:
- '${REDIS_PORT:-6379}:6379'
command: ['redis-server', '--save', '', '--appendonly', 'no']
healthcheck:
test: ['CMD', 'redis-cli', 'ping']
interval: 5s
timeout: 3s
retries: 5
volumes:
- redis-data:/data
- "6379:6379"

postgres:
image: postgres:16-alpine
container_name: subtrackr-postgres
ports:
- '${DB_PORT:-5432}:5432'
feature-pipeline:
build:
context: .
dockerfile: services/feature-pipeline/Dockerfile
environment:
POSTGRES_DB: ${DB_NAME:-subtrackr}
POSTGRES_USER: ${DB_USER:-postgres}
POSTGRES_PASSWORD: ${DB_PASSWORD:-postgres}
healthcheck:
test: ['CMD-SHELL', 'pg_isready -U ${DB_USER:-postgres} -d ${DB_NAME:-subtrackr}']
interval: 5s
timeout: 3s
retries: 5
volumes:
- postgres-data:/var/lib/postgresql/data
FEATURE_STORE_URL: redis://feature-store:6379/0
FEATURE_TTL_SECONDS: "7200"
ports:
- "8010:8010"
depends_on:
- feature-store

volumes:
redis-data:
postgres-data:
ml-service:
build:
context: .
dockerfile: ml-service/Dockerfile
environment:
FEATURE_STORE_URL: redis://feature-store:6379/0
FEATURE_PIPELINE_PATH: /app/services/feature-pipeline
ports:
- "8000:8000"
depends_on:
- feature-store
9 changes: 6 additions & 3 deletions ml-service/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
COPY ml-service/requirements.txt ./requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Include the existing ML models from the backend
COPY ../backend/ml ./backend/ml
COPY . .
COPY backend/ml ./backend/ml
COPY services/feature-pipeline ./services/feature-pipeline
COPY ml-service .

ENV ML_SERVICE_URL=http://localhost:8000
ENV FEATURE_PIPELINE_PATH=/app/services/feature-pipeline
ENV FEATURE_STORE_URL=redis://feature-store:6379/0

EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
6 changes: 6 additions & 0 deletions ml-service/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ Restart the service after retraining to pick up the new version.
| Variable | Default | Description |
|---|---|---|
| `ML_SERVICE_URL` | `http://localhost:8000` | Used by the TS backend to reach this service |
| `FEATURE_STORE_URL` | `redis://localhost:6379/0` | Redis-compatible feature store used before online fallback |
| `FEATURE_PIPELINE_PATH` | `../services/feature-pipeline` | Local path for versioned feature transformations |

Churn inference reads versioned feature vectors from the feature store. On cache
miss or store outage, it computes the same transformation online and attempts a
best-effort store write.

## Key endpoints

Expand Down
134 changes: 134 additions & 0 deletions ml-service/feature_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import json
import os
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional

try:
import redis
except ImportError: # pragma: no cover
redis = None


FEATURE_PIPELINE_PATH = os.getenv(
"FEATURE_PIPELINE_PATH",
os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "services", "feature-pipeline")),
)
if FEATURE_PIPELINE_PATH not in sys.path:
sys.path.insert(0, FEATURE_PIPELINE_PATH)

from features.churn import FEATURE_SET_NAME, compute_features, drift_report, feature_set_hash


class FeatureStoreUnavailable(RuntimeError):
pass


@dataclass
class FeatureResult:
features: Dict[str, float]
feature_set: str
feature_set_hash: str
source: str
store_available: bool
computed_at: str
drift: Dict


class FeatureStoreClient:
def __init__(self):
self.url = os.getenv("FEATURE_STORE_URL", "redis://localhost:6379/0")
self.ttl_seconds = int(os.getenv("FEATURE_TTL_SECONDS", "7200"))
self._client = None

@property
def client(self):
if redis is None:
raise FeatureStoreUnavailable("redis package is not installed")
if self._client is None:
self._client = redis.Redis.from_url(self.url, decode_responses=True)
return self._client

@staticmethod
def key(feature_set: str, entity_id: str, transform_hash: str) -> str:
return f"features:{feature_set}:{transform_hash}:{entity_id}"

def get(self, feature_set: str, entity_id: str, transform_hash: str) -> Optional[Dict]:
try:
raw = self.client.get(self.key(feature_set, entity_id, transform_hash))
except Exception as exc:
raise FeatureStoreUnavailable(str(exc)) from exc
return json.loads(raw) if raw else None

def set(self, feature_set: str, entity_id: str, transform_hash: str, features: Dict[str, float]) -> None:
payload = {
"feature_set": feature_set,
"entity_id": entity_id,
"transform_hash": transform_hash,
"features": features,
"computed_at": datetime.now(timezone.utc).isoformat(),
}
try:
self.client.setex(self.key(feature_set, entity_id, transform_hash), self.ttl_seconds, json.dumps(payload))
except Exception as exc:
raise FeatureStoreUnavailable(str(exc)) from exc


class ChurnFeatureProvider:
def __init__(self, store: FeatureStoreClient | None = None):
self.store = store or FeatureStoreClient()

def get_or_compute(self, subscriber: str, user_data: Dict) -> FeatureResult:
transform_hash = feature_set_hash()
try:
cached = self.store.get(FEATURE_SET_NAME, subscriber, transform_hash)
if cached:
features = cached["features"]
return FeatureResult(
features=features,
feature_set=FEATURE_SET_NAME,
feature_set_hash=transform_hash,
source="feature_store",
store_available=True,
computed_at=cached["computed_at"],
drift=drift_report([features]),
)
except FeatureStoreUnavailable:
features = compute_features(user_data)
store_available = False
source = "online_store_unavailable"
try:
self.store.set(FEATURE_SET_NAME, subscriber, transform_hash, features)
store_available = True
source = "online_store_recovered"
except FeatureStoreUnavailable:
store_available = False
return FeatureResult(
features=features,
feature_set=FEATURE_SET_NAME,
feature_set_hash=transform_hash,
source=source,
store_available=store_available,
computed_at=datetime.now(timezone.utc).isoformat(),
drift=drift_report([features]),
)

features = compute_features(user_data)
store_available = True
source = "online_cache_miss"
try:
self.store.set(FEATURE_SET_NAME, subscriber, transform_hash, features)
except FeatureStoreUnavailable:
store_available = False
source = "online_store_unavailable"

return FeatureResult(
features=features,
feature_set=FEATURE_SET_NAME,
feature_set_hash=transform_hash,
source=source,
store_available=store_available,
computed_at=datetime.now(timezone.utc).isoformat(),
drift=drift_report([features]),
)
1 change: 1 addition & 0 deletions ml-service/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
fastapi==0.115.5
uvicorn[standard]==0.32.1
pydantic==2.10.3
redis==5.2.1
34 changes: 29 additions & 5 deletions ml-service/routers/churn.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from typing import List, Optional
from typing import List

from feature_client import ChurnFeatureProvider

router = APIRouter(tags=["churn"])
feature_provider = ChurnFeatureProvider()


class UserChurnData(BaseModel):
recent_payment_failures: int = 0
baseline_logins_per_month: int = 10
recent_logins: int = 10
open_support_tickets: int = 0
app_crashes: int = 0
price_sensitivity_index: float = Field(0.5, ge=0.0, le=1.0)


Expand Down Expand Up @@ -38,9 +42,19 @@ def predict_churn(req: ChurnRequest):
try:
model = registry.get("churn")
meta = registry.meta("churn")
result = model.predict_churn(req.subscriber, req.user_data.model_dump())
feature_result = feature_provider.get_or_compute(req.subscriber, req.user_data.model_dump())
result = model.predict_churn(req.subscriber, feature_result.features)
meta.record_prediction()
return {"model_version": meta.version, **result}
return {
"model_version": meta.version,
"feature_set": feature_result.feature_set,
"feature_set_hash": feature_result.feature_set_hash,
"feature_source": feature_result.source,
"feature_store_available": feature_result.store_available,
"feature_computed_at": feature_result.computed_at,
"feature_drift": feature_result.drift,
**result,
}
except Exception as e:
registry.meta("churn").record_error()
raise HTTPException(status_code=500, detail=str(e))
Expand All @@ -54,9 +68,19 @@ def predict_churn_batch(req: BatchChurnRequest):
meta = registry.meta("churn")
for item in req.items:
try:
result = model.predict_churn(item.subscriber, item.user_data.model_dump())
feature_result = feature_provider.get_or_compute(item.subscriber, item.user_data.model_dump())
result = model.predict_churn(item.subscriber, feature_result.features)
meta.record_prediction()
results.append({"ok": True, **result})
results.append({
"ok": True,
"feature_set": feature_result.feature_set,
"feature_set_hash": feature_result.feature_set_hash,
"feature_source": feature_result.source,
"feature_store_available": feature_result.store_available,
"feature_computed_at": feature_result.computed_at,
"feature_drift": feature_result.drift,
**result,
})
except Exception as e:
meta.record_error()
results.append({"ok": False, "subscriber": item.subscriber, "error": str(e)})
Expand Down
13 changes: 13 additions & 0 deletions services/feature-pipeline/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
FROM python:3.12-slim

WORKDIR /app

COPY services/feature-pipeline/requirements.txt ./requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

COPY services/feature-pipeline .

ENV FEATURE_STORE_URL=redis://feature-store:6379/0

EXPOSE 8010
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8010"]
Loading