diff --git a/packages/appkit/src/cache/index.ts b/packages/appkit/src/cache/index.ts index 873aada18..b4248202e 100644 --- a/packages/appkit/src/cache/index.ts +++ b/packages/appkit/src/cache/index.ts @@ -12,6 +12,29 @@ import { InMemoryStorage, PersistentStorage } from "./storage"; const logger = createLogger("cache"); +/** + * Reference-counted in-flight cache execution entry. + * + * `sharedController` decouples the cached `fn()` from any single caller's + * abort signal. Callers join an in-flight entry by incrementing `refCount`; + * when a caller aborts, refCount is decremented. The shared controller is + * aborted only when refCount drops to 0 — i.e. all callers have abandoned + * the request. This prevents one caller's cancellation (e.g. React + * StrictMode unmount) from poisoning the in-flight result for other still- + * connected awaiters. + */ +interface InFlightEntry { + promise: Promise; + refCount: number; + sharedController: AbortController; + abortTimer?: ReturnType; +} + +function createAbortError(signal: AbortSignal): unknown { + if (signal.reason !== undefined) return signal.reason; + return new DOMException("The operation was aborted.", "AbortError"); +} + /** * Cache manager class to handle cache operations. * Can be used with in-memory storage or persistent storage (Lakebase). @@ -34,7 +57,7 @@ export class CacheManager { private storage: CacheStorage; private config: CacheConfig; - private inFlightRequests: Map>; + private inFlightRequests: Map>; private cleanupInProgress: boolean; private lastCleanupAttempt: number; @@ -174,20 +197,37 @@ export class CacheManager { } /** - * Get or execute a function and cache the result - * @param key - Cache key - * @param fn - Function to execute - * @param userKey - User key + * Get or execute a function and cache the result. + * + * Multiple concurrent callers with the same `cacheKey` are deduplicated + * onto a single in-flight execution. Each caller may pass its own + * `callerSignal`; the underlying `fn()` is run with a shared, internally + * managed `AbortSignal` that aborts only when *all* callers have + * abandoned the request (reference counted). This decouples a single + * caller's cancellation (e.g. React StrictMode unmount) from the shared + * result, so other still-connected callers receive the cached value + * normally. + * + * @param key - Cache key parts + * @param fn - Function to execute. Receives the cache-owned shared signal; + * pass it through to the underlying I/O so the work is cancelled when + * no caller is left waiting. + * @param userKey - User key for cache namespacing * @param options - Options for the cache * @returns Promise of the result */ async getOrExecute( key: (string | number | object)[], - fn: () => Promise, + fn: (sharedSignal?: AbortSignal) => Promise, userKey: string, - options?: { ttl?: number }, + options?: { ttl?: number; callerSignal?: AbortSignal }, ): Promise { - if (!this.config.enabled) return fn(); + if (!this.config.enabled) return fn(options?.callerSignal); + + const callerSignal = options?.callerSignal; + if (callerSignal?.aborted) { + throw createAbortError(callerSignal); + } const cacheKey = this.generateKey(key, userKey); @@ -218,9 +258,19 @@ export class CacheManager { return cached.value; } - // check if the value is being processed by another request - const inFlight = this.inFlightRequests.get(cacheKey); - if (inFlight) { + // check if the value is being processed by another request — join + // the existing in-flight entry under reference counting so this + // caller's abort doesn't poison the shared result. + const existing = this.inFlightRequests.get(cacheKey) as + | InFlightEntry + | undefined; + if (existing && !existing.sharedController.signal.aborted) { + existing.refCount++; + // Cancel any pending abort timer — a new caller has joined + if (existing.abortTimer) { + clearTimeout(existing.abortTimer); + existing.abortTimer = undefined; + } span.setAttribute("cache.hit", true); span.setAttribute("cache.deduplication", true); span.addEvent("cache.deduplication_used", { @@ -238,11 +288,10 @@ export class CacheManager { cache_deduplication: true, }); - span.end(); - return inFlight as Promise; + return await this._waitWithRefCount(existing, callerSignal); } - // cache miss - execute function + // cache miss - execute function under a shared abort controller span.setAttribute("cache.hit", false); span.addEvent("cache.miss", { "cache.key": cacheKey }); this.telemetryMetrics.cacheMissCount.add(1, { @@ -254,7 +303,14 @@ export class CacheManager { cache_key: cacheKey, }); - const promise = fn() + const sharedController = new AbortController(); + const entry: InFlightEntry = { + promise: undefined as unknown as Promise, + refCount: 1, + sharedController, + }; + + entry.promise = fn(sharedController.signal) .then(async (result) => { await this.set(cacheKey, result, options); span.addEvent("cache.value_stored", { @@ -266,8 +322,13 @@ export class CacheManager { .catch((error) => { span.recordException(error); span.setStatus({ code: SpanStatusCode.ERROR }); - // Preserve AppKit errors and Databricks API errors (with status codes) - // so route handlers can map them to proper HTTP responses. + // If the shared controller aborted, all callers have already + // abandoned the request (or are about to via their own signals) + // — propagate the original error without wrapping. No live + // awaiter will observe this rejection. + if (sharedController.signal.aborted) { + throw error; + } if (error instanceof AppKitError || error instanceof ApiError) { throw error; } @@ -276,12 +337,19 @@ export class CacheManager { ); }) .finally(() => { - this.inFlightRequests.delete(cacheKey); + if (this.inFlightRequests.get(cacheKey) === entry) { + this.inFlightRequests.delete(cacheKey); + } }); - this.inFlightRequests.set(cacheKey, promise); + // Suppress unhandled rejection warnings when every caller bailed + // before fn() resolved (their own promises rejected via + // _waitWithRefCount; the underlying entry.promise has no awaiter). + entry.promise.catch(() => {}); - const result = await promise; + this.inFlightRequests.set(cacheKey, entry as InFlightEntry); + + const result = await this._waitWithRefCount(entry, callerSignal); span.setStatus({ code: SpanStatusCode.OK }); return result; } catch (error) { @@ -296,6 +364,69 @@ export class CacheManager { ); } + /** + * Wait on an in-flight entry, racing the underlying promise against the + * caller's abort signal. When the caller aborts, the entry's refCount is + * decremented; if it hits zero the shared controller is aborted so the + * underlying `fn()` can stop. Other callers continue to await the same + * entry and receive the result when it arrives. + */ + private _waitWithRefCount( + entry: InFlightEntry, + callerSignal?: AbortSignal, + ): Promise { + if (!callerSignal) return entry.promise; + + return new Promise((resolve, reject) => { + let settled = false; + + const release = () => { + if (entry.refCount > 0) entry.refCount--; + if (entry.refCount <= 0 && !entry.sharedController.signal.aborted) { + // Grace period: delay abort so a StrictMode remount can join + // the in-flight entry before the shared execution is cancelled. + entry.abortTimer = setTimeout(() => { + if (entry.refCount <= 0 && !entry.sharedController.signal.aborted) { + entry.sharedController.abort( + callerSignal.reason ?? "all cache callers aborted", + ); + } + }, 100); + } + }; + + const onAbort = () => { + if (settled) return; + settled = true; + callerSignal.removeEventListener("abort", onAbort); + release(); + reject(createAbortError(callerSignal)); + }; + + if (callerSignal.aborted) { + onAbort(); + return; + } + + callerSignal.addEventListener("abort", onAbort, { once: true }); + + entry.promise.then( + (value) => { + if (settled) return; + settled = true; + callerSignal.removeEventListener("abort", onAbort); + resolve(value); + }, + (error) => { + if (settled) return; + settled = true; + callerSignal.removeEventListener("abort", onAbort); + reject(error); + }, + ); + }); + } + /** * Get a cached value * @param key - Cache key diff --git a/packages/appkit/src/cache/tests/cache-manager.test.ts b/packages/appkit/src/cache/tests/cache-manager.test.ts index 3916ef1eb..3f8fa5dbd 100644 --- a/packages/appkit/src/cache/tests/cache-manager.test.ts +++ b/packages/appkit/src/cache/tests/cache-manager.test.ts @@ -378,6 +378,402 @@ describe("CacheManager", () => { expect(r2).toBe("result-2"); expect(fn).toHaveBeenCalledTimes(2); }); + + test("should work when fn ignores signal parameter (non-signal caller regression)", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + // Simulates direct callers like telemetry-example-plugin that pass + // a plain async function without accepting the shared signal. + const fn = vi.fn().mockResolvedValue("no-signal-result"); + + const result = await cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + }); + + expect(result).toBe("no-signal-result"); + expect(fn).toHaveBeenCalledTimes(1); + // The cache passes the shared AbortSignal even if the fn ignores it + expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal)); + }); + }); + + describe("abort / ref-counting", () => { + test("one caller aborts while another still waits — waiting caller resolves normally", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let resolveFn!: (value: string) => void; + const fn = vi.fn().mockImplementation( + () => + new Promise((resolve) => { + resolveFn = resolve; + }), + ); + + const controller1 = new AbortController(); + const controller2 = new AbortController(); + + const p1 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller1.signal, + }); + const p2 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller2.signal, + }); + + // Wait for fn to be invoked and both callers to join the entry + await vi.waitFor(() => expect(fn).toHaveBeenCalledTimes(1)); + await new Promise((r) => setTimeout(r, 0)); + + // Abort caller 1 — caller 2 still holds a ref + controller1.abort(); + await expect(p1).rejects.toThrow(); + + // Resolve the underlying fn — caller 2 should still get the result + resolveFn("shared-result"); + await expect(p2).resolves.toBe("shared-result"); + + expect(fn).toHaveBeenCalledTimes(1); + }); + + test("all callers abort — shared controller signal is aborted", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let capturedSignal!: AbortSignal; + const fn = vi.fn().mockImplementation( + (signal: AbortSignal) => + new Promise((resolve, reject) => { + capturedSignal = signal; + signal.addEventListener("abort", () => + reject(new DOMException("aborted", "AbortError")), + ); + }), + ); + + const controller1 = new AbortController(); + const controller2 = new AbortController(); + + const p1 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller1.signal, + }); + const p2 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller2.signal, + }); + + // Wait for fn to be invoked inside the telemetry span + await vi.waitFor(() => expect(capturedSignal).toBeDefined()); + expect(capturedSignal.aborted).toBe(false); + + controller1.abort(); + await expect(p1).rejects.toThrow(); + + // Shared signal still active — one caller remains + expect(capturedSignal.aborted).toBe(false); + + controller2.abort(); + await expect(p2).rejects.toThrow(); + + // Shared signal not yet aborted (grace period) + expect(capturedSignal.aborted).toBe(false); + + // Wait for grace period to expire + await new Promise((r) => setTimeout(r, 150)); + expect(capturedSignal.aborted).toBe(true); + }); + + test("pre-aborted callerSignal throws immediately without executing fn", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + const controller = new AbortController(); + controller.abort(); + + const fn = vi.fn().mockResolvedValue("should-not-run"); + + await expect( + cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller.signal, + }), + ).rejects.toThrow(); + + expect(fn).not.toHaveBeenCalled(); + }); + + test("single caller abort mid-flight rejects with abort error and aborts shared controller", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let capturedSignal!: AbortSignal; + const fn = vi.fn().mockImplementation( + (signal: AbortSignal) => + new Promise((resolve, reject) => { + capturedSignal = signal; + signal.addEventListener("abort", () => + reject(new DOMException("aborted", "AbortError")), + ); + }), + ); + + const controller = new AbortController(); + const p = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller.signal, + }); + + controller.abort(); + await expect(p).rejects.toThrow(); + + // Grace period: shared controller not yet aborted + expect(capturedSignal.aborted).toBe(false); + + // Wait for grace period to expire + await new Promise((r) => setTimeout(r, 150)); + expect(capturedSignal.aborted).toBe(true); + }); + + test("deduped caller abort does not poison the first caller's result", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let resolveFn!: (value: string) => void; + const fn = vi.fn().mockImplementation( + () => + new Promise((resolve) => { + resolveFn = resolve; + }), + ); + + const controllerDeduped = new AbortController(); + + // First caller — no signal (or non-aborting) + const p1 = cache.getOrExecute(["key"], fn, "user1", { ttl: 60 }); + + // Second caller joins with an abort signal + const p2 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controllerDeduped.signal, + }); + + // Abort the deduped caller + controllerDeduped.abort(); + await expect(p2).rejects.toThrow(); + + // Resolve underlying fn — first caller should get the result + resolveFn("first-caller-result"); + await expect(p1).resolves.toBe("first-caller-result"); + + expect(fn).toHaveBeenCalledTimes(1); + }); + + test("fn rejects while multiple callers wait — all receive the error", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let rejectFn!: (error: Error) => void; + const fn = vi.fn().mockImplementation( + () => + new Promise((_resolve, reject) => { + rejectFn = reject; + }), + ); + + const controller1 = new AbortController(); + const controller2 = new AbortController(); + + const p1 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller1.signal, + }); + const p2 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller2.signal, + }); + + // Wait for fn to be invoked inside the telemetry span + await vi.waitFor(() => expect(rejectFn).toBeDefined()); + + const networkError = new Error("network failure"); + rejectFn(networkError); + + await expect(p1).rejects.toThrow("network failure"); + await expect(p2).rejects.toThrow("network failure"); + expect(fn).toHaveBeenCalledTimes(1); + }); + + test("caller aborts after promise already resolved — gets the resolved value", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let resolveFn!: (value: string) => void; + const fn = vi.fn().mockImplementation( + () => + new Promise((resolve) => { + resolveFn = resolve; + }), + ); + + const controller = new AbortController(); + + const p = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller.signal, + }); + + // Wait for fn to be invoked inside the telemetry span + await vi.waitFor(() => expect(resolveFn).toBeDefined()); + + // Resolve first, then abort + resolveFn("already-resolved"); + // Allow microtask to settle the promise + await new Promise((r) => setTimeout(r, 0)); + controller.abort(); + + await expect(p).resolves.toBe("already-resolved"); + }); + + test("new caller after previous entry fully aborted gets fresh execution", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let callCount = 0; + const capturedSignals: AbortSignal[] = []; + const fn = vi.fn().mockImplementation( + (signal: AbortSignal) => + new Promise((resolve, reject) => { + callCount++; + capturedSignals.push(signal); + const currentCall = callCount; + signal.addEventListener("abort", () => + reject(new DOMException("aborted", "AbortError")), + ); + // Only auto-resolve for the second call (first is held open + // until the abort timer fires and cancels it) + if (currentCall > 1) { + setTimeout(() => { + if (!signal.aborted) resolve(`result-${currentCall}`); + }, 10); + } + }), + ); + + const controller1 = new AbortController(); + const p1 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller1.signal, + }); + + // Abort the only caller — shared controller should abort after grace period + controller1.abort(); + await expect(p1).rejects.toThrow(); + + // Wait for grace period + .finally() cleanup + await new Promise((r) => setTimeout(r, 200)); + + // New caller arrives — should get a fresh execution, not the aborted one + const p2 = cache.getOrExecute(["key"], fn, "user1", { ttl: 60 }); + await expect(p2).resolves.toBe("result-2"); + + expect(fn).toHaveBeenCalledTimes(2); + expect(capturedSignals[0].aborted).toBe(true); + expect(capturedSignals[1].aborted).toBe(false); + }); + + test("grace period: new caller joins before timer fires — no abort, single execution", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let resolveFn!: (value: string) => void; + let capturedSignal!: AbortSignal; + const fn = vi.fn().mockImplementation( + (signal: AbortSignal) => + new Promise((resolve) => { + capturedSignal = signal; + resolveFn = resolve; + }), + ); + + const controller1 = new AbortController(); + + // Mount #1: starts execution + const p1 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller1.signal, + }); + + // Wait for fn to be invoked + await vi.waitFor(() => expect(fn).toHaveBeenCalledTimes(1)); + + // Mount #1 unmounts — abort fires, grace period starts + controller1.abort(); + await expect(p1).rejects.toThrow(); + + // Shared signal should NOT be aborted yet (grace period active) + expect(capturedSignal.aborted).toBe(false); + + // Mount #2 arrives within the grace period — joins the same entry + const p2 = cache.getOrExecute(["key"], fn, "user1", { ttl: 60 }); + + // Resolve the underlying fn — mount #2 gets the result + resolveFn("shared-result"); + await expect(p2).resolves.toBe("shared-result"); + + // fn was only called once — no duplicate execution + expect(fn).toHaveBeenCalledTimes(1); + // Shared signal was never aborted + expect(capturedSignal.aborted).toBe(false); + }); + + test("grace period: no new caller arrives — timer fires and aborts shared controller", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let capturedSignal!: AbortSignal; + const fn = vi.fn().mockImplementation( + (signal: AbortSignal) => + new Promise((_resolve, reject) => { + capturedSignal = signal; + signal.addEventListener("abort", () => + reject(new DOMException("aborted", "AbortError")), + ); + }), + ); + + const controller = new AbortController(); + const p = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller.signal, + }); + + await vi.waitFor(() => expect(capturedSignal).toBeDefined()); + + controller.abort(); + await expect(p).rejects.toThrow(); + + // Shared signal not yet aborted (grace period) + expect(capturedSignal.aborted).toBe(false); + + // Wait for grace period to expire (100ms + buffer) + await new Promise((r) => setTimeout(r, 150)); + + // Now shared signal should be aborted + expect(capturedSignal.aborted).toBe(true); + }); }); describe("disabled cache", () => { diff --git a/packages/appkit/src/plugin/interceptors/cache.ts b/packages/appkit/src/plugin/interceptors/cache.ts index b8af0ca32..d1637afed 100644 --- a/packages/appkit/src/plugin/interceptors/cache.ts +++ b/packages/appkit/src/plugin/interceptors/cache.ts @@ -18,11 +18,27 @@ export class CacheInterceptor implements ExecutionInterceptor { return fn(); } + const callerSignal = context.signal; + + // The cache may dedupe this request onto a shared in-flight execution. + // Swap context.signal to the cache-owned shared signal for the duration + // of fn() so the inner interceptor chain (timeout/retry/telemetry) and + // the underlying I/O observe abort only when *all* callers have left, + // not just this one. Without this swap, mount #1's abort under React + // StrictMode poisons mount #2's joined inflight result. return this.cacheManager.getOrExecute( this.config.cacheKey, - fn, + async (sharedSignal) => { + const previousSignal = context.signal; + context.signal = sharedSignal; + try { + return await fn(); + } finally { + context.signal = previousSignal; + } + }, context.userKey, - { ttl: this.config.ttl }, + { ttl: this.config.ttl, callerSignal }, ); } } diff --git a/packages/appkit/src/plugin/tests/cache.test.ts b/packages/appkit/src/plugin/tests/cache.test.ts index 7e01e7787..12730f67b 100644 --- a/packages/appkit/src/plugin/tests/cache.test.ts +++ b/packages/appkit/src/plugin/tests/cache.test.ts @@ -20,9 +20,9 @@ class MockCacheManager { async getOrExecute( key: (string | number | object)[], - fn: () => Promise, + fn: (sharedSignal?: AbortSignal) => Promise, userKey: string, - options?: { ttl?: number }, + options?: { ttl?: number; callerSignal?: AbortSignal }, ): Promise { const cacheKey = this.generateKey(key, userKey); const cached = await this.get(cacheKey); @@ -35,7 +35,8 @@ class MockCacheManager { return inFlight as Promise; } - const promise = fn() + const sharedController = new AbortController(); + const promise = fn(sharedController.signal) .then(async (result) => { await this.set(cacheKey, result, options); return result; @@ -263,4 +264,39 @@ describe("CacheInterceptor", () => { expect(fn).toHaveBeenCalledTimes(2); }); + + test("should swap context.signal to shared signal during fn() and restore afterward", async () => { + const config: CacheConfig = { + enabled: true, + cacheKey: ["test", "signal-swap"], + }; + const callerSignal = new AbortController().signal; + const contextWithSignal: InterceptorContext = { + metadata: new Map(), + userKey: "service", + signal: callerSignal, + }; + const interceptor = new CacheInterceptor( + cacheManager as unknown as ConstructorParameters< + typeof CacheInterceptor + >[0], + config, + ); + + let signalDuringExecution: AbortSignal | undefined; + const fn = vi.fn().mockImplementation(async () => { + signalDuringExecution = contextWithSignal.signal; + return "result"; + }); + + await interceptor.intercept(fn, contextWithSignal); + + // During fn(), context.signal should have been swapped to the shared signal + expect(signalDuringExecution).toBeDefined(); + expect(signalDuringExecution).not.toBe(callerSignal); + expect(signalDuringExecution).toBeInstanceOf(AbortSignal); + + // After intercept completes, context.signal should be restored + expect(contextWithSignal.signal).toBe(callerSignal); + }); }); diff --git a/packages/appkit/src/plugins/agents/tests/approval-config.test.ts b/packages/appkit/src/plugins/agents/tests/approval-config.test.ts index 0827da35e..855bf716b 100644 --- a/packages/appkit/src/plugins/agents/tests/approval-config.test.ts +++ b/packages/appkit/src/plugins/agents/tests/approval-config.test.ts @@ -27,8 +27,9 @@ beforeEach(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), })) as unknown as typeof CacheManager.getInstanceSync; diff --git a/packages/appkit/src/plugins/agents/tests/approval-route.test.ts b/packages/appkit/src/plugins/agents/tests/approval-route.test.ts index 6e090bd2f..61595fcbf 100644 --- a/packages/appkit/src/plugins/agents/tests/approval-route.test.ts +++ b/packages/appkit/src/plugins/agents/tests/approval-route.test.ts @@ -51,8 +51,9 @@ beforeEach(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), })) as any; diff --git a/packages/appkit/src/plugins/agents/tests/dispatch-tool-call.test.ts b/packages/appkit/src/plugins/agents/tests/dispatch-tool-call.test.ts index 047836648..2767766c3 100644 --- a/packages/appkit/src/plugins/agents/tests/dispatch-tool-call.test.ts +++ b/packages/appkit/src/plugins/agents/tests/dispatch-tool-call.test.ts @@ -27,8 +27,9 @@ beforeEach(() => { (CacheManager as any).instance = { get: vi.fn(), set: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), }; diff --git a/packages/appkit/src/plugins/agents/tests/dos-limits.test.ts b/packages/appkit/src/plugins/agents/tests/dos-limits.test.ts index aecf75fb9..8aba4ce1d 100644 --- a/packages/appkit/src/plugins/agents/tests/dos-limits.test.ts +++ b/packages/appkit/src/plugins/agents/tests/dos-limits.test.ts @@ -52,8 +52,9 @@ beforeEach(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), // biome-ignore lint/suspicious/noExplicitAny: test mock diff --git a/packages/appkit/src/plugins/agents/tests/route-handler-errors.test.ts b/packages/appkit/src/plugins/agents/tests/route-handler-errors.test.ts index 9ea5cb2bf..2fc493ef4 100644 --- a/packages/appkit/src/plugins/agents/tests/route-handler-errors.test.ts +++ b/packages/appkit/src/plugins/agents/tests/route-handler-errors.test.ts @@ -23,8 +23,9 @@ beforeEach(() => { (CacheManager as any).instance = { get: vi.fn(), set: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), }; diff --git a/packages/appkit/src/plugins/files/tests/download-endpoint.test.ts b/packages/appkit/src/plugins/files/tests/download-endpoint.test.ts index 5d7bc3bae..550c20be1 100644 --- a/packages/appkit/src/plugins/files/tests/download-endpoint.test.ts +++ b/packages/appkit/src/plugins/files/tests/download-endpoint.test.ts @@ -38,8 +38,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/error-handling.test.ts b/packages/appkit/src/plugins/files/tests/error-handling.test.ts index c0ce31c41..a16e40d0a 100644 --- a/packages/appkit/src/plugins/files/tests/error-handling.test.ts +++ b/packages/appkit/src/plugins/files/tests/error-handling.test.ts @@ -37,8 +37,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/mkdir.test.ts b/packages/appkit/src/plugins/files/tests/mkdir.test.ts index e6557acd2..8f667f5c0 100644 --- a/packages/appkit/src/plugins/files/tests/mkdir.test.ts +++ b/packages/appkit/src/plugins/files/tests/mkdir.test.ts @@ -37,8 +37,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/path-validation.test.ts b/packages/appkit/src/plugins/files/tests/path-validation.test.ts index e4b4ac64f..8ef0cdb62 100644 --- a/packages/appkit/src/plugins/files/tests/path-validation.test.ts +++ b/packages/appkit/src/plugins/files/tests/path-validation.test.ts @@ -38,8 +38,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/plugin.test.ts b/packages/appkit/src/plugins/files/tests/plugin.test.ts index da70b538b..3136a21c0 100644 --- a/packages/appkit/src/plugins/files/tests/plugin.test.ts +++ b/packages/appkit/src/plugins/files/tests/plugin.test.ts @@ -44,8 +44,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(), }; diff --git a/packages/appkit/src/plugins/files/tests/raw-endpoint.test.ts b/packages/appkit/src/plugins/files/tests/raw-endpoint.test.ts index c3de70aba..99641bce4 100644 --- a/packages/appkit/src/plugins/files/tests/raw-endpoint.test.ts +++ b/packages/appkit/src/plugins/files/tests/raw-endpoint.test.ts @@ -38,8 +38,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/shutdown.test.ts b/packages/appkit/src/plugins/files/tests/shutdown.test.ts index dcf761c48..99eaa1e86 100644 --- a/packages/appkit/src/plugins/files/tests/shutdown.test.ts +++ b/packages/appkit/src/plugins/files/tests/shutdown.test.ts @@ -30,8 +30,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/upload.test.ts b/packages/appkit/src/plugins/files/tests/upload.test.ts index baf3aa549..725756a99 100644 --- a/packages/appkit/src/plugins/files/tests/upload.test.ts +++ b/packages/appkit/src/plugins/files/tests/upload.test.ts @@ -38,8 +38,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/genie/tests/genie.test.ts b/packages/appkit/src/plugins/genie/tests/genie.test.ts index fcb2e167c..0af6c25b3 100644 --- a/packages/appkit/src/plugins/genie/tests/genie.test.ts +++ b/packages/appkit/src/plugins/genie/tests/genie.test.ts @@ -21,7 +21,10 @@ const { mockCacheInstance } = vi.hoisted(() => { getOrExecute: vi .fn() .mockImplementation( - async (_key: unknown[], fn: () => Promise) => { + async ( + _key: unknown[], + fn: (signal?: AbortSignal) => Promise, + ) => { return await fn(); }, ), diff --git a/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts b/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts index 24d37341f..5f96d6efe 100644 --- a/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts +++ b/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts @@ -15,8 +15,9 @@ vi.mock("../../../cache", () => ({ get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), })), diff --git a/packages/appkit/src/plugins/serving/tests/serving.test.ts b/packages/appkit/src/plugins/serving/tests/serving.test.ts index 8fbe79bba..216c2b727 100644 --- a/packages/appkit/src/plugins/serving/tests/serving.test.ts +++ b/packages/appkit/src/plugins/serving/tests/serving.test.ts @@ -20,7 +20,10 @@ const { mockCacheInstance } = vi.hoisted(() => { getOrExecute: vi .fn() .mockImplementation( - async (_key: unknown[], fn: () => Promise) => { + async ( + _key: unknown[], + fn: (signal?: AbortSignal) => Promise, + ) => { return await fn(); }, ), diff --git a/packages/appkit/src/stream/stream-manager.ts b/packages/appkit/src/stream/stream-manager.ts index 901e0b46c..a5b3a4b32 100644 --- a/packages/appkit/src/stream/stream-manager.ts +++ b/packages/appkit/src/stream/stream-manager.ts @@ -76,7 +76,9 @@ export class StreamManager { abortAll(): void { this.activeOperations.forEach((operation) => { if (operation.heartbeat) clearInterval(operation.heartbeat); - operation.controller.abort("Server shutdown"); + operation.controller.abort( + new DOMException("Server shutdown", "AbortError"), + ); }); this.activeOperations.clear(); this.streamRegistry.clear(); @@ -140,7 +142,9 @@ export class StreamManager { // Stop the generator when no clients remain if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { - streamEntry.abortController.abort("All clients disconnected"); + streamEntry.abortController.abort( + new DOMException("All clients disconnected", "AbortError"), + ); } // cleanup if stream is completed and no clients are connected @@ -227,7 +231,9 @@ export class StreamManager { // Stop the generator when no clients remain so polling loops // (e.g. jobs runAndWait) don't keep running in the background. if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { - abortController.abort("Client disconnected"); + abortController.abort( + new DOMException("Client disconnected", "AbortError"), + ); } });