Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ export default class AgentClientAgentPort implements AgentPort {
return createRemoteAgentClient({
url: this.agentUrl,
token: this.mintToken(user),
actionEndpoints: this.buildActionEndpoints(),
actionEndpoints: this.buildActionEndpoints(user.renderingId),
});
}

Expand Down Expand Up @@ -291,10 +291,10 @@ export default class AgentClientAgentPort implements AgentPort {
}
}

private buildActionEndpoints(): ActionEndpointsByCollection {
private buildActionEndpoints(renderingId: number): ActionEndpointsByCollection {
const endpoints: ActionEndpointsByCollection = {};

for (const [collectionName, schema] of this.schemaCache) {
for (const [collectionName, schema] of this.schemaCache.entriesForRendering(renderingId)) {
endpoints[collectionName] = {};

for (const action of schema.actions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,12 @@ export default class StepExecutorFactory {
activityLogPort: ActivityLogPort,
incomingPendingData?: unknown,
): ExecutionContext {
const schemaResolver = new SchemaResolver(cfg.schemaCache, cfg.workflowPort, step.runId);
const schemaResolver = new SchemaResolver(
cfg.schemaCache,
cfg.workflowPort,
step.runId,
step.user.renderingId,
);
const activityLog = new ActivityLog(activityLogPort, step.user);

return {
Expand Down
49 changes: 35 additions & 14 deletions packages/workflow-executor/src/schema-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,17 @@ import type { CollectionSchema } from './types/validated/collection';

import { DEFAULT_SCHEMA_CACHE_TTL_S } from './defaults';

type Entry = { collectionName: string; schema: CollectionSchema; fetchedAt: number };

// A collection's schema (display names, exposed fields, actions) depends on the rendering it is
// resolved for, so entries are keyed by (renderingId, collectionName). A shared, collection-only
// key would let a run reuse another rendering's schema within the TTL window (PRD-440).
export default class SchemaCache {
private readonly store = new Map<string, { schema: CollectionSchema; fetchedAt: number }>();
// Separates the two key parts so the prefix scan in entriesForRendering can't confuse
// rendering 1 with rendering 11, and a collection name can't spill into the rendering segment.
private static readonly SEPARATOR = String.fromCharCode(0);

private readonly store = new Map<string, Entry>();
private readonly ttlS: number;
private readonly now: () => number;

Expand All @@ -12,34 +21,46 @@ export default class SchemaCache {
this.now = now;
}

get(collectionName: string): CollectionSchema | undefined {
const entry = this.store.get(collectionName);
get(renderingId: number, collectionName: string): CollectionSchema | undefined {
const key = SchemaCache.key(renderingId, collectionName);
const entry = this.store.get(key);

if (!entry) return undefined;

if (this.now() - entry.fetchedAt > this.ttlS * 1000) {
this.store.delete(collectionName);
if (this.isExpired(entry)) {
this.store.delete(key);

return undefined;
}

return entry.schema;
}

set(collectionName: string, schema: CollectionSchema): void {
this.store.set(collectionName, { schema, fetchedAt: this.now() });
set(renderingId: number, collectionName: string, schema: CollectionSchema): void {
this.store.set(SchemaCache.key(renderingId, collectionName), {
collectionName,
schema,
fetchedAt: this.now(),
});
}

// Yields non-expired entries; deletes stale ones along the way.
*[Symbol.iterator](): IterableIterator<[string, CollectionSchema]> {
const now = this.now();
// Yields the given rendering's non-expired entries; deletes stale ones along the way.
*entriesForRendering(renderingId: number): IterableIterator<[string, CollectionSchema]> {
const prefix = SchemaCache.key(renderingId, '');

for (const [key, entry] of this.store) {
if (now - entry.fetchedAt <= this.ttlS * 1000) {
yield [key, entry.schema];
} else {
this.store.delete(key);
if (key.startsWith(prefix)) {
if (this.isExpired(entry)) this.store.delete(key);
else yield [entry.collectionName, entry.schema];
}
}
}

private isExpired(entry: Entry): boolean {
return this.now() - entry.fetchedAt > this.ttlS * 1000;
}

private static key(renderingId: number, collectionName: string): string {
return `${renderingId}${SchemaCache.SEPARATOR}${collectionName}`;
}
}
20 changes: 12 additions & 8 deletions packages/workflow-executor/src/schema-resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,32 @@ import type { WorkflowPort } from './ports/workflow-port';
import type SchemaCache from './schema-cache';
import type { CollectionSchema } from './types/validated/collection';

// Per-run schema resolution: binds the shared SchemaCache, the orchestrator port and the
// current runId once, so callers never thread a loader. Writes into the SAME SchemaCache
// instance AgentClientAgentPort reads from (get/iterate): the resolver always populates an
// entry before the agent-port reads it, so the agent-port's SchemaNotCachedError guard does
// not fire under normal TTLs (a TTL shorter than a single step's round-trip could still evict).
// Per-run schema resolution: binds the shared SchemaCache, the orchestrator port, the current
// runId and its renderingId once, so callers never thread a loader. Cache reads/writes are scoped
// to the rendering so a run never reuses another rendering's schema (PRD-440). Writes into the
// SAME SchemaCache instance AgentClientAgentPort reads from (scoped by the same renderingId): the
// resolver always populates an entry before the agent-port reads it, so the agent-port's
// SchemaNotCachedError guard does not fire under normal TTLs (a TTL shorter than a single step's
// round-trip could still evict).
export default class SchemaResolver {
private readonly cache: SchemaCache;
private readonly workflowPort: WorkflowPort;
private readonly runId: string;
private readonly renderingId: number;

constructor(cache: SchemaCache, workflowPort: WorkflowPort, runId: string) {
constructor(cache: SchemaCache, workflowPort: WorkflowPort, runId: string, renderingId: number) {
this.cache = cache;
this.workflowPort = workflowPort;
this.runId = runId;
this.renderingId = renderingId;
}

async resolve(collectionName: string): Promise<CollectionSchema> {
const cached = this.cache.get(collectionName);
const cached = this.cache.get(this.renderingId, collectionName);
if (cached) return cached;

const schema = await this.workflowPort.getCollectionSchema(collectionName, this.runId);
this.cache.set(collectionName, schema);
this.cache.set(this.renderingId, collectionName, schema);

return schema;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@

const mocks = createMockClient();
({ mockCollection, mockRelation, mockAction } = mocks);
mockedCreateRemoteAgentClient.mockReturnValue(mocks.client as any);

Check warning on line 52 in packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts

View workflow job for this annotation

GitHub Actions / Linting & Testing (workflow-executor)

Unexpected any. Specify a different type

const schemaCache = new SchemaCache();
schemaCache.set('users', {
schemaCache.set(1, 'users', {
collectionName: 'users',
collectionId: 'col-users',
collectionDisplayName: 'Users',
Expand All @@ -66,7 +66,7 @@
{ name: 'archive', displayName: 'Archive', endpoint: '/forest/actions/archive' },
],
});
schemaCache.set('orders', {
schemaCache.set(1, 'orders', {
collectionName: 'orders',
collectionId: 'col-orders',
collectionDisplayName: 'Orders',
Expand All @@ -77,7 +77,7 @@
],
actions: [],
});
schemaCache.set('posts', {
schemaCache.set(1, 'posts', {
collectionName: 'posts',
collectionId: 'col-posts',
collectionDisplayName: 'Posts',
Expand Down Expand Up @@ -764,7 +764,7 @@
describe('buildActionEndpoints', () => {
it('passes fields and hooks from schema to agent-client (supports Ruby agent fallback)', async () => {
const schemaCache = new SchemaCache();
schemaCache.set('users', {
schemaCache.set(1, 'users', {
collectionName: 'users',
collectionId: 'col-users',
collectionDisplayName: 'Users',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ function makeContext(
permissionLevel: 'admin',
tags: {},
},
schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId),
schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId, 1),
previousSteps: [],
logger: makeMockLogger(),
...overrides,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ function makeContext(
permissionLevel: 'admin',
tags: {},
},
schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId),
schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId, 1),
previousSteps: [],
logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() },
...overrides,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ function makeContext(
permissionLevel: 'admin',
tags: {},
},
schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId),
schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId, 1),
previousSteps: [],
logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() },
...overrides,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@
permissionLevel: 'admin',
tags: {},
},
schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId),
schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId, 1),
previousSteps: [],
logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() },
...overrides,
Expand Down Expand Up @@ -2148,7 +2148,7 @@

await new LoadRelatedRecordStepExecutor(context).execute();

const firstRow = JSON.parse(selectRecordPrompt(invoke).match(/\[0\] (\{[^\n]*\})/)![1]);

Check warning on line 2151 in packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts

View workflow job for this annotation

GitHub Actions / Linting & Testing (workflow-executor)

Forbidden non-null assertion
expect(Object.keys(firstRow)).toHaveLength(6);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ function makeContext(
permissionLevel: 'admin',
tags: {},
},
schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId),
schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId, 1),
previousSteps: [],
logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() },
...overrides,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ function makeContext(
const workflowPort = overrides.workflowPort ?? makeMockWorkflowPort();
const schemaCache = new SchemaCache();
const schemaResolver =
overrides.schemaResolver ?? new SchemaResolver(schemaCache, workflowPort, runId);
overrides.schemaResolver ?? new SchemaResolver(schemaCache, workflowPort, runId, 1);

const base: Omit<ExecutionContext<ReadRecordStepDefinition>, 'agent' | 'activityLog'> = {
runId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ function makeContext(
permissionLevel: 'admin',
tags: {},
},
schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId),
schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId, 1),
previousSteps: [],
logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() },
...overrides,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ function makeContext(
permissionLevel: 'admin',
tags: {},
},
schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId),
schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId, 1),
previousSteps: [],
logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() },
...overrides,
Expand Down
Loading
Loading