Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fair-owls-listen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/workflow": patch
---

Fix awaiting child workflows from inside activities so parallel activity fan-out can start all children before waiting for completion.
71 changes: 71 additions & 0 deletions packages/cluster/test/ClusterWorkflowEngine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,27 @@ describe.concurrent("ClusterWorkflowEngine", () => {
assert.isTrue(flags.get("child-end"))
}).pipe(Effect.provide(TestWorkflowLayer)))

it.effect("parallel child workflows inside an activity", () =>
Effect.gen(function*() {
const sharding = yield* Sharding.Sharding

const fiber = yield* ParallelParentWorkflow.execute({
id: "parallel-parent-1",
childCount: 3,
concurrency: 3
}).pipe(Effect.fork)

yield* TestClock.adjust(Duration.seconds(2))
yield* sharding.pollStorage
yield* TestClock.adjust(Duration.seconds(5))

const poll = yield* Fiber.poll(fiber)
assert.strictEqual(poll._tag, "Some")
if (poll._tag === "Some") {
assert.deepStrictEqual(poll.value, Exit.succeed(["done-0", "done-1", "done-2"]))
}
}).pipe(Effect.provide(TestWorkflowLayer)))

it.effect("routes durable clock wakeups to the workflow shard group", () =>
Effect.gen(function*() {
const driver = yield* MessageStorage.MemoryDriver
Expand Down Expand Up @@ -596,6 +617,54 @@ const ChildWorkflowLayer = ChildWorkflow.toLayer(Effect.fnUntraced(function*() {
flags.set("child-end", true)
}))

const ParallelParentWorkflow = Workflow.make({
name: "ParallelParentWorkflow",
payload: {
id: Schema.String,
childCount: Schema.Number,
concurrency: Schema.Number
},
success: Schema.Array(Schema.String),
idempotencyKey(payload) {
return payload.id
}
})

const ParallelChildWorkflow = Workflow.make({
name: "ParallelChildWorkflow",
payload: {
id: Schema.String,
index: Schema.Number
},
success: Schema.String,
idempotencyKey(payload) {
return payload.id
}
})

const ParallelParentWorkflowLayer = ParallelParentWorkflow.toLayer(Effect.fnUntraced(function*(payload) {
const indices = Array.from({ length: payload.childCount }, (_, i) => i)
return yield* Activity.make({
name: `parallel-parent-${payload.id}`,
success: Schema.Array(Schema.String),
error: Schema.Never,
execute: Effect.forEach(
indices,
(index) => ParallelChildWorkflow.execute({ id: `${payload.id}-child-${index}`, index }),
{ concurrency: payload.concurrency }
)
})
}))

const ParallelChildWorkflowLayer = ParallelChildWorkflow.toLayer(Effect.fnUntraced(function*(payload) {
yield* DurableClock.sleep({
name: `parallel-child-${payload.index}`,
duration: "2 seconds",
inMemoryThreshold: Duration.zero
})
return `done-${payload.index}`
}))

const ShardedClockWorkflow = Workflow.make({
name: "ShardedClockWorkflow",
payload: {
Expand Down Expand Up @@ -712,6 +781,8 @@ const TestWorkflowLayer = EmailWorkflowLayer.pipe(
Layer.merge(DurableRaceWorkflowLayer),
Layer.merge(ParentWorkflowLayer),
Layer.merge(ChildWorkflowLayer),
Layer.merge(ParallelParentWorkflowLayer),
Layer.merge(ParallelChildWorkflowLayer),
Layer.merge(ShardedClockWorkflowLayer),
Layer.merge(DiscardParentWorkflowLayer),
Layer.merge(DiscardChildWorkflowLayer),
Expand Down
3 changes: 2 additions & 1 deletion packages/workflow/src/Activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import * as Schema from "effect/Schema"
import type { Scope } from "effect/Scope"
import type * as Types from "effect/Types"
import * as DurableDeferred from "./DurableDeferred.js"
import { CurrentActivityExecution } from "./internal/activity.js"
import { makeHashDigest } from "./internal/crypto.js"
import * as Workflow from "./Workflow.js"
import type { WorkflowEngine, WorkflowInstance } from "./WorkflowEngine.js"
Expand Down Expand Up @@ -246,7 +247,7 @@ const makeExecute = Effect.fnUntraced(function*<
const attempt = yield* CurrentAttempt
yield* Effect.annotateCurrentSpan({ executionId: instance.executionId })
const result = yield* Workflow.wrapActivityResult(
engine.activityExecute(activity, attempt),
Effect.provideService(engine.activityExecute(activity, attempt), CurrentActivityExecution, true),
(_) => _._tag === "Suspended"
)
if (result._tag === "Suspended") {
Expand Down
15 changes: 15 additions & 0 deletions packages/workflow/src/WorkflowEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import * as Scope from "effect/Scope"
import type * as Activity from "./Activity.js"
import type { DurableClock } from "./DurableClock.js"
import type * as DurableDeferred from "./DurableDeferred.js"
import { CurrentActivityExecution } from "./internal/activity.js"
import * as Workflow from "./Workflow.js"

/**
Expand Down Expand Up @@ -349,6 +350,7 @@ export const makeUnsafe = (options: Encoded): WorkflowEngine["Type"] =>
const suspendedRetrySchedule = opts.suspendedRetrySchedule ?? defaultRetrySchedule
yield* Effect.annotateCurrentSpan({ executionId })
let result: Workflow.Result<Success["Type"], Error["Type"]> | undefined
const currentActivityExecution = yield* CurrentActivityExecution

// link interruption with parent workflow
const parentInstance = yield* Effect.serviceOption(WorkflowInstance)
Expand Down Expand Up @@ -379,6 +381,19 @@ export const makeUnsafe = (options: Encoded): WorkflowEngine["Type"] =>
parent: Option.getOrUndefined(parentInstance)
})
if (Option.isSome(parentInstance)) {
if (currentActivityExecution) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would keep the activity in-memory even if the workflow suspends for long time. Will need to find another way.

let sleep: Effect.Effect<any> | undefined
while (true) {
result = yield* run
if (result._tag === "Complete") {
return yield* result.exit
}
sleep ??= (yield* Schedule.driver(suspendedRetrySchedule)).next(void 0).pipe(
Effect.catchAll(() => Effect.dieMessage(`${self.name}.execute: suspendedRetrySchedule exhausted`))
)
yield* sleep
}
}
result = yield* Workflow.wrapActivityResult(
run,
(result) => result._tag === "Suspended"
Expand Down
7 changes: 7 additions & 0 deletions packages/workflow/src/internal/activity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import * as Context from "effect/Context"

export class CurrentActivityExecution
extends Context.Reference<CurrentActivityExecution>()("@effect/workflow/internal/CurrentActivityExecution", {
defaultValue: () => false
})
{}
Loading