Skip to content

Commit ce39a29

Browse files
committed
feat: add support to PubSub connector
1 parent 7c4f40f commit ce39a29

10 files changed

Lines changed: 289 additions & 5 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ Priority order for upcoming features:
117117
| 1 | **Retries** | ✅ Completed | `RetryPolicy` with configurable backoff and predicates |
118118
| 2 | **Try/catch** | ✅ Completed | `TryCatchStep` for exception handling with fallback flows |
119119
| 3 | **Subworkflows** | ✅ Completed | Call other workflows with `>> WORKFLOW`, dependency tracking |
120-
| 4 | **GCP connectors** | 📋 Planned | Direct service calls, native Cloud Workflows connectors |
120+
| 4 | **GCP connectors** | ✅ Completed | Direct service calls using Cloud Workflows connectors (Pub/Sub first) |
121121
| 5 | **Deployment API** | 📋 Planned | Programmatic deployment of workflows & EventArc triggers via GCP APIs |
122122
| 6 | **Loops** | 📋 Planned | For/while constructs, iteration over collections |
123123
| 7 | **Conditionals / switch** | 📋 Planned | Branching logic, switch statements |
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
"""Example workflow demonstrating Pub/Sub connector support."""
2+
3+
from __future__ import annotations
4+
5+
from pydantic import BaseModel
6+
7+
from fastapi_cloudflow import Context, step, workflow
8+
from fastapi_cloudflow.core.connectors import pubsub_publish_step
9+
10+
11+
class PublishRequest(BaseModel):
12+
topic: str
13+
payload: str
14+
15+
16+
class PublishResult(BaseModel):
17+
message_ids: list[str]
18+
19+
20+
@step(name="prepare-message")
21+
async def prepare_message(ctx: Context, data: PublishRequest) -> PublishRequest:
22+
return data
23+
24+
25+
publish_step = pubsub_publish_step(
26+
name="publish-topic",
27+
topic="projects/demo/topics/example",
28+
input_model=PublishRequest,
29+
data_field="payload",
30+
)
31+
32+
33+
PUBSUB_EXAMPLE_FLOW = (workflow("pubsub-example-flow") >> prepare_message >> publish_step).build()

