Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 139 additions & 20 deletions packages/appkit/src/cache/index.ts
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agentic review:

ACE Review Findings

# Severity File Line Issue
1 high cache/tests/cache-manager.test.ts Zero tests for the core abort/ref-counting behavior — the entire motivation for this PR is untested
2 medium 18 test files across plugins/ Stale getOrExecute mock signatures (fn: () => Promise<T> instead of fn: (signal?: AbortSignal) => Promise<T>)
3 medium plugin/tests/cache.test.ts 38 MockCacheManager calls fn() without shared signal; CacheInterceptor signal-swap (context.signal = sharedSignal) is untested
4 low cache/index.ts 376-383 release() uses last-aborting caller's reason for shared abort — arbitrary but acceptable since nobody observes it
5 low cache/index.ts 263-265 Unsafe as InFlightEntry<T> cast — type-safe in practice due to same-key guarantees

Finding 1 — Missing abort/ref-counting tests

Tests needed (new describe("abort / ref-counting") block in cache-manager.test.ts):

  • One caller aborts while another still waits → waiting caller still resolves normally
  • All callers abort → shared controller's signal is aborted
  • Pre-aborted callerSignal throws immediately without executing fn
  • Single caller with signal aborts mid-flight → rejects with abort error, shared controller aborts
  • Deduped caller abort does not poison the first caller's result

Finding 2 — Stale mock signatures (18 files)

Update mock type from fn: () => Promise<unknown> to fn: (signal?: AbortSignal) => Promise<unknown> in:

  • plugins/lakebase/tests/lakebase-agent-tool.test.ts
  • plugins/serving/tests/serving.test.ts
  • plugins/agents/tests/dispatch-tool-call.test.ts
  • plugins/agents/tests/dos-limits.test.ts
  • plugins/agents/tests/approval-route.test.ts
  • plugins/agents/tests/approval-config.test.ts
  • plugins/agents/tests/route-handler-errors.test.ts
  • plugins/genie/tests/genie.test.ts
  • plugins/files/tests/error-handling.test.ts
  • plugins/files/tests/path-validation.test.ts
  • plugins/files/tests/upload.test.ts
  • plugins/files/tests/raw-endpoint.test.ts
  • plugins/files/tests/plugin.test.ts
  • plugins/files/tests/shutdown.test.ts
  • plugins/files/tests/mkdir.test.ts
  • plugins/files/tests/download-endpoint.test.ts

Finding 3 — MockCacheManager missing signal pass-through

In plugin/tests/cache.test.ts:38, change fn() to fn(new AbortController().signal) and add a test verifying context.signal is swapped during fn() execution and restored afterward.

Note: sync-throw guard NOT needed

Originally flagged as high — entry.promise initialized as undefined as unknown as Promise<T> could crash if fn throws synchronously. However, the only production consumer (CacheInterceptor) wraps fn in an async lambda, which can never throw synchronously. Not a real concern.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,28 @@ import { InMemoryStorage, PersistentStorage } from "./storage";

const logger = createLogger("cache");

/**
* Reference-counted in-flight cache execution entry.
*
* `sharedController` decouples the cached `fn()` from any single caller's
* abort signal. Callers join an in-flight entry by incrementing `refCount`;
* when a caller aborts, refCount is decremented. The shared controller is
* aborted only when refCount drops to 0 — i.e. all callers have abandoned
* the request. This prevents one caller's cancellation (e.g. React
* StrictMode unmount) from poisoning the in-flight result for other still-
* connected awaiters.
*/
interface InFlightEntry<T> {
promise: Promise<T>;
refCount: number;
sharedController: AbortController;
}

function createAbortError(signal: AbortSignal): unknown {
if (signal.reason !== undefined) return signal.reason;
return new DOMException("The operation was aborted.", "AbortError");
}

