Skip to content

Commit a7b5ad3

Browse files
committed
feat(examples): add playful flows and FastAPI app wiring
1 parent 7fb4171 commit a7b5ad3

11 files changed

Lines changed: 363 additions & 0 deletions

File tree

examples/__init__.py

Whitespace-only changes.

examples/app/Dockerfile

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
FROM python:3.13-slim
2+
3+
# Install fastapi-cloudflow from local package sources (only what's needed)
4+
WORKDIR /opt/fastapi-cloudflow
5+
COPY pyproject.toml ./
6+
COPY README.md ./
7+
COPY src/ ./src/
8+
RUN pip install --no-cache-dir .
9+
10+
# Set up the application directory with only the app code
11+
WORKDIR /app
12+
COPY examples/app/ .
13+
14+
# Install application dependencies
15+
RUN pip install --no-cache-dir -r requirements.txt
16+
17+
# Ensure the app root is on Python import path for CLI/tools and make Python friendlier in containers
18+
ENV PYTHONPATH=/app \
19+
PYTHONDONTWRITEBYTECODE=1 \
20+
PYTHONUNBUFFERED=1
21+
22+
# Expose port (Cloud Run will override this with PORT env var)
23+
EXPOSE 8080
24+
25+
# Run the application using PORT environment variable (Cloud Run standard)
26+
CMD uvicorn main:app --host 0.0.0.0 --port ${PORT:-8080}

examples/app/__init__.py

Whitespace-only changes.

examples/app/flows/echo_name.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
from __future__ import annotations
2+
3+
from pydantic import BaseModel, Field
4+
5+
from fastapi_cloudflow import Arg, Context, HttpStep, step, workflow
6+
7+
8+
class EchoIn(BaseModel):
9+
name: str
10+
11+
12+
class HttpbinRes(BaseModel):
13+
# Avoid shadowing BaseModel methods by using an alias
14+
model_config = {"populate_by_name": True}
15+
json_: dict = Field(alias="json")
16+
17+
18+
class EchoOut(BaseModel):
19+
echoed_name: str
20+
21+
22+
class NameShout(BaseModel):
23+
name_upper: str
24+
length: int
25+
26+
27+
httpbin_echo = HttpStep(
28+
name="external-echo",
29+
input_model=EchoIn,
30+
output_model=HttpbinRes,
31+
method="POST",
32+
# Use a public echo endpoint (httpbin) via env, default set at deploy
33+
url=Arg.env("ECHO_URL"),
34+
)
35+
36+
37+
@step(name="extract-name")
38+
async def adapt_echo(ctx: Context, data: HttpbinRes) -> EchoOut:
39+
# httpbin returns payload under the "json" key
40+
name = str(data.json_.get("name", ""))
41+
return EchoOut(echoed_name=name)
42+
43+
44+
@step(name="name-shout")
45+
async def name_shout(ctx: Context, data: EchoOut) -> NameShout:
46+
up = data.echoed_name.upper()
47+
return NameShout(name_upper=up, length=len(up))
48+
49+
50+
ECHO_NAME_FLOW = (workflow("echo-name-flow") >> httpbin_echo >> adapt_echo >> name_shout).build()

examples/app/flows/jokes.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from __future__ import annotations
2+
3+
from pydantic import BaseModel, Field
4+
5+
from fastapi_cloudflow import Context, HttpStep, step, workflow
6+
7+
8+
class EmptyIn(BaseModel):
9+
pass
10+
11+
12+
class JokeAPIRes(BaseModel):
13+
id: str
14+
joke: str
15+
status: int
16+
17+
18+
class JokeBits(BaseModel):
19+
setup: str
20+
punch: str
21+
22+
23+
joke_fetch = HttpStep(
24+
name="joke-fetch",
25+
input_model=EmptyIn,
26+
output_model=JokeAPIRes,
27+
method="GET",
28+
url="https://icanhazdadjoke.com/",
29+
headers={"Accept": "application/json"},
30+
)
31+
32+
33+
@step(name="joke-split")
34+
async def joke_split(ctx: Context, data: JokeAPIRes) -> JokeBits:
35+
j = data.joke or ""
36+
mid = len(j) // 2
37+
return JokeBits(setup=j[:mid], punch=j[mid:])
38+
39+
40+
class JokeRated(BaseModel):
41+
setup: str
42+
punch: str
43+
rating: int = Field(ge=0, le=10)
44+
45+
46+
@step(name="joke-rate")
47+
async def joke_rate(ctx: Context, data: JokeBits) -> JokeRated:
48+
rating = (len(data.setup) + len(data.punch)) % 11
49+
return JokeRated(setup=data.setup, punch=data.punch, rating=rating)
50+
51+
52+
JOKE_FLOW = (workflow("joke-flow") >> joke_fetch >> joke_split >> joke_rate).build()

