Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/plenty-tables-wait.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@inngest/workflow-kit": patch
---

Fixes BFS traversal of conditional branches
132 changes: 132 additions & 0 deletions packages/workflow/src/engine.execution.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,138 @@ test("execution", async () => {
expect(es.state.get("stepB-true")).toBe(42*99);
expect(es.state.get("stepC-true")).toBe(2*9);

// Shouldn't run, as the if-a step is true
expect(es.state.get("stepB-false")).toBe(undefined);
expect(es.state.get("stepC-false")).toBe(undefined);
})

test("execution-conditional", async () => {

const engine = new Engine({
actions: [
...Object.values(builtinActions),
{
kind: "multiply",
name: "Multiply some numbers",
handler: async (args) => {
return (args.workflowAction?.inputs?.a || 0) * (args.workflowAction?.inputs?.b || 0)
},
inputs: {
a: {
type: Type.Number({
description: "Numerator",
}),
},
b: {
type: Type.Number({
description: "Denominator",
}),
},
},
outputs: Type.Number(),
}
]
});

const workflow: Workflow = {
actions: [
{
id: "stepA",
kind: "multiply",
name: "Multiply some numbers",
inputs: {
a: 7,
b: 6,
},
},

// Only continue if the result is 7*6
{
id: "if-a",
kind: "builtin:if",
name: "If A is 42",
inputs: {
condition: {
"==": [
"!ref($.state.stepA)",
7*6,
],
},
},
},

{
id: "stepB-true",
kind: "multiply",
name: "multiply result of A",
inputs: {
a: "!ref($.state.stepA)",
b: "!ref($.event.data.age)",
},
},
{
id: "stepB-false",
kind: "multiply",
name: "Should never run.",
inputs: {
a: "!ref($.state.stepA)",
b: "!ref($.event.data.age)",
},
},

// Finally, run a conditional to match on numeric values.
{
id: "stepC-true",
kind: "multiply",
name: "Multiply 2 and 9",
inputs: {
a: 2,
b: 9,
},
},
{
id: "stepC-false",
kind: "multiply",
name: "Never runs, as equality is false",
inputs: {
a: 2,
b: 9,
},
},
],
edges: [
{ from: SourceNodeID, to: "stepA" },

{ from: "stepA", to: "if-a" },

// Check "true" branches of the builtin:if action
{ from: "if-a", to: "stepB-true", conditional: { type: "if", ref: "!ref($.output.result)" } },
// if-a should evaluate to true so this never runs.
{ from: "if-a", to: "stepB-false", conditional: { type: "else", ref: "!ref($.output.result)" } },

// Check that step C true path is executed.
{ from: "stepB-true", to: "stepC-true" },
// Check that execution of the false path stops at stepB and does not continue to step C.
{ from: "stepB-false", to: "stepC-false" },
],
};

const es = await engine.run({
workflow,
event: {
name: "auth/user.created",
data: {
name: "test user",
age: 99,
}
},
step: {},
});

expect(es.state.get("stepA")).toBe(42);
expect(es.state.get("stepB-true")).toBe(42*99);
expect(es.state.get("stepC-true")).toBe(2*9);

// Shouldn't run, as the if-a step is true
expect(es.state.get("stepB-false")).toBe(undefined);
expect(es.state.get("stepC-false")).toBe(undefined);
Expand Down
63 changes: 33 additions & 30 deletions packages/workflow/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,34 @@ export class ExecutionState {
execute = async () => {
const { event, step, graph, workflow, engine } = this.#opts;

await bfs(graph, async (action, edge) => {
if (edge.conditional) {
await bfs(
graph,
async (action) => {
// Find the base action from the workflow class. This includes the handler
// to invoke.
const base = engine.actions[action.kind];
if (!base) {
throw new Error(`Unable to find workflow action for kind: ${action.kind}`);
}

// Invoke the action directly.
//
// Note: The handler should use Inngest's step API within handlers, ensuring
// that nodes in the workflow execute once, durably.
const workflowAction = { ...action, inputs: this.resolveInputs(action) };

const result = await base.handler({
event,
step,
workflow,
workflowAction,
state: this.#state,
});

// And set our state. This may be a previously memoized output.
this.#state.set(action.id, result);
},
(edge) => {
const { type, ref, value } = edge.conditional || {};

// We allow "!ref($.output)" to refer to the previous action's output.
Expand All @@ -208,49 +234,26 @@ export class ExecutionState {
case "if":
if (!input) {
// This doesn't match, so we skip this edge.
return;
return false;
}
break;
case "else":
if (!!input) {
// This doesn't match, so we skip this edge.
return;
return false;
}
break
case "match":
// Because object equality is what it is, we JSON stringify both
// values here.
if (JSON.stringify(input) !== JSON.stringify(value)) {
// This doesn't match, so we skip this edge.
return;
return false;
}
}
return true;
}

// Find the base action from the workflow class. This includes the handler
// to invoke.
const base = engine.actions[action.kind];
if (!base) {
throw new Error(`Unable to find workflow action for kind: ${action.kind}`);
}

// Invoke the action directly.
//
// Note: The handler should use Inngest's step API within handlers, ensuring
// that nodes in the workflow execute once, durably.
const workflowAction = { ...action, inputs: this.resolveInputs(action) };

const result = await base.handler({
event,
step,
workflow,
workflowAction,
state: this.#state,
});

// And set our state. This may be a previously memoized output.
this.#state.set(action.id, result);
});
);
}

/**
Expand Down
86 changes: 86 additions & 0 deletions packages/workflow/src/graph.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,89 @@ test("bfs with a tree that has multiple paths to the same node", async () => {
// a2 should not be hit twice
expect(hits).toEqual(4);
});

test("bfs with conditionals", async () => {
const a1 = { id: "a1", "kind": "test" };
const a2 = { id: "a2", "kind": "test" };
const a3 = { id: "a3", "kind": "test" };

const dag = newDAG({
actions: [a1, a2, a3],
edges: [
{ from: "$source", to: "a1" },
{ from: "a1", to: "a2", conditional: { type: "if", ref: "!ref($.output)", value: true }},
{ from: "a1", to: "a3", conditional: { type: "else", ref: "!ref($.output)", value: false }},
],
});

let hits = 0;
let visited: string[] = [];
await bfs(dag, async (n, _e) => {
// Assert the order is deterministic, based off of edge ordering, and that a3 is never encountered.
switch (hits) {
case 0:
expect(n).toEqual(a1);
break;
case 1:
expect(n).toEqual(a2);
break;
}
visited.push(n.id);
hits++;
}, (edge) => {
if (edge.conditional?.type === "if") {
return true;
}
if (edge.conditional?.type === "else") {
return false;
}
return true
});

expect(hits).toEqual(2);
expect(visited).toEqual(["a1", "a2"]);

const a4 = { id: "a4", "kind": "test" };
const a5 = { id: "a5", "kind": "test" };

const dag2 = newDAG({
actions: [a1, a2, a3, a4, a5],
edges: [
{ from: "$source", to: "a1" },
{ from: "a1", to: "a2", conditional: { type: "if", ref: "!ref($.output)", value: true }},
{ from: "a1", to: "a3", conditional: { type: "else", ref: "!ref($.output)", value: false }},
{ from: "a2", to: "a4" },
{ from: "a3", to: "a5" },
],
});

hits = 0;
visited = [];
await bfs(dag2, async (n, _e) => {
// Assert the order is deterministic, based off of edge ordering, and that a3 and a5 are never encountered.
switch (hits) {
case 0:
expect(n).toEqual(a1);
break;
case 1:
expect(n).toEqual(a2);
break;
case 2:
expect(n).toBe(a4);
break;
}
visited.push(n.id);
hits++;
}, (edge) => {
if (edge.conditional?.type === "if") {
return true;
}
if (edge.conditional?.type === "else") {
return false;
}
return true
});

expect(hits).toEqual(3);
expect(visited).toEqual(["a1", "a2", "a4"]);
})
9 changes: 8 additions & 1 deletion packages/workflow/src/graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export const newDAG = (flow: Workflow): DAG => {
return g;
}

export const bfs = async (graph: DAG, cb: (node: WorkflowAction, edge: WorkflowEdge) => Promise<any>): Promise<any> => {
export const bfs = async (graph: DAG, cb: (node: WorkflowAction, edge: WorkflowEdge) => Promise<any>, conditional?: (edge: WorkflowEdge) => boolean,): Promise<any> => {
if (graph.order <= 1) {
// Only the event/source exists; do nothing.
return;
Expand All @@ -62,6 +62,13 @@ export const bfs = async (graph: DAG, cb: (node: WorkflowAction, edge: WorkflowE
if (seen.has(id)) {
return;
}

// Check conditional
const edge = graph.getEdgeAttributes(next, id);
if (conditional && !conditional(edge.edge)) {
return;
}

// We want to iterate into each action afterwards, outside of this function
// for async support.
nodes.push(node);
Expand Down