Skip to content

Commit abc7c55

Browse files
committed
feat(runtime): expose step endpoints and request-scoped workflow context
1 parent 6cd837a commit abc7c55

1 file changed

Lines changed: 70 additions & 0 deletions

File tree

src/fastapi_cloudflow/runtime.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import json
2+
import uuid
3+
from typing import Any
4+
5+
from fastapi import APIRouter, FastAPI, HTTPException, Request, Response
6+
from pydantic import ValidationError
7+
8+
from fastapi_cloudflow.core import Context, Step, WorkflowMeta, get_registry
9+
10+
11+
def _build_step_router() -> APIRouter:
12+
router = APIRouter(prefix="/steps")
13+
registry = get_registry()
14+
for step in registry.steps.values():
15+
if step.fn is None:
16+
continue
17+
18+
def make_handler(s: Step[Any, Any]):
19+
async def handler(request: Request, response: Response) -> Any:
20+
ctx: Context = request.state.context
21+
ctx.workflow.step = s.name
22+
try:
23+
payload_dict = await request.json()
24+
except json.JSONDecodeError as err:
25+
raise HTTPException(status_code=422, detail="Malformed JSON body") from err
26+
if payload_dict is None:
27+
raise HTTPException(status_code=422, detail="Request body required")
28+
# Accept either raw model or a wrapped {"payload": {...}} for compatibility with clients
29+
if (
30+
isinstance(payload_dict, dict)
31+
and "payload" in payload_dict
32+
and isinstance(payload_dict.get("payload"), dict)
33+
and len(payload_dict.keys()) == 1
34+
):
35+
payload_dict = payload_dict["payload"]
36+
try:
37+
body = s.input_model.model_validate(payload_dict)
38+
except ValidationError as err:
39+
raise HTTPException(status_code=422, detail=err.errors()) from err
40+
result = await s.fn(ctx, body) # type: ignore[call-arg]
41+
response.headers["X-Workflow-Run-Id"] = ctx.workflow.run_id or ""
42+
return result
43+
44+
return handler
45+
46+
router.add_api_route(
47+
f"/{step.name}",
48+
endpoint=make_handler(step),
49+
methods=["POST"],
50+
response_model=step.output_model,
51+
)
52+
return router
53+
54+
55+
def attach_to_fastapi(app: FastAPI) -> None:
56+
@app.middleware("http")
57+
async def inject_context(request: Request, call_next):
58+
run_id = request.headers.get("X-Workflow-Run-Id") or str(uuid.uuid4())
59+
name = request.headers.get("X-Workflow-Name")
60+
ctx = Context(request=request, workflow=WorkflowMeta(name=name, step=None, run_id=run_id))
61+
request.state.context = ctx
62+
return await call_next(request)
63+
64+
app.include_router(_build_step_router())
65+
66+
67+
def build_app() -> FastAPI:
68+
app = FastAPI()
69+
attach_to_fastapi(app)
70+
return app

0 commit comments

Comments
 (0)