From 5543cb0e0def0302eb2b3c48db34c9abdb635b66 Mon Sep 17 00:00:00 2001 From: intech Date: Sun, 21 Jun 2026 20:26:55 +0400 Subject: [PATCH 1/6] =?UTF-8?q?feat(hris):=20durable=20onboarding=20saga?= =?UTF-8?q?=20(Temporal)=20=E2=80=94=20Phase=205a?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Evolve the HRIS example with a durable new-hire onboarding saga, mirroring the car-sharing trip saga. A new OnboardingService gateway runs a synchronous pre-check (the employee id must be free → AlreadyExists otherwise) and then starts an OnboardingWorkflow that provisions the hire across services with automatic LIFO compensation: createEmployee → setupPayroll → grantTimeOff → provisionAccess → activate - Three orthogonal mechanisms now coexist in one example: cross-service ctx.call (RequestLeave → directory), EventBus broadcast (LeaveApproved → payroll), and a Temporal saga (onboarding). The framework stays thin. - New AccessService (leaf, in-memory) is the saga's IT-provisioning step. - Directory gains CreateEmployee/ActivateEmployee/OffboardEmployee over the existing Drizzle db; CreateEmployee uses an atomic `insert ... on conflict do nothing returning` → AlreadyExists (no raw DB error leak). The activity rethrows it as a non-retryable ApplicationFailure so the workflow fails fast with nothing to compensate. - Payroll/TimeOff gain idempotent Setup/Teardown and Grant/Revoke RPCs; payroll-setup is the decrementable leave BALANCE, timeoff-grant is the distinct annual PTO policy allotment. - The worker (src/worker.ts) is the ONLY importer of @temporalio/worker; the RPC roles import only the pure-JS @temporalio/client, keeping their no-build native-TS run model. @swc/core build is approved for the workflow bundle. - Runs on the published 1.0.0 packages — the saga uses no 1.1.0-only API, so Phase 5a is not blocked on the 1.1.0 release. 
Tests (dockerless, 33/33): workflow orchestration + LIFO compensation (time-skipping, mocked activities), real activity ↔ RPC wiring + compensation idempotency (in-process monolith + PGlite), and the onboarding edge pre-check (AlreadyExists before Temporal; Unavailable with no client). The existing LeaveApproved e2e is unchanged. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01MdeH7fExPmiRHRirGuvGk3 --- hris/package.json | 6 + hris/pnpm-workspace.yaml | 7 + hris/proto/access/v1/access.proto | 40 ++++ hris/proto/directory/v1/directory.proto | 45 +++++ hris/proto/onboarding/v1/onboarding.proto | 64 +++++++ hris/proto/payroll/v1/payroll.proto | 23 +++ hris/proto/timeoff/v1/timeoff.proto | 32 ++++ hris/src/empty.ts | 19 ++ hris/src/server.ts | 41 +++- hris/src/services/accessService.ts | 67 +++++++ hris/src/services/directoryService.ts | 55 +++++- hris/src/services/onboardingService.ts | 148 +++++++++++++++ hris/src/services/payrollService.ts | 22 ++- hris/src/services/timeOffService.ts | 39 +++- hris/src/temporal/activities.ts | 150 +++++++++++++++ hris/src/temporal/clients.ts | 61 ++++++ hris/src/temporal/config.ts | 23 +++ hris/src/temporal/onboardingStatus.ts | 34 ++++ hris/src/temporal/workflowClient.ts | 31 +++ hris/src/temporal/workflows.ts | 133 +++++++++++++ hris/src/topology.ts | 8 +- hris/src/worker.ts | 46 +++++ hris/tests/activity/activities.test.ts | 179 ++++++++++++++++++ hris/tests/e2e/onboarding.test.ts | 144 ++++++++++++++ .../tests/workflow/onboardingWorkflow.test.ts | 167 ++++++++++++++++ 25 files changed, 1575 insertions(+), 9 deletions(-) create mode 100644 hris/proto/access/v1/access.proto create mode 100644 hris/proto/onboarding/v1/onboarding.proto create mode 100644 hris/src/empty.ts create mode 100644 hris/src/services/accessService.ts create mode 100644 hris/src/services/onboardingService.ts create mode 100644 hris/src/temporal/activities.ts create mode 100644 hris/src/temporal/clients.ts create mode 100644 
hris/src/temporal/config.ts create mode 100644 hris/src/temporal/onboardingStatus.ts create mode 100644 hris/src/temporal/workflowClient.ts create mode 100644 hris/src/temporal/workflows.ts create mode 100644 hris/src/worker.ts create mode 100644 hris/tests/activity/activities.test.ts create mode 100644 hris/tests/e2e/onboarding.test.ts create mode 100644 hris/tests/workflow/onboardingWorkflow.test.ts diff --git a/hris/package.json b/hris/package.json index 5b02ad9..f2516f7 100644 --- a/hris/package.json +++ b/hris/package.json @@ -11,6 +11,7 @@ "scripts": { "start": "node src/index.ts", "dev": "node --watch src/index.ts", + "worker": "node src/worker.ts", "typecheck": "tsc --noEmit", "buf:generate": "buf generate", "buf:lint": "buf lint", @@ -50,6 +51,10 @@ "@connectum/healthcheck": "^1.0.0", "@connectum/interceptors": "^1.0.0", "@connectum/reflection": "^1.0.0", + "@temporalio/activity": "^1.18.1", + "@temporalio/client": "^1.18.1", + "@temporalio/worker": "^1.18.1", + "@temporalio/workflow": "^1.18.1", "drizzle-orm": "^0.45.0", "postgres": "^3.4.0" }, @@ -58,6 +63,7 @@ "@bufbuild/protoc-gen-es": "^2.11.0", "@connectum/protoc-gen-catalog": "^1.0.0", "@electric-sql/pglite": "^0.5.0", + "@temporalio/testing": "^1.18.1", "@types/node": "^25.2.0", "drizzle-kit": "^0.31.0", "typescript": "^5.9.3" diff --git a/hris/pnpm-workspace.yaml b/hris/pnpm-workspace.yaml index 6ab3670..af01b7d 100644 --- a/hris/pnpm-workspace.yaml +++ b/hris/pnpm-workspace.yaml @@ -1,3 +1,10 @@ allowBuilds: '@bufbuild/buf': true esbuild: true + # @swc/core compiles the Temporal workflow bundle (@temporalio/worker bundles + # src/temporal/workflows.ts with swc at worker startup — no separate build). + '@swc/core': true + # protobufjs is pulled transitively by @temporalio's gRPC client + # (@grpc/grpc-js → @grpc/proto-loader → protobufjs). Allow its build so pnpm + # does not report ERR_PNPM_IGNORED_BUILDS on install. 
+ protobufjs: true diff --git a/hris/proto/access/v1/access.proto b/hris/proto/access/v1/access.proto new file mode 100644 index 0000000..0ed6f78 --- /dev/null +++ b/hris/proto/access/v1/access.proto @@ -0,0 +1,40 @@ +syntax = "proto3"; + +package access.v1; + +import "google/protobuf/empty.proto"; + +// AccessService provisions and revokes system access (the IT side of +// onboarding). It is a leaf service with an in-memory account ledger — the +// onboarding saga's fourth forward step. +service AccessService { + // ProvisionAccess creates the employee's system account. Idempotent: + // re-provisioning returns the existing account. Onboarding saga step 4. + rpc ProvisionAccess(ProvisionAccessRequest) returns (ProvisionAccessResponse) {} + + // RevokeAccess removes the employee's system account (the ProvisionAccess + // compensation). Idempotent; a no-op for an unknown employee. + rpc RevokeAccess(RevokeAccessRequest) returns (google.protobuf.Empty) {} +} + +message ProvisionAccessRequest { + string employee_id = 1; + // Work email the account is keyed on. + string email = 2; +} + +message ProvisionAccessResponse { + Access access = 1; +} + +message RevokeAccessRequest { + string employee_id = 1; +} + +message Access { + string employee_id = 1; + // The provisioned account handle (e.g. an SSO subject id). + string account_id = 2; + // True once an account exists for this employee. + bool provisioned = 3; +} diff --git a/hris/proto/directory/v1/directory.proto b/hris/proto/directory/v1/directory.proto index b702dac..c29d8a1 100644 --- a/hris/proto/directory/v1/directory.proto +++ b/hris/proto/directory/v1/directory.proto @@ -19,12 +19,57 @@ service DirectoryService { // and `manager_id` filters narrow the stream; `manager_id` is the org-chart // "direct reports of this manager" query. rpc ListEmployees(ListEmployeesRequest) returns (stream Employee) {} + + // CreateEmployee inserts a new employee in "onboarding" status. 
Returns + // ALREADY_EXISTS if the id is already taken. This is the onboarding saga's + // first forward step — its business failure (duplicate id) is what the + // workflow treats as non-retryable. + rpc CreateEmployee(CreateEmployeeRequest) returns (CreateEmployeeResponse) {} + + // ActivateEmployee flips an employee from "onboarding" to "active" — the + // onboarding saga's terminal step. Idempotent: activating an already-active + // employee is a no-op success. + rpc ActivateEmployee(ActivateEmployeeRequest) returns (ActivateEmployeeResponse) {} + + // OffboardEmployee marks an employee "offboarded" — the CreateEmployee + // compensation. Idempotent; a no-op for an unknown id. + rpc OffboardEmployee(OffboardEmployeeRequest) returns (OffboardEmployeeResponse) {} } message GetEmployeeRequest { string id = 1; } +message CreateEmployeeRequest { + string id = 1; + string name = 2; + string email = 3; + string title = 4; + string department = 5; + // The id of this hire's manager, or empty for the top of the org chart. + string manager_id = 6; +} + +message CreateEmployeeResponse { + Employee employee = 1; +} + +message ActivateEmployeeRequest { + string id = 1; +} + +message ActivateEmployeeResponse { + Employee employee = 1; +} + +message OffboardEmployeeRequest { + string id = 1; +} + +message OffboardEmployeeResponse { + Employee employee = 1; +} + message GetEmployeeResponse { Employee employee = 1; } diff --git a/hris/proto/onboarding/v1/onboarding.proto b/hris/proto/onboarding/v1/onboarding.proto new file mode 100644 index 0000000..cf2d532 --- /dev/null +++ b/hris/proto/onboarding/v1/onboarding.proto @@ -0,0 +1,64 @@ +syntax = "proto3"; + +package onboarding.v1; + +// OnboardingService is the edge orchestrator for new-hire onboarding, backed by +// a durable Temporal saga. +// +// A monolith ran onboarding inline; this service splits it. 
OnboardEmployee runs +// a SYNCHRONOUS pre-check (the employee id must be free) and only then STARTS a +// durable OnboardingWorkflow — a multi-step saga that provisions the new hire +// across the directory, payroll, time-off, and access services, with automatic +// LIFO compensation if any step fails. The long-running saga lives in Temporal, +// not in the handler; GetOnboarding reads its live status via a workflow query. +service OnboardingService { + // OnboardEmployee validates that the id is free (a ctx.call to the directory: + // an EXISTING employee is rejected with ALREADY_EXISTS), then starts the + // durable onboarding saga and returns immediately with status STARTED and the + // workflow id. The pre-check runs BEFORE any Temporal use, so the error path + // needs no live Temporal. + rpc OnboardEmployee(OnboardEmployeeRequest) returns (OnboardEmployeeResponse) {} + + // GetOnboarding reads the new hire's live onboarding status from the running + // workflow (a Temporal query), falling back to a terminal status once the + // saga has closed (COMPLETED on success, FAILED if it rolled back). + rpc GetOnboarding(GetOnboardingRequest) returns (GetOnboardingResponse) {} +} + +message OnboardEmployeeRequest { + // Desired employee id (must be free; e.g. "e-100"). + string employee_id = 1; + string name = 2; + // Work email address. + string email = 3; + // Job title (e.g. "Staff Engineer"). + string title = 4; + string department = 5; + // The id of this hire's manager, or empty for the top of the org chart. + string manager_id = 6; +} + +message OnboardEmployeeResponse { + Onboarding onboarding = 1; + // The Temporal workflow id (equals the employee id) — GetOnboarding maps an + // employee id straight to its onboarding workflow. 
+ string workflow_id = 2; +} + +message GetOnboardingRequest { + string employee_id = 1; +} + +message GetOnboardingResponse { + Onboarding onboarding = 1; +} + +message Onboarding { + string employee_id = 1; + // Onboarding saga lifecycle state, modeled as a plain string (not a proto + // enum) so the generated TypeScript stays erasable under this example's + // `erasableSyntaxOnly` tsconfig. The domain is pinned in code via a `const` + // object (see src/temporal/onboardingStatus.ts): "STARTED" | "COMPLETED" | + // "FAILED". + string status = 2; +} diff --git a/hris/proto/payroll/v1/payroll.proto b/hris/proto/payroll/v1/payroll.proto index 2a24421..5724cca 100644 --- a/hris/proto/payroll/v1/payroll.proto +++ b/hris/proto/payroll/v1/payroll.proto @@ -11,6 +11,15 @@ import "google/protobuf/empty.proto"; service PayrollService { // GetBalance returns the remaining leave-days balance for an employee. rpc GetBalance(GetBalanceRequest) returns (GetBalanceResponse) {} + + // SetupPayroll enrolls a new hire with an initial leave-days balance (the + // balance OnLeaveApproved later decrements) — the onboarding saga's second + // forward step. Idempotent: re-running keeps the existing enrollment. + rpc SetupPayroll(SetupPayrollRequest) returns (SetupPayrollResponse) {} + + // TeardownPayroll removes an employee's payroll enrollment — the SetupPayroll + // compensation. Idempotent; a no-op for an unknown employee. + rpc TeardownPayroll(TeardownPayrollRequest) returns (google.protobuf.Empty) {} } message GetBalanceRequest { @@ -21,6 +30,20 @@ message GetBalanceResponse { Balance balance = 1; } +message SetupPayrollRequest { + string employee_id = 1; + // Initial leave-days balance to enroll the new hire with. 
+ int32 initial_days = 2; +} + +message SetupPayrollResponse { + Balance balance = 1; +} + +message TeardownPayrollRequest { + string employee_id = 1; +} + message Balance { string employee_id = 1; int32 remaining_days = 2; diff --git a/hris/proto/timeoff/v1/timeoff.proto b/hris/proto/timeoff/v1/timeoff.proto index b6d9f24..25f7212 100644 --- a/hris/proto/timeoff/v1/timeoff.proto +++ b/hris/proto/timeoff/v1/timeoff.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package timeoff.v1; +import "google/protobuf/empty.proto"; + // TimeOffService approves leave requests. Its handler validates the employee // via a cross-service ctx.call to directory.v1.DirectoryService, then publishes // a LeaveApproved event that payroll.v1.PayrollService consumes. @@ -10,6 +12,16 @@ service TimeOffService { // the request, and emits LeaveApproved. Returns NOT_FOUND when the employee // is unknown. rpc RequestLeave(RequestLeaveRequest) returns (RequestLeaveResponse) {} + + // GrantTimeOff assigns a new hire their annual PTO policy allotment (distinct + // from the payroll leave-days balance — this is the entitlement, not the + // decrementable balance) — the onboarding saga's third forward step. + // Idempotent: re-granting keeps the existing grant. + rpc GrantTimeOff(GrantTimeOffRequest) returns (GrantTimeOffResponse) {} + + // RevokeTimeOff removes an employee's PTO grant — the GrantTimeOff + // compensation. Idempotent; a no-op for an unknown employee. + rpc RevokeTimeOff(RevokeTimeOffRequest) returns (google.protobuf.Empty) {} } message RequestLeaveRequest { @@ -26,3 +38,23 @@ message LeaveRequest { // Approval status: "APPROVED" in this demo (always approved once validated). string status = 2; } + +message GrantTimeOffRequest { + string employee_id = 1; + // Annual PTO policy allotment in days. 
+ int32 policy_days = 2; +} + +message GrantTimeOffResponse { + TimeOffGrant grant = 1; +} + +message RevokeTimeOffRequest { + string employee_id = 1; +} + +message TimeOffGrant { + string employee_id = 1; + // Annual PTO policy allotment in days. + int32 policy_days = 2; +} diff --git a/hris/src/empty.ts b/hris/src/empty.ts new file mode 100644 index 0000000..6657ea6 --- /dev/null +++ b/hris/src/empty.ts @@ -0,0 +1,19 @@ +/** + * Small helper for RPCs that return `google.protobuf.Empty`. + * + * The onboarding-saga compensations (revokeAccess, teardownPayroll, + * revokeTimeOff) are declared `returns (google.protobuf.Empty)` in proto. Their + * handlers must return an `Empty` message instance; this wraps the one-liner so + * each service does not repeat the well-known-type import. + * + * @module empty + */ + +import { create } from "@bufbuild/protobuf"; +import type { Empty } from "@bufbuild/protobuf/wkt"; +import { EmptySchema } from "@bufbuild/protobuf/wkt"; + +/** Build an empty `google.protobuf.Empty` message. 
*/ +export function empty(): Empty { + return create(EmptySchema, {}); +} diff --git a/hris/src/server.ts b/hris/src/server.ts index 22b1f04..aec5850 100644 --- a/hris/src/server.ts +++ b/hris/src/server.ts @@ -30,10 +30,15 @@ import { serviceCatalog } from "#gen/catalog.gen.ts"; import { createDb } from "#db/client.ts"; import type { Db } from "#db/client.ts"; import { buildEventBus } from "#eventBus.ts"; +import { accessService } from "#services/accessService.ts"; import { createDirectoryService } from "#services/directoryService.ts"; +import { createOnboardingService } from "#services/onboardingService.ts"; +import type { OnboardingWorkflowClient } from "#services/onboardingService.ts"; import { payrollService } from "#services/payrollService.ts"; import { makeTimeOffService } from "#services/timeOffService.ts"; -import { resolveTopology } from "#topology.ts"; +import { TEMPORAL_TASK_QUEUE } from "#temporal/config.ts"; +import { createWorkflowClient } from "#temporal/workflowClient.ts"; +import { resolveTopology, TYPE_NAMES } from "#topology.ts"; import type { Topology } from "#topology.ts"; /** Options for {@link buildServer}. */ @@ -56,6 +61,18 @@ export interface BuildServerOptions { * `ctx.call` to GetEmployee runs in-process). */ readonly db?: Db; + /** + * Temporal client override for OnboardingService. + * + * - `undefined` (default): when the role hosts OnboardingService, a real + * LAZY `@temporalio/client` `WorkflowClient` is built (no socket until the + * first start/query — so the server starts and the pre-check e2e runs + * without a live Temporal). When the role does NOT host OnboardingService, + * no client is built. + * - a value: injected verbatim (tests pass a stub; pass `null` to force the + * "Temporal not configured" path even when OnboardingService is mounted). 
+ */ + readonly workflowClient?: OnboardingWorkflowClient | null; } /** @@ -75,10 +92,28 @@ export function buildServer(options: BuildServerOptions = {}): Server { const db = options.db ?? createDb(); const directoryService = createDirectoryService(db); + // Temporal client for the onboarding saga. Built only when this role hosts + // OnboardingService and no override was given. The default client is LAZY + // (no socket until the first start/query), so the server starts and the + // pre-check e2e runs without a live Temporal server. An explicit `null` + // forces the "Temporal not configured" path; an explicit value (a test + // stub) is used verbatim. + const hostsOnboarding = topology.localTypeNames.includes(TYPE_NAMES.onboarding); + let workflowClient: OnboardingWorkflowClient | undefined; + if (options.workflowClient !== undefined) { + workflowClient = options.workflowClient ?? undefined; + } else if (hostsOnboarding) { + // The concrete WorkflowClient structurally satisfies the narrow + // OnboardingWorkflowClient port (start/getHandle/query/describe); the + // cast adapts its richer generic overloads to the port the handler uses. + workflowClient = createWorkflowClient() as unknown as OnboardingWorkflowClient; + } + const onboardingService = createOnboardingService({ workflowClient, taskQueue: TEMPORAL_TASK_QUEUE }); + return createServer({ - // All three definitions are always passed; `enabledServices` decides + // All service definitions are always passed; `enabledServices` decides // which are mounted locally vs reached via the resolver. 
- services: [directoryService, makeTimeOffService(eventBus), payrollService], + services: [directoryService, makeTimeOffService(eventBus), payrollService, accessService, onboardingService], catalog: serviceCatalog, enabledServices: topology.enabledServices, remoteResolver: topology.remoteResolver, diff --git a/hris/src/services/accessService.ts b/hris/src/services/accessService.ts new file mode 100644 index 0000000..635fb7e --- /dev/null +++ b/hris/src/services/accessService.ts @@ -0,0 +1,67 @@ +/** + * AccessService — provisions and revokes system access (the IT side of + * onboarding). + * + * A leaf service with an in-memory account ledger. It is the onboarding saga's + * fourth forward step: `provisionAccess` creates an account for the new hire, + * and its compensation `revokeAccess` removes it. Both are IDEMPOTENT so a saga + * unwind that runs after a partially-applied step is a no-op success. + * + * @module services/accessService + */ + +import { create } from "@bufbuild/protobuf"; +import { defineService } from "@connectum/core"; +import { AccessSchema, AccessService, ProvisionAccessResponseSchema } from "#gen/access/v1/access_pb.ts"; +import { empty } from "#empty.ts"; + +/** A provisioned account record keyed by employee id. */ +interface AccessRecord { + employeeId: string; + accountId: string; + email: string; +} + +/** Demo account ledger (employee id → provisioned account). */ +const accounts = new Map(); + +/** Reset the ledger — used between tests. */ +export function resetAccess(): void { + accounts.clear(); +} + +/** Number of provisioned accounts (used by tests to assert side effects). */ +export function accessCount(): number { + return accounts.size; +} + +/** True when an account exists for `employeeId` (test/inspection helper). */ +export function isProvisioned(employeeId: string): boolean { + return accounts.has(employeeId); +} + +/** Derive a stable demo account id for an employee. 
*/ +function accountIdFor(employeeId: string): string { + return `acct-${employeeId}`; +} + +export const accessService = defineService(AccessService, { + // Provision (or return the existing) account. Idempotent by employee id. + provisionAccess(req) { + let record = accounts.get(req.employeeId); + if (record === undefined) { + record = { employeeId: req.employeeId, accountId: accountIdFor(req.employeeId), email: req.email }; + accounts.set(req.employeeId, record); + } + return create(ProvisionAccessResponseSchema, { + access: create(AccessSchema, { employeeId: record.employeeId, accountId: record.accountId, provisioned: true }), + }); + }, + + // Compensation — remove the account. Idempotent: a no-op for an unknown + // employee (returns success either way). + revokeAccess(req) { + accounts.delete(req.employeeId); + return empty(); + }, +}); diff --git a/hris/src/services/directoryService.ts b/hris/src/services/directoryService.ts index 3390b19..3a12a0b 100644 --- a/hris/src/services/directoryService.ts +++ b/hris/src/services/directoryService.ts @@ -30,9 +30,9 @@ import { Code, ConnectError } from "@connectrpc/connect"; import { defineService } from "@connectum/core"; import type { ServiceDefinition } from "@connectum/core"; import { and, asc, eq, gt } from "drizzle-orm"; -import { DirectoryService, EmployeeSchema, GetEmployeeResponseSchema } from "#gen/directory/v1/directory_pb.ts"; +import { ActivateEmployeeResponseSchema, CreateEmployeeResponseSchema, DirectoryService, EmployeeSchema, GetEmployeeResponseSchema, OffboardEmployeeResponseSchema } from "#gen/directory/v1/directory_pb.ts"; import type { Employee } from "#gen/directory/v1/directory_pb.ts"; -import { employees } from "#db/schema.ts"; +import { employees, EmployeeStatus } from "#db/schema.ts"; import type { Db } from "#db/client.ts"; import type { EmployeeRow } from "#db/schema.ts"; @@ -101,5 +101,56 @@ export function createDirectoryService(db: Db): ServiceDefinition { yield toEmployee(row); } }, + 
+ // ── Onboarding saga RPCs (driven by the Temporal worker's activities) ── + + // Step 1 — insert a new employee in "onboarding" status. Uses an ATOMIC + // `insert ... on conflict do nothing returning`: an empty result means + // the id was already taken, surfaced as Code.AlreadyExists (race-free, + // and never leaking a raw DB unique-violation). The workflow treats this + // business failure as non-retryable. + async createEmployee(req) { + const inserted = await db + .insert(employees) + .values({ + id: req.id, + name: req.name, + email: req.email, + title: req.title, + department: req.department, + managerId: req.managerId !== "" ? req.managerId : null, + status: EmployeeStatus.ONBOARDING, + }) + .onConflictDoNothing() + .returning(); + + const row = inserted[0]; + if (row === undefined) { + throw new ConnectError(`Employee with id "${req.id}" already exists.`, Code.AlreadyExists); + } + return create(CreateEmployeeResponseSchema, { employee: toEmployee(row) }); + }, + + // Terminal step — flip "onboarding" → "active". Idempotent: re-activating + // an already-active employee returns the same row. NOT_FOUND for an + // unknown id (in-saga this never happens — activation follows creation). + async activateEmployee(req) { + const updated = await db.update(employees).set({ status: EmployeeStatus.ACTIVE, updatedAt: new Date() }).where(eq(employees.id, req.id)).returning(); + const row = updated[0]; + if (row === undefined) { + throw new ConnectError(`No employee with id "${req.id}".`, Code.NotFound); + } + return create(ActivateEmployeeResponseSchema, { employee: toEmployee(row) }); + }, + + // Compensation for step 1 — mark "offboarded". Idempotent: a no-op + // success for an unknown id (the saga only offboards a row it created, + // so this defends against a double unwind). 
+ async offboardEmployee(req) { + const updated = await db.update(employees).set({ status: EmployeeStatus.OFFBOARDED, updatedAt: new Date() }).where(eq(employees.id, req.id)).returning(); + const row = updated[0]; + const employee = row !== undefined ? toEmployee(row) : create(EmployeeSchema, { id: req.id, status: EmployeeStatus.OFFBOARDED }); + return create(OffboardEmployeeResponseSchema, { employee }); + }, }); } diff --git a/hris/src/services/onboardingService.ts b/hris/src/services/onboardingService.ts new file mode 100644 index 0000000..716c338 --- /dev/null +++ b/hris/src/services/onboardingService.ts @@ -0,0 +1,148 @@ +/** + * OnboardingService — the edge orchestrator, backed by a durable Temporal saga. + * + * - `OnboardEmployee` runs a SYNCHRONOUS availability pre-check via + * `ctx.call("directory.v1.DirectoryService/GetEmployee", …)`. Unlike the + * car-sharing trip pre-check (which requires the vehicle to EXIST), this one + * is INVERTED: the employee id must be FREE. A directory hit → the id is + * taken → `Code.AlreadyExists`; a `Code.NotFound` → the id is free, proceed. + * Any OTHER error (e.g. the directory is `Code.Unavailable`) PROPAGATES — it + * is not treated as "free to go". Only AFTER the pre-check passes does it + * START the durable `OnboardingWorkflow` and return `{ onboarding: { …, + * status: STARTED }, workflowId }`. The long-running saga (create → payroll → + * timeoff → access → activate, with automatic compensation) runs in Temporal, + * not in this handler — and because the pre-check runs first, the error path + * needs no live Temporal. + * - `GetOnboarding` reads LIVE status from the workflow via a Temporal Workflow + * Query (`handle.query(getOnboardingStatusQuery)`), falling back to a terminal + * status derived from `handle.describe()` once the workflow has closed. 
+ * + * The Temporal client is INJECTED via the {@link createOnboardingService} + * factory, so the server supplies a lazy `@temporalio/client` `WorkflowClient` in + * production and tests inject a stub. When no client is configured (a non- + * onboarding role, or the server-only e2e), `OnboardEmployee`'s workflow start + * and `GetOnboarding` raise `Code.Unavailable` — but the pre-check still runs + * first, so the error-path e2e needs no live Temporal. + * + * @module services/onboardingService + */ + +import { create } from "@bufbuild/protobuf"; +import { Code, ConnectError } from "@connectrpc/connect"; +import type { ServiceDefinition } from "@connectum/core"; +import { defineService } from "@connectum/core"; +import { GetEmployeeRequestSchema } from "#gen/directory/v1/directory_pb.ts"; +import { GetOnboardingResponseSchema, OnboardEmployeeResponseSchema, OnboardingSchema, OnboardingService } from "#gen/onboarding/v1/onboarding_pb.ts"; +import type { OnboardingStatus as OnboardingStatusT } from "#temporal/onboardingStatus.ts"; +import { OnboardingStatus } from "#temporal/onboardingStatus.ts"; +import type { OnboardingWorkflowInput } from "#temporal/workflows.ts"; + +// ── Temporal client port ──────────────────────────────────────────────────── +// A minimal structural interface over the `@temporalio/client` `WorkflowClient` +// the handler actually uses. Typing against a port (not the concrete class) +// keeps the service import-light and lets tests inject a stub. The real +// `WorkflowClient` satisfies this shape. + +/** A handle subset: query the live status and describe the (possibly closed) run. */ +export interface OnboardingWorkflowHandle { + query(queryName: string): Promise; + describe(): Promise<{ status: { name: string } }>; +} + +/** The Temporal client subset the onboarding handler depends on. 
*/ +export interface OnboardingWorkflowClient { + start(workflowType: string, options: { taskQueue: string; workflowId: string; args: [OnboardingWorkflowInput] }): Promise<{ workflowId: string }>; + getHandle(workflowId: string): OnboardingWorkflowHandle; +} + +/** Options for {@link createOnboardingService}. */ +export interface OnboardingServiceOptions { + /** + * Temporal client used to start the saga and read status. Optional: when + * absent, the pre-check still runs, but starting the workflow / reading + * status raises `Code.Unavailable` (so non-onboarding roles and the + * server-only e2e build and run without Temporal). + */ + readonly workflowClient?: OnboardingWorkflowClient; + /** Task queue to start workflows on. */ + readonly taskQueue: string; +} + +/** Map a closed-workflow status name to a terminal onboarding status. */ +function terminalStatusFor(workflowStatusName: string): OnboardingStatusT { + // A completed saga activated the employee; anything else (FAILED/CANCELLED/ + // TERMINATED/TIMED_OUT) means the saga unwound. + return workflowStatusName === "COMPLETED" ? OnboardingStatus.COMPLETED : OnboardingStatus.FAILED; +} + +/** + * Build the OnboardingService definition with an injected Temporal client. + * + * @param options - {@link OnboardingServiceOptions}. + */ +export function createOnboardingService(options: OnboardingServiceOptions): ServiceDefinition { + const { workflowClient, taskQueue } = options; + + return defineService(OnboardingService, { + async onboardEmployee(req, ctx) { + // INVERTED availability pre-check — the employee id must be FREE. + // A directory hit means the id is taken (AlreadyExists); a NotFound + // means it is free (proceed). Any OTHER error propagates unchanged — + // an Unavailable directory is NOT treated as "free to go". This runs + // BEFORE any Temporal use, so the error path needs no live Temporal. 
+ try { + await ctx.call("directory.v1.DirectoryService/GetEmployee", create(GetEmployeeRequestSchema, { id: req.employeeId })); + // The call resolved → the employee already exists. + throw new ConnectError(`Employee "${req.employeeId}" already exists.`, Code.AlreadyExists); + } catch (err) { + if (err instanceof ConnectError && err.code === Code.NotFound) { + // Free id — fall through to start the workflow. + } else { + // AlreadyExists (raised above) and any other error (e.g. + // Unavailable) propagate unchanged. + throw err; + } + } + + if (workflowClient === undefined) { + throw new ConnectError("Temporal is not configured — cannot start the onboarding workflow.", Code.Unavailable); + } + + // The workflow id IS the employee id, so GetOnboarding can map an + // employee id straight to its workflow. + const handle = await workflowClient.start("OnboardingWorkflow", { + taskQueue, + workflowId: req.employeeId, + args: [{ employeeId: req.employeeId, name: req.name, email: req.email, title: req.title, department: req.department, managerId: req.managerId }], + }); + + return create(OnboardEmployeeResponseSchema, { + onboarding: create(OnboardingSchema, { employeeId: req.employeeId, status: OnboardingStatus.STARTED }), + workflowId: handle.workflowId, + }); + }, + + async getOnboarding(req) { + if (workflowClient === undefined) { + throw new ConnectError("Temporal is not configured — cannot read onboarding status.", Code.Unavailable); + } + + const handle = workflowClient.getHandle(req.employeeId); + + // Prefer the LIVE status from the running workflow's Query. If the + // workflow has closed (queries are rejected on closed runs), fall + // back to a terminal status derived from describe(). 
+ let status: OnboardingStatusT; + try { + status = await handle.query("getOnboardingStatus"); + } catch { + const description = await handle.describe(); + status = terminalStatusFor(description.status.name); + } + + return create(GetOnboardingResponseSchema, { + onboarding: create(OnboardingSchema, { employeeId: req.employeeId, status }), + }); + }, + }); +} diff --git a/hris/src/services/payrollService.ts b/hris/src/services/payrollService.ts index 36aef86..90c0fde 100644 --- a/hris/src/services/payrollService.ts +++ b/hris/src/services/payrollService.ts @@ -16,7 +16,8 @@ import { create } from "@bufbuild/protobuf"; import { defineService } from "@connectum/core"; import type { EventRoute } from "@connectum/events"; -import { BalanceSchema, GetBalanceResponseSchema, PayrollEventHandlers, PayrollService } from "#gen/payroll/v1/payroll_pb.ts"; +import { BalanceSchema, GetBalanceResponseSchema, PayrollEventHandlers, PayrollService, SetupPayrollResponseSchema } from "#gen/payroll/v1/payroll_pb.ts"; +import { empty } from "#empty.ts"; /** Initial leave-days balance per employee (id → days). */ const INITIAL_BALANCE: ReadonlyArray = [ @@ -39,6 +40,25 @@ export const payrollService = defineService(PayrollService, { create(GetBalanceResponseSchema, { balance: create(BalanceSchema, { employeeId: req.employeeId, remainingDays: balances.get(req.employeeId) ?? 0 }), }), + + // Onboarding saga step 2 — enroll a new hire with an initial leave balance. + // Idempotent: if already enrolled, the existing balance is kept (a re-run + // does not reset a balance OnLeaveApproved may have since decremented). + setupPayroll(req) { + if (!balances.has(req.employeeId)) { + balances.set(req.employeeId, req.initialDays); + } + return create(SetupPayrollResponseSchema, { + balance: create(BalanceSchema, { employeeId: req.employeeId, remainingDays: balances.get(req.employeeId) ?? 0 }), + }); + }, + + // Compensation for step 2 — drop the payroll enrollment. 
Idempotent: a + // no-op success for an unknown employee. + teardownPayroll(req) { + balances.delete(req.employeeId); + return empty(); + }, }); /** diff --git a/hris/src/services/timeOffService.ts b/hris/src/services/timeOffService.ts index f23bcf1..1ff9a85 100644 --- a/hris/src/services/timeOffService.ts +++ b/hris/src/services/timeOffService.ts @@ -24,8 +24,27 @@ import { defineService, type ServiceDefinition } from "@connectum/core"; import type { EventBus } from "@connectum/events"; import { GetEmployeeRequestSchema } from "#gen/directory/v1/directory_pb.ts"; import { LeaveApprovedSchema } from "#gen/payroll/v1/payroll_pb.ts"; -import { LeaveRequestSchema, RequestLeaveResponseSchema, TimeOffService } from "#gen/timeoff/v1/timeoff_pb.ts"; +import { GrantTimeOffResponseSchema, LeaveRequestSchema, RequestLeaveResponseSchema, TimeOffGrantSchema, TimeOffService } from "#gen/timeoff/v1/timeoff_pb.ts"; import { LEAVE_APPROVED_TOPIC } from "#events.ts"; +import { empty } from "#empty.ts"; + +/** + * PTO policy-grant ledger (employee id → annual allotment in days). Distinct + * from PayrollService's decrementable leave BALANCE — this is the entitlement + * assigned at onboarding. Module-level so the onboarding saga's GrantTimeOff / + * RevokeTimeOff share it regardless of which `makeTimeOffService` instance runs. + */ +const grants = new Map(); + +/** Reset the PTO grant ledger — used between tests. */ +export function resetGrants(): void { + grants.clear(); +} + +/** Read an employee's PTO policy grant in days (test/inspection helper). */ +export function timeOffGrant(employeeId: string): number | undefined { + return grants.get(employeeId); +} /** * Build the TimeOffService definition bound to a specific EventBus instance. 
@@ -53,5 +72,23 @@ export function makeTimeOffService(eventBus: EventBus): ServiceDefinition { leaveRequest: create(LeaveRequestSchema, { id: leaveRequestId, status: "APPROVED" }), }); }, + + // Onboarding saga step 3 — assign the new hire their annual PTO policy + // allotment. Idempotent: re-granting keeps the existing grant. + grantTimeOff(req) { + if (!grants.has(req.employeeId)) { + grants.set(req.employeeId, req.policyDays); + } + return create(GrantTimeOffResponseSchema, { + grant: create(TimeOffGrantSchema, { employeeId: req.employeeId, policyDays: grants.get(req.employeeId) ?? 0 }), + }); + }, + + // Compensation for step 3 — revoke the PTO grant. Idempotent: a no-op + // success for an unknown employee. + revokeTimeOff(req) { + grants.delete(req.employeeId); + return empty(); + }, }); } diff --git a/hris/src/temporal/activities.ts b/hris/src/temporal/activities.ts new file mode 100644 index 0000000..0463c11 --- /dev/null +++ b/hris/src/temporal/activities.ts @@ -0,0 +1,150 @@ +/** + * Temporal activities — the onboarding saga's side effects, each a ConnectRPC + * call. + * + * Activities run in the worker's Node process (NOT the deterministic workflow + * sandbox), so they may freely create ConnectRPC clients and do I/O. Each + * activity is one RPC against a role service over the network (`*_ADDR`). The + * workflow (`workflows.ts`) only `proxyActivities` these and never touches a + * client itself. + * + * Activities are grouped: + * - forward steps: createEmployee, setupPayroll, grantTimeOff, provisionAccess, + * activateEmployee. + * - compensations: offboardEmployee, teardownPayroll, revokeTimeOff, + * revokeAccess — all IDEMPOTENT (the services no-op on already-undone state), + * since a compensation may run after a forward step partially applied. 
+ * + * The business failure of the very first step (the employee id is already taken) + * is rethrown as a NON-RETRYABLE `ApplicationFailure` so Temporal fails the + * workflow fast (no pointless retries, no compensation — nothing was created). + * Transient/infra failures of every other step stay retryable, which is the + * whole point of the durable saga, so they are NOT marked non-retryable here. + * + * @module temporal/activities + */ + +import { create } from "@bufbuild/protobuf"; +import { Code, ConnectError } from "@connectrpc/connect"; +import { ApplicationFailure } from "@temporalio/activity"; +import { ProvisionAccessRequestSchema, RevokeAccessRequestSchema } from "#gen/access/v1/access_pb.ts"; +import { ActivateEmployeeRequestSchema, CreateEmployeeRequestSchema, OffboardEmployeeRequestSchema } from "#gen/directory/v1/directory_pb.ts"; +import { SetupPayrollRequestSchema, TeardownPayrollRequestSchema } from "#gen/payroll/v1/payroll_pb.ts"; +import { GrantTimeOffRequestSchema, RevokeTimeOffRequestSchema } from "#gen/timeoff/v1/timeoff_pb.ts"; +import type { ServiceClients } from "#temporal/clients.ts"; +import { createServiceClients } from "#temporal/clients.ts"; + +/** + * Error type for a non-retryable, business failure of step 1. The workflow + * lists this string in `nonRetryableErrorTypes` (by the SAME literal value) and + * surfaces it as a terminal workflow failure (preserving the ALREADY_EXISTS + * meaning). + * + * NOT exported: only used inside this module, and keeping the activities + * namespace function-only (so `import * as activities` passed to `Worker.create` + * carries no non-function entry). + */ +const EMPLOYEE_EXISTS = "EmployeeExists" as const; + +/** Initial leave-days balance a new hire is enrolled with (demo policy). */ +const INITIAL_PAYROLL_DAYS = 25; +/** Annual PTO policy allotment granted on onboarding (demo policy). */ +const PTO_POLICY_DAYS = 25; + +/** Lazily-built shared clients (one transport set per worker process). 
*/ +let sharedClients: ServiceClients | undefined; + +/** Get (or build once) the worker's service clients. */ +function clients(): ServiceClients { + if (sharedClients === undefined) { + sharedClients = createServiceClients(); + } + return sharedClients; +} + +/** Details of the new hire threaded through the forward steps. */ +export interface NewHire { + readonly employeeId: string; + readonly name: string; + readonly email: string; + readonly title: string; + readonly department: string; + readonly managerId: string; +} + +// ── Forward steps ───────────────────────────────────────────────────────── + +/** + * Step 1 — create the directory row (status "onboarding"). A business failure + * (the id is already taken → `Code.AlreadyExists`) is rethrown as a + * NON-RETRYABLE `ApplicationFailure(EMPLOYEE_EXISTS)` so the workflow fails fast + * with no compensation; any other (infra) error stays retryable. + * + * @param hire - The {@link NewHire} details. + */ +export async function createEmployee(hire: NewHire): Promise { + try { + await clients().directory.createEmployee( + create(CreateEmployeeRequestSchema, { + id: hire.employeeId, + name: hire.name, + email: hire.email, + title: hire.title, + department: hire.department, + managerId: hire.managerId, + }), + ); + } catch (err) { + if (err instanceof ConnectError && err.code === Code.AlreadyExists) { + throw ApplicationFailure.create({ + message: err.message, + type: EMPLOYEE_EXISTS, + nonRetryable: true, + }); + } + throw err; + } +} + +/** Compensation for step 1 — mark the directory row "offboarded". Idempotent. */ +export async function offboardEmployee(input: { employeeId: string }): Promise { + await clients().directory.offboardEmployee(create(OffboardEmployeeRequestSchema, { id: input.employeeId })); +} + +/** Step 2 — enroll the new hire in payroll (initial leave balance). 
*/ +export async function setupPayroll(input: { employeeId: string }): Promise { + await clients().payroll.setupPayroll(create(SetupPayrollRequestSchema, { employeeId: input.employeeId, initialDays: INITIAL_PAYROLL_DAYS })); +} + +/** Compensation for step 2 — remove the payroll enrollment. Idempotent. */ +export async function teardownPayroll(input: { employeeId: string }): Promise { + await clients().payroll.teardownPayroll(create(TeardownPayrollRequestSchema, { employeeId: input.employeeId })); +} + +/** Step 3 — assign the new hire their annual PTO policy grant. */ +export async function grantTimeOff(input: { employeeId: string }): Promise { + await clients().timeoff.grantTimeOff(create(GrantTimeOffRequestSchema, { employeeId: input.employeeId, policyDays: PTO_POLICY_DAYS })); +} + +/** Compensation for step 3 — revoke the PTO policy grant. Idempotent. */ +export async function revokeTimeOff(input: { employeeId: string }): Promise { + await clients().timeoff.revokeTimeOff(create(RevokeTimeOffRequestSchema, { employeeId: input.employeeId })); +} + +/** Step 4 — provision system access (the IT account) for the new hire. */ +export async function provisionAccess(input: { employeeId: string; email: string }): Promise { + await clients().access.provisionAccess(create(ProvisionAccessRequestSchema, { employeeId: input.employeeId, email: input.email })); +} + +/** Compensation for step 4 — revoke the system account. Idempotent. */ +export async function revokeAccess(input: { employeeId: string }): Promise { + await clients().access.revokeAccess(create(RevokeAccessRequestSchema, { employeeId: input.employeeId })); +} + +/** + * Step 5 — activate the employee (directory status "onboarding" → "active"). + * The terminal happy-path step; no compensation (success is final). 
+ */ +export async function activateEmployee(input: { employeeId: string }): Promise { + await clients().directory.activateEmployee(create(ActivateEmployeeRequestSchema, { id: input.employeeId })); +} diff --git a/hris/src/temporal/clients.ts b/hris/src/temporal/clients.ts new file mode 100644 index 0000000..2c32d4c --- /dev/null +++ b/hris/src/temporal/clients.ts @@ -0,0 +1,61 @@ +/** + * ConnectRPC client factory for the Temporal activities. + * + * The worker is a separate process with NO Connectum `Server`, so the in-process + * `ctx.call` / `server.localClient` facilities are unavailable there. Activities + * therefore reach the role services as a plain network client — exactly the + * example's split-topology story (`*_ADDR` env), just initiated from the worker + * instead of a request handler. + * + * The clients carry no Authorization header — the HRIS edge has no auth chain + * (the real trust boundary is the mesh). Each activity is one RPC against one + * role service over the network. + * + * @module temporal/clients + */ + +import type { Client } from "@connectrpc/connect"; +import { createClient } from "@connectrpc/connect"; +import { createGrpcTransport } from "@connectrpc/connect-node"; +import { AccessService } from "#gen/access/v1/access_pb.ts"; +import { DirectoryService } from "#gen/directory/v1/directory_pb.ts"; +import { PayrollService } from "#gen/payroll/v1/payroll_pb.ts"; +import { TimeOffService } from "#gen/timeoff/v1/timeoff_pb.ts"; + +/** Default endpoints for a local `docker compose up` (one role per service). */ +const DEFAULT_DIRECTORY_ADDR = "http://localhost:5001"; +const DEFAULT_PAYROLL_ADDR = "http://localhost:5002"; +const DEFAULT_TIMEOFF_ADDR = "http://localhost:5003"; +const DEFAULT_ACCESS_ADDR = "http://localhost:5004"; + +/** The typed clients the activities use to drive the onboarding saga's RPCs. 
*/ +export interface ServiceClients { + readonly directory: Client; + readonly payroll: Client; + readonly timeoff: Client; + readonly access: Client; +} + +/** + * Build the directory/payroll/timeoff/access clients from the `*_ADDR` env + * convention. + * + * `createGrpcTransport({ baseUrl })` requires a full URL (`http://host:port`), + * the same shape `DIRECTORY_ADDR`/`PAYROLL_ADDR`/`TIMEOFF_ADDR`/`ACCESS_ADDR` + * carry in k8s/compose. + * + * @param env - Process env to read endpoints from (defaults to `process.env`). + */ +export function createServiceClients(env: NodeJS.ProcessEnv = process.env): ServiceClients { + const directoryAddr = env.DIRECTORY_ADDR ?? DEFAULT_DIRECTORY_ADDR; + const payrollAddr = env.PAYROLL_ADDR ?? DEFAULT_PAYROLL_ADDR; + const timeoffAddr = env.TIMEOFF_ADDR ?? DEFAULT_TIMEOFF_ADDR; + const accessAddr = env.ACCESS_ADDR ?? DEFAULT_ACCESS_ADDR; + + return { + directory: createClient(DirectoryService, createGrpcTransport({ baseUrl: directoryAddr })), + payroll: createClient(PayrollService, createGrpcTransport({ baseUrl: payrollAddr })), + timeoff: createClient(TimeOffService, createGrpcTransport({ baseUrl: timeoffAddr })), + access: createClient(AccessService, createGrpcTransport({ baseUrl: accessAddr })), + }; +} diff --git a/hris/src/temporal/config.ts b/hris/src/temporal/config.ts new file mode 100644 index 0000000..6279c23 --- /dev/null +++ b/hris/src/temporal/config.ts @@ -0,0 +1,23 @@ +/** + * Temporal configuration — the single source of truth for the durable layer. + * + * Read once from env so the onboarding gateway (which builds a + * `@temporalio/client`), the worker (`@temporalio/worker`), and the dockerless + * tests all agree on the connection target, namespace, and task queue. The + * defaults make a local `docker compose up` (temporalio/auto-setup on `:7233`, + * namespace `default`) work with no extra env. + * + * @module temporal/config + */ + +/** Temporal frontend gRPC address (host:port). 
Compose sets `temporal:7233`. */ +export const TEMPORAL_ADDRESS = process.env.TEMPORAL_ADDRESS ?? "localhost:7233"; + +/** Temporal namespace. `temporalio/auto-setup` creates `default`. */ +export const TEMPORAL_NAMESPACE = process.env.TEMPORAL_NAMESPACE ?? "default"; + +/** + * Task queue the worker polls and the gateway targets when starting workflows. + * Both sides MUST agree, or started workflows are never picked up. + */ +export const TEMPORAL_TASK_QUEUE = process.env.TEMPORAL_TASK_QUEUE ?? "hris-onboarding"; diff --git a/hris/src/temporal/onboardingStatus.ts b/hris/src/temporal/onboardingStatus.ts new file mode 100644 index 0000000..7450a52 --- /dev/null +++ b/hris/src/temporal/onboardingStatus.ts @@ -0,0 +1,34 @@ +/** + * Onboarding lifecycle status domain — a side-effect-free `const` object. + * + * This module is imported by BOTH the Temporal workflow (`workflows.ts`, which + * runs in the deterministic, bundled sandbox) and the Node-side services / + * activities. It therefore MUST stay import-clean: no Node built-ins, no + * Connectum/generated runtime, no Drizzle — only this literal map. Importing a + * module with side effects (e.g. `db/schema.ts`, which pulls in drizzle) into + * the workflow bundle would break determinism, so the onboarding-status domain + * lives here on its own. + * + * The status is a plain string on the wire (no proto enum — the example's + * `erasableSyntaxOnly` tsconfig rejects the native `enum` protoc-gen-es emits), + * with the domain pinned by this `const` object (ADR-001: no `enum`). + * + * @module temporal/onboardingStatus + */ + +/** + * Onboarding saga lifecycle states. + * + * - `STARTED` — the saga is running (provisioning the new hire across services). + * - `COMPLETED` — every forward step succeeded and the employee is now active; + * the terminal happy-path state. + * - `FAILED` — a step failed and the saga rolled back (compensations ran). 
+ */ +export const OnboardingStatus = { + STARTED: "STARTED", + COMPLETED: "COMPLETED", + FAILED: "FAILED", +} as const; + +/** One of the {@link OnboardingStatus} string values. */ +export type OnboardingStatus = (typeof OnboardingStatus)[keyof typeof OnboardingStatus]; diff --git a/hris/src/temporal/workflowClient.ts b/hris/src/temporal/workflowClient.ts new file mode 100644 index 0000000..780776b --- /dev/null +++ b/hris/src/temporal/workflowClient.ts @@ -0,0 +1,31 @@ +/** + * Lazy Temporal client factory for the gateway (OnboardingService) role. + * + * Builds a `@temporalio/client` `WorkflowClient` over a LAZY connection + * (`Connection.lazy` opens no socket until the first call). That laziness is + * what lets the server START — and the pre-check e2e run — WITHOUT a live + * Temporal server: only an actual `OnboardEmployee` that reaches + * `workflowClient.start` (or a `GetOnboarding` query) touches Temporal. + * + * `@temporalio/client` is pure JS (no core-bridge native addon), so importing it + * here keeps every RPC role on its no-build native-TS run model — only + * `src/worker.ts` pulls the native worker. + * + * @module temporal/workflowClient + */ + +import { Client, Connection } from "@temporalio/client"; +import { TEMPORAL_ADDRESS, TEMPORAL_NAMESPACE } from "#temporal/config.ts"; + +/** The `WorkflowClient` shape (a subset is consumed by `createOnboardingService`). */ +export type WorkflowClient = Client["workflow"]; + +/** + * Create a `WorkflowClient` over a lazy connection. No socket opens until the + * first workflow start/query, so this is safe to construct in any topology. 
+ */ +export function createWorkflowClient(): WorkflowClient { + const connection = Connection.lazy({ address: TEMPORAL_ADDRESS }); + const client = new Client({ connection, namespace: TEMPORAL_NAMESPACE }); + return client.workflow; +} diff --git a/hris/src/temporal/workflows.ts b/hris/src/temporal/workflows.ts new file mode 100644 index 0000000..f03648b --- /dev/null +++ b/hris/src/temporal/workflows.ts @@ -0,0 +1,133 @@ +/** + * OnboardingWorkflow — the durable new-hire onboarding saga (Temporal workflow + * code). + * + * This file is the worker's `workflowsPath` target: it is bundled (webpack + + * swc) and runs in Temporal's DETERMINISTIC sandbox. It therefore imports ONLY + * `@temporalio/workflow`, the activity *types*, and the side-effect-free + * `onboardingStatus.ts` — never Node built-ins, Connectum, or the generated + * runtime (that would break determinism). All I/O is in the activities; the + * workflow only orchestrates. + * + * Saga (compensation-stack pattern, samples-repo style): run the forward steps + * in order; after each side-effecting step `unshift` its compensation onto a + * stack; on ANY failure, run the compensations in LIFO order (each wrapped in + * its own try/catch so the unwind never throws), then rethrow the original + * error. The result: + * - provisionAccess fails → revokeTimeOff → teardownPayroll → + * offboardEmployee + * - grantTimeOff fails → teardownPayroll → offboardEmployee + * - setupPayroll fails → offboardEmployee + * - createEmployee fails → (non-retryable) fail fast, NOTHING to compensate. + * + * activateEmployee (step 5) pushes NO compensation: it is the terminal + * happy-path step, so a success is final and there is nothing after it to roll + * back. + * + * Live status is exposed via `getOnboardingStatusQuery` so the gateway's + * GetOnboarding can read it with `handle.query(getOnboardingStatusQuery)`.
+ * + * @module temporal/workflows + */ + +import { ApplicationFailure, defineQuery, log, proxyActivities, setHandler } from "@temporalio/workflow"; +import type * as activities from "#temporal/activities.ts"; +import type { OnboardingStatus as OnboardingStatusT } from "#temporal/onboardingStatus.ts"; +import { OnboardingStatus } from "#temporal/onboardingStatus.ts"; + +/** Input to {@link OnboardingWorkflow}. */ +export interface OnboardingWorkflowInput { + readonly employeeId: string; + readonly name: string; + readonly email: string; + readonly title: string; + readonly department: string; + readonly managerId: string; +} + +/** + * Query for the onboarding's live status, read from outside via + * `handle.query(getOnboardingStatusQuery)`. Returns the current + * {@link OnboardingStatus}. + */ +export const getOnboardingStatusQuery = defineQuery("getOnboardingStatus"); + +/** A single compensation: a label (for assertions/logs) and its undo action. */ +interface Compensation { + readonly name: string; + readonly run: () => Promise; +} + +/** + * Activity proxies. `createEmployee`'s business failure is non-retryable (the + * activity rethrows `ApplicationFailure(EmployeeExists)`); every other failure + * is retried under the bounded policy below (up to 5 attempts), which is the + * durability the saga demonstrates. + */ +const acts = proxyActivities({ + startToCloseTimeout: "30 seconds", + retry: { + initialInterval: "1 second", + maximumAttempts: 5, + // A duplicate-id business failure must not be retried. + nonRetryableErrorTypes: ["EmployeeExists"], + }, +}); + +/** + * Run the onboarding saga for a new hire. + * + * @param input - {@link OnboardingWorkflowInput}. + * @returns the terminal onboarding status (`COMPLETED` on success).
+ */ +export async function OnboardingWorkflow(input: OnboardingWorkflowInput): Promise { + const { employeeId, name, email, title, department, managerId } = input; + + let status: OnboardingStatusT = OnboardingStatus.STARTED; + setHandler(getOnboardingStatusQuery, () => status); + + // LIFO compensation stack: unshift after each side-effecting forward step. + const compensations: Compensation[] = []; + + try { + // Step 1 — create the directory row (business failure here is + // non-retryable and fails the workflow fast; nothing pushed, nothing to + // undo). + await acts.createEmployee({ employeeId, name, email, title, department, managerId }); + compensations.unshift({ name: "offboardEmployee", run: () => acts.offboardEmployee({ employeeId }) }); + + // Step 2 — enroll in payroll. + await acts.setupPayroll({ employeeId }); + compensations.unshift({ name: "teardownPayroll", run: () => acts.teardownPayroll({ employeeId }) }); + + // Step 3 — grant the annual PTO policy. + await acts.grantTimeOff({ employeeId }); + compensations.unshift({ name: "revokeTimeOff", run: () => acts.revokeTimeOff({ employeeId }) }); + + // Step 4 — provision system access. + await acts.provisionAccess({ employeeId, email }); + compensations.unshift({ name: "revokeAccess", run: () => acts.revokeAccess({ employeeId }) }); + + // Step 5 — activate the employee (onboarding → active). Terminal; no + // compensation. + await acts.activateEmployee({ employeeId }); + status = OnboardingStatus.COMPLETED; + + return status; + } catch (err) { + // Unwind in LIFO order; each compensation is isolated so the unwind + // never throws. Temporal already retried each forward+comp activity. 
+ log.warn("OnboardingWorkflow failed; compensating", { employeeId, error: String(err) }); + for (const comp of compensations) { + try { + await comp.run(); + } catch (compErr) { + log.error("compensation failed (continuing unwind)", { employeeId, compensation: comp.name, error: String(compErr) }); + } + } + status = OnboardingStatus.FAILED; + // Preserve the original failure so it surfaces in temporal-ui / GetOnboarding. + if (err instanceof ApplicationFailure) throw err; + throw ApplicationFailure.create({ message: String(err), type: "OnboardingWorkflowFailed" }); + } +} diff --git a/hris/src/topology.ts b/hris/src/topology.ts index 5ef89de..a12ca0b 100644 --- a/hris/src/topology.ts +++ b/hris/src/topology.ts @@ -19,21 +19,25 @@ import { parseServicesEnv, perServiceEnvResolver } from "@connectum/core"; import type { RemoteResolver } from "@connectum/core"; -/** Canonical proto `typeName`s of the three RPC services. */ +/** Canonical proto `typeName`s of the RPC services. */ export const TYPE_NAMES = { directory: "directory.v1.DirectoryService", timeoff: "timeoff.v1.TimeOffService", payroll: "payroll.v1.PayrollService", + access: "access.v1.AccessService", + onboarding: "onboarding.v1.OnboardingService", } as const; /** All RPC service typeNames (the monolith set). */ -const ALL_TYPE_NAMES: readonly string[] = [TYPE_NAMES.directory, TYPE_NAMES.timeoff, TYPE_NAMES.payroll]; +const ALL_TYPE_NAMES: readonly string[] = [TYPE_NAMES.directory, TYPE_NAMES.timeoff, TYPE_NAMES.payroll, TYPE_NAMES.access, TYPE_NAMES.onboarding]; /** Per-service endpoint env var, consumed by the remote resolver. */ const ENDPOINT_ENV: Readonly> = { [TYPE_NAMES.directory]: "DIRECTORY_ADDR", [TYPE_NAMES.timeoff]: "TIMEOFF_ADDR", [TYPE_NAMES.payroll]: "PAYROLL_ADDR", + [TYPE_NAMES.access]: "ACCESS_ADDR", + [TYPE_NAMES.onboarding]: "ONBOARDING_ADDR", }; /** The resolved deployment topology for this process. 
*/ diff --git a/hris/src/worker.ts b/hris/src/worker.ts new file mode 100644 index 0000000..1d7e3b1 --- /dev/null +++ b/hris/src/worker.ts @@ -0,0 +1,46 @@ +/** + * Temporal worker — the onboarding saga's host process. + * + * This is a NEW process type alongside the RPC roles. It is the ONLY entry that + * imports `@temporalio/worker` (the Rust core-bridge native addon + the + * webpack/swc workflow bundler), so the existing `node src/index.ts` roles keep + * their no-build, native-TS run model untouched. The worker runs its own + * process (`node src/worker.ts`), not a `SERVICES`-selected role — it has no + * inbound RPC; it polls Temporal for workflow/activity tasks. + * + * `Worker.create({ workflowsPath })` bundles `temporal/workflows.ts` on the fly + * (swc) at startup — no separate build step. Under ESM, `workflowsPath` is + * resolved with `fileURLToPath(new URL(...))`. The activities run here in full + * Node and drive the role services over ConnectRPC (`temporal/clients.ts`). + * + * @module worker + */ + +import { fileURLToPath } from "node:url"; +import { NativeConnection, Worker } from "@temporalio/worker"; +import * as activities from "#temporal/activities.ts"; +import { TEMPORAL_ADDRESS, TEMPORAL_NAMESPACE, TEMPORAL_TASK_QUEUE } from "#temporal/config.ts"; + +async function main(): Promise { + const connection = await NativeConnection.connect({ address: TEMPORAL_ADDRESS }); + try { + const worker = await Worker.create({ + connection, + namespace: TEMPORAL_NAMESPACE, + taskQueue: TEMPORAL_TASK_QUEUE, + // Bundled (swc) at startup — no build step; ESM-safe path resolution. 
+ workflowsPath: fileURLToPath(new URL("./temporal/workflows.ts", import.meta.url)), + activities, + }); + + console.log(`hris temporal worker ready — taskQueue=${TEMPORAL_TASK_QUEUE} namespace=${TEMPORAL_NAMESPACE} temporal=${TEMPORAL_ADDRESS}`); + await worker.run(); + } finally { + await connection.close(); + } +} + +main().catch((err: unknown) => { + console.error("hris temporal worker error:", err); + process.exitCode = 1; +}); diff --git a/hris/tests/activity/activities.test.ts b/hris/tests/activity/activities.test.ts new file mode 100644 index 0000000..bde4b23 --- /dev/null +++ b/hris/tests/activity/activities.test.ts @@ -0,0 +1,179 @@ +/** + * Activity ↔ RPC wiring + compensation idempotency tests — DOCKERLESS. + * + * Runs the REAL activity bodies (via `MockActivityEnvironment`) against a REAL + * in-process Connectum monolith (`buildServer({ port: 0 })`) — the same server + * the e2e uses, with a PGlite-backed DirectoryService and the in-memory + * Payroll/TimeOff/Access services. No Temporal cluster, no Docker. This proves: + * + * - each forward activity calls its RPC and mutates the real service state + * (a new directory row, a payroll balance, a PTO grant, a provisioned + * account, and finally the active status); + * - createEmployee on an already-taken id throws a NON-RETRYABLE + * `ApplicationFailure(EmployeeExists)` — the workflow's fail-fast contract; + * - the compensating activities are IDEMPOTENT — running offboard / teardown / + * revoke twice (or on already-undone state) is a no-op success. + * + * The activities read endpoints from `*_ADDR`; this test points all four at the + * one in-process monolith before any activity builds its (lazily cached) client. 
+ *
+ * @module tests/activity/activities
+ */
+
+import assert from "node:assert/strict";
+import { after, before, beforeEach, describe, it } from "node:test";
+import { create } from "@bufbuild/protobuf";
+import type { Server } from "@connectum/core";
+import { MemoryAdapter } from "@connectum/events";
+import { ApplicationFailure } from "@temporalio/activity";
+import { MockActivityEnvironment } from "@temporalio/testing";
+import { buildEventBus } from "#eventBus.ts";
+import { GetEmployeeRequestSchema, DirectoryService } from "#gen/directory/v1/directory_pb.ts";
+import { GetBalanceRequestSchema, PayrollService } from "#gen/payroll/v1/payroll_pb.ts";
+import { buildServer } from "#server.ts";
+import { accessCount, isProvisioned, resetAccess } from "#services/accessService.ts";
+import { resetBalances } from "#services/payrollService.ts";
+import { resetGrants, timeOffGrant } from "#services/timeOffService.ts";
+import * as activities from "#temporal/activities.ts";
+import type { NewHire } from "#temporal/activities.ts";
+import { resolveTopology } from "#topology.ts";
+import { makeTestDb, reseed } from "../helpers/db.ts";
+
+const env = new MockActivityEnvironment();
+
+/** Run a real activity body inside a mocked Temporal Activity Context (args and result stay typed). */
+function run<A extends unknown[], R>(fn: (...args: A) => Promise<R>, ...args: A): Promise<R> {
+    return env.run(fn, ...args);
+}
+
+/** A new hire whose id is FREE in the seed (e-001..e-007 are taken). */
+const HIRE: NewHire = {
+    employeeId: "e-200",
+    name: "New Hire",
+    email: "newhire@example.com",
+    title: "Software Engineer",
+    department: "Engineering",
+    managerId: "e-002",
+};
+
+describe("Activities: real RPC wiring + compensation idempotency (in-process monolith, PGlite)", () => {
+    let server: Server;
+    let db: Awaited<ReturnType<typeof makeTestDb>>;
+
+    before(async () => {
+        const topology = resolveTopology("*");
+        db = await makeTestDb();
+        // In-memory bus so the monolith starts without a NATS broker (the saga
+        // activities never publish; payroll still mounts its subscriber route).
+        const eventBus = buildEventBus({ localTypeNames: topology.localTypeNames, adapter: MemoryAdapter() });
+        // Mount onboarding without a Temporal client (`null`): only the saga's
+        // role-service activities are exercised here, never the workflow client.
+        server = buildServer({ port: 0, topology, db, eventBus, workflowClient: null });
+        await server.start();
+        const port = server.address?.port ?? 0;
+        const addr = `http://localhost:${port}`;
+        // Point every activity client at the one in-process monolith. Set BEFORE
+        // the first activity call, since activities cache their clients lazily.
+        process.env.DIRECTORY_ADDR = addr;
+        process.env.PAYROLL_ADDR = addr;
+        process.env.TIMEOFF_ADDR = addr;
+        process.env.ACCESS_ADDR = addr;
+    });
+
+    beforeEach(async () => {
+        await reseed(db);
+        resetBalances();
+        resetGrants();
+        resetAccess();
+    });
+
+    after(async () => {
+        if (server?.state === "running") await server.stop(); // `server` is unset when before() threw — do not mask that failure
+    });
+
+    it("forward steps create the employee, payroll, PTO grant, access, then activate", async () => {
+        const directory = server.localClient(DirectoryService);
+        const payroll = server.localClient(PayrollService);
+
+        // Step 1 — directory row created in "onboarding" status.
+        await run(activities.createEmployee, HIRE);
+        const created = await directory.getEmployee(create(GetEmployeeRequestSchema, { id: HIRE.employeeId }));
+        assert.equal(created.employee?.status, "onboarding");
+        assert.equal(created.employee?.name, "New Hire");
+
+        // Step 2 — payroll enrollment (initial balance).
+        await run(activities.setupPayroll, { employeeId: HIRE.employeeId });
+        const balance = await payroll.getBalance(create(GetBalanceRequestSchema, { employeeId: HIRE.employeeId }));
+        assert.equal(balance.balance?.remainingDays, 25);
+
+        // Step 3 — PTO policy grant recorded.
+        await run(activities.grantTimeOff, { employeeId: HIRE.employeeId });
+        assert.equal(timeOffGrant(HIRE.employeeId), 25);
+
+        // Step 4 — system access provisioned.
+        await run(activities.provisionAccess, { employeeId: HIRE.employeeId, email: HIRE.email });
+        assert.equal(isProvisioned(HIRE.employeeId), true);
+
+        // Step 5 — terminal activation flips status to "active".
+        await run(activities.activateEmployee, { employeeId: HIRE.employeeId });
+        const active = await directory.getEmployee(create(GetEmployeeRequestSchema, { id: HIRE.employeeId }));
+        assert.equal(active.employee?.status, "active");
+    });
+
+    it("createEmployee on an already-taken id throws a NON-RETRYABLE EmployeeExists ApplicationFailure", async () => {
+        // e-001 (Ada Lovelace) is in the seed → CreateEmployee is ALREADY_EXISTS,
+        // which the activity rethrows as a non-retryable ApplicationFailure whose
+        // `type` is exactly the value the workflow lists in nonRetryableErrorTypes.
+        await assert.rejects(
+            run(activities.createEmployee, { ...HIRE, employeeId: "e-001" }),
+            (err: unknown) => err instanceof ApplicationFailure && err.type === "EmployeeExists" && err.nonRetryable === true && /already exists/i.test(err.message),
+        );
+    });
+
+    it("offboardEmployee is idempotent: marking offboarded twice (and an unknown id) is a no-op success", async () => {
+        const directory = server.localClient(DirectoryService);
+        await run(activities.createEmployee, HIRE);
+
+        await run(activities.offboardEmployee, { employeeId: HIRE.employeeId });
+        const offboarded = await directory.getEmployee(create(GetEmployeeRequestSchema, { id: HIRE.employeeId }));
+        assert.equal(offboarded.employee?.status, "offboarded");
+
+        // Offboard again — already offboarded.
+        await run(activities.offboardEmployee, { employeeId: HIRE.employeeId });
+        // Offboard an id that never existed — still a no-op success.
+        await run(activities.offboardEmployee, { employeeId: "e-ghost" });
+    });
+
+    it("teardownPayroll is idempotent: tearing down twice (and an unknown employee) is a no-op success", async () => {
+        await run(activities.setupPayroll, { employeeId: HIRE.employeeId });
+        await run(activities.teardownPayroll, { employeeId: HIRE.employeeId });
+        // Tear down again — already removed.
+        await run(activities.teardownPayroll, { employeeId: HIRE.employeeId });
+        // Tear down an employee that was never enrolled — still a no-op success.
+        await run(activities.teardownPayroll, { employeeId: "e-ghost" });
+    });
+
+    it("revokeTimeOff is idempotent: revoking twice (and an unknown employee) is a no-op success", async () => {
+        await run(activities.grantTimeOff, { employeeId: HIRE.employeeId });
+        assert.equal(timeOffGrant(HIRE.employeeId), 25);
+
+        await run(activities.revokeTimeOff, { employeeId: HIRE.employeeId });
+        assert.equal(timeOffGrant(HIRE.employeeId), undefined);
+        // Revoke again — already revoked.
+        await run(activities.revokeTimeOff, { employeeId: HIRE.employeeId });
+        // Revoke an unknown employee — still a no-op success.
+        await run(activities.revokeTimeOff, { employeeId: "e-ghost" });
+    });
+
+    it("revokeAccess is idempotent: revoking twice (and an unknown employee) is a no-op success", async () => {
+        await run(activities.provisionAccess, { employeeId: HIRE.employeeId, email: HIRE.email });
+        assert.equal(accessCount(), 1);
+
+        await run(activities.revokeAccess, { employeeId: HIRE.employeeId });
+        assert.equal(isProvisioned(HIRE.employeeId), false);
+        // Revoke again — already revoked.
+        await run(activities.revokeAccess, { employeeId: HIRE.employeeId });
+        // Revoke an unknown employee — still a no-op success.
+        await run(activities.revokeAccess, { employeeId: "e-ghost" });
+    });
+});
diff --git a/hris/tests/e2e/onboarding.test.ts b/hris/tests/e2e/onboarding.test.ts
new file mode 100644
index 0000000..55de6e0
--- /dev/null
+++ b/hris/tests/e2e/onboarding.test.ts
@@ -0,0 +1,144 @@
+/**
+ * Onboarding edge (OnboardingService) e2e — DOCKERLESS, no live Temporal.
+ *
+ * Exercises the gateway's SYNCHRONOUS pre-check and start contract without a
+ * Temporal cluster, using an injected STUB `OnboardingWorkflowClient`:
+ *
+ * - a FREE id passes the inverted pre-check (directory NotFound) and STARTS the
+ *   saga: the response is STARTED, the workflow id equals the employee id, and
+ *   the stub's `start` was invoked once;
+ * - an ALREADY-TAKEN id is rejected with `Code.AlreadyExists` BEFORE any
+ *   Temporal use — the stub's `start` is never called;
+ * - GetOnboarding reads the live status via the stub's query;
+ * - with NO workflow client, a free id still passes the pre-check but then
+ *   raises `Code.Unavailable` ("Temporal is not configured") — proving the
+ *   pre-check runs first and the error path needs no live Temporal.
+ *
+ * The existing LeaveApproved flow (tests/e2e/e2e.test.ts) is untouched.
+ *
+ * @module tests/e2e/onboarding
+ */
+
+import assert from "node:assert/strict";
+import { after, before, describe, it } from "node:test";
+import { create } from "@bufbuild/protobuf";
+import { Code, ConnectError } from "@connectrpc/connect";
+import type { Server } from "@connectum/core";
+import { MemoryAdapter } from "@connectum/events";
+import { buildEventBus } from "#eventBus.ts";
+import { GetOnboardingRequestSchema, OnboardEmployeeRequestSchema, OnboardingService } from "#gen/onboarding/v1/onboarding_pb.ts";
+import { buildServer } from "#server.ts";
+import type { OnboardingWorkflowClient } from "#services/onboardingService.ts";
+import { resolveTopology } from "#topology.ts";
+import { makeTestDb } from "../helpers/db.ts";
+
+/** A stub workflow client recording started ids and serving a live STARTED query. */
+function makeStubClient(started: string[]): OnboardingWorkflowClient {
+    return {
+        async start(_type, opts) {
+            started.push(opts.workflowId);
+            return { workflowId: opts.workflowId };
+        },
+        getHandle() {
+            return {
+                async query<Ret>() {
+                    return "STARTED" as unknown as Ret; // stub only: the caller picks Ret, the stub always answers STARTED
+                },
+                async describe() {
+                    return { status: { name: "RUNNING" } };
+                },
+            };
+        },
+    };
+}
+
+describe("E2E: onboarding edge (pre-check + start, stub Temporal)", () => {
+    let server: Server;
+    const started: string[] = [];
+
+    before(async () => {
+        const topology = resolveTopology("*");
+        const eventBus = buildEventBus({ localTypeNames: topology.localTypeNames, adapter: MemoryAdapter() });
+        const db = await makeTestDb();
+        server = buildServer({ port: 0, topology, eventBus, db, workflowClient: makeStubClient(started) });
+        await server.start();
+    });
+
+    after(async () => {
+        if (server?.state === "running") await server.stop(); // `server` is unset when before() threw — do not mask that failure
+    });
+
+    it("OnboardEmployee on a FREE id passes the pre-check and STARTS the saga", async () => {
+        const onboarding = server.localClient(OnboardingService);
+        const startedBefore = started.length;
+
+        const res = await onboarding.onboardEmployee(
+            create(OnboardEmployeeRequestSchema, {
+                employeeId: "e-300",
+                name: "New Hire",
+                email: "newhire@example.com",
+                title: "Software Engineer",
+                department: "Engineering",
+                managerId: "e-002",
+            }),
+        );
+
+        assert.equal(res.onboarding?.status, "STARTED");
+        assert.equal(res.workflowId, "e-300");
+        assert.equal(started.length, startedBefore + 1);
+        assert.equal(started.at(-1), "e-300");
+    });
+
+    it("OnboardEmployee on an ALREADY-TAKEN id is rejected with AlreadyExists, never touching Temporal", async () => {
+        const onboarding = server.localClient(OnboardingService);
+        const startedBefore = started.length;
+
+        await assert.rejects(
+            // e-001 (Ada Lovelace) is in the seed.
+            onboarding.onboardEmployee(create(OnboardEmployeeRequestSchema, { employeeId: "e-001", name: "Dup", email: "dup@example.com", title: "x", department: "y", managerId: "" })),
+            (err: unknown) => err instanceof ConnectError && err.code === Code.AlreadyExists,
+        );
+
+        // The pre-check rejected before the workflow start — stub untouched.
+        assert.equal(started.length, startedBefore);
+    });
+
+    it("GetOnboarding reads the live status via the workflow query", async () => {
+        const onboarding = server.localClient(OnboardingService);
+        const res = await onboarding.getOnboarding(create(GetOnboardingRequestSchema, { employeeId: "e-300" }));
+        assert.equal(res.onboarding?.status, "STARTED");
+    });
+});
+
+describe("E2E: onboarding edge with NO Temporal client (pre-check still runs)", () => {
+    let server: Server;
+
+    before(async () => {
+        const topology = resolveTopology("*");
+        const eventBus = buildEventBus({ localTypeNames: topology.localTypeNames, adapter: MemoryAdapter() });
+        const db = await makeTestDb();
+        // workflowClient: null → force the "Temporal not configured" path.
+        server = buildServer({ port: 0, topology, eventBus, db, workflowClient: null });
+        await server.start();
+    });
+
+    after(async () => {
+        if (server?.state === "running") await server.stop(); // `server` is unset when before() threw — do not mask that failure
+    });
+
+    it("a FREE id passes the pre-check but then raises Unavailable (no Temporal)", async () => {
+        const onboarding = server.localClient(OnboardingService);
+        await assert.rejects(
+            onboarding.onboardEmployee(create(OnboardEmployeeRequestSchema, { employeeId: "e-301", name: "x", email: "x@example.com", title: "x", department: "y", managerId: "" })),
+            (err: unknown) => err instanceof ConnectError && err.code === Code.Unavailable,
+        );
+    });
+
+    it("an ALREADY-TAKEN id is still rejected with AlreadyExists (pre-check independent of Temporal)", async () => {
+        const onboarding = server.localClient(OnboardingService);
+        await assert.rejects(
+            onboarding.onboardEmployee(create(OnboardEmployeeRequestSchema, { employeeId: "e-002", name: "x", email: "x@example.com", title: "x", department: "y", managerId: "" })),
+            (err: unknown) => err instanceof ConnectError && err.code === Code.AlreadyExists,
+        );
+    });
+});
diff --git a/hris/tests/workflow/onboardingWorkflow.test.ts b/hris/tests/workflow/onboardingWorkflow.test.ts
new file mode 100644
index 0000000..96b184a
--- /dev/null
+++ b/hris/tests/workflow/onboardingWorkflow.test.ts
@@ -0,0 +1,167 @@
+/**
+ * OnboardingWorkflow orchestration + compensation tests — DOCKERLESS.
+ *
+ * Uses Temporal's time-skipping test environment (an EMBEDDED test server — no
+ * Docker; the server binary is downloaded + cached on first run only) with a
+ * worker whose ACTIVITIES ARE MOCKED (plain JS functions that record their call
+ * order). The WORKFLOW is the real `OnboardingWorkflow`, so this asserts the
+ * saga's orchestration without any Connectum server or Temporal cluster:
+ *
+ * - success: forward order is createEmployee → setupPayroll → grantTimeOff →
+ *   provisionAccess → activateEmployee.
+ * - activateEmployee fails: the recorded tail unwinds in LIFO order —
+ *   revokeAccess → revokeTimeOff → teardownPayroll → offboardEmployee.
+ * - provisionAccess fails: only the 3→1 compensations run (provisionAccess
+ *   pushed nothing before failing) — revokeTimeOff → teardownPayroll →
+ *   offboardEmployee.
+ * - setupPayroll fails: only offboardEmployee runs.
+ * - createEmployee fails (non-retryable): fails fast with NO compensation.
+ *
+ * Failures are forced by making the MOCK throw `ApplicationFailure.nonRetryable`,
+ * so the production retry policy stays realistic (this test does not depend on
+ * `maximumAttempts`).
+ *
+ * @module tests/workflow/onboardingWorkflow
+ */
+
+import assert from "node:assert/strict";
+import { after, before, describe, it } from "node:test";
+import { fileURLToPath } from "node:url";
+import { ApplicationFailure } from "@temporalio/activity";
+import { TestWorkflowEnvironment } from "@temporalio/testing";
+import { Worker } from "@temporalio/worker";
+import * as realActivities from "#temporal/activities.ts";
+import type { OnboardingWorkflowInput } from "#temporal/workflows.ts";
+import { OnboardingWorkflow } from "#temporal/workflows.ts";
+
+/** The workflow bundle source — the same `.ts` the production worker bundles. */
+const WORKFLOWS_PATH = fileURLToPath(new URL("../../src/temporal/workflows.ts", import.meta.url));
+
+const TASK_QUEUE = "test-onboarding-saga";
+const INPUT: OnboardingWorkflowInput = {
+    employeeId: "e-100",
+    name: "New Hire",
+    email: "newhire@example.com",
+    title: "Software Engineer",
+    department: "Engineering",
+    managerId: "e-002",
+};
+
+/** Build a mock activity set that records call order into `calls`; `failing.step` names the one activity that throws. */
+function makeMockActivities(calls: string[], failing?: { step: string }): Record<string, (...args: unknown[]) => Promise<unknown>> {
+    const record = (name: string, result: unknown = undefined) => {
+        calls.push(name);
+        if (failing?.step === name) {
+            // Non-retryable so the run fails immediately without depending on
+            // the production retry budget.
+            throw ApplicationFailure.create({ message: `mock ${name} failed`, type: "MockFailure", nonRetryable: true });
+        }
+        return result;
+    };
+    return {
+        createEmployee: async () => record("createEmployee"),
+        offboardEmployee: async () => record("offboardEmployee"),
+        setupPayroll: async () => record("setupPayroll"),
+        teardownPayroll: async () => record("teardownPayroll"),
+        grantTimeOff: async () => record("grantTimeOff"),
+        revokeTimeOff: async () => record("revokeTimeOff"),
+        provisionAccess: async () => record("provisionAccess"),
+        revokeAccess: async () => record("revokeAccess"),
+        activateEmployee: async () => record("activateEmployee"),
+    };
+}
+
+describe("OnboardingWorkflow: orchestration + compensation (time-skipping, mocked activities)", () => {
+    let testEnv: TestWorkflowEnvironment;
+
+    before(async () => {
+        testEnv = await TestWorkflowEnvironment.createTimeSkipping();
+    });
+
+    after(async () => {
+        await testEnv?.teardown();
+    });
+
+    /** Run the workflow once with a worker whose activities are `activities`; resolves with the workflow's result. */
+    async function runWorkflow(activities: Record<string, (...args: unknown[]) => Promise<unknown>>, workflowId: string): Promise<Awaited<ReturnType<typeof OnboardingWorkflow>>> {
+        const worker = await Worker.create({
+            connection: testEnv.nativeConnection,
+            taskQueue: TASK_QUEUE,
+            workflowsPath: WORKFLOWS_PATH,
+            activities,
+        });
+        return worker.runUntil(testEnv.client.workflow.execute(OnboardingWorkflow, { args: [INPUT], taskQueue: TASK_QUEUE, workflowId }));
+    }
+
+    it("success: runs the forward steps in order and COMPLETES", async () => {
+        const calls: string[] = [];
+        const result = await runWorkflow(makeMockActivities(calls), "wf-success");
+
+        assert.equal(result, "COMPLETED");
+        assert.deepEqual(calls, ["createEmployee", "setupPayroll", "grantTimeOff", "provisionAccess", "activateEmployee"]);
+    });
+
+    it("activateEmployee fails: compensations run in REVERSE order (revokeAccess → revokeTimeOff → teardownPayroll → offboardEmployee)", async () => {
+        const calls: string[] = [];
+        await assert.rejects(runWorkflow(makeMockActivities(calls, { step: "activateEmployee" }), "wf-activate-fail"));
+
+        // Forward path up to and including the failing activate, then the LIFO
+        // unwind of every pushed compensation.
+        assert.deepEqual(calls, [
+            "createEmployee",
+            "setupPayroll",
+            "grantTimeOff",
+            "provisionAccess",
+            "activateEmployee", // throws
+            "revokeAccess",
+            "revokeTimeOff",
+            "teardownPayroll",
+            "offboardEmployee",
+        ]);
+    });
+
+    it("provisionAccess fails: the 3→1 compensations run (revokeTimeOff → teardownPayroll → offboardEmployee)", async () => {
+        const calls: string[] = [];
+        await assert.rejects(runWorkflow(makeMockActivities(calls, { step: "provisionAccess" }), "wf-provision-fail"));
+
+        // provisionAccess pushed NO compensation before failing, so the unwind
+        // starts from step 3's revokeTimeOff.
+        assert.deepEqual(calls, ["createEmployee", "setupPayroll", "grantTimeOff", "provisionAccess", "revokeTimeOff", "teardownPayroll", "offboardEmployee"]);
+    });
+
+    it("setupPayroll fails: only offboardEmployee compensates (step 1's undo)", async () => {
+        const calls: string[] = [];
+        await assert.rejects(runWorkflow(makeMockActivities(calls, { step: "setupPayroll" }), "wf-setup-fail"));
+
+        // Only createEmployee pushed a compensation before setupPayroll failed.
+        assert.deepEqual(calls, ["createEmployee", "setupPayroll", "offboardEmployee"]);
+    });
+
+    it("createEmployee fails (non-retryable): fails fast with NO compensation", async () => {
+        const calls: string[] = [];
+        await assert.rejects(runWorkflow(makeMockActivities(calls, { step: "createEmployee" }), "wf-create-fail"));
+
+        // Nothing was pushed before the failing first step, so nothing unwinds.
+        assert.deepEqual(calls, ["createEmployee"]);
+    });
+
+    it("the REAL activities module + real workflowsPath register on a Worker (production startup path)", async () => {
+        // The other tests pass hand-built mock activities; this is the ONLY test
+        // that exercises the exact registration `src/worker.ts` does — the real
+        // activities namespace (its lazy ConnectRPC clients are not built by
+        // Worker.create, which only registers, never invokes) plus the real
+        // workflow bundle. Catches a non-function export or a bundle break that
+        // would otherwise only surface at `node src/worker.ts` startup.
+        const worker = await Worker.create({
+            connection: testEnv.nativeConnection,
+            taskQueue: "test-real-onboarding-activities",
+            workflowsPath: WORKFLOWS_PATH,
+            activities: realActivities,
+        });
+        assert.ok(worker);
+        // Start and immediately stop so the worker's poll/heartbeat loop is
+        // drained and shut down — a created-but-never-run worker leaks a
+        // background loop and the test process never exits.
+        await worker.runUntil(Promise.resolve());
+    });
+});
From dfe46a6ef3b057d4bae4f6482eee704dd03ee7c5 Mon Sep 17 00:00:00 2001
From: intech
Date: Sun, 21 Jun 2026 20:33:10 +0400
Subject: [PATCH 2/6] docs(hris): document the onboarding saga + add the `saga` compose profile

- README: a "three orthogonal mechanisms" framing (ctx.call / EventBus /
  Temporal saga), a dedicated onboarding-saga section (inverted pre-check,
  LIFO compensation diagram, the separate worker process, run
  instructions), and the updated layout / testing / key-points.
- docker-compose: a `saga` profile adding Temporal (server + Web UI), the
  OnboardingService gateway (:5005), the access role (:5004), and the
  worker (`node src/worker.ts`). Pairs with `split`.
- Dockerfile: copy pnpm-workspace.yaml so the @swc/core build approval
  applies in the image (the worker bundles workflows with swc); drop the
  lockfile requirement (mirrors car-sharing); expose the access/onboarding
  ports.

Co-Authored-By: Claude Opus 4.8
Claude-Session: https://claude.ai/code/session_01MdeH7fExPmiRHRirGuvGk3
---
 hris/Dockerfile         |  14 ++--
 hris/README.md          | 151 +++++++++++++++++++++++++++++++-----
 hris/docker-compose.yml | 164 ++++++++++++++++++++++++++++++++++++++--
 3 files changed, 299 insertions(+), 30 deletions(-)

diff --git a/hris/Dockerfile b/hris/Dockerfile
index 9eee92e..ab97447 100644
--- a/hris/Dockerfile
+++ b/hris/Dockerfile
@@ -1,10 +1,14 @@
-# One image, every role. The role is chosen at runtime by the SERVICES env
-# (see docker-compose.yml); the entrypoint is always src/index.ts.
+# One image, every role. The RPC role is chosen at runtime by the SERVICES env
+# (see docker-compose.yml); the default entrypoint is src/index.ts. The Temporal
+# worker reuses this same image with `command: ["node", "src/worker.ts"]`.
FROM node:25-slim AS deps RUN npm install -g pnpm WORKDIR /app -COPY package.json pnpm-lock.yaml ./ -RUN pnpm install --frozen-lockfile --prod +# pnpm-workspace.yaml carries the allowBuilds approvals (@swc/core compiles the +# Temporal workflow bundle in the worker; protobufjs is a Temporal gRPC dep), so +# it MUST be present for `pnpm install` to run those build scripts. +COPY package.json pnpm-workspace.yaml ./ +RUN pnpm install --prod FROM node:25-slim AS runtime RUN apt-get update && apt-get install -y --no-install-recommends wget && rm -rf /var/lib/apt/lists/* @@ -14,7 +18,7 @@ COPY package.json ./ COPY src/ ./src/ COPY gen/ ./gen/ ENV NODE_ENV=production -EXPOSE 5000 5001 5002 5003 +EXPOSE 5000 5001 5002 5003 5004 5005 HEALTHCHECK --interval=10s --timeout=3s --start-period=15s --retries=3 \ CMD wget -q --spider http://localhost:${PORT:-5000}/healthz || exit 1 CMD ["node", "src/index.ts"] diff --git a/hris/README.md b/hris/README.md index 8f7ecd3..386d431 100644 --- a/hris/README.md +++ b/hris/README.md @@ -3,30 +3,49 @@ A shallow **HR Information System** that showcases the hard cases Connectum solves out of the box: **the same handler code runs as a monolith (in-process) or as microservices (split processes) purely by env**, plus an event-driven -payroll flow. +payroll flow and a **durable Temporal saga** for onboarding. 
-Three services and one integration event: +It demonstrates **three orthogonal cross-service mechanisms** side by side, each +the right tool for a different job: + +| Mechanism | Used for | In this example | +|---|---|---| +| **`ctx.call`** (synchronous) | a reply you need now | `RequestLeave` validates the employee against the directory | +| **EventBus** (fire-and-forget) | broadcast a fact | `LeaveApproved` → payroll decrements the balance | +| **Temporal saga** (durable) | a long multi-step transaction with rollback | onboarding a new hire across four services | + +Five services, one integration event, and a durable saga: - **`directory.v1.DirectoryService`** — `GetEmployee(id)` → `Employee` and `ListEmployees(filter)` → stream of `Employee` (the employee **system of record**, backed by Drizzle ORM + Postgres; `Code.NotFound` for an unknown id). - A leaf service — see [Persistence](#persistence-directoryservice--drizzle--postgres). + Also hosts the saga's `CreateEmployee` / `ActivateEmployee` / `OffboardEmployee`. + See [Persistence](#persistence-directoryservice--drizzle--postgres). - **`timeoff.v1.TimeOffService`** — `RequestLeave(employeeId, days)`. Its handler validates the employee with `ctx.call("directory.v1.DirectoryService/GetEmployee", …)`, then approves and - **publishes** a `LeaveApproved` event. + **publishes** a `LeaveApproved` event. Also hosts the saga's `GrantTimeOff` / + `RevokeTimeOff`. - **`payroll.v1.PayrollService`** — `GetBalance(employeeId)` → `Balance`. - **Subscribes** to `LeaveApproved` and decrements the balance. + **Subscribes** to `LeaveApproved` and decrements the balance. Also hosts the + saga's `SetupPayroll` / `TeardownPayroll`. +- **`access.v1.AccessService`** — `ProvisionAccess` / `RevokeAccess` (the saga's + IT-provisioning leaf; an in-memory account ledger). 
+- **`onboarding.v1.OnboardingService`** — `OnboardEmployee` / `GetOnboarding` + (the saga **gateway**: a synchronous pre-check, then it starts the durable + `OnboardingWorkflow`). See [The onboarding saga](#the-onboarding-saga-temporal). The product is deliberately thin — the point is the framework wiring. > **Note:** this example uses the service-catalog API (`defineService`, > `ctx.call`, `catalog`) and the EventBus, shipped in **1.0.0** (published on -> npm). +> npm). The onboarding saga uses [Temporal](https://temporal.io) (an external +> dependency) — no Connectum API beyond 1.0.0, so it runs on the published +> packages. ## The headline: one codebase, two topologies -`src/server.ts` exposes a single `buildServer()`. It always passes the same three +`src/server.ts` exposes a single `buildServer()`. It always passes the same five service definitions and the same generated `serviceCatalog`. Only `src/topology.ts` reads env and tells the framework what is local vs remote: @@ -68,26 +87,97 @@ NATS broker when started via `pnpm start` / Docker. Only the e2e test swaps in a in-memory adapter so the full flow runs broker-free (see [Testing note](#testing-note)). +## The onboarding saga (Temporal) + +Onboarding a new hire is a **long, multi-step transaction**: create the directory +record, enrol them in payroll, grant their PTO policy, and provision system +access. If any step fails, the ones that already succeeded must be **undone** — +the classic *saga* with compensations. `ctx.call` and the EventBus are the wrong +tools (no durability, no rollback); this is where a durable workflow engine earns +its place. Connectum stays thin: the framework serves the RPCs; [Temporal](https://temporal.io) +owns the durability. + +`OnboardingService.OnboardEmployee` is the **gateway**. 
It runs a **synchronous +pre-check** — `ctx.call` to the directory, where an *existing* id is rejected +with `Code.AlreadyExists` (the check is inverted from a "must exist" lookup: the +new id must be **free**). Only after the pre-check passes does it **start** the +durable `OnboardingWorkflow` and return immediately with `STARTED`. Because the +pre-check runs first, the error path needs **no live Temporal**. + +The workflow runs the forward steps and pushes each step's compensation onto a +stack; on any failure it unwinds the stack in **LIFO** order, then fails: + +``` +createEmployee ──▶ setupPayroll ──▶ grantTimeOff ──▶ provisionAccess ──▶ activateEmployee ✓ COMPLETED + │ offboard │ teardown │ revokeTimeOff │ revokeAccess (terminal — no undo) + └────────────── on failure, compensations run in reverse ◀──────────────┘ ✗ FAILED +``` + +- **`createEmployee`** is the saga's first step; a duplicate id is a **business** + failure surfaced as `Code.AlreadyExists` (via an atomic + `insert … on conflict do nothing returning`, never a raw DB error). The activity + rethrows it as a **non-retryable** `ApplicationFailure` so the workflow fails + fast with nothing to compensate. +- Every other step keeps Temporal's **default retry policy** — the durability the + saga demonstrates. The compensations are **idempotent**, so an unwind after a + partially-applied step is safe. +- **`activateEmployee`** (onboarding → active) is terminal: a success is final, so + it pushes no compensation. + +### The worker — a separate process + +The durable code runs in a dedicated **worker** (`src/worker.ts`), the **only** +process that imports `@temporalio/worker` (the native core-bridge + the swc +workflow bundler). The RPC roles import only the pure-JS `@temporalio/client`, so +they keep their **no-build, native-TS** run model — `node src/index.ts`, no +compile step. The worker's activities drive the role services over ConnectRPC +(`*_ADDR`), exactly like any other cross-pod call. 
+ +### Run the saga + +The `saga` compose profile adds Temporal, the onboarding gateway, and the worker; +it pairs with `split` (the worker targets the per-role services): + +```bash +docker compose --profile split --profile saga up +# Onboard a new hire (the gateway is on :5005): +grpcurl -plaintext -d '{"employee_id":"e-100","name":"New Hire","email":"newhire@example.com","title":"Engineer","department":"Engineering","manager_id":"e-002"}' \ + localhost:5005 onboarding.v1.OnboardingService/OnboardEmployee +# Watch the workflow + its compensations in the Temporal Web UI: +open http://localhost:8088 +``` + ## Layout ``` proto/ - directory/v1/directory.proto # DirectoryService.GetEmployee + ListEmployees (stream) - timeoff/v1/timeoff.proto # TimeOffService.RequestLeave - payroll/v1/payroll.proto # PayrollService.GetBalance + LeaveApproved + PayrollEventHandlers + directory/v1/directory.proto # GetEmployee + ListEmployees + Create/Activate/Offboard (saga) + timeoff/v1/timeoff.proto # RequestLeave + GrantTimeOff/RevokeTimeOff (saga) + payroll/v1/payroll.proto # GetBalance + LeaveApproved + SetupPayroll/TeardownPayroll (saga) + access/v1/access.proto # AccessService.ProvisionAccess/RevokeAccess (saga leaf) + onboarding/v1/onboarding.proto # OnboardingService.OnboardEmployee/GetOnboarding (saga gateway) connectum/events/v1/options.proto # (connectum.events.v1.event).topic option buf.gen.yaml # protoc-gen-es + protoc-gen-connectum-catalog (strategy: all) src/ db/schema.ts # Drizzle: employees table + EmployeeStatus const db/client.ts # Db type + createDb() (DATABASE_URL, postgres.js) db/seed.ts # SEED_EMPLOYEES (org chart) + seedEmployees() - services/directoryService.ts # createDirectoryService(db) — Drizzle-backed leaf service - services/timeOffService.ts # ctx.call validation + publishes LeaveApproved - services/payrollService.ts # GetBalance + LeaveApproved subscriber route + services/directoryService.ts # createDirectoryService(db) — leaf + saga 
create/activate/offboard + services/timeOffService.ts # ctx.call validation + publishes LeaveApproved + saga grant/revoke + services/payrollService.ts # GetBalance + LeaveApproved subscriber + saga setup/teardown + services/accessService.ts # AccessService — saga IT-provisioning leaf (in-memory) + services/onboardingService.ts # createOnboardingService(workflowClient) — saga gateway + temporal/onboardingStatus.ts # OnboardingStatus const (side-effect-free, sandbox-safe) + temporal/config.ts # TEMPORAL_ADDRESS / NAMESPACE / TASK_QUEUE (env) + temporal/workflowClient.ts # lazy @temporalio/client WorkflowClient (gateway side) + temporal/clients.ts # ConnectRPC clients the activities drive (*_ADDR) + temporal/activities.ts # saga side effects (each one RPC) + compensations + temporal/workflows.ts # OnboardingWorkflow — the durable saga (deterministic sandbox) + worker.ts # @temporalio/worker host (the ONLY native-addon process) topology.ts # env → enabledServices + remoteResolver events.ts # LEAVE_APPROVED_TOPIC constant (publisher/subscriber match) eventBus.ts # one bus per process; payroll subscribes only when local - server.ts # buildServer() — same code, both topologies + db DI + server.ts # buildServer() — same code, both topologies + db + Temporal DI index.ts # env-driven entry point drizzle/ # generated SQL migrations (single source of truth) drizzle.config.ts # drizzle-kit config (schema → migrations / push) @@ -96,8 +186,11 @@ tests/ helpers/db.ts # PGlite test db (migrate + seed), injected via DI e2e/e2e.test.ts # monolith e2e — in-process, no broker (PGlite db) e2e/directory.test.ts # DirectoryService persistence e2e (real gRPC client) -docker-compose.yml # mono + split profiles + NATS + Postgres (config only) -Dockerfile # one image, role chosen by SERVICES env + e2e/onboarding.test.ts # onboarding edge — pre-check + start (stub Temporal) + activity/activities.test.ts # real activities ↔ RPC wiring + compensation idempotency + 
workflow/onboardingWorkflow.test.ts# saga orchestration + LIFO compensation (time-skipping) +docker-compose.yml # mono + split + saga profiles (NATS + Postgres + Temporal) +Dockerfile # one image, role chosen by SERVICES env (worker = node src/worker.ts) ``` ## The generated catalog @@ -261,7 +354,7 @@ balance by the time `RequestLeave` resolves, so the assertion needs no polling. The cross-service validation path (`ctx.call` → `Code.NotFound` for an unknown employee) needs no broker at all. -Two test files share the same PGlite setup (`tests/helpers/db.ts`, which migrates +The e2e files share the same PGlite setup (`tests/helpers/db.ts`, which migrates + seeds the directory): - `tests/e2e/e2e.test.ts` — the monolith flow (gateway `ctx.call` validation + @@ -270,9 +363,23 @@ Two test files share the same PGlite setup (`tests/helpers/db.ts`, which migrate over a **real gRPC client**: `GetEmployee` (incl. `NOT_FOUND`), streaming `ListEmployees` with the `department` and org-chart `manager_id` filters, and cursor pagination. +- `tests/e2e/onboarding.test.ts` — the onboarding gateway: a free id passes the + inverted pre-check and starts the saga (stub Temporal client); an already-taken + id is rejected with `AlreadyExists` **before** Temporal; with no client the + pre-check still runs and a free id then raises `Unavailable`. + +The **saga itself** is verified without Docker or a Temporal cluster: + +- `tests/workflow/onboardingWorkflow.test.ts` — the real `OnboardingWorkflow` + with **mocked activities** under Temporal's **time-skipping** test environment: + the forward order, and the **LIFO compensation** unwind on each failing step + (incl. the non-retryable first step that compensates nothing). +- `tests/activity/activities.test.ts` — the **real activity bodies** against an + in-process Connectum monolith (PGlite): each step mutates real service state, + the duplicate-id failure is non-retryable, and every compensation is idempotent. 
-The split topology shares this exact handler code and is exercised via -`docker-compose.yml`. +The split topology and the saga share this exact handler code and are exercised +via `docker-compose.yml` (the `split` and `saga` profiles). ## Key points @@ -290,6 +397,12 @@ The split topology shares this exact handler code and is exercised via `DATABASE_URL`; tests inject PGlite so the persistence e2e runs without Docker. The committed `drizzle/` migrations are the single source of truth both paths apply. +- **Durable saga via Temporal** — onboarding is a multi-step transaction with + automatic LIFO compensation, run in a dedicated **worker** process; the RPC + roles stay no-build (only the worker loads the native `@temporalio/worker`). + The framework serves the RPCs; Temporal owns the durability. Three mechanisms — + `ctx.call`, EventBus, Temporal saga — coexist in one codebase, each fit to its + job. - **`enabledServices: undefined` vs `[]`** — monolith must pass `undefined` (mount everything); an empty array would mount nothing. `topology.ts` handles the unset/`*` → `undefined` mapping. diff --git a/hris/docker-compose.yml b/hris/docker-compose.yml index 583044e..c3c8549 100644 --- a/hris/docker-compose.yml +++ b/hris/docker-compose.yml @@ -3,13 +3,17 @@ # the monolith in-process with an in-memory bus and a PGlite Postgres; see # tests/e2e). # -# docker compose --profile mono up # all services in one process -# docker compose --profile split up # directory + timeoff + payroll split +# docker compose --profile mono up # all services in one process +# docker compose --profile split up # directory + timeoff + payroll + access split +# docker compose --profile split --profile saga up # + Temporal, the onboarding gateway, and the worker # -# Both profiles share the same `nats` broker and `postgres` database. 
The split -# profile shows the headline: identical handler code, three processes, ctx.call -# auto-routing across the network and LeaveApproved flowing over NATS to the -# payroll role. +# All profiles share the same `nats` broker and `postgres` database. The split +# profile shows the headline: identical handler code, separate processes, +# ctx.call auto-routing across the network and LeaveApproved flowing over NATS to +# the payroll role. The `saga` profile adds the durable onboarding saga — a +# Temporal server + Web UI (http://localhost:8088), the OnboardingService gateway +# (:5005), and the worker that hosts OnboardingWorkflow and drives the role +# services. It pairs with `split` (the worker targets the per-role services). # # Persistence (Phase 1): DirectoryService is backed by Drizzle ORM over Postgres. # Every app role wires DATABASE_URL (postgres.js connects lazily, so the timeoff @@ -168,9 +172,157 @@ services: - hris profiles: ["split"] + # ── ACCESS role — the saga's IT-provisioning leaf (in-memory) ───────────── + access: + build: . + command: ["node", "src/index.ts"] + ports: + - "5004:5004" + environment: + - PORT=5004 + - NATS_URL=nats://nats:4222 + - DATABASE_URL=postgresql://hris:hris@postgres:5432/hris + - SERVICES=access.v1.AccessService + depends_on: + nats: + condition: service_healthy + healthcheck: + test: ["CMD", "wget", "-q", "--spider", "http://localhost:5004/healthz"] + interval: 5s + timeout: 3s + retries: 10 + start_period: 10s + networks: + - hris + profiles: ["split"] + + # ── ONBOARDING role — the saga gateway (pre-check + start workflow) ─────── + # + # Reaches the directory over the network for its pre-check (DIRECTORY_ADDR) and + # builds a lazy @temporalio/client to start OnboardingWorkflow (TEMPORAL_ADDRESS). + onboarding: + build: . 
+ command: ["node", "src/index.ts"] + ports: + - "5005:5005" + environment: + - PORT=5005 + - NATS_URL=nats://nats:4222 + - DATABASE_URL=postgresql://hris:hris@postgres:5432/hris + - SERVICES=onboarding.v1.OnboardingService + - DIRECTORY_ADDR=http://directory:5001 + - TEMPORAL_ADDRESS=temporal:7233 + - TEMPORAL_TASK_QUEUE=hris-onboarding + depends_on: + directory: + condition: service_healthy + healthcheck: + test: ["CMD", "wget", "-q", "--spider", "http://localhost:5005/healthz"] + interval: 5s + timeout: 3s + retries: 10 + start_period: 10s + networks: + - hris + profiles: ["saga"] + + # ── Temporal's own metadata store (SEPARATE from the app Postgres) ───────── + temporal-postgres: + image: postgres:16-alpine + environment: + POSTGRES_USER: temporal + POSTGRES_PASSWORD: temporal + POSTGRES_DB: temporal + volumes: + - temporal-pgdata:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U temporal -d temporal"] + interval: 5s + timeout: 3s + retries: 10 + start_period: 5s + networks: + - hris + profiles: ["saga"] + + # ── Temporal server (auto-setup creates the `default` namespace) ─────────── + temporal: + image: temporalio/auto-setup:1.25.2 + depends_on: + temporal-postgres: + condition: service_healthy + environment: + - DB=postgres12 + - DB_PORT=5432 + - POSTGRES_USER=temporal + - POSTGRES_PWD=temporal + - POSTGRES_SEEDS=temporal-postgres + ports: + - "7233:7233" + healthcheck: + # Wait until the frontend is serving and the default namespace exists. 
+ test: ["CMD", "tctl", "--address", "temporal:7233", "namespace", "describe", "default"] + interval: 5s + timeout: 5s + retries: 30 + start_period: 10s + networks: + - hris + profiles: ["saga"] + + # ── Temporal Web UI — inspect onboarding workflows + their compensations ─── + temporal-ui: + image: temporalio/ui:2.34.0 + depends_on: + temporal: + condition: service_healthy + environment: + - TEMPORAL_ADDRESS=temporal:7233 + - TEMPORAL_CORS_ORIGINS=http://localhost:8088 + ports: + - "8088:8080" + networks: + - hris + profiles: ["saga"] + + # ── WORKER — hosts OnboardingWorkflow + the activities (node src/worker.ts) ─ + # + # A separate process type (NOT a SERVICES-selected RPC role): no inbound RPC; + # it polls Temporal for workflow/activity tasks and drives the role services + # over ConnectRPC. The ONLY process that loads the @temporalio/worker native + # addon. Pairs with the `split` profile (it targets the per-role services): + # + # docker compose --profile split --profile saga up + worker: + build: . 
+ command: ["node", "src/worker.ts"] + environment: + - TEMPORAL_ADDRESS=temporal:7233 + - TEMPORAL_NAMESPACE=default + - TEMPORAL_TASK_QUEUE=hris-onboarding + - DIRECTORY_ADDR=http://directory:5001 + - PAYROLL_ADDR=http://payroll:5003 + - TIMEOFF_ADDR=http://timeoff:5002 + - ACCESS_ADDR=http://access:5004 + depends_on: + temporal: + condition: service_healthy + directory: + condition: service_healthy + payroll: + condition: service_healthy + timeoff: + condition: service_healthy + access: + condition: service_healthy + networks: + - hris + profiles: ["saga"] + networks: hris: driver: bridge volumes: pgdata: + temporal-pgdata: From 0ab54b71a539f10191738433855f63f7a4a66b39 Mon Sep 17 00:00:00 2001 From: intech Date: Sun, 21 Jun 2026 20:37:48 +0400 Subject: [PATCH 3/6] docs(hris): sync docstrings + README with the five-service topology MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Doc-currency follow-up: update the server/topology docstrings ("three" → "five" services) and the README (resolver env vars, the generated-catalog example with the new saga RPCs, and the split-profile run commands incl. the access role). Comments/docs only — no logic change. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01MdeH7fExPmiRHRirGuvGk3 --- hris/README.md | 25 ++++++++++++++++++++----- hris/src/server.ts | 4 ++-- hris/src/topology.ts | 2 +- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/hris/README.md b/hris/README.md index 386d431..13badbc 100644 --- a/hris/README.md +++ b/hris/README.md @@ -54,8 +54,9 @@ reads env and tells the framework what is local vs remote: Set it to one service (`SERVICES=directory.v1.DirectoryService`) and the process becomes that single microservice role. 
- **`remoteResolver: perServiceEnvResolver(…)`** maps each non-local service to an - endpoint env var (`DIRECTORY_ADDR`, `TIMEOFF_ADDR`, `PAYROLL_ADDR`), so - `ctx.call` auto-routes to the right process when it is remote. + endpoint env var (`DIRECTORY_ADDR`, `TIMEOFF_ADDR`, `PAYROLL_ADDR`, + `ACCESS_ADDR`, `ONBOARDING_ADDR`), so `ctx.call` auto-routes to the right + process when it is remote. Nothing in the service handlers changes between topologies. @@ -200,7 +201,9 @@ Dockerfile # one image, role chosen by SERVICES env (w ```ts export const serviceCatalog = { + "access.v1.AccessService": AccessService, "directory.v1.DirectoryService": DirectoryService, + "onboarding.v1.OnboardingService": OnboardingService, "payroll.v1.PayrollService": PayrollService, "payroll.v1.PayrollEventHandlers": PayrollEventHandlers, "timeoff.v1.TimeOffService": TimeOffService, @@ -208,10 +211,21 @@ export const serviceCatalog = { declare module "@connectum/core" { interface ConnectumCallMap { + "access.v1.AccessService/ProvisionAccess": { request: ProvisionAccessRequest; response: ProvisionAccessResponse }; + "access.v1.AccessService/RevokeAccess": { request: RevokeAccessRequest; response: Empty }; "directory.v1.DirectoryService/GetEmployee": { request: GetEmployeeRequest; response: GetEmployeeResponse }; + "directory.v1.DirectoryService/CreateEmployee": { request: CreateEmployeeRequest; response: CreateEmployeeResponse }; + "directory.v1.DirectoryService/ActivateEmployee": { request: ActivateEmployeeRequest; response: ActivateEmployeeResponse }; + "directory.v1.DirectoryService/OffboardEmployee": { request: OffboardEmployeeRequest; response: OffboardEmployeeResponse }; + "onboarding.v1.OnboardingService/OnboardEmployee": { request: OnboardEmployeeRequest; response: OnboardEmployeeResponse }; + "onboarding.v1.OnboardingService/GetOnboarding": { request: GetOnboardingRequest; response: GetOnboardingResponse }; "payroll.v1.PayrollService/GetBalance": { request: GetBalanceRequest; 
response: GetBalanceResponse }; + "payroll.v1.PayrollService/SetupPayroll": { request: SetupPayrollRequest; response: SetupPayrollResponse }; + "payroll.v1.PayrollService/TeardownPayroll": { request: TeardownPayrollRequest; response: Empty }; "payroll.v1.PayrollEventHandlers/OnLeaveApproved": { request: LeaveApproved; response: Empty }; "timeoff.v1.TimeOffService/RequestLeave": { request: RequestLeaveRequest; response: RequestLeaveResponse }; + "timeoff.v1.TimeOffService/GrantTimeOff": { request: GrantTimeOffRequest; response: GrantTimeOffResponse }; + "timeoff.v1.TimeOffService/RevokeTimeOff": { request: RevokeTimeOffRequest; response: Empty }; } interface ConnectumStreamMap { "directory.v1.DirectoryService/ListEmployees": { request: ListEmployeesRequest; response: Employee; kind: "server-stream" }; @@ -301,10 +315,10 @@ Requires Node.js >= 25.2.0 (native TypeScript) and pnpm >= 10. ```bash pnpm install -pnpm build:proto # buf generate → gen/ (incl. catalog.gen.ts with all 3 services) +pnpm build:proto # buf generate → gen/ (incl. catalog.gen.ts with all services) pnpm db:generate # drizzle-kit → drizzle/ SQL migration (the PGlite test migrator applies it) pnpm typecheck # ctx.call is typed by the generated catalog -pnpm test # e2e: ctx.call validation + event-driven balance decrement + directory persistence +pnpm test # e2e + workflow + activity tests (no Docker or Temporal server required) ``` The `drizzle/` migrations are committed and are the single source of truth: the @@ -333,7 +347,7 @@ The same image runs each role; cross-service calls auto-route and `LeaveApproved flows over NATS. 
With Docker: ```bash -docker compose --profile split up # directory + timeoff + payroll + NATS +docker compose --profile split up # directory + timeoff + payroll + access + NATS ``` Or run roles directly (a NATS broker on `NATS_URL` is required for the event flow): @@ -342,6 +356,7 @@ Or run roles directly (a NATS broker on `NATS_URL` is required for the event flo DATABASE_URL=postgresql://hris:hris@localhost:5432/hris PORT=5001 SERVICES=directory.v1.DirectoryService node src/index.ts DATABASE_URL=postgresql://hris:hris@localhost:5432/hris PORT=5002 SERVICES=timeoff.v1.TimeOffService DIRECTORY_ADDR=http://localhost:5001 node src/index.ts DATABASE_URL=postgresql://hris:hris@localhost:5432/hris PORT=5003 SERVICES=payroll.v1.PayrollService node src/index.ts +DATABASE_URL=postgresql://hris:hris@localhost:5432/hris PORT=5004 SERVICES=access.v1.AccessService node src/index.ts ``` ## Testing note diff --git a/hris/src/server.ts b/hris/src/server.ts index aec5850..3d46692 100644 --- a/hris/src/server.ts +++ b/hris/src/server.ts @@ -3,11 +3,11 @@ * microservice role, decided by env (see `#topology.ts`). * * What stays constant across topologies: - * - the same three service definitions are passed to `createServer`; + * - the same five service definitions are passed to `createServer`; * - the same generated `serviceCatalog` types and routes every `ctx.call`. * * What env changes: - * - `enabledServices` — which of the three are mounted locally (undefined = + * - `enabledServices` — which of the five are mounted locally (undefined = * monolith, all local). Unmounted services are reached via `remoteResolver`. * - `remoteResolver` — maps a remote service `typeName` to its endpoint env var. * - the EventBus role — only the payroll process subscribes to LeaveApproved. 
diff --git a/hris/src/topology.ts b/hris/src/topology.ts index a12ca0b..cb583d0 100644 --- a/hris/src/topology.ts +++ b/hris/src/topology.ts @@ -44,7 +44,7 @@ const ENDPOINT_ENV: Readonly> = { export interface Topology { /** * Proto `typeName`s mounted locally, as a concrete list (always populated; - * `*`/unset expands to all three). Drives the EventBus role decisions. + * `*`/unset expands to all five). Drives the EventBus role decisions. */ readonly localTypeNames: readonly string[]; /** From 7a18140bfcf884338320fc6dca48247f2ae489e8 Mon Sep 17 00:00:00 2001 From: intech Date: Sun, 21 Jun 2026 20:38:14 +0400 Subject: [PATCH 4/6] =?UTF-8?q?docs(examples):=20hris=20index=20row=20?= =?UTF-8?q?=E2=80=94=20durable=20onboarding=20saga=20with=20Temporal?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reflect Phase 5a in the examples index: the hris row now reads "EventBus + durable onboarding saga with Temporal" (was an imprecise "EventBus saga"), mirroring the car-sharing row. 
Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01MdeH7fExPmiRHRirGuvGk3 --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 9e13851..6325d74 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ Ready-to-run examples demonstrating Connectum features — from a one-service [q | [extensions/redact](extensions/redact/) | Sensitive data redaction | Proto custom field options, `createRedactInterceptor()` | Ready | | [interceptors/jwt](interceptors/jwt/) | Client-side JWT interceptor | Bearer token injection, `createAddTokenInterceptor()` | Ready | | [with-custom-interceptor](with-custom-interceptor/) | Echo service with custom interceptors | API key auth, rate limiting | Ready | -| [hris](hris/) | Monolith **or** microservices — one codebase | `defineService` + catalog + `ctx.call` (in-process vs remote by env) + EventBus saga | Ready | +| [hris](hris/) | Monolith **or** microservices — one codebase | `defineService` + catalog + `ctx.call` (in-process vs remote by env) + EventBus + durable onboarding saga with Temporal | Ready | | [car-sharing](car-sharing/) | Enterprise deploy — Kubernetes + Istio | Split microservices + JWT/proto authz gateway + OpenTelemetry; durable trip saga with Temporal; k8s/Istio manifests (mTLS, canary) | Ready | | [with-events-kafka](with-events-kafka/) | EventBus with Kafka | Event-driven microservices, consumer groups | Ready | | [with-events-redpanda](with-events-redpanda/) | EventBus with Redpanda | Saga choreography, custom topics, Redpanda Console | Ready | From de7fb15e8197b8149cea026edd7c3cd528fa51a1 Mon Sep 17 00:00:00 2001 From: intech Date: Mon, 22 Jun 2026 16:13:08 +0400 Subject: [PATCH 5/6] docs(hris): drop internal ADR-001 citation from onboardingStatus comment Match the example-code currency cleanup: an ADR number in example code is internal jargon. The reason (no enum under erasable TypeScript) is already stated in the comment just above. 
Comment-only. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01MdeH7fExPmiRHRirGuvGk3 --- hris/src/temporal/onboardingStatus.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hris/src/temporal/onboardingStatus.ts b/hris/src/temporal/onboardingStatus.ts index 7450a52..5f6063b 100644 --- a/hris/src/temporal/onboardingStatus.ts +++ b/hris/src/temporal/onboardingStatus.ts @@ -11,7 +11,7 @@ * * The status is a plain string on the wire (no proto enum — the example's * `erasableSyntaxOnly` tsconfig rejects the native `enum` protoc-gen-es emits), - * with the domain pinned by this `const` object (ADR-001: no `enum`). + * with the domain pinned by this `const` object. * * @module temporal/onboardingStatus */ From 605b3b32801f48c3f4fdbebd337cc297bfa52d93 Mon Sep 17 00:00:00 2001 From: intech Date: Tue, 23 Jun 2026 17:48:33 +0400 Subject: [PATCH 6/6] refactor(hris): harden the onboarding saga (CodeRabbit Major findings) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address the four CodeRabbit "Major" findings on this PR, mirroring the same hardenings applied to the gold-standard car-sharing saga so the two examples stay in lockstep. - Idempotent create (F3): createEmployee is a create-by-id, so on Code.AlreadyExists it now reads the row back and treats a match as success (a Temporal retry observing its own prior commit) — only a genuinely different employee under the same id stays a terminal EmployeeExists. Compares business fields (not id), so the existing e-001 conflict test still fails fast. No proto change. (car-sharing uses a holder key instead, since its step 1 is a lock acquisition.) - GetOnboarding error mapping (F2): the blanket `catch {}` mapped ANY query failure to a terminal status. 
Narrow it: only WorkflowNotFoundError / QueryNotRegistered / QueryRejected (run closed/gone) fall back to describe(); a transient failure is surfaced as Code.Unavailable instead of a bogus COMPLETED/FAILED onboarding. - Compensation ordering (F4): register teardownPayroll / revokeTimeOff / revokeAccess (steps 2-4) BEFORE their forward calls, so an ambiguous failure that committed the side effect is still unwound (each comp no-ops on uncommitted state). Step 1 (createEmployee) stays register-after. - Dockerfile (F1): document the no-committed-lockfile policy honestly and point production users at `--frozen-lockfile`; run as the non-root `node` user (parity with car-sharing). New/updated tests: read-back retry succeeds; describe() fallback on a closed run; GetOnboarding surfaces Unavailable on a transient query failure; the steps-2-4 failure unwinds now include the step's own pre-registered compensation. typecheck + tests: 36/36 green. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01MdeH7fExPmiRHRirGuvGk3 --- hris/Dockerfile | 7 ++ hris/README.md | 15 +++-- hris/src/services/onboardingService.ts | 24 +++++-- hris/src/temporal/activities.ts | 21 ++++-- hris/src/temporal/workflows.ts | 46 +++++++------ hris/tests/activity/activities.test.ts | 14 ++++ hris/tests/e2e/onboarding.test.ts | 65 +++++++++++++++++-- .../tests/workflow/onboardingWorkflow.test.ts | 27 ++++---- 8 files changed, 169 insertions(+), 50 deletions(-) diff --git a/hris/Dockerfile b/hris/Dockerfile index ab97447..d2ce9bb 100644 --- a/hris/Dockerfile +++ b/hris/Dockerfile @@ -8,6 +8,11 @@ WORKDIR /app # Temporal workflow bundle in the worker; protobufjs is a Temporal gRPC dep), so # it MUST be present for `pnpm install` to run those build scripts. 
COPY package.json pnpm-workspace.yaml ./ +# No lockfile is committed for this flagship example (matches getting-started / +# car-sharing): pnpm resolves @connectum/* and other deps from the caret (^) +# ranges in package.json, so the image is example-grade, not bit-reproducible. +# Production services should commit pnpm-lock.yaml and use +# `pnpm install --frozen-lockfile` (the secondary examples model this). RUN pnpm install --prod FROM node:25-slim AS runtime @@ -21,4 +26,6 @@ ENV NODE_ENV=production EXPOSE 5000 5001 5002 5003 5004 5005 HEALTHCHECK --interval=10s --timeout=3s --start-period=15s --retries=3 \ CMD wget -q --spider http://localhost:${PORT:-5000}/healthz || exit 1 +# Run as the built-in non-root `node` user. +USER node CMD ["node", "src/index.ts"] diff --git a/hris/README.md b/hris/README.md index 13badbc..6c5914c 100644 --- a/hris/README.md +++ b/hris/README.md @@ -105,8 +105,10 @@ new id must be **free**). Only after the pre-check passes does it **start** the durable `OnboardingWorkflow` and return immediately with `STARTED`. Because the pre-check runs first, the error path needs **no live Temporal**. -The workflow runs the forward steps and pushes each step's compensation onto a -stack; on any failure it unwinds the stack in **LIFO** order, then fails: +The workflow runs the forward steps, registering each step's compensation on a +stack **before** its call (steps 2–4, so an ambiguous failure that committed the +side effect is still unwound) or after it (step 1); on any failure it unwinds the +stack in **LIFO** order, then fails: ``` createEmployee ──▶ setupPayroll ──▶ grantTimeOff ──▶ provisionAccess ──▶ activateEmployee ✓ COMPLETED @@ -116,9 +118,12 @@ createEmployee ──▶ setupPayroll ──▶ grantTimeOff ──▶ provision - **`createEmployee`** is the saga's first step; a duplicate id is a **business** failure surfaced as `Code.AlreadyExists` (via an atomic - `insert … on conflict do nothing returning`, never a raw DB error). 
The activity - rethrows it as a **non-retryable** `ApplicationFailure` so the workflow fails - fast with nothing to compensate. + `insert … on conflict do nothing returning`, never a raw DB error). To stay + idempotent under at-least-once retries, the activity reads the row back on + conflict: a stored employee that **matches** this hire is treated as success (a + retry observing its own prior commit), while a **genuinely different** employee + under the same id is rethrown as a **non-retryable** `ApplicationFailure` so the + workflow fails fast with nothing to compensate. - Every other step keeps Temporal's **default retry policy** — the durability the saga demonstrates. The compensations are **idempotent**, so an unwind after a partially-applied step is safe. diff --git a/hris/src/services/onboardingService.ts b/hris/src/services/onboardingService.ts index 716c338..f6cb41f 100644 --- a/hris/src/services/onboardingService.ts +++ b/hris/src/services/onboardingService.ts @@ -31,6 +31,7 @@ import { create } from "@bufbuild/protobuf"; import { Code, ConnectError } from "@connectrpc/connect"; import type { ServiceDefinition } from "@connectum/core"; import { defineService } from "@connectum/core"; +import { QueryNotRegisteredError, QueryRejectedError, WorkflowNotFoundError } from "@temporalio/client"; import { GetEmployeeRequestSchema } from "#gen/directory/v1/directory_pb.ts"; import { GetOnboardingResponseSchema, OnboardEmployeeResponseSchema, OnboardingSchema, OnboardingService } from "#gen/onboarding/v1/onboarding_pb.ts"; import type { OnboardingStatus as OnboardingStatusT } from "#temporal/onboardingStatus.ts"; @@ -75,6 +76,16 @@ function terminalStatusFor(workflowStatusName: string): OnboardingStatusT { return workflowStatusName === "COMPLETED" ? 
OnboardingStatus.COMPLETED : OnboardingStatus.FAILED; } +/** + * True when a failed Query legitimately means "the run is closed/gone or its + * query handler is unavailable" — the only cases where falling back to a + * terminal status from `describe()` is correct. A transient/other error must + * NOT be collapsed into a terminal status (it is surfaced as `Unavailable`). + */ +function isClosedOrMissingRun(err: unknown): boolean { + return err instanceof WorkflowNotFoundError || err instanceof QueryNotRegisteredError || err instanceof QueryRejectedError; +} + /** * Build the OnboardingService definition with an injected Temporal client. * @@ -129,13 +140,18 @@ export function createOnboardingService(options: OnboardingServiceOptions): Serv const handle = workflowClient.getHandle(req.employeeId); - // Prefer the LIVE status from the running workflow's Query. If the - // workflow has closed (queries are rejected on closed runs), fall - // back to a terminal status derived from describe(). + // Prefer the LIVE status from the running workflow's Query. Only when + // the Query is unavailable because the run is closed/gone or its + // query handler isn't registered do we fall back to a terminal status + // from describe(). A transient/other failure is surfaced as + // Unavailable, never silently mapped to a terminal status. 
let status: OnboardingStatusT; try { status = await handle.query("getOnboardingStatus"); - } catch { + } catch (err) { + if (!isClosedOrMissingRun(err)) { + throw new ConnectError(`Could not read onboarding status for "${req.employeeId}".`, Code.Unavailable); + } const description = await handle.describe(); status = terminalStatusFor(description.status.name); } diff --git a/hris/src/temporal/activities.ts b/hris/src/temporal/activities.ts index 0463c11..8ffd20d 100644 --- a/hris/src/temporal/activities.ts +++ b/hris/src/temporal/activities.ts @@ -28,7 +28,7 @@ import { create } from "@bufbuild/protobuf"; import { Code, ConnectError } from "@connectrpc/connect"; import { ApplicationFailure } from "@temporalio/activity"; import { ProvisionAccessRequestSchema, RevokeAccessRequestSchema } from "#gen/access/v1/access_pb.ts"; -import { ActivateEmployeeRequestSchema, CreateEmployeeRequestSchema, OffboardEmployeeRequestSchema } from "#gen/directory/v1/directory_pb.ts"; +import { ActivateEmployeeRequestSchema, CreateEmployeeRequestSchema, GetEmployeeRequestSchema, OffboardEmployeeRequestSchema } from "#gen/directory/v1/directory_pb.ts"; import { SetupPayrollRequestSchema, TeardownPayrollRequestSchema } from "#gen/payroll/v1/payroll_pb.ts"; import { GrantTimeOffRequestSchema, RevokeTimeOffRequestSchema } from "#gen/timeoff/v1/timeoff_pb.ts"; import type { ServiceClients } from "#temporal/clients.ts"; @@ -75,10 +75,13 @@ export interface NewHire { // ── Forward steps ───────────────────────────────────────────────────────── /** - * Step 1 — create the directory row (status "onboarding"). A business failure - * (the id is already taken → `Code.AlreadyExists`) is rethrown as a - * NON-RETRYABLE `ApplicationFailure(EMPLOYEE_EXISTS)` so the workflow fails fast - * with no compensation; any other (infra) error stays retryable. + * Step 1 — create the directory row (status "onboarding"). 
Idempotent across + * Temporal retries: on `Code.AlreadyExists` it reads the existing row back and, + * if it matches this hire, treats it as success (a retry that observed its OWN + * prior commit) rather than a failure. A row that DIFFERS under the same id is a + * genuine duplicate-id conflict, rethrown as a NON-RETRYABLE + * `ApplicationFailure(EMPLOYEE_EXISTS)` so the workflow fails fast with no + * compensation; any other (infra) error stays retryable. * * @param hire - The {@link NewHire} details. */ @@ -96,6 +99,14 @@ export async function createEmployee(hire: NewHire): Promise<void> { ); } catch (err) { if (err instanceof ConnectError && err.code === Code.AlreadyExists) { + // Read-back equivalence: a retry may observe its own prior commit. + // If the stored row matches this hire, the create already succeeded. + const existing = await clients().directory.getEmployee(create(GetEmployeeRequestSchema, { id: hire.employeeId })); + const e = existing.employee; + if (e !== undefined && e.name === hire.name && e.email === hire.email && e.title === hire.title && e.department === hire.department && e.managerId === hire.managerId) { + return; + } + // A genuinely different employee already owns this id → terminal. throw ApplicationFailure.create({ message: err.message, type: EMPLOYEE_EXISTS, diff --git a/hris/src/temporal/workflows.ts b/hris/src/temporal/workflows.ts index f03648b..619eaf1 100644 --- a/hris/src/temporal/workflows.ts +++ b/hris/src/temporal/workflows.ts @@ -9,15 +9,16 @@ * runtime (that would break determinism). All I/O is in the activities; the * workflow only orchestrates. * - * Saga (compensation-stack pattern, samples-repo style): run the forward steps - * in order; after each side-effecting step `unshift` its compensation onto a - * stack; on ANY failure, run the compensations in LIFO order (each wrapped in - * its own try/catch so the unwind never throws), then rethrow the original - * error.
The result: - * - provisionAccess fails → teardownPayroll → ... wait, LIFO from the failing - * point: revokeTimeOff → teardownPayroll → offboardEmployee - * - grantTimeOff fails → teardownPayroll → offboardEmployee - * - setupPayroll fails → offboardEmployee + * Saga (compensation-stack pattern): run the forward steps in order, registering + * each step's compensation on a LIFO stack — BEFORE the forward call (steps 2-4), + * so an ambiguous failure that DID commit the side effect is still unwound, or + * AFTER for step 1 (a failed create committed nothing, and a retry that observed + * its own commit is reconciled in the activity). On ANY failure, run the + * compensations in LIFO order (each wrapped in its own try/catch so the unwind + * never throws), then rethrow the original error. The result: + * - provisionAccess fails → revokeAccess → revokeTimeOff → teardownPayroll → offboardEmployee + * - grantTimeOff fails → revokeTimeOff → teardownPayroll → offboardEmployee + * - setupPayroll fails → teardownPayroll → offboardEmployee * - createEmployee fails → (non-retryable) fail fast, NOTHING to compensate. * * activateEmployee (step 5) pushes NO compensation: it is the terminal @@ -86,27 +87,34 @@ export async function OnboardingWorkflow(input: OnboardingWorkflowInput): Promis let status: OnboardingStatusT = OnboardingStatus.STARTED; setHandler(getOnboardingStatusQuery, () => status); - // LIFO compensation stack: unshift after each side-effecting forward step. + // LIFO compensation stack: each step registers its compensation here, BEFORE + // its forward call (steps 2-4) so an ambiguous failure that committed the + // side effect is still unwound, or AFTER for step 1. const compensations: Compensation[] = []; try { - // Step 1 — create the directory row (business failure here is - // non-retryable and fails the workflow fast; nothing pushed, nothing to - // undo). + // Step 1 — create the directory row. 
A business failure here (the id is + // already taken) is non-retryable and fails the workflow fast; registered + // AFTER, since a failed create committed nothing to offboard (and the + // activity reconciles a retry that observed its own prior commit). await acts.createEmployee({ employeeId, name, email, title, department, managerId }); compensations.unshift({ name: "offboardEmployee", run: () => acts.offboardEmployee({ employeeId }) }); - // Step 2 — enroll in payroll. - await acts.setupPayroll({ employeeId }); + // Step 2 — enroll in payroll. Register the teardown BEFORE the call: + // teardownPayroll is idempotent (a no-op if enrollment never committed), + // so an ambiguous setupPayroll failure is still unwound. compensations.unshift({ name: "teardownPayroll", run: () => acts.teardownPayroll({ employeeId }) }); + await acts.setupPayroll({ employeeId }); - // Step 3 — grant the annual PTO policy. - await acts.grantTimeOff({ employeeId }); + // Step 3 — grant the annual PTO policy. Register the revoke BEFORE the + // call (revokeTimeOff is a no-op if the grant never committed). compensations.unshift({ name: "revokeTimeOff", run: () => acts.revokeTimeOff({ employeeId }) }); + await acts.grantTimeOff({ employeeId }); - // Step 4 — provision system access. - await acts.provisionAccess({ employeeId, email }); + // Step 4 — provision system access. Register the revoke BEFORE the call + // (revokeAccess is a no-op if the account was never created). compensations.unshift({ name: "revokeAccess", run: () => acts.revokeAccess({ employeeId }) }); + await acts.provisionAccess({ employeeId, email }); // Step 5 — activate the employee (onboarding → active). Terminal; no // compensation. 
diff --git a/hris/tests/activity/activities.test.ts b/hris/tests/activity/activities.test.ts index bde4b23..196fd99 100644 --- a/hris/tests/activity/activities.test.ts +++ b/hris/tests/activity/activities.test.ts @@ -130,6 +130,20 @@ describe("Activities: real RPC wiring + compensation idempotency (in-process mon ); }); + it("createEmployee is idempotent: a retry observing its OWN prior commit succeeds (read-back equivalence)", async () => { + // The first create commits the row; a Temporal retry re-runs the activity + // and sees ALREADY_EXISTS. Read-back equivalence recognises the stored row + // as THIS hire's own and returns success — not a duplicate-id failure. + await run(activities.createEmployee, HIRE); + await run(activities.createEmployee, HIRE); + + // Still exactly this hire, still in onboarding (the retry did not mutate it). + const directory = server.localClient(DirectoryService); + const employee = await directory.getEmployee(create(GetEmployeeRequestSchema, { id: HIRE.employeeId })); + assert.equal(employee.employee?.status, "onboarding"); + assert.equal(employee.employee?.name, "New Hire"); + }); + it("offboardEmployee is idempotent: marking offboarded twice (and an unknown id) is a no-op success", async () => { const directory = server.localClient(DirectoryService); await run(activities.createEmployee, HIRE); diff --git a/hris/tests/e2e/onboarding.test.ts b/hris/tests/e2e/onboarding.test.ts index 55de6e0..dad0bc0 100644 --- a/hris/tests/e2e/onboarding.test.ts +++ b/hris/tests/e2e/onboarding.test.ts @@ -25,6 +25,7 @@ import { create } from "@bufbuild/protobuf"; import { Code, ConnectError } from "@connectrpc/connect"; import type { Server } from "@connectum/core"; import { MemoryAdapter } from "@connectum/events"; +import { QueryNotRegisteredError } from "@temporalio/client"; import { buildEventBus } from "#eventBus.ts"; import { GetOnboardingRequestSchema, OnboardEmployeeRequestSchema, OnboardingService } from 
"#gen/onboarding/v1/onboarding_pb.ts"; import { buildServer } from "#server.ts"; @@ -32,20 +33,44 @@ import type { OnboardingWorkflowClient } from "#services/onboardingService.ts"; import { resolveTopology } from "#topology.ts"; import { makeTestDb } from "../helpers/db.ts"; -/** A stub workflow client recording started ids and serving a live STARTED query. */ -function makeStubClient(started: string[]): OnboardingWorkflowClient { +/** How a stub handle answers GetOnboarding: a live query result, or a closed describe(). */ +interface StubHandleBehaviour { + /** Status the live Query returns; if `undefined`, the Query throws. */ + readonly queryStatus?: string; + /** Workflow status name returned by describe() when the Query is rejected. */ + readonly describeStatusName?: string; + /** + * When the Query throws (`queryStatus` undefined): `"closed"` (default) → a + * `QueryNotRegisteredError` (run closed/gone) → describe() fallback; + * `"transient"` → a generic error → surfaced as `Unavailable`. + */ + readonly queryError?: "closed" | "transient"; +} + +/** A stub workflow client recording started ids and answering from `handleBehaviour`. */ +function makeStubClient(started: string[], handleBehaviour: () => StubHandleBehaviour): OnboardingWorkflowClient { return { async start(_type, opts) { started.push(opts.workflowId); return { workflowId: opts.workflowId }; }, getHandle() { + const behaviour = handleBehaviour(); return { - async query<Ret>() { - return "STARTED" as unknown as Ret; + async query<Ret>(): Promise<Ret> { + if (behaviour.queryStatus === undefined) { + if (behaviour.queryError === "transient") { + // A generic/infra failure — NOT a closed-run signal. + throw new Error("temporal frontend unavailable"); + } + // Closed/gone run: its query handler is not registered. + // (code 3 = INVALID_ARGUMENT; the handler keys off the type.)
+ throw new QueryNotRegisteredError("query handler not registered on a closed run", 3); + } + return behaviour.queryStatus as Ret; }, async describe() { - return { status: { name: "RUNNING" } }; + return { status: { name: behaviour.describeStatusName ?? "RUNNING" } }; }, }; }, @@ -55,12 +80,14 @@ function makeStubClient(started: string[]): OnboardingWorkflowClient { describe("E2E: onboarding edge (pre-check + start, stub Temporal)", () => { let server: Server; const started: string[] = []; + // Mutated per GetOnboarding test to drive the stub handle's query/describe. + let handleBehaviour: StubHandleBehaviour = { queryStatus: "STARTED" }; before(async () => { const topology = resolveTopology("*"); const eventBus = buildEventBus({ localTypeNames: topology.localTypeNames, adapter: MemoryAdapter() }); const db = await makeTestDb(); - server = buildServer({ port: 0, topology, eventBus, db, workflowClient: makeStubClient(started) }); + server = buildServer({ port: 0, topology, eventBus, db, workflowClient: makeStubClient(started, () => handleBehaviour) }); await server.start(); }); @@ -104,10 +131,36 @@ describe("E2E: onboarding edge (pre-check + start, stub Temporal)", () => { }); it("GetOnboarding reads the live status via the workflow query", async () => { + handleBehaviour = { queryStatus: "STARTED" }; const onboarding = server.localClient(OnboardingService); const res = await onboarding.getOnboarding(create(GetOnboardingRequestSchema, { employeeId: "e-300" })); assert.equal(res.onboarding?.status, "STARTED"); }); + + it("GetOnboarding falls back to a terminal status via describe() when the workflow has CLOSED", async () => { + // queryStatus undefined → the stub Query throws a QueryNotRegisteredError + // (run closed/gone), which the handler treats as "fall back to describe()". 
+ const onboarding = server.localClient(OnboardingService); + handleBehaviour = { describeStatusName: "COMPLETED" }; + const done = await onboarding.getOnboarding(create(GetOnboardingRequestSchema, { employeeId: "e-300" })); + assert.equal(done.onboarding?.status, "COMPLETED"); + + handleBehaviour = { describeStatusName: "FAILED" }; + const failed = await onboarding.getOnboarding(create(GetOnboardingRequestSchema, { employeeId: "e-300" })); + assert.equal(failed.onboarding?.status, "FAILED"); + }); + + it("GetOnboarding surfaces Unavailable (NOT a terminal status) when the live Query fails transiently", async () => { + // Regression for the blanket catch that mapped ANY query error to a + // terminal status: a transient/infra failure must surface as Unavailable, + // never be silently reported as a COMPLETED/FAILED onboarding. + const onboarding = server.localClient(OnboardingService); + handleBehaviour = { queryError: "transient" }; + await assert.rejects( + onboarding.getOnboarding(create(GetOnboardingRequestSchema, { employeeId: "e-300" })), + (err: unknown) => err instanceof ConnectError && err.code === Code.Unavailable, + ); + }); }); describe("E2E: onboarding edge with NO Temporal client (pre-check still runs)", () => { diff --git a/hris/tests/workflow/onboardingWorkflow.test.ts b/hris/tests/workflow/onboardingWorkflow.test.ts index 96b184a..33f5043 100644 --- a/hris/tests/workflow/onboardingWorkflow.test.ts +++ b/hris/tests/workflow/onboardingWorkflow.test.ts @@ -11,10 +11,11 @@ * provisionAccess → activateEmployee. * - activateEmployee fails: the recorded tail unwinds in LIFO order — * revokeAccess → revokeTimeOff → teardownPayroll → offboardEmployee. - * - provisionAccess fails: only the 3→1 compensations run (provisionAccess - * pushed nothing before failing) — revokeTimeOff → teardownPayroll → - * offboardEmployee. - * - setupPayroll fails: only offboardEmployee runs. 
+ * - provisionAccess fails: its OWN compensation runs too, because it is + * registered BEFORE the call (register-before) — revokeAccess → revokeTimeOff + * → teardownPayroll → offboardEmployee. + * - setupPayroll fails: teardownPayroll → offboardEmployee (the teardown is + * registered before the call). * - createEmployee fails (non-retryable): fails fast with NO compensation. * * Failures are forced by making the MOCK throw `ApplicationFailure.nonRetryable`, @@ -120,21 +121,25 @@ describe("OnboardingWorkflow: orchestration + compensation (time-skipping, mocke ]); }); - it("provisionAccess fails: the 3→1 compensations run (revokeTimeOff → teardownPayroll → offboardEmployee)", async () => { + it("provisionAccess fails: its PRE-registered revoke runs too (revokeAccess → revokeTimeOff → teardownPayroll → offboardEmployee)", async () => { const calls: string[] = []; await assert.rejects(runWorkflow(makeMockActivities(calls, { step: "provisionAccess" }), "wf-provision-fail")); - // provisionAccess pushed NO compensation before failing, so the unwind - // starts from step 3's revokeTimeOff. - assert.deepEqual(calls, ["createEmployee", "setupPayroll", "grantTimeOff", "provisionAccess", "revokeTimeOff", "teardownPayroll", "offboardEmployee"]); + // revokeAccess is registered BEFORE provisionAccess (register-before), so + // an ambiguous provisionAccess failure still unwinds it (no-op if the + // account was never created). With the old register-AFTER order it would + // have been missed. 
+ assert.deepEqual(calls, ["createEmployee", "setupPayroll", "grantTimeOff", "provisionAccess", "revokeAccess", "revokeTimeOff", "teardownPayroll", "offboardEmployee"]); }); - it("setupPayroll fails: only offboardEmployee compensates (step 1's undo)", async () => { + it("setupPayroll fails: its PRE-registered teardown runs too (teardownPayroll → offboardEmployee)", async () => { const calls: string[] = []; await assert.rejects(runWorkflow(makeMockActivities(calls, { step: "setupPayroll" }), "wf-setup-fail")); - // Only createEmployee pushed a compensation before setupPayroll failed. - assert.deepEqual(calls, ["createEmployee", "setupPayroll", "offboardEmployee"]); + // teardownPayroll is registered BEFORE setupPayroll (register-before), so + // an ambiguous setupPayroll failure still unwinds it (no-op if enrollment + // never committed). createEmployee's offboard follows in LIFO order. + assert.deepEqual(calls, ["createEmployee", "setupPayroll", "teardownPayroll", "offboardEmployee"]); }); it("createEmployee fails (non-retryable): fails fast with NO compensation", async () => {