Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 50 additions & 3 deletions hris/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ the right tool for a different job:
| Mechanism | Used for | In this example |
|---|---|---|
| **`ctx.call`** (synchronous) | a reply you need now | `RequestLeave` validates the employee against the directory |
| **EventBus** (fire-and-forget) | broadcast a fact | `LeaveApproved` → payroll decrements the balance |
| **EventBus** (fire-and-forget) | announce a fact to 1 or N consumers | `LeaveApproved` → payroll (single subscriber); `EmployeeOnboarded` → welcome + audit + headcount ([fan-out](#broadcast--fan-out-phase-5b)) |
| **Temporal saga** (durable) | a long multi-step transaction with rollback | onboarding a new hire across four services |

Five services, one integration event, and a durable saga:
Expand Down Expand Up @@ -148,6 +148,48 @@ grpcurl -plaintext -d '{"employee_id":"e-100","name":"New Hire","email":"newhire
open http://localhost:8088
```

## Broadcast / fan-out (Phase 5b)

The example uses the EventBus for **both** of its shapes. `LeaveApproved` is a
**single** subscriber (payroll). `EmployeeOnboarded` is the **1→N broadcast**:
when the onboarding saga COMPLETES, the worker's terminal `announceOnboarded`
activity publishes the fact ONCE, and **three independent reactors** consume it —
**welcome** ("sends" a welcome email), **audit** (appends an onboarding record),
and **headcount** (tallies per-department headcount). EventBus is used here for
broadcast only; durability stays the saga's job — *a lost broadcast must never
roll back a completed onboarding*, so the publish runs outside the compensation
scope and a failure is logged, not retried into a rollback.

The fan-out is **four buses, not one bus with three handlers**:

- the **publisher** bus is publish-only (`routes: []`, `publishes:
[OnboardingEventHandlers]`). Listing the event service in `publishes` resolves
the topic `onboarding.employee-onboarded` from the proto
`(connectum.events.v1.event).topic` option, so the publisher passes **no raw
topic** — without it, a route-less publisher would fall back to the message
`typeName`.
- each **reactor** bus has one route under its **own distinct consumer group**.
Two routes on the same topic cannot share a bus (the duplicate-topic guard
throws at `start()`), so the three reactors are forced onto three buses. On a
broker, distinct groups give distinct durable consumers → each gets every event
(fan-out); a shared group would load-balance (one reactor steals each event).

Each reactor runs as its own process (`node src/reactor.ts`, role by `REACTOR`),
the same one-image-many-roles pattern as the RPC roles and the worker. The
reactors are **idempotent** (dedupe by `employeeId`) because a broker broadcast
is at-least-once.

```bash
# The reactors come up with the `saga` profile (welcome / audit / headcount):
docker compose --profile split --profile saga up
```

::: warning Requires @connectum/events ≥ 1.1.0
The publish-only `publishes` option and the broadcast wiring ship in **1.1.0**.
The `^1.0.0` range in `package.json` resolves 1.1.0 once it is published; until
then, this Phase 5b broadcast is verified against pre-release snapshots.
:::

## Layout

```
Expand All @@ -157,6 +199,7 @@ proto/
payroll/v1/payroll.proto # GetBalance + LeaveApproved + SetupPayroll/TeardownPayroll (saga)
access/v1/access.proto # AccessService.ProvisionAccess/RevokeAccess (saga leaf)
onboarding/v1/onboarding.proto # OnboardingService.OnboardEmployee/GetOnboarding (saga gateway)
onboarding/v1/onboarding_events.proto # EmployeeOnboarded + OnboardingEventHandlers (broadcast)
connectum/events/v1/options.proto # (connectum.events.v1.event).topic option
buf.gen.yaml # protoc-gen-es + protoc-gen-connectum-catalog (strategy: all)
src/
Expand All @@ -174,7 +217,10 @@ src/
temporal/clients.ts # ConnectRPC clients the activities drive (*_ADDR)
temporal/activities.ts # saga side effects (each one RPC) + compensations
temporal/workflows.ts # OnboardingWorkflow — the durable saga (deterministic sandbox)
worker.ts # @temporalio/worker host (the ONLY native-addon process)
worker.ts # @temporalio/worker host (publishes EmployeeOnboarded); native-addon process
reactor.ts # EmployeeOnboarded broadcast subscriber (role by REACTOR)
events/broadcastBus.ts # publish-only bus (publishes) + per-reactor buses (distinct groups)
events/reactors.ts # welcome / audit / headcount reactors (idempotent)
topology.ts # env → enabledServices + remoteResolver
events.ts # LEAVE_APPROVED_TOPIC constant (publisher/subscriber match)
eventBus.ts # one bus per process; payroll subscribes only when local
Expand All @@ -190,7 +236,8 @@ tests/
e2e/onboarding.test.ts # onboarding edge — pre-check + start (stub Temporal)
activity/activities.test.ts # real activities ↔ RPC wiring + compensation idempotency
workflow/onboardingWorkflow.test.ts# saga orchestration + LIFO compensation (time-skipping)
docker-compose.yml # mono + split + saga profiles (NATS + Postgres + Temporal)
e2e/broadcast.test.ts # EmployeeOnboarded 1→N fan-out to three reactors (MemoryAdapter)
docker-compose.yml # mono + split + saga profiles (NATS + Postgres + Temporal + reactors)
Dockerfile # one image, role chosen by SERVICES env (worker = node src/worker.ts)
```

Expand Down
50 changes: 50 additions & 0 deletions hris/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,12 @@ services:
- PAYROLL_ADDR=http://payroll:5003
- TIMEOFF_ADDR=http://timeoff:5002
- ACCESS_ADDR=http://access:5004
# The terminal announceOnboarded activity publishes EmployeeOnboarded on
# the publish-only EventBus (Phase 5b broadcast).
- NATS_URL=nats://nats:4222
depends_on:
nats:
condition: service_healthy
temporal:
condition: service_healthy
directory:
Expand All @@ -319,6 +324,51 @@ services:
- hris
profiles: ["saga"]

# ── REACTORS — three independent EmployeeOnboarded broadcast subscribers ───
#
# Each runs the SAME image with `node src/reactor.ts`, selecting its route +
# consumer group by REACTOR. Distinct groups → distinct NATS durable consumers
# → every reactor receives every event = fan-out (a shared group would
# load-balance / steal). No inbound RPC; a reactor only subscribes.
reactor-welcome:
build: .
command: ["node", "src/reactor.ts"]
environment:
- REACTOR=welcome
- NATS_URL=nats://nats:4222
depends_on:
nats:
condition: service_healthy
networks:
- hris
profiles: ["saga"]

reactor-audit:
build: .
command: ["node", "src/reactor.ts"]
environment:
- REACTOR=audit
- NATS_URL=nats://nats:4222
depends_on:
nats:
condition: service_healthy
networks:
- hris
profiles: ["saga"]

reactor-headcount:
build: .
command: ["node", "src/reactor.ts"]
environment:
- REACTOR=headcount
- NATS_URL=nats://nats:4222
depends_on:
nats:
condition: service_healthy
networks:
- hris
profiles: ["saga"]

networks:
hris:
driver: bridge
Expand Down
1 change: 1 addition & 0 deletions hris/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"start": "node src/index.ts",
"dev": "node --watch src/index.ts",
"worker": "node src/worker.ts",
"reactor": "node src/reactor.ts",
"typecheck": "tsc --noEmit",
"buf:generate": "buf generate",
"buf:lint": "buf lint",
Expand Down
45 changes: 45 additions & 0 deletions hris/proto/onboarding/v1/onboarding_events.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
syntax = "proto3";

package onboarding.v1;

import "connectum/events/v1/options.proto";
import "google/protobuf/empty.proto";

// EmployeeOnboarded is the integration event broadcast EXACTLY ONCE when the
// onboarding saga COMPLETES (the new hire is active). It is consumed
// INDEPENDENTLY by three reactors — welcome, audit-log, and headcount — each on
// its own EventBus with its own consumer group, so the single domain fact fans
// out 1→N rather than being orchestrated.
//
// EventBus is used here for BROADCAST only; durability/ordering stays the saga's
// (Temporal's) job. A lost broadcast must NEVER roll back a completed onboarding.
message EmployeeOnboarded {
// The onboarded employee's id (also the reactor idempotency key).
string employee_id = 1;
string name = 2;
// Work email — the welcome reactor's notification target.
string email = 3;
// Department — the headcount reactor tallies per department.
string department = 4;
// The new hire's manager id (empty for the top of the org chart).
string manager_id = 5;
}

// OnboardingEventHandlers is an event-handler service: each RPC is a
// subscription. The (connectum.events.v1.event).topic option pins the topic so
// the PUBLISHER (which lists this service in `publishes`) and EACH subscriber
// (which registers this service via `routes` on its OWN bus) agree on the topic
// from the proto option alone — no raw topic string is ever hand-passed to
// publish().
//
// One shared service is reused by the publisher and all three reactors;
// independence between reactors comes from SEPARATE buses + DISTINCT consumer
// groups, not from separate proto services (registering the same service on
// three separate buses is fine — the duplicate-topic guard is per-bus).
service OnboardingEventHandlers {
// OnEmployeeOnboarded reacts to a completed onboarding. Each reactor binds its
// OWN handler function to this RPC on its OWN bus.
rpc OnEmployeeOnboarded(EmployeeOnboarded) returns (google.protobuf.Empty) {
option (connectum.events.v1.event).topic = "onboarding.employee-onboarded";
}
}
108 changes: 108 additions & 0 deletions hris/src/events/broadcastBus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* EventBus factories for the Phase 5b broadcast — ONE publisher, THREE reactors.
*
* Phase 5b adds the multi-subscriber face of the EventBus to the example. The
* existing `LeaveApproved` flow is a SINGLE subscriber (payroll); this is the
* fire-and-forget 1→N BROADCAST: the single domain fact "an employee was
* onboarded" is published ONCE as `EmployeeOnboarded` and consumed INDEPENDENTLY
* by three reactors. EventBus is used for BROADCAST only — never orchestration
* (that is the saga's job).
*
* The fan-out is achieved with FOUR separate buses, not one bus with three
* handlers:
*
* - the PUBLISHER bus is publish-only (`routes: []`, `publishes:
* [OnboardingEventHandlers]`). Listing the event service in `publishes`
* populates the publish-topic lookup from the proto
* `(connectum.events.v1.event).topic` option, so `publish(EmployeeOnboardedSchema,
* …)` resolves `onboarding.employee-onboarded` with NO raw `{topic}`
* hand-passed. This is the whole point of `publishes`: a pure publisher has no
* subscriber routes, so without it the topic would silently fall back to the
* message `typeName`.
* - each REACTOR bus has ONE route and its OWN DISTINCT consumer group. Two
* routes resolving to the same topic CANNOT share one bus (the duplicate-topic
* guard throws at `start()`), so three reactors on
* `onboarding.employee-onboarded` are FORCED onto three buses. On a real
* broker, distinct groups give distinct durable consumers → each gets every
* event = fan-out. A shared group would load-balance (one reactor steals each
* event) = queue.
*
* The adapter is pluggable: NATS by default (`NATS_URL`), but tests pass ONE
* shared `MemoryAdapter()` to all four buses so a single publish reaches all
* three reactors in-process with no broker (MemoryAdapter broadcasts to every
* matching subscription and ignores group; the distinct groups are still written
* so the SAME wiring fans out on NATS).
*
* @module events/broadcastBus
*/

import { createEventBus } from "@connectum/events";
import type { EventAdapter, EventBus, EventRoute } from "@connectum/events";
import type { EventBusLike } from "@connectum/core";
import { NatsAdapter } from "@connectum/events-nats";
import { OnboardingEventHandlers } from "#gen/onboarding/v1/onboarding_events_pb.ts";

/** A reactor's stable identity: which side effect + which consumer group. */
export const REACTOR_GROUP = {
/** Welcome reactor — "sends" a welcome email to the new hire. */
welcome: "hris-welcome",
/** Audit-log reactor — appends one immutable record per onboarded employee. */
audit: "hris-audit",
/** Headcount reactor — tallies active headcount per department. */
headcount: "hris-headcount",
} as const;

/** One of the reactor selector keys (`welcome` | `audit` | `headcount`). */
export type ReactorKey = keyof typeof REACTOR_GROUP;

/** A bus that can both `publish` (the `EventBus` API) and start/stop (`EventBusLike`). */
export type ManagedBus = EventBus & EventBusLike;

/** Default NATS JetStream stream for the example's broadcast topic. */
const NATS_STREAM = "hris";

/**
* Build the NATS adapter for a process, reading `NATS_URL` (default
* `nats://localhost:4222`). One adapter instance per bus in the split topology,
* so each reactor owns an independent broker connection + durable consumer.
*/
function natsAdapter(): EventAdapter {
return NatsAdapter({ servers: process.env.NATS_URL ?? "nats://localhost:4222", stream: NATS_STREAM });
}

/**
* Build the PUBLISH-ONLY bus that the worker (the saga's terminal activity) uses
* to broadcast `EmployeeOnboarded`.
*
* It has NO routes and lists `OnboardingEventHandlers` in `publishes`, so the
* topic `onboarding.employee-onboarded` is resolved from the proto option — the
* publisher passes NO raw topic.
*
* @param options.adapter - Adapter override (tests pass a shared `MemoryAdapter()`);
* defaults to a NATS adapter from `NATS_URL`.
*/
export function buildPublisherBus(options: { readonly adapter?: EventAdapter } = {}): ManagedBus {
return createEventBus({
adapter: options.adapter ?? natsAdapter(),
routes: [],
publishes: [OnboardingEventHandlers],
});
}

/**
* Build ONE reactor bus: a single route subscribed to
* `onboarding.employee-onboarded` (topic from the route's proto option) under
* its OWN distinct consumer group.
*
* @param options.key - The reactor selector, fixing its consumer group.
* @param options.route - The reactor's event route (its `OnEmployeeOnboarded` handler).
* @param options.adapter - Adapter override (tests pass a shared `MemoryAdapter()`);
* defaults to a NATS adapter from `NATS_URL`.
*/
export function buildReactorBus(options: { readonly key: ReactorKey; readonly route: EventRoute; readonly adapter?: EventAdapter }): ManagedBus {
return createEventBus({
adapter: options.adapter ?? natsAdapter(),
routes: [options.route],
group: REACTOR_GROUP[options.key],
});
}
Loading