diff --git a/car-sharing/README.md b/car-sharing/README.md
index 08dcaa1..c8ecdbd 100644
--- a/car-sharing/README.md
+++ b/car-sharing/README.md
@@ -7,8 +7,8 @@ showing three things that are normally hard, wired straight from the framework:
- **Gateway auth at the edge** — JWT authentication + proto-driven authorization
(`@connectum/auth`) on the public-facing `trips` service.
- **Cross-service `ctx.call` across split pods** — the trip handler calls fleet
- and billing through the typed service catalog; the framework picks in-process
- or network transport from env, no handler changes.
+ through the typed service catalog (availability pre-check); the framework picks
+ in-process or network transport from env, no handler changes.
- **OpenTelemetry observability** — RPC tracing/metrics (`@connectum/otel`),
enabled per role by env, on top of Istio's mesh telemetry.
@@ -60,7 +60,8 @@ flowchart TB
ingress -->|"AuthorizationPolicy:
ingress SA only"| tripsApp
tripsApp -->|"ctx.call GetVehicle
(pre-check, FLEET_ADDR)"| fleetApp
tripsApp -.->|"starts TripWorkflow
(Temporal client)"| temporal[(Temporal
cluster)]
- temporal -->|"worker activities
(ReserveVehicle, RecordTrip…)"| fleetApp
+ temporal -->|"worker activities
(ReserveVehicle…)"| fleetApp
+ temporal -->|"worker activities
(RecordTrip, EndTrip…)"| tripsApp
temporal -->|"worker activities
(OpenTab, AddCharge, Settle…)"| billingApp
fleetApp -. "AuthorizationPolicy:
trips SA only" .-> tripsApp
@@ -301,6 +302,166 @@ The saga is covered without Docker or a Temporal cluster:
valid `StartTrip` returns `{ trip, workflow_id }` and starts exactly one
workflow keyed by the trip id.
+## Phase 3 — EventBus broadcast (fan-out)
+
+Phases 1 and 2 are about **driving** a flow: `ctx.call` is a synchronous request
+that returns a value; the saga is a durable orchestration that retries and
+compensates. Phase 3 adds the **third, orthogonal** mechanism — a
+**fire-and-forget 1→N broadcast**. When a trip is **SETTLED**, the single domain
+fact "this trip completed" is published **once** as `TripCompleted` and consumed
+**independently** by three reactors that the saga knows nothing about.
+
+| Mechanism | Shape | Guarantee |
+| -------------------- | ---------------------------------- | -------------------------------------------------- |
+| `ctx.call` (Phase 1) | synchronous typed RPC | request/response |
+| Temporal saga (Phase 2) | durable orchestration + compensation | exactly-once, retried, **durable** |
+| **EventBus broadcast (Phase 3)** | **fire-and-forget 1→N** | at-least-once per subscriber, **non-durable**, order-agnostic |
+
+### Broadcast, not orchestration
+
+EventBus is used here for **broadcast only** — never to drive the trip. The
+publisher emits one event to one topic; three **independent** subscriber buses,
+each its **own consumer group**, each get their own copy:
+
+```
+ status = SETTLED (Phase 2 saga, unchanged)
+ │
+ ▼ NEW terminal activity (worker, full Node)
+ acts.publishTripCompleted({ tripId, userId, vehicleId, durationMs })
+ │ publisherBus.publish(TripCompletedSchema, …) // NO {topic}
+ │ topic resolved = "trips.completed" (proto option via publishes)
+ ▼
+ ┌──────────────────── topic: trips.completed ────────────────────┐
+ │ (NATS JetStream) │
+ └───────┬───────────────────────┬───────────────────────┬───────┘
+ │ group=cs-pricing │ group=cs-audit │ group=cs-notify
+ ▼ (own durable consumer) ▼ (own durable consumer)▼ (own durable consumer)
+ ┌──────────────────┐ ┌─────────────────┐ ┌──────────────────┐
+ │ PRICING/ANALYTICS │ │ AUDIT-LOG │ │ NOTIFICATIONS │
+ │ trip count + revenue│ │ append 1 record │ │ "receipt sent" │
+ └──────────────────┘ └─────────────────┘ └──────────────────┘
+ each reacts on its own, order-agnostic, failure-isolated
+```
+
+Two independent reasons this is **fan-out** and not load-balance:
+
+1. **Framework-level** — two routes resolving to the same topic **cannot share
+ one bus** (the EventBus throws a *duplicate-topic* error at `start()`), so the
+ three reactors on `trips.completed` are **forced** onto three separate buses.
+2. **Broker-level** — each bus uses a **distinct consumer group**, so NATS
+ JetStream gives each its own **durable consumer** → every reactor receives
+ every event. A **shared** group would load-balance (one reactor steals each
+ event) = a queue, not a broadcast.
+
+So the rule the example teaches: **N independent consumers ⇒ N buses, each its
+own group** — never one bus with N handlers on the same topic.
+
+### Topic from the proto option (no raw strings)
+
+The topic is declared **once**, on the proto method, and used end-to-end with no
+hand-passed topic string:
+
+```proto
+service TripEventHandlers {
+ rpc OnTripCompleted(TripCompleted) returns (google.protobuf.Empty) {
+ option (connectum.events.v1.event).topic = "trips.completed";
+ }
+}
+```
+
+The publisher bus lists this service in **`publishes`**, which populates its
+publish-topic lookup from the proto option — so `publish(TripCompletedSchema, …)`
+resolves `trips.completed` with **no `{topic}` argument**. (A pure publisher has
+no subscriber `routes`, so without `publishes` the topic would silently fall back
+to the message `typeName`.) Each reactor registers the **same** service via its
+own bus's `routes`, subscribing to the same proto-resolved topic.
+
+### Failure semantics — a lost broadcast must never reverse a paid trip
+
+This is the crux of "EventBus is broadcast, Temporal is durability". The publish
+is a **success-only tail** placed **outside** the saga's `try/catch`, reached
+**only** when the trip is `SETTLED`, in its own `try/catch` that does **not**
+rethrow:
+
+```ts
+try {
+ …reserve → record → end → openTab → addCharge → settle…
+ status = TripStatus.SETTLED;
+} catch (err) {
+ …run compensations LIFO…; status = TripStatus.CANCELLED; throw …; // rethrows
+}
+// SUCCESS-ONLY TAIL — reached only when status === SETTLED:
+try {
+ await broadcast.publishTripCompleted({ tripId, userId, vehicleId, durationMs });
+} catch (err) {
+ log.warn("broadcast failed; trip stays SETTLED", { tripId });
+}
+return status;
+```
+
+Because this block is reached only on success and its `catch` does not rethrow, a
+failed or timed-out broadcast **can never trigger compensation** on a settled,
+paid trip. The reactors' work (an analytics tally, an audit line, a receipt) is
+non-critical and reconstructable — losing one is acceptable; reversing a settled
+financial trip is not. The compensation guard is this **structural placement**,
+not the activity's retry count.
+
+> **Rule:** if a side effect must be **durable / retried**, it belongs in
+> **Temporal** (its own activity with a retry policy), **not** on the EventBus.
+> Don't copy the swallow into a place that needs durability.
+
+### Idempotent reactors
+
+On a real broker the broadcast is **at-least-once**: a worker crash after publish
+but before ack re-fires the reactors, and a redelivery would otherwise
+double-count an analytics tally or send a second receipt. Each reactor therefore
+**dedupes by `tripId`** in its in-memory store. This is the reason broadcast
+reactors are written idempotent — broadcast trades durability for decoupling, and
+idempotency absorbs the resulting redelivery.
+
+### Processes and the dockerless test
+
+Same "one image, role by env" story as the RPC roles: the worker hosts the
+**publish-only** bus, and each reactor is `node src/reactor.ts` with a `REACTOR`
+selector picking its route and group.
+
+| Process | Entry | Bus |
+| ------------------ | ---------------------------------- | ----------------------------------------- |
+| Worker (publisher) | `node src/worker.ts` | publish-only (`publishes: [TripEventHandlers]`) |
+| `reactor-pricing` | `REACTOR=pricing node src/reactor.ts` | route `OnTripCompleted`, group `cs-pricing` |
+| `reactor-audit` | `REACTOR=audit node src/reactor.ts` | route `OnTripCompleted`, group `cs-audit` |
+| `reactor-notify` | `REACTOR=notify node src/reactor.ts` | route `OnTripCompleted`, group `cs-notify` |
+
+`tests/e2e/broadcast.test.ts` proves the fan-out **without a broker**: one shared
+`MemoryAdapter()` feeds the publisher bus and all three reactor buses, the real
+`publishTripCompleted` activity is driven through `MockActivityEnvironment`, and
+all three reactors are asserted to fire — with the **full five-field
+`TripCompleted` shape** verified field-by-field against the documented contract.
+The workflow test additionally proves the broadcast runs **only** on the SETTLED
+path (never on a compensated run) and that a **failed** broadcast leaves the trip
+`SETTLED` with no compensation.
+
+### Run it and watch the fan-out
+
+```bash
+docker compose up -d postgres
+DATABASE_URL=postgresql://car_sharing:car_sharing@localhost:5432/car_sharing pnpm db:migrate
+DATABASE_URL=postgresql://car_sharing:car_sharing@localhost:5432/car_sharing pnpm db:seed
+docker compose --profile saga up --build # roles + worker + Temporal + NATS + 3 reactors
+```
+
+Start a trip on the `trips` role (`:5002`); when the saga settles, all **three**
+reactor logs each print their independent reaction to the one `trips.completed`
+event. Stop one reactor and replay — the others still receive (broadcast, not
+steal).
+
+> **Version note.** This phase uses the `publishes` option on `createEventBus`,
+> added after `@connectum/events@1.0.0`. Until events `>= 1.1.0` is published,
+> the example typechecks and runs against the **local** packages (the repo's
+> local-package test flow), not the published `1.0.0` — against `1.0.0` the topic
+> would fall back to the message `typeName`. Run with the local packages while
+> the option is unreleased.
+
## Build the image
```bash
@@ -376,7 +537,7 @@ Every role sets `OTEL_EXPORTER_OTLP_ENDPOINT` in its ConfigMap, which turns on t
`@connectum/otel` server interceptor (the app enables OTel only when that env var
is present — see `src/observability.ts`). Spans carry a `connectum.transport`
attribute distinguishing in-process `ctx.call`s from network hops, and
-`trustRemote: true` stitches gateway → fleet → billing into one distributed trace.
+`trustRemote: true` stitches gateway → fleet (and worker → fleet/trips/billing activities) into one distributed trace.
Point `OTEL_EXPORTER_OTLP_ENDPOINT` at your Collector (the manifests assume
`otel-collector.observability.svc.cluster.local:4317`).
@@ -384,30 +545,32 @@ Point `OTEL_EXPORTER_OTLP_ENDPOINT` at your Collector (the manifests assume
```
car-sharing/
-├── proto/ fleet, trips, billing protos (+ auth options import)
+├── proto/ fleet, trips, billing protos + trips/v1/trip_events.proto (TripCompleted + topic option)
├── src/
│ ├── db/ Drizzle: schema.ts, client.ts (Db DI), seed.ts
+│ ├── events/ eventBus.ts (publisher + reactor bus factories), reactors.ts (3 idempotent reactors)
│ ├── services/ fleetService (Drizzle), billingService, tripService
-│ ├── temporal/ TripWorkflow, activities, workflowClient, clients, config, tripStatus
+│ ├── temporal/ TripWorkflow (+ broadcast tail), activities (+ publishTripCompleted), workflowClient, clients, config, tripStatus
│ ├── topology.ts env → mono/split (SERVICES + perServiceEnvResolver)
│ ├── auth.ts JWT + proto authz interceptors (uniform chain)
│ ├── observability.ts env-gated OpenTelemetry wiring
│ ├── server.ts buildServer() — services + catalog + interceptors + db
-│ ├── worker.ts Temporal worker process (bundles workflow via swc; `pnpm worker`)
+│ ├── worker.ts Temporal worker process (bundles workflow via swc; builds the publish-only bus; `pnpm worker`)
+│ ├── reactor.ts broadcast subscriber process (role by REACTOR=pricing|audit|notify)
│ └── index.ts entry point (role by SERVICES)
├── drizzle/ generated SQL migrations (single source of truth)
├── drizzle.config.ts drizzle-kit config (schema → migrations / push)
├── tests/
│ ├── helpers/db.ts PGlite test db (migrate + seed), injected via DI
-│ ├── workflow/ TripWorkflow: forward order + reverse compensation (mocked activities)
+│ ├── workflow/ TripWorkflow: forward order + reverse compensation + broadcast tail (mocked activities)
│ ├── activity/ Activity bodies: RPC wiring + compensation idempotency (in-process monolith)
-│ └── e2e/ e2e.test.ts (gateway/pre-check/auth, stub workflow client) + fleet.test.ts (persistence)
+│ └── e2e/ e2e.test.ts (gateway/pre-check/auth, stub workflow client) + fleet.test.ts (persistence) + broadcast.test.ts (fan-out, MemoryAdapter)
├── k8s/ namespace, rbac, secret, configmap, deployments,
│ services, hpa
├── istio/ peer-auth, authz, destination-rule, virtual-service,
│ gateway, canary
-├── docker-compose.yml profiles: `mono` (monolith + Postgres), `saga` (roles + worker + Temporal + Postgres)
+├── docker-compose.yml profiles: `mono` (monolith + Postgres), `saga` (roles + worker + Temporal + Postgres + NATS + 3 reactors)
├── Dockerfile one multi-stage image, role by env
-├── buf.yaml / buf.gen.yaml dual-module (own protos + auth options) + catalog plugin
+├── buf.yaml / buf.gen.yaml multi-module (own protos + auth + events options) + catalog plugin
├── package.json / tsconfig.json / pnpm-workspace.yaml
```
diff --git a/car-sharing/buf.yaml b/car-sharing/buf.yaml
index 76c4065..bd83a13 100644
--- a/car-sharing/buf.yaml
+++ b/car-sharing/buf.yaml
@@ -7,6 +7,11 @@ modules:
# `import "connectum/auth/v1/options.proto"` in the service protos resolves.
# `pnpm install` MUST run before `buf generate` (it populates node_modules).
- path: node_modules/@connectum/auth/proto
+ # The events options.proto (connectum.events.v1.event), shipped inside
+ # @connectum/events. Declared as a workspace module so the
+ # `import "connectum/events/v1/options.proto"` in trip_events.proto resolves —
+ # the same mechanism as the auth module above (no BSR / vendoring).
+ - path: node_modules/@connectum/events/proto
lint:
use:
- STANDARD
@@ -14,6 +19,9 @@ lint:
- RPC_REQUEST_STANDARD_NAME
- RPC_RESPONSE_STANDARD_NAME
- RPC_REQUEST_RESPONSE_UNIQUE
+ # TripEventHandlers is an event-handler service (each RPC is a subscription),
+ # so the conventional "...Service" suffix does not apply.
+ - SERVICE_SUFFIX
breaking:
use:
- FILE
diff --git a/car-sharing/docker-compose.yml b/car-sharing/docker-compose.yml
index b9f1530..d0953e7 100644
--- a/car-sharing/docker-compose.yml
+++ b/car-sharing/docker-compose.yml
@@ -131,6 +131,28 @@ services:
- car-sharing
profiles: ["saga"]
+ # ── NATS (JetStream) — the broadcast broker for Phase 3 ────────────────────
+ #
+ # The trip saga publishes ONE `trips.completed` event on settle; the three
+ # reactor processes (below) each subscribe with a DISTINCT consumer group, so
+ # each gets its OWN durable consumer = fan-out (a shared group would load-
+ # balance / steal). JetStream (`-js`) is REQUIRED because the NATS adapter uses
+ # durable consumers. `-m 8222` exposes the monitoring endpoint for the probe.
+ nats:
+ image: nats:2.10-alpine
+ command: ["-js", "-m", "8222"]
+ ports:
+ - "4222:4222"
+ healthcheck:
+ test: ["CMD", "wget", "-q", "--spider", "http://localhost:8222/healthz"]
+ interval: 5s
+ timeout: 3s
+ retries: 10
+ start_period: 5s
+ networks:
+ - car-sharing
+ profiles: ["saga"]
+
# ── FLEET role — the persistent leaf (Drizzle + app Postgres) ──────────────
fleet:
build: .
@@ -227,6 +249,9 @@ services:
- FLEET_ADDR=http://fleet:5001
- TRIPS_ADDR=http://trips:5002
- BILLING_ADDR=http://billing:5003
+ # The worker hosts the publish-only EventBus: on settle it broadcasts ONE
+ # `trips.completed` event here (the reactors below consume it).
+ - NATS_URL=nats://nats:4222
depends_on:
temporal:
condition: service_healthy
@@ -236,6 +261,54 @@ services:
condition: service_healthy
billing:
condition: service_healthy
+ nats:
+ condition: service_healthy
+ networks:
+ - car-sharing
+ profiles: ["saga"]
+
+ # ── REACTORS — three INDEPENDENT broadcast subscribers (node src/reactor.ts) ─
+ #
+ # Each is the SAME image with a different `REACTOR` selector, on its OWN
+ # consumer group (cs-pricing / cs-audit / cs-notify), so NATS gives each its
+ # own durable consumer and every reactor receives every `trips.completed` =
+ # fan-out. Co-hosting all three buses in one process would change the process
+ # count, NOT the fan-out semantics (still three durable consumers).
+ reactor-pricing:
+ build: .
+ command: ["node", "src/reactor.ts"]
+ environment:
+ - REACTOR=pricing
+ - NATS_URL=nats://nats:4222
+ depends_on:
+ nats:
+ condition: service_healthy
+ networks:
+ - car-sharing
+ profiles: ["saga"]
+
+ reactor-audit:
+ build: .
+ command: ["node", "src/reactor.ts"]
+ environment:
+ - REACTOR=audit
+ - NATS_URL=nats://nats:4222
+ depends_on:
+ nats:
+ condition: service_healthy
+ networks:
+ - car-sharing
+ profiles: ["saga"]
+
+ reactor-notify:
+ build: .
+ command: ["node", "src/reactor.ts"]
+ environment:
+ - REACTOR=notify
+ - NATS_URL=nats://nats:4222
+ depends_on:
+ nats:
+ condition: service_healthy
networks:
- car-sharing
profiles: ["saga"]
diff --git a/car-sharing/package.json b/car-sharing/package.json
index eb3dc54..d58c107 100644
--- a/car-sharing/package.json
+++ b/car-sharing/package.json
@@ -49,6 +49,8 @@
"@connectrpc/connect-node": "^2.1.1",
"@connectum/auth": "^1.0.0",
"@connectum/core": "^1.0.0",
+ "@connectum/events": "^1.0.0",
+ "@connectum/events-nats": "^1.0.0",
"@connectum/healthcheck": "^1.0.0",
"@connectum/interceptors": "^1.0.0",
"@connectum/otel": "^1.0.0",
diff --git a/car-sharing/proto/trips/v1/trip_events.proto b/car-sharing/proto/trips/v1/trip_events.proto
new file mode 100644
index 0000000..b00b467
--- /dev/null
+++ b/car-sharing/proto/trips/v1/trip_events.proto
@@ -0,0 +1,45 @@
+syntax = "proto3";
+
+package trips.v1;
+
+import "connectum/events/v1/options.proto";
+import "google/protobuf/empty.proto";
+
+// TripCompleted is the integration event broadcast EXACTLY ONCE when a trip's
+// billing tab is SETTLED (the saga's terminal happy-path state). It is consumed
+// INDEPENDENTLY by three reactors — pricing/analytics, audit-log, and
+// notifications — each on its own EventBus with its own consumer group, so the
+// single domain fact fans out 1→N rather than being orchestrated.
+//
+// EventBus is used here for BROADCAST only; durability/ordering stays Temporal's
+// job. A lost broadcast must NEVER roll back a settled, paid trip.
+message TripCompleted {
+ // The trip's id (also the reactor idempotency key).
+ string trip_id = 1;
+ // The renter who took the trip — the notification target.
+ string user_id = 2;
+ // The vehicle that was driven.
+ string vehicle_id = 3;
+ // The settled charge in minor units (cents), for analytics / the receipt.
+ int64 amount_cents = 4;
+ // The drive duration in milliseconds, for analytics.
+ int64 duration_ms = 5;
+}
+
+// TripEventHandlers is an event-handler service: each RPC is a subscription.
+// The (connectum.events.v1.event).topic option pins the topic so the PUBLISHER
+// (which lists this service in `publishes`) and EACH subscriber (which registers
+// this service via `routes` on its OWN bus) agree on the topic from the proto
+// option alone — no raw topic string is ever hand-passed to publish().
+//
+// One shared service is reused by the publisher and all three reactors;
+// independence between reactors comes from SEPARATE buses + DISTINCT consumer
+// groups, not from separate proto services (registering the same service on
+// three separate buses is fine — the duplicate-topic guard is per-bus).
+service TripEventHandlers {
+ // OnTripCompleted reacts to a settled trip. Each reactor binds its OWN handler
+ // function to this RPC on its OWN bus.
+ rpc OnTripCompleted(TripCompleted) returns (google.protobuf.Empty) {
+ option (connectum.events.v1.event).topic = "trips.completed";
+ }
+}
diff --git a/car-sharing/src/events/eventBus.ts b/car-sharing/src/events/eventBus.ts
new file mode 100644
index 0000000..92f222e
--- /dev/null
+++ b/car-sharing/src/events/eventBus.ts
@@ -0,0 +1,104 @@
+/**
+ * EventBus factories for the Phase 3 broadcast — ONE publisher, THREE reactors.
+ *
+ * Phase 3 adds the third orthogonal interaction mechanism to the example:
+ * fire-and-forget 1→N broadcast (`ctx.call` is sync RPC; the Temporal saga is
+ * durable orchestration). The single domain fact "a trip was settled" is
+ * published ONCE as `TripCompleted` and consumed INDEPENDENTLY by three
+ * reactors. EventBus is used for BROADCAST only — never orchestration.
+ *
+ * The fan-out is achieved with FOUR separate buses, not one bus with three
+ * handlers:
+ *
+ * - the PUBLISHER bus is publish-only (`routes: []`, `publishes:
+ * [TripEventHandlers]`). Listing the event service in `publishes` populates
+ * the publish-topic lookup from the proto `(connectum.events.v1.event).topic`
+ * option, so `publish(TripCompletedSchema, …)` resolves `trips.completed`
+ * with NO raw `{topic}` hand-passed. This is the whole point of `publishes`:
+ * a pure publisher has no subscriber routes, so without it the topic would
+ * silently fall back to the message `typeName`.
+ * - each REACTOR bus has ONE route and its OWN DISTINCT consumer group. Two
+ * routes resolving to the same topic CANNOT share one bus (the duplicate-
+ * topic guard throws at `start()`), so three reactors on `trips.completed`
+ * are FORCED onto three buses. On a real broker, distinct groups give
+ * distinct durable consumers → each gets every event = fan-out. A shared
+ * group would load-balance (one reactor steals each event) = queue.
+ *
+ * The adapter is pluggable: NATS by default (`NATS_URL`), but tests pass ONE
+ * shared `MemoryAdapter()` to all four buses so a single publish reaches all
+ * three reactors in-process with no broker (MemoryAdapter broadcasts to every
+ * matching subscription and ignores group; the distinct groups are still
+ * written so the SAME wiring fans out on NATS).
+ *
+ * @module events/eventBus
+ */
+
+import { createEventBus } from "@connectum/events";
+import type { EventAdapter, EventBus, EventRoute } from "@connectum/events";
+import type { EventBusLike } from "@connectum/core";
+import { NatsAdapter } from "@connectum/events-nats";
+import { TripEventHandlers } from "#gen/trips/v1/trip_events_pb.ts";
+
+/** A reactor's stable identity: which side effect + which consumer group. */
+export const REACTOR_GROUP = {
+ /** Pricing / analytics reactor — tallies trip count + revenue. */
+ pricing: "cs-pricing",
+ /** Audit-log reactor — appends one immutable record per settled trip. */
+ audit: "cs-audit",
+ /** Notifications reactor — "sends" a receipt to the renter. */
+ notify: "cs-notify",
+} as const;
+
+/** One of the reactor selector keys (`pricing` | `audit` | `notify`). */
+export type ReactorKey = keyof typeof REACTOR_GROUP;
+
+/** A bus that can both `publish` (the `EventBus` API) and start/stop (`EventBusLike`). */
+export type ManagedBus = EventBus & EventBusLike;
+
+/** Default NATS JetStream stream for the example's broadcast topic. */
+const NATS_STREAM = "car-sharing";
+
+/**
+ * Build the NATS adapter for a process, reading `NATS_URL` (default
+ * `nats://localhost:4222`). One adapter instance per bus in the split topology,
+ * so each reactor owns an independent broker connection + durable consumer.
+ */
+function natsAdapter(): EventAdapter {
+ return NatsAdapter({ servers: process.env.NATS_URL ?? "nats://localhost:4222", stream: NATS_STREAM });
+}
+
+/**
+ * Build the PUBLISH-ONLY bus that the worker (the saga's terminal activity) uses
+ * to broadcast `TripCompleted`.
+ *
+ * It has NO routes and lists `TripEventHandlers` in `publishes`, so the topic
+ * `trips.completed` is resolved from the proto option — the publisher passes NO
+ * raw topic.
+ *
+ * @param options.adapter - Adapter override (tests pass a shared `MemoryAdapter()`);
+ * defaults to a NATS adapter from `NATS_URL`.
+ */
+export function buildPublisherBus(options: { readonly adapter?: EventAdapter } = {}): ManagedBus {
+ return createEventBus({
+ adapter: options.adapter ?? natsAdapter(),
+ routes: [],
+ publishes: [TripEventHandlers],
+ });
+}
+
+/**
+ * Build ONE reactor bus: a single route subscribed to `trips.completed` (topic
+ * from the route's proto option) under its OWN distinct consumer group.
+ *
+ * @param options.key - The reactor selector, fixing its consumer group.
+ * @param options.route - The reactor's event route (its `OnTripCompleted` handler).
+ * @param options.adapter - Adapter override (tests pass a shared `MemoryAdapter()`);
+ * defaults to a NATS adapter from `NATS_URL`.
+ */
+export function buildReactorBus(options: { readonly key: ReactorKey; readonly route: EventRoute; readonly adapter?: EventAdapter }): ManagedBus {
+ return createEventBus({
+ adapter: options.adapter ?? natsAdapter(),
+ routes: [options.route],
+ group: REACTOR_GROUP[options.key],
+ });
+}
diff --git a/car-sharing/src/events/reactors.ts b/car-sharing/src/events/reactors.ts
new file mode 100644
index 0000000..09267e6
--- /dev/null
+++ b/car-sharing/src/events/reactors.ts
@@ -0,0 +1,175 @@
+/**
+ * The three independent `TripCompleted` reactors and their in-memory state.
+ *
+ * Each reactor is a SEPARATE consumer of the ONE `trips.completed` broadcast:
+ *
+ * - PRICING / ANALYTICS — tallies trip count + total settled revenue.
+ * - AUDIT-LOG — appends one immutable record per settled trip.
+ * - NOTIFICATIONS — "sends" a receipt to the renter.
+ *
+ * Every reactor binds its OWN handler function to the SAME `TripEventHandlers`
+ * service descriptor, but on its OWN bus with its OWN consumer group (built in
+ * `eventBus.ts`). Reusing one proto service across three buses is fine — the
+ * duplicate-topic guard is per-bus — and independence comes from the separate
+ * buses + distinct groups, not from separate proto services.
+ *
+ * IDEMPOTENCY — every reactor dedupes by `tripId`. On a real broker the
+ * broadcast is at-least-once (a worker crash after publish-before-ack re-fires
+ * the reactors), and a redelivery would otherwise double-count an analytics
+ * tally or send a second receipt. Deduping by `tripId` absorbs the redelivery
+ * and also makes the dockerless test deterministic. This is the reason broadcast
+ * reactors are written idempotent (durable/ordered delivery is Temporal's job,
+ * not the EventBus's).
+ *
+ * `reset*()` / inspect helpers are the test seams (mirroring the billing/payroll
+ * services' `reset*`/count helpers).
+ *
+ * @module events/reactors
+ */
+
+import type { EventRoute } from "@connectum/events";
+import type { TripCompleted } from "#gen/trips/v1/trip_events_pb.ts";
+import { TripEventHandlers } from "#gen/trips/v1/trip_events_pb.ts";
+
+// ── Pricing / analytics reactor ─────────────────────────────────────────────
+
+/** Trip ids already counted by the pricing reactor (idempotency set). */
+const pricingSeen = new Set();
+/** Running tally of settled trips (deduped by `tripId`). */
+let pricingTripCount = 0;
+/** Running tally of settled revenue in minor units (cents). */
+let pricingRevenueCents = 0n;
+
+/** Number of distinct trips the pricing reactor has tallied. */
+export function pricingTripCountValue(): number {
+ return pricingTripCount;
+}
+
+/** Total settled revenue (cents) the pricing reactor has accumulated. */
+export function pricingRevenueCentsValue(): bigint {
+ return pricingRevenueCents;
+}
+
+/** Reset the pricing reactor's state — used between tests. */
+export function resetPricing(): void {
+ pricingSeen.clear();
+ pricingTripCount = 0;
+ pricingRevenueCents = 0n;
+}
+
+/**
+ * Pricing/analytics route: on each `TripCompleted`, tally the trip and its
+ * revenue ONCE per `tripId` (a redelivery is a no-op).
+ */
+export const pricingReactorRoutes: EventRoute = (events) => {
+ events.service(TripEventHandlers, {
+ async onTripCompleted(event, ctx) {
+ if (!pricingSeen.has(event.tripId)) {
+ pricingSeen.add(event.tripId);
+ pricingTripCount += 1;
+ pricingRevenueCents += event.amountCents;
+ }
+ await ctx.ack();
+ },
+ });
+};
+
+// ── Audit-log reactor ───────────────────────────────────────────────────────
+
+/** One immutable audit record per settled trip (the full event shape). */
+export interface AuditRecord {
+ readonly tripId: string;
+ readonly userId: string;
+ readonly vehicleId: string;
+ readonly amountCents: bigint;
+ readonly durationMs: bigint;
+}
+
+/** Append-only audit log (one record per distinct `tripId`). */
+const auditLog: AuditRecord[] = [];
+/** Trip ids already audited (idempotency set). */
+const auditSeen = new Set();
+
+/** A snapshot copy of the audit log (newest last). */
+export function auditRecords(): readonly AuditRecord[] {
+ return [...auditLog];
+}
+
+/** Reset the audit reactor's state — used between tests. */
+export function resetAudit(): void {
+ auditLog.length = 0;
+ auditSeen.clear();
+}
+
+/**
+ * Audit-log route: append the FULL `TripCompleted` shape ONCE per `tripId`. The
+ * record is the documented contract (all five fields), so it doubles as the
+ * end-to-end shape oracle in the broadcast test.
+ */
+export const auditReactorRoutes: EventRoute = (events) => {
+ events.service(TripEventHandlers, {
+ async onTripCompleted(event: TripCompleted, ctx) {
+ if (!auditSeen.has(event.tripId)) {
+ auditSeen.add(event.tripId);
+ auditLog.push({
+ tripId: event.tripId,
+ userId: event.userId,
+ vehicleId: event.vehicleId,
+ amountCents: event.amountCents,
+ durationMs: event.durationMs,
+ });
+ }
+ await ctx.ack();
+ },
+ });
+};
+
+// ── Notifications reactor ───────────────────────────────────────────────────
+
+/** A "sent" receipt notification (who it targeted + the charged amount). */
+export interface SentNotification {
+ readonly tripId: string;
+ readonly userId: string;
+ readonly amountCents: bigint;
+}
+
+/** Receipts the notifications reactor has "sent" (one per distinct `tripId`). */
+const sentNotifications: SentNotification[] = [];
+/** Trip ids already notified (idempotency set). */
+const notifySeen = new Set();
+
+/** A snapshot copy of the receipts the notifications reactor has sent. */
+export function sentReceipts(): readonly SentNotification[] {
+ return [...sentNotifications];
+}
+
+/** Reset the notifications reactor's state — used between tests. */
+export function resetNotify(): void {
+ sentNotifications.length = 0;
+ notifySeen.clear();
+}
+
+/**
+ * Notifications route: "send" a receipt to the renter ONCE per `tripId`. A
+ * redelivery must NOT send a second receipt — hence the idempotency set.
+ */
+export const notifyReactorRoutes: EventRoute = (events) => {
+ events.service(TripEventHandlers, {
+ async onTripCompleted(event, ctx) {
+ if (!notifySeen.has(event.tripId)) {
+ notifySeen.add(event.tripId);
+ sentNotifications.push({ tripId: event.tripId, userId: event.userId, amountCents: event.amountCents });
+ }
+ await ctx.ack();
+ },
+ });
+};
+
+// ── Combined reset (test convenience) ───────────────────────────────────────
+
+/** Reset all three reactors' in-memory state. */
+export function resetAllReactors(): void {
+ resetPricing();
+ resetAudit();
+ resetNotify();
+}
diff --git a/car-sharing/src/reactor.ts b/car-sharing/src/reactor.ts
new file mode 100644
index 0000000..026cb5e
--- /dev/null
+++ b/car-sharing/src/reactor.ts
@@ -0,0 +1,61 @@
+/**
+ * Reactor entry — a long-lived `TripCompleted` broadcast subscriber.
+ *
+ * One image, role by env: `REACTOR=pricing|audit|notify` selects WHICH reactor
+ * route this process mounts and WHICH consumer group it joins. Each reactor runs
+ * as its OWN process with its OWN EventBus + DISTINCT group, so on NATS each gets
+ * its own durable consumer → every process receives every `trips.completed`
+ * event = fan-out (a shared group would load-balance / steal instead).
+ *
+ * This mirrors `index.ts` (role by `SERVICES`) and `worker.ts` (the saga host):
+ * the same binary, a different process boundary chosen by env. Co-hosting all
+ * three reactor buses in one process would change the process count but NOT the
+ * fan-out semantics (still three durable consumers).
+ *
+ * The bus is NATS-backed (`NATS_URL`); there is no inbound RPC and no HTTP
+ * server — a reactor only subscribes. SIGINT/SIGTERM stop the bus cleanly.
+ *
+ * @module reactor
+ */
+
+import type { EventRoute } from "@connectum/events";
+import { buildReactorBus } from "#events/eventBus.ts";
+import type { ManagedBus, ReactorKey } from "#events/eventBus.ts";
+import { auditReactorRoutes, notifyReactorRoutes, pricingReactorRoutes } from "#events/reactors.ts";
+
+/** Map each reactor selector to its route. The group is fixed by the key in `buildReactorBus`. */
+const REACTOR_ROUTES: Readonly> = {
+ pricing: pricingReactorRoutes,
+ audit: auditReactorRoutes,
+ notify: notifyReactorRoutes,
+};
+
+/** Read + validate the `REACTOR` selector from env. */
+function selectReactor(): ReactorKey {
+ const key = process.env.REACTOR;
+ if (key === "pricing" || key === "audit" || key === "notify") {
+ return key;
+ }
+ throw new Error(`REACTOR must be one of pricing|audit|notify (got ${key === undefined ? "" : `"${key}"`})`);
+}
+
+async function main(): Promise {
+ const key = selectReactor();
+ const bus: ManagedBus = buildReactorBus({ key, route: REACTOR_ROUTES[key] });
+
+ await bus.start();
+ console.log(`car-sharing reactor ready — REACTOR=${key} topic=trips.completed nats=${process.env.NATS_URL ?? "nats://localhost:4222"}`);
+
+ const stop = async (): Promise => {
+ await bus.stop();
+ console.log(`car-sharing reactor stopped — REACTOR=${key}`);
+ process.exit(0);
+ };
+ process.on("SIGINT", () => void stop());
+ process.on("SIGTERM", () => void stop());
+}
+
+main().catch((err: unknown) => {
+ console.error("car-sharing reactor error:", err);
+ process.exitCode = 1;
+});
diff --git a/car-sharing/src/temporal/activities.ts b/car-sharing/src/temporal/activities.ts
index 00be09b..80ae6ec 100644
--- a/car-sharing/src/temporal/activities.ts
+++ b/car-sharing/src/temporal/activities.ts
@@ -28,7 +28,10 @@ import { Code, ConnectError } from "@connectrpc/connect";
import { ApplicationFailure } from "@temporalio/activity";
import { AddChargeRequestSchema, OpenTabRequestSchema, RefundChargeRequestSchema, SettleRequestSchema, VoidTabRequestSchema } from "#gen/billing/v1/billing_pb.ts";
import { ReleaseVehicleRequestSchema, ReserveVehicleRequestSchema } from "#gen/fleet/v1/fleet_pb.ts";
+import { TripCompletedSchema } from "#gen/trips/v1/trip_events_pb.ts";
import { EndTripRequestSchema, RecordTripRequestSchema } from "#gen/trips/v1/trips_pb.ts";
+import type { ManagedBus } from "#events/eventBus.ts";
+import { buildPublisherBus } from "#events/eventBus.ts";
import type { ServiceClients } from "#temporal/clients.ts";
import { createServiceClients } from "#temporal/clients.ts";
import { TripStatus } from "#temporal/tripStatus.ts";
@@ -59,6 +62,39 @@ function clients(): ServiceClients {
return sharedClients;
}
+/**
+ * The publish-only EventBus used by `publishTripCompleted` to broadcast
+ * `TripCompleted`. Injected once (the worker builds + STARTS it before
+ * `worker.run()`; tests inject a `MemoryAdapter`-backed bus), and lazily built
+ * from `NATS_URL` if never injected — the same seam as {@link clients}.
+ */
+let publisherBus: ManagedBus | undefined;
+
+/**
+ * Inject the publisher bus (the worker passes its STARTED NATS bus; tests pass a
+ * started `MemoryAdapter`-backed bus). The caller owns the bus lifecycle
+ * (`start()`/`stop()`); this activity only `publish()`es on it.
+ *
+ * @param bus - A started publish-only bus, or `undefined` to reset (tests).
+ */
+export function setPublisherBus(bus: ManagedBus | undefined): void {
+ publisherBus = bus;
+}
+
+/**
+ * Get the publisher bus, building + STARTING a NATS-backed one lazily if none
+ * was injected. The lazily-built bus is cached for the worker's lifetime; it is
+ * NOT stopped here (the worker stops the injected one in its `finally`).
+ */
+async function getPublisherBus(): Promise {
+ if (publisherBus === undefined) {
+ const bus = buildPublisherBus();
+ await bus.start();
+ publisherBus = bus;
+ }
+ return publisherBus;
+}
+
/** Compute a demo charge (in cents) from a trip's duration in milliseconds. */
function chargeCents(durationMs: number): bigint {
const seconds = Math.max(1, Math.ceil(durationMs / 1000));
@@ -140,3 +176,44 @@ export async function refundCharge(input: { tripId: string; chargeId: string }):
export async function settle(input: { tripId: string }): Promise {
await clients().billing.settle(create(SettleRequestSchema, { tripId: input.tripId }));
}
+
+// ── Terminal broadcast (Phase 3) ────────────────────────────────────────────
+
+/**
+ * Broadcast `TripCompleted` ONCE, after the trip is SETTLED.
+ *
+ * This is the saga's TERMINAL side effect: a fire-and-forget 1→N broadcast on
+ * the publish-only EventBus, fanned out to three independent reactors. It is the
+ * THIRD interaction mechanism (alongside sync `ctx.call` and the durable saga).
+ *
+ * Topic resolution: the bus lists `TripEventHandlers` in `publishes`, so
+ * `publish(TripCompletedSchema, …)` resolves `trips.completed` from the proto
+ * `(connectum.events.v1.event).topic` option — NO raw `{topic}` is passed.
+ *
+ * `amountCents` is RECOMPUTED here via the same `chargeCents(durationMs)` the
+ * billing charge used, because the workflow only carries `durationMs` (the
+ * saga's `addCharge` returns just the charge id). Recompute == the settled
+ * charge, so the event still carries all five documented fields.
+ *
+ * Failure semantics: this activity is invoked from a SUCCESS-ONLY workflow tail
+ * OUTSIDE the saga's try/catch (see `workflows.ts`); a failed broadcast can
+ * never reach compensation. The reactors' work (analytics / audit / receipt) is
+ * non-durable and reconstructable — losing one is acceptable; reversing a
+ * settled, paid trip is not. Anything that needs durable delivery belongs in
+ * Temporal (its own activity with retry), NOT on the EventBus.
+ *
+ * @param input - `{ tripId, userId, vehicleId, durationMs }`.
+ */
+export async function publishTripCompleted(input: { tripId: string; userId: string; vehicleId: string; durationMs: number }): Promise {
+ const bus = await getPublisherBus();
+ await bus.publish(
+ TripCompletedSchema,
+ create(TripCompletedSchema, {
+ tripId: input.tripId,
+ userId: input.userId,
+ vehicleId: input.vehicleId,
+ amountCents: chargeCents(input.durationMs),
+ durationMs: BigInt(input.durationMs),
+ }),
+ );
+}
diff --git a/car-sharing/src/temporal/workflows.ts b/car-sharing/src/temporal/workflows.ts
index a468269..d0b3a2c 100644
--- a/car-sharing/src/temporal/workflows.ts
+++ b/car-sharing/src/temporal/workflows.ts
@@ -66,6 +66,20 @@ const acts = proxyActivities({
},
});
+/**
+ * SEPARATE proxy for the terminal broadcast, so its retry level is independent
+ * of the saga steps. `maximumAttempts` here is a delivery-vs-double-fire DIAL,
+ * NOT a safety lever: more attempts = better delivery but at-least-once
+ * duplication (absorbed by idempotent reactors); fewer = lost-on-blip. The
+ * compensation guard is the SUCCESS-ONLY tail below (outside the saga
+ * try/catch), NOT this attempt count — a failed broadcast can never roll back a
+ * settled trip regardless of the value here.
+ */
+const broadcast = proxyActivities({
+ startToCloseTimeout: "30 seconds",
+ retry: { initialInterval: "1 second", maximumAttempts: 3 },
+});
+
/** Simulated drive duration (time-skipped instantly in tests). */
const DRIVE_DURATION_MS = 60_000;
@@ -114,8 +128,6 @@ export async function TripWorkflow(input: TripWorkflowInput): Promise {
const connection = await NativeConnection.connect({ address: TEMPORAL_ADDRESS });
+ // Build + start the publish-only EventBus (Phase 3 broadcast) and inject it
+ // into the activities module BEFORE the worker polls, so the terminal
+ // `publishTripCompleted` activity publishes on a ready bus. The worker owns
+ // its lifecycle: stopped in the `finally` below.
+ const publisherBus = buildPublisherBus();
+ await publisherBus.start();
+ activities.setPublisherBus(publisherBus);
try {
const worker = await Worker.create({
connection,
@@ -36,6 +44,7 @@ async function main(): Promise {
console.log(`car-sharing temporal worker ready — taskQueue=${TEMPORAL_TASK_QUEUE} namespace=${TEMPORAL_NAMESPACE} temporal=${TEMPORAL_ADDRESS}`);
await worker.run();
} finally {
+ await publisherBus.stop();
await connection.close();
}
}
diff --git a/car-sharing/tests/e2e/broadcast.test.ts b/car-sharing/tests/e2e/broadcast.test.ts
new file mode 100644
index 0000000..6ddea22
--- /dev/null
+++ b/car-sharing/tests/e2e/broadcast.test.ts
@@ -0,0 +1,153 @@
+/**
+ * Phase 3 EventBus broadcast / fan-out tests — DOCKERLESS.
+ *
+ * Proves the third interaction mechanism: ONE `TripCompleted` published on the
+ * saga's terminal step fans out to THREE INDEPENDENT reactors. No broker — one
+ * shared `MemoryAdapter()` feeds the publisher bus AND all three reactor buses,
+ * so a single publish reaches all three in-process (MemoryAdapter broadcasts to
+ * every matching subscription and ignores group; the distinct groups are still
+ * written so the SAME wiring fans out on NATS).
+ *
+ * Tests:
+ *
+ * 1. PRIMARY — drives the actual publish SITE: injects the publisher bus into
+ * the activities module (the same `setPublisherBus` seam the worker uses),
+ * runs the REAL `publishTripCompleted` activity body via
+ * `MockActivityEnvironment`, and asserts ALL THREE reactors fired with the
+ * FULL `TripCompleted` shape (contract-conformance: the audit record equals
+ * the documented five-field contract, not just "an event arrived"). This is
+ * the only test that ties the publish-site decision (D1) to behavior.
+ * 2. TOPIC — `resolveTopicName(TripEventHandlers.method.onTripCompleted)` is
+ * exactly `"trips.completed"`, pinning the topic to the proto option (not the
+ * `typeName` fallback). No raw `{topic}` is passed anywhere.
+ * 3. NEGATIVE — an off-topic subscriber (pattern `trips.other`) receives 0,
+ * proving the broadcast is scoped to `trips.completed`, not "everything".
+ * 4. IDEMPOTENCY — a redelivery of the same `tripId` does NOT double-apply.
+ *
+ * @module tests/e2e/broadcast
+ */
+
+import assert from "node:assert/strict";
+import { after, afterEach, before, beforeEach, describe, it } from "node:test";
+import { create, toBinary } from "@bufbuild/protobuf";
+import { MemoryAdapter, resolveTopicName } from "@connectum/events";
+import type { EventAdapter, EventSubscription } from "@connectum/events";
+import { MockActivityEnvironment } from "@temporalio/testing";
+import { TripCompletedSchema, TripEventHandlers } from "#gen/trips/v1/trip_events_pb.ts";
+import { buildPublisherBus, buildReactorBus } from "#events/eventBus.ts";
+import type { ManagedBus } from "#events/eventBus.ts";
+import { auditRecords, notifyReactorRoutes, auditReactorRoutes, pricingReactorRoutes, pricingRevenueCentsValue, pricingTripCountValue, resetAllReactors, sentReceipts } from "#events/reactors.ts";
+import * as activities from "#temporal/activities.ts";
+
+const env = new MockActivityEnvironment();
+
+/** Run a real activity body inside a mocked Temporal Activity Context. */
+function run(fn: (...args: A) => Promise, ...args: A): Promise {
+ return env.run(fn, ...args);
+}
+
+describe("Phase 3 broadcast: one TripCompleted fans out to three independent reactors (dockerless, MemoryAdapter)", () => {
+ let adapter: EventAdapter;
+ let publisher: ManagedBus;
+ let pricing: ManagedBus;
+ let audit: ManagedBus;
+ let notify: ManagedBus;
+
+ before(async () => {
+ // ONE shared in-memory adapter feeds all four buses (F8): a publish on
+ // the publisher bus reaches every reactor's subscription on the SAME
+ // adapter. Groups are ignored in-memory but written so the wiring fans
+ // out on NATS.
+ adapter = MemoryAdapter();
+ publisher = buildPublisherBus({ adapter });
+ pricing = buildReactorBus({ key: "pricing", route: pricingReactorRoutes, adapter });
+ audit = buildReactorBus({ key: "audit", route: auditReactorRoutes, adapter });
+ notify = buildReactorBus({ key: "notify", route: notifyReactorRoutes, adapter });
+ await Promise.all([publisher.start(), pricing.start(), audit.start(), notify.start()]);
+ // Inject the publisher bus into the activities module — the SAME seam
+ // the worker uses — so the REAL activity publishes on it.
+ activities.setPublisherBus(publisher);
+ });
+
+ beforeEach(() => {
+ resetAllReactors();
+ });
+
+ afterEach(() => {
+ resetAllReactors();
+ });
+
+ after(async () => {
+ // Stop ONLY after every assertion (the FIRST stop calls the shared
+ // adapter's disconnect(), which wipes all subscriptions).
+ activities.setPublisherBus(undefined);
+ await Promise.all([publisher.stop(), pricing.stop(), audit.stop(), notify.stop()]);
+ });
+
+ it("PRIMARY: the terminal publishTripCompleted activity broadcasts ONCE, all three reactors react with the FULL message shape", async () => {
+ // Drive the actual publish SITE: the real activity body, not a hand-built
+ // publish. `amountCents` is recomputed in the activity from durationMs
+ // (60_000ms → 60s × 5 cents/s = 300n).
+ await run(activities.publishTripCompleted, { tripId: "trip-x", userId: "u-1", vehicleId: "v-1", durationMs: 60_000 });
+
+ // ALL THREE reacted to the SINGLE publish.
+ assert.equal(pricingTripCountValue(), 1, "pricing reactor counted the trip");
+ assert.equal(pricingRevenueCentsValue(), 300n, "pricing reactor tallied the settled revenue");
+
+ assert.equal(sentReceipts().length, 1, "notifications reactor sent one receipt");
+ assert.equal(sentReceipts()[0]?.userId, "u-1", "the receipt targets the renter");
+
+ // Full-shape oracle (contract-conformance): the audit record equals the
+ // documented five-field TripCompleted contract, field-by-field, proving
+ // the decoded payload — not just that "an event arrived".
+ const records = auditRecords();
+ assert.equal(records.length, 1, "audit reactor appended exactly one record");
+ assert.deepEqual(records[0], {
+ tripId: "trip-x",
+ userId: "u-1",
+ vehicleId: "v-1",
+ amountCents: 300n,
+ durationMs: 60_000n,
+ });
+ });
+
+ it("TOPIC: the event method resolves to exactly \"trips.completed\" from the proto option (no typeName fallback, no raw {topic})", () => {
+ const topic = resolveTopicName(TripEventHandlers.method.onTripCompleted);
+ assert.equal(topic, "trips.completed");
+ // Guard the fallback explicitly: the typeName is NOT the topic.
+ assert.notEqual(topic, TripCompletedSchema.typeName);
+ });
+
+ it("NEGATIVE: an off-topic subscriber (trips.other) receives 0 — the broadcast is scoped to trips.completed", async () => {
+ let offTopicHits = 0;
+ const sub: EventSubscription = await adapter.subscribe(["trips.other"], async (_event, ack) => {
+ offTopicHits += 1;
+ await ack();
+ });
+ try {
+ await run(activities.publishTripCompleted, { tripId: "trip-neg", userId: "u-9", vehicleId: "v-9", durationMs: 30_000 });
+ // The on-topic reactors DID receive (proving the publish happened)...
+ assert.equal(pricingTripCountValue(), 1);
+ // ...but the off-topic subscriber did NOT.
+ assert.equal(offTopicHits, 0, "off-topic subscriber must not receive trips.completed");
+ } finally {
+ await sub.unsubscribe();
+ }
+ });
+
+ it("IDEMPOTENT: a redelivery of the same tripId does NOT double-count revenue, double-audit, or double-notify", async () => {
+ const payload = create(TripCompletedSchema, { tripId: "trip-dupe", userId: "u-2", vehicleId: "v-2", amountCents: 150n, durationMs: 30_000n });
+ const bytes = toBinary(TripCompletedSchema, payload);
+
+ // Publish the SAME tripId twice straight through the adapter (simulating a
+ // broker redelivery / at-least-once), bypassing the publisher bus so we
+ // control the exact bytes and topic.
+ await adapter.publish("trips.completed", bytes);
+ await adapter.publish("trips.completed", bytes);
+
+ assert.equal(pricingTripCountValue(), 1, "trip counted once despite redelivery");
+ assert.equal(pricingRevenueCentsValue(), 150n, "revenue tallied once despite redelivery");
+ assert.equal(auditRecords().length, 1, "audited once despite redelivery");
+ assert.equal(sentReceipts().length, 1, "notified once despite redelivery");
+ });
+});
diff --git a/car-sharing/tests/workflow/tripWorkflow.test.ts b/car-sharing/tests/workflow/tripWorkflow.test.ts
index 94af2cd..c966429 100644
--- a/car-sharing/tests/workflow/tripWorkflow.test.ts
+++ b/car-sharing/tests/workflow/tripWorkflow.test.ts
@@ -62,6 +62,9 @@ function makeMockActivities(calls: string[], failing?: { step: string }): Record
addCharge: async () => record("addCharge", "charge-mock-1"),
refundCharge: async () => record("refundCharge"),
settle: async () => record("settle"),
+ // Phase 3 terminal broadcast — fired from the success-only tail AFTER the
+ // saga try/catch, so it appears only on the SETTLED path.
+ publishTripCompleted: async () => record("publishTripCompleted"),
};
}
@@ -87,12 +90,14 @@ describe("TripWorkflow: orchestration + compensation (time-skipping, mocked acti
return worker.runUntil(testEnv.client.workflow.execute(TripWorkflow, { args: [INPUT], taskQueue: TASK_QUEUE, workflowId }));
}
- it("success: runs the forward steps in order and SETTLES", async () => {
+ it("success: runs the forward steps in order, SETTLES, then broadcasts TripCompleted (success-only tail)", async () => {
const calls: string[] = [];
const result = await runWorkflow(makeMockActivities(calls), "wf-success");
assert.equal(result, "SETTLED");
- assert.deepEqual(calls, ["reserveVehicle", "recordTrip", "endTrip", "openTab", "addCharge", "settle"]);
+ // The terminal broadcast fires LAST, after settle — the success-only tail
+ // outside the saga try/catch (Phase 3).
+ assert.deepEqual(calls, ["reserveVehicle", "recordTrip", "endTrip", "openTab", "addCharge", "settle", "publishTripCompleted"]);
});
it("settle fails: compensations run in REVERSE order (refund → void → cancel → release)", async () => {
@@ -100,7 +105,9 @@ describe("TripWorkflow: orchestration + compensation (time-skipping, mocked acti
await assert.rejects(runWorkflow(makeMockActivities(calls, { step: "settle" }), "wf-settle-fail"));
// Forward path up to and including the failing settle, then the LIFO
- // unwind of every pushed compensation.
+ // unwind of every pushed compensation. The ABSENCE of `publishTripCompleted`
+ // here is the negative guard: the broadcast fires only on the SETTLED tail,
+ // NEVER on a compensated (CANCELLED) run.
assert.deepEqual(calls, [
"reserveVehicle",
"recordTrip",
@@ -132,6 +139,20 @@ describe("TripWorkflow: orchestration + compensation (time-skipping, mocked acti
assert.deepEqual(calls, ["reserveVehicle"]);
});
+ it("broadcast failure leaves the trip SETTLED — the success-only tail swallows it, NO compensation runs", async () => {
+ // The CRITICAL Phase 3 guarantee (D4): a failed/timed-out broadcast must
+ // NEVER roll back a settled, paid trip. The mock's `publishTripCompleted`
+ // throws (nonRetryable), but it runs in the success-only tail OUTSIDE the
+ // saga try/catch, so the catch does not see it: the workflow still returns
+ // SETTLED and NO compensation follows. This proves the guard behaviorally
+ // (the structural placement), not by code-reading.
+ const calls: string[] = [];
+ const result = await runWorkflow(makeMockActivities(calls, { step: "publishTripCompleted" }), "wf-broadcast-fail");
+
+ assert.equal(result, "SETTLED");
+ assert.deepEqual(calls, ["reserveVehicle", "recordTrip", "endTrip", "openTab", "addCharge", "settle", "publishTripCompleted"]);
+ });
+
it("the REAL activities module + real workflowsPath register on a Worker (production startup path)", async () => {
// The other tests pass hand-built mock activities; this is the ONLY test
// that exercises the exact registration `src/worker.ts` does — the real