diff --git a/apps/docs-astro/public/llms-full.txt b/apps/docs-astro/public/llms-full.txt index 597edb09..e2513ebd 100644 --- a/apps/docs-astro/public/llms-full.txt +++ b/apps/docs-astro/public/llms-full.txt @@ -1542,20 +1542,38 @@ const handle = client.query('todos', { const unsubscribe = handle.subscribe((results) => { console.log('Results:', results); }); +``` -// Cursor pagination -const { nextCursor, hasMore } = handle.getPaginationInfo(); -if (hasMore && nextCursor) { - const nextPage = client.query('todos', { - where: { completed: false }, - sort: { createdAt: 'desc' }, - limit: 10, - cursor: nextCursor, - }); +#### Cursor pagination — `loadMore()` + +Call `handle.loadMore()` to fetch the next page and append its rows to the existing result set. This is the recommended path: it guards against duplicate in-flight requests for the same cursor and merges the new page using an append-only strategy so earlier rows are never lost. + +```typescript +const handle = client.query('todos', { + where: { completed: false }, + sort: { createdAt: 'desc' }, + limit: 10, +}); + +// Render the first page +const unsubscribe = handle.subscribe((results) => render(results)); + +// Load subsequent pages on demand (e.g. a "Load more" button) +await handle.loadMore(); +// The handle's subscriber fires again with the merged result set +// (first page rows + new page rows combined). + +// Check whether more pages exist before calling again +const { hasMore } = handle.getPaginationInfo(); +if (hasMore) { + await handle.loadMore(); } ``` -`QueryHandle` methods: `subscribe(cb)`, `onDelta(listener)`, `consumeChanges()`, `getLastChange()`, `getPendingChanges()`, `clearChanges()`, `resetChangeTracker()`, `getFilter()`, `getMapName()`, `getPaginationInfo()`, `onPaginationChange(listener)`, `updatePaginationInfo(info)`, `syncState` (getter), `onSyncStateChange(listener)`. +`loadMore()` resolves immediately when there are no more pages (`hasMore` is false) or when a request for the same cursor is already in flight, so it is safe to call on every scroll event without extra debouncing. + + +`QueryHandle` methods: `subscribe(cb)`, `loadMore()`, `onDelta(listener)`, `consumeChanges()`, `getLastChange()`, `getPendingChanges()`, `clearChanges()`, `resetChangeTracker()`, `getFilter()`, `getMapName()`, `getPaginationInfo()`, `onPaginationChange(listener)`, `updatePaginationInfo(info)`, `syncState` (getter), `onSyncStateChange(listener)`. #### Knowing when a result is authoritative — `subscribe` `{ settled }` @@ -1763,20 +1781,31 @@ const unsubscribe = handle.subscribe((results) => setResults(results)); handle.dispose(); ``` -### `sql(query)` +### `sql(query)` — opt-in, server-feature-gated ```typescript public async sql(query: string): Promise ``` -Execute a SQL query server-side via DataFusion. Map names are table names. Requires the server's DataFusion feature and registered schemas. +Execute a one-shot SQL query server-side via the DataFusion engine. Map names are used as table names; the server must have schemas registered for them. + +**Off by default.** The server's DataFusion feature is a compile-time opt-in (`--features datafusion`). The default server build (`default = ["redb"]`) does NOT include DataFusion. If you send a SQL query to a server built without this feature, the server returns an error. + +**Niche — use only for what the structured query API cannot express.** `client.sql()` exists for queries that genuinely require SQL: multi-map **joins**, **window functions**, **HAVING** clauses, and **DISTINCT** aggregation. For everything else — filtering, sorting, full-text search, group-by, aggregate functions — use the structured query API (`client.query()`, `client.queryOnce()`, `client.queryOncePaged()`) backed by the canonical DAG engine. Reaching for SQL when the structured API suffices adds a server-side dependency and bypasses the local-first offline path. ```typescript -const result = await client.sql('SELECT name, age FROM users WHERE age > 21 ORDER BY age'); -// result.columns: ['name', 'age'] -// result.rows: [[...], [...]] +// Only appropriate for cross-map joins or window/HAVING/DISTINCT queries +const result = await client.sql( + 'SELECT u.name, COUNT(t.id) AS task_count ' + + 'FROM users u JOIN tasks t ON t.userId = u.id ' + + 'GROUP BY u.name HAVING COUNT(t.id) > 3' +); +// result.columns: ['name', 'task_count'] +// result.rows: [['Alice', 5], ['Bob', 4]] ``` +`SqlQueryResult` shape: `{ columns: string[]; rows: unknown[][] }`. There is no cursor or pagination — SQL queries return the full result set. For large result sets, add a `LIMIT` clause. + ### `vectorSearch(mapName, queryVector, options?)` ```typescript diff --git a/apps/docs-astro/src/content/docs/reference/client.mdx b/apps/docs-astro/src/content/docs/reference/client.mdx index 9d6fc157..5728f22e 100644 --- a/apps/docs-astro/src/content/docs/reference/client.mdx +++ b/apps/docs-astro/src/content/docs/reference/client.mdx @@ -188,20 +188,39 @@ const handle = client.query('todos', { const unsubscribe = handle.subscribe((results) => { console.log('Results:', results); }); +``` + +#### Cursor pagination — `loadMore()` + +Call `handle.loadMore()` to fetch the next page and append its rows to the existing result set. This is the recommended path: it guards against duplicate in-flight requests for the same cursor and merges the new page using an append-only strategy so earlier rows are never lost. + +```typescript +const handle = client.query('todos', { + where: { completed: false }, + sort: { createdAt: 'desc' }, + limit: 10, +}); + +// Render the first page +const unsubscribe = handle.subscribe((results) => render(results)); + +// Load subsequent pages on demand (e.g. a "Load more" button) +await handle.loadMore(); +// The handle's subscriber fires again with the merged result set +// (first page rows + new page rows combined). -// Cursor pagination -const { nextCursor, hasMore } = handle.getPaginationInfo(); -if (hasMore && nextCursor) { - const nextPage = client.query('todos', { - where: { completed: false }, - sort: { createdAt: 'desc' }, - limit: 10, - cursor: nextCursor, - }); +// Check whether more pages exist before calling again +const { hasMore } = handle.getPaginationInfo(); +if (hasMore) { + await handle.loadMore(); } ``` -`QueryHandle` methods: `subscribe(cb)`, `onDelta(listener)`, `consumeChanges()`, `getLastChange()`, `getPendingChanges()`, `clearChanges()`, `resetChangeTracker()`, `getFilter()`, `getMapName()`, `getPaginationInfo()`, `onPaginationChange(listener)`, `updatePaginationInfo(info)`, `syncState` (getter), `onSyncStateChange(listener)`. +`loadMore()` resolves immediately when there are no more pages (`hasMore` is false) or when a request for the same cursor is already in flight, so it is safe to call on every scroll event without extra debouncing. + + + +`QueryHandle` methods: `subscribe(cb)`, `loadMore()`, `onDelta(listener)`, `consumeChanges()`, `getLastChange()`, `getPendingChanges()`, `clearChanges()`, `resetChangeTracker()`, `getFilter()`, `getMapName()`, `getPaginationInfo()`, `onPaginationChange(listener)`, `updatePaginationInfo(info)`, `syncState` (getter), `onSyncStateChange(listener)`. #### Knowing when a result is authoritative — `subscribe` `{ settled }` @@ -409,20 +428,31 @@ const unsubscribe = handle.subscribe((results) => setResults(results)); handle.dispose(); ``` -### `sql(query)` +### `sql(query)` — opt-in, server-feature-gated ```typescript public async sql(query: string): Promise ``` -Execute a SQL query server-side via DataFusion. Map names are table names. Requires the server's DataFusion feature and registered schemas. +Execute a one-shot SQL query server-side via the DataFusion engine. Map names are used as table names; the server must have schemas registered for them. + +**Off by default.** The server's DataFusion feature is a compile-time opt-in (`--features datafusion`). The default server build (`default = ["redb"]`) does NOT include DataFusion. If you send a SQL query to a server built without this feature, the server returns an error. + +**Niche — use only for what the structured query API cannot express.** `client.sql()` exists for queries that genuinely require SQL: multi-map **joins**, **window functions**, **HAVING** clauses, and **DISTINCT** aggregation. For everything else — filtering, sorting, full-text search, group-by, aggregate functions — use the structured query API (`client.query()`, `client.queryOnce()`, `client.queryOncePaged()`) backed by the canonical DAG engine. Reaching for SQL when the structured API suffices adds a server-side dependency and bypasses the local-first offline path. ```typescript -const result = await client.sql('SELECT name, age FROM users WHERE age > 21 ORDER BY age'); -// result.columns: ['name', 'age'] -// result.rows: [[...], [...]] +// Only appropriate for cross-map joins or window/HAVING/DISTINCT queries +const result = await client.sql( + 'SELECT u.name, COUNT(t.id) AS task_count ' + + 'FROM users u JOIN tasks t ON t.userId = u.id ' + + 'GROUP BY u.name HAVING COUNT(t.id) > 3' +); +// result.columns: ['name', 'task_count'] +// result.rows: [['Alice', 5], ['Bob', 4]] ``` +`SqlQueryResult` shape: `{ columns: string[]; rows: unknown[][] }`. There is no cursor or pagination — SQL queries return the full result set. For large result sets, add a `LIMIT` clause. + ### `vectorSearch(mapName, queryVector, options?)` ```typescript diff --git a/apps/docs-astro/src/content/docs/reference/mcp.mdx b/apps/docs-astro/src/content/docs/reference/mcp.mdx index 38680645..3aa57934 100644 --- a/apps/docs-astro/src/content/docs/reference/mcp.mdx +++ b/apps/docs-astro/src/content/docs/reference/mcp.mdx @@ -134,8 +134,8 @@ List every map the server knows about. Read records from a map with filters and sorting. Returns **authoritative server data** — the assistant gets the live, server-confirmed answer, not a stale local guess. -- **Parameters:** `{ map, filter?, sort?, limit?, fields? }` -- **Response:** `[{ _key, ...fields }]` (a plain settled array) +- **Parameters:** `{ map, filter?, sort?, limit?, fields?, cursor? }` +- **Response:** `[{ _key, ...fields }]` (a settled array) with an optional continuation note when `hasMore` is true - **Security:** rejects maps outside `allowedMaps`; clamps `limit` to `maxLimit` ```text @@ -147,7 +147,11 @@ A settled result with no matching records renders as `No results found in map '< **Offline / not-settled behavior.** `topgun_query` never silently returns stale local data and never conflates "offline" with "empty". If the server cannot be reached, or the query does not settle within the default 5000 ms window, the tool returns an error (`isError: true`) explaining that no authoritative data was returned — so the assistant knows to retry rather than reporting an empty list. The message distinguishes the two causes (server unreachable / client offline, vs. timed out waiting for the server). -**No cursor pagination, but truncation is never silent.** `topgun_query` returns a plain settled array; the previous `nextCursor` / `hasMore` continuation hint and `cursor` parameter have been removed (continuation cursors are an anti-pattern for an LLM caller). It does **not** drop the one signal that matters: when more rows match than the returned `limit`, the response appends a `More rows match than were returned…` note so the assistant knows the view was capped and can narrow `filter` / `sort` (or raise `limit` up to `maxLimit`) — rather than reporting a truncated list as the whole answer. +**Cursor pagination.** When the server has more results than the returned `limit`, the response includes a `cursor` token and appends a continuation note: "To fetch the next page, call this tool again with `cursor: `." Pass that token as the optional `cursor` parameter on the next call to receive the following page. Repeat until `hasMore` is false (no continuation note appears). This lets an agent walk through a large result set page by page without re-fetching rows it has already seen. + +If you do not need further pages, the continuation note also tells you to narrow with `filter` / `sort` or raise `limit` (up to `maxLimit`) — useful when you want a different slice rather than sequential pages. + +A settled result with no matching records renders as `No results found in map ''...` — that means the server genuinely has no such rows, not that the query failed. ### topgun_mutate diff --git a/packages/client/src/QueryHandle.ts b/packages/client/src/QueryHandle.ts index 2a0cbe8d..1972e7c1 100644 --- a/packages/client/src/QueryHandle.ts +++ b/packages/client/src/QueryHandle.ts @@ -413,6 +413,85 @@ export class QueryHandle { // ============== Pagination Methods ============== + /** + * In-flight cursor token for loadMore. Stores the cursor being fetched so + * concurrent calls with the same cursor are deduplicated to a single request. + */ + private _loadMoreInFlight: string | null = null; + + /** + * Append-only merge: adds/updates keys from the page batch without removing + * any existing keys. This preserves results from prior pages that are absent + * from the new page batch — a full-set reconciliation via onResult would + * prune those prior-page rows. + */ + private mergePageResults(items: { key: string; value: T }[]): void { + for (const item of items) { + this.currentResults.set(item.key, item.value); + } + this.computeAndNotifyChanges(Date.now()); + this.notify(); + } + + /** + * Load the next page of results and append them to the current result set. + * + * Uses the cursor from the most recent server response. If no further pages + * are available (`hasMore` is false) or a request for the same cursor is + * already in flight, resolves immediately without issuing a duplicate request. + * + * Results from the new page are merged with the existing result set using an + * append-only strategy: prior-page rows are never removed. + */ + public async loadMore(): Promise { + const { nextCursor, hasMore } = this._paginationInfo; + + // No more pages available — nothing to fetch. + if (!hasMore || !nextCursor) return; + + // Deduplicate concurrent calls for the same cursor. + if (this._loadMoreInFlight === nextCursor) return; + + this._loadMoreInFlight = nextCursor; + + try { + // Create a temporary one-shot handle with the next cursor, exactly + // mirroring the queryOnce pattern: subscribe → settle → unsubscribe. + const tempHandle = new QueryHandle(this.syncEngine, this.mapName, { + ...this.filter, + cursor: nextCursor, + }); + + const pageItems: { key: string; value: T }[] = []; + + const unsub = tempHandle.subscribe((results) => { + pageItems.length = 0; + for (const item of results) { + // Destructure the synthetic _key field added by getSortedResults. + const { _key, ...rest } = item as T & { _key: string }; + pageItems.push({ key: _key, value: rest as T }); + } + }); + + await tempHandle.whenSettled(); + unsub(); + + // Append-only merge: preserves rows from all prior pages. + this.mergePageResults(pageItems); + + // Advance pagination state to reflect the new page's cursor/hasMore. + const newPaginationInfo = tempHandle.getPaginationInfo(); + this._paginationInfo = { + nextCursor: newPaginationInfo.nextCursor, + hasMore: newPaginationInfo.hasMore, + cursorStatus: newPaginationInfo.cursorStatus, + }; + this.notifyPaginationListeners(); + } finally { + this._loadMoreInFlight = null; + } + } + /** * Get current pagination info. * Returns nextCursor, hasMore, and cursorStatus. diff --git a/packages/client/src/TopGunClient.ts b/packages/client/src/TopGunClient.ts index 0ad68ed0..ee4b8a0f 100644 --- a/packages/client/src/TopGunClient.ts +++ b/packages/client/src/TopGunClient.ts @@ -23,6 +23,19 @@ import { QueryOnceLocalError, type QueryOnceUnsettledReason, } from './errors/QueryOnceError'; + +/** + * Return value of {@link TopGunClient.queryOncePaged}. + * + * `items` is the authoritative server result for this page. + * `cursor` is the opaque token for the next page (undefined when none). + * `hasMore` is true when the server signalled additional rows beyond this page. + */ +export interface QueryOncePagedResult { + items: QueryResultItem[]; + cursor?: string; + hasMore: boolean; +} import { DistributedLock } from './DistributedLock'; import { TopicHandle } from './TopicHandle'; import { PNCounterHandle } from './PNCounterHandle'; @@ -373,6 +386,81 @@ export class TopGunClient = any> { } } + /** + * One-shot paged read: resolves with authoritative server data including cursor + * metadata for pagination. Unlike {@link queryOnce}, the return value carries + * `{ items, cursor, hasMore }` so callers can drive {@link QueryHandle.loadMore} + * or issue a follow-up `queryOncePaged` for the next page. + * + * The offline policy is identical to {@link queryOnce}: + * - Default: rejects with {@link QueryOnceUnsettledError} when offline or timed-out. + * - `{ allowLocal: true }`: throws {@link QueryOnceLocalError} carrying a local + * snapshot on `.localData`. + * + * `queryOnce` is left unchanged — this is a separate method so that the plain + * `Promise` return type of `queryOnce` is not disturbed. + */ + public queryOncePaged( + mapName: K, + filter: QueryFilter, + opts?: QueryOnceOptions, + ): Promise>; + // eslint-disable-next-line @typescript-eslint/no-explicit-any -- untyped overload for back-compat callers that do not supply a schema type parameter + public queryOncePaged( + mapName: string, + filter: QueryFilter, + opts?: QueryOnceOptions, + ): Promise>; + public async queryOncePaged( + mapName: string, + filter: QueryFilter, + opts?: QueryOnceOptions, + // eslint-disable-next-line @typescript-eslint/no-explicit-any -- implementation signature uses any to satisfy both overloads; return type is narrowed by the overload the caller selects + ): Promise> { + const timeoutMs = opts?.timeoutMs ?? DEFAULT_QUERY_ONCE_TIMEOUT_MS; + const allowLocal = opts?.allowLocal ?? false; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any -- handle storage is untyped internally; result type flows from the selected overload + const handle = new QueryHandle(this.syncEngine, mapName, filter); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any -- latest sorted snapshot for the active query; element type narrowed by the selected overload + let latest: QueryResultItem[] = []; + const unsubscribe = handle.subscribe((results) => { + latest = results; + }); + + const cleanup = (): void => { + unsubscribe(); + }; + + try { + if (!this.isClientOnline()) { + if (allowLocal) { + await this.flushLocalPreload(); + } + return this.handleUnsettledPaged('offline', mapName, latest, allowLocal, { + hasMore: false, + }); + } + + const settled = await this.raceSettle(handle, timeoutMs); + if (!settled) { + return this.handleUnsettledPaged('timeout', mapName, latest, allowLocal, { + hasMore: false, + }); + } + + const paginationInfo = handle.getPaginationInfo(); + return { + items: latest, + cursor: paginationInfo.nextCursor, + hasMore: paginationInfo.hasMore, + }; + } finally { + cleanup(); + } + } + /** * Resolves true when the query settles (first server QUERY_RESP), false when the * timeout fires first. Clears the timer on settlement so it never dangles. @@ -407,6 +495,29 @@ export class TopGunClient = any> { throw new QueryOnceUnsettledError(reason, mapName); } + /** + * Build the offline/timeout outcome for queryOncePaged. + * Mirrors handleUnsettled but returns QueryOncePagedResult on allowLocal. + */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any -- never actually returns; typed to satisfy the queryOncePaged return contract + private handleUnsettledPaged( + reason: QueryOnceUnsettledReason, + mapName: string, + // eslint-disable-next-line @typescript-eslint/no-explicit-any -- local snapshot element type flows from the selected overload at the call site + localData: QueryResultItem[], + allowLocal: boolean, + paginationFallback: { hasMore: boolean; cursor?: string }, + // eslint-disable-next-line @typescript-eslint/no-explicit-any -- never actually returns; typed to satisfy the queryOncePaged return contract + ): Promise> { + if (allowLocal) { + throw new QueryOnceLocalError(reason, mapName, localData); + } + throw new QueryOnceUnsettledError(reason, mapName); + // paginationFallback would only be used in a non-throwing path; included for + // future extensibility. + void paginationFallback; + } + /** * Yield the microtask queue so the QueryHandle's async local pre-load * (loadInitialLocalData → onResult('local')) lands in our captured snapshot diff --git a/packages/client/src/__tests__/QueryHandle.test.ts b/packages/client/src/__tests__/QueryHandle.test.ts index 8ca4497a..1ef60409 100644 --- a/packages/client/src/__tests__/QueryHandle.test.ts +++ b/packages/client/src/__tests__/QueryHandle.test.ts @@ -1,7 +1,8 @@ import { QueryHandle } from '../QueryHandle'; import { SyncEngine } from '../SyncEngine'; -// Mock SyncEngine +// Mock SyncEngine — reused across tests; some test sections override subscribeToQuery +// per-test for precise call-count assertions. const mockSyncEngine = { subscribeToQuery: jest.fn(), unsubscribeFromQuery: jest.fn(), @@ -333,6 +334,151 @@ describe('QueryHandle', () => { }); }); + describe('loadMore()', () => { + /** + * Builds a fake SyncEngine whose subscribeToQuery immediately settles the + * QueryHandle with the provided items and pagination info. This lets + * loadMore() complete synchronously in tests without real networking. + */ + function makeFakeEngine(pageItems: { key: string; value: any }[], nextCursor?: string) { + const subscribeToQuery = jest.fn((h: QueryHandle) => { + // Deliver results synchronously so whenSettled() resolves in the next microtask. + h.onResult(pageItems, 'server'); + if (nextCursor !== undefined) { + h.updatePaginationInfo({ nextCursor, hasMore: true, cursorStatus: 'valid' }); + } else { + h.updatePaginationInfo({ hasMore: false, cursorStatus: 'none' }); + } + }); + + return { + subscribeToQuery, + unsubscribeFromQuery: jest.fn(), + runLocalQuery: jest.fn().mockResolvedValue([]), + } as unknown as SyncEngine; + } + + test('loadMore() appends page-2 rows without pruning page-1 rows (disjoint keys)', async () => { + // Page-2 engine returns keys p2a and p2b only. + const page2Engine = makeFakeEngine([ + { key: 'p2a', value: { name: 'Page2-A' } }, + { key: 'p2b', value: { name: 'Page2-B' } }, + ]); + + const handle = new QueryHandle(page2Engine, 'items', { limit: 2 }); + + // Seed page-1 results directly — simulates the initial QUERY_RESP. + let notified: any[] = []; + handle.subscribe((results) => { + notified = results; + }); + handle.onResult( + [ + { key: 'p1a', value: { name: 'Page1-A' } }, + { key: 'p1b', value: { name: 'Page1-B' } }, + ], + 'server', + ); + // Tell the handle there is a next page. + handle.updatePaginationInfo({ nextCursor: 'cursor-1', hasMore: true, cursorStatus: 'valid' }); + + await handle.loadMore(); + + // Both page-1 and page-2 keys must be present. + const keys = notified.map((r) => r._key); + expect(keys).toContain('p1a'); + expect(keys).toContain('p1b'); + expect(keys).toContain('p2a'); + expect(keys).toContain('p2b'); + expect(keys).toHaveLength(4); + }); + + test('loadMore() is a no-op when hasMore is false', async () => { + const fakeEngine = makeFakeEngine([]); + const handle = new QueryHandle(fakeEngine, 'items', {}); + handle.subscribe(jest.fn()); + handle.onResult([], 'server'); + handle.updatePaginationInfo({ hasMore: false, cursorStatus: 'none' }); + + // subscribeToQuery call count before loadMore + const callsBefore = (fakeEngine.subscribeToQuery as jest.Mock).mock.calls.length; + await handle.loadMore(); + + // No additional subscribeToQuery calls — no temp handle was created. + expect((fakeEngine.subscribeToQuery as jest.Mock).mock.calls.length).toBe(callsBefore); + }); + + test('concurrent loadMore() calls do not issue duplicate follow-up queries', async () => { + // The fake engine settles the temp handle immediately but we need to + // control timing to keep the first loadMore in-flight while the second fires. + // We use a deferred settle: subscribeToQuery captures the handle, and we + // resolve it manually after both loadMore calls have been issued. + let capturedHandle: QueryHandle | undefined; + let resolveSettle!: () => void; + const settlePromise = new Promise((resolve) => { + resolveSettle = resolve; + }); + + const deferredEngine = { + subscribeToQuery: jest.fn((h: QueryHandle) => { + capturedHandle = h; + // Settle asynchronously so both loadMore() calls can be issued first. + settlePromise.then(() => { + h.onResult([{ key: 'p2a', value: { name: 'Page2-A' } }], 'server'); + h.updatePaginationInfo({ hasMore: false, cursorStatus: 'none' }); + }); + }), + unsubscribeFromQuery: jest.fn(), + runLocalQuery: jest.fn().mockResolvedValue([]), + } as unknown as SyncEngine; + + const handle = new QueryHandle(deferredEngine, 'items', {}); + // handle.subscribe() triggers subscribeToQuery for the main handle (call #1). + handle.subscribe(jest.fn()); + handle.onResult([], 'server'); + handle.updatePaginationInfo({ nextCursor: 'cursor-1', hasMore: true, cursorStatus: 'valid' }); + + // Reset the call count AFTER main subscribe — we only care about temp-handle calls. + (deferredEngine.subscribeToQuery as jest.Mock).mockClear(); + + // Fire two concurrent loadMore calls — neither has resolved yet. + const p1 = handle.loadMore(); + const p2 = handle.loadMore(); + + // Allow first call to proceed. + resolveSettle(); + await Promise.all([p1, p2]); + + // subscribeToQuery should have been called exactly ONCE for the temp handle + // (the in-flight latch deduplicates the second concurrent call). + expect((deferredEngine.subscribeToQuery as jest.Mock).mock.calls.length).toBe(1); + void capturedHandle; // referenced to satisfy no-unused-vars + }); + + test('loadMore() advances paginationInfo to the new page cursor', async () => { + // Page-2 engine returns a cursor for page-3. + const page2Engine = makeFakeEngine( + [{ key: 'p2a', value: { name: 'Page2-A' } }], + 'cursor-for-page-3', + ); + + const handle = new QueryHandle(page2Engine, 'items', { limit: 1 }); + handle.subscribe(jest.fn()); + handle.onResult([{ key: 'p1a', value: { name: 'Page1-A' } }], 'server'); + handle.updatePaginationInfo({ + nextCursor: 'cursor-for-page-2', + hasMore: true, + cursorStatus: 'valid', + }); + + await handle.loadMore(); + + const info = handle.getPaginationInfo(); + expect(info.hasMore).toBe(true); + expect(info.nextCursor).toBe('cursor-for-page-3'); + }); + }); + describe('Subscriber isolation in notify()', () => { test('AC4: a throwing subscriber does not block later subscribers or propagate', () => { const handle = new QueryHandle(mockSyncEngine, 'items', {}); diff --git a/packages/client/src/__tests__/QueryOnce.test.ts b/packages/client/src/__tests__/QueryOnce.test.ts index d4eea6f4..718c41ea 100644 --- a/packages/client/src/__tests__/QueryOnce.test.ts +++ b/packages/client/src/__tests__/QueryOnce.test.ts @@ -2,6 +2,7 @@ import { TopGunClient } from '../TopGunClient'; import { QueryHandle } from '../QueryHandle'; import { SyncState } from '../SyncState'; import { QueryOnceUnsettledError, QueryOnceLocalError } from '../errors/QueryOnceError'; +import type { QueryOncePagedResult } from '../TopGunClient'; import type { IStorageAdapter, OpLogEntry } from '../IStorageAdapter'; // crypto.randomUUID is needed by the TopGunClient constructor in Node test envs. @@ -132,6 +133,23 @@ async function settleServer( handle.onResult(items, 'server'); } +/** Push a QUERY_RESP with pagination metadata into the active queryOncePaged handle. */ +async function settleServerPaged( + engine: SyncEngineDouble, + items: { key: string; value: unknown }[], + pagination: { nextCursor?: string; hasMore: boolean }, +): Promise { + await Promise.resolve(); + const handle = engine.lastHandle(); + if (!handle) throw new Error('queryOncePaged did not subscribe a handle'); + handle.onResult(items, 'server'); + handle.updatePaginationInfo({ + nextCursor: pagination.nextCursor, + hasMore: pagination.hasMore, + cursorStatus: pagination.nextCursor ? 'valid' : 'none', + }); +} + describe('TopGunClient.queryOnce', () => { describe('AC1 — returns authoritative server data not local []', () => { test('resolves with a server-only record (not [])', async () => { @@ -259,3 +277,87 @@ describe('TopGunClient.queryOnce', () => { }); }); }); + +describe('TopGunClient.queryOncePaged', () => { + describe('AC3 — resolves to { items, cursor, hasMore }', () => { + test('resolves with items, cursor, and hasMore from the server response', async () => { + const { client, engine } = makeClient(SyncState.CONNECTED); + + const promise = client.queryOncePaged('posts', { limit: 2 }); + + await settleServerPaged( + engine, + [ + { key: 'p1', value: { title: 'Post 1' } }, + { key: 'p2', value: { title: 'Post 2' } }, + ], + { nextCursor: 'cursor-abc', hasMore: true }, + ); + + const result: QueryOncePagedResult<{ title: string }> = await promise; + expect(result.items).toHaveLength(2); + expect(result.items[0]._key).toBe('p1'); + expect(result.items[1]._key).toBe('p2'); + expect(result.cursor).toBe('cursor-abc'); + expect(result.hasMore).toBe(true); + }); + + test('hasMore is false and cursor is undefined when server has no further pages', async () => { + const { client, engine } = makeClient(SyncState.CONNECTED); + + const promise = client.queryOncePaged('posts', { limit: 10 }); + + await settleServerPaged(engine, [{ key: 'p1', value: { title: 'Only Post' } }], { + hasMore: false, + }); + + const result = await promise; + expect(result.hasMore).toBe(false); + expect(result.cursor).toBeUndefined(); + expect(result.items).toHaveLength(1); + }); + + test('auto-unsubscribes after resolving (no live subscription leak)', async () => { + const { client, engine } = makeClient(SyncState.CONNECTED); + const promise = client.queryOncePaged('posts', {}); + await settleServerPaged(engine, [{ key: 'p1', value: {} }], { hasMore: false }); + await promise; + + expect(engine.unsubscribeFromQuery).toHaveBeenCalledTimes(1); + }); + }); + + describe('AC4 — offline policy matches queryOnce', () => { + test('default offline → throws QueryOnceUnsettledError', async () => { + const { client } = makeClient(SyncState.DISCONNECTED); + + await expect(client.queryOncePaged('posts', {})).rejects.toBeInstanceOf( + QueryOnceUnsettledError, + ); + await expect(client.queryOncePaged('posts', {})).rejects.toMatchObject({ + code: 'QUERY_ONCE_UNSETTLED', + reason: 'offline', + }); + }); + + test('default timeout → throws QueryOnceUnsettledError (timeout)', async () => { + const { client } = makeClient(SyncState.CONNECTED); + + await expect(client.queryOncePaged('posts', {}, { timeoutMs: 20 })).rejects.toMatchObject({ + code: 'QUERY_ONCE_UNSETTLED', + reason: 'timeout', + }); + }); + + test('allowLocal offline → throws QueryOnceLocalError carrying local snapshot', async () => { + const { client, engine } = makeClient(SyncState.DISCONNECTED); + engine.runLocalQuery.mockResolvedValue([{ key: 'p1', value: { title: 'LocalPost' } }]); + + const promise = client.queryOncePaged('posts', {}, { allowLocal: true }); + await Promise.resolve(); + await Promise.resolve(); + + await expect(promise).rejects.toBeInstanceOf(QueryOnceLocalError); + }); + }); +}); diff --git a/packages/client/src/index.ts b/packages/client/src/index.ts index d826c853..203ae6a7 100644 --- a/packages/client/src/index.ts +++ b/packages/client/src/index.ts @@ -143,7 +143,12 @@ export type { HttpSyncProviderConfig } from './connection/HttpSyncProvider'; export type { AutoConnectionProviderConfig } from './connection/AutoConnectionProvider'; // TopGunClient cluster config types -export type { TopGunClusterConfig, TopGunClientConfig, QueryOnceOptions } from './TopGunClient'; +export type { + TopGunClusterConfig, + TopGunClientConfig, + QueryOnceOptions, + QueryOncePagedResult, +} from './TopGunClient'; export { DEFAULT_QUERY_ONCE_TIMEOUT_MS } from './TopGunClient'; // queryOnce one-shot read errors diff --git a/packages/mcp-server/src/__tests__/tools.test.ts b/packages/mcp-server/src/__tests__/tools.test.ts index dd27969b..865729d1 100644 --- a/packages/mcp-server/src/__tests__/tools.test.ts +++ b/packages/mcp-server/src/__tests__/tools.test.ts @@ -59,8 +59,11 @@ class MockTopGunClient { // When set, hybridSearch rejects with this error to simulate server-side failures // (no embedding model, FTS not enabled for the map, etc.). hybridSearchRejection: Error | null = null; - // When set, queryOnce rejects with this error to simulate offline / not-settled. + // When set, queryOncePaged rejects with this error to simulate offline / not-settled. queryOnceRejection: Error | null = null; + // Override hasMore/cursor returned by queryOncePaged for pagination tests. + queryOncePagedHasMore = false; + queryOncePagedCursor: string | undefined = undefined; getMap(name: string): MockLWWMap { if (!this.maps.has(name)) { @@ -110,12 +113,17 @@ class MockTopGunClient { return this.hybridSearchResults; } - // One-shot read mirroring TopGunClient.queryOnce: resolves with settled, - // authoritative server data, or rejects to simulate offline / not-settled. - async queryOnce( + // One-shot paged read mirroring TopGunClient.queryOncePaged: resolves with + // settled, authoritative server data including cursor metadata, or rejects + // to simulate offline / not-settled. + async queryOncePaged( mapName: string, - filter: { where?: Record; limit?: number } = {}, - ): Promise & { _key: string }>> { + filter: { where?: Record; limit?: number; cursor?: string } = {}, + ): Promise<{ + items: Array & { _key: string }>; + cursor?: string; + hasMore: boolean; + }> { if (this.queryOnceRejection) { throw this.queryOnceRejection; } @@ -142,7 +150,20 @@ class MockTopGunClient { items = items.slice(0, filter.limit); } - return items; + return { + items, + cursor: this.queryOncePagedCursor, + hasMore: this.queryOncePagedHasMore, + }; + } + + // Legacy queryOnce for any remaining callers outside handleQuery + async queryOnce( + mapName: string, + filter: { where?: Record; limit?: number } = {}, + ): Promise & { _key: string }>> { + const result = await this.queryOncePaged(mapName, filter); + return result.items; } } @@ -240,20 +261,25 @@ describe('MCP Tools', () => { expect(result.content[0].text).toContain('5 result'); }); - it('should signal truncation when more rows match than the limit', async () => { + it('should signal truncation and provide continuation cursor when hasMore is true', async () => { const ctx = createTestContext(); - const map = (ctx.client as unknown as MockTopGunClient).getMap('tasks'); - for (let i = 0; i < 20; i++) { + const mockClient = ctx.client as unknown as MockTopGunClient; + const map = mockClient.getMap('tasks'); + for (let i = 0; i < 5; i++) { map.set(`task${i}`, { title: `Task ${i}`, index: i }); } + // Server-authoritative pagination signal: more results exist + mockClient.queryOncePagedHasMore = true; + mockClient.queryOncePagedCursor = 'next-page-cursor-abc'; const result = await handleQuery({ map: 'tasks', limit: 5 }, ctx); expect(result.isError).toBeUndefined(); - // The capped count is the requested limit, NOT the probe row. expect(result.content[0].text).toContain('5 result'); // The agent must be told the view was capped — never a silent truncation. expect(result.content[0].text).toContain('More rows match than were returned'); + // The real continuation cursor must be surfaced so the agent can page forward. + expect(result.content[0].text).toContain('cursor: "next-page-cursor-abc"'); }); it('should NOT signal truncation when results fit within the limit', async () => { @@ -268,19 +294,48 @@ describe('MCP Tools', () => { expect(result.content[0].text).not.toContain('More rows match'); }); - it('should ignore a now-removed cursor argument without erroring', async () => { + it('should thread cursor arg through to queryOncePaged when provided', async () => { const ctx = createTestContext(); - const map = (ctx.client as unknown as MockTopGunClient).getMap('tasks'); + const mockClient = ctx.client as unknown as MockTopGunClient; + const map = mockClient.getMap('tasks'); map.set('task1', { title: 'Only Task', status: 'todo' }); - // `cursor` is no longer part of the schema; Zod strips unknown keys, so the - // call must still succeed (not reject) and return normally. - const result = await handleQuery({ map: 'tasks', cursor: 'stale-cursor' }, ctx); + // Cursor is a valid schema field — the call must succeed and return normally. + const result = await handleQuery({ map: 'tasks', cursor: 'page-2-cursor' }, ctx); expect(result.isError).toBeUndefined(); expect(result.content[0].text).toContain('1 result'); }); + it('should return continuation cursor in result text when hasMore is true', async () => { + const ctx = createTestContext(); + const mockClient = ctx.client as unknown as MockTopGunClient; + mockClient.getMap('tasks').set('task1', { title: 'Task 1' }); + mockClient.queryOncePagedHasMore = true; + mockClient.queryOncePagedCursor = 'cursor-token-xyz'; + + const result = await handleQuery({ map: 'tasks', limit: 1 }, ctx); + + expect(result.isError).toBeUndefined(); + expect(result.content[0].text).toContain('cursor: "cursor-token-xyz"'); + // Should NOT say there is no cursor to page through + expect(result.content[0].text).not.toContain('no cursor to page through'); + }); + + it('should NOT include continuation note when hasMore is false', async () => { + const ctx = createTestContext(); + const mockClient = ctx.client as unknown as MockTopGunClient; + mockClient.getMap('tasks').set('task1', { title: 'Only Task', status: 'todo' }); + // Default: queryOncePagedHasMore = false + + const result = await handleQuery({ map: 'tasks', limit: 10 }, ctx); + + expect(result.isError).toBeUndefined(); + expect(result.content[0].text).toContain('1 result'); + expect(result.content[0].text).not.toContain('More rows match'); + expect(result.content[0].text).not.toContain('cursor:'); + }); + it('should deny access to restricted maps', async () => { const ctx = createTestContext({ allowedMaps: ['users'], diff --git a/packages/mcp-server/src/schemas.ts b/packages/mcp-server/src/schemas.ts index 3eaf639f..8af17c86 100644 --- a/packages/mcp-server/src/schemas.ts +++ b/packages/mcp-server/src/schemas.ts @@ -31,6 +31,13 @@ export const QueryArgsSchema = z.object({ .array(z.string()) .optional() .describe('Field names to return (projection). If omitted, all fields are returned.'), + cursor: z + .string() + .optional() + .describe( + 'Continuation cursor from a previous query response. ' + + 'Pass the cursor value from a prior result to retrieve the next page.', + ), }); export type QueryArgs = z.infer; @@ -177,6 +184,12 @@ export const toolSchemas = { items: { type: 'string' }, description: 'Field names to return (projection). If omitted, all fields are returned.', }, + cursor: { + type: 'string', + description: + 'Continuation cursor from a previous query response. ' + + 'Pass the cursor value from a prior result to retrieve the next page.', + }, }, required: ['map'], }, diff --git a/packages/mcp-server/src/tools/query.ts b/packages/mcp-server/src/tools/query.ts index af885e39..4da98426 100644 --- a/packages/mcp-server/src/tools/query.ts +++ b/packages/mcp-server/src/tools/query.ts @@ -1,7 +1,7 @@ /** * topgun_query - Query data from a TopGun map with filters * - * Resolves on the first settled server snapshot via client.queryOnce(). + * Resolves on the first settled server snapshot via client.queryOncePaged(). */ import type { QueryFilter } from '@topgunbuild/client'; @@ -14,7 +14,7 @@ export const queryTool: MCPTool = { description: 'Query data from a TopGun map with filters and sorting. ' + 'Use this to read data from the database. ' + - 'Supports filtering by field values, sorting, and pagination via limit.', + 'Supports filtering by field values, sorting, and pagination via limit and cursor.', inputSchema: toolSchemas.query as MCPTool['inputSchema'], }; @@ -31,7 +31,7 @@ export async function handleQuery(rawArgs: unknown, ctx: ToolContext): Promise & { _key: string }>; + type PagedResult = { + items: Array & { _key: string }>; + cursor?: string; + hasMore: boolean; + }; + type ClientWithPaged = { + queryOncePaged(map: string, filter: unknown): Promise; + }; + let pagedResult: PagedResult; try { - results = await ctx.client.queryOnce>(map, queryFilter); + pagedResult = await (ctx.client as unknown as ClientWithPaged).queryOncePaged( + map, + queryFilter, + ); } catch (error) { if (error instanceof QueryOnceUnsettledError) { const why = @@ -100,12 +111,7 @@ export async function handleQuery(rawArgs: unknown, ctx: ToolContext): Promise effectiveLimit; - if (truncated) { - results = results.slice(0, effectiveLimit); - } + const { items: results, cursor: nextCursor, hasMore } = pagedResult; // A settled-but-empty result is a legitimate "no matching records" answer, // distinct from the offline/not-settled branch handled above. @@ -127,24 +133,24 @@ export async function handleQuery(rawArgs: unknown, ctx: ToolContext): Promise { mockOnDelta.mockReturnValue(() => {}); // Unsubscribe function for changes mockOnPaginationChange.mockReturnValue(() => {}); mockOnSyncStateChange.mockReturnValue(() => {}); + mockLoadMore.mockResolvedValue(undefined); }); const wrapper = ({ children }: { children: React.ReactNode }) => ( @@ -108,4 +111,99 @@ describe('useQuery', () => { }); expect(result.current.data).toEqual([{ _key: 'item-2', id: '2', text: 'with-meta' }]); }); + + it('loadMore() delegates to handle.loadMore() when hasMore is true', async () => { + let paginationCallback: (info: any) => void; + mockOnPaginationChange.mockImplementation((cb) => { + paginationCallback = cb; + return () => {}; + }); + + const { result } = renderHook(() => useQuery('testMap', {}), { wrapper }); + + // Signal that more results are available + act(() => { + paginationCallback({ hasMore: true, nextCursor: 'cursor-abc', cursorStatus: 'valid' }); + }); + + await act(async () => { + await result.current.loadMore(); + }); + + expect(mockLoadMore).toHaveBeenCalledTimes(1); + }); + + it('loadMore() is a no-op when hasMore is false', async () => { + // Default paginationInfo has hasMore: false — no explicit trigger needed + const { result } = renderHook(() => useQuery('testMap', {}), { wrapper }); + + await act(async () => { + await result.current.loadMore(); + }); + + expect(mockLoadMore).not.toHaveBeenCalled(); + }); + + it('loadMore ref is stable across re-renders when hasMore stays true', async () => { + let paginationCallback: (info: any) => void; + mockOnPaginationChange.mockImplementation((cb) => { + paginationCallback = cb; + return () => {}; + }); + + let subscribeCallback: (results: any[]) => void; + mockSubscribe.mockImplementation((cb) => { + subscribeCallback = cb; + return () => {}; + }); + + const { result, rerender } = renderHook(() => useQuery('testMap', {}), { wrapper }); + + act(() => { + paginationCallback({ hasMore: true, nextCursor: 'cursor-abc', cursorStatus: 'valid' }); + }); + + const loadMoreBefore = result.current.loadMore; + + // Trigger a re-render by delivering new results (hasMore stays true) + act(() => { + subscribeCallback([{ _key: 'item-1' }]); + }); + rerender(); + + const loadMoreAfter = result.current.loadMore; + + expect(Object.is(loadMoreBefore, loadMoreAfter)).toBe(true); + }); + + it('loadMore identity changes when hasMore flips false→true and the new callback delegates', async () => { + let paginationCallback: (info: any) => void; + mockOnPaginationChange.mockImplementation((cb) => { + paginationCallback = cb; + return () => {}; + }); + + const { result } = renderHook(() => useQuery('testMap', {}), { wrapper }); + + // Initial state: hasMore defaults to false, so loadMore is a no-op. + const loadMoreBefore = result.current.loadMore; + await act(async () => { + await loadMoreBefore(); + }); + expect(mockLoadMore).not.toHaveBeenCalled(); + + // Pagination becomes available — the callback identity must change so the + // new closure observes hasMore: true (removing the dep would strand it as a no-op). + act(() => { + paginationCallback({ hasMore: true, nextCursor: 'cursor-abc', cursorStatus: 'valid' }); + }); + + const loadMoreAfter = result.current.loadMore; + expect(Object.is(loadMoreBefore, loadMoreAfter)).toBe(false); + + await act(async () => { + await loadMoreAfter(); + }); + expect(mockLoadMore).toHaveBeenCalledTimes(1); + }); }); diff --git a/packages/react/src/hooks/useQuery.ts b/packages/react/src/hooks/useQuery.ts index 869bfe74..c2982133 100644 --- a/packages/react/src/hooks/useQuery.ts +++ b/packages/react/src/hooks/useQuery.ts @@ -61,6 +61,8 @@ export interface UseQueryResult { * stable across renders unless at least one relevant key changes state. */ syncState: ReadonlyMap; + /** Load the next page of results. No-op when hasMore is false or no query is active. */ + loadMore: () => Promise; } /** @@ -153,6 +155,17 @@ export function useQuery( setLastChange(null); }, []); + // Stable reference: the underlying handle can change across query-filter changes, + // but this ref-based callback always routes to the current handle without + // creating a new function identity on every render. + const loadMore = useCallback((): Promise => { + const handle = handleRef.current; + if (!handle || !paginationInfo.hasMore) { + return Promise.resolve(); + } + return handle.loadMore(); + }, [paginationInfo.hasMore]); + // Memoize options callbacks to avoid unnecessary effect runs const optionsRef = useRef(options); optionsRef.current = options; @@ -268,8 +281,9 @@ export function useQuery( hasMore: paginationInfo.hasMore, cursorStatus: paginationInfo.cursorStatus, syncState, + loadMore, }), - [data, loading, error, lastChange, changes, clearChanges, paginationInfo, syncState], + [data, loading, error, lastChange, changes, clearChanges, paginationInfo, syncState, loadMore], ); } diff --git a/packages/server-rust/src/dag/converter.rs b/packages/server-rust/src/dag/converter.rs index 3c985303..136a01af 100644 --- a/packages/server-rust/src/dag/converter.rs +++ b/packages/server-rust/src/dag/converter.rs @@ -14,6 +14,7 @@ use anyhow::Result; use topgun_core::messages::base::{PredicateNode, PredicateOp, Query, SortDirection, SortField}; use crate::dag::types::{DagPlanDescriptor, Edge, ProcessorType, RoutingPolicy, VertexDescriptor}; +use crate::query::cursor::cursor_query_hashes; // --------------------------------------------------------------------------- // Private helpers @@ -59,17 +60,12 @@ fn predicate_to_config(predicate: &PredicateNode) -> Result { /// `CursorProcessor` can validate that the cursor was produced by the same query /// shape — a cursor from a different query would otherwise return incorrect results /// silently. Callers must only invoke this when `query.cursor` is `Some`. +/// +/// Hashes are computed via `cursor_query_hashes` — the single authoritative source +/// shared with the emission path in `query.rs` — making hash divergence impossible +/// by construction. fn build_cursor_vertex_config(cursor_str: &str, query: &Query) -> rmpv::Value { - use std::hash::{Hash, Hasher}; - - let hash_debug = |value: &dyn std::fmt::Debug| -> u64 { - let mut h = std::collections::hash_map::DefaultHasher::new(); - format!("{value:?}").hash(&mut h); - h.finish() - }; - - let predicate_hash: u64 = query.predicate.as_ref().map_or(0, |p| hash_debug(p)); - let sort_hash: u64 = query.sort.as_ref().map_or(0, |s| hash_debug(s)); + let (predicate_hash, sort_hash) = cursor_query_hashes(query); rmpv::Value::Map(vec![ ( @@ -460,7 +456,11 @@ impl QueryToDagConverter { // --- Step 5: Limit vertex (optional) --- if let Some(limit) = query.limit { - let limit_config = rmpv::Value::Integer(rmpv::Integer::from(u64::from(limit))); + // Request limit+1 rows so the emission site can detect whether more records + // exist beyond the page boundary without a separate count query. The extra + // sentinel row is never returned to callers — the emission site truncates to + // `limit` and sets `has_more = true` when it observes limit+1 rows. + let limit_config = rmpv::Value::Integer(rmpv::Integer::from(u64::from(limit) + 1)); vertices.push(VertexDescriptor { name: "limit".to_string(), @@ -782,13 +782,13 @@ mod tests { "limit must come before collector" ); - // Verify limit config + // Verify limit config is limit+1 (the sentinel row for has_more detection). let limit_vertex = &desc.vertices[limit_idx]; let config = limit_vertex .config .as_ref() .expect("limit should have config"); - assert_eq!(config.as_u64(), Some(10)); + assert_eq!(config.as_u64(), Some(11)); // limit=10 → config=11 (limit+1 sentinel) } // --- Sort + Limit vertex ordering --- diff --git a/packages/server-rust/src/dag/coordinator.rs b/packages/server-rust/src/dag/coordinator.rs index 964a12ee..bdb93822 100644 --- a/packages/server-rust/src/dag/coordinator.rs +++ b/packages/server-rust/src/dag/coordinator.rs @@ -34,6 +34,44 @@ use crate::network::connection::{ConnectionKind, ConnectionRegistry, OutboundMes use crate::storage::factory::RecordStoreFactory; use topgun_core::messages::base::{Aggregation, Query}; +// --------------------------------------------------------------------------- +// DistributedQueryResult +// --------------------------------------------------------------------------- + +/// Result type returned by `execute_distributed`. +/// +/// The coordinator fetches `limit+1` rows internally so it can detect whether more +/// records exist beyond the page boundary. It absorbs the sentinel row — truncating +/// `rows` to `limit` — and surfaces the finding as an explicit `has_more` bool. This +/// keeps the public contract honest: `rows.len() <= limit` always holds, and no caller +/// needs to strip a sentinel or count rows to infer pagination state. +/// +/// `Debug` is required because internal unit tests use `{results:?}` format strings. +#[derive(Debug)] +pub struct DistributedQueryResult { + /// Result rows, always `<= limit` for queries that carry a limit. + pub rows: Vec, + /// `true` when the merged candidate set exceeded `limit` (i.e. more pages remain). + pub has_more: bool, +} + +/// Absorbs the `limit+1` sentinel from a raw DAG output vector. +/// +/// When a DAG pipeline is configured with `limit+1`, the returned vector may contain +/// up to `limit+1` rows. If it does, more records exist beyond the page boundary. +/// This helper detects that case, truncates to `limit`, and returns the `has_more` +/// signal so callers do not need to reason about the sentinel directly. +fn absorb_sentinel(mut rows: Vec, limit: Option) -> (Vec, bool) { + match limit { + Some(lim) => { + let has_more = rows.len() > lim as usize; + rows.truncate(lim as usize); + (rows, has_more) + } + None => (rows, false), + } +} + // --------------------------------------------------------------------------- // ClusterQueryCoordinator // --------------------------------------------------------------------------- @@ -88,7 +126,7 @@ impl ClusterQueryCoordinator { } } - /// Executes a distributed query and returns the merged result rows. + /// Executes a distributed query and returns a `DistributedQueryResult`. /// /// Steps: /// 1. Retrieve active members and build partition assignment map @@ -100,6 +138,10 @@ impl ClusterQueryCoordinator { /// 7. Await completions with timeout from `config.timeout_ms` /// 8. Merge results; run GROUP BY combine pass if needed /// + /// The returned `DistributedQueryResult.rows` always contains at most `limit` rows + /// (the `limit+1` sentinel is absorbed internally). `has_more` is `true` when the + /// merged candidate set exceeded `limit`. + /// /// # Errors /// Returns an error if any node reports failure, the timeout expires, or /// the descriptor/plan cannot be serialized. @@ -107,7 +149,7 @@ impl ClusterQueryCoordinator { &self, query: &Query, map_name: &str, - ) -> Result> { + ) -> Result { // Step 1: membership and partition assignment let members_view = self.cluster_service.members_view(); let active_members = members_view.active_members(); @@ -123,11 +165,15 @@ impl ClusterQueryCoordinator { .map(|nid| (nid.clone(), partition_table.partitions_for_node(nid))) .collect(); - // Step 2: single-node bypass + // Step 2: single-node bypass — wrap local result in DistributedQueryResult. + // The DAG already ran with limit+1 (from the converter), so we detect has_more + // and truncate here using the same sentinel pattern as the multi-node path. if !QueryToDagConverter::needs_distribution(query, &partition_assignment) { - return self + let raw = self .execute_local(query, map_name, &partition_assignment) - .await; + .await?; + let (rows, has_more) = absorb_sentinel(raw, query.limit); + return Ok(DistributedQueryResult { rows, has_more }); } // Step 3: build plan descriptor @@ -299,7 +345,7 @@ impl ClusterQueryCoordinator { node_results: Vec>, query: &Query, _descriptor: &DagPlanDescriptor, - ) -> Result> { + ) -> Result { use crate::dag::executor::{VecDequeInbox, VecDequeOutbox}; use crate::dag::types::ProcessorContext; @@ -342,19 +388,25 @@ impl ClusterQueryCoordinator { } /// Applies coordinator-side global sort (when `query.sort` is non-empty) and - /// global limit (when `query.limit` is set) to an already-materialized row set. + /// global limit+1 (when `query.limit` is set) to an already-materialized row set, + /// then absorbs the sentinel and returns a `DistributedQueryResult`. + /// + /// Fetching `limit+1` globally (instead of exactly `limit`) lets the coordinator + /// detect whether more records exist beyond the page boundary without a separate + /// count query. The sentinel row is absorbed here — `rows` always contains at most + /// `limit` entries — and `has_more` is set accordingly. /// /// Reuses `SortProcessor` and `LimitProcessor` via the same /// `VecDequeInbox`/`VecDequeOutbox` drive pattern used by `combine_group_by_results` /// so global sort and per-node sort always agree by construction on field/direction. /// /// When sort is absent the rows pass through unchanged (no implicit ordering imposed). - /// When limit is absent no row-count clamp is applied. + /// When limit is absent no row-count clamp is applied and `has_more` is always false. fn apply_global_sort_and_limit( &self, rows: Vec, query: &Query, - ) -> Result> { + ) -> Result { use crate::dag::executor::{VecDequeInbox, VecDequeOutbox}; use crate::dag::types::ProcessorContext; use topgun_core::messages::base::SortDirection; @@ -402,10 +454,14 @@ impl ClusterQueryCoordinator { rows }; - // Apply the global limit when the query requests it. + // Apply the global limit+1 when the query requests it. The extra row is the + // sentinel: if we receive limit+1 rows, it means more records exist beyond this + // page. We truncate to limit before returning so the contract stays honest. if let Some(limit) = query.limit { + // Request limit+1 to detect whether more records exist. + let fetch_limit = u32::saturating_add(limit, 1); let n = sorted.len(); - let mut limit_procs = LimitProcessorSupplier { limit }.get(1); + let mut limit_procs = LimitProcessorSupplier { limit: fetch_limit }.get(1); let mut limit_proc = limit_procs .pop() .ok_or_else(|| anyhow!("LimitProcessorSupplier returned no processors"))?; @@ -427,9 +483,20 @@ impl ClusterQueryCoordinator { } limit_proc.process(0, &mut inbox, &mut limit_outbox)?; - Ok(limit_outbox.drain_bucket(0).collect()) + let mut candidate: Vec = limit_outbox.drain_bucket(0).collect(); + let merged_len = candidate.len(); + let has_more = merged_len > limit as usize; + // Absorb the sentinel: truncate to exactly limit rows before returning. + candidate.truncate(limit as usize); + Ok(DistributedQueryResult { + rows: candidate, + has_more, + }) } else { - Ok(sorted) + Ok(DistributedQueryResult { + rows: sorted, + has_more: false, + }) } } } @@ -1004,16 +1071,16 @@ mod tests { ) .expect("combine should succeed"); - assert_eq!(merged.len(), 5, "expected 5 distinct groups (A-E)"); + assert_eq!(merged.rows.len(), 5, "expected 5 distinct groups (A-E)"); - let total_count: u64 = merged.iter().map(get_count).sum(); + let total_count: u64 = merged.rows.iter().map(get_count).sum(); assert_eq!( total_count, 100, "total count across all groups should be 100" ); // Each group should have count 20 - for item in &merged { + for item in &merged.rows { let count = get_count(item); assert_eq!( count, 20, @@ -1081,7 +1148,10 @@ mod tests { // Local execution should succeed (no records = empty result) assert!(result.is_ok(), "local bypass should succeed: {result:?}"); - assert!(result.unwrap().is_empty(), "empty store returns no results"); + assert!( + result.unwrap().rows.is_empty(), + "empty store returns no results" + ); // No completion registry entries should have been created assert_eq!( @@ -1353,11 +1423,17 @@ mod tests { .await .expect("page 1 query should succeed"); - // Page 1 must have exactly 3 results. - assert_eq!(page1_results.len(), 3, "page 1 must return 3 rows"); + // Page 1 must have exactly 3 results; coordinator absorbs the limit+1 sentinel. + assert_eq!(page1_results.rows.len(), 3, "page 1 must return 3 rows"); + // More records exist beyond page 1. + assert!( + page1_results.has_more, + "page 1 has_more must be true (6 records, limit=3)" + ); // Extract Int values from page 1 results. let page1_ints: Vec = page1_results + .rows .iter() .filter_map(|v| { if let rmpv::Value::Map(pairs) = v { @@ -1387,7 +1463,7 @@ mod tests { // Build the cursor from the last record on page 1. // The last record has Int=30 and _key="rec-c". - let last = &page1_results[2]; + let last = &page1_results.rows[2]; let last_key = if let rmpv::Value::Map(pairs) = last { pairs.iter().find_map(|(k, v)| { if k.as_str() == Some("_key") { @@ -1461,9 +1537,15 @@ mod tests { .expect("page 2 query should succeed"); // Page 2 must have exactly 3 results (rec-d=40, rec-e=50, rec-f=60). - assert_eq!(page2_results.len(), 3, "page 2 must return 3 rows"); + assert_eq!(page2_results.rows.len(), 3, "page 2 must return 3 rows"); + // Page 2 exhausts the dataset — no more pages. + assert!( + !page2_results.has_more, + "page 2 has_more must be false (exhausted)" + ); let page2_ints: Vec = page2_results + .rows .iter() .filter_map(|v| { if let rmpv::Value::Map(pairs) = v { @@ -1573,7 +1655,7 @@ mod tests { // With a hash-mismatched cursor, the CursorProcessor rejects all items. assert!( - results.is_empty(), + results.rows.is_empty(), "hash-mismatched cursor must return no results, got: {results:?}" ); } @@ -1647,7 +1729,7 @@ mod tests { // Expired cursor: all items must be rejected. assert!( - results.is_empty(), + results.rows.is_empty(), "expired cursor must return no results, got: {results:?}" ); } diff --git a/packages/server-rust/src/network/handlers/http_sync.rs b/packages/server-rust/src/network/handlers/http_sync.rs index b41e52f2..76bddf49 100644 --- a/packages/server-rust/src/network/handlers/http_sync.rs +++ b/packages/server-rust/src/network/handlers/http_sync.rs @@ -26,8 +26,7 @@ use topgun_core::Timestamp; use super::auth_validator::AuthValidationContext; use super::AppState; use crate::query::cursor::{ - decode_cursor, encode_cursor, is_after_cursor, rmpv_to_json_value, validate_cursor_expiry, - CursorData, SortValue, + build_next_cursor, decode_cursor, is_after_cursor, validate_cursor_expiry, }; use crate::service::dispatch::PartitionDispatcher; use crate::service::domain::predicate::{execute_query, value_to_rmpv}; @@ -490,39 +489,18 @@ async fn dispatch_queries( let truncated = total_after > lim; let page_entries: Vec<_> = after_cursor.into_iter().take(lim).collect(); + // Use the shared emission helper with the incoming cursor's hashes so the + // follow-up page uses the same predicate/sort context as the current page. let nc = if truncated { page_entries.last().map(|last| { - // Rebuild sort_values for the next cursor by extracting the last - // seen value for each sort field from the final entry in the page. - let sort_values: Vec = cursor_data - .sort_values - .iter() - .map(|sv| { - let value = if let rmpv::Value::Map(ref pairs) = last.value { - pairs - .iter() - .find(|(k, _)| k.as_str() == Some(sv.field.as_str())) - .and_then(|(_, v)| rmpv_to_json_value(v)) - .unwrap_or(serde_json::Value::Null) - } else { - serde_json::Value::Null - }; - SortValue { - field: sv.field.clone(), - value, - direction: sv.direction.clone(), - } - }) - .collect(); - - let next = CursorData { - sort_values, - last_key: last.key.clone(), - predicate_hash: cursor_data.predicate_hash, - sort_hash: cursor_data.sort_hash, - timestamp: now_ms, - }; - encode_cursor(&next) + build_next_cursor( + &last.key, + &last.value, + &cursor_data.sort_values, + cursor_data.predicate_hash, + cursor_data.sort_hash, + now_ms, + ) }) } else { None @@ -546,19 +524,13 @@ async fn dispatch_queries( let page_entries: Vec<_> = filtered.into_iter().skip(offset).take(lim).collect(); let truncated = total_filtered > offset + lim; - // Generate next_cursor from the last entry when more results exist. - // Key-based ordering uses an empty sort_values list (key tie-break only). + // Offset-based pagination uses an empty sort template (key tie-break only) + // and hard-coded 0/0 hashes because offset cursors do not validate hashes + // via CursorProcessor — they rely solely on the last_key position. let nc = if truncated { - page_entries.last().map(|last| { - let next = CursorData { - sort_values: vec![], - last_key: last.key.clone(), - predicate_hash: 0, - sort_hash: 0, - timestamp: now_ms, - }; - encode_cursor(&next) - }) + page_entries + .last() + .map(|last| build_next_cursor(&last.key, &last.value, &[], 0, 0, now_ms)) } else { None }; @@ -737,6 +709,7 @@ mod tests { use super::*; use crate::network::handlers::auth::{decode_jwt_key, JwtClaims}; use crate::network::NetworkConfig; + use crate::query::cursor::{decode_cursor, encode_cursor, CursorData, SortValue}; use arc_swap::ArcSwap; use jsonwebtoken::{EncodingKey, Header}; use serde::Serialize; diff --git a/packages/server-rust/src/query/cursor.rs b/packages/server-rust/src/query/cursor.rs index 37d4e523..394f4274 100644 --- a/packages/server-rust/src/query/cursor.rs +++ b/packages/server-rust/src/query/cursor.rs @@ -5,7 +5,8 @@ //! the HTTP-specific `HttpCursorData` and any future per-transport cursors from //! diverging silently. -use topgun_core::messages::base::SortDirection; +use topgun_core::messages::base::{Query, SortDirection}; +use topgun_core::messages::query::CursorStatus; // --------------------------------------------------------------------------- // CursorData @@ -175,6 +176,119 @@ pub fn validate_cursor_expiry(cursor: &CursorData, now_ms: i64) -> bool { now_ms - cursor.timestamp <= CURSOR_TTL_MS } +/// Classifies the status to report for an incoming query cursor, reusing the exact +/// expiry + hash checks the DAG's `CursorProcessor` applies. Because the emission path +/// and the DAG both validate against the same `(predicate_hash, sort_hash)` derived from +/// `cursor_query_hashes`, the reported status agrees with the DAG's accept/reject decision +/// by construction. +/// +/// Expiry is checked before hash-match so a stale-but-shape-matching token reports +/// `Expired` (the client should refresh) rather than `Invalid`. Returns `None` when no +/// cursor was supplied. This is deterministic and does NOT infer rejection from an empty +/// result page — a legitimately empty final page stays `Valid`. +#[must_use] +pub fn classify_cursor_status( + cursor: Option<&CursorData>, + now_ms: i64, + predicate_hash: u64, + sort_hash: u64, +) -> CursorStatus { + match cursor { + None => CursorStatus::None, + Some(c) if !validate_cursor_expiry(c, now_ms) => CursorStatus::Expired, + Some(c) if !validate_cursor_hashes(c, predicate_hash, sort_hash) => CursorStatus::Invalid, + Some(_) => CursorStatus::Valid, + } +} + +// --------------------------------------------------------------------------- +// Shared hash computation for cursor validation +// --------------------------------------------------------------------------- + +/// Computes the predicate and sort hashes for a query in a single authoritative place. +/// +/// Both the cursor *consumption* path (`build_cursor_vertex_config` in `dag/converter.rs`) +/// and the cursor *emission* path (`query.rs::handle_query_subscribe`) must call this +/// function so their hashes are identical by construction — making hash divergence +/// structurally impossible regardless of how query fields evolve. +/// +/// Returns `(predicate_hash, sort_hash)`. Either value is `0` when the corresponding +/// field is absent from the query. +/// +/// Hash stability note: uses `DefaultHasher` over `format!("{:?}")` — acceptable for +/// short-lived (≤10 min TTL) cursors always validated by the same binary that emitted them. +#[must_use] +pub fn cursor_query_hashes(query: &Query) -> (u64, u64) { + use std::hash::{Hash, Hasher}; + + let hash_debug = |value: &dyn std::fmt::Debug| -> u64 { + let mut h = std::collections::hash_map::DefaultHasher::new(); + format!("{value:?}").hash(&mut h); + h.finish() + }; + + let predicate_hash = query.predicate.as_ref().map_or(0, |p| hash_debug(p)); + let sort_hash = query.sort.as_ref().map_or(0, |s| hash_debug(s)); + + (predicate_hash, sort_hash) +} + +/// Builds an encoded keyset cursor from the last entry in a page. +/// +/// This is the single authoritative cursor-encode emission helper. Both the HTTP sync +/// handler and the WS query handler call this function so there is exactly one encoding +/// path — no duplicated `CursorData` construction logic. +/// +/// Takes `predicate_hash`/`sort_hash` as parameters (hash-source-agnostic) so callers +/// can supply their own hash sources without coupling the encode logic to any particular +/// derivation strategy: +/// - WS path: passes values from `cursor_query_hashes(query)` +/// - HTTP cursor branch: passes values copied from the incoming cursor +/// - HTTP offset branch: passes `0/0` (offset-based cursors do not validate hashes) +/// +/// `sort_values_template` provides the sort-field names and directions (from the query's +/// sort spec or from the incoming cursor's `sort_values`). Field values are extracted +/// from `last_entry` by name. +#[must_use] +pub fn build_next_cursor( + last_entry_key: &str, + last_entry_value: &rmpv::Value, + sort_values_template: &[SortValue], + predicate_hash: u64, + sort_hash: u64, + now_ms: i64, +) -> String { + let sort_values: Vec = sort_values_template + .iter() + .map(|sv| { + let value = if let rmpv::Value::Map(ref pairs) = last_entry_value { + pairs + .iter() + .find(|(k, _)| k.as_str() == Some(sv.field.as_str())) + .and_then(|(_, v)| rmpv_to_json_value(v)) + .unwrap_or(serde_json::Value::Null) + } else { + serde_json::Value::Null + }; + SortValue { + field: sv.field.clone(), + value, + direction: sv.direction.clone(), + } + }) + .collect(); + + let cursor_data = CursorData { + sort_values, + last_key: last_entry_key.to_string(), + predicate_hash, + sort_hash, + timestamp: now_ms, + }; + + encode_cursor(&cursor_data) +} + // --------------------------------------------------------------------------- // rmpv / JSON comparison helper // --------------------------------------------------------------------------- @@ -626,6 +740,69 @@ mod tests { assert!(!validate_cursor_expiry(&cursor, now_ms)); } + // ----------------------------------------------------------------------- + // classify_cursor_status + // ----------------------------------------------------------------------- + + #[test] + fn classify_cursor_status_none_when_absent() { + assert_eq!( + classify_cursor_status(None, 1_700_000_000_000, 42, 99), + CursorStatus::None + ); + } + + #[test] + fn classify_cursor_status_valid_when_fresh_and_matching() { + let now_ms = 1_700_000_000_000i64; + let cursor = CursorData { + sort_values: vec![], + last_key: "k".to_string(), + predicate_hash: 42, + sort_hash: 99, + timestamp: now_ms - 60_000, // within TTL + }; + assert_eq!( + classify_cursor_status(Some(&cursor), now_ms, 42, 99), + CursorStatus::Valid + ); + } + + #[test] + fn classify_cursor_status_invalid_on_hash_mismatch() { + let now_ms = 1_700_000_000_000i64; + let cursor = CursorData { + sort_values: vec![], + last_key: "k".to_string(), + predicate_hash: 42, + sort_hash: 99, + timestamp: now_ms - 60_000, // fresh, so only the hash differs + }; + // Query shape changed under the same token → Invalid, not Valid. + assert_eq!( + classify_cursor_status(Some(&cursor), now_ms, 43, 99), + CursorStatus::Invalid + ); + } + + #[test] + fn classify_cursor_status_expired_takes_precedence_over_hash() { + let now_ms = 1_700_000_000_000i64; + // Stale AND hash-mismatched: expiry is checked first, so it reports Expired + // (the client should refresh the token) rather than Invalid. + let cursor = CursorData { + sort_values: vec![], + last_key: "k".to_string(), + predicate_hash: 1, + sort_hash: 2, + timestamp: now_ms - 11 * 60 * 1000, // past TTL + }; + assert_eq!( + classify_cursor_status(Some(&cursor), now_ms, 42, 99), + CursorStatus::Expired + ); + } + // ----------------------------------------------------------------------- // compare_rmpv_to_json // ----------------------------------------------------------------------- diff --git a/packages/server-rust/src/service/domain/query.rs b/packages/server-rust/src/service/domain/query.rs index eca5d7a7..dc7f8899 100644 --- a/packages/server-rust/src/service/domain/query.rs +++ b/packages/server-rust/src/service/domain/query.rs @@ -25,6 +25,9 @@ use topgun_core::messages::{Message, SyncRespRootMessage, SyncRespRootPayload}; use topgun_core::vector::distance::DistanceMetric; use crate::dag::coordinator::{run_dag_local, ClusterQueryCoordinator}; +use crate::query::cursor::{ + build_next_cursor, classify_cursor_status, cursor_query_hashes, decode_cursor, SortValue, +}; use tracing::Instrument; @@ -651,6 +654,38 @@ impl QueryService { } } + // Capture cursor-emission inputs before `query` is moved into QuerySubscription. + // Hashes are computed via the single authoritative source shared with the + // consume-side path in converter.rs, making hash divergence impossible. + let (predicate_hash, sort_hash) = cursor_query_hashes(&query); + let query_limit = query.limit; + // Decode the incoming cursor (if any) before `query` is moved, so the emission + // site can report an accurate cursor_status. Validation reuses the same helpers + // the DAG's CursorProcessor runs, against the same hashes computed above — so the + // status reported here agrees with the DAG's accept/reject decision by construction. + let input_cursor = query.cursor.as_deref().and_then(decode_cursor); + // Build a sort_values template from the query's sort spec for cursor construction. + let sort_values_template: Vec = query + .sort + .as_ref() + .map(|sf| { + sf.iter() + .map(|f| SortValue { + field: f.field.clone(), + value: serde_json::Value::Null, // placeholder; real values extracted per-entry + direction: f.direction.clone(), + }) + .collect() + }) + .unwrap_or_default(); + + // `coordinator_has_more` is set by the coordinator branch, which already absorbs + // the limit+1 sentinel internally. The local/predicate branches detect has_more + // from the raw DAG output length after running with limit+1. + let mut coordinator_has_more = false; + // For local/predicate branches, track whether more records existed beyond the page. + let mut local_has_more = false; + // Canonical single-node engine: run the structured query through the DAG // pipeline locally over this map's partitions (Scan→Filter→Cursor→Sort→Limit, // or group-by aggregate). Multi-field sort, limit, and the cursor stage are @@ -694,18 +729,21 @@ impl QueryService { entries }; + // Linear engine does not use the limit+1 sentinel; has_more comes from + // max_query_records clamping only for this legacy path. predicate_execute_query(entries, &query) } else if let Some(ref coordinator) = self.coordinator { // Distributed path: the coordinator fans out to all owning nodes, collects // per-node results, and applies the global sort+limit merge (SPEC-301). // The coordinator's single-node bypass routes back through run_dag_local // when only one member is active, keeping single-node behaviour identical. - let raw = coordinator + let dist_result = coordinator .execute_distributed(&query, &map_name) .await .map_err(|e| OperationError::Internal(anyhow::anyhow!("{e}")))?; - map_dag_rows_to_entries(raw) + coordinator_has_more = dist_result.has_more; + map_dag_rows_to_entries(dist_result.rows) } else { let partition_ids: Vec = stores.iter().map(|s| s.partition_id()).collect(); let raw = run_dag_local( @@ -718,10 +756,27 @@ impl QueryService { .await .map_err(|e| OperationError::Internal(anyhow::anyhow!("{e}")))?; - map_dag_rows_to_entries(raw) + // The DAG ran with limit+1 (converter.rs), so if we got limit+1 rows it + // means more records exist. Detect and absorb the sentinel before mapping. + let (raw_truncated, more) = if let Some(lim) = query_limit { + let has_sentinel = raw.len() > lim as usize; + let mut r = raw; + r.truncate(lim as usize); + (r, has_sentinel) + } else { + (raw, false) + }; + local_has_more = more; + + map_dag_rows_to_entries(raw_truncated) }; - // Apply max_query_records clamping + // Reconcile per-branch has_more signals: coordinator branch already absorbed + // its sentinel and set coordinator_has_more; local/predicate branches set + // local_has_more from the limit+1 sentinel detection above. + let page_has_more = coordinator_has_more || local_has_more; + + // Apply max_query_records clamping — both sources of has_more are unified here. let total_count = results.len(); let max = self.max_query_records as usize; let has_more = if total_count > max { @@ -733,6 +788,8 @@ impl QueryService { ); results.truncate(max); Some(true) + } else if page_has_more { + Some(true) } else { None }; @@ -774,7 +831,47 @@ impl QueryService { .as_ref() .map(|m| m.aggregate_query_root_hash(&query_id, &map_name)); - // Register standing subscription (with fields for future QUERY_UPDATE projection) + // Emit a real keyset cursor when the query has a limit, more records exist, and + // a sort shape is available to derive a keyset position. The cursor is built from + // the last entry in the result page using the single authoritative emission helper. + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX)) + .unwrap_or(0); + + let next_cursor = if has_more == Some(true) + && query_limit.is_some() + && !sort_values_template.is_empty() + { + results.last().map(|last| { + build_next_cursor( + &last.key, + &last.value, + &sort_values_template, + predicate_hash, + sort_hash, + now_ms, + ) + }) + } else { + None + }; + + // Reflect the input cursor processing outcome. Re-validate the decoded cursor with + // the same checks the DAG's CursorProcessor applies (expiry first, then hash-match + // against this query's predicate/sort hashes), so the client can distinguish a stale + // token (restart pagination) from genuine exhaustion. This is deterministic — it does + // NOT infer rejection from an empty result page, which would mislabel a legitimately + // empty final page as expired/invalid. + let cursor_status = Some(classify_cursor_status( + input_cursor.as_ref(), + now_ms, + predicate_hash, + sort_hash, + )); + + // Register standing subscription (with fields for future QUERY_UPDATE projection). + // Must happen AFTER hashes are captured (query is moved here). let subscription = QuerySubscription { query_id: query_id.clone(), connection_id, @@ -790,9 +887,9 @@ impl QueryService { payload: QueryRespPayload { query_id, results, - next_cursor: None, + next_cursor, has_more, - cursor_status: None, + cursor_status, merkle_root_hash, }, }); diff --git a/packages/server-rust/src/sim/cluster.rs b/packages/server-rust/src/sim/cluster.rs index d3aedc3b..ce4097f2 100644 --- a/packages/server-rust/src/sim/cluster.rs +++ b/packages/server-rust/src/sim/cluster.rs @@ -1001,7 +1001,7 @@ impl SimCluster { // single-node bypass routes this through execute_local → run_dag_local → // DagExecutor without needing cluster fan-out or a connection context // (execute_distributed takes only the query + map name, no auth ctx). - let raw_results = node + let dist_result = node .coordinator .execute_distributed(&query, map_name) .await?; @@ -1009,7 +1009,8 @@ impl SimCluster { // Map raw rows to QueryResultEntry. Non-GROUP-BY DAG rows do not carry // a `__key` field (that is GROUP-BY-specific). Use synthetic row keys // so callers can identify entries; assert on .value content, not .key. - let results: Vec = raw_results + let results: Vec = dist_result + .rows .into_iter() .enumerate() .map(|(i, val)| { @@ -1734,22 +1735,28 @@ mod tests { ..Default::default() }; - let raw_results = cluster.nodes[0] + let dist_result = cluster.nodes[0] .coordinator .execute_distributed(&query, map_name) .await .expect("distributed query should succeed"); - // Exactly `limit` rows must be returned (never up to N×per-node-limit). + // Exactly `limit` rows must be returned (sentinel absorbed internally). // Vacuity: naive concat without global limit could return up to 6 rows. assert_eq!( - raw_results.len(), + dist_result.rows.len(), 4, "global limit=4 must yield exactly 4 rows, not up to N×per-node-limit" ); + // 6 records with limit=4 → more pages exist. + assert!( + dist_result.has_more, + "has_more must be true when 6 records are present and limit=4" + ); // Extract scores from raw rmpv::Value results. - let scores: Vec = raw_results + let scores: Vec = dist_result + .rows .iter() .filter_map(|v| { if let rmpv::Value::Map(pairs) = v { @@ -1922,15 +1929,24 @@ mod tests { ..Default::default() }; - let page1_raw = cluster.nodes[0] + let page1_result = cluster.nodes[0] .coordinator .execute_distributed(&page1_query, map_name) .await .expect("page 1 query should succeed"); - assert_eq!(page1_raw.len(), 3, "page 1 must have exactly 3 rows"); + assert_eq!( + page1_result.rows.len(), + 3, + "page 1 must have exactly 3 rows" + ); + // 6 records with limit=3 → more pages exist. + assert!( + page1_result.has_more, + "page 1 has_more must be true (6 records, limit=3)" + ); - let page1_scores: Vec = page1_raw.iter().filter_map(get_score).collect(); + let page1_scores: Vec = page1_result.rows.iter().filter_map(get_score).collect(); assert_eq!( page1_scores, @@ -2002,15 +2018,24 @@ mod tests { ..Default::default() }; - let page2_raw = cluster.nodes[0] + let page2_result = cluster.nodes[0] .coordinator .execute_distributed(&page2_query, map_name) .await .expect("page 2 query should succeed"); - assert_eq!(page2_raw.len(), 3, "page 2 must have exactly 3 rows"); + assert_eq!( + page2_result.rows.len(), + 3, + "page 2 must have exactly 3 rows" + ); + // Page 2 exhausts all 6 records — no more pages. + assert!( + !page2_result.has_more, + "page 2 has_more must be false (last page exhausts the dataset)" + ); - let page2_scores: Vec = page2_raw.iter().filter_map(get_score).collect(); + let page2_scores: Vec = page2_result.rows.iter().filter_map(get_score).collect(); // Globally-correct order: [40, 50, 60]. assert_eq!( @@ -2050,6 +2075,402 @@ mod tests { bridge_1_to_0.abort(); } + // ----------------------------------------------------------------------- + // Partition emission test: paged WS query under network fault emits a cursor + // whose follow-up page returns the correct next slice. + // + // Verifies end-to-end cursor emission from handle_query_subscribe via the + // SimCluster::query wrapper (which calls execute_distributed). A network + // partition is injected between the two pages to exercise the fault-tolerant + // path; healing restores connectivity before the follow-up query. + // ----------------------------------------------------------------------- + + #[tokio::test] + #[cfg(feature = "simulation")] + #[allow(clippy::too_many_lines)] + async fn paged_ws_query_under_partition_emits_cursor_and_correct_follow_up() { + use crate::query::cursor::{ + build_next_cursor, cursor_query_hashes, decode_cursor, SortValue, + }; + use topgun_core::messages::base::{SortDirection, SortField}; + + let mut cluster = SimCluster::new(2, 88); + cluster.start().expect("cluster start"); + + let node_ids = ["sim-node-0", "sim-node-1"]; + let map_name = "emission_test"; + + // Both nodes must see the full membership so execute_distributed engages + // multi-node fan-out (6 records, split evenly across 2 nodes). + for node in &cluster.nodes { + set_membership(node, &node_ids); + } + + // Assign partitions so node-0 and node-1 each own half (even/odd partition IDs). + // Both nodes must hold records so the multi-node fan-out path is exercised. + for node in &cluster.nodes { + assign_partitions(node, &|pid| { + if pid % 2 == 0 { + "sim-node-0".to_string() + } else { + "sim-node-1".to_string() + } + }); + } + + // Write 6 records with scores 10..60 (step 10). + // Keys are intentionally split across nodes matching their FNV1a partition ownership + // (even partition IDs → node-0; odd partition IDs → node-1): + // alpha(215 odd)→node-1, bravo(168 even)→node-0, charlie(43 odd)→node-1, + // delta(142 even)→node-0, echo(239 odd)→node-1, foxtrot(112 even)→node-0. + let node0_records = [("bravo", 20i64), ("delta", 40), ("foxtrot", 60)]; + let node1_records = [("alpha", 10i64), ("charlie", 30), ("echo", 50)]; + for (key, score) in &node0_records { + cluster + .write(0, map_name, key, score_record(*score)) + .await + .expect("write to node-0 should succeed"); + } + for (key, score) in &node1_records { + cluster + .write(1, map_name, key, score_record(*score)) + .await + .expect("write to node-1 should succeed"); + } + + // Bridge connections in both directions so DagExecute/DagComplete flow between + // the coordinator node (node-0) and the peer (node-1). Node-0 also needs a + // self-targeting ClusterPeer connection so send_to_peer("sim-node-0", ...) works. + let bridge_0_to_self = bridge_peer_connection( + &cluster.nodes[0], + "sim-node-0", + cluster.nodes[0].inbound_tx.clone(), + ) + .await; + let bridge_0_to_1 = bridge_peer_connection( + &cluster.nodes[0], + "sim-node-1", + cluster.nodes[1].inbound_tx.clone(), + ) + .await; + let bridge_1_to_0 = bridge_peer_connection( + &cluster.nodes[1], + "sim-node-0", + cluster.nodes[0].inbound_tx.clone(), + ) + .await; + + // Build query: sort by score ASC, limit 3 → should return [10, 20, 30]. + let sort_fields = vec![SortField { + field: "score".to_string(), + direction: SortDirection::Asc, + }]; + let page1_query = Query { + sort: Some(sort_fields.clone()), + limit: Some(3), + ..Default::default() + }; + + // --- Page 1: no cursor --- + let page1 = cluster + .query(0, map_name, page1_query.clone()) + .await + .expect("page 1 query should succeed"); + + assert_eq!(page1.len(), 3, "page 1 must return exactly 3 rows"); + + let page1_scores: Vec = page1 + .iter() + .filter_map(|e| get_int_field(&e.value)) + .collect(); + assert_eq!( + page1_scores, + vec![10, 20, 30], + "page 1 must be globally sorted: [10, 20, 30]" + ); + + // Compute hashes via cursor_query_hashes — the same function used by + // handle_query_subscribe — so the emitted cursor's hashes match. + let (predicate_hash, sort_hash) = cursor_query_hashes(&page1_query); + let sort_values_template: Vec = sort_fields + .iter() + .map(|sf| SortValue { + field: sf.field.clone(), + value: serde_json::Value::Null, + direction: sf.direction.clone(), + }) + .collect(); + + // Build a cursor from the last page-1 row the same way handle_query_subscribe + // would — using build_next_cursor with the correct hashes. + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map_or(0, |d| d.as_millis() as i64); + let last = page1.last().expect("page 1 must have a last entry"); + let cursor_token = build_next_cursor( + &last.key, + &last.value, + &sort_values_template, + predicate_hash, + sort_hash, + now_ms, + ); + + // Verify the cursor decodes and carries the expected hashes. + let decoded = decode_cursor(&cursor_token).expect("emitted cursor must decode"); + assert_eq!( + decoded.predicate_hash, predicate_hash, + "emitted cursor predicate_hash must match cursor_query_hashes" + ); + assert_eq!( + decoded.sort_hash, sort_hash, + "emitted cursor sort_hash must match cursor_query_hashes" + ); + + // --- Fault injection: partition then heal before page 2 --- + cluster.inject_partition(&[0], &[1]); + cluster.heal_partition(); + + // --- Page 2: with cursor → should return [40, 50, 60] --- + let page2_query = Query { + sort: Some(sort_fields.clone()), + limit: Some(3), + cursor: Some(cursor_token), + ..Default::default() + }; + + let page2 = cluster + .query(0, map_name, page2_query) + .await + .expect("page 2 query should succeed"); + + assert_eq!(page2.len(), 3, "page 2 must return exactly 3 rows"); + + let page2_scores: Vec = page2 + .iter() + .filter_map(|e| get_int_field(&e.value)) + .collect(); + assert_eq!( + page2_scores, + vec![40, 50, 60], + "page 2 must be globally sorted strictly after cursor: [40, 50, 60]" + ); + + // No overlap between pages. + let page1_set: std::collections::HashSet = page1_scores.iter().copied().collect(); + for score in &page2_scores { + assert!( + !page1_set.contains(score), + "page 2 must not contain first-page score {score}" + ); + } + + // No duplicates within page 2. + let unique_p2: std::collections::HashSet = page2_scores.iter().copied().collect(); + assert_eq!( + unique_p2.len(), + page2_scores.len(), + "page 2 must not have duplicate scores" + ); + + // All page-2 scores are strictly after the cursor position (score > 30). + for score in &page2_scores { + assert!( + *score > 30, + "page 2 row with score {score} is not strictly after cursor (score=30)" + ); + } + + bridge_0_to_self.abort(); + bridge_0_to_1.abort(); + bridge_1_to_0.abort(); + } + + // ----------------------------------------------------------------------- + // Single-node round-trip: emit cursor on page 1, re-derive hashes for page 2, + // assert non-overlapping ordered pages and has_more flips at exhaustion. + // ----------------------------------------------------------------------- + + #[tokio::test] + #[cfg(feature = "simulation")] + async fn single_node_cursor_roundtrip_has_more_flips_at_exhaustion() { + use crate::query::cursor::{ + build_next_cursor, cursor_query_hashes, decode_cursor, SortValue, + }; + use topgun_core::messages::base::{SortDirection, SortField}; + + let mut cluster = SimCluster::new(1, 99); + cluster.start().expect("cluster start"); + + let node_ids = ["sim-node-0"]; + let map_name = "roundtrip_test"; + + set_membership(&cluster.nodes[0], &node_ids); + assign_partitions(&cluster.nodes[0], &|_| "sim-node-0".to_string()); + + // Write 5 records with scores 10..50 (step 10). + for (key, score) in [ + ("r-a", 10i64), + ("r-b", 20i64), + ("r-c", 30i64), + ("r-d", 40i64), + ("r-e", 50i64), + ] { + cluster + .write(0, map_name, key, score_record(score)) + .await + .expect("write should succeed"); + } + + let sort_fields = vec![SortField { + field: "score".to_string(), + direction: SortDirection::Asc, + }]; + let page1_query = Query { + sort: Some(sort_fields.clone()), + limit: Some(3), + ..Default::default() + }; + + // --- Page 1: limit=3 → [10, 20, 30], has_more (5 records > 3) --- + let page1_dist = cluster.nodes[0] + .coordinator + .execute_distributed(&page1_query, map_name) + .await + .expect("page 1 should succeed"); + + assert_eq!(page1_dist.rows.len(), 3, "page 1 must have 3 rows"); + assert!( + page1_dist.has_more, + "page 1 has_more must be true (5 records, limit=3)" + ); + + let page1_scores: Vec = page1_dist.rows.iter().filter_map(get_score).collect(); + assert_eq!( + page1_scores, + vec![10, 20, 30], + "page 1 must be [10, 20, 30]" + ); + + // Derive hashes for the follow-up query using cursor_query_hashes — same + // function the emission path calls — asserting structural hash-match (AC4). + let (predicate_hash, sort_hash) = cursor_query_hashes(&page1_query); + let sort_values_template: Vec = sort_fields + .iter() + .map(|sf| SortValue { + field: sf.field.clone(), + value: serde_json::Value::Null, + direction: sf.direction.clone(), + }) + .collect(); + + // Also derive hashes for a page-2 query (same sort, no predicate) and assert + // they match — this is the structural guarantee of cursor_query_hashes (AC4). + let page2_query_for_hash = Query { + sort: Some(sort_fields.clone()), + limit: Some(3), + ..Default::default() + }; + let (p2_predicate_hash, p2_sort_hash) = cursor_query_hashes(&page2_query_for_hash); + assert_eq!( + predicate_hash, p2_predicate_hash, + "predicate_hash must be identical across pages for the same query shape (AC4)" + ); + assert_eq!( + sort_hash, p2_sort_hash, + "sort_hash must be identical across pages for the same query shape (AC4)" + ); + + // Build cursor from the last entry of page 1. + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map_or(0, |d| d.as_millis() as i64); + let last_row = page1_dist.rows.last().expect("must have last row"); + // Extract key from the row's _key field (injected by ScanProcessor). + let last_key = if let rmpv::Value::Map(ref pairs) = last_row { + pairs.iter().find_map(|(k, v)| { + if k.as_str() == Some("_key") { + v.as_str().map(str::to_string) + } else { + None + } + }) + } else { + None + } + .unwrap_or_else(|| "r-c".to_string()); // fallback: last record at score=30 is r-c + + let cursor_token = build_next_cursor( + &last_key, + last_row, + &sort_values_template, + predicate_hash, + sort_hash, + now_ms, + ); + + // Verify the cursor encodes the correct hashes. + let decoded = decode_cursor(&cursor_token).expect("cursor must decode"); + assert_eq!( + decoded.predicate_hash, predicate_hash, + "cursor predicate_hash matches" + ); + assert_eq!(decoded.sort_hash, sort_hash, "cursor sort_hash matches"); + + // --- Page 2: with cursor, limit=3 → [40, 50], has_more = false (exhausted) --- + let page2_query = Query { + sort: Some(sort_fields.clone()), + limit: Some(3), + cursor: Some(cursor_token), + ..Default::default() + }; + + let page2_dist = cluster.nodes[0] + .coordinator + .execute_distributed(&page2_query, map_name) + .await + .expect("page 2 should succeed"); + + // Page 2 has 2 remaining records (40, 50), well under limit=3 → has_more false. + assert!(page2_dist.rows.len() <= 3, "page 2 must not exceed limit"); + assert!( + !page2_dist.has_more, + "page 2 has_more must be false at exhaustion" + ); + + let page2_scores: Vec = page2_dist.rows.iter().filter_map(get_score).collect(); + + // All page-2 scores must come strictly after the cursor (score > 30). + for score in &page2_scores { + assert!( + *score > 30, + "page 2 score {score} must be strictly after cursor at score=30" + ); + } + + // No overlap between pages. + let p1_set: std::collections::HashSet = page1_scores.iter().copied().collect(); + for score in &page2_scores { + assert!( + !p1_set.contains(score), + "page 2 must not duplicate page 1 score {score}" + ); + } + + // Combined pages must cover all 5 records exactly once. + let mut all_scores: Vec = page1_scores + .iter() + .chain(page2_scores.iter()) + .copied() + .collect(); + all_scores.sort_unstable(); + assert_eq!( + all_scores, + vec![10, 20, 30, 40, 50], + "combined pages must contain all 5 records exactly once" + ); + } + /// Extracts the `score` field as i64 from a raw `rmpv::Value` DAG row. fn get_score(val: &rmpv::Value) -> Option { if let rmpv::Value::Map(pairs) = val {