examples/app/flows/order.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from __future__ import annotations
2+
3+
from decimal import Decimal
4+
from uuid import uuid4
5+
6+
from pydantic import BaseModel
7+
8+
from fastapi_cloudflow import Context, step, workflow
9+
10+
11+
class CreateOrder(BaseModel):
12+
account_id: int
13+
sku: str
14+
qty: int
15+
16+
17+
class OrderDraft(BaseModel):
18+
order_id: str
19+
price: Decimal
20+
21+
22+
class PaymentAuth(BaseModel):
23+
order_id: str
24+
status: str
25+
26+
27+
@step(name="price-order")
28+
async def price_order(ctx: Context, data: CreateOrder) -> OrderDraft:
29+
return OrderDraft(order_id=f"o-{uuid4().hex}", price=Decimal("12.34"))
30+
31+
32+
@step(name="auth-payment")
33+
async def auth_payment(ctx: Context, data: OrderDraft) -> PaymentAuth:
34+
return PaymentAuth(order_id=data.order_id, status="approved")
35+
36+
37+
ORDER_FLOW = (workflow("order-flow") >> price_order >> auth_payment).build()

examples/app/flows/payments.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from __future__ import annotations
2+
3+
from pydantic import BaseModel
4+
5+
from fastapi_cloudflow import Arg, AssignStep, Context, HttpStep, step, workflow
6+
7+
8+
class Cart(BaseModel):
9+
total: float
10+
currency: str
11+
12+
13+
class PSPReq(BaseModel):
14+
amount: float
15+
currency: str
16+
17+
18+
class PSPRes(BaseModel):
19+
status: str
20+
psp_id: str
21+
22+
23+
@step(name="validate-cart")
24+
async def validate_cart(ctx: Context, data: Cart) -> Cart:
25+
assert data.total >= 0
26+
return data
27+
28+
29+
to_psp = AssignStep("cart->psp", Cart, PSPReq, expr={"amount": "${payload.total}", "currency": "${payload.currency}"})
30+
31+
psp = HttpStep(
32+
"psp-charge",
33+
PSPReq,
34+
PSPRes,
35+
method="POST",
36+
url=Arg.env("PSP_URL") / "charge",
37+
auth={"type": "OIDC", "audience": Arg.env("PSP_URL")},
38+
)
39+
40+
41+
class ChargeResult(BaseModel):
42+
ok: bool
43+
txn_id: str | None
44+
45+
46+
@step(name="summarize-charge")
47+
async def summarize_charge(ctx: Context, data: PSPRes) -> ChargeResult:
48+
return ChargeResult(ok=(data.status == "approved"), txn_id=data.psp_id)
49+
50+
51+
PAYMENT_FLOW = (workflow("payment-flow") >> validate_cart >> to_psp >> psp >> summarize_charge).build()

