Skip to content

Commit dfde083

Browse files
authored
Merge pull request #4 from flamingo-run/feature/subworkflows
Subworkflows
2 parents 8b86643 + cada93a commit dfde083

11 files changed

Lines changed: 580 additions & 7 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ Priority order for upcoming features:
116116
| --- | --- | --- | --- |
117117
| 1 | **Retries** | ✅ Completed | `RetryPolicy` with configurable backoff and predicates |
118118
| 2 | **Try/catch** | ✅ Completed | `TryCatchStep` for exception handling with fallback flows |
119-
| 3 | **Subworkflows** | 📋 Planned | Call other workflows, composition patterns |
119+
| 3 | **Subworkflows** | ✅ Completed | Call other workflows with `>> WORKFLOW`, dependency tracking |
120120
| 4 | **GCP connectors** | 📋 Planned | Direct service calls, native Cloud Workflows connectors |
121121
| 5 | **Deployment API** | 📋 Planned | Programmatic deployment of workflows & EventArc triggers via GCP APIs |
122122
| 6 | **Loops** | 📋 Planned | For/while constructs, iteration over collections |
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""Order processing with payment as a subworkflow."""
2+
3+
from __future__ import annotations
4+
5+
from pydantic import BaseModel
6+
7+
from fastapi_cloudflow import Context, step, workflow
8+
9+
10+
# Payment workflow models and steps
11+
class PaymentRequest(BaseModel):
12+
amount: float
13+
customer_id: str
14+
payment_method: str
15+
16+
17+
class PaymentResult(BaseModel):
18+
transaction_id: str
19+
status: str
20+
amount: float
21+
22+
23+
@step(name="validate-payment")
24+
async def validate_payment(ctx: Context, data: PaymentRequest) -> PaymentRequest:
25+
"""Validate payment details."""
26+
if data.amount <= 0:
27+
raise ValueError("Invalid payment amount")
28+
return data
29+
30+
31+
@step(name="charge-card")
32+
async def charge_card(ctx: Context, data: PaymentRequest) -> PaymentResult:
33+
"""Process the payment."""
34+
return PaymentResult(transaction_id=f"txn_{data.customer_id}_{data.amount}", status="success", amount=data.amount)
35+
36+
37+
# Build the payment workflow
38+
PAYMENT_WORKFLOW = (workflow("payment-processing") >> validate_payment >> charge_card).build()
39+
40+
41+
# Order workflow models and steps
42+
class OrderRequest(BaseModel):
43+
order_id: str
44+
customer_id: str
45+
items: list[str]
46+
total_amount: float
47+
48+
49+
class OrderValidated(BaseModel):
50+
order_id: str
51+
customer_id: str
52+
amount: float
53+
payment_method: str
54+
55+
56+
class OrderComplete(BaseModel):
57+
order_id: str
58+
transaction_id: str
59+
status: str
60+
61+
62+
@step(name="validate-order")
63+
async def validate_order(ctx: Context, data: OrderRequest) -> PaymentRequest:
64+
"""Validate the order and prepare payment request."""
65+
return PaymentRequest(amount=data.total_amount, customer_id=data.customer_id, payment_method="credit_card")
66+
67+
68+
@step(name="ship-order")
69+
async def ship_order(ctx: Context, data: PaymentResult) -> OrderComplete:
70+
"""Ship the order after successful payment."""
71+
return OrderComplete(
72+
order_id="order_123", # Would normally come from context
73+
transaction_id=data.transaction_id,
74+
status="shipped",
75+
)
76+
77+
78+
# Build the order workflow using payment as a subworkflow
79+
ORDER_WORKFLOW = (
80+
workflow("order-processing")
81+
>> validate_order
82+
>> PAYMENT_WORKFLOW # Use payment workflow as a subworkflow
83+
>> ship_order
84+
).build()

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "fastapi-cloudflow"
3-
version = "0.0.2"
3+
version = "0.0.3"
44
description = "Typed Python-to-Google Cloud Workflows framework with FastAPI codegen"
55
readme = "README.md"
66
requires-python = ">=3.13"

src/fastapi_cloudflow/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
ModelAdapter,
77
RetryPolicy,
88
Step,
9+
SubworkflowStep,
910
TryCatchBuilder,
1011
TryCatchStep,
1112
Workflow,
1213
get_registry,
14+
get_workflow_dependencies,
1315
get_workflows,
1416
step,
1517
try_catch,
@@ -25,13 +27,15 @@
2527
"Step",
2628
"Workflow",
2729
"RetryPolicy",
30+
"SubworkflowStep",
2831
"TryCatchStep",
2932
"TryCatchBuilder",
3033
"AssignStep",
3134
"HttpStep",
3235
"ModelAdapter",
3336
"Arg",
3437
"get_registry",
38+
"get_workflow_dependencies",
3539
"get_workflows",
3640
"attach_to_fastapi",
3741
"build_app",

src/fastapi_cloudflow/codegen/workflows.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,35 @@ def _process_single_step(
8080
)
8181
return steps, have_run_id
8282

83+
# Handle SubworkflowStep
84+
from fastapi_cloudflow.core.subworkflow import SubworkflowStep
85+
86+
if isinstance(node, SubworkflowStep):
87+
result_var = f"res_{idx}"
88+
args = {"workflow_id": node.workflow_id, "argument": f"${{{payload_var}}}"}
89+
90+
# Apply input mapping if specified
91+
if node.input_mapping:
92+
mapped_args = {k: _as_yaml_expr(v) for k, v in node.input_mapping.items()}
93+
args["argument"] = f"${mapped_args}"
94+
95+
step_def: dict[str, Any] = {"call": "workflows.executeWorkflow", "args": args}
96+
97+
if node.wait:
98+
step_def["result"] = result_var
99+
steps.append({f"call_{node.name}": step_def})
100+
# Extract result and apply output mapping
101+
if node.output_mapping:
102+
mapped_output = {k: f"${{{result_var}.{v}}}" for k, v in node.output_mapping.items()}
103+
steps.append({f"map_output_{idx}": {"assign": [{payload_var: mapped_output}]}})
104+
else:
105+
steps.append({f"set_payload_{idx}": {"assign": [{payload_var: f"${{{result_var}}}"}]}})
106+
else:
107+
# Fire-and-forget mode (no result)
108+
steps.append({f"call_{node.name}": step_def})
109+
110+
return steps, have_run_id
111+
83112
if isinstance(node, HttpStep):
84113
method = node.method.lower()
85114
result_var = f"res_{idx}"

src/fastapi_cloudflow/core/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
from fastapi_cloudflow.core.arg import Arg, ArgExpr
22
from fastapi_cloudflow.core.error_handling import TryCatchBuilder, TryCatchStep, try_catch
33
from fastapi_cloudflow.core.step import AssignStep, HttpStep, ModelAdapter, Step
4+
from fastapi_cloudflow.core.subworkflow import SubworkflowStep
45
from fastapi_cloudflow.core.types import Context, RetryPolicy, WorkflowMeta
56
from fastapi_cloudflow.core.workflow import (
67
Registry,
78
Workflow,
89
WorkflowBuilder,
910
get_registry,
11+
get_workflow_dependencies,
1012
get_workflows,
1113
step,
1214
workflow,
@@ -22,6 +24,7 @@
2224
"AssignStep",
2325
"HttpStep",
2426
"ModelAdapter",
27+
"SubworkflowStep",
2528
"TryCatchStep",
2629
"TryCatchBuilder",
2730
"try_catch",
@@ -30,6 +33,7 @@
3033
"WorkflowBuilder",
3134
"workflow",
3235
"get_registry",
36+
"get_workflow_dependencies",
3337
"get_workflows",
3438
"step",
3539
]
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
"""Subworkflow support for Cloud Workflows."""
2+
3+
from __future__ import annotations
4+
5+
from typing import Any, TypeVar
6+
7+
from pydantic import BaseModel
8+
9+
from fastapi_cloudflow.core.step import Step
10+
11+
InT = TypeVar("InT", bound=BaseModel)
12+
OutT = TypeVar("OutT", bound=BaseModel)
13+
14+
15+
class SubworkflowStep[InT: BaseModel, OutT: BaseModel](Step[InT, OutT]):
16+
"""
17+
A step that executes another workflow.
18+
19+
This step represents a call to another Cloud Workflow, enabling
20+
workflow composition and reusability.
21+
"""
22+
23+
def __init__(
24+
self,
25+
workflow_id: str,
26+
input_model: type[InT],
27+
output_model: type[OutT],
28+
wait: bool = True,
29+
input_mapping: dict[str, Any] | None = None,
30+
output_mapping: dict[str, Any] | None = None,
31+
) -> None:
32+
"""
33+
Initialize a subworkflow step.
34+
35+
Args:
36+
workflow_id: The ID of the workflow to call
37+
input_model: The input model type for this step
38+
output_model: The output model type for this step
39+
wait: Whether to wait for the subworkflow to complete (default: True)
40+
input_mapping: Optional mapping for input parameters
41+
output_mapping: Optional mapping for output results
42+
"""
43+
super().__init__(
44+
name=f"call_{workflow_id.replace('-', '_')}",
45+
input_model=input_model,
46+
output_model=output_model,
47+
fn=None, # Subworkflows are not callable locally
48+
)
49+
self.workflow_id = workflow_id
50+
self.wait = wait
51+
self.input_mapping = input_mapping or {}
52+
self.output_mapping = output_mapping or {}

src/fastapi_cloudflow/core/workflow.py

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,24 @@ class Workflow:
1414
def __init__(self, name: str, nodes: list[Step[Any, Any]]) -> None:
1515
self.name = name
1616
self.nodes = nodes
17+
self.dependencies: set[str] = set()
18+
19+
def to_subworkflow_step(self) -> Step[Any, Any]:
20+
"""Convert this workflow to a SubworkflowStep for composition."""
21+
from fastapi_cloudflow.core.subworkflow import SubworkflowStep
22+
23+
# Get input/output models from first and last steps
24+
if not self.nodes:
25+
raise ValueError(f"Workflow '{self.name}' has no steps")
26+
27+
first_step = self.nodes[0]
28+
last_step = self.nodes[-1]
29+
30+
return SubworkflowStep(
31+
workflow_id=self.name,
32+
input_model=first_step.input_model,
33+
output_model=last_step.output_model,
34+
)
1735

1836

1937
class Registry:
@@ -44,21 +62,36 @@ class WorkflowBuilder:
4462
def __init__(self, name: str, nodes: list[Step[Any, Any]] | None = None) -> None:
4563
self.name = name
4664
self.nodes = nodes or []
65+
self._dependencies: set[str] = set()
66+
67+
def __rshift__(self, other: Step[Any, Any] | Workflow) -> WorkflowBuilder:
68+
# Convert Workflow to SubworkflowStep if needed
69+
if isinstance(other, Workflow):
70+
step = other.to_subworkflow_step()
71+
# Track dependency
72+
self._dependencies.add(other.name)
73+
else:
74+
step = other
4775

48-
def __rshift__(self, other: Step[Any, Any]) -> WorkflowBuilder:
4976
if self.nodes:
5077
prev = self.nodes[-1]
51-
if prev.output_model is not other.input_model:
78+
if prev.output_model is not step.input_model:
5279
raise TypeError(
5380
f"Type mismatch: {prev.name} outputs {prev.output_model.__name__} "
54-
f"but {other.name} expects {other.input_model.__name__}"
81+
f"but {step.name} expects {step.input_model.__name__}"
5582
)
56-
return WorkflowBuilder(self.name, self.nodes + [other])
83+
84+
# Create new builder with updated nodes and preserve dependencies
85+
new_builder = WorkflowBuilder(self.name, self.nodes + [step])
86+
new_builder._dependencies = self._dependencies.copy()
87+
return new_builder
5788

5889
def build(self) -> Workflow:
5990
if not self.nodes:
6091
raise ValueError("Workflow has no steps")
6192
wf = Workflow(self.name, self.nodes)
93+
# Transfer any tracked dependencies
94+
wf.dependencies = self._dependencies
6295
_REGISTRY.register_workflow(wf)
6396
return wf
6497

@@ -109,3 +142,12 @@ def get_registry() -> Registry:
109142

110143
def get_workflows() -> list[Workflow]:
111144
return _REGISTRY.get_workflows()
145+
146+
147+
def get_workflow_dependencies() -> dict[str, set[str]]:
148+
"""Get all workflow dependencies as a mapping."""
149+
deps: dict[str, set[str]] = {}
150+
for wf in _REGISTRY.workflows.values():
151+
if wf.dependencies:
152+
deps[wf.name] = wf.dependencies
153+
return deps

0 commit comments

Comments
 (0)