Skip to content

Commit 4a7d95a

Browse files
committed
infra: drive rpc interruptibility from request type
1 parent 7f75dee commit 4a7d95a

4 files changed

Lines changed: 91 additions & 15 deletions

File tree

.changeset/curly-beds-sing.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect-app/infra": patch
3+
---
4+
5+
Use request-type RPC annotations to drive interruptibility and retry behavior, apply command uninterruptibility through the router wrapper path, and add an in-memory E2E test covering command vs query interruption behavior.

packages/infra/src/api/routing.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,20 @@ import { typedKeysOf, typedValuesOf } from "effect-app/utils"
1111
import { type Yieldable } from "effect/Effect"
1212
import { Rpc, RpcGroup, type RpcSerialization, RpcServer } from "effect/unstable/rpc"
1313
import { type LayerUtils } from "./layerUtils.js"
14-
import { type RouterMiddleware } from "./routing/middleware.js"
14+
import { RequestType as RequestTypeAnnotation, type RouterMiddleware } from "./routing/middleware.js"
1515

1616
export * from "./routing/middleware.js"
1717

18+
export const applyRequestTypeInterruptibility = <A, E, R>(
19+
requestType: "command" | "query",
20+
effect: Effect.Effect<A, E, R>
21+
) => requestType === "command" ? Rpc.uninterruptible(effect) : effect
22+
1823
// it's the result of extending S.Req setting success, config
1924
// it's a schema plus some metadata
2025
export type AnyRequestModule = S.Top & {
2126
_tag: string // unique identifier for the request module
27+
type: "command" | "query"
2228
config: any // ?
2329
success: S.Top // validates the success response
2430
error: S.Top // validates the failure response
@@ -386,12 +392,15 @@ export const makeRouter = <
386392
static success = S.toEncoded(resource.success)
387393
} as any
388394
: resource,
389-
(payload: any, headers: any) =>
390-
(handler.handler(payload, headers) as Effect.Effect<unknown, unknown, unknown>).pipe(
395+
(payload: any, headers: any) => {
396+
const effect = (handler.handler(payload, headers) as Effect.Effect<unknown, unknown, unknown>).pipe(
391397
Effect.withSpan(`Request.${meta.moduleName}.${resource._tag}`, {}, {
392398
captureStackTrace: () => handler.stack // capturing the handler stack is the main reason why we are doing the span here
393399
})
394400
)
401+
402+
return applyRequestTypeInterruptibility(resource.type, effect)
403+
}
395404
] as const
396405
return acc
397406
}, {} as any) as {
@@ -418,6 +427,7 @@ export const makeRouter = <
418427
return Rpc
419428
.make(resource._tag, { payload: resource, success: resource.success, error: resource.error })
420429
.annotate(middleware.requestContext, resource.config ?? {})
430+
.annotate(RequestTypeAnnotation, resource.type)
421431
})
422432
)
423433
.prefix(`${meta.moduleName}.`)

packages/infra/src/api/routing/middleware/middleware.ts

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ import { Cause, Config, Effect, Layer, Schema } from "effect"
33
import { ConfigureInterruptibilityMiddleware, DevMode, DevModeMiddleware, LoggerMiddleware, RequestCacheMiddleware } from "effect-app/middleware"
44
import { RpcContextMap, type RpcMiddleware } from "effect-app/rpc"
55
import { pretty } from "effect-app/utils"
6+
import * as Context from "effect/Context"
67
import { type Rpc } from "effect/unstable/rpc"
78
import { logError, reportError } from "../../../errorReporter.js"
89
import { InfraLogger } from "../../../logger.js"
910
import { WithNsTransaction } from "../../../Store/SQL.js"
10-
import { determineMethod, isCommand } from "../utils.js"
1111