examples/app/flows/post_story.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
from __future__ import annotations
2+
3+
from pydantic import BaseModel
4+
5+
from fastapi_cloudflow import Context, HttpStep, step, workflow
6+
7+
8+
class StoryIn(BaseModel):
9+
topic: str
10+
mood: str
11+
author_id: int
12+
13+
14+
class PostOut(BaseModel):
15+
id: int
16+
title: str
17+
body: str
18+
userId: int
19+
20+
21+
class PostSummary(BaseModel):
22+
id: int
23+
slug: str
24+
short_title: str
25+
26+
27+
@step(name="build-story")
28+
async def build_story(ctx: Context, data: StoryIn) -> PostOut:
29+
title = f"{data.topic} in a {data.mood} mood"
30+
body = f"Once upon a time about {data.topic}"
31+
return PostOut(id=0, title=title, body=body, userId=data.author_id)
32+
33+
34+
jp_create_post = HttpStep(
35+
name="create-post",
36+
input_model=PostOut,
37+
output_model=PostOut,
38+
method="POST",
39+
url="https://jsonplaceholder.typicode.com/posts",
40+
)
41+
42+
43+
@step(name="summarize-post")
44+
async def summarize_post(ctx: Context, data: PostOut) -> PostSummary:
45+
slug = f"{data.userId}-{(data.title or '').lower().replace(' ', '-')[:24]}"
46+
short = (data.title or "")[:16]
47+
return PostSummary(id=data.id, slug=slug, short_title=short)
48+
49+
50+
POST_STORY_FLOW = (workflow("post-story-flow") >> build_story >> jp_create_post >> summarize_post).build()

examples/app/flows/user.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
from __future__ import annotations
2+
3+
from pydantic import BaseModel, EmailStr, Field
4+
5+
from fastapi_cloudflow import Arg, AssignStep, Context, HttpStep, step, workflow
6+
7+
8+
class SignupRequest(BaseModel):
9+
email: EmailStr
10+
password: str = Field(min_length=8)
11+
12+
13+
class UserDraft(BaseModel):
14+
email: EmailStr
15+
hashed_password: str
16+
17+
18+
class IdentityReq(BaseModel):
19+
email: EmailStr
20+
secret: str
21+
22+
23+
class IdentityRes(BaseModel):
24+
external_id: str
25+
ok: bool
26+
27+
28+
class UserCreated(BaseModel):
29+
user_id: str
30+
email: EmailStr
31+
32+
33+
@step(name="hash-password")
34+
async def hash_password(ctx: Context, data: SignupRequest) -> UserDraft:
35+
return UserDraft(email=data.email, hashed_password=f"hashed:{data.password}")
36+
37+
38+
adapt_to_idp = AssignStep(
39+
"draft->idp",
40+
UserDraft,
41+
IdentityReq,
42+
expr={
43+
"email": "${payload.email}",
44+
"secret": "${payload.hashed_password}",
45+
},
46+
)
47+
48+
49+
idp_call = HttpStep(
50+
name="idp-call",
51+
input_model=IdentityReq,
52+
output_model=IdentityRes,
53+
method="POST",
54+
url=Arg.env("IDP_URL") / "signup",
55+
auth={"type": "OIDC", "audience": Arg.env("IDP_URL")},
56+
)
57+
58+
59+
@step(name="persist-user")
60+
async def persist_user(ctx: Context, data: IdentityRes) -> UserCreated:
61+
return UserCreated(user_id=data.external_id, email="user@example.com")
62+
63+
64+
USER_SIGNUP = (workflow("user-signup") >> hash_password >> adapt_to_idp >> idp_call >> persist_user).build()

examples/app/main.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from fastapi import FastAPI
2+
from flows import echo_name, jokes, order, payments, post_story, user # noqa: F401
3+
4+
from fastapi_cloudflow import attach_to_fastapi
5+
6+
7+
def create_app() -> FastAPI:
8+
app = FastAPI()
9+
# Import types used for stub endpoints
10+
from flows.payments import PSPReq, PSPRes
11+
from flows.user import IdentityReq, IdentityRes
12+
13+
@app.get("/health")
14+
def health() -> dict[str, str]: # noqa: D401
15+
return {"status": "ok"}
16+
17+
# Simple stubs so example HttpSteps resolve successfully during smoke tests
18+
@app.post("/signup", response_model=IdentityRes)
19+
def signup(req: IdentityReq) -> IdentityRes:
20+
return IdentityRes(external_id="u-0001", ok=True)
21+
22+
@app.post("/charge", response_model=PSPRes)
23+
def charge(req: PSPReq) -> PSPRes:
24+
return PSPRes(status="approved", psp_id="p-0001")
25+
26+
attach_to_fastapi(app)
27+
return app
28+
29+
30+
app = create_app()

0 commit comments

Comments
 (0)