diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1187a27..03af4fe 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,6 +36,80 @@ jobs: - run: pnpm run build - run: pnpm run test + e2e: + runs-on: ubuntu-latest + services: + redis: + image: redis:7 + ports: + - 6390:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + postgres: + image: postgres:17-alpine + env: + POSTGRES_USER: storage + POSTGRES_PASSWORD: storage + POSTGRES_DB: storage + ports: + - 5432:5432 + options: >- + --health-cmd "pg_isready -U storage -d storage" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + strategy: + matrix: + node-version: [22, 24, 26] + env: + REDIS_URL: redis://127.0.0.1:6390 + PG_URL: postgresql://storage:storage@127.0.0.1:5432/storage + steps: + - uses: actions/checkout@v4 + - uses: pnpm/action-setup@v4 + with: + version: latest + - uses: actions/setup-node@v4 + with: + node-version: ${{ matrix.node-version }} + cache: 'pnpm' + - run: pnpm install + - run: pnpm run test:e2e + + docker-smoke: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Bring up storage-db stack + working-directory: examples/storage-db + run: docker compose up --build -d --wait + - name: Wait for coordinator to be ready + run: | + for i in $(seq 1 30); do + if curl -sf http://127.0.0.1:8080/pods > /dev/null; then + echo "coordinator ready after ${i}s" + exit 0 + fi + sleep 1 + done + echo "coordinator did not become ready in 30s" + docker compose -f examples/storage-db/docker-compose.yml logs + exit 1 + - name: Run smoke script + working-directory: examples/storage-db + run: ./scripts/smoke.sh + - name: Dump logs on failure + if: failure() + working-directory: examples/storage-db + run: docker compose logs + - name: Tear down + if: always() + working-directory: examples/storage-db + run: docker compose down -v + lint: runs-on: ubuntu-latest steps: diff --git a/coordinator-pattern.md b/coordinator-pattern.md index c0f22e3..1ed4d83 100644 --- a/coordinator-pattern.md +++ b/coordinator-pattern.md @@ -231,14 +231,56 @@ The coordinator picks among a destination's pod set on the request path using th Reducing fan-out (removing a pod from a destination's set when load drops) is not a runtime concern. It is an operator action or a slow background reconciler; running it from the data path risks thrash under bursty load. -## Why Kubernetes +## Platform requirements -The architecture leans on two K8s features that some other platforms (ECS, plain VM fleets) do not directly provide: +The pattern is platform-neutral. It runs anywhere these three properties hold: -- **Headless Services**: the coordinator addresses resource pods by pod IP, read from Valkey. Platforms that route traffic through load balancers or DNS service discovery typically do not expose a stable per-pod endpoint for the coordinator to dial directly. -- **Stable pod identity** (StatefulSet-style): the identity registered into Valkey is the pod's K8s identity. Ephemeral task IPs force the heartbeat path to do more work after every restart. +1. **Each resource pod has a dialable address.** A scheme + host + port the coordinator can open a TCP connection to. Host can be a per-pod DNS name (Kubernetes Headless Service), a per-task IP (ECS `awsvpc` ENI, Nomad alloc IP), or any other stable-enough endpoint. +2. **Each resource pod can compute its own address at startup** and write it into Valkey. The pod tells the registry where it is; the registry is the discovery layer. +3. **The coordinator and all resource pods can reach Valkey** and can open TCP connections to the addresses they read from it. -The in-process co-location of caller and coordinator inside one process is a separate constraint and works on either platform. ECS can be made to work with Cloud Map for service discovery and a custom registration path, but at meaningful cost to operational simplicity. +Nothing in `@platformatic/coordinator` calls a Kubernetes API, parses downward-API files, or relies on K8s-specific behavior. The library is Valkey + HTTP + a `memberAddress` string. + +### Kubernetes + +The natural fit. A `StatefulSet` + Headless Service gives each pod a predictable DNS name like `pod-0.svc.namespace.svc.cluster.local`, and the downward API composes that into an env var: + +```yaml +env: + - name: POD_NAME + valueFrom: { fieldRef: { fieldPath: metadata.name } } + - name: MEMBER_ADDRESS + value: "http://$(POD_NAME).svc.namespace.svc.cluster.local:3000" +``` + +The pod reads `MEMBER_ADDRESS` from config and passes it to `Member`. + +### ECS / Fargate + +Each task in `awsvpc` mode gets its own ENI with a private VPC IP. The task fetches that IP from the ECS task metadata endpoint at startup, composes its address, and self-registers. A minimal entrypoint: + +```sh +#!/bin/sh +IP=$(wget -qO- "${ECS_CONTAINER_METADATA_URI_V4}/task" \ + | jq -r '.Containers[0].Networks[0].IPv4Addresses[0]') +export MEMBER_ADDRESS="http://${IP}:3000" +exec node start.js +``` + +The coordinator dials the IP directly, so Cloud Map and ALBs are not in the path. Two Fargate tasks in the same VPC can reach each other given a permissive intra-SG rule. Task IDs are random rather than ordinal, but the library does not care: `memberId` is opaque. + +### Nomad, plain VMs, Docker Compose, local dev + +Same shape. Each instance discovers its own address (alloc IP, instance metadata, container name, `localhost:`), exports it as `MEMBER_ADDRESS`, and registers. The `storage-db` example in this repo runs on plain Docker Compose with explicit per-pod hostnames and is identical, code-wise, to a Fargate or Kubernetes deployment. + +### What you give up off-Kubernetes + +The two ergonomic conveniences that K8s provides without effort: + +- **Ordinal pod names.** Kubernetes `StatefulSet` gives you stable, human-friendly identifiers like `pod-0`, `pod-1`. ECS and most schedulers give you random IDs. This is a debugging convenience, not a functional requirement. +- **Stable DNS.** K8s Headless Service publishes per-pod DNS names that survive across IP changes. On ECS the address is the task IP, which changes on restart. Both are fine for the coordinator pattern because the registry is the source of truth, but K8s gives you a second source for free. + +Neither shortfall changes the protocol or the code. They affect the deployment glue around the library. ## Scaling diff --git a/examples/storage-db/.dockerignore b/examples/storage-db/.dockerignore new file mode 100644 index 0000000..4c44e43 --- /dev/null +++ b/examples/storage-db/.dockerignore @@ -0,0 +1,5 @@ +node_modules +dist +.eslintcache +.git +*.log diff --git a/examples/storage-db/.gitignore b/examples/storage-db/.gitignore new file mode 100644 index 0000000..4bbe675 --- /dev/null +++ b/examples/storage-db/.gitignore @@ -0,0 +1,4 @@ +node_modules +dist +.eslintcache +*.log diff --git a/examples/storage-db/Dockerfile b/examples/storage-db/Dockerfile new file mode 100644 index 0000000..ea1c039 --- /dev/null +++ b/examples/storage-db/Dockerfile @@ -0,0 +1,26 @@ +# Build context must be the workspace root (coordinator/). +# See docker-compose.yml in this directory. +FROM node:24-alpine + +RUN corepack enable + +WORKDIR /workspace + +COPY pnpm-workspace.yaml package.json ./ +COPY examples/storage-db/package.json examples/storage-db/ + +RUN --mount=type=cache,target=/root/.local/share/pnpm/store \ + pnpm install + +COPY src ./src +COPY tsconfig.json ./ +COPY examples/storage-db/src examples/storage-db/src +COPY examples/storage-db/migrations examples/storage-db/migrations +COPY examples/storage-db/tsconfig.json examples/storage-db/ + +RUN pnpm --filter @platformatic/coordinator run build + +WORKDIR /workspace/examples/storage-db +EXPOSE 3000 8080 + +CMD ["node", "src/bin/pod.ts"] diff --git a/examples/storage-db/MANUAL_TESTING.md b/examples/storage-db/MANUAL_TESTING.md new file mode 100644 index 0000000..b7590d3 --- /dev/null +++ b/examples/storage-db/MANUAL_TESTING.md @@ -0,0 +1,328 @@ +# Manual Testing + +How to spin up the storage-db stack by hand and exercise it from a shell. The automated e2e suite at `coordinator/test/e2e/storage-db.test.ts` covers the same paths in CI; this document is for when you want to poke at it interactively. + +The end state in both paths below is the same: a coordinator listening on `localhost:8080`, three pods registered in Valkey, Postgres ready for tenant data. + +## Path 1 - everything in containers + +Fastest. One command: + +```sh +cd examples/storage-db +docker compose up --build -d --wait +``` + +Brings up: + +| Service | Host port | Role | +|---|---|---| +| `valkey` | `6379` | member registry + destination sets + lock records | +| `postgres` | `5432` | shared database, one schema per tenant | +| `coordinator` | `8080` | the HTTP front door | +| `pod1`, `pod2`, `pod3` | not published | storage pods, reachable from inside the compose network | + +Run the bundled exerciser: + +```sh +./scripts/smoke.sh +``` + +Tear down (also wipes Valkey + Postgres state): + +```sh +docker compose down -v +``` + +## Path 2 - node processes against containerized Valkey + Postgres + +Useful when you want to attach a debugger, edit code, and see changes without a Docker rebuild. + +Start only the data stores: + +```sh +cd examples/storage-db +docker compose up -d --wait valkey postgres +``` + +From the workspace root (the `coordinator/` directory), install and build the library: + +```sh +cd ../.. +pnpm install +pnpm run build +``` + +Then run each process in its own terminal: + +```sh +cd examples/storage-db + +# Terminal 1: coordinator on :8080 +REDIS_URL=redis://127.0.0.1:6379 \ + STRATEGY=least-loaded \ + PORT=8080 \ + node src/bin/coordinator.ts + +# Terminal 2: pod1 on :3001 +REDIS_URL=redis://127.0.0.1:6379 \ + PG_URL=postgresql://storage:storage@127.0.0.1:5432/storage \ + MEMBER_ID=pod1 MEMBER_ADDRESS=http://127.0.0.1:3001 \ + PORT=3001 \ + node src/bin/pod.ts + +# Terminal 3: pod2 on :3002 (same as pod1 but PORT=3002, MEMBER_ID=pod2, MEMBER_ADDRESS=http://127.0.0.1:3002) +# Terminal 4: pod3 on :3003 +``` + +The smoke script and the curl examples below work against either path because both expose the coordinator on `localhost:8080`. + +## Exercise the API + +```sh +# List the live pods +curl -s localhost:8080/pods | jq + +# Create a tenant. The coordinator picks a pod via the configured strategy +# (default: least-loaded) and returns the chosen memberId. +curl -s -X POST localhost:8080/tenants/foo | jq + +# Write a key +curl -s -X PUT localhost:8080/tenants/foo/keys/hello \ + -H 'content-type: application/json' \ + -d '{"value":"world"}' + +# Read it back. The response is tagged with the serving pod's memberId. +# All reads/writes for the same tenant route to the same pod. +curl -s localhost:8080/tenants/foo/keys/hello | jq + +# List all keys for a tenant +curl -s localhost:8080/tenants/foo/keys | jq + +# Drop the tenant (DROP SCHEMA CASCADE + remove the destination set) +curl -s -X DELETE localhost:8080/tenants/foo +``` + +### Transactions + +```sh +# Begin a transaction. Returns { lockId, tenantId, memberId }. +# The pod opens a pinned pg.PoolClient, runs BEGIN, registers the lock in Valkey. +curl -s -X POST localhost:8080/tenants/foo/transactions | jq + +# Capture the lockId for the next steps +LOCK=$(curl -s -X POST localhost:8080/tenants/foo/transactions | jq -r .lockId) + +# Write inside the transaction. The coordinator routes this via lookupLockAndProxy +# to the same pod that holds the pinned connection. +curl -s -X PUT localhost:8080/transactions/$LOCK/keys/k \ + -H 'content-type: application/json' \ + -d '{"value":"inside-txn"}' + +# Read inside the transaction sees the uncommitted write +curl -s localhost:8080/transactions/$LOCK/keys/k | jq + +# Read outside the transaction does NOT see the uncommitted write +curl -s localhost:8080/tenants/foo/keys/k | jq +# -> 404 until commit + +# Commit. The pod runs COMMIT, releases the pinned connection, unregisters the lock. +curl -s -X POST localhost:8080/transactions/$LOCK/commit + +# Now the read from outside sees the value +curl -s localhost:8080/tenants/foo/keys/k | jq + +# Or rollback instead: +# curl -s -X POST localhost:8080/transactions/$LOCK/rollback +``` + +## Inspect Valkey while the stack runs + +```sh +# Container-local CLI +docker compose exec valkey valkey-cli + +# Or from the host (Valkey port is published) +redis-cli -h 127.0.0.1 -p 6379 +``` + +Useful reads: + +```sh +docker compose exec valkey valkey-cli SMEMBERS storage-db:members +docker compose exec valkey valkey-cli HGETALL storage-db:member:pod1 +docker compose exec valkey valkey-cli TTL storage-db:member:pod1 +docker compose exec valkey valkey-cli SMEMBERS storage-db:destination:foo +docker compose exec valkey valkey-cli KEYS 'storage-db:lock:*' +docker compose exec valkey valkey-cli HGETALL storage-db:lock: + +# Live tail of every Valkey command (heartbeats, resolves, lock writes) +docker compose exec valkey valkey-cli MONITOR +``` + +## Inspect Postgres directly + +Bypass the pod and the coordinator to verify what actually landed in the database: + +```sh +docker compose exec postgres psql -U storage -d storage +``` + +Inside psql: + +```sql +-- List tenant schemas +SELECT schema_name FROM information_schema.schemata WHERE schema_name LIKE 'tenant_%'; + +-- Postgrator's migration log for one tenant +SELECT * FROM tenant_foo.schemaversion; + +-- The actual data table +SELECT key, value, updated_at FROM tenant_foo.kv; +``` + +## Tail logs + +```sh +docker compose logs -f coordinator +docker compose logs -f pod1 +docker compose logs -f valkey postgres # combined data-store logs +``` + +## Walkthrough: failover when a pod dies + +End-to-end demonstration that a tenant survives its pod going away. Assumes a clean stack (`docker compose down -v && docker compose up --build -d --wait`). + +### 1. Create a tenant and write data + +```sh +ASSIGNED=$(curl -s -X POST localhost:8080/tenants/foo | jq -r .memberId) +echo "Tenant foo is on $ASSIGNED" + +curl -s -X PUT localhost:8080/tenants/foo/keys/k \ + -H 'content-type: application/json' \ + -d '{"value":"before-failover"}' + +curl -s localhost:8080/tenants/foo/keys/k | jq +# { "key": "k", "value": "before-failover", "memberId": "" } +``` + +Confirm Valkey and Postgres both reflect the new state: + +```sh +docker compose exec valkey valkey-cli SMEMBERS storage-db:destination:foo +# 1) "" + +docker compose exec postgres psql -U storage -d storage \ + -c "SELECT key, value FROM tenant_foo.kv" +# key | value +# -----+----------------- +# k | before-failover +``` + +### 2. Stop the assigned pod + +```sh +docker compose stop "$ASSIGNED" +``` + +`docker compose stop` sends `SIGTERM`. The pod's signal handler runs `Member.deregister()`, which: + +- removes the pod from `storage-db:members` (the live-pod set) +- deletes `storage-db:member:` (the hash holding `address` and `load`) + +```sh +docker compose exec valkey valkey-cli SMEMBERS storage-db:members +# (the other two pods, no $ASSIGNED) + +docker compose exec valkey valkey-cli EXISTS "storage-db:member:$ASSIGNED" +# (integer) 0 + +# But the destination set still says the tenant is on the dead pod: +docker compose exec valkey valkey-cli SMEMBERS storage-db:destination:foo +# 1) "" +``` + +The destination set isn't cleaned up at shutdown because the registry has no reverse index from pod -> destinations it serves. The cleanup happens lazily on the next request, in the orphan-reassign code path. + +### 3. Wait for the coordinator's resolve cache to expire + +The coordinator caches `resolveDestination` results for 5 seconds by default. While that entry is warm, requests for `foo` will still try the dead address and fail with a connection error. Wait past the TTL: + +```sh +sleep 6 +``` + +(Or `docker compose restart coordinator` to wipe the cache immediately. The e2e suite sets `CACHE_TTL_MS=500` for the same reason.) + +### 4. Trigger the failover with a real request + +```sh +curl -s localhost:8080/tenants/foo/keys/k | jq +# { +# "key": "k", +# "value": "before-failover", +# "memberId": "" +# } +``` + +Three things happened inside that one HTTP call: + +1. Coordinator cache miss -> fresh `resolveDestination("foo")`. +2. Destination set was `[]`, member hash gone, so `livePods=[]`, `deadPods=[]`. With `reassignOrphans: true`, the coordinator picked a live pod via the strategy, did `SREM ` and `SADD ` on `destination:foo`, returned the new pod. +3. The new pod's PUT/GET handler called `pools.ensure("foo")`. `CREATE SCHEMA IF NOT EXISTS tenant_foo` was a no-op, postgrator saw schemaversion at version 1 and ran no migrations. `SELECT FROM tenant_foo.kv` returned the existing row. + +The destination set in Valkey now reflects the new assignment: + +```sh +docker compose exec valkey valkey-cli SMEMBERS storage-db:destination:foo +# 1) "" +``` + +### 5. Verify the new pod owns the tenant + +```sh +docker compose exec coordinator wget -qO- "http://:3000/health" | jq +# { "ok": true, "memberId": "", "load": 1, "tenants": ["foo"] } +``` + +### 6. Write more data; it lands in the same schema + +```sh +curl -s -X PUT localhost:8080/tenants/foo/keys/k2 \ + -H 'content-type: application/json' \ + -d '{"value":"after-failover"}' + +docker compose exec postgres psql -U storage -d storage \ + -c "SELECT key, value FROM tenant_foo.kv ORDER BY key" +# key | value +# -----+----------------- +# k | before-failover +# k2 | after-failover +``` + +The data persists across the pod loss because storage-db is schema-per-tenant in a **shared** Postgres database. The pod owns the connections; the data lives in Postgres. + +### 7. Restore the original pod (optional) + +```sh +docker compose start "$ASSIGNED" +``` + +It re-registers itself within ~10 seconds and joins `storage-db:members` as available capacity for new tenants. The existing `foo` tenant stays on the new pod (there's no rebalancing on rejoin). + +### Variant: hard kill instead of graceful stop + +```sh +docker compose kill "$ASSIGNED" +``` + +`SIGKILL` skips the deregister handler. The member hash stays in Valkey until its TTL expires (default `30s`, set via `MEMBER_TTL`). During that window the next request still routes to the dead pod's address and times out. After TTL expires, orphan-reassign kicks in. The e2e suite uses `MEMBER_TTL=3` to make this fast; for an interactive demo you can either wait, or rebuild the image with a shorter `MEMBER_TTL` in `docker-compose.yml`. + +## Things worth observing + +- `GET /pods` shows `load` per pod (sum of open Postgres connections across its tenant pools). Watch it climb after writes, then settle to idle after `idleTimeoutMillis` (60s). +- After several `POST /tenants/...` calls, the `load` field starts to diverge between pods. Subsequent tenants land on whichever pod has the lowest `load`. +- `docker compose restart pod2` -> the pod re-registers itself in Valkey within seconds; tenants pinned to pod2 keep routing to it as soon as it's back. +- `docker compose stop pod3` -> after ~30s (the default `MEMBER_TTL`), pod3's hash record expires in Valkey; tenants that were on pod3 reassign to a live pod on their next request (orphan reassignment). +- `docker compose exec valkey valkey-cli FLUSHALL` -> everything reassigns: pods will re-register on the next heartbeat, but every destination set is gone, so the next request for a tenant returns 404 (no claim-on-miss in this example). diff --git a/examples/storage-db/README.md b/examples/storage-db/README.md new file mode 100644 index 0000000..0767525 --- /dev/null +++ b/examples/storage-db/README.md @@ -0,0 +1,95 @@ +# @platformatic/storage-db + +A reference *headless service* for the `@platformatic/coordinator` pattern: a multi-tenant Postgres KV store split across N pods, with each pod opening one Postgres connection pool **per tenant it serves**. Each pod self-registers in Valkey; a coordinator process routes tenant traffic to the right pod. + +This repo exists to demonstrate that the coordinator pattern is platform-neutral (no Kubernetes required) and to give a working end-to-end smoke test you can run with one docker-compose command. + +## What's inside + +| Process | Role | Port | +|---|---|---| +| `valkey` | Member registry + destination sets | internal | +| `postgres` | Shared Postgres, one schema per tenant | internal | +| `coordinator` | Resolves tenant -> pod via `@platformatic/coordinator`, proxies HTTP | `8080` (host) | +| `pod1` / `pod2` / `pod3` | Storage pods: open a `pg.Pool` per tenant lazily on first hit, report `load = total open connections` | internal | + +## Run + +```sh +docker compose up --build +``` + +Then in a second terminal: + +```sh +./scripts/smoke.sh +``` + +You should see five tenants get spread across the three pods (default strategy is `least-loaded`), and reads come back tagged with the `memberId` of the pod that served them. + +For step-by-step instructions on driving the stack manually (curl examples, transactions, Valkey/Postgres inspection), see [MANUAL_TESTING.md](./MANUAL_TESTING.md). + +## API (via the coordinator at `:8080`) + +| Method | Path | What it does | +|---|---|---| +| `GET` | `/pods` | List live pods + their current `load` | +| `POST` | `/tenants/:tenantId` | Pick a pod, create the tenant on it, bind tenant -> pod in Valkey | +| `PUT` | `/tenants/:tenantId/keys/:key` | Upsert `{ value }` for the tenant's owning pod | +| `GET` | `/tenants/:tenantId/keys/:key` | Read a single key | +| `GET` | `/tenants/:tenantId/keys` | List all keys for the tenant | +| `DELETE` | `/tenants/:tenantId/keys/:key` | Delete one key | +| `DELETE` | `/tenants/:tenantId` | Drop the tenant entirely + remove the binding | + +`tenantId` must match `^[a-zA-Z0-9_-]{1,64}$`. + +## How the pieces map to the coordinator pattern + +| Abstract concept (see `coordinator/coordinator-pattern.md`) | storage-db | +|---|---| +| Resource pod | One `pod*` container running `src/bin/pod.ts` | +| Destination | A `tenantId` | +| Local share | The per-tenant `pg.Pool` on that pod | +| `load` | Sum of `pool.totalCount` across the pod's tenant pools | +| Coordinator | `coordinator` container running `src/bin/coordinator.ts` | +| Member registry | Valkey, key prefix `storage-db:` | + +## Layout + +``` +src/ + bin/ + pod.ts # pod entry point (Member.register + heartbeat + Fastify) + coordinator.ts # coordinator entry point (Registry + helpers + Fastify) + pod-plugin.ts # /tenants/:id and /keys routes for the pod + coordinator-plugin.ts# routes that use pickAndRegister / lookupAndProxy / lookupAndDeregister + pool-manager.ts # per-tenant pg.Pool, lazy creation, load = total open connections +scripts/smoke.sh # end-to-end exerciser +docker-compose.yml # valkey + postgres + coordinator + 3 pods +Dockerfile # one image, two entry points (pod, coordinator) +``` + +## Environment variables + +### Pod (`src/bin/pod.ts`) +- `PORT` (default `3000`) +- `HOST` (default `0.0.0.0`) +- `REDIS_URL` (required) +- `MEMBER_ID` (required) - opaque, stable across the pod's lifetime +- `MEMBER_ADDRESS` (required) - the URL the coordinator will dial, e.g. `http://pod1:3000` +- `PG_URL` (required) +- `KEY_PREFIX` (default `storage-db`) +- `HEARTBEAT_MS` (default `10000`) + +### Coordinator (`src/bin/coordinator.ts`) +- `PORT` (default `8080`) +- `HOST` (default `0.0.0.0`) +- `REDIS_URL` (required) +- `KEY_PREFIX` (default `storage-db`) +- `STRATEGY` (default `least-loaded`) - one of `round-robin`, `least-loaded`, `random` + +## Notes + +- This is a demo, not production code. There is no auth, no TLS, no metrics. +- Tenant deletion drops the Postgres schema with `CASCADE`. Don't point this at a database you care about. +- The pod uses `--experimental-strip-types` to run TypeScript directly; no build step is needed for the demo. diff --git a/examples/storage-db/docker-compose.yml b/examples/storage-db/docker-compose.yml new file mode 100644 index 0000000..5deeb0b --- /dev/null +++ b/examples/storage-db/docker-compose.yml @@ -0,0 +1,71 @@ +services: + valkey: + image: valkey/valkey:8 + ports: + - "6379:6379" + healthcheck: + test: ["CMD", "valkey-cli", "ping"] + interval: 2s + timeout: 2s + retries: 10 + + postgres: + image: postgres:17-alpine + ports: + - "5432:5432" + environment: + POSTGRES_USER: storage + POSTGRES_PASSWORD: storage + POSTGRES_DB: storage + healthcheck: + test: ["CMD-SHELL", "pg_isready -U storage -d storage"] + interval: 2s + timeout: 2s + retries: 10 + + coordinator: + build: + context: ../.. + dockerfile: examples/storage-db/Dockerfile + command: ["node", "src/bin/coordinator.ts"] + environment: + PORT: "8080" + REDIS_URL: redis://valkey:6379 + KEY_PREFIX: storage-db + STRATEGY: least-loaded + LOG_LEVEL: info + depends_on: + valkey: { condition: service_healthy } + ports: + - "8080:8080" + + pod1: &pod + build: + context: ../.. + dockerfile: examples/storage-db/Dockerfile + command: ["node", "src/bin/pod.ts"] + environment: &pod-env + PORT: "3000" + REDIS_URL: redis://valkey:6379 + KEY_PREFIX: storage-db + MEMBER_ID: pod1 + MEMBER_ADDRESS: http://pod1:3000 + PG_URL: postgresql://storage:storage@postgres:5432/storage + LOG_LEVEL: info + depends_on: + valkey: { condition: service_healthy } + postgres: { condition: service_healthy } + + pod2: + <<: *pod + environment: + <<: *pod-env + MEMBER_ID: pod2 + MEMBER_ADDRESS: http://pod2:3000 + + pod3: + <<: *pod + environment: + <<: *pod-env + MEMBER_ID: pod3 + MEMBER_ADDRESS: http://pod3:3000 diff --git a/examples/storage-db/migrations/001.do.create-kv.sql b/examples/storage-db/migrations/001.do.create-kv.sql new file mode 100644 index 0000000..7752326 --- /dev/null +++ b/examples/storage-db/migrations/001.do.create-kv.sql @@ -0,0 +1,5 @@ +CREATE TABLE kv ( + key text PRIMARY KEY, + value text NOT NULL, + updated_at timestamptz NOT NULL DEFAULT now() +); diff --git a/examples/storage-db/migrations/001.undo.create-kv.sql b/examples/storage-db/migrations/001.undo.create-kv.sql new file mode 100644 index 0000000..7ec25a7 --- /dev/null +++ b/examples/storage-db/migrations/001.undo.create-kv.sql @@ -0,0 +1 @@ +DROP TABLE kv; diff --git a/examples/storage-db/package.json b/examples/storage-db/package.json new file mode 100644 index 0000000..04a132a --- /dev/null +++ b/examples/storage-db/package.json @@ -0,0 +1,31 @@ +{ + "name": "@platformatic/storage-db", + "version": "0.1.0", + "description": "Multi-tenant per-pod Postgres proxy. Reference headless service for the @platformatic/coordinator pattern", + "author": "Platformatic Inc. (https://platformatic.dev)", + "license": "Apache-2.0", + "private": true, + "type": "module", + "scripts": { + "build": "tsc", + "start:pod": "node src/bin/pod.ts", + "start:coordinator": "node src/bin/coordinator.ts", + "compose:up": "docker compose up --build", + "compose:down": "docker compose down -v", + "smoke": "./scripts/smoke.sh" + }, + "dependencies": { + "@fastify/reply-from": "^12.6.2", + "@platformatic/coordinator": "workspace:*", + "fastify": "^5.3.2", + "fastify-plugin": "^5.0.1", + "pg": "^8.13.1", + "postgrator": "^8.0.0" + }, + "devDependencies": { + "@types/pg": "^8.11.10" + }, + "engines": { + "node": ">= 22.18.0" + } +} diff --git a/examples/storage-db/scripts/smoke.sh b/examples/storage-db/scripts/smoke.sh new file mode 100755 index 0000000..f2329e4 --- /dev/null +++ b/examples/storage-db/scripts/smoke.sh @@ -0,0 +1,34 @@ +#!/usr/bin/env bash +set -euo pipefail + +BASE="${BASE:-http://localhost:8080}" +TENANTS=("alpha" "bravo" "charlie" "delta" "echo") + +echo "==> live pods" +curl -fsS "$BASE/pods" | jq . +echo + +echo "==> creating ${#TENANTS[@]} tenants" +for t in "${TENANTS[@]}"; do + curl -fsS -X POST "$BASE/tenants/$t" | jq -c . +done +echo + +echo "==> writing one key per tenant" +for t in "${TENANTS[@]}"; do + curl -fsS -X PUT "$BASE/tenants/$t/keys/greeting" \ + -H 'content-type: application/json' \ + -d "{\"value\":\"hello from $t\"}" +done +echo "done" +echo + +echo "==> reading back, observing which pod served each tenant" +for t in "${TENANTS[@]}"; do + result=$(curl -fsS "$BASE/tenants/$t/keys/greeting") + echo "$t -> $result" +done +echo + +echo "==> final pod load distribution" +curl -fsS "$BASE/pods" | jq . diff --git a/examples/storage-db/src/bin/coordinator.ts b/examples/storage-db/src/bin/coordinator.ts new file mode 100644 index 0000000..7d7800e --- /dev/null +++ b/examples/storage-db/src/bin/coordinator.ts @@ -0,0 +1,40 @@ +import Fastify from 'fastify' +import { Registry } from '@platformatic/coordinator' +import { coordinatorPlugin } from '../coordinator-plugin.ts' + +const env = (key: string, fallback?: string): string => { + const v = process.env[key] ?? fallback + if (v === undefined) throw new Error(`missing env var: ${key}`) + return v +} + +const port = Number(process.env.PORT ?? 8080) +const host = process.env.HOST ?? '0.0.0.0' +const redisUrl = env('REDIS_URL') +const keyPrefix = process.env.KEY_PREFIX ?? 'storage-db' +const strategy = (process.env.STRATEGY ?? 'least-loaded') as 'round-robin' | 'least-loaded' | 'random' +const cacheTtlMs = process.env.CACHE_TTL_MS ? Number(process.env.CACHE_TTL_MS) : undefined + +const registry = new Registry({ + redis: redisUrl, + keyPrefix, + strategy, + cache: cacheTtlMs !== undefined ? { ttl: cacheTtlMs } : undefined +}) + +const app = Fastify({ logger: { level: process.env.LOG_LEVEL ?? 'info' } }) +await app.register(coordinatorPlugin, { registry }) + +const shutdown = async (): Promise => { + try { await registry.close() } catch { /* ignore */ } + await app.close() +} + +for (const sig of ['SIGINT', 'SIGTERM'] as const) { + process.once(sig, () => { + shutdown().then(() => process.exit(0), () => process.exit(1)) + }) +} + +await app.listen({ port, host }) +app.log.info({ port, host, strategy }, 'storage-db coordinator listening') diff --git a/examples/storage-db/src/bin/pod.ts b/examples/storage-db/src/bin/pod.ts new file mode 100644 index 0000000..81a4860 --- /dev/null +++ b/examples/storage-db/src/bin/pod.ts @@ -0,0 +1,59 @@ +import Fastify from 'fastify' +import { Member } from '@platformatic/coordinator' +import { PoolManager } from '../pool-manager.ts' +import { podPlugin } from '../pod-plugin.ts' + +const env = (key: string, fallback?: string): string => { + const v = process.env[key] ?? fallback + if (v === undefined) throw new Error(`missing env var: ${key}`) + return v +} + +const port = Number(process.env.PORT ?? 3000) +const host = process.env.HOST ?? '0.0.0.0' +const redisUrl = env('REDIS_URL') +const memberId = env('MEMBER_ID') +const memberAddress = env('MEMBER_ADDRESS') +const pgUrl = env('PG_URL') +const keyPrefix = process.env.KEY_PREFIX ?? 'storage-db' +const heartbeatMs = Number(process.env.HEARTBEAT_MS ?? 10_000) +const memberTtl = process.env.MEMBER_TTL ? Number(process.env.MEMBER_TTL) : undefined + +const pools = new PoolManager({ connectionString: pgUrl }) + +const member = new Member({ + redis: redisUrl, + memberId, + address: memberAddress, + keyPrefix, + ttl: memberTtl, + getLoad: () => pools.load() +}) + +const app = Fastify({ logger: { level: process.env.LOG_LEVEL ?? 'info' } }) +await app.register(podPlugin, { pools, member, memberId }) + +await member.register() +app.log.info({ memberId, memberAddress }, 'registered in member registry') + +const heartbeat = setInterval(() => { + member.heartbeat().catch(err => app.log.error({ err }, 'heartbeat failed')) +}, heartbeatMs) +heartbeat.unref() + +const shutdown = async (): Promise => { + clearInterval(heartbeat) + try { await member.deregister() } catch { /* ignore */ } + try { await member.close() } catch { /* ignore */ } + try { await pools.close() } catch { /* ignore */ } + await app.close() +} + +for (const sig of ['SIGINT', 'SIGTERM'] as const) { + process.once(sig, () => { + shutdown().then(() => process.exit(0), () => process.exit(1)) + }) +} + +await app.listen({ port, host }) +app.log.info({ port, host }, 'storage-db pod listening') diff --git a/examples/storage-db/src/coordinator-plugin.ts b/examples/storage-db/src/coordinator-plugin.ts new file mode 100644 index 0000000..1ab9b58 --- /dev/null +++ b/examples/storage-db/src/coordinator-plugin.ts @@ -0,0 +1,106 @@ +import fp from 'fastify-plugin' +import replyFrom from '@fastify/reply-from' +import type { FastifyInstance, FastifyRequest } from 'fastify' +import { + Registry, + lookupAndProxy, + pickAndRegister, + lookupAndDeregister, + lookupLockAndProxy +} from '@platformatic/coordinator' + +interface TenantParams { tenantId: string } +interface LockParams { lockId: string } + +export interface CoordinatorOptions { + registry: Registry +} + +const tenantSchema = { + params: { + type: 'object', + properties: { tenantId: { type: 'string', pattern: '^[a-zA-Z0-9_-]{1,64}$' } }, + required: ['tenantId'] + } +} as const + +const tenantKeySchema = { + params: { + type: 'object', + properties: { + tenantId: { type: 'string', pattern: '^[a-zA-Z0-9_-]{1,64}$' }, + key: { type: 'string', minLength: 1, maxLength: 256 } + }, + required: ['tenantId', 'key'] + } +} as const + +const lockSchema = { + params: { + type: 'object', + properties: { lockId: { type: 'string', minLength: 1, maxLength: 128 } }, + required: ['lockId'] + } +} as const + +const lockKeySchema = { + params: { + type: 'object', + properties: { + lockId: { type: 'string', minLength: 1, maxLength: 128 }, + key: { type: 'string', minLength: 1, maxLength: 256 } + }, + required: ['lockId', 'key'] + } +} as const + +async function coordinatorRoutes (app: FastifyInstance, opts: CoordinatorOptions): Promise { + const { registry } = opts + await app.register(replyFrom) + + const tenantFrom = (req: FastifyRequest): string => (req.params as TenantParams).tenantId + + app.get('/pods', async () => { + const members = await registry.listLiveMembers() + return { count: members.length, members } + }) + + app.post('/tenants/:tenantId', { schema: tenantSchema }, pickAndRegister(registry, { + registerIdFrom: (body: any) => body.tenantId, + expectedStatus: 201, + unavailableMessage: 'no pods available' + })) + + const proxyOpts = { + destinationFrom: tenantFrom, + reassignOrphans: true, + notFoundMessage: 'tenant not found' + } + + app.get('/tenants/:tenantId/keys', { schema: tenantSchema }, lookupAndProxy(registry, proxyOpts)) + app.get('/tenants/:tenantId/keys/:key', { schema: tenantKeySchema }, lookupAndProxy(registry, proxyOpts)) + app.put('/tenants/:tenantId/keys/:key', { schema: tenantKeySchema }, lookupAndProxy(registry, proxyOpts)) + app.delete('/tenants/:tenantId/keys/:key', { schema: tenantKeySchema }, lookupAndProxy(registry, proxyOpts)) + + app.delete('/tenants/:tenantId', { schema: tenantSchema }, lookupAndDeregister(registry, { + destinationFrom: tenantFrom, + notFoundMessage: 'tenant not found' + })) + + app.post('/tenants/:tenantId/transactions', + { schema: tenantSchema }, + lookupAndProxy(registry, proxyOpts)) + + const lockFrom = (req: FastifyRequest): string => (req.params as LockParams).lockId + const lockProxy = lookupLockAndProxy(registry, { + lockFrom, + notFoundMessage: 'transaction not found' + }) + + app.put('/transactions/:lockId/keys/:key', { schema: lockKeySchema }, lockProxy) + app.get('/transactions/:lockId/keys/:key', { schema: lockKeySchema }, lockProxy) + app.post('/transactions/:lockId/commit', { schema: lockSchema }, lockProxy) + app.post('/transactions/:lockId/rollback', { schema: lockSchema }, lockProxy) +} + +export const coordinatorPlugin = fp(coordinatorRoutes, { name: 'storage-db-coordinator' }) diff --git a/examples/storage-db/src/pod-plugin.ts b/examples/storage-db/src/pod-plugin.ts new file mode 100644 index 0000000..c87c7d8 --- /dev/null +++ b/examples/storage-db/src/pod-plugin.ts @@ -0,0 +1,188 @@ +import fp from 'fastify-plugin' +import type { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify' +import type { Member } from '@platformatic/coordinator' +import { PoolManager, isValidTenantId } from './pool-manager.ts' + +interface TenantParams { tenantId: string } +interface KeyParams { tenantId: string, key: string } +interface LockParams { lockId: string } +interface LockKeyParams { lockId: string, key: string } +interface ValueBody { value: string } + +export interface PodOptions { + pools: PoolManager + member: Member + memberId: string +} + +const tenantSchema = { + params: { + type: 'object', + properties: { tenantId: { type: 'string', pattern: '^[a-zA-Z0-9_-]{1,64}$' } }, + required: ['tenantId'] + } +} as const + +const keySchema = { + params: { + type: 'object', + properties: { + tenantId: { type: 'string', pattern: '^[a-zA-Z0-9_-]{1,64}$' }, + key: { type: 'string', minLength: 1, maxLength: 256 } + }, + required: ['tenantId', 'key'] + } +} as const + +const lockSchema = { + params: { + type: 'object', + properties: { lockId: { type: 'string', minLength: 1, maxLength: 128 } }, + required: ['lockId'] + } +} as const + +const lockKeySchema = { + params: { + type: 'object', + properties: { + lockId: { type: 'string', minLength: 1, maxLength: 128 }, + key: { type: 'string', minLength: 1, maxLength: 256 } + }, + required: ['lockId', 'key'] + } +} as const + +async function podRoutes (app: FastifyInstance, opts: PodOptions): Promise { + const { pools, member, memberId } = opts + + app.get('/health', async () => ({ ok: true, memberId, load: pools.load(), tenants: pools.tenantIds() })) + + app.post<{ Params: TenantParams }>('/tenants/:tenantId', { schema: tenantSchema }, async (req, reply) => { + const { tenantId } = req.params + if (!isValidTenantId(tenantId)) return reply.code(400).send({ error: 'invalid tenantId' }) + await pools.ensure(tenantId) + return reply.code(201).send({ tenantId, memberId }) + }) + + app.put<{ Params: KeyParams, Body: ValueBody }>('/tenants/:tenantId/keys/:key', { + schema: { + ...keySchema, + body: { + type: 'object', + properties: { value: { type: 'string' } }, + required: ['value'] + } + } + }, async (req, reply) => { + const { tenantId, key } = req.params + const pool = await pools.ensure(tenantId) + const schema = pools.schema(tenantId) + await pool.query( + `INSERT INTO "${schema}".kv (key, value) + VALUES ($1, $2) + ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = now()`, + [key, req.body.value] + ) + return reply.code(204).send() + }) + + app.get<{ Params: KeyParams }>('/tenants/:tenantId/keys/:key', { schema: keySchema }, async (req, reply) => { + const { tenantId, key } = req.params + const pool = await pools.ensure(tenantId) + const schema = pools.schema(tenantId) + const result = await pool.query<{ value: string }>( + `SELECT value FROM "${schema}".kv WHERE key = $1`, + [key] + ) + if (result.rowCount === 0) return reply.code(404).send({ error: 'key not found' }) + return { key, value: result.rows[0].value, memberId } + }) + + app.delete<{ Params: KeyParams }>('/tenants/:tenantId/keys/:key', { schema: keySchema }, async (req, reply) => { + const { tenantId, key } = req.params + const pool = await pools.ensure(tenantId) + const schema = pools.schema(tenantId) + await pool.query(`DELETE FROM "${schema}".kv WHERE key = $1`, [key]) + return reply.code(204).send() + }) + + app.get<{ Params: TenantParams }>('/tenants/:tenantId/keys', { schema: tenantSchema }, async (req) => { + const { tenantId } = req.params + const pool = await pools.ensure(tenantId) + const schema = pools.schema(tenantId) + const result = await pool.query<{ key: string }>(`SELECT key FROM "${schema}".kv ORDER BY key`) + return { tenantId, memberId, keys: result.rows.map((r: { key: string }) => r.key) } + }) + + app.delete<{ Params: TenantParams }>('/tenants/:tenantId', { schema: tenantSchema }, async (req: FastifyRequest, reply: FastifyReply) => { + const { tenantId } = req.params as TenantParams + await pools.drop(tenantId) + return reply.code(204).send() + }) + + app.post<{ Params: TenantParams }>('/tenants/:tenantId/transactions', { schema: tenantSchema }, async (req, reply) => { + const { tenantId } = req.params + if (!isValidTenantId(tenantId)) return reply.code(400).send({ error: 'invalid tenantId' }) + const handle = await pools.beginTransaction(tenantId) + try { + await member.registerLock(handle.lockId, tenantId) + } catch (err) { + await pools.rollbackTransaction(handle.lockId) + throw err + } + return reply.code(201).send({ lockId: handle.lockId, tenantId, memberId }) + }) + + app.put<{ Params: LockKeyParams, Body: ValueBody }>('/transactions/:lockId/keys/:key', { + schema: { + ...lockKeySchema, + body: { + type: 'object', + properties: { value: { type: 'string' } }, + required: ['value'] + } + } + }, async (req, reply) => { + const { lockId, key } = req.params + const txn = pools.transaction(lockId) + if (!txn) return reply.code(404).send({ error: 'transaction not found on this pod' }) + await txn.client.query( + `INSERT INTO "${txn.schema}".kv (key, value) + VALUES ($1, $2) + ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = now()`, + [key, req.body.value] + ) + return reply.code(204).send() + }) + + app.get<{ Params: LockKeyParams }>('/transactions/:lockId/keys/:key', { schema: lockKeySchema }, async (req, reply) => { + const { lockId, key } = req.params + const txn = pools.transaction(lockId) + if (!txn) return reply.code(404).send({ error: 'transaction not found on this pod' }) + const result = await txn.client.query<{ value: string }>( + `SELECT value FROM "${txn.schema}".kv WHERE key = $1`, + [key] + ) + if (result.rowCount === 0) return reply.code(404).send({ error: 'key not found' }) + return { key, value: result.rows[0].value, memberId, lockId } + }) + + app.post<{ Params: LockParams }>('/transactions/:lockId/commit', { schema: lockSchema }, async (req, reply) => { + const { lockId } = req.params + const handle = await pools.commitTransaction(lockId) + if (!handle) return reply.code(404).send({ error: 'transaction not found on this pod' }) + await member.unregisterLock(lockId) + return reply.code(204).send() + }) + + app.post<{ Params: LockParams }>('/transactions/:lockId/rollback', { schema: lockSchema }, async (req, reply) => { + const { lockId } = req.params + const handle = await pools.rollbackTransaction(lockId) + if (!handle) return reply.code(404).send({ error: 'transaction not found on this pod' }) + await member.unregisterLock(lockId) + return reply.code(204).send() + }) +} + +export const podPlugin = fp(podRoutes, { name: 'storage-db-pod' }) diff --git a/examples/storage-db/src/pool-manager.ts b/examples/storage-db/src/pool-manager.ts new file mode 100644 index 0000000..da9f761 --- /dev/null +++ b/examples/storage-db/src/pool-manager.ts @@ -0,0 +1,178 @@ +import { randomUUID } from 'node:crypto' +import { fileURLToPath } from 'node:url' +import { dirname, resolve } from 'node:path' +import pg from 'pg' +import Postgrator from 'postgrator' + +const { Pool } = pg + +const MIGRATION_DIR = resolve(dirname(fileURLToPath(import.meta.url)), '../migrations') + +export interface PoolManagerOptions { + connectionString: string + max?: number + migrationDir?: string +} + +export interface TransactionHandle { + lockId: string + client: pg.PoolClient + tenantId: string + schema: string +} + +export class PoolManager { + #connectionString: string + #max: number + #migrationDir: string + #pools = new Map() + #transactions = new Map() + + constructor (opts: PoolManagerOptions) { + this.#connectionString = opts.connectionString + this.#max = opts.max ?? 5 + this.#migrationDir = opts.migrationDir ?? MIGRATION_DIR + } + + async ensure (tenantId: string): Promise { + let pool = this.#pools.get(tenantId) + if (pool) return pool + + pool = new Pool({ + connectionString: this.#connectionString, + max: this.#max, + idleTimeoutMillis: 60_000 + }) + + const schema = schemaName(tenantId) + const client = await pool.connect() + try { + await client.query(`CREATE SCHEMA IF NOT EXISTS ${quoteIdent(schema)}`) + await client.query(`SET search_path TO ${quoteIdent(schema)}`) + + const postgrator = new Postgrator({ + migrationPattern: `${this.#migrationDir}/*`, + driver: 'pg', + schemaTable: `${schema}.schemaversion`, + currentSchema: schema, + execQuery: (query: string) => client.query(query) + }) + await postgrator.migrate() + } finally { + client.release() + } + + this.#pools.set(tenantId, pool) + return pool + } + + pool (tenantId: string): pg.Pool | undefined { + return this.#pools.get(tenantId) + } + + async drop (tenantId: string): Promise { + const pool = this.#pools.get(tenantId) + if (!pool) return + this.#pools.delete(tenantId) + try { + const client = await pool.connect() + try { + await client.query(`DROP SCHEMA IF EXISTS ${quoteIdent(schemaName(tenantId))} CASCADE`) + } finally { + client.release() + } + } finally { + await pool.end() + } + } + + schema (tenantId: string): string { + return schemaName(tenantId) + } + + async beginTransaction (tenantId: string): Promise { + const pool = await this.ensure(tenantId) + const client = await pool.connect() + try { + const schema = schemaName(tenantId) + await client.query(`SET search_path TO ${quoteIdent(schema)}`) + await client.query('BEGIN') + const lockId = `tx-${randomUUID()}` + const handle: TransactionHandle = { lockId, client, tenantId, schema } + this.#transactions.set(lockId, handle) + return handle + } catch (err) { + client.release() + throw err + } + } + + transaction (lockId: string): TransactionHandle | undefined { + return this.#transactions.get(lockId) + } + + async commitTransaction (lockId: string): Promise { + const handle = this.#transactions.get(lockId) + if (!handle) return null + this.#transactions.delete(lockId) + try { + await handle.client.query('COMMIT') + } finally { + handle.client.release() + } + return handle + } + + async rollbackTransaction (lockId: string): Promise { + const handle = this.#transactions.get(lockId) + if (!handle) return null + this.#transactions.delete(lockId) + try { + await handle.client.query('ROLLBACK') + } finally { + handle.client.release() + } + return handle + } + + load (): number { + let total = 0 + for (const pool of this.#pools.values()) { + total += pool.totalCount + } + return total + } + + tenantIds (): string[] { + return [...this.#pools.keys()] + } + + async close (): Promise { + for (const handle of this.#transactions.values()) { + try { await handle.client.query('ROLLBACK') } catch { /* ignore */ } + handle.client.release() + } + this.#transactions.clear() + + const pools = [...this.#pools.values()] + this.#pools.clear() + await Promise.all(pools.map(p => p.end())) + } +} + +const tenantPattern = /^[a-zA-Z0-9_-]{1,64}$/ + +export function isValidTenantId (id: string): boolean { + return tenantPattern.test(id) +} + +function schemaName (tenantId: string): string { + if (!isValidTenantId(tenantId)) { + throw new Error(`invalid tenantId: ${tenantId}`) + } + return `tenant_${tenantId.replace(/-/g, '_')}` +} + +function quoteIdent (ident: string): string { + return `"${ident.replace(/"/g, '""')}"` +} diff --git a/examples/storage-db/tsconfig.json b/examples/storage-db/tsconfig.json new file mode 100644 index 0000000..c1e3890 --- /dev/null +++ b/examples/storage-db/tsconfig.json @@ -0,0 +1,20 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "NodeNext", + "moduleResolution": "NodeNext", + "declaration": true, + "sourceMap": true, + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true, + "isolatedModules": true, + "allowImportingTsExtensions": true, + "rewriteRelativeImportExtensions": true, + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src"] +} diff --git a/package.json b/package.json index 07951de..1e3d368 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,8 @@ "type": "module", "scripts": { "build": "tsc", - "test": "node --experimental-strip-types --test --test-reporter=cleaner-spec-reporter test/*.test.ts", + "test": "node --test --test-reporter=cleaner-spec-reporter test/*.test.ts", + "test:e2e": "pnpm --filter @platformatic/coordinator run build && node --test --test-reporter=cleaner-spec-reporter --test-timeout=60000 test/e2e/*.test.ts", "test:redis:up": "docker compose up -d --wait", "test:redis:down": "docker compose down -v", "lint": "eslint --cache" @@ -38,10 +39,12 @@ }, "devDependencies": { "@types/node": "^25.5.2", + "@types/pg": "^8.11.10", "cleaner-spec-reporter": "^1.0.3", "eslint": "^9.39.4", "fastify": "^5.3.2", - "neostandard": "^0.13.0" + "neostandard": "^0.13.0", + "pg": "^8.13.1" }, "engines": { "node": ">= 22.18.0" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d3bffde..7705b48 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -21,6 +21,9 @@ importers: '@types/node': specifier: ^25.5.2 version: 25.8.0 + '@types/pg': + specifier: ^8.11.10 + version: 8.20.0 cleaner-spec-reporter: specifier: ^1.0.3 version: 1.0.3 @@ -33,6 +36,34 @@ importers: neostandard: specifier: ^0.13.0 version: 0.13.0(eslint@9.39.4)(typescript@6.0.3) + pg: + specifier: ^8.13.1 + version: 8.20.0 + + examples/storage-db: + dependencies: + '@fastify/reply-from': + specifier: ^12.6.2 + version: 12.6.2 + '@platformatic/coordinator': + specifier: workspace:* + version: link:../.. + fastify: + specifier: ^5.3.2 + version: 5.8.5 + fastify-plugin: + specifier: ^5.0.1 + version: 5.1.0 + pg: + specifier: ^8.13.1 + version: 8.20.0 + postgrator: + specifier: ^8.0.0 + version: 8.0.0 + devDependencies: + '@types/pg': + specifier: ^8.11.10 + version: 8.20.0 packages: @@ -121,6 +152,10 @@ packages: '@iovalkey/commands@0.1.0': resolution: {integrity: sha512-/B9W4qKSSITDii5nkBCHyPkIkAi+ealUtr1oqBJsLxjSRLka4pxun2VvMNSmcwgAMxgXtQfl0qRv7TE+udPJzg==} + '@isaacs/cliui@9.0.0': + resolution: {integrity: sha512-AokJm4tuBHillT+FpMtxQ60n8ObyXBatq7jD2/JA9dxbDDokKQm8KMht5ibGzLVU9IJDIKK4TPKgMHEYMn3lMg==} + engines: {node: '>=18'} + '@pinojs/redact@0.4.0': resolution: {integrity: sha512-k2ENnmBugE/rzQfEcdWHcCY+/FM3VLzH9cYEsbdsoqrvzAKRhUZeRNhAZvB8OitQJ1TBed3yqWtdjzS6wJKBwg==} @@ -139,6 +174,9 @@ packages: '@types/node@25.8.0': resolution: {integrity: sha512-TCFSk8IZh+iLX1xtksoBVtdmgL+1IX0fC9BeU4QqFSuNdN/K+HUlhqOzEmSYYpZUVsLYcPqc9KX+60iDuninSQ==} + '@types/pg@8.20.0': + resolution: {integrity: sha512-bEPFOaMAHTEP1EzpvHTbmwR8UsFyHSKsRisLIHVMXnpNefSbGA1bD6CVy+qKjGSqmZqNqBDV2azOBo8TgkcVow==} + '@typescript-eslint/eslint-plugin@8.59.1': resolution: {integrity: sha512-BOziFIfE+6osHO9FoJG4zjoHUcvI7fTNBSpdAwrNH0/TLvzjsk2oo8XSSOT2HhqUyhZPfHv4UOffoJ9oEEQ7Ag==} engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} @@ -571,6 +609,10 @@ packages: resolution: {integrity: sha512-dKx12eRCVIzqCxFGplyFKJMPvLEWgmNtUrpTiJIR5u97zEhRG8ySrtboPHZXx7daLxQVrl643cTzbab2tkQjxg==} engines: {node: '>= 0.4'} + foreground-child@3.3.1: + resolution: {integrity: sha512-gIXjKqtFuWEgzFRJA9WCQeSJLZDjgJUOMCMzxtvFq/37KojM1BFGufqsCy0r4qSQmYLsZYMeyRqzIWOMup03sw==} + engines: {node: '>=14'} + function-bind@1.1.2: resolution: {integrity: sha512-7XHNxH7qX9xG5mIwxkhumTox/MIRNcOgDrxWsMt2pAr23WHp6MrRlN7FBSFpCpr+oVO0F744iUgR82nJMfG2SA==} @@ -604,6 +646,12 @@ packages: resolution: {integrity: sha512-XxwI8EOhVQgWp6iDL+3b0r86f4d6AX6zSU55HfB4ydCEuXLXc5FcYeOu+nnGftS4TEju/11rt4KJPTMgbfmv4A==} engines: {node: '>=10.13.0'} + glob@11.1.0: + resolution: {integrity: sha512-vuNwKSaKiqm7g0THUBu2x7ckSs3XJLXE+2ssL7/MfTGPLLcrJQ/4Uq1CjPTtO5cCIiRxqvN6Twy1qOwhL0Xjcw==} + engines: {node: 20 || >=22} + deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me + hasBin: true + globals@14.0.0: resolution: {integrity: sha512-oahGvuMGQlPw/ivIYBjVSrWAfWLBeku5tpPE2fOPLi+WHffIWbuh2tCjhyQhTBPMf5E9jDEH4FOmTYgYwbKwtQ==} engines: {node: '>=18'} @@ -791,6 +839,10 @@ packages: resolution: {integrity: sha512-H0dkQoCa3b2VEeKQBOxFph+JAbcrQdE7KC0UkqwpLmv2EC4P41QXP+rqo9wYodACiG5/WM5s9oDApTU8utwj9g==} engines: {node: '>= 0.4'} + jackspeak@4.2.3: + resolution: {integrity: sha512-ykkVRwrYvFm1nb2AJfKKYPr0emF6IiXDYUaFx4Zn9ZuIH7MrzEZ3sD5RlqGXNRpHtvUHJyOnCEFxOlNDtGo7wg==} + engines: {node: 20 || >=22} + js-tokens@4.0.0: resolution: {integrity: sha512-RdJUflcE3cUzKiMqQgsCu06FPu9UdIJO0beYbPhHN4k6apgJtifcoCtT9bcxOpYBtpD2kCM6Sbzg4CausW/PKQ==} @@ -848,6 +900,10 @@ packages: resolution: {integrity: sha512-lyuxPGr/Wfhrlem2CL/UcnUc1zcqKAImBDzukY7Y5F/yQiNdko6+fRLevlw1HgMySw7f611UIY408EtxRSoK3Q==} hasBin: true + lru-cache@11.3.6: + resolution: {integrity: sha512-Gf/KoL3C/MlI7Bt0PGI9I+TeTC/I6r/csU58N4BSNc4lppLBeKsOdFYkK+dX0ABDUMJNfCHTyPpzwwO21Awd3A==} + engines: {node: 20 || >=22} + math-intrinsics@1.1.0: resolution: {integrity: sha512-/IXtbwEk5HTPyEwyKX6hGkYXxM9nbj64B+ilVJnC/R6B0pH5G4V3b0pVbL7DBj4tkhBAppbQUlf6F6Xl9LHu1g==} engines: {node: '>= 0.4'} @@ -859,6 +915,10 @@ packages: minimatch@3.1.5: resolution: {integrity: sha512-VgjWUsnnT6n+NUk6eZq77zeFdpW2LWDzP6zFGrCbHXiYNul5Dzqk2HHQ5uFH2DNW5Xbp8+jVzaeNt94ssEEl4w==} + minipass@7.1.3: + resolution: {integrity: sha512-tEBHqDnIoM/1rXME1zgka9g6Q2lcoCkxHLuc7ODJ5BxbP5d4c2Z5cGgtXAku59200Cx7diuHTOYfSBD8n6mm8A==} + engines: {node: '>=16 || 14 >=14.17'} + ms@2.1.3: resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==} @@ -935,6 +995,9 @@ packages: resolution: {integrity: sha512-wPrq66Llhl7/4AGC6I+cqxT07LhXvWL08LNXz1fENOw0Ap4sRZZ/gZpTTJ5jpurzzzfS2W/Ge9BY3LgLjCShcw==} engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + package-json-from-dist@1.0.1: + resolution: {integrity: sha512-UEZIS3/by4OC8vL3P2dTXRETpebLI2NiI5vIrjaD/5UtrkFX/tNbwjTSRAGC/+7CAo2pIcBaRgWmcBBHcsaCIw==} + parent-module@1.0.1: resolution: {integrity: sha512-GQ2EWRpQV8/o+Aw8YqtfZZPfNRWZYkbidE9k5rpl/hC3vtHHBfGm2Ifi6qWV+coDGkrUKZAxE3Lot5kcsRlh+g==} engines: {node: '>=6'} @@ -950,10 +1013,48 @@ packages: path-parse@1.0.7: resolution: {integrity: sha512-LDJzPVEEEPR+y48z93A0Ed0yXb8pAByGWo/k5YYdYgpY2/2EsOsksJrq7lOHxryrVOn1ejG6oAp8ahvOIQD8sw==} + path-scurry@2.0.2: + resolution: {integrity: sha512-3O/iVVsJAPsOnpwWIeD+d6z/7PmqApyQePUtCndjatj/9I5LylHvt5qluFaBT3I5h3r1ejfR056c+FCv+NnNXg==} + engines: {node: 18 || 20 || >=22} + peowly@1.3.3: resolution: {integrity: sha512-5UmUtvuCv3KzBX2NuQw2uF28o0t8Eq4KkPRZfUCzJs+DiNVKw7OaYn29vNDgrt/Pggs23CPlSTqgzlhHJfpT0A==} engines: {node: '>=18.6.0', typescript: '>=5.8'} + pg-cloudflare@1.3.0: + resolution: {integrity: sha512-6lswVVSztmHiRtD6I8hw4qP/nDm1EJbKMRhf3HCYaqud7frGysPv7FYJ5noZQdhQtN2xJnimfMtvQq21pdbzyQ==} + + pg-connection-string@2.12.0: + resolution: {integrity: sha512-U7qg+bpswf3Cs5xLzRqbXbQl85ng0mfSV/J0nnA31MCLgvEaAo7CIhmeyrmJpOr7o+zm0rXK+hNnT5l9RHkCkQ==} + + pg-int8@1.0.1: + resolution: {integrity: sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==} + engines: {node: '>=4.0.0'} + + pg-pool@3.13.0: + resolution: {integrity: sha512-gB+R+Xud1gLFuRD/QgOIgGOBE2KCQPaPwkzBBGC9oG69pHTkhQeIuejVIk3/cnDyX39av2AxomQiyPT13WKHQA==} + peerDependencies: + pg: '>=8.0' + + pg-protocol@1.13.0: + resolution: {integrity: sha512-zzdvXfS6v89r6v7OcFCHfHlyG/wvry1ALxZo4LqgUoy7W9xhBDMaqOuMiF3qEV45VqsN6rdlcehHrfDtlCPc8w==} + + pg-types@2.2.0: + resolution: {integrity: sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==} + engines: {node: '>=4'} + + pg@8.20.0: + resolution: {integrity: sha512-ldhMxz2r8fl/6QkXnBD3CR9/xg694oT6DZQ2s6c/RI28OjtSOpxnPrUCGOBJ46RCUxcWdx3p6kw/xnDHjKvaRA==} + engines: {node: '>= 16.0.0'} + peerDependencies: + pg-native: '>=3.0.1' + peerDependenciesMeta: + pg-native: + optional: true + + pgpass@1.0.5: + resolution: {integrity: sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==} + picomatch@4.0.4: resolution: {integrity: sha512-QP88BAKvMam/3NxH6vj2o21R6MjxZUAd6nlwAS/pnGvN9IVLocLHxGYIzFhg6fUQ+5th6P4dv4eW9jX3DSIj7A==} engines: {node: '>=12'} @@ -972,6 +1073,26 @@ packages: resolution: {integrity: sha512-/+5VFTchJDoVj3bhoqi6UeymcD00DAwb1nJwamzPvHEszJ4FpF6SNNbUbOS8yI56qHzdV8eK0qEfOSiodkTdxg==} engines: {node: '>= 0.4'} + postgrator@8.0.0: + resolution: {integrity: sha512-Kaw+/Ibk59pFyFzhdTpiXB3JVi4LFGl2JZ9RiWQk7qCtKUMFY8rat6ek0S3ON7SA4gj1yt6tn5s0Ukok2xnUFQ==} + engines: {node: '>=20.0.0'} + + postgres-array@2.0.0: + resolution: {integrity: sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==} + engines: {node: '>=4'} + + postgres-bytea@1.0.1: + resolution: {integrity: sha512-5+5HqXnsZPE65IJZSMkZtURARZelel2oXUEO8rH83VS/hxH5vv1uHquPg5wZs8yMAfdv971IU+kcPUczi7NVBQ==} + engines: {node: '>=0.10.0'} + + postgres-date@1.0.7: + resolution: {integrity: sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==} + engines: {node: '>=0.10.0'} + + postgres-interval@1.2.0: + resolution: {integrity: sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==} + engines: {node: '>=0.10.0'} + prelude-ls@1.2.1: resolution: {integrity: sha512-vkcDPrRZo1QZLbn5RLGPpg/WmIQ65qoWWhcGKf/b5eplkkarX0m9z8ppCat4mlOqUsWpyNuYgO3VRyrYHSzX5g==} engines: {node: '>= 0.8.0'} @@ -1113,6 +1234,10 @@ packages: resolution: {integrity: sha512-ZX99e6tRweoUXqR+VBrslhda51Nh5MTQwou5tnUDgbtyM0dBgmhEDtWGP/xbKn6hqfPRHujUNwz5fy/wbbhnpw==} engines: {node: '>= 0.4'} + signal-exit@4.1.0: + resolution: {integrity: sha512-bzyZ1e88w9O1iNJbKnOlvYTrWPDl46O1bG0D3XInv+9tkPrxrN8jUUTiFlDkkmKWgn1M6CfIA13SuGqOa9Korw==} + engines: {node: '>=14'} + sonic-boom@4.2.1: resolution: {integrity: sha512-w6AxtubXa2wTXAUsZMMWERrsIRAdrK0Sc+FUytWvYAhBJLyuI4llrMIC1DtlNSdI99EI86KZum2MMq3EAZlF9Q==} @@ -1263,6 +1388,10 @@ packages: wrappy@1.0.2: resolution: {integrity: sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==} + xtend@4.0.2: + resolution: {integrity: sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==} + engines: {node: '>=0.4'} + yocto-queue@0.1.0: resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==} engines: {node: '>=10'} @@ -1372,6 +1501,8 @@ snapshots: '@iovalkey/commands@0.1.0': {} + '@isaacs/cliui@9.0.0': {} + '@pinojs/redact@0.4.0': {} '@stylistic/eslint-plugin@2.11.0(eslint@9.39.4)(typescript@6.0.3)': @@ -1394,6 +1525,12 @@ snapshots: dependencies: undici-types: 7.24.6 + '@types/pg@8.20.0': + dependencies: + '@types/node': 25.8.0 + pg-protocol: 1.13.0 + pg-types: 2.2.0 + '@typescript-eslint/eslint-plugin@8.59.1(@typescript-eslint/parser@8.59.1(eslint@9.39.4)(typescript@6.0.3))(eslint@9.39.4)(typescript@6.0.3)': dependencies: '@eslint-community/regexpp': 4.12.2 @@ -2012,6 +2149,11 @@ snapshots: dependencies: is-callable: 1.2.7 + foreground-child@3.3.1: + dependencies: + cross-spawn: 7.0.6 + signal-exit: 4.1.0 + function-bind@1.1.2: {} function.prototype.name@1.1.8: @@ -2059,6 +2201,15 @@ snapshots: dependencies: is-glob: 4.0.3 + glob@11.1.0: + dependencies: + foreground-child: 3.3.1 + jackspeak: 4.2.3 + minimatch: 10.2.5 + minipass: 7.1.3 + package-json-from-dist: 1.0.1 + path-scurry: 2.0.2 + globals@14.0.0: {} globals@15.15.0: {} @@ -2250,6 +2401,10 @@ snapshots: has-symbols: 1.1.0 set-function-name: 2.0.2 + jackspeak@4.2.3: + dependencies: + '@isaacs/cliui': 9.0.0 + js-tokens@4.0.0: {} js-yaml@4.1.1: @@ -2308,6 +2463,8 @@ snapshots: dependencies: js-tokens: 4.0.0 + lru-cache@11.3.6: {} + math-intrinsics@1.1.0: {} minimatch@10.2.5: @@ -2318,6 +2475,8 @@ snapshots: dependencies: brace-expansion: 1.1.14 + minipass@7.1.3: {} + ms@2.1.3: {} natural-compare@1.4.0: {} @@ -2418,6 +2577,8 @@ snapshots: dependencies: p-limit: 4.0.0 + package-json-from-dist@1.0.1: {} + parent-module@1.0.1: dependencies: callsites: 3.1.0 @@ -2428,8 +2589,48 @@ snapshots: path-parse@1.0.7: {} + path-scurry@2.0.2: + dependencies: + lru-cache: 11.3.6 + minipass: 7.1.3 + peowly@1.3.3: {} + pg-cloudflare@1.3.0: + optional: true + + pg-connection-string@2.12.0: {} + + pg-int8@1.0.1: {} + + pg-pool@3.13.0(pg@8.20.0): + dependencies: + pg: 8.20.0 + + pg-protocol@1.13.0: {} + + pg-types@2.2.0: + dependencies: + pg-int8: 1.0.1 + postgres-array: 2.0.0 + postgres-bytea: 1.0.1 + postgres-date: 1.0.7 + postgres-interval: 1.2.0 + + pg@8.20.0: + dependencies: + pg-connection-string: 2.12.0 + pg-pool: 3.13.0(pg@8.20.0) + pg-protocol: 1.13.0 + pg-types: 2.2.0 + pgpass: 1.0.5 + optionalDependencies: + pg-cloudflare: 1.3.0 + + pgpass@1.0.5: + dependencies: + split2: 4.2.0 + picomatch@4.0.4: {} pino-abstract-transport@3.0.0: @@ -2454,6 +2655,20 @@ snapshots: possible-typed-array-names@1.1.0: {} + postgrator@8.0.0: + dependencies: + glob: 11.1.0 + + postgres-array@2.0.0: {} + + postgres-bytea@1.0.1: {} + + postgres-date@1.0.7: {} + + postgres-interval@1.2.0: + dependencies: + xtend: 4.0.2 + prelude-ls@1.2.1: {} process-warning@4.0.1: {} @@ -2610,6 +2825,8 @@ snapshots: side-channel-map: 1.0.1 side-channel-weakmap: 1.0.2 + signal-exit@4.1.0: {} + sonic-boom@4.2.1: dependencies: atomic-sleep: 1.0.0 @@ -2813,6 +3030,8 @@ snapshots: wrappy@1.0.2: {} + xtend@4.0.2: {} + yocto-queue@0.1.0: {} yocto-queue@1.2.2: {} diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml new file mode 100644 index 0000000..471ec4b --- /dev/null +++ b/pnpm-workspace.yaml @@ -0,0 +1,3 @@ +packages: + - '.' + - 'examples/*' diff --git a/src/helpers/lookup-and-proxy.ts b/src/helpers/lookup-and-proxy.ts index 7bd802c..c740e9f 100644 --- a/src/helpers/lookup-and-proxy.ts +++ b/src/helpers/lookup-and-proxy.ts @@ -1,6 +1,7 @@ -import type { FastifyRequest, FastifyReply, RouteHandlerMethod } from 'fastify' +import type { FastifyRequest, RouteHandlerMethod } from 'fastify' import '@fastify/reply-from' import type { Registry } from '../registry.ts' +import { proxyVia } from './proxy-via.ts' export type LookupAndProxyResult = 'hit' | 'orphan_reassigned' | 'not_found' @@ -24,16 +25,14 @@ export function lookupAndProxy ( onResult } = opts - return async function (request: FastifyRequest, reply: FastifyReply) { + return proxyVia(async (request) => { const destinationId = destinationFrom(request) const resolved = await registry.resolveDestination(destinationId, { reassignOrphans, claimOnMiss }) - if (!resolved) { onResult?.('not_found') - return reply.code(404).send({ error: notFoundMessage }) + return null } - onResult?.(resolved.reassigned ? 'orphan_reassigned' : 'hit') - return reply.from(`${resolved.address}${request.url}`) - } + return resolved + }, { notFoundMessage }) } diff --git a/src/helpers/lookup-lock-and-proxy.ts b/src/helpers/lookup-lock-and-proxy.ts new file mode 100644 index 0000000..7abb33d --- /dev/null +++ b/src/helpers/lookup-lock-and-proxy.ts @@ -0,0 +1,34 @@ +import type { FastifyRequest, RouteHandlerMethod } from 'fastify' +import '@fastify/reply-from' +import type { Registry } from '../registry.ts' +import { proxyVia } from './proxy-via.ts' + +export type LookupLockAndProxyResult = 'hit' | 'not_found' + +export interface LookupLockAndProxyOptions { + lockFrom: (req: FastifyRequest) => string + notFoundMessage?: string + onResult?: (result: LookupLockAndProxyResult) => void +} + +export function lookupLockAndProxy ( + registry: Registry, + opts: LookupLockAndProxyOptions +): RouteHandlerMethod { + const { + lockFrom, + notFoundMessage = 'Lock not found', + onResult + } = opts + + return proxyVia(async (request) => { + const lockId = lockFrom(request) + const resolved = await registry.resolveLock(lockId) + if (!resolved) { + onResult?.('not_found') + return null + } + onResult?.('hit') + return resolved + }, { notFoundMessage }) +} diff --git a/src/helpers/proxy-via.ts b/src/helpers/proxy-via.ts new file mode 100644 index 0000000..d1bff4f --- /dev/null +++ b/src/helpers/proxy-via.ts @@ -0,0 +1,25 @@ +import type { FastifyRequest, FastifyReply, RouteHandlerMethod } from 'fastify' +import '@fastify/reply-from' + +export interface ProxyTarget { + address: string +} + +export type ProxyResolver = + (request: FastifyRequest) => Promise + +export interface ProxyViaOptions { + notFoundMessage?: string +} + +export function proxyVia ( + resolve: ProxyResolver, + opts: ProxyViaOptions = {} +): RouteHandlerMethod { + const notFoundMessage = opts.notFoundMessage ?? 'Not found' + return async function (request: FastifyRequest, reply: FastifyReply) { + const resolved = await resolve(request) + if (!resolved) return reply.code(404).send({ error: notFoundMessage }) + return reply.from(`${resolved.address}${request.url}`) + } +} diff --git a/src/index.ts b/src/index.ts index 861f3ef..9b52f32 100644 --- a/src/index.ts +++ b/src/index.ts @@ -26,3 +26,9 @@ export type { PickAndRegisterOptions, PickAndRegisterResult } from './helpers/pi export { lookupAndDeregister } from './helpers/lookup-and-deregister.ts' export type { LookupAndDeregisterOptions, LookupAndDeregisterResult } from './helpers/lookup-and-deregister.ts' + +export { lookupLockAndProxy } from './helpers/lookup-lock-and-proxy.ts' +export type { LookupLockAndProxyOptions, LookupLockAndProxyResult } from './helpers/lookup-lock-and-proxy.ts' + +export { proxyVia } from './helpers/proxy-via.ts' +export type { ProxyViaOptions, ProxyResolver, ProxyTarget } from './helpers/proxy-via.ts' diff --git a/test/e2e/storage-db.test.ts b/test/e2e/storage-db.test.ts new file mode 100644 index 0000000..e45a63a --- /dev/null +++ b/test/e2e/storage-db.test.ts @@ -0,0 +1,382 @@ +import { test, before, after } from 'node:test' +import assert from 'node:assert/strict' +import { spawn, type ChildProcess } from 'node:child_process' +import { setTimeout as wait } from 'node:timers/promises' +import { randomBytes } from 'node:crypto' +import { resolve, dirname } from 'node:path' +import { fileURLToPath } from 'node:url' +import { Redis } from 'iovalkey' +import pg from 'pg' + +const REDIS_URL = process.env.REDIS_URL ?? 'redis://127.0.0.1:6379' +const PG_URL = process.env.PG_URL ?? 'postgresql://storage:storage@127.0.0.1:5432/storage' +const KEY_PREFIX = `e2e-${randomBytes(4).toString('hex')}` + +const COORDINATOR_PORT = 18080 +const POD_PORTS = [13001, 13002, 13003] +const POD_IDS = ['e2e-pod-1', 'e2e-pod-2', 'e2e-pod-3'] +const COORDINATOR_URL = `http://127.0.0.1:${COORDINATOR_PORT}` + +const exampleDir = resolve(dirname(fileURLToPath(import.meta.url)), '../../examples/storage-db') + +const children: ChildProcess[] = [] + +const baseEnv = { + ...process.env, + REDIS_URL, + KEY_PREFIX, + LOG_LEVEL: 'warn', + HEARTBEAT_MS: '500', + MEMBER_TTL: '3' +} + +function spawnNode (script: string, env: Record): ChildProcess { + const child = spawn(process.execPath, [script], { + cwd: exampleDir, + env: { ...baseEnv, ...env }, + stdio: ['ignore', 'inherit', 'inherit'] + }) + children.push(child) + return child +} + +async function waitForLiveMembers (n: number, timeoutMs = 10_000): Promise { + const deadline = Date.now() + timeoutMs + while (Date.now() < deadline) { + try { + const res = await fetch(`${COORDINATOR_URL}/pods`) + if (res.ok) { + const body = await res.json() as { count: number } + if (body.count >= n) return + } + } catch { /* not up yet */ } + await wait(100) + } + throw new Error(`timed out waiting for ${n} live members`) +} + +async function waitForPort (port: number, path: string, timeoutMs = 10_000): Promise { + const deadline = Date.now() + timeoutMs + while (Date.now() < deadline) { + try { + const res = await fetch(`http://127.0.0.1:${port}${path}`) + if (res.ok) return + } catch { /* not up yet */ } + await wait(100) + } + throw new Error(`timed out waiting for port ${port}${path}`) +} + +before(async () => { + spawnNode('src/bin/coordinator.ts', { + PORT: String(COORDINATOR_PORT), + HOST: '127.0.0.1', + STRATEGY: 'least-loaded', + CACHE_TTL_MS: '500' + }) + + for (let i = 0; i < POD_PORTS.length; i++) { + spawnNode('src/bin/pod.ts', { + PORT: String(POD_PORTS[i]), + HOST: '127.0.0.1', + MEMBER_ID: POD_IDS[i], + MEMBER_ADDRESS: `http://127.0.0.1:${POD_PORTS[i]}`, + PG_URL + }) + } + + for (const p of POD_PORTS) await waitForPort(p, '/health') + await waitForPort(COORDINATOR_PORT, '/pods') + await waitForLiveMembers(POD_PORTS.length) +}) + +after(async () => { + for (const c of children) { + if (!c.killed) c.kill('SIGTERM') + } + await wait(500) + for (const c of children) { + if (!c.killed) c.kill('SIGKILL') + } + + const redis = new Redis(REDIS_URL) + try { + const keys = await redis.keys(`${KEY_PREFIX}:*`) + if (keys.length > 0) await redis.del(...keys) + } finally { + await redis.quit() + } + + const client = new pg.Client({ connectionString: PG_URL }) + await client.connect() + try { + const result = await client.query<{ schema_name: string }>( + "SELECT schema_name FROM information_schema.schemata WHERE schema_name LIKE 'tenant_%'" + ) + for (const row of result.rows) { + await client.query(`DROP SCHEMA IF EXISTS "${row.schema_name}" CASCADE`) + } + } finally { + await client.end() + } +}) + +test('GET /pods returns the three live pods', async () => { + const res = await fetch(`${COORDINATOR_URL}/pods`) + assert.equal(res.status, 200) + const body = await res.json() as { count: number, members: Array<{ memberId: string }> } + assert.equal(body.count, 3) + const ids = body.members.map(m => m.memberId).sort() + assert.deepEqual(ids, [...POD_IDS].sort()) +}) + +// Runs first among the POST tests so every pod still reports load=0 in Valkey. +// Once any tenant is created and its heartbeat fires (~500ms), the strategy +// would correctly pack new tenants on the remaining cold pod(s) instead of +// distributing - that is the documented behaviour of least-loaded. +test('least-loaded: on cold start, six POSTs distribute across all three pods', async () => { + const tenantNames = ['cold-1', 'cold-2', 'cold-3', 'cold-4', 'cold-5', 'cold-6'] + const owners: string[] = [] + for (const t of tenantNames) { + const res = await fetch(`${COORDINATOR_URL}/tenants/${t}`, { method: 'POST' }) + const body = await res.json() as { memberId: string } + owners.push(body.memberId) + } + const unique = new Set(owners) + assert.equal(unique.size, 3, `expected all 3 pods to be picked, got ${[...unique].join(',')}`) +}) + +test('POST /tenants/:id picks a pod, binds it, returns the memberId', async () => { + const res = await fetch(`${COORDINATOR_URL}/tenants/alpha`, { method: 'POST' }) + assert.equal(res.status, 201) + const body = await res.json() as { tenantId: string, memberId: string } + assert.equal(body.tenantId, 'alpha') + assert.ok(POD_IDS.includes(body.memberId), `unexpected memberId ${body.memberId}`) +}) + +test('PUT then GET round-trips through the same pod', async () => { + const create = await fetch(`${COORDINATOR_URL}/tenants/bravo`, { method: 'POST' }) + const { memberId: ownerId } = await create.json() as { memberId: string } + + const put = await fetch(`${COORDINATOR_URL}/tenants/bravo/keys/hello`, { + method: 'PUT', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ value: 'world' }) + }) + assert.equal(put.status, 204) + + const get = await fetch(`${COORDINATOR_URL}/tenants/bravo/keys/hello`) + assert.equal(get.status, 200) + const body = await get.json() as { value: string, memberId: string } + assert.equal(body.value, 'world') + assert.equal(body.memberId, ownerId) +}) + +test('postgres state: migrations ran and writes land in the tenant schema', async () => { + const create = await fetch(`${COORDINATOR_URL}/tenants/pgcheck`, { method: 'POST' }) + assert.equal(create.status, 201) + const { memberId: owner } = await create.json() as { memberId: string } + + await fetch(`${COORDINATOR_URL}/tenants/pgcheck/keys/alpha`, { + method: 'PUT', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ value: 'one' }) + }) + await fetch(`${COORDINATOR_URL}/tenants/pgcheck/keys/beta`, { + method: 'PUT', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ value: 'two' }) + }) + + const client = new pg.Client({ connectionString: PG_URL }) + await client.connect() + try { + const schemaRows = await client.query<{ schema_name: string }>( + "SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'tenant_pgcheck'" + ) + assert.equal(schemaRows.rowCount, 1, 'tenant schema must exist in Postgres') + + const versionRows = await client.query<{ version: string }>( + 'SELECT version FROM tenant_pgcheck.schemaversion ORDER BY version DESC LIMIT 1' + ) + assert.equal(versionRows.rows[0]?.version, '1', 'postgrator must have applied migration 001') + + const tableRows = await client.query<{ key: string, value: string, updated_at: Date }>( + 'SELECT key, value, updated_at FROM tenant_pgcheck.kv ORDER BY key' + ) + assert.equal(tableRows.rowCount, 2) + assert.deepEqual( + tableRows.rows.map(r => ({ key: r.key, value: r.value })), + [{ key: 'alpha', value: 'one' }, { key: 'beta', value: 'two' }] + ) + assert.ok(tableRows.rows[0].updated_at instanceof Date, 'updated_at must be a real timestamp') + + const otherSchema = await client.query( + "SELECT 1 FROM information_schema.tables WHERE table_schema = 'tenant_pgcheck' AND table_name = 'kv'" + ) + assert.equal(otherSchema.rowCount, 1) + + assert.ok(POD_IDS.includes(owner)) + } finally { + await client.end() + } +}) + +test('transactions: commit makes the write visible outside the txn', async () => { + const create = await fetch(`${COORDINATOR_URL}/tenants/txn-commit`, { method: 'POST' }) + const { memberId: owner } = await create.json() as { memberId: string } + + const begin = await fetch(`${COORDINATOR_URL}/tenants/txn-commit/transactions`, { method: 'POST' }) + assert.equal(begin.status, 201) + const { lockId, memberId: txnOwner } = await begin.json() as { lockId: string, memberId: string } + assert.equal(txnOwner, owner, 'transaction must start on the tenant-owning pod') + + const outsideBefore = await fetch(`${COORDINATOR_URL}/tenants/txn-commit/keys/k`) + assert.equal(outsideBefore.status, 404, 'key must not exist before the write') + + const put = await fetch(`${COORDINATOR_URL}/transactions/${lockId}/keys/k`, { + method: 'PUT', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ value: 'committed-value' }) + }) + assert.equal(put.status, 204) + + const insideRead = await fetch(`${COORDINATOR_URL}/transactions/${lockId}/keys/k`) + assert.equal(insideRead.status, 200) + const inside = await insideRead.json() as { value: string, memberId: string, lockId: string } + assert.equal(inside.value, 'committed-value') + assert.equal(inside.memberId, owner, 'read must hit the same pod via lock routing') + + const outsideMidTxn = await fetch(`${COORDINATOR_URL}/tenants/txn-commit/keys/k`) + assert.equal(outsideMidTxn.status, 404, 'uncommitted write must not be visible from outside') + + const commit = await fetch(`${COORDINATOR_URL}/transactions/${lockId}/commit`, { method: 'POST' }) + assert.equal(commit.status, 204) + + const outsideAfter = await fetch(`${COORDINATOR_URL}/tenants/txn-commit/keys/k`) + assert.equal(outsideAfter.status, 200) + const after = await outsideAfter.json() as { value: string } + assert.equal(after.value, 'committed-value') +}) + +test('transactions: rollback discards the write', async () => { + await fetch(`${COORDINATOR_URL}/tenants/txn-rollback`, { method: 'POST' }) + + await fetch(`${COORDINATOR_URL}/tenants/txn-rollback/keys/k`, { + method: 'PUT', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ value: 'original' }) + }) + + const begin = await fetch(`${COORDINATOR_URL}/tenants/txn-rollback/transactions`, { method: 'POST' }) + const { lockId } = await begin.json() as { lockId: string } + + await fetch(`${COORDINATOR_URL}/transactions/${lockId}/keys/k`, { + method: 'PUT', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ value: 'overwritten' }) + }) + + const insideRead = await fetch(`${COORDINATOR_URL}/transactions/${lockId}/keys/k`) + const inside = await insideRead.json() as { value: string } + assert.equal(inside.value, 'overwritten', 'txn sees its own write') + + const rollback = await fetch(`${COORDINATOR_URL}/transactions/${lockId}/rollback`, { method: 'POST' }) + assert.equal(rollback.status, 204) + + const after = await fetch(`${COORDINATOR_URL}/tenants/txn-rollback/keys/k`) + const body = await after.json() as { value: string } + assert.equal(body.value, 'original', 'rollback restores the prior value') +}) + +test('transactions: unknown lockId returns 404', async () => { + const res = await fetch(`${COORDINATOR_URL}/transactions/tx-does-not-exist/commit`, { method: 'POST' }) + assert.equal(res.status, 404) + const body = await res.json() as { error: string } + assert.equal(body.error, 'transaction not found') +}) + +test('failover (graceful): SIGTERM the owning pod, next request reassigns and preserves data', async () => { + const create = await fetch(`${COORDINATOR_URL}/tenants/failover-graceful`, { method: 'POST' }) + const { memberId: originalOwner } = await create.json() as { memberId: string } + + const put = await fetch(`${COORDINATOR_URL}/tenants/failover-graceful/keys/k`, { + method: 'PUT', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ value: 'before-failover' }) + }) + assert.equal(put.status, 204) + + const warmRead = await fetch(`${COORDINATOR_URL}/tenants/failover-graceful/keys/k`) + const warm = await warmRead.json() as { value: string, memberId: string } + assert.equal(warm.memberId, originalOwner, 'warmup read must hit the original pod and cache the resolve') + + const ownerIdx = POD_IDS.indexOf(originalOwner) + assert.ok(ownerIdx >= 0) + const ownerChild = children[ownerIdx + 1] + + const exited = new Promise((resolve) => ownerChild.once('exit', () => resolve())) + ownerChild.kill('SIGTERM') + await Promise.race([exited, wait(5_000)]) + + const redis = new Redis(REDIS_URL) + try { + const hashExists = await redis.exists(`${KEY_PREFIX}:member:${originalOwner}`) + assert.equal(hashExists, 0, 'graceful deregister must delete the member hash') + } finally { + await redis.quit() + } + + await wait(700) + + const read = await fetch(`${COORDINATOR_URL}/tenants/failover-graceful/keys/k`) + assert.equal(read.status, 200) + const body = await read.json() as { value: string, memberId: string } + assert.equal(body.value, 'before-failover', 'data must survive the failover') + assert.notEqual(body.memberId, originalOwner, 'must be served by a different pod') + + const writeAfter = await fetch(`${COORDINATOR_URL}/tenants/failover-graceful/keys/k2`, { + method: 'PUT', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ value: 'after-failover' }) + }) + assert.equal(writeAfter.status, 204) + + const client = new pg.Client({ connectionString: PG_URL }) + await client.connect() + try { + const rows = await client.query<{ key: string, value: string }>( + 'SELECT key, value FROM tenant_failover_graceful.kv ORDER BY key' + ) + assert.deepEqual( + rows.rows, + [{ key: 'k', value: 'before-failover' }, { key: 'k2', value: 'after-failover' }] + ) + } finally { + await client.end() + } +}) + +test('orphan reassignment: killing the owning pod reroutes the tenant', async () => { + const create = await fetch(`${COORDINATOR_URL}/tenants/orphan-test`, { method: 'POST' }) + const { memberId: originalOwner } = await create.json() as { memberId: string } + + const ownerIdx = POD_IDS.indexOf(originalOwner) + assert.ok(ownerIdx >= 0) + const ownerChild = children[ownerIdx + 1] + ownerChild.kill('SIGKILL') + + await wait(4_000) + + const res = await fetch(`${COORDINATOR_URL}/tenants/orphan-test/keys/k`, { + method: 'PUT', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ value: 'after-failover' }) + }) + assert.equal(res.status, 204) + + const get = await fetch(`${COORDINATOR_URL}/tenants/orphan-test/keys/k`) + const body = await get.json() as { value: string, memberId: string } + assert.equal(body.value, 'after-failover') + assert.notEqual(body.memberId, originalOwner) +}) diff --git a/test/helpers.test.ts b/test/helpers.test.ts index 32384b6..5e04fe6 100644 --- a/test/helpers.test.ts +++ b/test/helpers.test.ts @@ -8,6 +8,7 @@ import { Registry } from '../src/registry.ts' import { lookupAndProxy } from '../src/helpers/lookup-and-proxy.ts' import { pickAndRegister } from '../src/helpers/pick-and-register.ts' import { lookupAndDeregister } from '../src/helpers/lookup-and-deregister.ts' +import { lookupLockAndProxy } from '../src/helpers/lookup-lock-and-proxy.ts' import { REDIS_URL } from './redis-url.ts' const PREFIX = `test-${randomBytes(4).toString('hex')}` @@ -15,6 +16,7 @@ const PREFIX = `test-${randomBytes(4).toString('hex')}` const membersKey = (): string => `${PREFIX}:members` const memberKey = (id: string): string => `${PREFIX}:member:${id}` const destinationKey = (id: string): string => `${PREFIX}:destination:${id}` +const lockKey = (id: string): string => `${PREFIX}:lock:${id}` interface MockPod { app: ReturnType @@ -54,6 +56,11 @@ async function createMockPod (): Promise { return reply.code(204).send() }) + app.post('/locks/:lockId/echo', async (req, reply) => { + const { lockId } = req.params as { lockId: string } + return reply.code(200).send({ lockId, body: req.body }) + }) + await app.listen({ port: 0, host: '127.0.0.1' }) const addr = app.server.address() as any return { app, address: `http://127.0.0.1:${addr.port}`, resources } @@ -83,6 +90,12 @@ async function createCoordinator (registry: Registry): Promise req.params.id })) + app.post('/locks/:lockId/echo', + { schema: { body: { type: 'object', properties: { msg: { type: 'string' } } } } }, + lookupLockAndProxy(registry, { + lockFrom: (req: any) => req.params.lockId + })) + return app } @@ -219,4 +232,42 @@ test('Coordinator helpers', async (t) => { const res = await coordinator.inject({ method: 'DELETE', url: '/resources/never-existed' }) strictEqual(res.statusCode, 404) }) + + await t.test('lookupLockAndProxy: routes to the pod that owns the lock', async () => { + const lockId = `lock-${randomBytes(3).toString('hex')}` + await redis.hset(lockKey(lockId), { podId: memberId2, destinationId: 'unused' }) + + const res = await coordinator.inject({ + method: 'POST', + url: `/locks/${lockId}/echo`, + payload: { msg: 'pinned' } + }) + strictEqual(res.statusCode, 200) + const body = res.json() as any + strictEqual(body.lockId, lockId) + strictEqual(body.body.msg, 'pinned') + }) + + await t.test('lookupLockAndProxy: returns 404 for unknown lock', async () => { + const res = await coordinator.inject({ + method: 'POST', + url: '/locks/never-existed/echo', + payload: { msg: 'x' } + }) + strictEqual(res.statusCode, 404) + const body = res.json() as any + strictEqual(body.error, 'Lock not found') + }) + + await t.test('lookupLockAndProxy: returns 404 when the owning pod is dead', async () => { + const lockId = `lock-dead-${randomBytes(3).toString('hex')}` + await redis.hset(lockKey(lockId), { podId: 'dead-pod', destinationId: 'unused' }) + + const res = await coordinator.inject({ + method: 'POST', + url: `/locks/${lockId}/echo`, + payload: { msg: 'x' } + }) + strictEqual(res.statusCode, 404) + }) })