1212
const logRequestError = logError("Request")
1313
const reportRequestError = reportError("Request")
@@ -29,22 +29,20 @@ export const RequestCacheMiddlewareLive = Layer.succeed(
2929
const isOptimisticConcurrencyException = (input: unknown) =>
3030
typeof input === "object" && input !== null && "_tag" in input && input._tag === "OptimisticConcurrencyException"
3131

32+
export const RequestType = Context.Reference<"command" | "query">(
33+
"@effect-app/infra/api/routing/RequestType",
34+
{ defaultValue: () => "query" }
35+
)
36+
3237
export const ConfigureInterruptibilityMiddlewareLive = Layer.effect(
3338
ConfigureInterruptibilityMiddleware,
3439
Effect.gen(function*() {
35-
const cache = new Map()
36-
const getCached = (key: string, schema: Schema.Top) => {
37-
const existing = cache.get(key)
38-
if (existing) return existing
39-
const n = determineMethod(key, schema)
40-
cache.set(key, n)
41-
return n
42-
}
4340
return (effect, { rpc }) => {
44-
const method = getCached(rpc._tag, rpc.payloadSchema)
41+
const requestType = Context.get(rpc.annotations, RequestType)
42+
const isCommand = requestType === "command"
4543

46-
effect = isCommand(method)
47-
? Effect.retry(Effect.uninterruptible(effect), { times: 1, while: isOptimisticConcurrencyException })
44+
effect = isCommand
45+
? Effect.retry(effect, { times: 1, while: isOptimisticConcurrencyException })
4846
: Effect.interruptible(effect)
4947

5048
return effect
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { describe, expect, it } from "@effect/vitest"
2+
import { Effect, Fiber, Layer, Ref } from "effect"
3+
import { S } from "effect-app"
4+
import { ConfigureInterruptibilityMiddleware } from "effect-app/middleware"
5+
import { Rpc, RpcGroup, RpcTest } from "effect/unstable/rpc"
6+
import { applyRequestTypeInterruptibility } from "../src/api/routing.js"
7+
import { ConfigureInterruptibilityMiddlewareLive, RequestType } from "../src/api/routing/middleware.js"
8+
9+
const InterruptibilityRpcs = RpcGroup.make(
10+
Rpc
11+
.make("doCommand", { success: S.Void })
12+
.annotate(RequestType, "command")
13+
.middleware(ConfigureInterruptibilityMiddleware),
14+
Rpc
15+
.make("doQuery", { success: S.Void })
16+
.annotate(RequestType, "query")
17+
.middleware(ConfigureInterruptibilityMiddleware)
18+
)
19+
20+
const makeImplLayer = (commandDone: Ref.Ref<boolean>, queryDone: Ref.Ref<boolean>) =>
21+
InterruptibilityRpcs.toLayer({
22+
doCommand: () =>
23+
applyRequestTypeInterruptibility(
24+
"command",
25+
Effect.sleep("120 millis").pipe(Effect.andThen(Ref.set(commandDone, true)))
26+
),
27+
doQuery: () =>
28+
applyRequestTypeInterruptibility(
29+
"query",
30+
Effect.sleep("120 millis").pipe(Effect.andThen(Ref.set(queryDone, true)))
31+
)
32+
})
33+
34+
describe("routing interruptibility", () => {
35+
it.live(
36+
"e2e: command continues after client interrupt, query does not",
37+
() =>
38+
Effect.gen(function*() {
39+
const commandDone = yield* Ref.make(false)
40+
const queryDone = yield* Ref.make(false)
41+
42+
const client = yield* RpcTest
43+
.makeClient(InterruptibilityRpcs)
44+
.pipe(
45+
Effect.provide(
46+
Layer.mergeAll(makeImplLayer(commandDone, queryDone), ConfigureInterruptibilityMiddlewareLive)
47+
)
48+
)
49+
50+
const commandFiber = yield* Effect.forkDetach(client.doCommand())
51+
yield* Effect.sleep("20 millis")
52+
yield* Fiber.interrupt(commandFiber)
53+
yield* Effect.sleep("180 millis")
54+
expect(yield* Ref.get(commandDone)).toBe(true)
55+
56+
const queryFiber = yield* Effect.forkDetach(client.doQuery())
57+
yield* Effect.sleep("20 millis")
58+
yield* Fiber.interrupt(queryFiber)
59+
yield* Effect.sleep("180 millis")
60+
expect(yield* Ref.get(queryDone)).toBe(false)
61+
})
62+
)
63+
})

0 commit comments

Comments
 (0)