Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ HOST=0.0.0.0
PORT=5010

# ─── Database ─────────────────────────────────────────────────────────────────
# Docker Compose: DB_HOST=postgres
# Local dev (no compose): DB_HOST=localhost
DB_HOST=localhost
DB_PORT=5432
DB_USER=mpiper
Expand All @@ -23,6 +25,8 @@ DB_MAX_RETRIES=5
DB_RETRY_DELAY=5

# ─── Redis ────────────────────────────────────────────────────────────────────
# Docker Compose: REDIS_CONNECTION_STRING=redis://redis:6379/0
# Local dev (no compose): REDIS_CONNECTION_STRING=redis://localhost:6379/0
REDIS_CONNECTION_STRING=redis://localhost:6379/0

# Connection pool (Python worker)
Expand All @@ -38,20 +42,29 @@ REDIS_WRITE_TIMEOUT=10
# AES-256 key — must be exactly 32 characters
ENCRYPTION_KEY=LKyGslR3InLES/EYQiJZcW06KFNMoevUd6kehjtrxPA=

# ─── Storage / GCS ────────────────────────────────────────────────────────────
# ─── Storage ──────────────────────────────────────────────────────────────────
# Set BUCKET_PROVIDER to "gcs" or "s3"
BUCKET_PROVIDER=gcs

# Bucket name — required for both providers
BUCKET_NAME=my-media-bucket
BUCKET_REGION=us-east-1
BUCKET_ACCESS_KEY=
BUCKET_SECRET_KEY=
BUCKET_ENDPOINT_URL=
# Path to GCS service-account JSON key file
GCS_SA_PATH=/path/to/service-account.json
BUCKET_SA_PATH=/path/to/service-account.json

# GCS (when BUCKET_PROVIDER=gcs)
# Path to the GCS service-account JSON key file. The Go API reads GCS_SA_PATH;
# the Python worker reads BUCKET_SA_PATH — keep both pointing at the same file.
GCS_SA_PATH=.secrets/service-account.json
BUCKET_SA_PATH=.secrets/service-account.json

# S3 / S3-compatible (when BUCKET_PROVIDER=s3) — not yet implemented (sub-project 4)
# BUCKET_REGION=us-east-1
# BUCKET_ACCESS_KEY=
# BUCKET_SECRET_KEY=
# BUCKET_ENDPOINT_URL= # optional — for MinIO or S3-compatible endpoints

# ─── OpenTelemetry ────────────────────────────────────────────────────────────
OTEL_EXPORTER_OTLP_ENDPOINT=otel-collector:4317
# Set to "true" for local/dev collectors that don't use TLS
# Defaults to plaintext gRPC (true) when unset — matches the bundled collector.
# Set to "false" only if your collector endpoint terminates real TLS.
OTEL_TLS_INSECURE=true
DEPLOYMENT_ENV=development
TRACE_SAMPLING_RATE=0.1
Expand Down
97 changes: 97 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Commands

```bash
# Dev server (hot-reload via air)
task run

# Dev server (go run, no hot-reload)
task dev # ENV=development, LOG_LEVEL=DEBUG
task staging # ENV=staging, LOG_LEVEL=INFO
task prod # ENV=production, LOG_LEVEL=WARN

# Run tests
task test # uses gotestsum
task test -- ./internal/... # run specific package
task test-coverage # generates coverage.html

# Lint / format
task lint # golangci-lint
task fmt # go fmt + goimports

# Build
task build # outputs build/mpiper.exe
task build-prod # ENV=production

# Python worker (from project root)
poetry run python -m worker

# Python tests
poetry run pytest worker/tests/

# Docker
task docker-build && task docker-run # API
task docker-build-worker && task docker-run-worker # Worker
```

Env files: `development` → `.env.local`, `staging` → `.env.staging`, `production` → `.env`.

`ENV`, `DB_USER`, `DB_PASSWORD`, `DB_NAME`, `REDIS_CONNECTION_STRING`, and `ENCRYPTION_KEY` (exactly 32 bytes) are required — the config will panic without them.

## Architecture

Two-service pipeline: **Go API server** + **Python media worker**, communicating via **Redis Streams** (`media:jobs` stream). Postgres is the durable source of truth; Redis is transport-only.

### Go API server (`cmd/server`, `internal/`)

Entry point: `cmd/server/main.go`

1. `config.InitializeConfig` → `config.Init` — loads env file for the current `Env` build variable, stores singleton (`config.MustGet()` available everywhere after startup).
2. `pkg/logger.New` — builds a `*zap.Logger` with optional OTel log export.
3. `metrics.InitTracer` / `metrics.InitMetrics` — wires up OTel tracing + metrics exporters.
4. `database.NewPostgresDB` — `sqlx.DB` pool; if `AUTO_MIGRATE=true` runs embedded SQL migrations on startup.
5. `server.NewServer` → `server.Start` — Chi router with middleware stack: request-ID, logger, tracing, metrics, recovery, slow-request detector, CORS, auth.