/**
* Cache manager class to handle cache operations.
* Can be used with in-memory storage or persistent storage (Lakebase).
Expand All @@ -34,7 +56,7 @@ export class CacheManager {

private storage: CacheStorage;
private config: CacheConfig;
private inFlightRequests: Map<string, Promise<unknown>>;
private inFlightRequests: Map<string, InFlightEntry<unknown>>;
private cleanupInProgress: boolean;
private lastCleanupAttempt: number;

Expand Down Expand Up @@ -174,20 +196,37 @@ export class CacheManager {
}

/**
* Get or execute a function and cache the result
* @param key - Cache key
* @param fn - Function to execute
* @param userKey - User key
* Get or execute a function and cache the result.
*
* Multiple concurrent callers with the same `cacheKey` are deduplicated
* onto a single in-flight execution. Each caller may pass its own
* `callerSignal`; the underlying `fn()` is run with a shared, internally
* managed `AbortSignal` that aborts only when *all* callers have
* abandoned the request (reference counted). This decouples a single
* caller's cancellation (e.g. React StrictMode unmount) from the shared
* result, so other still-connected callers receive the cached value
* normally.
*
* @param key - Cache key parts
* @param fn - Function to execute. Receives the cache-owned shared signal;
* pass it through to the underlying I/O so the work is cancelled when
* no caller is left waiting.
* @param userKey - User key for cache namespacing
* @param options - Options for the cache
* @returns Promise of the result
*/
async getOrExecute<T>(
key: (string | number | object)[],
fn: () => Promise<T>,
fn: (sharedSignal?: AbortSignal) => Promise<T>,
userKey: string,
options?: { ttl?: number },
options?: { ttl?: number; callerSignal?: AbortSignal },
): Promise<T> {
if (!this.config.enabled) return fn();
if (!this.config.enabled) return fn(options?.callerSignal);

const callerSignal = options?.callerSignal;
if (callerSignal?.aborted) {
throw createAbortError(callerSignal);
}

const cacheKey = this.generateKey(key, userKey);

Expand Down Expand Up @@ -218,9 +257,14 @@ export class CacheManager {
return cached.value;
}

// check if the value is being processed by another request
const inFlight = this.inFlightRequests.get(cacheKey);
if (inFlight) {
// check if the value is being processed by another request — join
// the existing in-flight entry under reference counting so this
// caller's abort doesn't poison the shared result.
const existing = this.inFlightRequests.get(cacheKey) as
| InFlightEntry<T>
| undefined;
if (existing && !existing.sharedController.signal.aborted) {
existing.refCount++;
span.setAttribute("cache.hit", true);
span.setAttribute("cache.deduplication", true);
span.addEvent("cache.deduplication_used", {
Expand All @@ -238,11 +282,10 @@ export class CacheManager {
cache_deduplication: true,
});

span.end();
return inFlight as Promise<T>;
return await this._waitWithRefCount(existing, callerSignal);
}

// cache miss - execute function
// cache miss - execute function under a shared abort controller
span.setAttribute("cache.hit", false);
span.addEvent("cache.miss", { "cache.key": cacheKey });
this.telemetryMetrics.cacheMissCount.add(1, {
Expand All @@ -254,7 +297,14 @@ export class CacheManager {
cache_key: cacheKey,
});

const promise = fn()
const sharedController = new AbortController();
const entry: InFlightEntry<T> = {
promise: undefined as unknown as Promise<T>,
refCount: 1,
sharedController,
};

entry.promise = fn(sharedController.signal)
.then(async (result) => {
await this.set(cacheKey, result, options);
span.addEvent("cache.value_stored", {
Expand All @@ -266,8 +316,13 @@ export class CacheManager {
.catch((error) => {
span.recordException(error);
span.setStatus({ code: SpanStatusCode.ERROR });
// Preserve AppKit errors and Databricks API errors (with status codes)
// so route handlers can map them to proper HTTP responses.
// If the shared controller aborted, all callers have already
// abandoned the request (or are about to via their own signals)
// — propagate the original error without wrapping. No live
// awaiter will observe this rejection.
if (sharedController.signal.aborted) {
throw error;
}
if (error instanceof AppKitError || error instanceof ApiError) {
throw error;
}
Expand All @@ -276,12 +331,19 @@ export class CacheManager {
);
})
.finally(() => {
this.inFlightRequests.delete(cacheKey);
if (this.inFlightRequests.get(cacheKey) === entry) {
this.inFlightRequests.delete(cacheKey);
}
});

this.inFlightRequests.set(cacheKey, promise);
// Suppress unhandled rejection warnings when every caller bailed
// before fn() resolved (their own promises rejected via
// _waitWithRefCount; the underlying entry.promise has no awaiter).
entry.promise.catch(() => {});

this.inFlightRequests.set(cacheKey, entry as InFlightEntry<unknown>);

const result = await promise;
const result = await this._waitWithRefCount(entry, callerSignal);
span.setStatus({ code: SpanStatusCode.OK });
return result;
} catch (error) {
Expand All @@ -296,6 +358,63 @@ export class CacheManager {
);
}

/**
* Wait on an in-flight entry, racing the underlying promise against the
* caller's abort signal. When the caller aborts, the entry's refCount is
* decremented; if it hits zero the shared controller is aborted so the
* underlying `fn()` can stop. Other callers continue to await the same
* entry and receive the result when it arrives.
*/
private _waitWithRefCount<T>(
entry: InFlightEntry<T>,
callerSignal?: AbortSignal,
): Promise<T> {
if (!callerSignal) return entry.promise;

return new Promise<T>((resolve, reject) => {
let settled = false;

const release = () => {
if (entry.refCount > 0) entry.refCount--;
if (entry.refCount <= 0 && !entry.sharedController.signal.aborted) {
entry.sharedController.abort(
callerSignal.reason ?? "all cache callers aborted",
);
}
};

const onAbort = () => {
if (settled) return;
settled = true;
callerSignal.removeEventListener("abort", onAbort);
release();
reject(createAbortError(callerSignal));
};

if (callerSignal.aborted) {
onAbort();
return;
}

callerSignal.addEventListener("abort", onAbort, { once: true });

entry.promise.then(
(value) => {
if (settled) return;
settled = true;
callerSignal.removeEventListener("abort", onAbort);
resolve(value);
},
(error) => {
if (settled) return;
settled = true;
callerSignal.removeEventListener("abort", onAbort);
reject(error);
},
);
});
}

/**
* Get a cached value
* @param key - Cache key
Expand Down
Loading
Loading