diff --git a/.changeset/fuzzy-ravens-listen.md b/.changeset/fuzzy-ravens-listen.md new file mode 100644 index 00000000000..6fbc096a572 --- /dev/null +++ b/.changeset/fuzzy-ravens-listen.md @@ -0,0 +1,5 @@ +--- +"@effect/cluster": patch +--- + +Fix entity defect restarts losing active durable requests during replay. diff --git a/packages/cluster/src/internal/entityManager.ts b/packages/cluster/src/internal/entityManager.ts index 5744d9543ef..a57fc8c1a11 100644 --- a/packages/cluster/src/internal/entityManager.ts +++ b/packages/cluster/src/internal/entityManager.ts @@ -145,6 +145,8 @@ export const make = Effect.fnUntraced(function*< const activeRequests: EntityState["activeRequests"] = new Map() let defectRequestIds: Array = [] + let isRestartingAfterDefect = false + const withDefectRestartLock = Effect.unsafeMakeSemaphore(1).withPermits(1) // the server is stored in a ref, so if there is a defect, we can // swap the server without losing the active requests @@ -192,7 +194,11 @@ export const make = Effect.fnUntraced(function*< Exit.isInterrupted(response.exit) && (isShuttingDown || Uninterruptible.forServer(request.rpc.annotations)) ) { - if (!isShuttingDown) { + if (isRestartingAfterDefect && isShuttingDown) { + // Closing the old server during a defect restart interrupts active handlers. + // Keep durable requests registered so the new server can replay them below. + return Effect.void + } else if (!isShuttingDown) { return server.write(0, { ...request.message.envelope, id: RequestId(request.message.envelope.requestId), @@ -279,7 +285,9 @@ export const make = Effect.fnUntraced(function*< if (defectRequestIds.length > 0) { for (const id of defectRequestIds) { - const { lastSentChunk, message } = activeRequests.get(id)! + const request = activeRequests.get(id) + if (!request) continue + const { lastSentChunk, message } = request yield* server.write(0, { ...message.envelope, id: RequestId(message.envelope.requestId), @@ -298,9 +306,16 @@ export const make = Effect.fnUntraced(function*< ) function onDefect(cause: Cause.Cause): Effect.Effect { + return withDefectRestartLock(Effect.suspend(() => restartOnDefect(cause))).pipe( + Effect.catchAllCause(onDefect) + ) + } + + function restartOnDefect(cause: Cause.Cause): Effect.Effect { if (!activeServers.has(address.entityId)) { return endLatch.open } + isRestartingAfterDefect = true const effect = writeRef.unsafeRebuild() defectRequestIds = Array.from(activeRequests.keys()) return Effect.logError("Defect in entity, restarting", cause).pipe( @@ -311,7 +326,9 @@ export const make = Effect.fnUntraced(function*< address, runner: options.runnerAddress }), - Effect.catchAllCause(onDefect) + Effect.ensuring(Effect.sync(() => { + isRestartingAfterDefect = false + })) ) } diff --git a/packages/cluster/test/Sharding.test.ts b/packages/cluster/test/Sharding.test.ts index bcde4d7d8d0..731ee043d21 100644 --- a/packages/cluster/test/Sharding.test.ts +++ b/packages/cluster/test/Sharding.test.ts @@ -511,6 +511,24 @@ describe.concurrent("Sharding", () => { expect(result).toEqual(new User({ id: 123, name: "User 123" })) expect(state.layerBuilds.current).toEqual(2) }).pipe(Effect.provide(TestSharding))) + + it.scoped("restart on defect with another active request", () => + Effect.gen(function*() { + yield* TestClock.adjust(1) + const state = yield* TestEntityState + const makeClient = yield* TestEntity.client + const client = makeClient("1") + + const fiber = yield* client.NeverFork().pipe(Effect.fork) + yield* TestClock.adjust(1) + + MutableRef.set(state.defectTrigger, true) + const result = yield* client.GetUser({ id: 123 }) + expect(result).toEqual(new User({ id: 123, name: "User 123" })) + expect(state.layerBuilds.current).toEqual(2) + + yield* Fiber.interrupt(fiber) + }).pipe(Effect.provide(TestSharding))) }) const TestShardingConfig = ShardingConfig.layer({