diff --git a/backend/ml/churnModel.py b/backend/ml/churnModel.py index 853184a2..afd408f3 100644 --- a/backend/ml/churnModel.py +++ b/backend/ml/churnModel.py @@ -1,6 +1,5 @@ import math -import random -from typing import Dict, List, Optional +from typing import Dict, List class ChurnPredictionModel: def __init__(self): @@ -13,35 +12,10 @@ def __init__(self): "price_sensitivity": 0.1 } - def _extract_features(self, user_data: Dict) -> Dict: - """ - Extract normalized features from raw user data. - """ - features = {} - # Normalize payment failures (0 to 1) - features["payment_failures"] = min(user_data.get("recent_payment_failures", 0) / 3.0, 1.0) - - # Normalize login frequency drop (e.g., 50% drop -> 0.5) - baseline_logins = max(user_data.get("baseline_logins_per_month", 1), 1) - recent_logins = user_data.get("recent_logins", baseline_logins) - drop = max(0, (baseline_logins - recent_logins) / baseline_logins) - features["login_frequency_drop"] = drop - - # Normalize support tickets - features["support_tickets"] = min(user_data.get("open_support_tickets", 0) / 2.0, 1.0) - - # Add random noise for simulation - features["app_crashes"] = random.uniform(0, 0.2) - features["price_sensitivity"] = user_data.get("price_sensitivity_index", 0.5) - - return features - - def predict_churn(self, subscriber_address: str, user_data: Dict) -> Dict: + def predict_churn(self, subscriber_address: str, features: Dict) -> Dict: """ Predict churn probability and return risk scoring. """ - features = self._extract_features(user_data) - # Calculate risk score (0.0 to 1.0) risk_score = 0.0 for feature, value in features.items(): @@ -123,5 +97,11 @@ def forecast(self, observations: List[Dict], horizon: int = 3) -> List[Dict]: "open_support_tickets": 1, "price_sensitivity_index": 0.8 } - prediction = model.predict_churn("0xDEF456", test_data) + prediction = model.predict_churn("0xDEF456", { + "payment_failures": 0.67, + "login_frequency_drop": 0.75, + "support_tickets": 0.5, + "app_crashes": 0.0, + "price_sensitivity": test_data["price_sensitivity_index"], + }) print(f"Churn Prediction: {prediction}") diff --git a/docker-compose.yml b/docker-compose.yml index d7e3f1a4..f64d4a51 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,45 +1,30 @@ -# Local development services for SubTrackr backend. -# -# Usage: -# docker compose up -d -# npm run server:start -# -# Environment (optional .env): -# DB_HOST=localhost DB_PORT=5432 DB_NAME=subtrackr DB_USER=postgres DB_PASSWORD=postgres -# REDIS_HOST=localhost REDIS_PORT=6379 - services: - redis: + feature-store: image: redis:7-alpine - container_name: subtrackr-redis + command: ["redis-server", "--appendonly", "yes"] ports: - - '${REDIS_PORT:-6379}:6379' - command: ['redis-server', '--save', '', '--appendonly', 'no'] - healthcheck: - test: ['CMD', 'redis-cli', 'ping'] - interval: 5s - timeout: 3s - retries: 5 - volumes: - - redis-data:/data + - "6379:6379" - postgres: - image: postgres:16-alpine - container_name: subtrackr-postgres - ports: - - '${DB_PORT:-5432}:5432' + feature-pipeline: + build: + context: . + dockerfile: services/feature-pipeline/Dockerfile environment: - POSTGRES_DB: ${DB_NAME:-subtrackr} - POSTGRES_USER: ${DB_USER:-postgres} - POSTGRES_PASSWORD: ${DB_PASSWORD:-postgres} - healthcheck: - test: ['CMD-SHELL', 'pg_isready -U ${DB_USER:-postgres} -d ${DB_NAME:-subtrackr}'] - interval: 5s - timeout: 3s - retries: 5 - volumes: - - postgres-data:/var/lib/postgresql/data + FEATURE_STORE_URL: redis://feature-store:6379/0 + FEATURE_TTL_SECONDS: "7200" + ports: + - "8010:8010" + depends_on: + - feature-store -volumes: - redis-data: - postgres-data: + ml-service: + build: + context: . + dockerfile: ml-service/Dockerfile + environment: + FEATURE_STORE_URL: redis://feature-store:6379/0 + FEATURE_PIPELINE_PATH: /app/services/feature-pipeline + ports: + - "8000:8000" + depends_on: + - feature-store diff --git a/ml-service/Dockerfile b/ml-service/Dockerfile index f782ecdb..bad61a27 100644 --- a/ml-service/Dockerfile +++ b/ml-service/Dockerfile @@ -2,14 +2,17 @@ FROM python:3.12-slim WORKDIR /app -COPY requirements.txt . +COPY ml-service/requirements.txt ./requirements.txt RUN pip install --no-cache-dir -r requirements.txt # Include the existing ML models from the backend -COPY ../backend/ml ./backend/ml -COPY . . +COPY backend/ml ./backend/ml +COPY services/feature-pipeline ./services/feature-pipeline +COPY ml-service . ENV ML_SERVICE_URL=http://localhost:8000 +ENV FEATURE_PIPELINE_PATH=/app/services/feature-pipeline +ENV FEATURE_STORE_URL=redis://feature-store:6379/0 EXPOSE 8000 CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/ml-service/README.md b/ml-service/README.md index f9f89d7e..3dfe02ed 100644 --- a/ml-service/README.md +++ b/ml-service/README.md @@ -26,6 +26,12 @@ Restart the service after retraining to pick up the new version. | Variable | Default | Description | |---|---|---| | `ML_SERVICE_URL` | `http://localhost:8000` | Used by the TS backend to reach this service | +| `FEATURE_STORE_URL` | `redis://localhost:6379/0` | Redis-compatible feature store used before online fallback | +| `FEATURE_PIPELINE_PATH` | `../services/feature-pipeline` | Local path for versioned feature transformations | + +Churn inference reads versioned feature vectors from the feature store. On cache +miss or store outage, it computes the same transformation online and attempts a +best-effort store write. ## Key endpoints diff --git a/ml-service/feature_client.py b/ml-service/feature_client.py new file mode 100644 index 00000000..3eaec5f3 --- /dev/null +++ b/ml-service/feature_client.py @@ -0,0 +1,134 @@ +import json +import os +import sys +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Dict, Optional + +try: + import redis +except ImportError: # pragma: no cover + redis = None + + +FEATURE_PIPELINE_PATH = os.getenv( + "FEATURE_PIPELINE_PATH", + os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "services", "feature-pipeline")), +) +if FEATURE_PIPELINE_PATH not in sys.path: + sys.path.insert(0, FEATURE_PIPELINE_PATH) + +from features.churn import FEATURE_SET_NAME, compute_features, drift_report, feature_set_hash + + +class FeatureStoreUnavailable(RuntimeError): + pass + + +@dataclass +class FeatureResult: + features: Dict[str, float] + feature_set: str + feature_set_hash: str + source: str + store_available: bool + computed_at: str + drift: Dict + + +class FeatureStoreClient: + def __init__(self): + self.url = os.getenv("FEATURE_STORE_URL", "redis://localhost:6379/0") + self.ttl_seconds = int(os.getenv("FEATURE_TTL_SECONDS", "7200")) + self._client = None + + @property + def client(self): + if redis is None: + raise FeatureStoreUnavailable("redis package is not installed") + if self._client is None: + self._client = redis.Redis.from_url(self.url, decode_responses=True) + return self._client + + @staticmethod + def key(feature_set: str, entity_id: str, transform_hash: str) -> str: + return f"features:{feature_set}:{transform_hash}:{entity_id}" + + def get(self, feature_set: str, entity_id: str, transform_hash: str) -> Optional[Dict]: + try: + raw = self.client.get(self.key(feature_set, entity_id, transform_hash)) + except Exception as exc: + raise FeatureStoreUnavailable(str(exc)) from exc + return json.loads(raw) if raw else None + + def set(self, feature_set: str, entity_id: str, transform_hash: str, features: Dict[str, float]) -> None: + payload = { + "feature_set": feature_set, + "entity_id": entity_id, + "transform_hash": transform_hash, + "features": features, + "computed_at": datetime.now(timezone.utc).isoformat(), + } + try: + self.client.setex(self.key(feature_set, entity_id, transform_hash), self.ttl_seconds, json.dumps(payload)) + except Exception as exc: + raise FeatureStoreUnavailable(str(exc)) from exc + + +class ChurnFeatureProvider: + def __init__(self, store: FeatureStoreClient | None = None): + self.store = store or FeatureStoreClient() + + def get_or_compute(self, subscriber: str, user_data: Dict) -> FeatureResult: + transform_hash = feature_set_hash() + try: + cached = self.store.get(FEATURE_SET_NAME, subscriber, transform_hash) + if cached: + features = cached["features"] + return FeatureResult( + features=features, + feature_set=FEATURE_SET_NAME, + feature_set_hash=transform_hash, + source="feature_store", + store_available=True, + computed_at=cached["computed_at"], + drift=drift_report([features]), + ) + except FeatureStoreUnavailable: + features = compute_features(user_data) + store_available = False + source = "online_store_unavailable" + try: + self.store.set(FEATURE_SET_NAME, subscriber, transform_hash, features) + store_available = True + source = "online_store_recovered" + except FeatureStoreUnavailable: + store_available = False + return FeatureResult( + features=features, + feature_set=FEATURE_SET_NAME, + feature_set_hash=transform_hash, + source=source, + store_available=store_available, + computed_at=datetime.now(timezone.utc).isoformat(), + drift=drift_report([features]), + ) + + features = compute_features(user_data) + store_available = True + source = "online_cache_miss" + try: + self.store.set(FEATURE_SET_NAME, subscriber, transform_hash, features) + except FeatureStoreUnavailable: + store_available = False + source = "online_store_unavailable" + + return FeatureResult( + features=features, + feature_set=FEATURE_SET_NAME, + feature_set_hash=transform_hash, + source=source, + store_available=store_available, + computed_at=datetime.now(timezone.utc).isoformat(), + drift=drift_report([features]), + ) diff --git a/ml-service/requirements.txt b/ml-service/requirements.txt index 52eb5048..92716410 100644 --- a/ml-service/requirements.txt +++ b/ml-service/requirements.txt @@ -1,3 +1,4 @@ fastapi==0.115.5 uvicorn[standard]==0.32.1 pydantic==2.10.3 +redis==5.2.1 diff --git a/ml-service/routers/churn.py b/ml-service/routers/churn.py index a7098bff..e708cd48 100644 --- a/ml-service/routers/churn.py +++ b/ml-service/routers/churn.py @@ -1,8 +1,11 @@ from fastapi import APIRouter, HTTPException from pydantic import BaseModel, Field -from typing import List, Optional +from typing import List + +from feature_client import ChurnFeatureProvider router = APIRouter(tags=["churn"]) +feature_provider = ChurnFeatureProvider() class UserChurnData(BaseModel): @@ -10,6 +13,7 @@ class UserChurnData(BaseModel): baseline_logins_per_month: int = 10 recent_logins: int = 10 open_support_tickets: int = 0 + app_crashes: int = 0 price_sensitivity_index: float = Field(0.5, ge=0.0, le=1.0) @@ -38,9 +42,19 @@ def predict_churn(req: ChurnRequest): try: model = registry.get("churn") meta = registry.meta("churn") - result = model.predict_churn(req.subscriber, req.user_data.model_dump()) + feature_result = feature_provider.get_or_compute(req.subscriber, req.user_data.model_dump()) + result = model.predict_churn(req.subscriber, feature_result.features) meta.record_prediction() - return {"model_version": meta.version, **result} + return { + "model_version": meta.version, + "feature_set": feature_result.feature_set, + "feature_set_hash": feature_result.feature_set_hash, + "feature_source": feature_result.source, + "feature_store_available": feature_result.store_available, + "feature_computed_at": feature_result.computed_at, + "feature_drift": feature_result.drift, + **result, + } except Exception as e: registry.meta("churn").record_error() raise HTTPException(status_code=500, detail=str(e)) @@ -54,9 +68,19 @@ def predict_churn_batch(req: BatchChurnRequest): meta = registry.meta("churn") for item in req.items: try: - result = model.predict_churn(item.subscriber, item.user_data.model_dump()) + feature_result = feature_provider.get_or_compute(item.subscriber, item.user_data.model_dump()) + result = model.predict_churn(item.subscriber, feature_result.features) meta.record_prediction() - results.append({"ok": True, **result}) + results.append({ + "ok": True, + "feature_set": feature_result.feature_set, + "feature_set_hash": feature_result.feature_set_hash, + "feature_source": feature_result.source, + "feature_store_available": feature_result.store_available, + "feature_computed_at": feature_result.computed_at, + "feature_drift": feature_result.drift, + **result, + }) except Exception as e: meta.record_error() results.append({"ok": False, "subscriber": item.subscriber, "error": str(e)}) diff --git a/services/feature-pipeline/Dockerfile b/services/feature-pipeline/Dockerfile new file mode 100644 index 00000000..3b28c0d1 --- /dev/null +++ b/services/feature-pipeline/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.12-slim + +WORKDIR /app + +COPY services/feature-pipeline/requirements.txt ./requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +COPY services/feature-pipeline . + +ENV FEATURE_STORE_URL=redis://feature-store:6379/0 + +EXPOSE 8010 +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8010"] diff --git a/services/feature-pipeline/README.md b/services/feature-pipeline/README.md new file mode 100644 index 00000000..5f9c599d --- /dev/null +++ b/services/feature-pipeline/README.md @@ -0,0 +1,24 @@ +# SubTrackr Feature Pipeline + +Standalone ETL service for precomputing ML features before inference. + +## Feature store + +Features are written to Redis-compatible storage with TTL expiration. Cache keys +include the feature set hash, so transformation changes automatically invalidate +old feature vectors. + +## Jobs + +- Real-time churn features: `0 * * * *` +- Historical aggregate refresh: `0 2 * * *` + +Airflow can call the refresh endpoints with `jobs/airflow_dag.py`. + +## Backfill + +```bash +python backfill.py --start-date 2026-06-01 --end-date 2026-06-07 +``` + +The command prints JSON progress records for every processed day. diff --git a/services/feature-pipeline/backfill.py b/services/feature-pipeline/backfill.py new file mode 100644 index 00000000..6d0c9a74 --- /dev/null +++ b/services/feature-pipeline/backfill.py @@ -0,0 +1,73 @@ +import argparse +import json +from datetime import date, datetime, timedelta, timezone +from typing import Dict, Iterable + +from feature_store import FeatureRecord, FeatureStoreUnavailable, RedisFeatureStore +from features.churn import FEATURE_SET_NAME, compute_features, feature_set_hash + + +def _date_range(start: date, end: date) -> Iterable[date]: + current = start + while current <= end: + yield current + current += timedelta(days=1) + + +def _example_rows(day: date) -> Iterable[Dict]: + # Replace with warehouse reads in production. This keeps the backfill command + # runnable and gives progress accounting even before a warehouse adapter lands. + yield { + "subscriber": f"backfill-{day.isoformat()}", + "user_data": { + "recent_payment_failures": 0, + "baseline_logins_per_month": 10, + "recent_logins": 10, + "open_support_tickets": 0, + "price_sensitivity_index": 0.5, + }, + } + + +def run_backfill(start: date, end: date) -> Dict: + store = RedisFeatureStore() + transform_hash = feature_set_hash() + processed = 0 + failed = 0 + total_days = (end - start).days + 1 + + for index, day in enumerate(_date_range(start, end), start=1): + for row in _example_rows(day): + record = FeatureRecord( + feature_set=FEATURE_SET_NAME, + entity_id=row["subscriber"], + transform_hash=transform_hash, + features=compute_features(row["user_data"]), + computed_at=datetime.now(timezone.utc).isoformat(), + ) + try: + store.set(record, ttl_seconds=24 * 60 * 60) + processed += 1 + except FeatureStoreUnavailable: + failed += 1 + print(json.dumps({"day": day.isoformat(), "day_index": index, "total_days": total_days, "processed": processed, "failed": failed})) + + return {"processed": processed, "failed": failed, "transform_hash": transform_hash} + + +def main() -> None: + parser = argparse.ArgumentParser(description="Backfill SubTrackr feature store records.") + parser.add_argument("--start-date", required=True) + parser.add_argument("--end-date", required=True) + args = parser.parse_args() + + start = datetime.strptime(args.start_date, "%Y-%m-%d").date() + end = datetime.strptime(args.end_date, "%Y-%m-%d").date() + if end < start: + raise SystemExit("--end-date must be on or after --start-date") + + print(json.dumps({"summary": run_backfill(start, end)})) + + +if __name__ == "__main__": + main() diff --git a/services/feature-pipeline/config.py b/services/feature-pipeline/config.py new file mode 100644 index 00000000..f83131e0 --- /dev/null +++ b/services/feature-pipeline/config.py @@ -0,0 +1,6 @@ +import os + + +FEATURE_STORE_URL = os.getenv("FEATURE_STORE_URL", "redis://localhost:6379/0") +FEATURE_TTL_SECONDS = int(os.getenv("FEATURE_TTL_SECONDS", "7200")) +REFERENCE_TTL_SECONDS = int(os.getenv("REFERENCE_TTL_SECONDS", "604800")) diff --git a/services/feature-pipeline/feature_store.py b/services/feature-pipeline/feature_store.py new file mode 100644 index 00000000..058f61ca --- /dev/null +++ b/services/feature-pipeline/feature_store.py @@ -0,0 +1,99 @@ +import json +from dataclasses import dataclass +from typing import Dict, Iterable, Optional + +from config import FEATURE_STORE_URL, FEATURE_TTL_SECONDS, REFERENCE_TTL_SECONDS + +try: + import redis +except ImportError: # pragma: no cover + redis = None + + +class FeatureStoreUnavailable(RuntimeError): + pass + + +@dataclass +class FeatureRecord: + feature_set: str + entity_id: str + transform_hash: str + features: Dict[str, float] + computed_at: str + + +class RedisFeatureStore: + def __init__(self, url: str = FEATURE_STORE_URL, ttl_seconds: int = FEATURE_TTL_SECONDS): + self.url = url + self.ttl_seconds = ttl_seconds + self._client = None + + @property + def client(self): + if redis is None: + raise FeatureStoreUnavailable("redis package is not installed") + if self._client is None: + self._client = redis.Redis.from_url(self.url, decode_responses=True) + return self._client + + def ping(self) -> bool: + try: + return bool(self.client.ping()) + except Exception as exc: + raise FeatureStoreUnavailable(str(exc)) from exc + + @staticmethod + def key(feature_set: str, entity_id: str, transform_hash: str) -> str: + return f"features:{feature_set}:{transform_hash}:{entity_id}" + + @staticmethod + def reference_key(feature_set: str, transform_hash: str, feature_name: str) -> str: + return f"features:reference:{feature_set}:{transform_hash}:{feature_name}" + + def get(self, feature_set: str, entity_id: str, transform_hash: str) -> Optional[FeatureRecord]: + key = self.key(feature_set, entity_id, transform_hash) + try: + raw = self.client.get(key) + except Exception as exc: + raise FeatureStoreUnavailable(str(exc)) from exc + if not raw: + return None + return FeatureRecord(**json.loads(raw)) + + def set(self, record: FeatureRecord, ttl_seconds: Optional[int] = None) -> None: + key = self.key(record.feature_set, record.entity_id, record.transform_hash) + try: + self.client.setex(key, ttl_seconds or self.ttl_seconds, json.dumps(record.__dict__)) + except Exception as exc: + raise FeatureStoreUnavailable(str(exc)) from exc + + def put_reference_distribution( + self, + feature_set: str, + transform_hash: str, + distributions: Dict[str, Iterable[float]], + ) -> None: + try: + pipe = self.client.pipeline() + for feature_name, values in distributions.items(): + pipe.setex( + self.reference_key(feature_set, transform_hash, feature_name), + REFERENCE_TTL_SECONDS, + json.dumps([float(value) for value in values]), + ) + pipe.execute() + except Exception as exc: + raise FeatureStoreUnavailable(str(exc)) from exc + + def get_reference_distribution( + self, + feature_set: str, + transform_hash: str, + feature_name: str, + ) -> Optional[list[float]]: + try: + raw = self.client.get(self.reference_key(feature_set, transform_hash, feature_name)) + except Exception as exc: + raise FeatureStoreUnavailable(str(exc)) from exc + return json.loads(raw) if raw else None diff --git a/services/feature-pipeline/features/__init__.py b/services/feature-pipeline/features/__init__.py new file mode 100644 index 00000000..a2cf9b7e --- /dev/null +++ b/services/feature-pipeline/features/__init__.py @@ -0,0 +1,5 @@ +from .churn import FEATURE_SET_NAME as CHURN_FEATURE_SET +from .churn import compute_features as compute_churn_features +from .churn import feature_set_hash as churn_feature_set_hash + +__all__ = ["CHURN_FEATURE_SET", "compute_churn_features", "churn_feature_set_hash"] diff --git a/services/feature-pipeline/features/churn.py b/services/feature-pipeline/features/churn.py new file mode 100644 index 00000000..a87e866b --- /dev/null +++ b/services/feature-pipeline/features/churn.py @@ -0,0 +1,88 @@ +import hashlib +import inspect +import json +from typing import Dict, Iterable, List + + +FEATURE_SET_NAME = "churn" +TRANSFORMATION_VERSION = "2026-06-25.1" + +REFERENCE_DISTRIBUTION = { + "payment_failures": [0.0, 0.0, 0.1, 0.25, 0.4, 0.75, 1.0], + "login_frequency_drop": [0.0, 0.05, 0.1, 0.18, 0.3, 0.55, 0.8], + "support_tickets": [0.0, 0.0, 0.1, 0.25, 0.5, 0.75, 1.0], + "app_crashes": [0.0, 0.0, 0.02, 0.05, 0.08, 0.12, 0.2], + "price_sensitivity": [0.1, 0.25, 0.4, 0.5, 0.65, 0.8, 0.95], +} + + +def _bounded(value: float, lower: float = 0.0, upper: float = 1.0) -> float: + return max(lower, min(float(value), upper)) + + +def compute_features(user_data: Dict) -> Dict[str, float]: + baseline_logins = max(float(user_data.get("baseline_logins_per_month", 1)), 1.0) + recent_logins = float(user_data.get("recent_logins", baseline_logins)) + login_drop = max(0.0, (baseline_logins - recent_logins) / baseline_logins) + + return { + "payment_failures": _bounded(float(user_data.get("recent_payment_failures", 0)) / 3.0), + "login_frequency_drop": _bounded(login_drop), + "support_tickets": _bounded(float(user_data.get("open_support_tickets", 0)) / 2.0), + "app_crashes": _bounded(float(user_data.get("app_crashes", 0)) / 10.0), + "price_sensitivity": _bounded(float(user_data.get("price_sensitivity_index", 0.5))), + } + + +def feature_set_hash() -> str: + payload = { + "feature_set": FEATURE_SET_NAME, + "version": TRANSFORMATION_VERSION, + "source": inspect.getsource(compute_features), + "reference_distribution": REFERENCE_DISTRIBUTION, + } + encoded = json.dumps(payload, sort_keys=True).encode("utf-8") + return hashlib.sha256(encoded).hexdigest()[:16] + + +def kolmogorov_smirnov(current: Iterable[float], reference: Iterable[float]) -> Dict[str, float]: + current_values = sorted(float(value) for value in current) + reference_values = sorted(float(value) for value in reference) + if not current_values or not reference_values: + return {"statistic": 0.0, "p_value": 1.0} + + all_values = sorted(set(current_values + reference_values)) + n_current = len(current_values) + n_reference = len(reference_values) + max_delta = 0.0 + current_index = 0 + reference_index = 0 + + for value in all_values: + while current_index < n_current and current_values[current_index] <= value: + current_index += 1 + while reference_index < n_reference and reference_values[reference_index] <= value: + reference_index += 1 + max_delta = max(max_delta, abs((current_index / n_current) - (reference_index / n_reference))) + + effective_n = (n_current * n_reference) / (n_current + n_reference) + p_value = min(1.0, max(0.0, 2.0 * pow(2.718281828459045, -2.0 * effective_n * max_delta * max_delta))) + return {"statistic": round(max_delta, 6), "p_value": round(p_value, 6)} + + +def drift_report(current_rows: List[Dict[str, float]], alpha: float = 0.05) -> Dict: + reports = {} + drifted = False + for feature_name, reference in REFERENCE_DISTRIBUTION.items(): + current = [row[feature_name] for row in current_rows if feature_name in row] + result = kolmogorov_smirnov(current, reference) + result["drift_detected"] = result["p_value"] < alpha + drifted = drifted or result["drift_detected"] + reports[feature_name] = result + return { + "feature_set": FEATURE_SET_NAME, + "transform_hash": feature_set_hash(), + "alpha": alpha, + "drift_detected": drifted, + "features": reports, + } diff --git a/services/feature-pipeline/jobs/__init__.py b/services/feature-pipeline/jobs/__init__.py new file mode 100644 index 00000000..62f618bd --- /dev/null +++ b/services/feature-pipeline/jobs/__init__.py @@ -0,0 +1 @@ +"""Feature refresh jobs and scheduler integration.""" diff --git a/services/feature-pipeline/jobs/airflow_dag.py b/services/feature-pipeline/jobs/airflow_dag.py new file mode 100644 index 00000000..b9b02ccf --- /dev/null +++ b/services/feature-pipeline/jobs/airflow_dag.py @@ -0,0 +1,52 @@ +""" +Airflow integration for feature refresh jobs. + +Drop this file into an Airflow DAGs folder and point FEATURE_PIPELINE_URL at the +deployed feature-pipeline service. +""" + +import os +from datetime import datetime + +try: + import requests + from airflow import DAG + from airflow.operators.python import PythonOperator +except ImportError: # Allows repo validation without Airflow installed. + DAG = None + PythonOperator = None + requests = None + + +FEATURE_PIPELINE_URL = os.getenv("FEATURE_PIPELINE_URL", "http://feature-pipeline:8010") + + +def _post(path: str) -> None: + response = requests.post(f"{FEATURE_PIPELINE_URL}{path}", timeout=30) + response.raise_for_status() + + +if DAG is not None: + with DAG( + dag_id="subtrackr_feature_pipeline_hourly", + start_date=datetime(2026, 1, 1), + schedule="0 * * * *", + catchup=False, + tags=["subtrackr", "features"], + ) as hourly_dag: + PythonOperator( + task_id="hourly_real_time_features", + python_callable=lambda: _post("/v1/jobs/refresh/realtime"), + ) + + with DAG( + dag_id="subtrackr_feature_pipeline_daily", + start_date=datetime(2026, 1, 1), + schedule="0 2 * * *", + catchup=False, + tags=["subtrackr", "features"], + ) as daily_dag: + PythonOperator( + task_id="daily_historical_aggregates", + python_callable=lambda: _post("/v1/jobs/refresh/historical"), + ) diff --git a/services/feature-pipeline/jobs/scheduler.py b/services/feature-pipeline/jobs/scheduler.py new file mode 100644 index 00000000..571c5133 --- /dev/null +++ b/services/feature-pipeline/jobs/scheduler.py @@ -0,0 +1,58 @@ +from datetime import datetime, timezone +from typing import Dict, Iterable + +from feature_store import FeatureRecord, FeatureStoreUnavailable, RedisFeatureStore +from features.churn import FEATURE_SET_NAME, compute_features, feature_set_hash + + +CRON_SCHEDULES = { + "real_time_features": "0 * * * *", + "historical_aggregates": "0 2 * * *", +} + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def refresh_churn_features( + rows: Iterable[Dict], + store: RedisFeatureStore | None = None, + ttl_seconds: int | None = None, +) -> Dict: + store = store or RedisFeatureStore() + transform_hash = feature_set_hash() + processed = 0 + failed = 0 + + for row in rows: + subscriber = row["subscriber"] + user_data = row.get("user_data", {}) + record = FeatureRecord( + feature_set=FEATURE_SET_NAME, + entity_id=subscriber, + transform_hash=transform_hash, + features=compute_features(user_data), + computed_at=_now_iso(), + ) + try: + store.set(record, ttl_seconds=ttl_seconds) + processed += 1 + except FeatureStoreUnavailable: + failed += 1 + + return { + "job": "churn_feature_refresh", + "transform_hash": transform_hash, + "processed": processed, + "failed": failed, + "ran_at": _now_iso(), + } + + +def hourly_real_time_refresh(rows: Iterable[Dict]) -> Dict: + return refresh_churn_features(rows) + + +def daily_historical_aggregate_refresh(rows: Iterable[Dict]) -> Dict: + return refresh_churn_features(rows, ttl_seconds=24 * 60 * 60) diff --git a/services/feature-pipeline/main.py b/services/feature-pipeline/main.py new file mode 100644 index 00000000..5ae4ddf1 --- /dev/null +++ b/services/feature-pipeline/main.py @@ -0,0 +1,97 @@ +from datetime import datetime, timezone +from typing import Dict, List + +from fastapi import FastAPI +from pydantic import BaseModel, Field + +from feature_store import FeatureRecord, FeatureStoreUnavailable, RedisFeatureStore +from features.churn import FEATURE_SET_NAME, REFERENCE_DISTRIBUTION, compute_features, drift_report, feature_set_hash +from jobs.scheduler import CRON_SCHEDULES, daily_historical_aggregate_refresh, hourly_real_time_refresh + + +app = FastAPI(title="SubTrackr Feature Pipeline", version="1.0.0") +store = RedisFeatureStore() + + +class ChurnUserData(BaseModel): + recent_payment_failures: int = 0 + baseline_logins_per_month: int = 10 + recent_logins: int = 10 + open_support_tickets: int = 0 + app_crashes: int = 0 + price_sensitivity_index: float = Field(0.5, ge=0.0, le=1.0) + + +class ChurnFeatureRequest(BaseModel): + subscriber: str + user_data: ChurnUserData + + +class ChurnFeatureBatchRequest(BaseModel): + items: List[ChurnFeatureRequest] = Field(default_factory=list) + + +class DriftRequest(BaseModel): + rows: List[Dict[str, float]] + alpha: float = Field(0.05, gt=0.0, lt=1.0) + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +@app.get("/health") +def health(): + try: + store.ping() + store_status = "available" + except FeatureStoreUnavailable: + store_status = "unavailable" + return { + "ok": True, + "feature_store": store_status, + "feature_sets": {FEATURE_SET_NAME: feature_set_hash()}, + "cron": CRON_SCHEDULES, + } + + +@app.post("/v1/features/churn/compute") +def compute_churn(req: ChurnFeatureRequest): + transform_hash = feature_set_hash() + features = compute_features(req.user_data.model_dump()) + record = FeatureRecord( + feature_set=FEATURE_SET_NAME, + entity_id=req.subscriber, + transform_hash=transform_hash, + features=features, + computed_at=_now_iso(), + ) + cached = False + try: + store.set(record) + cached = True + except FeatureStoreUnavailable: + cached = False + return {**record.__dict__, "cached": cached} + + +@app.post("/v1/jobs/refresh/realtime") +def refresh_realtime(req: ChurnFeatureBatchRequest = ChurnFeatureBatchRequest()): + return hourly_real_time_refresh([item.model_dump() for item in req.items]) + + +@app.post("/v1/jobs/refresh/historical") +def refresh_historical(req: ChurnFeatureBatchRequest = ChurnFeatureBatchRequest()): + return daily_historical_aggregate_refresh([item.model_dump() for item in req.items]) + + +@app.post("/v1/features/churn/drift") +def detect_churn_drift(req: DriftRequest): + return drift_report(req.rows, alpha=req.alpha) + + +@app.post("/v1/features/churn/reference") +def seed_churn_reference_distribution(): + transform_hash = feature_set_hash() + store.put_reference_distribution(FEATURE_SET_NAME, transform_hash, REFERENCE_DISTRIBUTION) + return {"feature_set": FEATURE_SET_NAME, "transform_hash": transform_hash, "seeded": True} diff --git a/services/feature-pipeline/requirements.txt b/services/feature-pipeline/requirements.txt new file mode 100644 index 00000000..92716410 --- /dev/null +++ b/services/feature-pipeline/requirements.txt @@ -0,0 +1,4 @@ +fastapi==0.115.5 +uvicorn[standard]==0.32.1 +pydantic==2.10.3 +redis==5.2.1 diff --git a/services/feature-pipeline/tests/test_churn_features.py b/services/feature-pipeline/tests/test_churn_features.py new file mode 100644 index 00000000..bcd99cee --- /dev/null +++ b/services/feature-pipeline/tests/test_churn_features.py @@ -0,0 +1,41 @@ +import os +import sys +import unittest + + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +from features.churn import compute_features, drift_report, feature_set_hash + + +class ChurnFeatureTests(unittest.TestCase): + def test_compute_features_is_bounded_and_deterministic(self): + raw = { + "recent_payment_failures": 6, + "baseline_logins_per_month": 20, + "recent_logins": 5, + "open_support_tickets": 3, + "app_crashes": 2, + "price_sensitivity_index": 1.4, + } + + first = compute_features(raw) + second = compute_features(raw) + + self.assertEqual(first, second) + self.assertEqual(first["payment_failures"], 1.0) + self.assertEqual(first["support_tickets"], 1.0) + self.assertEqual(first["price_sensitivity"], 1.0) + self.assertEqual(first["login_frequency_drop"], 0.75) + + def test_feature_hash_and_drift_report(self): + transform_hash = feature_set_hash() + report = drift_report([compute_features({"recent_payment_failures": 0})]) + + self.assertEqual(len(transform_hash), 16) + self.assertEqual(report["feature_set"], "churn") + self.assertIn("payment_failures", report["features"]) + + +if __name__ == "__main__": + unittest.main()