Layer layout inside `internal/`:
- `handler/` — HTTP handlers, read request → call service → write response via `pkg/utils/response`
- `service/` — business logic (`AssetService`); coordinates repo + queue + storage
- `repository/` — SQL queries via sqlx (`AssetRepository`)
- `router/` — Chi route registration; mounts handlers onto the router returned to `server/`
- `models/` — request/response structs (`UploadAssetRequest`, `UploadAssetResponse`); not DB models
- `queue/` — `RedisQueue.Enqueue` writes to the stream with OTel tracing + retry
- `metrics/` — OTel metric instruments (counters, histograms); `internal/metrics/metrics.go` defines all instruments, `otel.go` handles provider init/shutdown

### Python worker (`worker/`)

Entry: `worker/__main__.py` → `consumer/main.py`

- `Consumer` (Redis Streams, consumer group) polls with `xreadgroup`, processes one message at a time.
- Message contains either `job_id` or `asset_id`. `job_id` is canonical; `asset_id` triggers an upsert into the `jobs` table first.
- `_handle_job` takes a `SELECT … FOR UPDATE` lock, marks the row `in_progress`, calls `process_asset_dispatch`, then marks `done` + acks the stream message. On failure it re-queues (up to `MAX_JOB_ATTEMPTS`).
- `_recover_stuck_pending` re-adds `pending/in_progress` jobs older than 2 min back to the stream (recovery path, called when no messages available).
- `worker/processing/processor.py` — `process_asset_dispatch` routes by asset type to `images.py` or `videos.py`.
- `worker/storage/` — `StorageX` ABC; `GCSStorage` is the concrete impl.
- `worker/utils/metrics.py` — Prometheus metrics via `prometheus_client`.

### Shared concerns

**Config singleton (Go):** `internal/config.MustGet()` — call only after `config.Init(cfg)` in `main`. Do not pass `*EnvConfig` via function params; use the singleton.

**Logger (Go):** `pkg/logger` wraps zap. Request-scoped logger lives in `context`; retrieve with `applogger.FromContext(ctx)` or `middleware.LoggerFromContext(ctx)`. Base logger is constructed once in `main` and passed to subsystems.

**Error types (Go):** `pkg/errors` has typed API errors (`NotFoundError`, `BadRequestError`, `UnauthorizedError`, `ConflictError`, `InternalServerErrorError`) each embedding `*ApiError` (carries `StatusCode`). Handler layer type-asserts on these to set HTTP status. Use `fmt.Errorf("op: %w", err)` for internal wrapping; use `errors.New*` constructors (e.g. `errors.NewNotFoundError`) at the service/handler boundary.

**Storage (`pkg/utils/storagex`):** `StorageX` interface with `PutObject`, `GetObject`, `GeneratePresignedURL`, `PublicURL`, `DeleteObject`. Current impl: `GCSStorage`. S3/MinIO provider types exist in config but are not yet implemented.

**OTel:** Full tracing + metrics on the API side. Go instruments are in `internal/metrics/metrics.go`. Collector config at `observability/otel-collector.yml`; Grafana/Loki/Tempo/Prometheus configs in `observability/`. Python side uses `prometheus_client` (not OTel).

### Database schema

- `assets` — core media record; `status` enum: `uploading → uploaded → processing → ready / failed`
- `variants.image` — deduplicated by `variant_hash` (content+params hash); immutable once written
- `jobs` — processing job per asset; `status` enum: `pending → in_progress → done / failed`; `attempts` tracked for retry cap

Migrations are plain SQL in `db/migrations/`. The Go server can auto-run them at startup (`AUTO_MIGRATE=true`); the Python worker also runs them via `worker/consumer/migrations.py`.
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
# ============================================================================
# Complete Observability Stack for MPiper
# MPiper — Observability overlay (opt-in)
#
# docker compose -f docker-compose.yml -f docker-compose.observability.yml up
#
# - Grafana: Visualization and dashboards
# - Tempo: Distributed tracing backend
# - Prometheus: Metrics collection and storage
# - Loki: Log aggregation
# - OTEL Collector: Centralized telemetry collection
#
# `otel-collector` joins the core `mpiper_net` (external) so the api and worker
# can reach it at `otel-collector:4317`. The remaining observability services
# stay isolated on `mpiper_obs_net`.
# ============================================================================

