|
| 1 | +from pathlib import Path |
| 2 | +from typing import Any |
| 3 | + |
| 4 | +import yaml |
| 5 | + |
| 6 | +from ..core import AssignStep, HttpStep, Workflow |
| 7 | + |
| 8 | + |
| 9 | +def _is_arg_expr(v: Any) -> bool: |
| 10 | + return hasattr(v, "expr") and isinstance(v.expr, str) |
| 11 | + |
| 12 | + |
| 13 | +def _as_yaml_expr(v: Any) -> Any: |
| 14 | + if _is_arg_expr(v): |
| 15 | + return f"${{{v.expr}}}" |
| 16 | + return v |
| 17 | + |
| 18 | + |
| 19 | +def _concat_expr(left_expr: str, right_literal: str) -> str: |
| 20 | + return f'${{{left_expr} + "{right_literal}"}}' |
| 21 | + |
| 22 | + |
| 23 | +WORKFLOW_NAME_EXPR = 'sys.get_env("GOOGLE_CLOUD_WORKFLOW_ID")' |
| 24 | + |
| 25 | + |
| 26 | +def workflow_to_yaml_dict(wf: Workflow, base_url_expr: str = 'sys.get_env("BASE_URL")') -> dict[str, Any]: |
| 27 | + steps: list[dict[str, Any]] = [] |
| 28 | + payload_var = "payload" |
| 29 | + have_run_id = False |
| 30 | + |
| 31 | + def _with_required_headers( |
| 32 | + existing: dict[str, Any] | None, include_run_id: bool, include_content_type: bool |
| 33 | + ) -> dict[str, Any]: |
| 34 | + headers: dict[str, Any] = {} |
| 35 | + # Always include workflow name from env |
| 36 | + headers["X-Workflow-Name"] = f"${{{WORKFLOW_NAME_EXPR}}}" |
| 37 | + if include_run_id: |
| 38 | + headers["X-Workflow-Run-Id"] = "${run_id}" |
| 39 | + # Only set Content-Type when sending a body |
| 40 | + if include_content_type: |
| 41 | + headers["Content-Type"] = "application/json" |
| 42 | + if existing: |
| 43 | + headers.update({k: _as_yaml_expr(v) for k, v in existing.items()}) |
| 44 | + return headers |
| 45 | + |
| 46 | + for idx, node in enumerate(wf.nodes): |
| 47 | + if isinstance(node, AssignStep): |
| 48 | + steps.append( |
| 49 | + {f"assign_{idx}": {"assign": [{payload_var: {k: _as_yaml_expr(v) for k, v in node.expr.items()}}]}} |
| 50 | + ) |
| 51 | + continue |
| 52 | + |
| 53 | + if isinstance(node, HttpStep): |
| 54 | + method = node.method.lower() |
| 55 | + result_var = f"res_{idx}" |
| 56 | + args: dict[str, Any] = {"url": _as_yaml_expr(node.url)} |
| 57 | + # Only include body for non-GET methods; http.get does not accept a body argument |
| 58 | + if method != "get": |
| 59 | + args["body"] = f"${{{payload_var}}}" |
| 60 | + args["headers"] = _with_required_headers( |
| 61 | + node.headers or {}, have_run_id, include_content_type=(method != "get") |
| 62 | + ) |
| 63 | + if node.auth: |
| 64 | + args["auth"] = {k: _as_yaml_expr(v) for k, v in node.auth.items()} |
| 65 | + if node.timeout: |
| 66 | + args["timeout"] = int(node.timeout.total_seconds()) |
| 67 | + |
| 68 | + steps.append({f"call_{node.name}": {"call": f"http.{method}", "args": args, "result": result_var}}) |
| 69 | + steps.append({f"set_payload_{idx}": {"assign": [{payload_var: f"${{{result_var}.body}}"}]}}) |
| 70 | + continue |
| 71 | + |
| 72 | + # Python step via FastAPI endpoint |
| 73 | + result_var = f"res_{idx}" |
| 74 | + url_expr = _concat_expr(base_url_expr, f"/steps/{node.name}") |
| 75 | + args = { |
| 76 | + "url": url_expr, |
| 77 | + "body": f"${{{payload_var}}}", |
| 78 | + "headers": _with_required_headers({}, have_run_id, include_content_type=True), |
| 79 | + # Authenticate calls to Cloud Run using the workflow's service account |
| 80 | + "auth": {"type": "OIDC", "audience": f"${{{base_url_expr}}}"}, |
| 81 | + } |
| 82 | + steps.append({f"call_{node.name}": {"call": "http.post", "args": args, "result": result_var}}) |
| 83 | + steps.append({f"set_payload_{idx}": {"assign": [{payload_var: f"${{{result_var}.body}}"}]}}) |
| 84 | + if not have_run_id: |
| 85 | + steps.append( |
| 86 | + {f"capture_run_id_{idx}": {"assign": [{"run_id": f'${{{result_var}.headers["X-Workflow-Run-Id"]}}'}]}} |
| 87 | + ) |
| 88 | + have_run_id = True |
| 89 | + |
| 90 | + steps.append({"return_final": {"return": f"${{{payload_var}}}"}}) |
| 91 | + return {"main": {"params": [payload_var], "steps": steps}} |
| 92 | + |
| 93 | + |
| 94 | +def emit_workflow_yaml(wf: Workflow, out_dir: Path, base_url_expr: str | None = None) -> Path: |
| 95 | + out_dir.mkdir(parents=True, exist_ok=True) |
| 96 | + data = workflow_to_yaml_dict(wf, base_url_expr=base_url_expr or 'sys.get_env("BASE_URL")') |
| 97 | + path = out_dir / f"{wf.name}.yaml" |
| 98 | + with path.open("w", encoding="utf-8") as fh: |
| 99 | + yaml.safe_dump(data, fh, sort_keys=False) |
| 100 | + return path |
0 commit comments