diff --git a/.changeset/plenty-tables-wait.md b/.changeset/plenty-tables-wait.md new file mode 100644 index 0000000..4e63895 --- /dev/null +++ b/.changeset/plenty-tables-wait.md @@ -0,0 +1,5 @@ +--- +"@inngest/workflow-kit": patch +--- + +Fixes BFS traversal of conditional branches diff --git a/packages/workflow/src/engine.execution.test.ts b/packages/workflow/src/engine.execution.test.ts index c922e86..e39e9dc 100644 --- a/packages/workflow/src/engine.execution.test.ts +++ b/packages/workflow/src/engine.execution.test.ts @@ -131,6 +131,138 @@ test("execution", async () => { expect(es.state.get("stepB-true")).toBe(42*99); expect(es.state.get("stepC-true")).toBe(2*9); + // Shouldn't run, as the if-a step is true + expect(es.state.get("stepB-false")).toBe(undefined); + expect(es.state.get("stepC-false")).toBe(undefined); +}) + +test("execution-conditional", async () => { + + const engine = new Engine({ + actions: [ + ...Object.values(builtinActions), + { + kind: "multiply", + name: "Multiply some numbers", + handler: async (args) => { + return (args.workflowAction?.inputs?.a || 0) * (args.workflowAction?.inputs?.b || 0) + }, + inputs: { + a: { + type: Type.Number({ + description: "Numerator", + }), + }, + b: { + type: Type.Number({ + description: "Denominator", + }), + }, + }, + outputs: Type.Number(), + } + ] + }); + + const workflow: Workflow = { + actions: [ + { + id: "stepA", + kind: "multiply", + name: "Multiply some numbers", + inputs: { + a: 7, + b: 6, + }, + }, + + // Only continue if the result is 7*6 + { + id: "if-a", + kind: "builtin:if", + name: "If A is 42", + inputs: { + condition: { + "==": [ + "!ref($.state.stepA)", + 7*6, + ], + }, + }, + }, + + { + id: "stepB-true", + kind: "multiply", + name: "multiply result of A", + inputs: { + a: "!ref($.state.stepA)", + b: "!ref($.event.data.age)", + }, + }, + { + id: "stepB-false", + kind: "multiply", + name: "Should never run.", + inputs: { + a: "!ref($.state.stepA)", + b: "!ref($.event.data.age)", + }, + }, + + // Finally, run a conditional to match on numeric values. + { + id: "stepC-true", + kind: "multiply", + name: "Multiply 2 and 9", + inputs: { + a: 2, + b: 9, + }, + }, + { + id: "stepC-false", + kind: "multiply", + name: "Never runs, as equality is false", + inputs: { + a: 2, + b: 9, + }, + }, + ], + edges: [ + { from: SourceNodeID, to: "stepA" }, + + { from: "stepA", to: "if-a" }, + + // Check "true" branches of the builtin:if action + { from: "if-a", to: "stepB-true", conditional: { type: "if", ref: "!ref($.output.result)" } }, + // if-a should evaluate to true so this never runs. + { from: "if-a", to: "stepB-false", conditional: { type: "else", ref: "!ref($.output.result)" } }, + + // Check that step C true path is executed. + { from: "stepB-true", to: "stepC-true" }, + // Check that execution of the false path stops at stepB and does not continue to step C. + { from: "stepB-false", to: "stepC-false" }, + ], + }; + + const es = await engine.run({ + workflow, + event: { + name: "auth/user.created", + data: { + name: "test user", + age: 99, + } + }, + step: {}, + }); + + expect(es.state.get("stepA")).toBe(42); + expect(es.state.get("stepB-true")).toBe(42*99); + expect(es.state.get("stepC-true")).toBe(2*9); + // Shouldn't run, as the if-a step is true expect(es.state.get("stepB-false")).toBe(undefined); expect(es.state.get("stepC-false")).toBe(undefined); diff --git a/packages/workflow/src/engine.ts b/packages/workflow/src/engine.ts index 660531d..766e533 100644 --- a/packages/workflow/src/engine.ts +++ b/packages/workflow/src/engine.ts @@ -195,8 +195,34 @@ export class ExecutionState { execute = async () => { const { event, step, graph, workflow, engine } = this.#opts; - await bfs(graph, async (action, edge) => { - if (edge.conditional) { + await bfs( + graph, + async (action) => { + // Find the base action from the workflow class. This includes the handler + // to invoke. + const base = engine.actions[action.kind]; + if (!base) { + throw new Error(`Unable to find workflow action for kind: ${action.kind}`); + } + + // Invoke the action directly. + // + // Note: The handler should use Inngest's step API within handlers, ensuring + // that nodes in the workflow execute once, durably. + const workflowAction = { ...action, inputs: this.resolveInputs(action) }; + + const result = await base.handler({ + event, + step, + workflow, + workflowAction, + state: this.#state, + }); + + // And set our state. This may be a previously memoized output. + this.#state.set(action.id, result); + }, + (edge) => { const { type, ref, value } = edge.conditional || {}; // We allow "!ref($.output)" to refer to the previous action's output. @@ -208,13 +234,13 @@ export class ExecutionState { case "if": if (!input) { // This doesn't match, so we skip this edge. - return; + return false; } break; case "else": if (!!input) { // This doesn't match, so we skip this edge. - return; + return false; } break case "match": @@ -222,35 +248,12 @@ export class ExecutionState { // values here. if (JSON.stringify(input) !== JSON.stringify(value)) { // This doesn't match, so we skip this edge. - return; + return false; } } + return true; } - - // Find the base action from the workflow class. This includes the handler - // to invoke. - const base = engine.actions[action.kind]; - if (!base) { - throw new Error(`Unable to find workflow action for kind: ${action.kind}`); - } - - // Invoke the action directly. - // - // Note: The handler should use Inngest's step API within handlers, ensuring - // that nodes in the workflow execute once, durably. - const workflowAction = { ...action, inputs: this.resolveInputs(action) }; - - const result = await base.handler({ - event, - step, - workflow, - workflowAction, - state: this.#state, - }); - - // And set our state. This may be a previously memoized output. - this.#state.set(action.id, result); - }); + ); } /** diff --git a/packages/workflow/src/graph.test.ts b/packages/workflow/src/graph.test.ts index ddc03bb..39e539f 100644 --- a/packages/workflow/src/graph.test.ts +++ b/packages/workflow/src/graph.test.ts @@ -153,3 +153,89 @@ test("bfs with a tree that has multiple paths to the same node", async () => { // a2 should not be hit twice expect(hits).toEqual(4); }); + +test("bfs with conditionals", async () => { + const a1 = { id: "a1", "kind": "test" }; + const a2 = { id: "a2", "kind": "test" }; + const a3 = { id: "a3", "kind": "test" }; + + const dag = newDAG({ + actions: [a1, a2, a3], + edges: [ + { from: "$source", to: "a1" }, + { from: "a1", to: "a2", conditional: { type: "if", ref: "!ref($.output)", value: true }}, + { from: "a1", to: "a3", conditional: { type: "else", ref: "!ref($.output)", value: false }}, + ], + }); + + let hits = 0; + let visited: string[] = []; + await bfs(dag, async (n, _e) => { + // Assert the order is deterministic, based off of edge ordering, and that a3 is never encountered. + switch (hits) { + case 0: + expect(n).toEqual(a1); + break; + case 1: + expect(n).toEqual(a2); + break; + } + visited.push(n.id); + hits++; + }, (edge) => { + if (edge.conditional?.type === "if") { + return true; + } + if (edge.conditional?.type === "else") { + return false; + } + return true + }); + + expect(hits).toEqual(2); + expect(visited).toEqual(["a1", "a2"]); + + const a4 = { id: "a4", "kind": "test" }; + const a5 = { id: "a5", "kind": "test" }; + + const dag2 = newDAG({ + actions: [a1, a2, a3, a4, a5], + edges: [ + { from: "$source", to: "a1" }, + { from: "a1", to: "a2", conditional: { type: "if", ref: "!ref($.output)", value: true }}, + { from: "a1", to: "a3", conditional: { type: "else", ref: "!ref($.output)", value: false }}, + { from: "a2", to: "a4" }, + { from: "a3", to: "a5" }, + ], + }); + + hits = 0; + visited = []; + await bfs(dag2, async (n, _e) => { + // Assert the order is deterministic, based off of edge ordering, and that a3 and a5 are never encountered. + switch (hits) { + case 0: + expect(n).toEqual(a1); + break; + case 1: + expect(n).toEqual(a2); + break; + case 2: + expect(n).toBe(a4); + break; + } + visited.push(n.id); + hits++; + }, (edge) => { + if (edge.conditional?.type === "if") { + return true; + } + if (edge.conditional?.type === "else") { + return false; + } + return true + }); + + expect(hits).toEqual(3); + expect(visited).toEqual(["a1", "a2", "a4"]); +}) \ No newline at end of file diff --git a/packages/workflow/src/graph.ts b/packages/workflow/src/graph.ts index 71d8e65..6ac5a85 100644 --- a/packages/workflow/src/graph.ts +++ b/packages/workflow/src/graph.ts @@ -43,7 +43,7 @@ export const newDAG = (flow: Workflow): DAG => { return g; } -export const bfs = async (graph: DAG, cb: (node: WorkflowAction, edge: WorkflowEdge) => Promise): Promise => { +export const bfs = async (graph: DAG, cb: (node: WorkflowAction, edge: WorkflowEdge) => Promise, conditional?: (edge: WorkflowEdge) => boolean,): Promise => { if (graph.order <= 1) { // Only the event/source exists; do nothing. return; @@ -62,6 +62,13 @@ export const bfs = async (graph: DAG, cb: (node: WorkflowAction, edge: WorkflowE if (seen.has(id)) { return; } + + // Check conditional + const edge = graph.getEdgeAttributes(next, id); + if (conditional && !conditional(edge.edge)) { + return; + } + // We want to iterate into each action afterwards, outside of this function // for async support. nodes.push(node);