Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions car-sharing/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
# so we run TypeScript directly with `node src/index.ts` (native type stripping)
# — no build step, no --experimental flags.

# ── deps: install production node_modules from the lockfile ──────────────────
# ── deps: install production node_modules ───────────────────────────────────
FROM node:25-slim AS deps
RUN npm install -g pnpm
WORKDIR /app
COPY package.json pnpm-workspace.yaml ./
# No lockfile is committed for this example; pnpm resolves @connectum/* to the
# published 1.0.0 versions pinned in package.json and writes a lockfile here.
# No lockfile is committed for this flagship example (matches getting-started /
# hris): pnpm resolves @connectum/* and other deps from the caret (^) ranges in
# package.json, so the image is example-grade, not bit-reproducible. Production
# services should commit pnpm-lock.yaml and use `pnpm install --frozen-lockfile`
# (the secondary examples model this).
RUN pnpm install --prod

# ── runtime: copy deps + source + generated proto code ──────────────────────
Expand Down
17 changes: 13 additions & 4 deletions car-sharing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,13 @@ falling back to a terminal status (`SETTLED` / `CANCELLED`) once it closes.

### The saga and its compensations

`TripWorkflow` runs the forward steps in order, pushing a compensation onto a
stack after each side-effecting step; on **any** failure it unwinds the stack in
reverse (LIFO) and rethrows — the canonical Temporal saga pattern. Each step is
an **activity** that makes one ConnectRPC call to a role service.
`TripWorkflow` runs the forward steps in order, registering each step's
compensation on a stack — **before** the forward call when the compensation's
inputs are already known (so an ambiguous failure that committed the side effect
is still unwound), or **after** when the compensation needs the call's result
(step 6's `refundCharge` needs the charge id). On **any** failure it unwinds the
stack in reverse (LIFO) and rethrows — the canonical Temporal saga pattern. Each
step is an **activity** that makes one ConnectRPC call to a role service.

| # | Forward step (activity → RPC) | Compensation (activity → RPC) |
| --- | ---------------------------------------- | ----------------------------------------- |
Expand All @@ -254,6 +257,12 @@ after a forward step partially applied. Step 1's availability failure is a
**non-retryable** `ApplicationFailure`, so the workflow fails fast with nothing
to undo.

Forward steps are made **idempotent under at-least-once retries** the same way:
`reserveVehicle` carries the trip id as a `holder_id`, so a Temporal retry that
re-runs the activity after its first attempt already committed the reservation
re-reserves *its own* vehicle (success) instead of being mistaken for a
double-booking — while a different trip on a held vehicle is still rejected.

### Processes

The native Temporal worker (the Rust core-bridge addon + the on-the-fly workflow
Expand Down
1 change: 1 addition & 0 deletions car-sharing/drizzle/0001_sturdy_wendell_vaughn.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE "vehicles" ADD COLUMN "holder" text;
81 changes: 81 additions & 0 deletions car-sharing/drizzle/meta/0001_snapshot.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
{
"id": "dd4fb939-798e-43c8-b71a-47f7c31b93eb",
"prevId": "3e01871f-ef91-4cb5-87c9-08bacaa6a942",
"version": "7",
"dialect": "postgresql",
"tables": {
"public.vehicles": {
"name": "vehicles",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true
},
"model": {
"name": "model",
"type": "text",
"primaryKey": false,
"notNull": true
},
"available": {
"name": "available",
"type": "boolean",
"primaryKey": false,
"notNull": true
},
"status": {
"name": "status",
"type": "text",
"primaryKey": false,
"notNull": true
},
"holder": {
"name": "holder",
"type": "text",
"primaryKey": false,
"notNull": false
},
"lat": {
"name": "lat",
"type": "double precision",
"primaryKey": false,
"notNull": false
},
"lng": {
"name": "lng",
"type": "double precision",
"primaryKey": false,
"notNull": false
},
"updated_at": {
"name": "updated_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
}
},
"enums": {},
"schemas": {},
"sequences": {},
"roles": {},
"policies": {},
"views": {},
"_meta": {
"columns": {},
"schemas": {},
"tables": {}
}
}
7 changes: 7 additions & 0 deletions car-sharing/drizzle/meta/_journal.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
"when": 1781980551863,
"tag": "0000_big_christian_walker",
"breakpoints": true
},
{
"idx": 1,
"version": "7",
"when": 1782221206144,
"tag": "0001_sturdy_wendell_vaughn",
"breakpoints": true
}
]
}
6 changes: 6 additions & 0 deletions car-sharing/proto/fleet/v1/fleet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ message ListVehiclesRequest {

// Request to reserve a single vehicle, optionally on behalf of a named holder.
message ReserveVehicleRequest {
// Id of the vehicle to reserve.
string id = 1;
// Reservation holder — the trip/workflow id acquiring the lock. Makes reserve
// idempotent across Temporal retries: the same holder re-reserving its own
// vehicle succeeds (returns the vehicle), while a held vehicle with a
// different holder is FAILED_PRECONDITION. Empty for a non-saga caller, which
// is then treated as having no holder (a held vehicle is always a conflict).
string holder_id = 2;
}

message ReleaseVehicleRequest {
Expand Down
4 changes: 4 additions & 0 deletions car-sharing/src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ export type VehicleStatus = (typeof VehicleStatus)[keyof typeof VehicleStatus];
* - `model` human-readable model name.
* - `available` derived boolean, kept in sync with `status`.
* - `status` {@link VehicleStatus} string.
* - `holder` trip/workflow id currently holding the reservation; null when
* available. Lets ReserveVehicle be idempotent across Temporal
* retries (the same holder re-reserving its own vehicle succeeds).
* - `lat`/`lng` last-known position (nullable; double precision so it maps to
* the proto `double` location fields without string coercion).
* - `updatedAt` last mutation timestamp (defaults to now()).
Expand All @@ -44,6 +47,7 @@ export const vehicles = pgTable("vehicles", {
model: text("model").notNull(),
available: boolean("available").notNull(),
status: text("status").notNull(),
holder: text("holder"),
lat: doublePrecision("lat"),
lng: doublePrecision("lng"),
updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(),
Expand Down
22 changes: 15 additions & 7 deletions car-sharing/src/services/fleetService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,13 @@ export function createFleetService(db: Db): ServiceDefinition {
},

async reserveVehicle(req) {
// Atomic reserve: only flips a vehicle that is currently available.
// An empty result means either the id is unknown OR it was not
// available — a follow-up read disambiguates the error code.
// Atomic reserve: only flips a vehicle that is currently available,
// stamping the holder (the trip/workflow id). An empty result means
// either the id is unknown OR it was not available — a follow-up read
// disambiguates the error code AND the idempotent-retry case below.
const updated = await db
.update(vehicles)
.set({ available: false, status: VehicleStatus.RESERVED, updatedAt: new Date() })
.set({ available: false, status: VehicleStatus.RESERVED, holder: req.holderId, updatedAt: new Date() })
.where(and(eq(vehicles.id, req.id), eq(vehicles.available, true)))
.returning();

Expand All @@ -110,10 +111,17 @@ export function createFleetService(db: Db): ServiceDefinition {
}

const existing = await db.select().from(vehicles).where(eq(vehicles.id, req.id)).limit(1);
if (existing[0] === undefined) {
const current = existing[0];
if (current === undefined) {
throw new ConnectError(`No vehicle with id "${req.id}".`, Code.NotFound);
}
throw new ConnectError(`Vehicle "${req.id}" is not available (status "${existing[0].status}").`, Code.FailedPrecondition);
// Idempotent retry: the SAME holder re-reserving its own vehicle (a
// Temporal retry that observed its prior commit) succeeds. A held
// vehicle with a different (or empty) holder is a genuine conflict.
if (req.holderId !== "" && current.holder === req.holderId) {
return toVehicle(current);
}
throw new ConnectError(`Vehicle "${req.id}" is not available (status "${current.status}").`, Code.FailedPrecondition);
},

async releaseVehicle(req) {
Expand All @@ -130,7 +138,7 @@ export function createFleetService(db: Db): ServiceDefinition {

const updated = await db
.update(vehicles)
.set({ available: true, status: VehicleStatus.AVAILABLE, updatedAt: new Date() })
.set({ available: true, status: VehicleStatus.AVAILABLE, holder: null, updatedAt: new Date() })
.where(eq(vehicles.id, req.id))
Comment on lines +141 to 142

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift

Guard release by the reservation holder.

Line 141 clears the lock using only id in the WHERE clause. If a releaseVehicle compensation commits but its activity response is lost, a retry can run after another trip reserves the now-available vehicle and then clear that other trip’s holder, reopening double-booking. Thread the expected holder through ReleaseVehicleRequest/activity/workflow and only clear when the stored holder matches, while treating an already-available holder = null row as idempotent success.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@car-sharing/src/services/fleetService.ts` around lines 141 - 142, The WHERE
clause in the vehicle release update on line 141-142 only checks the vehicle ID,
creating a double-booking vulnerability if retries occur after response loss.
Add a holder field to ReleaseVehicleRequest and thread it through the activity
and workflow to the releaseVehicle method. Modify the WHERE clause to include an
additional condition that verifies the current holder matches the expected
holder using eq(vehicles.holder, expectedHolder), ensuring the update only
clears the lock if held by the same reservation that requested the release.
Treat the case where holder is already null as idempotent success to prevent
errors on legitimate retries.

.returning();

Expand Down
24 changes: 20 additions & 4 deletions car-sharing/src/services/tripService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import { create } from "@bufbuild/protobuf";
import { Code, ConnectError } from "@connectrpc/connect";
import type { ServiceDefinition } from "@connectum/core";
import { defineService } from "@connectum/core";
import { QueryNotRegisteredError, QueryRejectedError, WorkflowNotFoundError } from "@temporalio/client";
import { GetVehicleRequestSchema } from "#gen/fleet/v1/fleet_pb.ts";
import { EndTripResponseSchema, GetTripResponseSchema, RecordTripResponseSchema, StartTripResponseSchema, TripSchema, TripService } from "#gen/trips/v1/trips_pb.ts";
import type { TripStatus as TripStatusT } from "#temporal/tripStatus.ts";
Expand Down Expand Up @@ -108,6 +109,16 @@ function terminalStatusFor(workflowStatusName: string): TripStatusT {
return workflowStatusName === "COMPLETED" ? TripStatus.SETTLED : TripStatus.CANCELLED;
}

/**
 * Decide whether a failed status Query may be answered from `describe()`.
 *
 * Only three failures qualify: the workflow was not found, the query handler
 * is not registered, or the query was rejected. Every other error is treated
 * as transient by the caller and reported as `Unavailable` rather than being
 * collapsed into a terminal trip status.
 *
 * @param err - The value caught from the failed Query call.
 * @returns `true` when `err` is an instance of one of the three fallback
 *   error classes, `false` otherwise.
 */
function isClosedOrMissingRun(err: unknown): boolean {
	// Error classes for which a describe()-based terminal status is a valid answer.
	const fallbackErrors = [WorkflowNotFoundError, QueryNotRegisteredError, QueryRejectedError];
	return fallbackErrors.some((errorClass) => err instanceof errorClass);
}

/**
* Build the TripService definition with an injected Temporal client.
*
Expand Down Expand Up @@ -157,13 +168,18 @@ export function createTripService(options: TripServiceOptions): ServiceDefinitio

const handle = workflowClient.getHandle(req.tripId);

// Prefer the LIVE status from the running workflow's Query. If the
// workflow has closed (queries are rejected on closed runs), fall
// back to a terminal status derived from describe().
// Prefer the LIVE status from the running workflow's Query. Only when
// the Query is unavailable because the run is closed/gone or its
// query handler isn't registered do we fall back to a terminal status
// derived from describe(). A transient/other failure is surfaced as
// Unavailable, never silently mapped to a terminal status.
let status: TripStatusT;
try {
status = await handle.query<TripStatusT>("getTripStatus");
} catch {
} catch (err) {
if (!isClosedOrMissingRun(err)) {
throw new ConnectError(`Could not read status for trip "${req.tripId}".`, Code.Unavailable);
}
const description = await handle.describe();
status = terminalStatusFor(description.status.name);
}
Expand Down
15 changes: 9 additions & 6 deletions car-sharing/src/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,19 @@ function chargeCents(durationMs: number): bigint {
// ── Forward steps ─────────────────────────────────────────────────────────

/**
* Step 1 — reserve the vehicle. A business failure (unavailable / unknown) is
* rethrown as a NON-RETRYABLE `ApplicationFailure(VEHICLE_UNAVAILABLE)` so the
* workflow fails fast with no compensation; any other (infra) error stays
* Step 1 — reserve the vehicle. `holderId` (the trip/workflow id) is the
* reservation owner: it makes the reserve idempotent across Temporal retries, so
* a retry that observes its OWN prior commit succeeds instead of being mistaken
* for a conflict. A real business failure (unavailable / unknown / a different
* holder) is rethrown as a NON-RETRYABLE `ApplicationFailure(VEHICLE_UNAVAILABLE)`
* so the workflow fails fast with no compensation; any other (infra) error stays
* retryable.
*
* @param input - `{ vehicleId }`.
* @param input - `{ vehicleId, holderId }`.
*/
export async function reserveVehicle(input: { vehicleId: string }): Promise<void> {
export async function reserveVehicle(input: { vehicleId: string; holderId: string }): Promise<void> {
try {
await clients().fleet.reserveVehicle(create(ReserveVehicleRequestSchema, { id: input.vehicleId }));
await clients().fleet.reserveVehicle(create(ReserveVehicleRequestSchema, { id: input.vehicleId, holderId: input.holderId }));
} catch (err) {
if (err instanceof ConnectError && (err.code === Code.FailedPrecondition || err.code === Code.NotFound)) {
throw ApplicationFailure.create({
Expand Down
38 changes: 25 additions & 13 deletions car-sharing/src/temporal/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
* (that would break determinism). All I/O is in the activities; the workflow
* only orchestrates.
*
* Saga (compensation-stack pattern, samples-repo style): run the forward steps
* in order; after each side-effecting step `unshift` its compensation onto a
* stack; on ANY failure, run the compensations in LIFO order (each wrapped in
* its own try/catch so the unwind never throws), then rethrow the original
* error. The result:
* Saga (compensation-stack pattern): run the forward steps in order, registering
* each step's compensation on a LIFO stack — BEFORE the forward call when the
* compensation's inputs are already known (so an ambiguous failure that DID
* commit the side effect is still unwound), or AFTER when the compensation needs
* the call's result (the charge id). On ANY failure, run the compensations in
* LIFO order (each wrapped in its own try/catch so the unwind never throws),
* then rethrow the original error. The result:
* - settle fails → refundCharge → voidTab → markTripCancelled → releaseVehicle
* - endTrip fails → markTripCancelled → releaseVehicle
* - reserve fails → (non-retryable) fail fast, NOTHING to compensate.
Expand Down Expand Up @@ -81,18 +83,26 @@ export async function TripWorkflow(input: TripWorkflowInput): Promise<TripStatus
let status: TripStatusT = TripStatus.STARTED;
setHandler(getTripStatusQuery, () => status);

// LIFO compensation stack: unshift after each side-effecting forward step.
// LIFO compensation stack: each step registers its compensation here, BEFORE
// its forward call where the inputs are already known (so an ambiguous
// failure that committed the side effect is still unwound), or AFTER where
// the compensation needs the call's result.
const compensations: Compensation[] = [];

try {
// Step 1 — reserve the vehicle (business failure here is non-retryable
// and fails the workflow fast; nothing pushed, nothing to undo).
await acts.reserveVehicle({ vehicleId });
// Step 1 — reserve the vehicle. `holderId: tripId` makes the reserve
// idempotent across Temporal retries (the same holder re-reserving its
// own vehicle succeeds). A business failure here is non-retryable and
// fails the workflow fast. The release compensation is registered AFTER
// the call, since a failed reserve held nothing and the release would
// have nothing to undo.
await acts.reserveVehicle({ vehicleId, holderId: tripId });
compensations.unshift({ name: "releaseVehicle", run: () => acts.releaseVehicle({ vehicleId }) });

// Step 2 — record the trip (STARTED).
await acts.recordTrip({ userId, vehicleId, tripId });
// Step 2 — record the trip (STARTED). Register the cancel BEFORE the
// call: markTripCancelled is idempotent (a no-op if the record never
// committed), so an ambiguous recordTrip failure is still unwound.
compensations.unshift({ name: "markTripCancelled", run: () => acts.markTripCancelled({ tripId }) });
await acts.recordTrip({ userId, vehicleId, tripId });
status = TripStatus.STARTED;

// Step 3 — the drive. A timer; time-skipped instantly under test.
Expand All @@ -103,9 +113,11 @@ export async function TripWorkflow(input: TripWorkflowInput): Promise<TripStatus
await acts.endTrip({ tripId });
status = TripStatus.ENDED;

// Step 5 — open the billing tab.
await acts.openTab({ tripId });
// Step 5 — open the billing tab. Register the void BEFORE the call:
// voidTab is idempotent (a no-op on a missing tab), so an ambiguous
// openTab failure is still unwound.
compensations.unshift({ name: "voidTab", run: () => acts.voidTab({ tripId }) });
await acts.openTab({ tripId });

// Step 6 — add the charge derived from the drive duration.
const chargeId = await acts.addCharge({ tripId, durationMs: DRIVE_DURATION_MS });
Expand Down
Loading