From dd95e69181353b8045bd88f68b104ed745c1cd93 Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Thu, 7 May 2026 19:30:27 +0200 Subject: [PATCH 1/5] fix(appkit): decouple cache in-flight execution from per-caller abort signal MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cache in-flight deduplication map shared a single Promise across concurrent callers with the same cacheKey. When one caller's signal aborted (e.g. a React StrictMode mount/cleanup pair), fn()'s rejection cascaded through the shared promise to every other awaiter — including still-connected SSE consumers, which broadcast the abort as UPSTREAM_ERROR to the browser. Replace the bare promise map with a reference-counted in-flight entry owning its own AbortController. Callers pass their own callerSignal; the shared controller aborts only when every caller has bailed (refCount -> 0). Each caller's await is raced against its own signal so local aborts reject locally without poisoning the shared result. The catch block no longer wraps abort errors as ExecutionError.statementFailed when the shared controller has already aborted, since no live awaiter would observe them anyway. CacheInterceptor forwards context.signal as callerSignal and swaps context.signal to the cache's shared signal for the duration of fn() so the inner interceptor chain (timeout/retry/telemetry) and the underlying I/O (e.g. analytics SDK calls) observe the shared lifecycle rather than the per-request stream signal. Existing cache + plugin + stream + analytics test suites pass unchanged (715 tests). Signed-off-by: MarioCadenas --- packages/appkit/src/cache/index.ts | 154 ++++++++++++++++-- .../appkit/src/plugin/interceptors/cache.ts | 20 ++- 2 files changed, 154 insertions(+), 20 deletions(-) diff --git a/packages/appkit/src/cache/index.ts b/packages/appkit/src/cache/index.ts index 873aada18..3ae84a6d3 100644 --- a/packages/appkit/src/cache/index.ts +++ b/packages/appkit/src/cache/index.ts @@ -12,6 +12,28 @@ import { InMemoryStorage, PersistentStorage } from "./storage"; const logger = createLogger("cache"); +/** + * Reference-counted in-flight cache execution entry. + * + * `sharedController` decouples the cached `fn()` from any single caller's + * abort signal. Callers join an in-flight entry by incrementing `refCount`; + * when a caller aborts, refCount is decremented. The shared controller is + * aborted only when refCount drops to 0 — i.e. all callers have abandoned + * the request. This prevents one caller's cancellation (e.g. React + * StrictMode unmount) from poisoning the in-flight result for other still- + * connected awaiters. + */ +interface InFlightEntry { + promise: Promise; + refCount: number; + sharedController: AbortController; +} + +function createAbortError(signal: AbortSignal): unknown { + if (signal.reason !== undefined) return signal.reason; + return new DOMException("The operation was aborted.", "AbortError"); +} + /** * Cache manager class to handle cache operations. * Can be used with in-memory storage or persistent storage (Lakebase). @@ -34,7 +56,7 @@ export class CacheManager { private storage: CacheStorage; private config: CacheConfig; - private inFlightRequests: Map>; + private inFlightRequests: Map>; private cleanupInProgress: boolean; private lastCleanupAttempt: number; @@ -174,20 +196,37 @@ export class CacheManager { } /** - * Get or execute a function and cache the result - * @param key - Cache key - * @param fn - Function to execute - * @param userKey - User key + * Get or execute a function and cache the result. + * + * Multiple concurrent callers with the same `cacheKey` are deduplicated + * onto a single in-flight execution. Each caller may pass its own + * `callerSignal`; the underlying `fn()` is run with a shared, internally + * managed `AbortSignal` that aborts only when *all* callers have + * abandoned the request (reference counted). This decouples a single + * caller's cancellation (e.g. React StrictMode unmount) from the shared + * result, so other still-connected callers receive the cached value + * normally. + * + * @param key - Cache key parts + * @param fn - Function to execute. Receives the cache-owned shared signal; + * pass it through to the underlying I/O so the work is cancelled when + * no caller is left waiting. + * @param userKey - User key for cache namespacing * @param options - Options for the cache * @returns Promise of the result */ async getOrExecute( key: (string | number | object)[], - fn: () => Promise, + fn: (sharedSignal?: AbortSignal) => Promise, userKey: string, - options?: { ttl?: number }, + options?: { ttl?: number; callerSignal?: AbortSignal }, ): Promise { - if (!this.config.enabled) return fn(); + if (!this.config.enabled) return fn(options?.callerSignal); + + const callerSignal = options?.callerSignal; + if (callerSignal?.aborted) { + throw createAbortError(callerSignal); + } const cacheKey = this.generateKey(key, userKey); @@ -218,9 +257,14 @@ export class CacheManager { return cached.value; } - // check if the value is being processed by another request - const inFlight = this.inFlightRequests.get(cacheKey); - if (inFlight) { + // check if the value is being processed by another request — join + // the existing in-flight entry under reference counting so this + // caller's abort doesn't poison the shared result. + const existing = this.inFlightRequests.get(cacheKey) as + | InFlightEntry + | undefined; + if (existing) { + existing.refCount++; span.setAttribute("cache.hit", true); span.setAttribute("cache.deduplication", true); span.addEvent("cache.deduplication_used", { @@ -239,10 +283,10 @@ export class CacheManager { }); span.end(); - return inFlight as Promise; + return await this._waitWithRefCount(existing, callerSignal); } - // cache miss - execute function + // cache miss - execute function under a shared abort controller span.setAttribute("cache.hit", false); span.addEvent("cache.miss", { "cache.key": cacheKey }); this.telemetryMetrics.cacheMissCount.add(1, { @@ -254,7 +298,14 @@ export class CacheManager { cache_key: cacheKey, }); - const promise = fn() + const sharedController = new AbortController(); + const entry: InFlightEntry = { + promise: undefined as unknown as Promise, + refCount: 1, + sharedController, + }; + + entry.promise = fn(sharedController.signal) .then(async (result) => { await this.set(cacheKey, result, options); span.addEvent("cache.value_stored", { @@ -266,8 +317,13 @@ export class CacheManager { .catch((error) => { span.recordException(error); span.setStatus({ code: SpanStatusCode.ERROR }); - // Preserve AppKit errors and Databricks API errors (with status codes) - // so route handlers can map them to proper HTTP responses. + // If the shared controller aborted, all callers have already + // abandoned the request (or are about to via their own signals) + // — propagate the original error without wrapping. No live + // awaiter will observe this rejection. + if (sharedController.signal.aborted) { + throw error; + } if (error instanceof AppKitError || error instanceof ApiError) { throw error; } @@ -279,9 +335,14 @@ export class CacheManager { this.inFlightRequests.delete(cacheKey); }); - this.inFlightRequests.set(cacheKey, promise); + // Suppress unhandled rejection warnings when every caller bailed + // before fn() resolved (their own promises rejected via + // _waitWithRefCount; the underlying entry.promise has no awaiter). + entry.promise.catch(() => {}); + + this.inFlightRequests.set(cacheKey, entry as InFlightEntry); - const result = await promise; + const result = await this._waitWithRefCount(entry, callerSignal); span.setStatus({ code: SpanStatusCode.OK }); return result; } catch (error) { @@ -296,6 +357,63 @@ export class CacheManager { ); } + /** + * Wait on an in-flight entry, racing the underlying promise against the + * caller's abort signal. When the caller aborts, the entry's refCount is + * decremented; if it hits zero the shared controller is aborted so the + * underlying `fn()` can stop. Other callers continue to await the same + * entry and receive the result when it arrives. + */ + private _waitWithRefCount( + entry: InFlightEntry, + callerSignal?: AbortSignal, + ): Promise { + if (!callerSignal) return entry.promise; + + return new Promise((resolve, reject) => { + let settled = false; + + const release = () => { + if (entry.refCount > 0) entry.refCount--; + if (entry.refCount <= 0 && !entry.sharedController.signal.aborted) { + entry.sharedController.abort( + callerSignal.reason ?? "all cache callers aborted", + ); + } + }; + + const onAbort = () => { + if (settled) return; + settled = true; + callerSignal.removeEventListener("abort", onAbort); + release(); + reject(createAbortError(callerSignal)); + }; + + if (callerSignal.aborted) { + onAbort(); + return; + } + + callerSignal.addEventListener("abort", onAbort, { once: true }); + + entry.promise.then( + (value) => { + if (settled) return; + settled = true; + callerSignal.removeEventListener("abort", onAbort); + resolve(value); + }, + (error) => { + if (settled) return; + settled = true; + callerSignal.removeEventListener("abort", onAbort); + reject(error); + }, + ); + }); + } + /** * Get a cached value * @param key - Cache key diff --git a/packages/appkit/src/plugin/interceptors/cache.ts b/packages/appkit/src/plugin/interceptors/cache.ts index b8af0ca32..d1637afed 100644 --- a/packages/appkit/src/plugin/interceptors/cache.ts +++ b/packages/appkit/src/plugin/interceptors/cache.ts @@ -18,11 +18,27 @@ export class CacheInterceptor implements ExecutionInterceptor { return fn(); } + const callerSignal = context.signal; + + // The cache may dedupe this request onto a shared in-flight execution. + // Swap context.signal to the cache-owned shared signal for the duration + // of fn() so the inner interceptor chain (timeout/retry/telemetry) and + // the underlying I/O observe abort only when *all* callers have left, + // not just this one. Without this swap, mount #1's abort under React + // StrictMode poisons mount #2's joined inflight result. return this.cacheManager.getOrExecute( this.config.cacheKey, - fn, + async (sharedSignal) => { + const previousSignal = context.signal; + context.signal = sharedSignal; + try { + return await fn(); + } finally { + context.signal = previousSignal; + } + }, context.userKey, - { ttl: this.config.ttl }, + { ttl: this.config.ttl, callerSignal }, ); } } From 8457ca2c2fd74782390ba33ad9b35d8fc4234cec Mon Sep 17 00:00:00 2001 From: Pawel Kosiec Date: Fri, 22 May 2026 16:11:13 +0200 Subject: [PATCH 2/5] test(appkit): add abort/ref-counting tests and update stale mock signatures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - cache-manager.test.ts — add 5 abort/ref-counting tests covering: caller abort while another waits, all callers abort triggers shared abort, pre-aborted signal rejects immediately, single caller abort mid-flight, deduped caller abort doesn't poison first caller - cache-manager.test.ts — regression test for fn ignoring signal param - cache.test.ts — MockCacheManager passes shared signal to fn(), add signal-swap test verifying context.signal is swapped during execution and restored afterward - 16 plugin test files — update stale getOrExecute mock signatures from fn: () => Promise to fn: (signal?: AbortSignal) => Promise Signed-off-by: Pawel Kosiec --- .../src/cache/tests/cache-manager.test.ts | 185 ++++++++++++++++++ .../appkit/src/plugin/tests/cache.test.ts | 42 +++- .../agents/tests/approval-config.test.ts | 5 +- .../agents/tests/approval-route.test.ts | 5 +- .../agents/tests/dispatch-tool-call.test.ts | 5 +- .../plugins/agents/tests/dos-limits.test.ts | 5 +- .../agents/tests/route-handler-errors.test.ts | 5 +- .../files/tests/download-endpoint.test.ts | 5 +- .../files/tests/error-handling.test.ts | 5 +- .../src/plugins/files/tests/mkdir.test.ts | 5 +- .../files/tests/path-validation.test.ts | 5 +- .../src/plugins/files/tests/plugin.test.ts | 5 +- .../plugins/files/tests/raw-endpoint.test.ts | 5 +- .../src/plugins/files/tests/shutdown.test.ts | 5 +- .../src/plugins/files/tests/upload.test.ts | 5 +- .../src/plugins/genie/tests/genie.test.ts | 5 +- .../tests/lakebase-agent-tool.test.ts | 5 +- .../src/plugins/serving/tests/serving.test.ts | 5 +- 18 files changed, 274 insertions(+), 33 deletions(-) diff --git a/packages/appkit/src/cache/tests/cache-manager.test.ts b/packages/appkit/src/cache/tests/cache-manager.test.ts index 3916ef1eb..f5a65fa00 100644 --- a/packages/appkit/src/cache/tests/cache-manager.test.ts +++ b/packages/appkit/src/cache/tests/cache-manager.test.ts @@ -378,6 +378,191 @@ describe("CacheManager", () => { expect(r2).toBe("result-2"); expect(fn).toHaveBeenCalledTimes(2); }); + + test("should work when fn ignores signal parameter (non-signal caller regression)", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + // Simulates direct callers like telemetry-example-plugin that pass + // a plain async function without accepting the shared signal. + const fn = vi.fn().mockResolvedValue("no-signal-result"); + + const result = await cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + }); + + expect(result).toBe("no-signal-result"); + expect(fn).toHaveBeenCalledTimes(1); + // The cache passes the shared AbortSignal even if the fn ignores it + expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal)); + }); + }); + + describe("abort / ref-counting", () => { + test("one caller aborts while another still waits — waiting caller resolves normally", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let resolveFn!: (value: string) => void; + const fn = vi.fn().mockImplementation( + () => + new Promise((resolve) => { + resolveFn = resolve; + }), + ); + + const controller1 = new AbortController(); + const controller2 = new AbortController(); + + const p1 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller1.signal, + }); + const p2 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller2.signal, + }); + + // Abort caller 1 + controller1.abort(); + await expect(p1).rejects.toThrow(); + + // Resolve the underlying fn — caller 2 should still get the result + resolveFn("shared-result"); + await expect(p2).resolves.toBe("shared-result"); + + expect(fn).toHaveBeenCalledTimes(1); + }); + + test("all callers abort — shared controller signal is aborted", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let capturedSignal!: AbortSignal; + const fn = vi.fn().mockImplementation( + (signal: AbortSignal) => + new Promise((resolve, reject) => { + capturedSignal = signal; + signal.addEventListener("abort", () => + reject(new DOMException("aborted", "AbortError")), + ); + }), + ); + + const controller1 = new AbortController(); + const controller2 = new AbortController(); + + const p1 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller1.signal, + }); + const p2 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller2.signal, + }); + + // Wait for fn to be invoked inside the telemetry span + await vi.waitFor(() => expect(capturedSignal).toBeDefined()); + expect(capturedSignal.aborted).toBe(false); + + controller1.abort(); + await expect(p1).rejects.toThrow(); + + // Shared signal still active — one caller remains + expect(capturedSignal.aborted).toBe(false); + + controller2.abort(); + await expect(p2).rejects.toThrow(); + + // Now shared signal should be aborted + expect(capturedSignal.aborted).toBe(true); + }); + + test("pre-aborted callerSignal throws immediately without executing fn", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + const controller = new AbortController(); + controller.abort(); + + const fn = vi.fn().mockResolvedValue("should-not-run"); + + await expect( + cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller.signal, + }), + ).rejects.toThrow(); + + expect(fn).not.toHaveBeenCalled(); + }); + + test("single caller abort mid-flight rejects with abort error and aborts shared controller", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let capturedSignal!: AbortSignal; + const fn = vi.fn().mockImplementation( + (signal: AbortSignal) => + new Promise((resolve, reject) => { + capturedSignal = signal; + signal.addEventListener("abort", () => + reject(new DOMException("aborted", "AbortError")), + ); + }), + ); + + const controller = new AbortController(); + const p = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller.signal, + }); + + controller.abort(); + await expect(p).rejects.toThrow(); + + // With only one caller, shared controller should also abort + expect(capturedSignal.aborted).toBe(true); + }); + + test("deduped caller abort does not poison the first caller's result", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let resolveFn!: (value: string) => void; + const fn = vi.fn().mockImplementation( + () => + new Promise((resolve) => { + resolveFn = resolve; + }), + ); + + const controllerDeduped = new AbortController(); + + // First caller — no signal (or non-aborting) + const p1 = cache.getOrExecute(["key"], fn, "user1", { ttl: 60 }); + + // Second caller joins with an abort signal + const p2 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controllerDeduped.signal, + }); + + // Abort the deduped caller + controllerDeduped.abort(); + await expect(p2).rejects.toThrow(); + + // Resolve underlying fn — first caller should get the result + resolveFn("first-caller-result"); + await expect(p1).resolves.toBe("first-caller-result"); + + expect(fn).toHaveBeenCalledTimes(1); + }); }); describe("disabled cache", () => { diff --git a/packages/appkit/src/plugin/tests/cache.test.ts b/packages/appkit/src/plugin/tests/cache.test.ts index 7e01e7787..12730f67b 100644 --- a/packages/appkit/src/plugin/tests/cache.test.ts +++ b/packages/appkit/src/plugin/tests/cache.test.ts @@ -20,9 +20,9 @@ class MockCacheManager { async getOrExecute( key: (string | number | object)[], - fn: () => Promise, + fn: (sharedSignal?: AbortSignal) => Promise, userKey: string, - options?: { ttl?: number }, + options?: { ttl?: number; callerSignal?: AbortSignal }, ): Promise { const cacheKey = this.generateKey(key, userKey); const cached = await this.get(cacheKey); @@ -35,7 +35,8 @@ class MockCacheManager { return inFlight as Promise; } - const promise = fn() + const sharedController = new AbortController(); + const promise = fn(sharedController.signal) .then(async (result) => { await this.set(cacheKey, result, options); return result; @@ -263,4 +264,39 @@ describe("CacheInterceptor", () => { expect(fn).toHaveBeenCalledTimes(2); }); + + test("should swap context.signal to shared signal during fn() and restore afterward", async () => { + const config: CacheConfig = { + enabled: true, + cacheKey: ["test", "signal-swap"], + }; + const callerSignal = new AbortController().signal; + const contextWithSignal: InterceptorContext = { + metadata: new Map(), + userKey: "service", + signal: callerSignal, + }; + const interceptor = new CacheInterceptor( + cacheManager as unknown as ConstructorParameters< + typeof CacheInterceptor + >[0], + config, + ); + + let signalDuringExecution: AbortSignal | undefined; + const fn = vi.fn().mockImplementation(async () => { + signalDuringExecution = contextWithSignal.signal; + return "result"; + }); + + await interceptor.intercept(fn, contextWithSignal); + + // During fn(), context.signal should have been swapped to the shared signal + expect(signalDuringExecution).toBeDefined(); + expect(signalDuringExecution).not.toBe(callerSignal); + expect(signalDuringExecution).toBeInstanceOf(AbortSignal); + + // After intercept completes, context.signal should be restored + expect(contextWithSignal.signal).toBe(callerSignal); + }); }); diff --git a/packages/appkit/src/plugins/agents/tests/approval-config.test.ts b/packages/appkit/src/plugins/agents/tests/approval-config.test.ts index 0827da35e..855bf716b 100644 --- a/packages/appkit/src/plugins/agents/tests/approval-config.test.ts +++ b/packages/appkit/src/plugins/agents/tests/approval-config.test.ts @@ -27,8 +27,9 @@ beforeEach(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), })) as unknown as typeof CacheManager.getInstanceSync; diff --git a/packages/appkit/src/plugins/agents/tests/approval-route.test.ts b/packages/appkit/src/plugins/agents/tests/approval-route.test.ts index 6e090bd2f..61595fcbf 100644 --- a/packages/appkit/src/plugins/agents/tests/approval-route.test.ts +++ b/packages/appkit/src/plugins/agents/tests/approval-route.test.ts @@ -51,8 +51,9 @@ beforeEach(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), })) as any; diff --git a/packages/appkit/src/plugins/agents/tests/dispatch-tool-call.test.ts b/packages/appkit/src/plugins/agents/tests/dispatch-tool-call.test.ts index 047836648..2767766c3 100644 --- a/packages/appkit/src/plugins/agents/tests/dispatch-tool-call.test.ts +++ b/packages/appkit/src/plugins/agents/tests/dispatch-tool-call.test.ts @@ -27,8 +27,9 @@ beforeEach(() => { (CacheManager as any).instance = { get: vi.fn(), set: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), }; diff --git a/packages/appkit/src/plugins/agents/tests/dos-limits.test.ts b/packages/appkit/src/plugins/agents/tests/dos-limits.test.ts index aecf75fb9..8aba4ce1d 100644 --- a/packages/appkit/src/plugins/agents/tests/dos-limits.test.ts +++ b/packages/appkit/src/plugins/agents/tests/dos-limits.test.ts @@ -52,8 +52,9 @@ beforeEach(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), // biome-ignore lint/suspicious/noExplicitAny: test mock diff --git a/packages/appkit/src/plugins/agents/tests/route-handler-errors.test.ts b/packages/appkit/src/plugins/agents/tests/route-handler-errors.test.ts index 9ea5cb2bf..2fc493ef4 100644 --- a/packages/appkit/src/plugins/agents/tests/route-handler-errors.test.ts +++ b/packages/appkit/src/plugins/agents/tests/route-handler-errors.test.ts @@ -23,8 +23,9 @@ beforeEach(() => { (CacheManager as any).instance = { get: vi.fn(), set: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), }; diff --git a/packages/appkit/src/plugins/files/tests/download-endpoint.test.ts b/packages/appkit/src/plugins/files/tests/download-endpoint.test.ts index 5d7bc3bae..550c20be1 100644 --- a/packages/appkit/src/plugins/files/tests/download-endpoint.test.ts +++ b/packages/appkit/src/plugins/files/tests/download-endpoint.test.ts @@ -38,8 +38,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/error-handling.test.ts b/packages/appkit/src/plugins/files/tests/error-handling.test.ts index c0ce31c41..a16e40d0a 100644 --- a/packages/appkit/src/plugins/files/tests/error-handling.test.ts +++ b/packages/appkit/src/plugins/files/tests/error-handling.test.ts @@ -37,8 +37,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/mkdir.test.ts b/packages/appkit/src/plugins/files/tests/mkdir.test.ts index e6557acd2..8f667f5c0 100644 --- a/packages/appkit/src/plugins/files/tests/mkdir.test.ts +++ b/packages/appkit/src/plugins/files/tests/mkdir.test.ts @@ -37,8 +37,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/path-validation.test.ts b/packages/appkit/src/plugins/files/tests/path-validation.test.ts index e4b4ac64f..8ef0cdb62 100644 --- a/packages/appkit/src/plugins/files/tests/path-validation.test.ts +++ b/packages/appkit/src/plugins/files/tests/path-validation.test.ts @@ -38,8 +38,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/plugin.test.ts b/packages/appkit/src/plugins/files/tests/plugin.test.ts index da70b538b..3136a21c0 100644 --- a/packages/appkit/src/plugins/files/tests/plugin.test.ts +++ b/packages/appkit/src/plugins/files/tests/plugin.test.ts @@ -44,8 +44,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(), }; diff --git a/packages/appkit/src/plugins/files/tests/raw-endpoint.test.ts b/packages/appkit/src/plugins/files/tests/raw-endpoint.test.ts index c3de70aba..99641bce4 100644 --- a/packages/appkit/src/plugins/files/tests/raw-endpoint.test.ts +++ b/packages/appkit/src/plugins/files/tests/raw-endpoint.test.ts @@ -38,8 +38,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/shutdown.test.ts b/packages/appkit/src/plugins/files/tests/shutdown.test.ts index dcf761c48..99eaa1e86 100644 --- a/packages/appkit/src/plugins/files/tests/shutdown.test.ts +++ b/packages/appkit/src/plugins/files/tests/shutdown.test.ts @@ -30,8 +30,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/upload.test.ts b/packages/appkit/src/plugins/files/tests/upload.test.ts index baf3aa549..725756a99 100644 --- a/packages/appkit/src/plugins/files/tests/upload.test.ts +++ b/packages/appkit/src/plugins/files/tests/upload.test.ts @@ -38,8 +38,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/genie/tests/genie.test.ts b/packages/appkit/src/plugins/genie/tests/genie.test.ts index fcb2e167c..0af6c25b3 100644 --- a/packages/appkit/src/plugins/genie/tests/genie.test.ts +++ b/packages/appkit/src/plugins/genie/tests/genie.test.ts @@ -21,7 +21,10 @@ const { mockCacheInstance } = vi.hoisted(() => { getOrExecute: vi .fn() .mockImplementation( - async (_key: unknown[], fn: () => Promise) => { + async ( + _key: unknown[], + fn: (signal?: AbortSignal) => Promise, + ) => { return await fn(); }, ), diff --git a/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts b/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts index 24d37341f..5f96d6efe 100644 --- a/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts +++ b/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts @@ -15,8 +15,9 @@ vi.mock("../../../cache", () => ({ get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), })), diff --git a/packages/appkit/src/plugins/serving/tests/serving.test.ts b/packages/appkit/src/plugins/serving/tests/serving.test.ts index 8fbe79bba..216c2b727 100644 --- a/packages/appkit/src/plugins/serving/tests/serving.test.ts +++ b/packages/appkit/src/plugins/serving/tests/serving.test.ts @@ -20,7 +20,10 @@ const { mockCacheInstance } = vi.hoisted(() => { getOrExecute: vi .fn() .mockImplementation( - async (_key: unknown[], fn: () => Promise) => { + async ( + _key: unknown[], + fn: (signal?: AbortSignal) => Promise, + ) => { return await fn(); }, ), From 83d7499719150facd5eacad81e87afe3d60af693 Mon Sep 17 00:00:00 2001 From: Pawel Kosiec Date: Fri, 22 May 2026 17:03:06 +0200 Subject: [PATCH 3/5] fix(appkit): prevent race conditions in cache ref-counted abort handling - Skip joining in-flight entries whose sharedController is already aborted - Guard finally-block delete to only remove its own entry, not a replacement - Remove early span.end() in dedup path (let outer finally handle it) - Add edge-case tests: fn rejection, post-resolve abort, fresh-after-abort Signed-off-by: Pawel Kosiec --- packages/appkit/src/cache/index.ts | 7 +- .../src/cache/tests/cache-manager.test.ts | 119 +++++++++++++++++- 2 files changed, 122 insertions(+), 4 deletions(-) diff --git a/packages/appkit/src/cache/index.ts b/packages/appkit/src/cache/index.ts index 3ae84a6d3..c4ae8d107 100644 --- a/packages/appkit/src/cache/index.ts +++ b/packages/appkit/src/cache/index.ts @@ -263,7 +263,7 @@ export class CacheManager { const existing = this.inFlightRequests.get(cacheKey) as | InFlightEntry | undefined; - if (existing) { + if (existing && !existing.sharedController.signal.aborted) { existing.refCount++; span.setAttribute("cache.hit", true); span.setAttribute("cache.deduplication", true); @@ -282,7 +282,6 @@ export class CacheManager { cache_deduplication: true, }); - span.end(); return await this._waitWithRefCount(existing, callerSignal); } @@ -332,7 +331,9 @@ export class CacheManager { ); }) .finally(() => { - this.inFlightRequests.delete(cacheKey); + if (this.inFlightRequests.get(cacheKey) === entry) { + this.inFlightRequests.delete(cacheKey); + } }); // Suppress unhandled rejection warnings when every caller bailed diff --git a/packages/appkit/src/cache/tests/cache-manager.test.ts b/packages/appkit/src/cache/tests/cache-manager.test.ts index f5a65fa00..3222623d8 100644 --- a/packages/appkit/src/cache/tests/cache-manager.test.ts +++ b/packages/appkit/src/cache/tests/cache-manager.test.ts @@ -424,7 +424,11 @@ describe("CacheManager", () => { callerSignal: controller2.signal, }); - // Abort caller 1 + // Wait for fn to be invoked and both callers to join the entry + await vi.waitFor(() => expect(fn).toHaveBeenCalledTimes(1)); + await new Promise((r) => setTimeout(r, 0)); + + // Abort caller 1 — caller 2 still holds a ref controller1.abort(); await expect(p1).rejects.toThrow(); @@ -563,6 +567,119 @@ describe("CacheManager", () => { expect(fn).toHaveBeenCalledTimes(1); }); + + test("fn rejects while multiple callers wait — all receive the error", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let rejectFn!: (error: Error) => void; + const fn = vi.fn().mockImplementation( + () => + new Promise((_resolve, reject) => { + rejectFn = reject; + }), + ); + + const controller1 = new AbortController(); + const controller2 = new AbortController(); + + const p1 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller1.signal, + }); + const p2 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller2.signal, + }); + + // Wait for fn to be invoked inside the telemetry span + await vi.waitFor(() => expect(rejectFn).toBeDefined()); + + const networkError = new Error("network failure"); + rejectFn(networkError); + + await expect(p1).rejects.toThrow("network failure"); + await expect(p2).rejects.toThrow("network failure"); + expect(fn).toHaveBeenCalledTimes(1); + }); + + test("caller aborts after promise already resolved — gets the resolved value", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let resolveFn!: (value: string) => void; + const fn = vi.fn().mockImplementation( + () => + new Promise((resolve) => { + resolveFn = resolve; + }), + ); + + const controller = new AbortController(); + + const p = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller.signal, + }); + + // Wait for fn to be invoked inside the telemetry span + await vi.waitFor(() => expect(resolveFn).toBeDefined()); + + // Resolve first, then abort + resolveFn("already-resolved"); + // Allow microtask to settle the promise + await new Promise((r) => setTimeout(r, 0)); + controller.abort(); + + await expect(p).resolves.toBe("already-resolved"); + }); + + test("new caller after previous entry fully aborted gets fresh execution", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let callCount = 0; + const capturedSignals: AbortSignal[] = []; + const fn = vi.fn().mockImplementation( + (signal: AbortSignal) => + new Promise((resolve, reject) => { + callCount++; + capturedSignals.push(signal); + const currentCall = callCount; + signal.addEventListener("abort", () => + reject(new DOMException("aborted", "AbortError")), + ); + // Auto-resolve after a tick if not aborted (for the second call) + setTimeout(() => { + if (!signal.aborted) resolve(`result-${currentCall}`); + }, 10); + }), + ); + + const controller1 = new AbortController(); + const p1 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller1.signal, + }); + + // Abort the only caller — shared controller should also abort + controller1.abort(); + await expect(p1).rejects.toThrow(); + + // Allow the entry.promise .finally() to clean up + await new Promise((r) => setTimeout(r, 0)); + + // New caller arrives — should get a fresh execution, not the aborted one + const p2 = cache.getOrExecute(["key"], fn, "user1", { ttl: 60 }); + await expect(p2).resolves.toBe("result-2"); + + expect(fn).toHaveBeenCalledTimes(2); + expect(capturedSignals[0].aborted).toBe(true); + expect(capturedSignals[1].aborted).toBe(false); + }); }); describe("disabled cache", () => { From 99ae46c711e7d4b83327598cd467862ac67148c2 Mon Sep 17 00:00:00 2001 From: Pawel Kosiec Date: Fri, 22 May 2026 19:06:12 +0200 Subject: [PATCH 4/5] fix(appkit): add grace period before aborting shared cache execution When all callers leave an in-flight cache entry, delay the sharedController.abort() by 100ms instead of firing immediately. This allows React StrictMode remounts to join the existing execution before it is cancelled, preventing unnecessary SQL query aborts and error logs in development. - Add abortTimer to InFlightEntry with 100ms grace period - Clear abort timer when a new caller joins the dedup path - Skip joining aborted entries (guard against expired grace period) - Guard finally-block delete to only remove own entry - Remove early span.end() in dedup path - Add grace period tests + edge-case abort tests Signed-off-by: Pawel Kosiec --- packages/appkit/src/cache/index.ts | 18 ++- .../src/cache/tests/cache-manager.test.ts | 112 ++++++++++++++++-- 2 files changed, 118 insertions(+), 12 deletions(-) diff --git a/packages/appkit/src/cache/index.ts b/packages/appkit/src/cache/index.ts index c4ae8d107..b4248202e 100644 --- a/packages/appkit/src/cache/index.ts +++ b/packages/appkit/src/cache/index.ts @@ -27,6 +27,7 @@ interface InFlightEntry { promise: Promise; refCount: number; sharedController: AbortController; + abortTimer?: ReturnType; } function createAbortError(signal: AbortSignal): unknown { @@ -265,6 +266,11 @@ export class CacheManager { | undefined; if (existing && !existing.sharedController.signal.aborted) { existing.refCount++; + // Cancel any pending abort timer — a new caller has joined + if (existing.abortTimer) { + clearTimeout(existing.abortTimer); + existing.abortTimer = undefined; + } span.setAttribute("cache.hit", true); span.setAttribute("cache.deduplication", true); span.addEvent("cache.deduplication_used", { @@ -377,9 +383,15 @@ export class CacheManager { const release = () => { if (entry.refCount > 0) entry.refCount--; if (entry.refCount <= 0 && !entry.sharedController.signal.aborted) { - entry.sharedController.abort( - callerSignal.reason ?? "all cache callers aborted", - ); + // Grace period: delay abort so a StrictMode remount can join + // the in-flight entry before the shared execution is cancelled. + entry.abortTimer = setTimeout(() => { + if (entry.refCount <= 0 && !entry.sharedController.signal.aborted) { + entry.sharedController.abort( + callerSignal.reason ?? "all cache callers aborted", + ); + } + }, 100); } }; diff --git a/packages/appkit/src/cache/tests/cache-manager.test.ts b/packages/appkit/src/cache/tests/cache-manager.test.ts index 3222623d8..3f8fa5dbd 100644 --- a/packages/appkit/src/cache/tests/cache-manager.test.ts +++ b/packages/appkit/src/cache/tests/cache-manager.test.ts @@ -480,7 +480,11 @@ describe("CacheManager", () => { controller2.abort(); await expect(p2).rejects.toThrow(); - // Now shared signal should be aborted + // Shared signal not yet aborted (grace period) + expect(capturedSignal.aborted).toBe(false); + + // Wait for grace period to expire + await new Promise((r) => setTimeout(r, 150)); expect(capturedSignal.aborted).toBe(true); }); @@ -529,7 +533,11 @@ describe("CacheManager", () => { controller.abort(); await expect(p).rejects.toThrow(); - // With only one caller, shared controller should also abort + // Grace period: shared controller not yet aborted + expect(capturedSignal.aborted).toBe(false); + + // Wait for grace period to expire + await new Promise((r) => setTimeout(r, 150)); expect(capturedSignal.aborted).toBe(true); }); @@ -652,10 +660,13 @@ describe("CacheManager", () => { signal.addEventListener("abort", () => reject(new DOMException("aborted", "AbortError")), ); - // Auto-resolve after a tick if not aborted (for the second call) - setTimeout(() => { - if (!signal.aborted) resolve(`result-${currentCall}`); - }, 10); + // Only auto-resolve for the second call (first is held open + // until the abort timer fires and cancels it) + if (currentCall > 1) { + setTimeout(() => { + if (!signal.aborted) resolve(`result-${currentCall}`); + }, 10); + } }), ); @@ -665,12 +676,12 @@ describe("CacheManager", () => { callerSignal: controller1.signal, }); - // Abort the only caller — shared controller should also abort + // Abort the only caller — shared controller should abort after grace period controller1.abort(); await expect(p1).rejects.toThrow(); - // Allow the entry.promise .finally() to clean up - await new Promise((r) => setTimeout(r, 0)); + // Wait for grace period + .finally() cleanup + await new Promise((r) => setTimeout(r, 200)); // New caller arrives — should get a fresh execution, not the aborted one const p2 = cache.getOrExecute(["key"], fn, "user1", { ttl: 60 }); @@ -680,6 +691,89 @@ describe("CacheManager", () => { expect(capturedSignals[0].aborted).toBe(true); expect(capturedSignals[1].aborted).toBe(false); }); + + test("grace period: new caller joins before timer fires — no abort, single execution", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let resolveFn!: (value: string) => void; + let capturedSignal!: AbortSignal; + const fn = vi.fn().mockImplementation( + (signal: AbortSignal) => + new Promise((resolve) => { + capturedSignal = signal; + resolveFn = resolve; + }), + ); + + const controller1 = new AbortController(); + + // Mount #1: starts execution + const p1 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller1.signal, + }); + + // Wait for fn to be invoked + await vi.waitFor(() => expect(fn).toHaveBeenCalledTimes(1)); + + // Mount #1 unmounts — abort fires, grace period starts + controller1.abort(); + await expect(p1).rejects.toThrow(); + + // Shared signal should NOT be aborted yet (grace period active) + expect(capturedSignal.aborted).toBe(false); + + // Mount #2 arrives within the grace period — joins the same entry + const p2 = cache.getOrExecute(["key"], fn, "user1", { ttl: 60 }); + + // Resolve the underlying fn — mount #2 gets the result + resolveFn("shared-result"); + await expect(p2).resolves.toBe("shared-result"); + + // fn was only called once — no duplicate execution + expect(fn).toHaveBeenCalledTimes(1); + // Shared signal was never aborted + expect(capturedSignal.aborted).toBe(false); + }); + + test("grace period: no new caller arrives — timer fires and aborts shared controller", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let capturedSignal!: AbortSignal; + const fn = vi.fn().mockImplementation( + (signal: AbortSignal) => + new Promise((_resolve, reject) => { + capturedSignal = signal; + signal.addEventListener("abort", () => + reject(new DOMException("aborted", "AbortError")), + ); + }), + ); + + const controller = new AbortController(); + const p = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller.signal, + }); + + await vi.waitFor(() => expect(capturedSignal).toBeDefined()); + + controller.abort(); + await expect(p).rejects.toThrow(); + + // Shared signal not yet aborted (grace period) + expect(capturedSignal.aborted).toBe(false); + + // Wait for grace period to expire (100ms + buffer) + await new Promise((r) => setTimeout(r, 150)); + + // Now shared signal should be aborted + expect(capturedSignal.aborted).toBe(true); + }); }); describe("disabled cache", () => { From 16884fe1cf850fa486a28140a61764994be18a6f Mon Sep 17 00:00:00 2001 From: Pawel Kosiec Date: Fri, 22 May 2026 22:30:19 +0200 Subject: [PATCH 5/5] fix(appkit): use proper AbortError for stream disconnect abort reasons StreamManager.abort() was passing string reasons ("All clients disconnected", "Server shutdown") which caused _categorizeError to misclassify client disconnects as INTERNAL_ERROR instead of STREAM_ABORTED. Use DOMException with name "AbortError" so the error is correctly categorized and logged at info level. Signed-off-by: Pawel Kosiec --- packages/appkit/src/stream/stream-manager.ts | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/packages/appkit/src/stream/stream-manager.ts b/packages/appkit/src/stream/stream-manager.ts index 901e0b46c..a5b3a4b32 100644 --- a/packages/appkit/src/stream/stream-manager.ts +++ b/packages/appkit/src/stream/stream-manager.ts @@ -76,7 +76,9 @@ export class StreamManager { abortAll(): void { this.activeOperations.forEach((operation) => { if (operation.heartbeat) clearInterval(operation.heartbeat); - operation.controller.abort("Server shutdown"); + operation.controller.abort( + new DOMException("Server shutdown", "AbortError"), + ); }); this.activeOperations.clear(); this.streamRegistry.clear(); @@ -140,7 +142,9 @@ export class StreamManager { // Stop the generator when no clients remain if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { - streamEntry.abortController.abort("All clients disconnected"); + streamEntry.abortController.abort( + new DOMException("All clients disconnected", "AbortError"), + ); } // cleanup if stream is completed and no clients are connected @@ -227,7 +231,9 @@ export class StreamManager { // Stop the generator when no clients remain so polling loops // (e.g. jobs runAndWait) don't keep running in the background. if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { - abortController.abort("Client disconnected"); + abortController.abort( + new DOMException("Client disconnected", "AbortError"), + ); } });