185 changes: 174 additions & 11 deletions car-sharing/README.md
@@ -7,8 +7,8 @@ showing three things that are normally hard, wired straight from the framework:
- **Gateway auth at the edge** — JWT authentication + proto-driven authorization
(`@connectum/auth`) on the public-facing `trips` service.
- **Cross-service `ctx.call` across split pods** — the trip handler calls fleet
and billing through the typed service catalog; the framework picks in-process
or network transport from env, no handler changes.
through the typed service catalog (availability pre-check); the framework picks
in-process or network transport from env, no handler changes.
- **OpenTelemetry observability** — RPC tracing/metrics (`@connectum/otel`),
enabled per role by env, on top of Istio's mesh telemetry.

@@ -60,7 +60,8 @@ flowchart TB
ingress -->|"AuthorizationPolicy:<br/>ingress SA only"| tripsApp
tripsApp -->|"ctx.call GetVehicle<br/>(pre-check, FLEET_ADDR)"| fleetApp
tripsApp -.->|"starts TripWorkflow<br/>(Temporal client)"| temporal[(Temporal<br/>cluster)]
temporal -->|"worker activities<br/>(ReserveVehicle, RecordTrip…)"| fleetApp
temporal -->|"worker activities<br/>(ReserveVehicle…)"| fleetApp
temporal -->|"worker activities<br/>(RecordTrip, EndTrip…)"| tripsApp
temporal -->|"worker activities<br/>(OpenTab, AddCharge, Settle…)"| billingApp

fleetApp -. "AuthorizationPolicy:<br/>trips SA only" .-> tripsApp
@@ -301,6 +302,166 @@ The saga is covered without Docker or a Temporal cluster:
valid `StartTrip` returns `{ trip, workflow_id }` and starts exactly one
workflow keyed by the trip id.

## Phase 3 — EventBus broadcast (fan-out)

Phases 1 and 2 are about **driving** a flow: `ctx.call` is a synchronous request
that returns a value; the saga is a durable orchestration that retries and
compensates. Phase 3 adds the **third, orthogonal** mechanism — a
**fire-and-forget 1→N broadcast**. When a trip is **SETTLED**, the single domain
fact "this trip completed" is published **once** as `TripCompleted` and consumed
**independently** by three reactors that the saga knows nothing about.

| Mechanism                        | Shape                                | Guarantee                                                     |
| -------------------------------- | ------------------------------------ | ------------------------------------------------------------- |
| `ctx.call` (Phase 1)             | synchronous typed RPC                | request/response                                              |
| Temporal saga (Phase 2)          | durable orchestration + compensation | exactly-once, retried, **durable**                            |
| **EventBus broadcast (Phase 3)** | **fire-and-forget 1→N**              | at-least-once per subscriber, **non-durable**, order-agnostic |

### Broadcast, not orchestration

EventBus is used here for **broadcast only**, never to drive the trip. The
publisher emits one event to one topic; three **independent** subscriber buses,
each in its **own consumer group**, each receive their own copy:

```
status = SETTLED (Phase 2 saga, unchanged)
  │
  ▼ NEW terminal activity (worker, full Node)
acts.publishTripCompleted({ tripId, userId, vehicleId, durationMs })
  │ publisherBus.publish(TripCompletedSchema, …) // NO {topic}
  │ topic resolved = "trips.completed" (proto option via publishes)
  ▼
┌───────────────────────── topic: trips.completed ─────────────────────────┐
│                             (NATS JetStream)                             │
└─┬─────────────────────────┬─────────────────────────┬────────────────────┘
  │ group=cs-pricing        │ group=cs-audit          │ group=cs-notify
  ▼ (own durable consumer)  ▼ (own durable consumer)  ▼ (own durable consumer)
┌──────────────────────┐  ┌──────────────────────┐  ┌──────────────────────┐
│ PRICING/ANALYTICS    │  │ AUDIT-LOG            │  │ NOTIFICATIONS        │
│ trip count + revenue │  │ append 1 record      │  │ "receipt sent"       │
└──────────────────────┘  └──────────────────────┘  └──────────────────────┘
each reacts on its own, order-agnostic, failure-isolated
```

Two independent reasons make this **fan-out** rather than load-balancing:

1. **Framework-level** — two routes resolving to the same topic **cannot share
one bus** (the EventBus throws a *duplicate-topic* error at `start()`), so the
three reactors on `trips.completed` are **forced** onto three separate buses.
2. **Broker-level** — each bus uses a **distinct consumer group**, so NATS
JetStream gives each its own **durable consumer** → every reactor receives
every event. A **shared** group would load-balance (one reactor steals each
event) = a queue, not a broadcast.

So the rule the example teaches: **N independent consumers ⇒ N buses, each with
its own group**, never one bus with N handlers on the same topic.
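
The difference between the two delivery modes can be modelled in a few lines. The sketch below is a hypothetical in-memory broker (`TinyBroker` and `deliveries` are illustrative names, not part of `@connectum/events` or NATS). It assumes one copy of each event per group and round-robin delivery inside a group, which is a simplification of how JetStream durable consumers behave.

```typescript
// Hypothetical in-memory model, NOT the @connectum/events or NATS API.
type Handler = (event: string) => void;

class TinyBroker {
  // topic -> group -> members subscribed under that group
  private groups = new Map<string, Map<string, Handler[]>>();
  // round-robin cursor per "topic/group"
  private cursors = new Map<string, number>();

  subscribe(topic: string, group: string, handler: Handler): void {
    const byGroup = this.groups.get(topic) ?? new Map<string, Handler[]>();
    const members = byGroup.get(group) ?? [];
    members.push(handler);
    byGroup.set(group, members);
    this.groups.set(topic, byGroup);
  }

  publish(topic: string, event: string): void {
    const byGroup = this.groups.get(topic);
    if (!byGroup) return;
    // Every group gets one copy; inside a group exactly one member receives it.
    byGroup.forEach((members, group) => {
      const key = `${topic}/${group}`;
      const cursor = this.cursors.get(key) ?? 0;
      const target = members[cursor % members.length];
      if (target) target(event);
      this.cursors.set(key, cursor + 1);
    });
  }
}

// Subscribes one reactor per entry in groupNames, publishes two events,
// and returns how many events each reactor received.
function deliveries(groupNames: string[]): number[] {
  const broker = new TinyBroker();
  const counts = groupNames.map(() => 0);
  groupNames.forEach((group, i) => {
    broker.subscribe("trips.completed", group, () => {
      counts[i] += 1;
    });
  });
  broker.publish("trips.completed", "trip-1");
  broker.publish("trips.completed", "trip-2");
  return counts;
}

// Distinct groups: every reactor sees both events (fan-out).
const fanOut = deliveries(["cs-pricing", "cs-audit", "cs-notify"]);
// One shared group: the two events are split between reactors (a queue).
const loadBalanced = deliveries(["cs-shared", "cs-shared", "cs-shared"]);
```

With distinct groups each reactor counts both events; with the shared group the same two events are spread across the three reactors and one of them receives nothing.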

### Topic from the proto option (no raw strings)

The topic is declared **once**, on the proto method, and used end-to-end with no
hand-passed topic string:

```proto
service TripEventHandlers {
rpc OnTripCompleted(TripCompleted) returns (google.protobuf.Empty) {
option (connectum.events.v1.event).topic = "trips.completed";
}
}
```

The publisher bus lists this service in **`publishes`**, which populates its
publish-topic lookup from the proto option — so `publish(TripCompletedSchema, …)`
resolves `trips.completed` with **no `{topic}` argument**. (A pure publisher has
no subscriber `routes`, so without `publishes` the topic would silently fall back
to the message `typeName`.) Each reactor registers the **same** service via its
own bus's `routes`, subscribing to the same proto-resolved topic.
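
The lookup-with-fallback behaviour described above can be illustrated with a small model. This is a hedged sketch, not the real `createEventBus` internals: `EventService`, `buildTopicLookup` and `resolveTopic` are hypothetical names, and the `trips.v1` package prefix is assumed from the `trips/v1/trip_events.proto` path.

```typescript
// Hypothetical model of the publish-topic lookup; not the library's internals.
interface EventMethod {
  inputTypeName: string; // fully qualified message name
  topic?: string; // value of the (connectum.events.v1.event).topic option, if set
}

interface EventService {
  name: string;
  methods: EventMethod[];
}

// Builds typeName -> topic from the services listed in `publishes`.
function buildTopicLookup(publishes: EventService[]): Map<string, string> {
  const lookup = new Map<string, string>();
  publishes.forEach((service) => {
    service.methods.forEach((method) => {
      if (method.topic) lookup.set(method.inputTypeName, method.topic);
    });
  });
  return lookup;
}

// Falls back to the message typeName when no topic is known for it.
function resolveTopic(lookup: Map<string, string>, typeName: string): string {
  return lookup.get(typeName) ?? typeName;
}

// Assumed package name; the README only shows the service and message names.
const tripEventHandlers: EventService = {
  name: "trips.v1.TripEventHandlers",
  methods: [{ inputTypeName: "trips.v1.TripCompleted", topic: "trips.completed" }],
};

const withPublishes = resolveTopic(
  buildTopicLookup([tripEventHandlers]),
  "trips.v1.TripCompleted",
);
const withoutPublishes = resolveTopic(buildTopicLookup([]), "trips.v1.TripCompleted");
```

In this model the bus that lists the service resolves `trips.completed`, while a pure publisher without `publishes` ends up on the `typeName`, a topic no reactor subscribes to.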

### Failure semantics — a lost broadcast must never reverse a paid trip

This is the crux of "EventBus is broadcast, Temporal is durability". The publish
is a **success-only tail** placed **outside** the saga's `try/catch`, reached
**only** when the trip is `SETTLED`, in its own `try/catch` that does **not**
rethrow:

```ts
try {
…reserve → record → end → openTab → addCharge → settle…
status = TripStatus.SETTLED;
} catch (err) {
…run compensations LIFO…; status = TripStatus.CANCELLED; throw …; // rethrows
}
// SUCCESS-ONLY TAIL — reached only when status === SETTLED:
try {
await broadcast.publishTripCompleted({ tripId, userId, vehicleId, durationMs });
} catch (err) {
log.warn("broadcast failed; trip stays SETTLED", { tripId });
}
return status;
```

Because this block is reached only on success and its `catch` does not rethrow, a
failed or timed-out broadcast **can never trigger compensation** on a settled,
paid trip. The reactors' work (an analytics tally, an audit line, a receipt) is
non-critical and reconstructable — losing one is acceptable; reversing a settled
financial trip is not. The compensation guard is this **structural placement**,
not the activity's retry count.

> **Rule:** if a side effect must be **durable / retried**, it belongs in
> **Temporal** (its own activity with a retry policy), **not** on the EventBus.
> Don't copy this swallow-and-continue `catch` into a path that needs durability.
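
To make the structural guard concrete, here is a runnable sketch of the same control flow with plain async functions standing in for Temporal activities. `Step`, `Journal` and `runTripSaga` are hypothetical names for illustration; a real workflow would call proxied activities instead.

```typescript
// Hypothetical stand-ins for activities; not the example's TripWorkflow.
interface Step {
  name: string;
  run: () => Promise<void>;
  compensate: () => Promise<void>;
}

interface Journal {
  compensated: string[];
  warnings: string[];
}

async function runTripSaga(
  steps: Step[],
  publishTripCompleted: () => Promise<void>,
  journal: Journal,
): Promise<"SETTLED"> {
  const done: Step[] = [];
  try {
    for (const step of steps) {
      await step.run();
      done.push(step);
    }
  } catch (err) {
    // Saga failure: undo completed steps in reverse (LIFO) order, then rethrow.
    for (const step of done.reverse()) {
      await step.compensate();
      journal.compensated.push(step.name);
    }
    throw err;
  }

  // Success-only tail: reached only when every step above succeeded.
  try {
    await publishTripCompleted();
  } catch {
    // Swallowed on purpose: a lost broadcast must not undo a settled trip.
    journal.warnings.push("broadcast failed; trip stays SETTLED");
  }
  return "SETTLED";
}

const ok = async (): Promise<void> => {};
const fail = async (): Promise<void> => {
  throw new Error("boom");
};
const step = (name: string, run = ok): Step => ({ name, run, compensate: ok });

// Case 1: the saga succeeds but the broadcast fails.
const brokenBroadcast: Journal = { compensated: [], warnings: [] };
const brokenBroadcastRun = runTripSaga(
  [step("reserve"), step("settle")],
  fail,
  brokenBroadcast,
);

// Case 2: a saga step fails, so the broadcast is never attempted.
let publishCalls = 0;
const brokenSettle: Journal = { compensated: [], warnings: [] };
const brokenSettleRun = runTripSaga(
  [step("reserve"), step("settle", fail)],
  async () => {
    publishCalls += 1;
  },
  brokenSettle,
).catch(() => "CANCELLED" as const);
```

In the first case the run still resolves to `SETTLED` with no compensation and one warning; in the second, `reserve` is compensated and the publish function is never called.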

### Idempotent reactors

On a real broker the broadcast is **at-least-once**: if the worker crashes after
publishing but before the activity is recorded as complete, the retry publishes
again, and a reactor that crashes before acking is redelivered the same event.
Either way a reactor can see one `TripCompleted` twice, which would otherwise
double-count an analytics tally or send a second receipt. Each reactor therefore
**dedupes by `tripId`** in its in-memory store. This is why broadcast reactors
are written to be idempotent: broadcast trades durability for decoupling, and
idempotency absorbs the resulting redelivery.
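
A minimal sketch of the dedupe-by-`tripId` idea follows. `PricingReactor` and `TripCompletedLike` are hypothetical, the event type lists only the four fields shown in this README, and the tally uses trip duration purely for illustration.

```typescript
// Hypothetical reactor; only the fields shown in this README are modelled.
interface TripCompletedLike {
  tripId: string;
  userId: string;
  vehicleId: string;
  durationMs: number;
}

class PricingReactor {
  private seen = new Set<string>();
  tripCount = 0;
  totalDurationMs = 0;

  // Returns true when the event was applied, false when it was a redelivery.
  onTripCompleted(event: TripCompletedLike): boolean {
    if (this.seen.has(event.tripId)) return false;
    this.seen.add(event.tripId);
    this.tripCount += 1;
    this.totalDurationMs += event.durationMs;
    return true;
  }
}

const reactor = new PricingReactor();
const event: TripCompletedLike = {
  tripId: "trip-1",
  userId: "user-1",
  vehicleId: "vehicle-1",
  durationMs: 900000,
};

const firstDelivery = reactor.onTripCompleted(event);
const redelivery = reactor.onTripCompleted(event); // same tripId, ignored
```

An in-memory set like this forgets everything on restart, which fits the non-critical, reconstructable reactions described above; a reaction that must survive restarts would need a persistent dedupe key.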

### Processes and the dockerless test

Same "one image, role by env" story as the RPC roles: the worker hosts the
**publish-only** bus, and each reactor is `node src/reactor.ts` with a `REACTOR`
selector picking its route and group.

| Process            | Entry                                 | Bus                                             |
| ------------------ | ------------------------------------- | ----------------------------------------------- |
| Worker (publisher) | `node src/worker.ts`                  | publish-only (`publishes: [TripEventHandlers]`) |
| `reactor-pricing`  | `REACTOR=pricing node src/reactor.ts` | route `OnTripCompleted`, group `cs-pricing`     |
| `reactor-audit`    | `REACTOR=audit node src/reactor.ts`   | route `OnTripCompleted`, group `cs-audit`       |
| `reactor-notify`   | `REACTOR=notify node src/reactor.ts`  | route `OnTripCompleted`, group `cs-notify`      |
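
The role-by-env selection in the table might look roughly like the sketch below. `selectReactor` and `ReactorConfig` are hypothetical names, and the environment is passed in as a plain object so the function stays easy to test; the actual `src/reactor.ts` may be organised differently.

```typescript
// Hypothetical selector; mirrors the table above, not the real src/reactor.ts.
type ReactorName = "pricing" | "audit" | "notify";

interface ReactorConfig {
  name: ReactorName;
  route: "OnTripCompleted";
  group: string;
}

const REACTORS: Record<ReactorName, ReactorConfig> = {
  pricing: { name: "pricing", route: "OnTripCompleted", group: "cs-pricing" },
  audit: { name: "audit", route: "OnTripCompleted", group: "cs-audit" },
  notify: { name: "notify", route: "OnTripCompleted", group: "cs-notify" },
};

function selectReactor(env: Record<string, string | undefined>): ReactorConfig {
  const raw = env["REACTOR"];
  if (raw === "pricing" || raw === "audit" || raw === "notify") {
    return REACTORS[raw];
  }
  // Fail fast: an unknown selector would otherwise start a reactor with no group.
  throw new Error(`REACTOR must be pricing|audit|notify, got: ${String(raw)}`);
}

const auditConfig = selectReactor({ REACTOR: "audit" });
```

Failing at startup on a missing or misspelled `REACTOR` keeps a misconfigured container from silently joining the wrong consumer group.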

`tests/e2e/broadcast.test.ts` proves the fan-out **without a broker**: one shared
`MemoryAdapter()` feeds the publisher bus and all three reactor buses, the real
`publishTripCompleted` activity is driven through `MockActivityEnvironment`, and
all three reactors are asserted to fire — with the **full five-field
`TripCompleted` shape** verified field-by-field against the documented contract.
The workflow test additionally proves the broadcast runs **only** on the SETTLED
path (never on a compensated run) and that a **failed** broadcast leaves the trip
`SETTLED` with no compensation.

### Run it and watch the fan-out

```bash
docker compose up -d postgres
DATABASE_URL=postgresql://car_sharing:car_sharing@localhost:5432/car_sharing pnpm db:migrate
DATABASE_URL=postgresql://car_sharing:car_sharing@localhost:5432/car_sharing pnpm db:seed
docker compose --profile saga up --build # roles + worker + Temporal + NATS + 3 reactors
```

Start a trip on the `trips` role (`:5002`); when the saga settles, all **three**
reactor logs each print their independent reaction to the one `trips.completed`
event. Stop one reactor and start another trip: the remaining two still receive
the event (broadcast, not steal).

> **Version note.** This phase uses the `publishes` option on `createEventBus`,
> added after `@connectum/events@1.0.0`. Until events `>= 1.1.0` is published,
> run the example against the **local** packages (the repo's local-package test
> flow), where it typechecks and runs. Against the published `1.0.0` the option
> is absent and the topic would fall back to the message `typeName`.

## Build the image

```bash
@@ -376,38 +537,40 @@ Every role sets `OTEL_EXPORTER_OTLP_ENDPOINT` in its ConfigMap, which turns on t
`@connectum/otel` server interceptor (the app enables OTel only when that env var
is present — see `src/observability.ts`). Spans carry a `connectum.transport`
attribute distinguishing in-process `ctx.call`s from network hops, and
`trustRemote: true` stitches gateway → fleet billing into one distributed trace.
`trustRemote: true` stitches gateway → fleet (and worker → fleet/trips/billing activities) into one distributed trace.
Point `OTEL_EXPORTER_OTLP_ENDPOINT` at your Collector (the manifests assume
`otel-collector.observability.svc.cluster.local:4317`).
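
As a rough illustration of env-gated wiring, the sketch below adds a tracing interceptor only when the endpoint variable is set. `Interceptor`, `buildInterceptors` and `makeOtelInterceptor` are hypothetical stand-ins, not the `@connectum/otel` API, and placing the OTel interceptor first is an assumption.

```typescript
// Hypothetical stand-ins; see src/observability.ts for the real wiring.
interface Interceptor {
  name: string;
}

function buildInterceptors(
  env: Record<string, string | undefined>,
  base: Interceptor[],
  makeOtelInterceptor: (endpoint: string) => Interceptor,
): Interceptor[] {
  const endpoint = env["OTEL_EXPORTER_OTLP_ENDPOINT"];
  // No endpoint configured: leave the chain untouched, OTel stays off.
  if (endpoint === undefined || endpoint.trim() === "") return base;
  // Assumed ordering: tracing wraps the rest of the chain.
  return [makeOtelInterceptor(endpoint), ...base];
}

const auth: Interceptor = { name: "auth" };
const makeOtel = (endpoint: string): Interceptor => ({ name: `otel:${endpoint}` });

const withoutOtel = buildInterceptors({}, [auth], makeOtel);
const withOtel = buildInterceptors(
  { OTEL_EXPORTER_OTLP_ENDPOINT: "otel-collector.observability.svc.cluster.local:4317" },
  [auth],
  makeOtel,
);
```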

## Layout

```
car-sharing/
├── proto/ fleet, trips, billing protos (+ auth options import)
├── proto/ fleet, trips, billing protos + trips/v1/trip_events.proto (TripCompleted + topic option)
├── src/
│ ├── db/ Drizzle: schema.ts, client.ts (Db DI), seed.ts
│ ├── events/ eventBus.ts (publisher + reactor bus factories), reactors.ts (3 idempotent reactors)
│ ├── services/ fleetService (Drizzle), billingService, tripService
│ ├── temporal/ TripWorkflow, activities, workflowClient, clients, config, tripStatus
│ ├── temporal/ TripWorkflow (+ broadcast tail), activities (+ publishTripCompleted), workflowClient, clients, config, tripStatus
│ ├── topology.ts env → mono/split (SERVICES + perServiceEnvResolver)
│ ├── auth.ts JWT + proto authz interceptors (uniform chain)
│ ├── observability.ts env-gated OpenTelemetry wiring
│ ├── server.ts buildServer() — services + catalog + interceptors + db
│ ├── worker.ts Temporal worker process (bundles workflow via swc; `pnpm worker`)
│ ├── worker.ts Temporal worker process (bundles workflow via swc; builds the publish-only bus; `pnpm worker`)
│ ├── reactor.ts broadcast subscriber process (role by REACTOR=pricing|audit|notify)
│ └── index.ts entry point (role by SERVICES)
├── drizzle/ generated SQL migrations (single source of truth)
├── drizzle.config.ts drizzle-kit config (schema → migrations / push)
├── tests/
│ ├── helpers/db.ts PGlite test db (migrate + seed), injected via DI
│ ├── workflow/ TripWorkflow: forward order + reverse compensation (mocked activities)
│ ├── workflow/ TripWorkflow: forward order + reverse compensation + broadcast tail (mocked activities)
│ ├── activity/ Activity bodies: RPC wiring + compensation idempotency (in-process monolith)
│ └── e2e/ e2e.test.ts (gateway/pre-check/auth, stub workflow client) + fleet.test.ts (persistence)
│ └── e2e/ e2e.test.ts (gateway/pre-check/auth, stub workflow client) + fleet.test.ts (persistence) + broadcast.test.ts (fan-out, MemoryAdapter)
├── k8s/ namespace, rbac, secret, configmap, deployments,
│ services, hpa
├── istio/ peer-auth, authz, destination-rule, virtual-service,
│ gateway, canary
├── docker-compose.yml profiles: `mono` (monolith + Postgres), `saga` (roles + worker + Temporal + Postgres)
├── docker-compose.yml profiles: `mono` (monolith + Postgres), `saga` (roles + worker + Temporal + Postgres + NATS + 3 reactors)
├── Dockerfile one multi-stage image, role by env
├── buf.yaml / buf.gen.yaml dual-module (own protos + auth options) + catalog plugin
├── buf.yaml / buf.gen.yaml multi-module (own protos + auth + events options) + catalog plugin
├── package.json / tsconfig.json / pnpm-workspace.yaml
```
8 changes: 8 additions & 0 deletions car-sharing/buf.yaml
@@ -7,13 +7,21 @@ modules:
# `import "connectum/auth/v1/options.proto"` in the service protos resolves.
# `pnpm install` MUST run before `buf generate` (it populates node_modules).
- path: node_modules/@connectum/auth/proto
# The events options.proto (connectum.events.v1.event), shipped inside
# @connectum/events. Declared as a workspace module so the
# `import "connectum/events/v1/options.proto"` in trip_events.proto resolves —
# the same mechanism as the auth module above (no BSR / vendoring).
- path: node_modules/@connectum/events/proto
lint:
use:
- STANDARD
except:
- RPC_REQUEST_STANDARD_NAME
- RPC_RESPONSE_STANDARD_NAME
- RPC_REQUEST_RESPONSE_UNIQUE
# TripEventHandlers is an event-handler service (each RPC is a subscription),
# so the conventional "...Service" suffix does not apply.
- SERVICE_SUFFIX
breaking:
use:
- FILE
73 changes: 73 additions & 0 deletions car-sharing/docker-compose.yml
@@ -131,6 +131,28 @@ services:
- car-sharing
profiles: ["saga"]

# ── NATS (JetStream) — the broadcast broker for Phase 3 ────────────────────
#
# The trip saga publishes ONE `trips.completed` event on settle; the three
# reactor processes (below) each subscribe with a DISTINCT consumer group, so
# each gets its OWN durable consumer = fan-out (a shared group would load-
# balance / steal). JetStream (`-js`) is REQUIRED because the NATS adapter uses
# durable consumers. `-m 8222` exposes the monitoring endpoint for the probe.
nats:
image: nats:2.10-alpine
command: ["-js", "-m", "8222"]
ports:
- "4222:4222"
healthcheck:
test: ["CMD", "wget", "-q", "--spider", "http://localhost:8222/healthz"]
interval: 5s
timeout: 3s
retries: 10
start_period: 5s
networks:
- car-sharing
profiles: ["saga"]

# ── FLEET role — the persistent leaf (Drizzle + app Postgres) ──────────────
fleet:
build: .
@@ -227,6 +249,9 @@ services:
- FLEET_ADDR=http://fleet:5001
- TRIPS_ADDR=http://trips:5002
- BILLING_ADDR=http://billing:5003
# The worker hosts the publish-only EventBus: on settle it broadcasts ONE
# `trips.completed` event here (the reactors below consume it).
- NATS_URL=nats://nats:4222
depends_on:
temporal:
condition: service_healthy
@@ -236,6 +261,54 @@ services:
condition: service_healthy
billing:
condition: service_healthy
nats:
condition: service_healthy
networks:
- car-sharing
profiles: ["saga"]

# ── REACTORS — three INDEPENDENT broadcast subscribers (node src/reactor.ts) ─
#
# Each is the SAME image with a different `REACTOR` selector, on its OWN
# consumer group (cs-pricing / cs-audit / cs-notify), so NATS gives each its
# own durable consumer and every reactor receives every `trips.completed` =
# fan-out. Co-hosting all three buses in one process would change the process
# count, NOT the fan-out semantics (still three durable consumers).
reactor-pricing:
build: .
command: ["node", "src/reactor.ts"]
environment:
- REACTOR=pricing
- NATS_URL=nats://nats:4222
depends_on:
nats:
condition: service_healthy
networks:
- car-sharing
profiles: ["saga"]

reactor-audit:
build: .
command: ["node", "src/reactor.ts"]
environment:
- REACTOR=audit
- NATS_URL=nats://nats:4222
depends_on:
nats:
condition: service_healthy
networks:
- car-sharing
profiles: ["saga"]

reactor-notify:
build: .
command: ["node", "src/reactor.ts"]
environment:
- REACTOR=notify
- NATS_URL=nats://nats:4222
depends_on:
nats:
condition: service_healthy
networks:
- car-sharing
profiles: ["saga"]
2 changes: 2 additions & 0 deletions car-sharing/package.json
@@ -49,6 +49,8 @@
"@connectrpc/connect-node": "^2.1.1",
"@connectum/auth": "^1.0.0",
"@connectum/core": "^1.0.0",
"@connectum/events": "^1.0.0",
"@connectum/events-nats": "^1.0.0",
"@connectum/healthcheck": "^1.0.0",
"@connectum/interceptors": "^1.0.0",
"@connectum/otel": "^1.0.0",