name: mpiper-observability-stack
Expand All @@ -18,7 +25,7 @@ services:
container_name: mpiper-tempo
command: ["-config.file=/etc/tempo.yaml"]
volumes:
- ../../observability/tempo.yml:/etc/tempo.yaml
- ./observability/tempo.yml:/etc/tempo.yaml
- tempo-data:/var/tempo
ports:
- "3200:3200" # Tempo HTTP
Expand All @@ -27,7 +34,7 @@ services:
- "9411:9411" # Zipkin receiver
- "14268:14268" # Jaeger ingest
networks:
- mpiper-observability
- mpiper_obs_net
restart: unless-stopped

# ==========================================================================
Expand All @@ -41,35 +48,35 @@ services:
- GF_AUTH_ANONYMOUS_ENABLED=true
- GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
- GF_AUTH_DISABLE_LOGIN_FORM=false

# Default admin credentials (CHANGE IN PRODUCTION!)
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=admin

# Enable explore by default
- GF_EXPLORE_ENABLED=true

# Server settings
- GF_SERVER_HTTP_PORT=3000
- GF_SERVER_DOMAIN=localhost

# Feature toggles
- GF_FEATURE_TOGGLES_ENABLE=traceqlEditor,traceqlSearch

# Log level
- GF_LOG_LEVEL=info

volumes:
# Pre-configured data sources
- ../../observability/grafana/datasources:/etc/grafana/provisioning/datasources
- ./observability/grafana/datasources:/etc/grafana/provisioning/datasources
# Pre-configured dashboards
- ../../observability/grafana/dashboards:/etc/grafana/provisioning/dashboards
- ./observability/grafana/dashboards:/etc/grafana/provisioning/dashboards
# Persistent storage for user preferences
- grafana-data:/var/lib/grafana
ports:
- "3000:3000"
networks:
- mpiper-observability
- mpiper_obs_net
depends_on:
- tempo
- prometheus
Expand All @@ -96,12 +103,12 @@ services:
- '--web.console.templates=/usr/share/prometheus/consoles'
- '--web.enable-lifecycle'
volumes:
- ../../observability/prometheus.yml:/etc/prometheus/prometheus.yml
- ./observability/prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus-data:/prometheus
ports:
- "9090:9090"
networks:
- mpiper-observability
- mpiper_obs_net
restart: unless-stopped
healthcheck:
test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://localhost:9090/-/healthy || exit 1"]
Expand All @@ -118,12 +125,12 @@ services:
container_name: mpiper-loki
command: -config.file=/etc/loki/loki.yaml
volumes:
- ../../observability/loki.yml:/etc/loki/loki.yaml
- ./observability/loki.yml:/etc/loki/loki.yaml
- loki-data:/loki
ports:
- "3100:3100"
networks:
- mpiper-observability
- mpiper_obs_net
restart: unless-stopped

# ==========================================================================
Expand All @@ -134,31 +141,33 @@ services:
container_name: mpiper-promtail
command: -config.file=/etc/promtail/promtail.yaml
volumes:
- ../../observability/promtail.yml:/etc/promtail/promtail.yaml
- ./observability/promtail.yml:/etc/promtail/promtail.yaml
- /var/log:/var/log:ro
- /var/lib/docker/containers:/var/lib/docker/containers:ro
- /var/run/docker.sock:/var/run/docker.sock:ro
networks:
- mpiper-observability
- mpiper_obs_net
depends_on:
- loki
restart: unless-stopped

# ==========================================================================
# OpenTelemetry Collector - Centralized Telemetry Pipeline
# Bridges mpiper_net (reachable by api/worker at otel-collector:4317)
# and mpiper_obs_net (forwards to tempo/prometheus/loki).
# ==========================================================================
otel-collector:
image: otel/opentelemetry-collector-contrib:latest
container_name: mpiper-otel-collector
command: ["--config=/etc/otel-collector.yaml"]
env_file:
- ../../.env.local
- .env.local
environment:
DEPLOYMENT_ENV: local
SERVICE_VERSION: dev
K8S_CLUSTER_NAME: docker-compose
volumes:
- ../../observability/otel-collector.yml:/etc/otel-collector.yaml
- ./observability/otel-collector.yml:/etc/otel-collector.yaml
- /var/run/docker.sock:/var/run/docker.sock:ro
ports:
- "4319:4317" # OTLP gRPC (host:4319 -> container:4317)
Expand All @@ -168,7 +177,8 @@ services:
- "13133:13133" # Health check
- "55679:55679" # zPages extension
networks:
- mpiper-observability
- mpiper_net
- mpiper_obs_net
depends_on:
- tempo
- prometheus
Expand All @@ -185,9 +195,14 @@ services:
# NETWORKS
# ============================================================================
networks:
mpiper-observability:
# Joined from docker-compose.yml — the core network the api/worker run on.
mpiper_net:
external: true
name: mpiper_net
# Internal observability network — isolated from core services.
mpiper_obs_net:
driver: bridge
name: mpiper-observability
name: mpiper_obs_net

# ============================================================================
# VOLUMES
Expand Down
Loading