src/fastapi_cloudflow/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
from fastapi_cloudflow.core import (
22
Arg,
33
AssignStep,
4+
ConnectorCall,
5+
ConnectorStep,
46
Context,
57
HttpStep,
68
ModelAdapter,
9+
PubSubPublishResult,
710
RetryPolicy,
811
Step,
912
SubworkflowStep,
@@ -27,11 +30,14 @@
2730
"Step",
2831
"Workflow",
2932
"RetryPolicy",
33+
"ConnectorCall",
34+
"PubSubPublishResult",
3035
"SubworkflowStep",
3136
"TryCatchStep",
3237
"TryCatchBuilder",
3338
"AssignStep",
3439
"HttpStep",
40+
"ConnectorStep",
3541
"ModelAdapter",
3642
"Arg",
3743
"get_registry",

src/fastapi_cloudflow/codegen/workflows.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import yaml
55

6-
from ..core import AssignStep, HttpStep, TryCatchStep, Workflow
6+
from ..core import AssignStep, ConnectorStep, HttpStep, TryCatchStep, Workflow
77

88

99
def _is_arg_expr(v: Any) -> bool:
@@ -132,6 +132,29 @@ def _process_single_step(
132132
steps.append({f"set_payload_{idx}": {"assign": [{payload_var: f"${{{result_var}.body}}"}]}})
133133
return steps, have_run_id
134134

135+
if isinstance(node, ConnectorStep):
136+
result_var = f"res_{idx}"
137+
call = node.call
138+
139+
args = {k: _as_yaml_expr(v) for k, v in call.args.items()}
140+
141+
step_def: dict[str, Any] = {
142+
"call": call.call,
143+
"args": args,
144+
"result": result_var,
145+
}
146+
147+
if node.timeout:
148+
step_def.setdefault("args", {})["timeout"] = int(node.timeout.total_seconds())
149+
150+
retry_config = _emit_retry_config(node.retry)
151+
if retry_config:
152+
step_def["retry"] = retry_config
153+
154+
steps.append({f"call_{node.name}": step_def})
155+
steps.append({f"set_payload_{idx}": {"assign": [{payload_var: f"${{{result_var}}}"}]}})
156+
return steps, have_run_id
157+
135158
# Python step via FastAPI endpoint
136159
result_var = f"res_{idx}"
137160
url_expr = _concat_expr(base_url_expr, f"/steps/{node.name}")

src/fastapi_cloudflow/core/__init__.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
from fastapi_cloudflow.core.arg import Arg, ArgExpr
2+
from fastapi_cloudflow.core.connectors import PubSubPublishResult
23
from fastapi_cloudflow.core.error_handling import TryCatchBuilder, TryCatchStep, try_catch
3-
from fastapi_cloudflow.core.step import AssignStep, HttpStep, ModelAdapter, Step
4+
from fastapi_cloudflow.core.step import AssignStep, ConnectorStep, HttpStep, ModelAdapter, Step
45
from fastapi_cloudflow.core.subworkflow import SubworkflowStep
5-
from fastapi_cloudflow.core.types import Context, RetryPolicy, WorkflowMeta
6+
from fastapi_cloudflow.core.types import ConnectorCall, Context, RetryPolicy, WorkflowMeta
67
from fastapi_cloudflow.core.workflow import (
78
Registry,
89
Workflow,
@@ -18,11 +19,14 @@
1819
"Context",
1920
"WorkflowMeta",
2021
"RetryPolicy",
22+
"ConnectorCall",
23+
"PubSubPublishResult",
2124
"ArgExpr",
2225
"Arg",
2326
"Step",
2427
"AssignStep",
2528
"HttpStep",
29+
"ConnectorStep",
2630
"ModelAdapter",
2731
"SubworkflowStep",
2832
"TryCatchStep",

src/fastapi_cloudflow/core/arg.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import annotations
22

3+
import json
4+
35

46
class ArgExpr:
57
def __init__(self, expr: str) -> None:
@@ -38,3 +40,15 @@ def param(path: str) -> ArgExpr:
3840
@staticmethod
3941
def ctx(key: str) -> ArgExpr:
4042
return ArgExpr(f"ctx.{key}")
43+
44+
@staticmethod
45+
def expr(expression: str) -> ArgExpr:
46+
return ArgExpr(expression)
47+
48+
@staticmethod
49+
def base64(value: str | ArgExpr) -> ArgExpr:
50+
if isinstance(value, ArgExpr):
51+
inner = value.expr
52+
else:
53+
inner = json.dumps(value)
54+
return ArgExpr(f"base64.encode({inner})")
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
"""Helpers for Google Cloud connector steps."""
2+
3+
from __future__ import annotations
4+
5+
from collections.abc import Mapping
6+
from datetime import timedelta
7+
from typing import Any, TypeVar
8+
9+
from pydantic import BaseModel
10+
11+
from fastapi_cloudflow.core.arg import Arg, ArgExpr
12+
from fastapi_cloudflow.core.step import ConnectorStep
13+
from fastapi_cloudflow.core.types import ConnectorCall, RetryPolicy
14+
15+
InT = TypeVar("InT", bound=BaseModel)
16+
OutT = TypeVar("OutT", bound=BaseModel)
17+
18+
19+
_PUBSUB_TOPIC_PREFIX = "projects/"
20+
21+
22+
def _validate_topic_path(topic: str) -> None:
23+
if not topic.startswith(_PUBSUB_TOPIC_PREFIX):
24+
raise ValueError("Pub/Sub topic must be fully qualified: 'projects/<project>/topics/<topic_id>'")
25+
parts = topic.split("/")
26+
if len(parts) != 4:
27+
raise ValueError("Pub/Sub topic must follow 'projects/<project>/topics/<topic_id>' format")
28+
_, project, resource, topic_id = parts
29+
if not project or resource != "topics" or not topic_id:
30+
raise ValueError("Pub/Sub topic must include non-empty project and topic id")
31+
32+
33+
def pubsub_message(
34+
*,
35+
data: str | ArgExpr,
36+
attributes: Mapping[str, str | ArgExpr] | ArgExpr | None = None,
37+
ordering_key: str | ArgExpr | None = None,
38+
) -> dict[str, Any]:
39+
"""Build a Pub/Sub message payload for the publish API."""
40+
41+
message: dict[str, Any] = {"data": Arg.base64(data)}
42+
43+
if attributes is not None:
44+
if isinstance(attributes, ArgExpr):
45+
message["attributes"] = attributes
46+
else:
47+
message["attributes"] = attributes.copy()
48+
49+
if ordering_key is not None:
50+
message["orderingKey"] = ordering_key
51+
52+
return message
53+
54+
55+
def pubsub_publish(
56+
*,
57+
topic: str | ArgExpr,
58+
data: str | ArgExpr | None = None,
59+
attributes: Mapping[str, str | ArgExpr] | ArgExpr | None = None,
60+
ordering_key: str | ArgExpr | None = None,
61+
messages: list[dict[str, Any]] | None = None,
62+
) -> ConnectorCall:
63+
"""Create a ConnectorCall for the Pub/Sub publish API."""
64+
65+
if isinstance(topic, str):
66+
_validate_topic_path(topic)
67+
68+
if messages is None:
69+
if data is None:
70+
raise ValueError("Provide either `data` or `messages` when publishing to Pub/Sub")
71+
messages = [pubsub_message(data=data, attributes=attributes, ordering_key=ordering_key)]
72+
73+
request: dict[str, Any] = {
74+
"topic": topic,
75+
"messages": messages,
76+
}
77+
78+
args: dict[str, Any] = {
79+
"connector": "googleapis.pubsub.v1",
80+
"operation": "projects.topics.publish",
81+
"request": request,
82+
}
83+
84+
return ConnectorCall(call="connectors.googleapis.pubsub.v1.projects.topics.publish", args=args)
85+
86+
87+
class PubSubPublishResult(BaseModel):
88+
message_ids: list[str]
89+
90+
91+
def pubsub_publish_step(
92+
*,
93+
name: str,
94+
topic: str,
95+
input_model: type[InT],
96+
output_model: type[OutT] | None = None,
97+
data: str | ArgExpr | None = None,
98+
data_field: str | None = "payload",
99+
attributes: Mapping[str, str | ArgExpr] | ArgExpr | None = None,
100+
attributes_field: str | None = None,
101+
ordering_key: str | ArgExpr | None = None,
102+
ordering_key_field: str | None = None,
103+
retry: RetryPolicy | None = None,
104+
timeout: timedelta | None = None,
105+
) -> ConnectorStep[InT, OutT]:
106+
"""High-level helper that returns a ConnectorStep publishing to Pub/Sub."""
107+
108+
if isinstance(topic, str):
109+
_validate_topic_path(topic)
110+
111+
data_expr: str | ArgExpr | None = data
112+
if data_expr is None:
113+
if not data_field:
114+
raise ValueError("Provide either `data` or `data_field` for pubsub_publish_step")
115+
data_expr = Arg.param(data_field)
116+
117+
attributes_expr = attributes
118+
if attributes_expr is None and attributes_field is not None:
119+
attributes_expr = Arg.param(attributes_field)
120+
121+
ordering_expr = ordering_key
122+
if ordering_expr is None and ordering_key_field is not None:
123+
ordering_expr = Arg.param(ordering_key_field)
124+
125+
call = pubsub_publish(
126+
topic=topic,
127+
data=data_expr,
128+
attributes=attributes_expr,
129+
ordering_key=ordering_expr,
130+
)
131+
132+
result_model = output_model or PubSubPublishResult # type: ignore[assignment]
133+
134+
return ConnectorStep(
135+
name=name,
136+
input_model=input_model,
137+
output_model=result_model,
138+
call=call,
139+
retry=retry,
140+
timeout=timeout,
141+
)
142+
143+
144+
__all__ = ["pubsub_publish", "pubsub_message", "pubsub_publish_step", "PubSubPublishResult"]

src/fastapi_cloudflow/core/step.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from pydantic import BaseModel
88

99
from fastapi_cloudflow.core.arg import ArgExpr
10-
from fastapi_cloudflow.core.types import Context, RetryPolicy
10+
from fastapi_cloudflow.core.types import ConnectorCall, Context, RetryPolicy
1111

1212
InT = TypeVar("InT", bound=BaseModel)
1313
OutT = TypeVar("OutT", bound=BaseModel)
@@ -138,6 +138,22 @@ def __init__(
138138
self.auth = auth
139139

140140

141+
class ConnectorStep(Step[InT, OutT]):
142+
def __init__(
143+
self,
144+
name: str,
145+
input_model: type[InT],
146+
output_model: type[OutT],
147+
call: ConnectorCall,
148+
retry: RetryPolicy | None = None,
149+
timeout: timedelta | None = None,
150+
) -> None:
151+
super().__init__(
152+
name=name, input_model=input_model, output_model=output_model, fn=None, retry=retry, timeout=timeout
153+
)
154+
self.call = call
155+
156+
141157
class ModelAdapter(Step[InT, OutT]):
142158
def __init__(self, name: str, input_model: type[InT], output_model: type[OutT], mapping: dict[str, Any]) -> None:
143159
super().__init__(name=name, input_model=input_model, output_model=output_model, fn=None)

src/fastapi_cloudflow/core/types.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
from dataclasses import dataclass
4+
from typing import Any
45

56
from fastapi import Request
67

@@ -35,3 +36,9 @@ def idempotent_http() -> RetryPolicy:
3536
multiplier=2.0,
3637
predicate="http.default_retry_predicate",
3738
)
39+
40+
41+
@dataclass
42+
class ConnectorCall:
43+
call: str
44+
args: dict[str, Any]
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
main:
2+
params:
3+
- payload
4+
steps:
5+
- call_prepare-message:
6+
call: http.post
7+
args:
8+
url: ${sys.get_env("BASE_URL") + "/steps/prepare-message"}
9+
body: ${payload}
10+
headers:
11+
X-Workflow-Name: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_ID")}
12+
Content-Type: application/json
13+
auth:
14+
type: OIDC
15+
audience: ${sys.get_env("BASE_URL")}
16+
result: res_0
17+
- set_payload_0:
18+
assign:
19+
- payload: ${res_0.body}
20+
- capture_run_id_0:
21+
assign:
22+
- run_id: ${res_0.headers["X-Workflow-Run-Id"]}
23+
- call_publish-topic:
24+
call: connectors.googleapis.pubsub.v1.projects.topics.publish
25+
args:
26+
connector: googleapis.pubsub.v1
27+
operation: projects.topics.publish
28+
request:
29+
topic: ${params.topic}
30+
messages:
31+
- data: ${base64.encode(params.payload)}
32+
result: res_1
33+
- set_payload_1:
34+
assign:
35+
- payload: ${res_1}
36+
- return_final:
37+
return: ${payload}

0 commit comments

Comments
 (0)