diff --git a/.changeset/fair-owls-listen.md b/.changeset/fair-owls-listen.md new file mode 100644 index 00000000000..775dcb0d8e1 --- /dev/null +++ b/.changeset/fair-owls-listen.md @@ -0,0 +1,5 @@ +--- +"@effect/workflow": patch +--- + +Fix awaiting child workflows from inside activities so parallel activity fan-out can start all children before waiting for completion. diff --git a/packages/cluster/test/ClusterWorkflowEngine.test.ts b/packages/cluster/test/ClusterWorkflowEngine.test.ts index a5f369ff062..1913b7b215e 100644 --- a/packages/cluster/test/ClusterWorkflowEngine.test.ts +++ b/packages/cluster/test/ClusterWorkflowEngine.test.ts @@ -225,6 +225,27 @@ describe.concurrent("ClusterWorkflowEngine", () => { assert.isTrue(flags.get("child-end")) }).pipe(Effect.provide(TestWorkflowLayer))) + it.effect("parallel child workflows inside an activity", () => + Effect.gen(function*() { + const sharding = yield* Sharding.Sharding + + const fiber = yield* ParallelParentWorkflow.execute({ + id: "parallel-parent-1", + childCount: 3, + concurrency: 3 + }).pipe(Effect.fork) + + yield* TestClock.adjust(Duration.seconds(2)) + yield* sharding.pollStorage + yield* TestClock.adjust(Duration.seconds(5)) + + const poll = yield* Fiber.poll(fiber) + assert.strictEqual(poll._tag, "Some") + if (poll._tag === "Some") { + assert.deepStrictEqual(poll.value, Exit.succeed(["done-0", "done-1", "done-2"])) + } + }).pipe(Effect.provide(TestWorkflowLayer))) + it.effect("routes durable clock wakeups to the workflow shard group", () => Effect.gen(function*() { const driver = yield* MessageStorage.MemoryDriver @@ -596,6 +617,54 @@ const ChildWorkflowLayer = ChildWorkflow.toLayer(Effect.fnUntraced(function*() { flags.set("child-end", true) })) +const ParallelParentWorkflow = Workflow.make({ + name: "ParallelParentWorkflow", + payload: { + id: Schema.String, + childCount: Schema.Number, + concurrency: Schema.Number + }, + success: Schema.Array(Schema.String), + idempotencyKey(payload) { + return payload.id + } +}) + +const ParallelChildWorkflow = Workflow.make({ + name: "ParallelChildWorkflow", + payload: { + id: Schema.String, + index: Schema.Number + }, + success: Schema.String, + idempotencyKey(payload) { + return payload.id + } +}) + +const ParallelParentWorkflowLayer = ParallelParentWorkflow.toLayer(Effect.fnUntraced(function*(payload) { + const indices = Array.from({ length: payload.childCount }, (_, i) => i) + return yield* Activity.make({ + name: `parallel-parent-${payload.id}`, + success: Schema.Array(Schema.String), + error: Schema.Never, + execute: Effect.forEach( + indices, + (index) => ParallelChildWorkflow.execute({ id: `${payload.id}-child-${index}`, index }), + { concurrency: payload.concurrency } + ) + }) +})) + +const ParallelChildWorkflowLayer = ParallelChildWorkflow.toLayer(Effect.fnUntraced(function*(payload) { + yield* DurableClock.sleep({ + name: `parallel-child-${payload.index}`, + duration: "2 seconds", + inMemoryThreshold: Duration.zero + }) + return `done-${payload.index}` +})) + const ShardedClockWorkflow = Workflow.make({ name: "ShardedClockWorkflow", payload: { @@ -712,6 +781,8 @@ const TestWorkflowLayer = EmailWorkflowLayer.pipe( Layer.merge(DurableRaceWorkflowLayer), Layer.merge(ParentWorkflowLayer), Layer.merge(ChildWorkflowLayer), + Layer.merge(ParallelParentWorkflowLayer), + Layer.merge(ParallelChildWorkflowLayer), Layer.merge(ShardedClockWorkflowLayer), Layer.merge(DiscardParentWorkflowLayer), Layer.merge(DiscardChildWorkflowLayer), diff --git a/packages/workflow/src/Activity.ts b/packages/workflow/src/Activity.ts index 60146d4d7d0..36669c68943 100644 --- a/packages/workflow/src/Activity.ts +++ b/packages/workflow/src/Activity.ts @@ -13,6 +13,7 @@ import * as Schema from "effect/Schema" import type { Scope } from "effect/Scope" import type * as Types from "effect/Types" import * as DurableDeferred from "./DurableDeferred.js" +import { CurrentActivityExecution } from "./internal/activity.js" import { makeHashDigest } from "./internal/crypto.js" import * as Workflow from "./Workflow.js" import type { WorkflowEngine, WorkflowInstance } from "./WorkflowEngine.js" @@ -246,7 +247,7 @@ const makeExecute = Effect.fnUntraced(function*< const attempt = yield* CurrentAttempt yield* Effect.annotateCurrentSpan({ executionId: instance.executionId }) const result = yield* Workflow.wrapActivityResult( - engine.activityExecute(activity, attempt), + Effect.provideService(engine.activityExecute(activity, attempt), CurrentActivityExecution, true), (_) => _._tag === "Suspended" ) if (result._tag === "Suspended") { diff --git a/packages/workflow/src/WorkflowEngine.ts b/packages/workflow/src/WorkflowEngine.ts index bf84d944b4c..79ea69dff9f 100644 --- a/packages/workflow/src/WorkflowEngine.ts +++ b/packages/workflow/src/WorkflowEngine.ts @@ -15,6 +15,7 @@ import * as Scope from "effect/Scope" import type * as Activity from "./Activity.js" import type { DurableClock } from "./DurableClock.js" import type * as DurableDeferred from "./DurableDeferred.js" +import { CurrentActivityExecution } from "./internal/activity.js" import * as Workflow from "./Workflow.js" /** @@ -349,6 +350,7 @@ export const makeUnsafe = (options: Encoded): WorkflowEngine["Type"] => const suspendedRetrySchedule = opts.suspendedRetrySchedule ?? defaultRetrySchedule yield* Effect.annotateCurrentSpan({ executionId }) let result: Workflow.Result | undefined + const currentActivityExecution = yield* CurrentActivityExecution // link interruption with parent workflow const parentInstance = yield* Effect.serviceOption(WorkflowInstance) @@ -379,6 +381,19 @@ export const makeUnsafe = (options: Encoded): WorkflowEngine["Type"] => parent: Option.getOrUndefined(parentInstance) }) if (Option.isSome(parentInstance)) { + if (currentActivityExecution) { + let sleep: Effect.Effect | undefined + while (true) { + result = yield* run + if (result._tag === "Complete") { + return yield* result.exit + } + sleep ??= (yield* Schedule.driver(suspendedRetrySchedule)).next(void 0).pipe( + Effect.catchAll(() => Effect.dieMessage(`${self.name}.execute: suspendedRetrySchedule exhausted`)) + ) + yield* sleep + } + } result = yield* Workflow.wrapActivityResult( run, (result) => result._tag === "Suspended" diff --git a/packages/workflow/src/internal/activity.ts b/packages/workflow/src/internal/activity.ts new file mode 100644 index 00000000000..791203d19db --- /dev/null +++ b/packages/workflow/src/internal/activity.ts @@ -0,0 +1,7 @@ +import * as Context from "effect/Context" + +export class CurrentActivityExecution + extends Context.Reference()("@effect/workflow/internal/CurrentActivityExecution", { + defaultValue: () => false + }) +{}