From 25fa6aa8afdff3c0fbb579cb46f53769d5759e58 Mon Sep 17 00:00:00 2001 From: intech Date: Sun, 21 Jun 2026 21:11:09 +0400 Subject: [PATCH] feat(hris): EmployeeOnboarded broadcast / fan-out (Phase 5b) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the multi-subscriber face of the EventBus, completing the hris example's three-mechanism story. The onboarding saga's terminal step now broadcasts `EmployeeOnboarded` ONCE, and three INDEPENDENT reactors consume it — welcome (welcome email), audit (onboarding record), headcount (per-department tally). - The publish-only bus uses the 1.1.0 `publishes: [OnboardingEventHandlers]` option, so the topic `onboarding.employee-onboarded` is resolved from the proto option with no raw topic hand-passed (a route-less publisher otherwise falls back to the message typeName). - Fan-out is four buses, not one: each reactor runs on its own bus with a DISTINCT consumer group (the per-bus duplicate-topic guard forces this), so on a broker every reactor gets every event. Reactors are idempotent (dedupe by employeeId — broker broadcast is at-least-once). Each runs as its own process (`node src/reactor.ts`, role by REACTOR), mirroring the RPC roles / worker. - The broadcast is fire-and-forget: it runs OUTSIDE the saga's compensation scope and a publish failure is logged, never rolled back — a lost broadcast must never undo a completed onboarding. The worker owns the publish bus lifecycle and injects it into the activities via setPublisherBus. Verified dockerless against the 1.1.0-shape (pkg.pr.new @165): typecheck clean, 37/37 tests — the broadcast fan-out (one publish → three reactors with the full message shape), topic-from-proto-option, idempotency, and the new broadcast-failure-tolerance workflow test. The LeaveApproved e2e is unchanged. Requires @connectum/events >= 1.1.0 (publishes/broadcast); the committed ^1.0.0 range resolves 1.1.0 once published. DRAFT until 1.1.0 is on npm. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01MdeH7fExPmiRHRirGuvGk3 --- hris/README.md | 53 +++++- hris/docker-compose.yml | 50 ++++++ hris/package.json | 1 + .../onboarding/v1/onboarding_events.proto | 45 +++++ hris/src/events/broadcastBus.ts | 108 ++++++++++++ hris/src/events/reactors.ts | 158 ++++++++++++++++++ hris/src/reactor.ts | 62 +++++++ hris/src/temporal/activities.ts | 59 +++++++ hris/src/temporal/workflows.ts | 15 +- hris/src/worker.ts | 9 + hris/tests/e2e/broadcast.test.ts | 126 ++++++++++++++ .../tests/workflow/onboardingWorkflow.test.ts | 16 +- 12 files changed, 695 insertions(+), 7 deletions(-) create mode 100644 hris/proto/onboarding/v1/onboarding_events.proto create mode 100644 hris/src/events/broadcastBus.ts create mode 100644 hris/src/events/reactors.ts create mode 100644 hris/src/reactor.ts create mode 100644 hris/tests/e2e/broadcast.test.ts diff --git a/hris/README.md b/hris/README.md index 13badbc..28c0261 100644 --- a/hris/README.md +++ b/hris/README.md @@ -11,7 +11,7 @@ the right tool for a different job: | Mechanism | Used for | In this example | |---|---|---| | **`ctx.call`** (synchronous) | a reply you need now | `RequestLeave` validates the employee against the directory | -| **EventBus** (fire-and-forget) | broadcast a fact | `LeaveApproved` → payroll decrements the balance | +| **EventBus** (fire-and-forget) | announce a fact to 1 or N consumers | `LeaveApproved` → payroll (single subscriber); `EmployeeOnboarded` → welcome + audit + headcount ([fan-out](#broadcast--fan-out-phase-5b)) | | **Temporal saga** (durable) | a long multi-step transaction with rollback | onboarding a new hire across four services | Five services, one integration event, and a durable saga: @@ -148,6 +148,48 @@ grpcurl -plaintext -d '{"employee_id":"e-100","name":"New Hire","email":"newhire open http://localhost:8088 ``` +## Broadcast / fan-out (Phase 5b) + +The example uses the EventBus for **both** of its shapes. `LeaveApproved` is a +**single** subscriber (payroll). `EmployeeOnboarded` is the **1→N broadcast**: +when the onboarding saga COMPLETES, the worker's terminal `announceOnboarded` +activity publishes the fact ONCE, and **three independent reactors** consume it — +**welcome** ("sends" a welcome email), **audit** (appends an onboarding record), +and **headcount** (tallies per-department headcount). EventBus is used here for +broadcast only; durability stays the saga's job — *a lost broadcast must never +roll back a completed onboarding*, so the publish runs outside the compensation +scope and a failure is logged, not retried into a rollback. + +The fan-out is **four buses, not one bus with three handlers**: + +- the **publisher** bus is publish-only (`routes: []`, `publishes: + [OnboardingEventHandlers]`). Listing the event service in `publishes` resolves + the topic `onboarding.employee-onboarded` from the proto + `(connectum.events.v1.event).topic` option, so the publisher passes **no raw + topic** — without it, a route-less publisher would fall back to the message + `typeName`. +- each **reactor** bus has one route under its **own distinct consumer group**. + Two routes on the same topic cannot share a bus (the duplicate-topic guard + throws at `start()`), so the three reactors are forced onto three buses. On a + broker, distinct groups give distinct durable consumers → each gets every event + (fan-out); a shared group would load-balance (one reactor steals each event). + +Each reactor runs as its own process (`node src/reactor.ts`, role by `REACTOR`), +the same one-image-many-roles pattern as the RPC roles and the worker. The +reactors are **idempotent** (dedupe by `employeeId`) because a broker broadcast +is at-least-once. + +```bash +# The reactors come up with the `saga` profile (welcome / audit / headcount): +docker compose --profile split --profile saga up +``` + +::: warning Requires @connectum/events ≥ 1.1.0 +The publish-only `publishes` option and the broadcast wiring ship in **1.1.0**. +The `^1.0.0` range in `package.json` resolves 1.1.0 once it is published; until +then, this Phase 5b broadcast is verified against pre-release snapshots. +::: + ## Layout ``` @@ -157,6 +199,7 @@ proto/ payroll/v1/payroll.proto # GetBalance + LeaveApproved + SetupPayroll/TeardownPayroll (saga) access/v1/access.proto # AccessService.ProvisionAccess/RevokeAccess (saga leaf) onboarding/v1/onboarding.proto # OnboardingService.OnboardEmployee/GetOnboarding (saga gateway) + onboarding/v1/onboarding_events.proto # EmployeeOnboarded + OnboardingEventHandlers (broadcast) connectum/events/v1/options.proto # (connectum.events.v1.event).topic option buf.gen.yaml # protoc-gen-es + protoc-gen-connectum-catalog (strategy: all) src/ @@ -174,7 +217,10 @@ src/ temporal/clients.ts # ConnectRPC clients the activities drive (*_ADDR) temporal/activities.ts # saga side effects (each one RPC) + compensations temporal/workflows.ts # OnboardingWorkflow — the durable saga (deterministic sandbox) - worker.ts # @temporalio/worker host (the ONLY native-addon process) + worker.ts # @temporalio/worker host (publishes EmployeeOnboarded); native-addon process + reactor.ts # EmployeeOnboarded broadcast subscriber (role by REACTOR) + events/broadcastBus.ts # publish-only bus (publishes) + per-reactor buses (distinct groups) + events/reactors.ts # welcome / audit / headcount reactors (idempotent) topology.ts # env → enabledServices + remoteResolver events.ts # LEAVE_APPROVED_TOPIC constant (publisher/subscriber match) eventBus.ts # one bus per process; payroll subscribes only when local @@ -190,7 +236,8 @@ tests/ e2e/onboarding.test.ts # onboarding edge — pre-check + start (stub Temporal) activity/activities.test.ts # real activities ↔ RPC wiring + compensation idempotency workflow/onboardingWorkflow.test.ts# saga orchestration + LIFO compensation (time-skipping) -docker-compose.yml # mono + split + saga profiles (NATS + Postgres + Temporal) + e2e/broadcast.test.ts # EmployeeOnboarded 1→N fan-out to three reactors (MemoryAdapter) +docker-compose.yml # mono + split + saga profiles (NATS + Postgres + Temporal + reactors) Dockerfile # one image, role chosen by SERVICES env (worker = node src/worker.ts) ``` diff --git a/hris/docker-compose.yml b/hris/docker-compose.yml index c3c8549..4ef1b16 100644 --- a/hris/docker-compose.yml +++ b/hris/docker-compose.yml @@ -304,7 +304,12 @@ services: - PAYROLL_ADDR=http://payroll:5003 - TIMEOFF_ADDR=http://timeoff:5002 - ACCESS_ADDR=http://access:5004 + # The terminal announceOnboarded activity publishes EmployeeOnboarded on + # the publish-only EventBus (Phase 5b broadcast). + - NATS_URL=nats://nats:4222 depends_on: + nats: + condition: service_healthy temporal: condition: service_healthy directory: @@ -319,6 +324,51 @@ services: - hris profiles: ["saga"] + # ── REACTORS — three independent EmployeeOnboarded broadcast subscribers ─── + # + # Each runs the SAME image with `node src/reactor.ts`, selecting its route + + # consumer group by REACTOR. Distinct groups → distinct NATS durable consumers + # → every reactor receives every event = fan-out (a shared group would + # load-balance / steal). No inbound RPC; a reactor only subscribes. + reactor-welcome: + build: . + command: ["node", "src/reactor.ts"] + environment: + - REACTOR=welcome + - NATS_URL=nats://nats:4222 + depends_on: + nats: + condition: service_healthy + networks: + - hris + profiles: ["saga"] + + reactor-audit: + build: . + command: ["node", "src/reactor.ts"] + environment: + - REACTOR=audit + - NATS_URL=nats://nats:4222 + depends_on: + nats: + condition: service_healthy + networks: + - hris + profiles: ["saga"] + + reactor-headcount: + build: . + command: ["node", "src/reactor.ts"] + environment: + - REACTOR=headcount + - NATS_URL=nats://nats:4222 + depends_on: + nats: + condition: service_healthy + networks: + - hris + profiles: ["saga"] + networks: hris: driver: bridge diff --git a/hris/package.json b/hris/package.json index f2516f7..8f4e370 100644 --- a/hris/package.json +++ b/hris/package.json @@ -12,6 +12,7 @@ "start": "node src/index.ts", "dev": "node --watch src/index.ts", "worker": "node src/worker.ts", + "reactor": "node src/reactor.ts", "typecheck": "tsc --noEmit", "buf:generate": "buf generate", "buf:lint": "buf lint", diff --git a/hris/proto/onboarding/v1/onboarding_events.proto b/hris/proto/onboarding/v1/onboarding_events.proto new file mode 100644 index 0000000..356ef9e --- /dev/null +++ b/hris/proto/onboarding/v1/onboarding_events.proto @@ -0,0 +1,45 @@ +syntax = "proto3"; + +package onboarding.v1; + +import "connectum/events/v1/options.proto"; +import "google/protobuf/empty.proto"; + +// EmployeeOnboarded is the integration event broadcast EXACTLY ONCE when the +// onboarding saga COMPLETES (the new hire is active). It is consumed +// INDEPENDENTLY by three reactors — welcome, audit-log, and headcount — each on +// its own EventBus with its own consumer group, so the single domain fact fans +// out 1→N rather than being orchestrated. +// +// EventBus is used here for BROADCAST only; durability/ordering stays the saga's +// (Temporal's) job. A lost broadcast must NEVER roll back a completed onboarding. +message EmployeeOnboarded { + // The onboarded employee's id (also the reactor idempotency key). + string employee_id = 1; + string name = 2; + // Work email — the welcome reactor's notification target. + string email = 3; + // Department — the headcount reactor tallies per department. + string department = 4; + // The new hire's manager id (empty for the top of the org chart). + string manager_id = 5; +} + +// OnboardingEventHandlers is an event-handler service: each RPC is a +// subscription. The (connectum.events.v1.event).topic option pins the topic so +// the PUBLISHER (which lists this service in `publishes`) and EACH subscriber +// (which registers this service via `routes` on its OWN bus) agree on the topic +// from the proto option alone — no raw topic string is ever hand-passed to +// publish(). +// +// One shared service is reused by the publisher and all three reactors; +// independence between reactors comes from SEPARATE buses + DISTINCT consumer +// groups, not from separate proto services (registering the same service on +// three separate buses is fine — the duplicate-topic guard is per-bus). +service OnboardingEventHandlers { + // OnEmployeeOnboarded reacts to a completed onboarding. Each reactor binds its + // OWN handler function to this RPC on its OWN bus. + rpc OnEmployeeOnboarded(EmployeeOnboarded) returns (google.protobuf.Empty) { + option (connectum.events.v1.event).topic = "onboarding.employee-onboarded"; + } +} diff --git a/hris/src/events/broadcastBus.ts b/hris/src/events/broadcastBus.ts new file mode 100644 index 0000000..99e3250 --- /dev/null +++ b/hris/src/events/broadcastBus.ts @@ -0,0 +1,108 @@ +/** + * EventBus factories for the Phase 5b broadcast — ONE publisher, THREE reactors. + * + * Phase 5b adds the multi-subscriber face of the EventBus to the example. The + * existing `LeaveApproved` flow is a SINGLE subscriber (payroll); this is the + * fire-and-forget 1→N BROADCAST: the single domain fact "an employee was + * onboarded" is published ONCE as `EmployeeOnboarded` and consumed INDEPENDENTLY + * by three reactors. EventBus is used for BROADCAST only — never orchestration + * (that is the saga's job). + * + * The fan-out is achieved with FOUR separate buses, not one bus with three + * handlers: + * + * - the PUBLISHER bus is publish-only (`routes: []`, `publishes: + * [OnboardingEventHandlers]`). Listing the event service in `publishes` + * populates the publish-topic lookup from the proto + * `(connectum.events.v1.event).topic` option, so `publish(EmployeeOnboardedSchema, + * …)` resolves `onboarding.employee-onboarded` with NO raw `{topic}` + * hand-passed. This is the whole point of `publishes`: a pure publisher has no + * subscriber routes, so without it the topic would silently fall back to the + * message `typeName`. + * - each REACTOR bus has ONE route and its OWN DISTINCT consumer group. Two + * routes resolving to the same topic CANNOT share one bus (the duplicate-topic + * guard throws at `start()`), so three reactors on + * `onboarding.employee-onboarded` are FORCED onto three buses. On a real + * broker, distinct groups give distinct durable consumers → each gets every + * event = fan-out. A shared group would load-balance (one reactor steals each + * event) = queue. + * + * The adapter is pluggable: NATS by default (`NATS_URL`), but tests pass ONE + * shared `MemoryAdapter()` to all four buses so a single publish reaches all + * three reactors in-process with no broker (MemoryAdapter broadcasts to every + * matching subscription and ignores group; the distinct groups are still written + * so the SAME wiring fans out on NATS). + * + * @module events/broadcastBus + */ + +import { createEventBus } from "@connectum/events"; +import type { EventAdapter, EventBus, EventRoute } from "@connectum/events"; +import type { EventBusLike } from "@connectum/core"; +import { NatsAdapter } from "@connectum/events-nats"; +import { OnboardingEventHandlers } from "#gen/onboarding/v1/onboarding_events_pb.ts"; + +/** A reactor's stable identity: which side effect + which consumer group. */ +export const REACTOR_GROUP = { + /** Welcome reactor — "sends" a welcome email to the new hire. */ + welcome: "hris-welcome", + /** Audit-log reactor — appends one immutable record per onboarded employee. */ + audit: "hris-audit", + /** Headcount reactor — tallies active headcount per department. */ + headcount: "hris-headcount", +} as const; + +/** One of the reactor selector keys (`welcome` | `audit` | `headcount`). */ +export type ReactorKey = keyof typeof REACTOR_GROUP; + +/** A bus that can both `publish` (the `EventBus` API) and start/stop (`EventBusLike`). */ +export type ManagedBus = EventBus & EventBusLike; + +/** Default NATS JetStream stream for the example's broadcast topic. */ +const NATS_STREAM = "hris"; + +/** + * Build the NATS adapter for a process, reading `NATS_URL` (default + * `nats://localhost:4222`). One adapter instance per bus in the split topology, + * so each reactor owns an independent broker connection + durable consumer. + */ +function natsAdapter(): EventAdapter { + return NatsAdapter({ servers: process.env.NATS_URL ?? "nats://localhost:4222", stream: NATS_STREAM }); +} + +/** + * Build the PUBLISH-ONLY bus that the worker (the saga's terminal activity) uses + * to broadcast `EmployeeOnboarded`. + * + * It has NO routes and lists `OnboardingEventHandlers` in `publishes`, so the + * topic `onboarding.employee-onboarded` is resolved from the proto option — the + * publisher passes NO raw topic. + * + * @param options.adapter - Adapter override (tests pass a shared `MemoryAdapter()`); + * defaults to a NATS adapter from `NATS_URL`. + */ +export function buildPublisherBus(options: { readonly adapter?: EventAdapter } = {}): ManagedBus { + return createEventBus({ + adapter: options.adapter ?? natsAdapter(), + routes: [], + publishes: [OnboardingEventHandlers], + }); +} + +/** + * Build ONE reactor bus: a single route subscribed to + * `onboarding.employee-onboarded` (topic from the route's proto option) under + * its OWN distinct consumer group. + * + * @param options.key - The reactor selector, fixing its consumer group. + * @param options.route - The reactor's event route (its `OnEmployeeOnboarded` handler). + * @param options.adapter - Adapter override (tests pass a shared `MemoryAdapter()`); + * defaults to a NATS adapter from `NATS_URL`. + */ +export function buildReactorBus(options: { readonly key: ReactorKey; readonly route: EventRoute; readonly adapter?: EventAdapter }): ManagedBus { + return createEventBus({ + adapter: options.adapter ?? natsAdapter(), + routes: [options.route], + group: REACTOR_GROUP[options.key], + }); +} diff --git a/hris/src/events/reactors.ts b/hris/src/events/reactors.ts new file mode 100644 index 0000000..4b44bcf --- /dev/null +++ b/hris/src/events/reactors.ts @@ -0,0 +1,158 @@ +/** + * The three independent `EmployeeOnboarded` reactors and their in-memory state. + * + * Each reactor is a SEPARATE consumer of the ONE `onboarding.employee-onboarded` + * broadcast: + * + * - WELCOME — "sends" a welcome email to the new hire. + * - AUDIT-LOG — appends one immutable record per onboarded employee. + * - HEADCOUNT — tallies active headcount per department. + * + * Every reactor binds its OWN handler function to the SAME + * `OnboardingEventHandlers` service descriptor, but on its OWN bus with its OWN + * consumer group (built in `broadcastBus.ts`). Reusing one proto service across + * three buses is fine — the duplicate-topic guard is per-bus — and independence + * comes from the separate buses + distinct groups, not from separate proto + * services. + * + * IDEMPOTENCY — every reactor dedupes by `employeeId`. On a real broker the + * broadcast is at-least-once (a worker crash after publish-before-ack re-fires + * the reactors), and a redelivery would otherwise send a second welcome email or + * double-count a headcount. Deduping by `employeeId` absorbs the redelivery and + * also makes the dockerless test deterministic. This is why broadcast reactors + * are written idempotent (durable/ordered delivery is the saga's job, not the + * EventBus's). + * + * `reset*()` / inspect helpers are the test seams. + * + * @module events/reactors + */ + +import type { EventRoute } from "@connectum/events"; +import { OnboardingEventHandlers } from "#gen/onboarding/v1/onboarding_events_pb.ts"; + +// ── Welcome reactor ───────────────────────────────────────────────────────── + +/** Employee ids already welcomed (idempotency set). */ +const welcomeSeen = new Set(); +/** The email addresses a welcome was "sent" to (one per distinct employee). */ +const sentWelcomes: string[] = []; + +/** The list of emails the welcome reactor has sent to (test/inspection helper). */ +export function welcomeEmails(): readonly string[] { + return sentWelcomes; +} + +/** Reset the welcome reactor's state — used between tests. */ +export function resetWelcome(): void { + welcomeSeen.clear(); + sentWelcomes.length = 0; +} + +/** + * Welcome route: on each `EmployeeOnboarded`, "send" a welcome email ONCE per + * `employeeId` (a redelivery is a no-op). + */ +export const welcomeReactorRoutes: EventRoute = (events) => { + events.service(OnboardingEventHandlers, { + async onEmployeeOnboarded(event, ctx) { + if (!welcomeSeen.has(event.employeeId)) { + welcomeSeen.add(event.employeeId); + sentWelcomes.push(event.email); + } + await ctx.ack(); + }, + }); +}; + +// ── Audit-log reactor ─────────────────────────────────────────────────────── + +/** One immutable audit record per onboarded employee (the full event shape). */ +export interface AuditRecord { + readonly employeeId: string; + readonly name: string; + readonly email: string; + readonly department: string; + readonly managerId: string; +} + +/** Append-only audit log (one record per distinct `employeeId`). */ +const auditLog: AuditRecord[] = []; +/** Employee ids already audited (idempotency set). */ +const auditSeen = new Set(); + +/** The audit records appended so far (test/inspection helper). */ +export function auditRecords(): readonly AuditRecord[] { + return auditLog; +} + +/** Reset the audit reactor's state — used between tests. */ +export function resetAudit(): void { + auditLog.length = 0; + auditSeen.clear(); +} + +/** + * Audit route: append one immutable record per `EmployeeOnboarded`, ONCE per + * `employeeId`. The record mirrors the full documented event shape. + */ +export const auditReactorRoutes: EventRoute = (events) => { + events.service(OnboardingEventHandlers, { + async onEmployeeOnboarded(event, ctx) { + if (!auditSeen.has(event.employeeId)) { + auditSeen.add(event.employeeId); + auditLog.push({ + employeeId: event.employeeId, + name: event.name, + email: event.email, + department: event.department, + managerId: event.managerId, + }); + } + await ctx.ack(); + }, + }); +}; + +// ── Headcount reactor ─────────────────────────────────────────────────────── + +/** Employee ids already counted (idempotency set). */ +const headcountSeen = new Set(); +/** Active headcount per department (department → count). */ +const headcountByDept = new Map(); + +/** The current headcount for a department (test/inspection helper). */ +export function departmentHeadcount(department: string): number { + return headcountByDept.get(department) ?? 0; +} + +/** Reset the headcount reactor's state — used between tests. */ +export function resetHeadcount(): void { + headcountSeen.clear(); + headcountByDept.clear(); +} + +/** + * Headcount route: increment the new hire's department headcount ONCE per + * `employeeId` (a redelivery is a no-op). + */ +export const headcountReactorRoutes: EventRoute = (events) => { + events.service(OnboardingEventHandlers, { + async onEmployeeOnboarded(event, ctx) { + if (!headcountSeen.has(event.employeeId)) { + headcountSeen.add(event.employeeId); + headcountByDept.set(event.department, (headcountByDept.get(event.department) ?? 0) + 1); + } + await ctx.ack(); + }, + }); +}; + +// ── Shared test seam ──────────────────────────────────────────────────────── + +/** Reset all three reactors' state — used between tests. */ +export function resetAllReactors(): void { + resetWelcome(); + resetAudit(); + resetHeadcount(); +} diff --git a/hris/src/reactor.ts b/hris/src/reactor.ts new file mode 100644 index 0000000..3a3e07e --- /dev/null +++ b/hris/src/reactor.ts @@ -0,0 +1,62 @@ +/** + * Reactor entry — a long-lived `EmployeeOnboarded` broadcast subscriber. + * + * One image, role by env: `REACTOR=welcome|audit|headcount` selects WHICH + * reactor route this process mounts and WHICH consumer group it joins. Each + * reactor runs as its OWN process with its OWN EventBus + DISTINCT group, so on + * NATS each gets its own durable consumer → every process receives every + * `onboarding.employee-onboarded` event = fan-out (a shared group would + * load-balance / steal instead). + * + * This mirrors `index.ts` (role by `SERVICES`) and `worker.ts` (the saga host): + * the same binary, a different process boundary chosen by env. Co-hosting all + * three reactor buses in one process would change the process count but NOT the + * fan-out semantics (still three durable consumers). + * + * The bus is NATS-backed (`NATS_URL`); there is no inbound RPC and no HTTP + * server — a reactor only subscribes. SIGINT/SIGTERM stop the bus cleanly. + * + * @module reactor + */ + +import type { EventRoute } from "@connectum/events"; +import { buildReactorBus } from "#events/broadcastBus.ts"; +import type { ManagedBus, ReactorKey } from "#events/broadcastBus.ts"; +import { auditReactorRoutes, headcountReactorRoutes, welcomeReactorRoutes } from "#events/reactors.ts"; + +/** Map each reactor selector to its route. The group is fixed by the key in `buildReactorBus`. */ +const REACTOR_ROUTES: Readonly> = { + welcome: welcomeReactorRoutes, + audit: auditReactorRoutes, + headcount: headcountReactorRoutes, +}; + +/** Read + validate the `REACTOR` selector from env. */ +function selectReactor(): ReactorKey { + const key = process.env.REACTOR; + if (key === "welcome" || key === "audit" || key === "headcount") { + return key; + } + throw new Error(`REACTOR must be one of welcome|audit|headcount (got ${key === undefined ? "" : `"${key}"`})`); +} + +async function main(): Promise { + const key = selectReactor(); + const bus: ManagedBus = buildReactorBus({ key, route: REACTOR_ROUTES[key] }); + + await bus.start(); + console.log(`hris reactor ready — REACTOR=${key} topic=onboarding.employee-onboarded nats=${process.env.NATS_URL ?? "nats://localhost:4222"}`); + + const stop = async (): Promise => { + await bus.stop(); + console.log(`hris reactor stopped — REACTOR=${key}`); + process.exit(0); + }; + process.on("SIGINT", () => void stop()); + process.on("SIGTERM", () => void stop()); +} + +main().catch((err: unknown) => { + console.error("hris reactor error:", err); + process.exitCode = 1; +}); diff --git a/hris/src/temporal/activities.ts b/hris/src/temporal/activities.ts index 0463c11..c3ca4c1 100644 --- a/hris/src/temporal/activities.ts +++ b/hris/src/temporal/activities.ts @@ -29,8 +29,11 @@ import { Code, ConnectError } from "@connectrpc/connect"; import { ApplicationFailure } from "@temporalio/activity"; import { ProvisionAccessRequestSchema, RevokeAccessRequestSchema } from "#gen/access/v1/access_pb.ts"; import { ActivateEmployeeRequestSchema, CreateEmployeeRequestSchema, OffboardEmployeeRequestSchema } from "#gen/directory/v1/directory_pb.ts"; +import { EmployeeOnboardedSchema } from "#gen/onboarding/v1/onboarding_events_pb.ts"; import { SetupPayrollRequestSchema, TeardownPayrollRequestSchema } from "#gen/payroll/v1/payroll_pb.ts"; import { GrantTimeOffRequestSchema, RevokeTimeOffRequestSchema } from "#gen/timeoff/v1/timeoff_pb.ts"; +import type { ManagedBus } from "#events/broadcastBus.ts"; +import { buildPublisherBus } from "#events/broadcastBus.ts"; import type { ServiceClients } from "#temporal/clients.ts"; import { createServiceClients } from "#temporal/clients.ts"; @@ -62,6 +65,38 @@ function clients(): ServiceClients { return sharedClients; } +/** + * The publish-only EventBus used by {@link announceOnboarded} to broadcast + * `EmployeeOnboarded` (Phase 5b). The worker INJECTS a started bus via + * {@link setPublisherBus} and owns its lifecycle (`start()`/`stop()`); this + * activity only `publish()`es on it. Tests inject a `MemoryAdapter`-backed bus + * through the same seam. + */ +let publisherBus: ManagedBus | undefined; + +/** + * Inject the publish-only bus the broadcast activity publishes on (the worker's + * seam, also used by the dockerless broadcast test). Pass `undefined` to reset. + * + * @param bus - A started publish-only bus, or `undefined` to reset (tests). + */ +export function setPublisherBus(bus: ManagedBus | undefined): void { + publisherBus = bus; +} + +/** + * Get the injected publisher bus, or lazily build + start a default NATS one. + * The worker injects a bus before polling, so the lazy path is only a fallback. + */ +async function getPublisherBus(): Promise { + if (publisherBus === undefined) { + const bus = buildPublisherBus(); + await bus.start(); + publisherBus = bus; + } + return publisherBus; +} + /** Details of the new hire threaded through the forward steps. */ export interface NewHire { readonly employeeId: string; @@ -148,3 +183,27 @@ export async function revokeAccess(input: { employeeId: string }): Promise export async function activateEmployee(input: { employeeId: string }): Promise { await clients().directory.activateEmployee(create(ActivateEmployeeRequestSchema, { id: input.employeeId })); } + +/** + * Step 6 (Phase 5b) — broadcast `EmployeeOnboarded` ONCE on completion. This is + * a fire-and-forget 1→N broadcast (the welcome / audit / headcount reactors), + * NOT an orchestrated step: it pushes no compensation and a failure here must + * NEVER roll back a completed onboarding (the employee is already active). The + * topic `onboarding.employee-onboarded` is resolved from the proto option via + * the bus's `publishes` list — no raw topic is passed. + * + * @param hire - The {@link NewHire} details, carried into the event. + */ +export async function announceOnboarded(hire: NewHire): Promise { + const bus = await getPublisherBus(); + await bus.publish( + EmployeeOnboardedSchema, + create(EmployeeOnboardedSchema, { + employeeId: hire.employeeId, + name: hire.name, + email: hire.email, + department: hire.department, + managerId: hire.managerId, + }), + ); +} diff --git a/hris/src/temporal/workflows.ts b/hris/src/temporal/workflows.ts index f03648b..859fdba 100644 --- a/hris/src/temporal/workflows.ts +++ b/hris/src/temporal/workflows.ts @@ -112,8 +112,6 @@ export async function OnboardingWorkflow(input: OnboardingWorkflowInput): Promis // compensation. await acts.activateEmployee({ employeeId }); status = OnboardingStatus.COMPLETED; - - return status; } catch (err) { // Unwind in LIFO order; each compensation is isolated so the unwind // never throws. Temporal already retried each forward+comp activity. @@ -130,4 +128,17 @@ export async function OnboardingWorkflow(input: OnboardingWorkflowInput): Promis if (err instanceof ApplicationFailure) throw err; throw ApplicationFailure.create({ message: String(err), type: "OnboardingWorkflowFailed" }); } + + // Phase 5b — broadcast `EmployeeOnboarded` ONCE on completion. This runs + // OUTSIDE the compensation scope (the catch above always rethrows, so we are + // here only on success): a fire-and-forget 1→N broadcast whose failure must + // NEVER roll back a now-active employee. It is logged and swallowed, never + // rethrown, so the workflow stays COMPLETED even if the broadcast is lost. + try { + await acts.announceOnboarded({ employeeId, name, email, title, department, managerId }); + } catch (err) { + log.warn("EmployeeOnboarded broadcast failed (onboarding already complete; not rolling back)", { employeeId, error: String(err) }); + } + + return status; } diff --git a/hris/src/worker.ts b/hris/src/worker.ts index 1d7e3b1..9130fa7 100644 --- a/hris/src/worker.ts +++ b/hris/src/worker.ts @@ -18,11 +18,19 @@ import { fileURLToPath } from "node:url"; import { NativeConnection, Worker } from "@temporalio/worker"; +import { buildPublisherBus } from "#events/broadcastBus.ts"; import * as activities from "#temporal/activities.ts"; import { TEMPORAL_ADDRESS, TEMPORAL_NAMESPACE, TEMPORAL_TASK_QUEUE } from "#temporal/config.ts"; async function main(): Promise { const connection = await NativeConnection.connect({ address: TEMPORAL_ADDRESS }); + // Build + start the publish-only EventBus (Phase 5b broadcast) and inject it + // into the activities module BEFORE the worker polls, so the terminal + // `announceOnboarded` activity publishes on a ready bus. The worker owns its + // lifecycle: stopped in the `finally` below. + const publisherBus = buildPublisherBus(); + await publisherBus.start(); + activities.setPublisherBus(publisherBus); try { const worker = await Worker.create({ connection, @@ -36,6 +44,7 @@ async function main(): Promise { console.log(`hris temporal worker ready — taskQueue=${TEMPORAL_TASK_QUEUE} namespace=${TEMPORAL_NAMESPACE} temporal=${TEMPORAL_ADDRESS}`); await worker.run(); } finally { + await publisherBus.stop(); await connection.close(); } } diff --git a/hris/tests/e2e/broadcast.test.ts b/hris/tests/e2e/broadcast.test.ts new file mode 100644 index 0000000..4439116 --- /dev/null +++ b/hris/tests/e2e/broadcast.test.ts @@ -0,0 +1,126 @@ +/** + * Phase 5b EventBus broadcast / fan-out tests — DOCKERLESS. + * + * Proves the multi-subscriber face of the EventBus: ONE `EmployeeOnboarded` + * published on the saga's terminal step fans out to THREE INDEPENDENT reactors. + * No broker — one shared `MemoryAdapter()` feeds the publisher bus AND all three + * reactor buses, so a single publish reaches all three in-process (MemoryAdapter + * broadcasts to every matching subscription and ignores group; the distinct + * groups are still written so the SAME wiring fans out on NATS). + * + * Tests: + * + * 1. PRIMARY — drives the actual publish SITE: injects the publisher bus into + * the activities module (the same `setPublisherBus` seam the worker uses), + * runs the REAL `announceOnboarded` activity body via `MockActivityEnvironment`, + * and asserts ALL THREE reactors fired with the FULL `EmployeeOnboarded` + * shape (the audit record equals the documented five-field contract, not + * just "an event arrived"). + * 2. TOPIC — `resolveTopicName(OnboardingEventHandlers.method.onEmployeeOnboarded)` + * is exactly `"onboarding.employee-onboarded"`, pinning the topic to the + * proto option (not the `typeName` fallback). No raw `{topic}` is passed. + * 3. IDEMPOTENCY — a redelivery of the same `employeeId` does NOT double-apply. + * + * @module tests/e2e/broadcast + */ + +import assert from "node:assert/strict"; +import { after, before, beforeEach, describe, it } from "node:test"; +import { MemoryAdapter, resolveTopicName } from "@connectum/events"; +import type { EventAdapter } from "@connectum/events"; +import { MockActivityEnvironment } from "@temporalio/testing"; +import { OnboardingEventHandlers } from "#gen/onboarding/v1/onboarding_events_pb.ts"; +import { buildPublisherBus, buildReactorBus } from "#events/broadcastBus.ts"; +import type { ManagedBus } from "#events/broadcastBus.ts"; +import { auditRecords, departmentHeadcount, resetAllReactors, welcomeEmails, welcomeReactorRoutes, auditReactorRoutes, headcountReactorRoutes } from "#events/reactors.ts"; +import * as activities from "#temporal/activities.ts"; +import type { NewHire } from "#temporal/activities.ts"; + +const env = new MockActivityEnvironment(); + +/** Run a real activity body inside a mocked Temporal Activity Context. */ +function run(fn: (...args: A) => Promise, ...args: A): Promise { + return env.run(fn, ...args); +} + +/** The new hire carried into the broadcast event. */ +const HIRE: NewHire = { + employeeId: "e-100", + name: "New Hire", + email: "newhire@example.com", + title: "Software Engineer", + department: "Engineering", + managerId: "e-002", +}; + +describe("Phase 5b broadcast: one EmployeeOnboarded fans out to three independent reactors (dockerless, MemoryAdapter)", () => { + let adapter: EventAdapter; + let publisher: ManagedBus; + let welcome: ManagedBus; + let audit: ManagedBus; + let headcount: ManagedBus; + + before(async () => { + // ONE shared in-memory adapter feeds all four buses: a publish on the + // publisher bus reaches every reactor's subscription on the SAME adapter. + // Groups are ignored in-memory but written so the wiring fans out on NATS. + adapter = MemoryAdapter(); + publisher = buildPublisherBus({ adapter }); + welcome = buildReactorBus({ key: "welcome", route: welcomeReactorRoutes, adapter }); + audit = buildReactorBus({ key: "audit", route: auditReactorRoutes, adapter }); + headcount = buildReactorBus({ key: "headcount", route: headcountReactorRoutes, adapter }); + await Promise.all([publisher.start(), welcome.start(), audit.start(), headcount.start()]); + // Inject the publisher bus into the activities module — the SAME seam the + // worker uses — so the REAL activity publishes on it. + activities.setPublisherBus(publisher); + }); + + beforeEach(() => { + resetAllReactors(); + }); + + after(async () => { + // Stop ONLY after every assertion (the FIRST stop calls the shared + // adapter's disconnect(), which wipes all subscriptions). + activities.setPublisherBus(undefined); + await Promise.all([publisher.stop(), welcome.stop(), audit.stop(), headcount.stop()]); + }); + + it("PRIMARY: the terminal announceOnboarded activity broadcasts ONCE, all three reactors react with the FULL message shape", async () => { + // Drive the actual publish SITE: the real activity body, not a hand-built + // publish. + await run(activities.announceOnboarded, HIRE); + + // WELCOME reacted — a welcome was "sent" to the hire's email. + assert.deepEqual(welcomeEmails(), ["newhire@example.com"]); + + // HEADCOUNT reacted — the hire's department was incremented once. + assert.equal(departmentHeadcount("Engineering"), 1); + + // AUDIT reacted — one record with the FULL documented five-field shape. + const records = auditRecords(); + assert.equal(records.length, 1); + assert.deepEqual(records[0], { + employeeId: "e-100", + name: "New Hire", + email: "newhire@example.com", + department: "Engineering", + managerId: "e-002", + }); + }); + + it("TOPIC: the topic is pinned to the proto option, not the message typeName", () => { + assert.equal(resolveTopicName(OnboardingEventHandlers.method.onEmployeeOnboarded), "onboarding.employee-onboarded"); + }); + + it("IDEMPOTENCY: a redelivery of the same employeeId does not double-apply in any reactor", async () => { + await run(activities.announceOnboarded, HIRE); + // Publish the SAME employee again (at-least-once redelivery on a broker). + await run(activities.announceOnboarded, HIRE); + + // Every reactor deduped by employeeId — counted/recorded/sent exactly once. + assert.equal(welcomeEmails().length, 1); + assert.equal(departmentHeadcount("Engineering"), 1); + assert.equal(auditRecords().length, 1); + }); +}); diff --git a/hris/tests/workflow/onboardingWorkflow.test.ts b/hris/tests/workflow/onboardingWorkflow.test.ts index 96b184a..dccd4fe 100644 --- a/hris/tests/workflow/onboardingWorkflow.test.ts +++ b/hris/tests/workflow/onboardingWorkflow.test.ts @@ -68,6 +68,7 @@ function makeMockActivities(calls: string[], failing?: { step: string }): Record provisionAccess: async () => record("provisionAccess"), revokeAccess: async () => record("revokeAccess"), activateEmployee: async () => record("activateEmployee"), + announceOnboarded: async () => record("announceOnboarded"), }; } @@ -93,12 +94,23 @@ describe("OnboardingWorkflow: orchestration + compensation (time-skipping, mocke return worker.runUntil(testEnv.client.workflow.execute(OnboardingWorkflow, { args: [INPUT], taskQueue: TASK_QUEUE, workflowId })); } - it("success: runs the forward steps in order and COMPLETES", async () => { + it("success: runs the forward steps in order, COMPLETES, and broadcasts EmployeeOnboarded last", async () => { const calls: string[] = []; const result = await runWorkflow(makeMockActivities(calls), "wf-success"); assert.equal(result, "COMPLETED"); - assert.deepEqual(calls, ["createEmployee", "setupPayroll", "grantTimeOff", "provisionAccess", "activateEmployee"]); + assert.deepEqual(calls, ["createEmployee", "setupPayroll", "grantTimeOff", "provisionAccess", "activateEmployee", "announceOnboarded"]); + }); + + it("a failed broadcast does NOT roll back or fail a completed onboarding (fire-and-forget)", async () => { + const calls: string[] = []; + // announceOnboarded throws, but it runs OUTSIDE the saga's compensation + // scope and the workflow swallows it — the run still resolves COMPLETED, + // no compensation fires, and the employee stays active. + const result = await runWorkflow(makeMockActivities(calls, { step: "announceOnboarded" }), "wf-broadcast-fail"); + + assert.equal(result, "COMPLETED"); + assert.deepEqual(calls, ["createEmployee", "setupPayroll", "grantTimeOff", "provisionAccess", "activateEmployee", "announceOnboarded"]); }); it("activateEmployee fails: compensations run in REVERSE order (revokeAccess → revokeTimeOff → teardownPayroll → offboardEmployee)", async () => {