diff --git a/car-sharing/Dockerfile b/car-sharing/Dockerfile index 871059e..83e582d 100644 --- a/car-sharing/Dockerfile +++ b/car-sharing/Dockerfile @@ -3,13 +3,16 @@ # so we run TypeScript directly with `node src/index.ts` (native type stripping) # — no build step, no --experimental flags. -# ── deps: install production node_modules from the lockfile ────────────────── +# ── deps: install production node_modules ─────────────────────────────────── FROM node:25-slim AS deps RUN npm install -g pnpm WORKDIR /app COPY package.json pnpm-workspace.yaml ./ -# No lockfile is committed for this example; pnpm resolves @connectum/* to the -# published 1.0.0 versions pinned in package.json and writes a lockfile here. +# No lockfile is committed for this flagship example (matches getting-started / +# hris): pnpm resolves @connectum/* and other deps from the caret (^) ranges in +# package.json, so the image is example-grade, not bit-reproducible. Production +# services should commit pnpm-lock.yaml and use `pnpm install --frozen-lockfile` +# (the secondary examples model this). RUN pnpm install --prod # ── runtime: copy deps + source + generated proto code ────────────────────── diff --git a/car-sharing/README.md b/car-sharing/README.md index 59e89cd..b579cc7 100644 --- a/car-sharing/README.md +++ b/car-sharing/README.md @@ -230,10 +230,13 @@ falling back to a terminal status (`SETTLED` / `CANCELLED`) once it closes. ### The saga and its compensations -`TripWorkflow` runs the forward steps in order, pushing a compensation onto a -stack after each side-effecting step; on **any** failure it unwinds the stack in -reverse (LIFO) and rethrows — the canonical Temporal saga pattern. Each step is -an **activity** that makes one ConnectRPC call to a role service. +`TripWorkflow` runs the forward steps in order, registering each step's +compensation on a stack — **before** the forward call when the compensation's +inputs are already known (so an ambiguous failure that committed the side effect +is still unwound), or **after** when the compensation needs the call's result +(step 6's `refundCharge` needs the charge id). On **any** failure it unwinds the +stack in reverse (LIFO) and rethrows — the canonical Temporal saga pattern. Each +step is an **activity** that makes one ConnectRPC call to a role service. | # | Forward step (activity → RPC) | Compensation (activity → RPC) | | --- | ---------------------------------------- | ----------------------------------------- | @@ -254,6 +257,12 @@ after a forward step partially applied. Step 1's availability failure is a **non-retryable** `ApplicationFailure`, so the workflow fails fast with nothing to undo. +Forward steps are made **idempotent under at-least-once retries** the same way: +`reserveVehicle` carries the trip id as a `holder_id`, so a Temporal retry that +re-runs the activity after its first attempt already committed the reservation +re-reserves *its own* vehicle (success) instead of being mistaken for a +double-booking — while a different trip on a held vehicle is still rejected. + ### Processes The native Temporal worker (the Rust core-bridge addon + the on-the-fly workflow diff --git a/car-sharing/drizzle/0001_sturdy_wendell_vaughn.sql b/car-sharing/drizzle/0001_sturdy_wendell_vaughn.sql new file mode 100644 index 0000000..5157209 --- /dev/null +++ b/car-sharing/drizzle/0001_sturdy_wendell_vaughn.sql @@ -0,0 +1 @@ +ALTER TABLE "vehicles" ADD COLUMN "holder" text; \ No newline at end of file diff --git a/car-sharing/drizzle/meta/0001_snapshot.json b/car-sharing/drizzle/meta/0001_snapshot.json new file mode 100644 index 0000000..c05f729 --- /dev/null +++ b/car-sharing/drizzle/meta/0001_snapshot.json @@ -0,0 +1,81 @@ +{ + "id": "dd4fb939-798e-43c8-b71a-47f7c31b93eb", + "prevId": "3e01871f-ef91-4cb5-87c9-08bacaa6a942", + "version": "7", + "dialect": "postgresql", + "tables": { + "public.vehicles": { + "name": "vehicles", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "model": { + "name": "model", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "available": { + "name": "available", + "type": "boolean", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "holder": { + "name": "holder", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "lat": { + "name": "lat", + "type": "double precision", + "primaryKey": false, + "notNull": false + }, + "lng": { + "name": "lng", + "type": "double precision", + "primaryKey": false, + "notNull": false + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + } + }, + "enums": {}, + "schemas": {}, + "sequences": {}, + "roles": {}, + "policies": {}, + "views": {}, + "_meta": { + "columns": {}, + "schemas": {}, + "tables": {} + } +} \ No newline at end of file diff --git a/car-sharing/drizzle/meta/_journal.json b/car-sharing/drizzle/meta/_journal.json index 461f145..5674d8d 100644 --- a/car-sharing/drizzle/meta/_journal.json +++ b/car-sharing/drizzle/meta/_journal.json @@ -8,6 +8,13 @@ "when": 1781980551863, "tag": "0000_big_christian_walker", "breakpoints": true + }, + { + "idx": 1, + "version": "7", + "when": 1782221206144, + "tag": "0001_sturdy_wendell_vaughn", + "breakpoints": true } ] } \ No newline at end of file diff --git a/car-sharing/proto/fleet/v1/fleet.proto b/car-sharing/proto/fleet/v1/fleet.proto index b637a98..4c2ec37 100644 --- a/car-sharing/proto/fleet/v1/fleet.proto +++ b/car-sharing/proto/fleet/v1/fleet.proto @@ -67,6 +67,12 @@ message ListVehiclesRequest { message ReserveVehicleRequest { string id = 1; + // Reservation holder — the trip/workflow id acquiring the lock. Makes reserve + // idempotent across Temporal retries: the same holder re-reserving its own + // vehicle succeeds (returns the vehicle), while a held vehicle with a + // different holder is FAILED_PRECONDITION. Empty for a non-saga caller, which + // is then treated as having no holder (a held vehicle is always a conflict). + string holder_id = 2; } message ReleaseVehicleRequest { diff --git a/car-sharing/src/db/schema.ts b/car-sharing/src/db/schema.ts index 7e8eb59..7ecdae5 100644 --- a/car-sharing/src/db/schema.ts +++ b/car-sharing/src/db/schema.ts @@ -35,6 +35,9 @@ export type VehicleStatus = (typeof VehicleStatus)[keyof typeof VehicleStatus]; * - `model` human-readable model name. * - `available` derived boolean, kept in sync with `status`. * - `status` {@link VehicleStatus} string. + * - `holder` trip/workflow id currently holding the reservation; null when + * available. Lets ReserveVehicle be idempotent across Temporal + * retries (the same holder re-reserving its own vehicle succeeds). * - `lat`/`lng` last-known position (nullable; double precision so it maps to * the proto `double` location fields without string coercion). * - `updatedAt` last mutation timestamp (defaults to now()). @@ -44,6 +47,7 @@ export const vehicles = pgTable("vehicles", { model: text("model").notNull(), available: boolean("available").notNull(), status: text("status").notNull(), + holder: text("holder"), lat: doublePrecision("lat"), lng: doublePrecision("lng"), updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(), diff --git a/car-sharing/src/services/fleetService.ts b/car-sharing/src/services/fleetService.ts index 732c443..97aa4ce 100644 --- a/car-sharing/src/services/fleetService.ts +++ b/car-sharing/src/services/fleetService.ts @@ -95,12 +95,13 @@ export function createFleetService(db: Db): ServiceDefinition { }, async reserveVehicle(req) { - // Atomic reserve: only flips a vehicle that is currently available. - // An empty result means either the id is unknown OR it was not - // available — a follow-up read disambiguates the error code. + // Atomic reserve: only flips a vehicle that is currently available, + // stamping the holder (the trip/workflow id). An empty result means + // either the id is unknown OR it was not available — a follow-up read + // disambiguates the error code AND the idempotent-retry case below. const updated = await db .update(vehicles) - .set({ available: false, status: VehicleStatus.RESERVED, updatedAt: new Date() }) + .set({ available: false, status: VehicleStatus.RESERVED, holder: req.holderId, updatedAt: new Date() }) .where(and(eq(vehicles.id, req.id), eq(vehicles.available, true))) .returning(); @@ -110,10 +111,17 @@ export function createFleetService(db: Db): ServiceDefinition { } const existing = await db.select().from(vehicles).where(eq(vehicles.id, req.id)).limit(1); - if (existing[0] === undefined) { + const current = existing[0]; + if (current === undefined) { throw new ConnectError(`No vehicle with id "${req.id}".`, Code.NotFound); } - throw new ConnectError(`Vehicle "${req.id}" is not available (status "${existing[0].status}").`, Code.FailedPrecondition); + // Idempotent retry: the SAME holder re-reserving its own vehicle (a + // Temporal retry that observed its prior commit) succeeds. A held + // vehicle with a different (or empty) holder is a genuine conflict. + if (req.holderId !== "" && current.holder === req.holderId) { + return toVehicle(current); + } + throw new ConnectError(`Vehicle "${req.id}" is not available (status "${current.status}").`, Code.FailedPrecondition); }, async releaseVehicle(req) { @@ -130,7 +138,7 @@ export function createFleetService(db: Db): ServiceDefinition { const updated = await db .update(vehicles) - .set({ available: true, status: VehicleStatus.AVAILABLE, updatedAt: new Date() }) + .set({ available: true, status: VehicleStatus.AVAILABLE, holder: null, updatedAt: new Date() }) .where(eq(vehicles.id, req.id)) .returning(); diff --git a/car-sharing/src/services/tripService.ts b/car-sharing/src/services/tripService.ts index cb01bf6..d217606 100644 --- a/car-sharing/src/services/tripService.ts +++ b/car-sharing/src/services/tripService.ts @@ -34,6 +34,7 @@ import { create } from "@bufbuild/protobuf"; import { Code, ConnectError } from "@connectrpc/connect"; import type { ServiceDefinition } from "@connectum/core"; import { defineService } from "@connectum/core"; +import { QueryNotRegisteredError, QueryRejectedError, WorkflowNotFoundError } from "@temporalio/client"; import { GetVehicleRequestSchema } from "#gen/fleet/v1/fleet_pb.ts"; import { EndTripResponseSchema, GetTripResponseSchema, RecordTripResponseSchema, StartTripResponseSchema, TripSchema, TripService } from "#gen/trips/v1/trips_pb.ts"; import type { TripStatus as TripStatusT } from "#temporal/tripStatus.ts"; @@ -108,6 +109,16 @@ function terminalStatusFor(workflowStatusName: string): TripStatusT { return workflowStatusName === "COMPLETED" ? TripStatus.SETTLED : TripStatus.CANCELLED; } +/** + * True when a failed Query legitimately means "the run is closed/gone or its + * query handler is unavailable" — the only cases where falling back to a + * terminal status from `describe()` is correct. A transient/other error must + * NOT be collapsed into a terminal status (it is surfaced as `Unavailable`). + */ +function isClosedOrMissingRun(err: unknown): boolean { + return err instanceof WorkflowNotFoundError || err instanceof QueryNotRegisteredError || err instanceof QueryRejectedError; +} + /** * Build the TripService definition with an injected Temporal client. * @@ -157,13 +168,18 @@ export function createTripService(options: TripServiceOptions): ServiceDefinitio const handle = workflowClient.getHandle(req.tripId); - // Prefer the LIVE status from the running workflow's Query. If the - // workflow has closed (queries are rejected on closed runs), fall - // back to a terminal status derived from describe(). + // Prefer the LIVE status from the running workflow's Query. Only when + // the Query is unavailable because the run is closed/gone or its + // query handler isn't registered do we fall back to a terminal status + // derived from describe(). A transient/other failure is surfaced as + // Unavailable, never silently mapped to a terminal status. let status: TripStatusT; try { status = await handle.query("getTripStatus"); - } catch { + } catch (err) { + if (!isClosedOrMissingRun(err)) { + throw new ConnectError(`Could not read status for trip "${req.tripId}".`, Code.Unavailable); + } const description = await handle.describe(); status = terminalStatusFor(description.status.name); } diff --git a/car-sharing/src/temporal/activities.ts b/car-sharing/src/temporal/activities.ts index 00be09b..3dcbe16 100644 --- a/car-sharing/src/temporal/activities.ts +++ b/car-sharing/src/temporal/activities.ts @@ -68,16 +68,19 @@ function chargeCents(durationMs: number): bigint { // ── Forward steps ───────────────────────────────────────────────────────── /** - * Step 1 — reserve the vehicle. A business failure (unavailable / unknown) is - * rethrown as a NON-RETRYABLE `ApplicationFailure(VEHICLE_UNAVAILABLE)` so the - * workflow fails fast with no compensation; any other (infra) error stays + * Step 1 — reserve the vehicle. `holderId` (the trip/workflow id) is the + * reservation owner: it makes the reserve idempotent across Temporal retries, so + * a retry that observes its OWN prior commit succeeds instead of being mistaken + * for a conflict. A real business failure (unavailable / unknown / a different + * holder) is rethrown as a NON-RETRYABLE `ApplicationFailure(VEHICLE_UNAVAILABLE)` + * so the workflow fails fast with no compensation; any other (infra) error stays * retryable. * - * @param input - `{ vehicleId }`. + * @param input - `{ vehicleId, holderId }`. */ -export async function reserveVehicle(input: { vehicleId: string }): Promise { +export async function reserveVehicle(input: { vehicleId: string; holderId: string }): Promise { try { - await clients().fleet.reserveVehicle(create(ReserveVehicleRequestSchema, { id: input.vehicleId })); + await clients().fleet.reserveVehicle(create(ReserveVehicleRequestSchema, { id: input.vehicleId, holderId: input.holderId })); } catch (err) { if (err instanceof ConnectError && (err.code === Code.FailedPrecondition || err.code === Code.NotFound)) { throw ApplicationFailure.create({ diff --git a/car-sharing/src/temporal/workflows.ts b/car-sharing/src/temporal/workflows.ts index a468269..cfbc247 100644 --- a/car-sharing/src/temporal/workflows.ts +++ b/car-sharing/src/temporal/workflows.ts @@ -8,11 +8,13 @@ * (that would break determinism). All I/O is in the activities; the workflow * only orchestrates. * - * Saga (compensation-stack pattern, samples-repo style): run the forward steps - * in order; after each side-effecting step `unshift` its compensation onto a - * stack; on ANY failure, run the compensations in LIFO order (each wrapped in - * its own try/catch so the unwind never throws), then rethrow the original - * error. The result: + * Saga (compensation-stack pattern): run the forward steps in order, registering + * each step's compensation on a LIFO stack — BEFORE the forward call when the + * compensation's inputs are already known (so an ambiguous failure that DID + * commit the side effect is still unwound), or AFTER when the compensation needs + * the call's result (the charge id). On ANY failure, run the compensations in + * LIFO order (each wrapped in its own try/catch so the unwind never throws), + * then rethrow the original error. The result: * - settle fails → refundCharge → voidTab → markTripCancelled → releaseVehicle * - endTrip fails → markTripCancelled → releaseVehicle * - reserve fails → (non-retryable) fail fast, NOTHING to compensate. @@ -81,18 +83,26 @@ export async function TripWorkflow(input: TripWorkflowInput): Promise status); - // LIFO compensation stack: unshift after each side-effecting forward step. + // LIFO compensation stack: each step registers its compensation here, BEFORE + // its forward call where the inputs are already known (so an ambiguous + // failure that committed the side effect is still unwound), or AFTER where + // the compensation needs the call's result. const compensations: Compensation[] = []; try { - // Step 1 — reserve the vehicle (business failure here is non-retryable - // and fails the workflow fast; nothing pushed, nothing to undo). - await acts.reserveVehicle({ vehicleId }); + // Step 1 — reserve the vehicle. `holderId: tripId` makes the reserve + // idempotent across Temporal retries (the same holder re-reserving its + // own vehicle succeeds). A business failure here is non-retryable and + // fails the workflow fast; registered AFTER, since a failed reserve held + // nothing and the release would have nothing to undo. + await acts.reserveVehicle({ vehicleId, holderId: tripId }); compensations.unshift({ name: "releaseVehicle", run: () => acts.releaseVehicle({ vehicleId }) }); - // Step 2 — record the trip (STARTED). - await acts.recordTrip({ userId, vehicleId, tripId }); + // Step 2 — record the trip (STARTED). Register the cancel BEFORE the + // call: markTripCancelled is idempotent (a no-op if the record never + // committed), so an ambiguous recordTrip failure is still unwound. compensations.unshift({ name: "markTripCancelled", run: () => acts.markTripCancelled({ tripId }) }); + await acts.recordTrip({ userId, vehicleId, tripId }); status = TripStatus.STARTED; // Step 3 — the drive. A timer; time-skipped instantly under test. @@ -103,9 +113,11 @@ export async function TripWorkflow(input: TripWorkflowInput): Promise acts.voidTab({ tripId }) }); + await acts.openTab({ tripId }); // Step 6 — add the charge derived from the drive duration. const chargeId = await acts.addCharge({ tripId, durationMs: DRIVE_DURATION_MS }); diff --git a/car-sharing/tests/activity/activities.test.ts b/car-sharing/tests/activity/activities.test.ts index ce86bbd..8bdab64 100644 --- a/car-sharing/tests/activity/activities.test.ts +++ b/car-sharing/tests/activity/activities.test.ts @@ -91,7 +91,7 @@ describe("Activities: real RPC wiring + compensation idempotency (in-process mon it("reserveVehicle drives FleetService and recordTrip/endTrip drive the trip ledger", async () => { const tripId = "trip-act-2"; - await run(activities.reserveVehicle, { vehicleId: "v-001" }); + await run(activities.reserveVehicle, { vehicleId: "v-001", holderId: tripId }); await run(activities.recordTrip, { userId: "user-42", vehicleId: "v-001", tripId }); assert.equal(tripStatus(tripId), TripStatus.STARTED); @@ -105,7 +105,7 @@ describe("Activities: real RPC wiring + compensation idempotency (in-process mon // which the activity rethrows as a non-retryable ApplicationFailure whose // `type` is exactly the value the workflow lists in nonRetryableErrorTypes. await assert.rejects( - run(activities.reserveVehicle, { vehicleId: "v-003" }), + run(activities.reserveVehicle, { vehicleId: "v-003", holderId: "trip-unavail" }), (err: unknown) => err instanceof ApplicationFailure && err.type === "VehicleUnavailable" && err.nonRetryable === true && /not available/i.test(err.message), ); @@ -113,13 +113,32 @@ describe("Activities: real RPC wiring + compensation idempotency (in-process mon it("reserveVehicle on an UNKNOWN vehicle also throws a NON-RETRYABLE VehicleUnavailable ApplicationFailure", async () => { await assert.rejects( - run(activities.reserveVehicle, { vehicleId: "ghost" }), + run(activities.reserveVehicle, { vehicleId: "ghost", holderId: "trip-ghost" }), + (err: unknown) => err instanceof ApplicationFailure && err.type === "VehicleUnavailable" && err.nonRetryable === true, + ); + }); + + it("reserveVehicle is idempotent for the SAME holder: a retry observing its own reservation succeeds", async () => { + // A Temporal retry re-runs the activity. Re-reserving the SAME vehicle + // with the SAME holder must succeed (no VehicleUnavailable): the holder + // proves this is our own prior commit, not a conflicting second trip. + await run(activities.reserveVehicle, { vehicleId: "v-001", holderId: "trip-A" }); + await run(activities.reserveVehicle, { vehicleId: "v-001", holderId: "trip-A" }); + }); + + it("reserveVehicle rejects a DIFFERENT holder on a held vehicle (no double-booking)", async () => { + // trip-A holds v-001; trip-B must NOT be able to reserve it — the guard + // that keeps two trips off one vehicle. The activity rethrows the conflict + // as a non-retryable VehicleUnavailable. + await run(activities.reserveVehicle, { vehicleId: "v-001", holderId: "trip-A" }); + await assert.rejects( + run(activities.reserveVehicle, { vehicleId: "v-001", holderId: "trip-B" }), (err: unknown) => err instanceof ApplicationFailure && err.type === "VehicleUnavailable" && err.nonRetryable === true, ); }); it("releaseVehicle is idempotent: releasing an already-available vehicle is a no-op success", async () => { - await run(activities.reserveVehicle, { vehicleId: "v-001" }); + await run(activities.reserveVehicle, { vehicleId: "v-001", holderId: "trip-release" }); await run(activities.releaseVehicle, { vehicleId: "v-001" }); // Second release on an already-available vehicle must not throw. await run(activities.releaseVehicle, { vehicleId: "v-001" }); diff --git a/car-sharing/tests/e2e/e2e.test.ts b/car-sharing/tests/e2e/e2e.test.ts index abba1a2..cc6aa1f 100644 --- a/car-sharing/tests/e2e/e2e.test.ts +++ b/car-sharing/tests/e2e/e2e.test.ts @@ -45,6 +45,7 @@ import type { Client } from "@connectrpc/connect"; import { Code, ConnectError, createClient } from "@connectrpc/connect"; import { createGrpcTransport } from "@connectrpc/connect-node"; import type { Server } from "@connectum/core"; +import { QueryNotRegisteredError } from "@temporalio/client"; import { JWT_AUDIENCE, JWT_ISSUER } from "#auth.ts"; import { BillingService } from "#gen/billing/v1/billing_pb.ts"; import { FleetService } from "#gen/fleet/v1/fleet_pb.ts"; @@ -63,10 +64,17 @@ interface StartCall { /** How a stub handle answers GetTrip: a live query result, or a closed describe(). */ interface StubHandleBehaviour { - /** Status the live Query returns; if `undefined`, the Query throws (closed run). */ + /** Status the live Query returns; if `undefined`, the Query throws. */ readonly queryStatus?: string; /** Workflow status name returned by describe() when the Query is rejected. */ readonly describeStatusName?: string; + /** + * When the Query throws (`queryStatus` undefined), what kind of failure: + * `"closed"` (default) → a `QueryNotRegisteredError`, which the handler + * treats as "run closed/gone" and falls back to describe(); `"transient"` → + * a generic error, which the handler must surface as `Unavailable`. + */ + readonly queryError?: "closed" | "transient"; } /** @@ -87,7 +95,13 @@ function makeStubWorkflowClient(starts: StartCall[], handleBehaviour: () => Stub return { async query(): Promise { if (behaviour.queryStatus === undefined) { - throw new Error("query not supported on a closed workflow"); + if (behaviour.queryError === "transient") { + // A generic/infra failure — NOT a closed-run signal. + throw new Error("temporal frontend unavailable"); + } + // Closed/gone run: its query handler is not registered. + // (code 3 = INVALID_ARGUMENT; the handler keys off the type.) + throw new QueryNotRegisteredError("query handler not registered on a closed run", 3); } return behaviour.queryStatus as Ret; }, @@ -181,8 +195,9 @@ describe("E2E: car-sharing monolith (in-process gateway, no cluster, stub Tempor }); it("GetTrip falls back to a terminal status via describe() when the workflow has CLOSED", async () => { - // queryStatus undefined → the stub Query throws (queries are rejected on - // a closed run); the handler falls back to describe(): COMPLETED → SETTLED. + // queryStatus undefined → the stub Query throws a QueryNotRegisteredError + // (the run is closed/gone), which the handler treats as "fall back to + // describe()": COMPLETED → SETTLED. handleBehaviour = { describeStatusName: "COMPLETED" }; const settled = await trips.getTrip(create(GetTripRequestSchema, { tripId: "trip-done" }), { headers: { Authorization: `Bearer ${userToken}` }, @@ -197,6 +212,19 @@ describe("E2E: car-sharing monolith (in-process gateway, no cluster, stub Tempor assert.equal(cancelled.trip?.status, "CANCELLED"); }); + it("GetTrip surfaces Unavailable (NOT a terminal status) when the live Query fails transiently", async () => { + // Regression for the blanket catch that mapped ANY query error to a + // terminal status: a transient/infra failure must surface as Unavailable, + // never be silently reported as a SETTLED/CANCELLED trip. + handleBehaviour = { queryError: "transient" }; + await assert.rejects( + trips.getTrip(create(GetTripRequestSchema, { tripId: "trip-flaky" }), { + headers: { Authorization: `Bearer ${userToken}` }, + }), + (err: unknown) => err instanceof ConnectError && err.code === Code.Unavailable, + ); + }); + it("gateway auth: GetTrip with NO token is rejected as Unauthenticated", async () => { await assert.rejects( trips.getTrip(create(GetTripRequestSchema, { tripId: "trip-live" })), diff --git a/car-sharing/tests/workflow/tripWorkflow.test.ts b/car-sharing/tests/workflow/tripWorkflow.test.ts index 94af2cd..0a9e6ca 100644 --- a/car-sharing/tests/workflow/tripWorkflow.test.ts +++ b/car-sharing/tests/workflow/tripWorkflow.test.ts @@ -132,6 +132,24 @@ describe("TripWorkflow: orchestration + compensation (time-skipping, mocked acti assert.deepEqual(calls, ["reserveVehicle"]); }); + it("recordTrip fails: its PRE-registered cancel still unwinds (markTripCancelled → releaseVehicle)", async () => { + // F4 (register-before): markTripCancelled is on the stack BEFORE recordTrip + // is awaited, so an ambiguous recordTrip failure still unwinds it (it + // no-ops if the record never committed). With the old register-AFTER order + // this compensation would have been missed entirely. + const calls: string[] = []; + await assert.rejects(runWorkflow(makeMockActivities(calls, { step: "recordTrip" }), "wf-record-fail")); + assert.deepEqual(calls, ["reserveVehicle", "recordTrip", "markTripCancelled", "releaseVehicle"]); + }); + + it("openTab fails: its PRE-registered void still unwinds (void → cancel → release)", async () => { + // F4 (register-before): voidTab is registered BEFORE openTab, so an + // ambiguous openTab failure still unwinds it (no-op on a missing tab). + const calls: string[] = []; + await assert.rejects(runWorkflow(makeMockActivities(calls, { step: "openTab" }), "wf-opentab-fail")); + assert.deepEqual(calls, ["reserveVehicle", "recordTrip", "endTrip", "openTab", "voidTab", "markTripCancelled", "releaseVehicle"]); + }); + it("the REAL activities module + real workflowsPath register on a Worker (production startup path)", async () => { // The other tests pass hand-built mock activities; this is the ONLY test // that exercises the exact registration `src/worker.ts` does — the real