From fa7f31c6a2114178dccdce4c933510e92f90ce74 Mon Sep 17 00:00:00 2001 From: Shantanu Mane Date: Tue, 16 Jun 2026 22:36:23 +0530 Subject: [PATCH] chore(deploy): move compose to root and add worker health sentinel - Move docker-compose stack and observability overlay to project root - Remove obsolete deploy/docker/docker-compose.observability.yml - Write /tmp/worker_healthy sentinel for the worker container healthcheck - Update .env.example docs (compose hosts, storage providers, OTel TLS) - Add project CLAUDE.md --- .env.example | 31 ++-- CLAUDE.md | 97 +++++++++++++ ...ty.yml => docker-compose.observability.yml | 61 +++++--- docker-compose.yml | 133 ++++++++++++++++++ worker/consumer/consumer.py | 9 ++ 5 files changed, 299 insertions(+), 32 deletions(-) create mode 100644 CLAUDE.md rename deploy/docker/docker-compose.observability.yml => docker-compose.observability.yml (81%) create mode 100644 docker-compose.yml diff --git a/.env.example b/.env.example index f745b30..5ce377b 100644 --- a/.env.example +++ b/.env.example @@ -7,6 +7,8 @@ HOST=0.0.0.0 PORT=5010 # ─── Database ───────────────────────────────────────────────────────────────── +# Docker Compose: DB_HOST=postgres +# Local dev (no compose): DB_HOST=localhost DB_HOST=localhost DB_PORT=5432 DB_USER=mpiper @@ -23,6 +25,8 @@ DB_MAX_RETRIES=5 DB_RETRY_DELAY=5 # ─── Redis ──────────────────────────────────────────────────────────────────── +# Docker Compose: REDIS_CONNECTION_STRING=redis://redis:6379/0 +# Local dev (no compose): REDIS_CONNECTION_STRING=redis://localhost:6379/0 REDIS_CONNECTION_STRING=redis://localhost:6379/0 # Connection pool (Python worker) @@ -38,20 +42,29 @@ REDIS_WRITE_TIMEOUT=10 # AES-256 key — must be exactly 32 characters ENCRYPTION_KEY=LKyGslR3InLES/EYQiJZcW06KFNMoevUd6kehjtrxPA= -# ─── Storage / GCS ──────────────────────────────────────────────────────────── +# ─── Storage ────────────────────────────────────────────────────────────────── +# Set BUCKET_PROVIDER to "gcs" or "s3" BUCKET_PROVIDER=gcs + +# Bucket name — required for both providers BUCKET_NAME=my-media-bucket -BUCKET_REGION=us-east-1 -BUCKET_ACCESS_KEY= -BUCKET_SECRET_KEY= -BUCKET_ENDPOINT_URL= -# Path to GCS service-account JSON key file -GCS_SA_PATH=/path/to/service-account.json -BUCKET_SA_PATH=/path/to/service-account.json + +# GCS (when BUCKET_PROVIDER=gcs) +# Path to the GCS service-account JSON key file. The Go API reads GCS_SA_PATH; +# the Python worker reads BUCKET_SA_PATH — keep both pointing at the same file. +GCS_SA_PATH=.secrets/service-account.json +BUCKET_SA_PATH=.secrets/service-account.json + +# S3 / S3-compatible (when BUCKET_PROVIDER=s3) — not yet implemented (sub-project 4) +# BUCKET_REGION=us-east-1 +# BUCKET_ACCESS_KEY= +# BUCKET_SECRET_KEY= +# BUCKET_ENDPOINT_URL= # optional — for MinIO or S3-compatible endpoints # ─── OpenTelemetry ──────────────────────────────────────────────────────────── OTEL_EXPORTER_OTLP_ENDPOINT=otel-collector:4317 -# Set to "true" for local/dev collectors that don't use TLS +# Defaults to plaintext gRPC (true) when unset — matches the bundled collector. +# Set to "false" only if your collector endpoint terminates real TLS. OTEL_TLS_INSECURE=true DEPLOYMENT_ENV=development TRACE_SAMPLING_RATE=0.1 diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..6a74ad9 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,97 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Commands + +```bash +# Dev server (hot-reload via air) +task run + +# Dev server (go run, no hot-reload) +task dev # ENV=development, LOG_LEVEL=DEBUG +task staging # ENV=staging, LOG_LEVEL=INFO +task prod # ENV=production, LOG_LEVEL=WARN + +# Run tests +task test # uses gotestsum +task test -- ./internal/... # run specific package +task test-coverage # generates coverage.html + +# Lint / format +task lint # golangci-lint +task fmt # go fmt + goimports + +# Build +task build # outputs build/mpiper.exe +task build-prod # ENV=production + +# Python worker (from project root) +poetry run python -m worker + +# Python tests +poetry run pytest worker/tests/ + +# Docker +task docker-build && task docker-run # API +task docker-build-worker && task docker-run-worker # Worker +``` + +Env files: `development` → `.env.local`, `staging` → `.env.staging`, `production` → `.env`. + +`ENV`, `DB_USER`, `DB_PASSWORD`, `DB_NAME`, `REDIS_CONNECTION_STRING`, and `ENCRYPTION_KEY` (exactly 32 bytes) are required — the config will panic without them. + +## Architecture + +Two-service pipeline: **Go API server** + **Python media worker**, communicating via **Redis Streams** (`media:jobs` stream). Postgres is the durable source of truth; Redis is transport-only. + +### Go API server (`cmd/server`, `internal/`) + +Entry point: `cmd/server/main.go` + +1. `config.InitializeConfig` → `config.Init` — loads env file for the current `Env` build variable, stores singleton (`config.MustGet()` available everywhere after startup). +2. `pkg/logger.New` — builds a `*zap.Logger` with optional OTel log export. +3. `metrics.InitTracer` / `metrics.InitMetrics` — wires up OTel tracing + metrics exporters. +4. `database.NewPostgresDB` — `sqlx.DB` pool; if `AUTO_MIGRATE=true` runs embedded SQL migrations on startup. +5. `server.NewServer` → `server.Start` — Chi router with middleware stack: request-ID, logger, tracing, metrics, recovery, slow-request detector, CORS, auth. + +Layer layout inside `internal/`: +- `handler/` — HTTP handlers, read request → call service → write response via `pkg/utils/response` +- `service/` — business logic (`AssetService`); coordinates repo + queue + storage +- `repository/` — SQL queries via sqlx (`AssetRepository`) +- `router/` — Chi route registration; mounts handlers onto the router returned to `server/` +- `models/` — request/response structs (`UploadAssetRequest`, `UploadAssetResponse`); not DB models +- `queue/` — `RedisQueue.Enqueue` writes to the stream with OTel tracing + retry +- `metrics/` — OTel metric instruments (counters, histograms); `internal/metrics/metrics.go` defines all instruments, `otel.go` handles provider init/shutdown + +### Python worker (`worker/`) + +Entry: `worker/__main__.py` → `consumer/main.py` + +- `Consumer` (Redis Streams, consumer group) polls with `xreadgroup`, processes one message at a time. +- Message contains either `job_id` or `asset_id`. `job_id` is canonical; `asset_id` triggers an upsert into the `jobs` table first. +- `_handle_job` takes a `SELECT … FOR UPDATE` lock, marks the row `in_progress`, calls `process_asset_dispatch`, then marks `done` + acks the stream message. On failure it re-queues (up to `MAX_JOB_ATTEMPTS`). +- `_recover_stuck_pending` re-adds `pending/in_progress` jobs older than 2 min back to the stream (recovery path, called when no messages available). +- `worker/processing/processor.py` — `process_asset_dispatch` routes by asset type to `images.py` or `videos.py`. +- `worker/storage/` — `StorageX` ABC; `GCSStorage` is the concrete impl. +- `worker/utils/metrics.py` — Prometheus metrics via `prometheus_client`. + +### Shared concerns + +**Config singleton (Go):** `internal/config.MustGet()` — call only after `config.Init(cfg)` in `main`. Do not pass `*EnvConfig` via function params; use the singleton. + +**Logger (Go):** `pkg/logger` wraps zap. Request-scoped logger lives in `context`; retrieve with `applogger.FromContext(ctx)` or `middleware.LoggerFromContext(ctx)`. Base logger is constructed once in `main` and passed to subsystems. + +**Error types (Go):** `pkg/errors` has typed API errors (`NotFoundError`, `BadRequestError`, `UnauthorizedError`, `ConflictError`, `InternalServerErrorError`) each embedding `*ApiError` (carries `StatusCode`). Handler layer type-asserts on these to set HTTP status. Use `fmt.Errorf("op: %w", err)` for internal wrapping; use `errors.New*` constructors (e.g. `errors.NewNotFoundError`) at the service/handler boundary. + +**Storage (`pkg/utils/storagex`):** `StorageX` interface with `PutObject`, `GetObject`, `GeneratePresignedURL`, `PublicURL`, `DeleteObject`. Current impl: `GCSStorage`. S3/MinIO provider types exist in config but are not yet implemented. + +**OTel:** Full tracing + metrics on the API side. Go instruments are in `internal/metrics/metrics.go`. Collector config at `observability/otel-collector.yml`; Grafana/Loki/Tempo/Prometheus configs in `observability/`. Python side uses `prometheus_client` (not OTel). + +### Database schema + +- `assets` — core media record; `status` enum: `uploading → uploaded → processing → ready / failed` +- `variants.image` — deduplicated by `variant_hash` (content+params hash); immutable once written +- `jobs` — processing job per asset; `status` enum: `pending → in_progress → done / failed`; `attempts` tracked for retry cap + +Migrations are plain SQL in `db/migrations/`. The Go server can auto-run them at startup (`AUTO_MIGRATE=true`); the Python worker also runs them via `worker/consumer/migrations.py`. diff --git a/deploy/docker/docker-compose.observability.yml b/docker-compose.observability.yml similarity index 81% rename from deploy/docker/docker-compose.observability.yml rename to docker-compose.observability.yml index c04229b..65d4760 100644 --- a/deploy/docker/docker-compose.observability.yml +++ b/docker-compose.observability.yml @@ -1,10 +1,17 @@ # ============================================================================ -# Complete Observability Stack for MPiper +# MPiper — Observability overlay (opt-in) +# +# docker compose -f docker-compose.yml -f docker-compose.observability.yml up +# # - Grafana: Visualization and dashboards # - Tempo: Distributed tracing backend # - Prometheus: Metrics collection and storage # - Loki: Log aggregation # - OTEL Collector: Centralized telemetry collection +# +# `otel-collector` joins the core `mpiper_net` (external) so the api and worker +# can reach it at `otel-collector:4317`. The remaining observability services +# stay isolated on `mpiper_obs_net`. # ============================================================================ name: mpiper-observability-stack @@ -18,7 +25,7 @@ services: container_name: mpiper-tempo command: ["-config.file=/etc/tempo.yaml"] volumes: - - ../../observability/tempo.yml:/etc/tempo.yaml + - ./observability/tempo.yml:/etc/tempo.yaml - tempo-data:/var/tempo ports: - "3200:3200" # Tempo HTTP @@ -27,7 +34,7 @@ services: - "9411:9411" # Zipkin receiver - "14268:14268" # Jaeger ingest networks: - - mpiper-observability + - mpiper_obs_net restart: unless-stopped # ========================================================================== @@ -41,35 +48,35 @@ services: - GF_AUTH_ANONYMOUS_ENABLED=true - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin - GF_AUTH_DISABLE_LOGIN_FORM=false - + # Default admin credentials (CHANGE IN PRODUCTION!) - GF_SECURITY_ADMIN_USER=admin - GF_SECURITY_ADMIN_PASSWORD=admin - + # Enable explore by default - GF_EXPLORE_ENABLED=true - + # Server settings - GF_SERVER_HTTP_PORT=3000 - GF_SERVER_DOMAIN=localhost - + # Feature toggles - GF_FEATURE_TOGGLES_ENABLE=traceqlEditor,traceqlSearch - + # Log level - GF_LOG_LEVEL=info - + volumes: # Pre-configured data sources - - ../../observability/grafana/datasources:/etc/grafana/provisioning/datasources + - ./observability/grafana/datasources:/etc/grafana/provisioning/datasources # Pre-configured dashboards - - ../../observability/grafana/dashboards:/etc/grafana/provisioning/dashboards + - ./observability/grafana/dashboards:/etc/grafana/provisioning/dashboards # Persistent storage for user preferences - grafana-data:/var/lib/grafana ports: - "3000:3000" networks: - - mpiper-observability + - mpiper_obs_net depends_on: - tempo - prometheus @@ -96,12 +103,12 @@ services: - '--web.console.templates=/usr/share/prometheus/consoles' - '--web.enable-lifecycle' volumes: - - ../../observability/prometheus.yml:/etc/prometheus/prometheus.yml + - ./observability/prometheus.yml:/etc/prometheus/prometheus.yml - prometheus-data:/prometheus ports: - "9090:9090" networks: - - mpiper-observability + - mpiper_obs_net restart: unless-stopped healthcheck: test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://localhost:9090/-/healthy || exit 1"] @@ -118,12 +125,12 @@ services: container_name: mpiper-loki command: -config.file=/etc/loki/loki.yaml volumes: - - ../../observability/loki.yml:/etc/loki/loki.yaml + - ./observability/loki.yml:/etc/loki/loki.yaml - loki-data:/loki ports: - "3100:3100" networks: - - mpiper-observability + - mpiper_obs_net restart: unless-stopped # ========================================================================== @@ -134,31 +141,33 @@ services: container_name: mpiper-promtail command: -config.file=/etc/promtail/promtail.yaml volumes: - - ../../observability/promtail.yml:/etc/promtail/promtail.yaml + - ./observability/promtail.yml:/etc/promtail/promtail.yaml - /var/log:/var/log:ro - /var/lib/docker/containers:/var/lib/docker/containers:ro - /var/run/docker.sock:/var/run/docker.sock:ro networks: - - mpiper-observability + - mpiper_obs_net depends_on: - loki restart: unless-stopped # ========================================================================== # OpenTelemetry Collector - Centralized Telemetry Pipeline + # Bridges mpiper_net (reachable by api/worker at otel-collector:4317) + # and mpiper_obs_net (forwards to tempo/prometheus/loki). # ========================================================================== otel-collector: image: otel/opentelemetry-collector-contrib:latest container_name: mpiper-otel-collector command: ["--config=/etc/otel-collector.yaml"] env_file: - - ../../.env.local + - .env.local environment: DEPLOYMENT_ENV: local SERVICE_VERSION: dev K8S_CLUSTER_NAME: docker-compose volumes: - - ../../observability/otel-collector.yml:/etc/otel-collector.yaml + - ./observability/otel-collector.yml:/etc/otel-collector.yaml - /var/run/docker.sock:/var/run/docker.sock:ro ports: - "4319:4317" # OTLP gRPC (host:4319 -> container:4317) @@ -168,7 +177,8 @@ services: - "13133:13133" # Health check - "55679:55679" # zPages extension networks: - - mpiper-observability + - mpiper_net + - mpiper_obs_net depends_on: - tempo - prometheus @@ -185,9 +195,14 @@ services: # NETWORKS # ============================================================================ networks: - mpiper-observability: + # Joined from docker-compose.yml — the core network the api/worker run on. + mpiper_net: + external: true + name: mpiper_net + # Internal observability network — isolated from core services. + mpiper_obs_net: driver: bridge - name: mpiper-observability + name: mpiper_obs_net # ============================================================================ # VOLUMES diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..6d0e3a1 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,133 @@ +# ============================================================================ +# MPiper — Core services +# +# docker compose up +# +# Core stack: api, worker, postgres, redis on the `mpiper_net` bridge network. +# Services address each other by service name (DB_HOST=postgres, etc.). +# +# Optional observability overlay (otel-collector + Grafana/Tempo/Prometheus/Loki): +# docker compose -f docker-compose.yml -f docker-compose.observability.yml up +# +# Copy .env.example -> .env.local and fill in required values before running. +# ============================================================================ + +name: mpiper + +services: + # ========================================================================== + # Go API server + # ========================================================================== + api: + build: + context: . + dockerfile: deploy/docker/mpiper.dockerfile + container_name: mpiper-api + env_file: + - .env.local + environment: + # Compose-internal addressing — services reach each other by name. + DB_HOST: postgres + REDIS_CONNECTION_STRING: redis://redis:6379/0 + # Always apply migrations on first run; override in a compose override file. + AUTO_MIGRATE: "true" + ports: + - "5010:5010" + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + networks: + - mpiper_net + restart: unless-stopped + healthcheck: + test: ["CMD", "/app/mpiper", "--health-check"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 15s + + # ========================================================================== + # Python media worker + # ========================================================================== + worker: + build: + context: . + dockerfile: deploy/docker/worker.dockerfile + container_name: mpiper-worker + env_file: + - .env.local + environment: + # Compose-internal addressing — services reach each other by name. + DB_HOST: postgres + REDIS_CONNECTION_STRING: redis://redis:6379/0 + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + networks: + - mpiper_net + restart: unless-stopped + healthcheck: + test: ["CMD-SHELL", "test -f /tmp/worker_healthy || exit 1"] + interval: 10s + timeout: 3s + retries: 5 + start_period: 20s + + # ========================================================================== + # Postgres — durable source of truth + # ========================================================================== + postgres: + image: postgres:16-alpine + container_name: mpiper-postgres + # Postgres init reads POSTGRES_* below. These mirror DB_USER/DB_PASSWORD/DB_NAME. + # Values interpolate from a `.env` file or the shell; the defaults match + # .env.example so plain `docker compose up` works out of the box. To override, + # set DB_USER / DB_PASSWORD / DB_NAME in `.env` (compose's default env file). + environment: + POSTGRES_USER: ${DB_USER:-mpiper} + POSTGRES_PASSWORD: ${DB_PASSWORD:-changeme} + POSTGRES_DB: ${DB_NAME:-mpiper} + volumes: + - mpiper_postgres_data:/var/lib/postgresql/data + networks: + - mpiper_net + restart: unless-stopped + healthcheck: + test: ["CMD-SHELL", "pg_isready -U $${POSTGRES_USER} -d $${POSTGRES_DB}"] + interval: 5s + timeout: 5s + retries: 10 + start_period: 10s + + # ========================================================================== + # Redis — transport-only (ephemeral, no volume by design) + # ========================================================================== + redis: + image: redis:7-alpine + container_name: mpiper-redis + networks: + - mpiper_net + restart: unless-stopped + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 3s + retries: 5 + +# ============================================================================ +# NETWORKS +# ============================================================================ +networks: + mpiper_net: + driver: bridge + name: mpiper_net + +# ============================================================================ +# VOLUMES +# ============================================================================ +volumes: + mpiper_postgres_data: # Postgres data — persists across restarts diff --git a/worker/consumer/consumer.py b/worker/consumer/consumer.py index 10a7288..0aecbb9 100644 --- a/worker/consumer/consumer.py +++ b/worker/consumer/consumer.py @@ -93,6 +93,15 @@ def __init__( except ResponseError as exc: logger.debug("consumer group exists or cannot be created: %s", exc) + # Write a health sentinel once the consumer group is initialised. The + # container healthcheck (test -f /tmp/worker_healthy) reads this file. + # Reaching this point means Redis is connected and the group exists. + try: + with open("/tmp/worker_healthy", "w") as fh: + fh.write("ok") + except OSError as exc: + logger.warning("could not write health sentinel: %s", exc) + def consume(self, consumer_name: str) -> bool: """Poll the stream and process a single message.