Skip to content

Commit 173cc01

Browse files
authored
Fix cursor pagination with optional tie-breaker (#2080)
Fixes OPS-3810. ## Additional Notes We create benchmark workflows in bulk, so many rows share the same updated timestamp. With cursor pagination, page boundaries previously filtered using only updated < cursor, which caused rows with the same timestamp to be skipped across pages. This PR adds support for an optional secondary pagination key (used as a secondary sort key) to break ties when primary timestamps are identical.
1 parent 6267adc commit 173cc01

4 files changed

Lines changed: 373 additions & 69 deletions

File tree

packages/server/api/src/app/flows/flow/flow.service.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,11 @@ export const flowService = {
145145
columnName: 'fv.updated',
146146
columnType: 'timestamp with time zone',
147147
},
148+
customPaginationSecondaryColumn: {
149+
columnPath: 'id',
150+
columnName: 'flow.id',
151+
columnType: 'string',
152+
},
148153
});
149154

150155
const queryWhere: Record<string, unknown> = {

packages/server/api/src/app/helper/pagination/build-paginator.ts

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,28 @@ export type PagingQuery = {
88
order?: Order | 'ASC' | 'DESC';
99
};
1010

11+
type CustomPaginationColumnOptions = {
12+
columnPath: string;
13+
columnName: string;
14+
columnType?: string;
15+
};
16+
17+
// Secondary custom pagination is only valid when primary custom pagination is configured.
18+
type CustomPaginationColumns =
19+
| {
20+
customPaginationColumn?: never;
21+
customPaginationSecondaryColumn?: never;
22+
}
23+
| {
24+
customPaginationColumn: CustomPaginationColumnOptions;
25+
customPaginationSecondaryColumn?: CustomPaginationColumnOptions;
26+
};
27+
1128
export type PaginationOptions<Entity> = {
1229
entity: EntitySchema<Entity>;
1330
alias?: string;
1431
query?: PagingQuery;
15-
customPaginationColumn?: {
16-
columnPath: string;
17-
columnName: string;
18-
columnType?: string;
19-
};
20-
};
32+
} & CustomPaginationColumns;
2133

2234
export function buildPaginator<Entity extends ObjectLiteral>(
2335
options: PaginationOptions<Entity>,
@@ -32,6 +44,15 @@ export function buildPaginator<Entity extends ObjectLiteral>(
3244

3345
paginator.setAlias(alias);
3446

47+
if (
48+
options.customPaginationSecondaryColumn &&
49+
!options.customPaginationColumn
50+
) {
51+
throw new Error(
52+
'customPaginationSecondaryColumn requires customPaginationColumn',
53+
);
54+
}
55+
3556
if (options.customPaginationColumn) {
3657
paginator.setPaginationColumn(
3758
options.customPaginationColumn.columnPath,
@@ -40,6 +61,14 @@ export function buildPaginator<Entity extends ObjectLiteral>(
4061
);
4162
}
4263

64+
if (options.customPaginationSecondaryColumn) {
65+
paginator.setPaginationSecondaryColumn(
66+
options.customPaginationSecondaryColumn.columnPath,
67+
options.customPaginationSecondaryColumn.columnName,
68+
options.customPaginationSecondaryColumn.columnType,
69+
);
70+
}
71+
4372
if (query.afterCursor) {
4473
paginator.setAfterCursor(query.afterCursor);
4574
}

packages/server/api/src/app/helper/pagination/paginator.ts

Lines changed: 221 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,16 @@ export type PagingResult<Entity> = {
3131
cursor: CursorResult;
3232
};
3333

34+
type CursorContext = {
35+
primaryColumnName: string;
36+
primaryParamName: string;
37+
secondaryColumnName: string | null;
38+
secondaryParamName: string | null;
39+
};
40+
3441
const PAGINATION_KEY = 'created';
3542
const CUSTOM_PAGINATION_KEY = 'custom_pagination';
43+
const CUSTOM_PAGINATION_SECONDARY_KEY = 'custom_pagination_tie_breaker';
3644
const DEFAULT_TIMESTAMP_TYPE = 'timestamp with time zone';
3745

3846
export default class Paginator<Entity extends ObjectLiteral> {
@@ -56,6 +64,12 @@ export default class Paginator<Entity extends ObjectLiteral> {
5664

5765
private paginationColumnType: string | null = null;
5866

67+
private paginationSecondaryColumnPath: string | null = null;
68+
69+
private paginationSecondaryColumnName: string | null = null;
70+
71+
private paginationSecondaryColumnType: string | null = null;
72+
5973
public constructor(private readonly entity: EntitySchema) {}
6074

6175
public setPaginationColumn(
@@ -72,6 +86,16 @@ export default class Paginator<Entity extends ObjectLiteral> {
7286
this.alias = alias;
7387
}
7488

89+
public setPaginationSecondaryColumn(
90+
columnPath: string,
91+
columnName: string,
92+
columnType = 'string',
93+
): void {
94+
this.paginationSecondaryColumnPath = columnPath;
95+
this.paginationSecondaryColumnName = columnName;
96+
this.paginationSecondaryColumnType = columnType;
97+
}
98+
7599
public setAfterCursor(cursor: string): void {
76100
this.afterCursor = cursor;
77101
}
@@ -171,28 +195,26 @@ export default class Paginator<Entity extends ObjectLiteral> {
171195
): void {
172196
const dbType = system.get(AppSystemProp.DB_TYPE);
173197
const operator = this.getOperator();
174-
let queryString: string;
175-
176-
const isCustomColumn =
177-
this.paginationColumnName && cursors[CUSTOM_PAGINATION_KEY];
178-
const columnName = isCustomColumn
179-
? this.paginationColumnName
180-
: `${this.alias}.${PAGINATION_KEY}`;
181-
const paramName = isCustomColumn ? CUSTOM_PAGINATION_KEY : PAGINATION_KEY;
182-
183-
if (dbType === DatabaseType.SQLITE3) {
184-
queryString = `${columnName} ${operator} :${paramName}`;
185-
} else if (dbType === DatabaseType.POSTGRES) {
186-
if (this.hasBeforeCursor() && !this.hasAfterCursor()) {
187-
queryString = `${columnName} ${operator} (:${paramName}::timestamp + INTERVAL '1 millisecond')`;
188-
} else {
189-
queryString = `${columnName} ${operator} :${paramName}::timestamp`;
190-
}
191-
} else {
192-
throw new Error('Unsupported database type');
198+
const context = this.resolveCursorContext(cursors);
199+
200+
if (context.secondaryColumnName && context.secondaryParamName) {
201+
this.applyCompositeCursorFilter(
202+
where,
203+
cursors,
204+
dbType,
205+
operator,
206+
context,
207+
);
208+
return;
193209
}
194210

195-
where.orWhere(queryString, cursors);
211+
this.applySingleColumnCursorFilter(
212+
where,
213+
cursors,
214+
dbType,
215+
operator,
216+
context,
217+
);
196218
}
197219

198220
private getOperator(): string {
@@ -222,6 +244,10 @@ export default class Paginator<Entity extends ObjectLiteral> {
222244
orderByCondition[`${this.alias}.${PAGINATION_KEY}`] = order;
223245
}
224246

247+
if (this.paginationColumnName && this.paginationSecondaryColumnName) {
248+
orderByCondition[this.paginationSecondaryColumnName] = order;
249+
}
250+
225251
return orderByCondition;
226252
}
227253

@@ -257,9 +283,31 @@ export default class Paginator<Entity extends ObjectLiteral> {
257283
this.paginationColumnType || DEFAULT_TIMESTAMP_TYPE,
258284
value,
259285
);
260-
const payload = `${CUSTOM_PAGINATION_KEY}:${encodedValue}`;
286+
const payload = [`${CUSTOM_PAGINATION_KEY}:${encodedValue}`];
261287

262-
return btoa(payload);
288+
if (
289+
this.paginationSecondaryColumnPath &&
290+
this.paginationSecondaryColumnName
291+
) {
292+
const secondaryValue = getValueByPath(
293+
entity,
294+
this.paginationSecondaryColumnPath,
295+
);
296+
if (secondaryValue === null || secondaryValue === undefined) {
297+
throw new Error(
298+
`Pagination secondary column not found at path: ${this.paginationSecondaryColumnPath}`,
299+
);
300+
}
301+
const encodedSecondaryValue = encodeByType(
302+
this.paginationSecondaryColumnType || 'string',
303+
secondaryValue,
304+
);
305+
payload.push(
306+
`${CUSTOM_PAGINATION_SECONDARY_KEY}:${encodedSecondaryValue}`,
307+
);
308+
}
309+
310+
return btoa(payload.join(','));
263311
}
264312

265313
private decode(cursor: string): CursorParam {
@@ -279,6 +327,9 @@ export default class Paginator<Entity extends ObjectLiteral> {
279327
if (key === CUSTOM_PAGINATION_KEY) {
280328
return this.paginationColumnType || DEFAULT_TIMESTAMP_TYPE;
281329
}
330+
if (key === CUSTOM_PAGINATION_SECONDARY_KEY) {
331+
return this.paginationSecondaryColumnType || 'string';
332+
}
282333

283334
const col = this.entity.options.columns[key];
284335
if (col === undefined) {
@@ -291,6 +342,154 @@ export default class Paginator<Entity extends ObjectLiteral> {
291342
return order === Order.ASC ? Order.DESC : Order.ASC;
292343
}
293344

345+
private buildComparisonClause({
346+
dbType,
347+
columnName,
348+
paramName,
349+
operator,
350+
}: {
351+
dbType: string | undefined;
352+
columnName: string;
353+
paramName: string;
354+
operator: string;
355+
}): string {
356+
if (dbType === DatabaseType.SQLITE3) {
357+
return `${columnName} ${operator} :${paramName}`;
358+
}
359+
360+
if (dbType === DatabaseType.POSTGRES) {
361+
const type = this.getEntityPropertyType(paramName);
362+
if (this.isTimestampType(type)) {
363+
if (operator === '<') {
364+
return `${columnName} < :${paramName}::timestamptz`;
365+
}
366+
if (operator === '>') {
367+
return `${columnName} >= (:${paramName}::timestamptz + INTERVAL '1 millisecond')`;
368+
}
369+
if (operator === '=') {
370+
return `(${columnName} >= :${paramName}::timestamptz AND ${columnName} < (:${paramName}::timestamptz + INTERVAL '1 millisecond'))`;
371+
}
372+
return `${columnName} ${operator} :${paramName}::timestamptz`;
373+
}
374+
return `${columnName} ${operator} :${paramName}`;
375+
}
376+
377+
throw new Error('Unsupported database type');
378+
}
379+
380+
private isTimestampType(type: string): boolean {
381+
return (
382+
type === 'timestamp with time zone' ||
383+
type === 'datetime' ||
384+
type === 'date'
385+
);
386+
}
387+
388+
private resolveCursorContext(cursors: CursorParam): CursorContext {
389+
const customPaginationColumnName = this.paginationColumnName;
390+
const hasCustomPaginationCursor =
391+
customPaginationColumnName !== null &&
392+
cursors[CUSTOM_PAGINATION_KEY] !== undefined;
393+
394+
const primaryColumnName =
395+
hasCustomPaginationCursor && customPaginationColumnName
396+
? customPaginationColumnName
397+
: `${this.alias}.${PAGINATION_KEY}`;
398+
const primaryParamName = hasCustomPaginationCursor
399+
? CUSTOM_PAGINATION_KEY
400+
: PAGINATION_KEY;
401+
402+
const hasCustomSecondaryCursor =
403+
this.paginationSecondaryColumnName !== null &&
404+
cursors[CUSTOM_PAGINATION_SECONDARY_KEY] !== undefined;
405+
406+
if (hasCustomPaginationCursor && hasCustomSecondaryCursor) {
407+
return {
408+
primaryColumnName,
409+
primaryParamName,
410+
secondaryColumnName: this.paginationSecondaryColumnName,
411+
secondaryParamName: CUSTOM_PAGINATION_SECONDARY_KEY,
412+
};
413+
}
414+
415+
return {
416+
primaryColumnName,
417+
primaryParamName,
418+
secondaryColumnName: null,
419+
secondaryParamName: null,
420+
};
421+
}
422+
423+
private applySingleColumnCursorFilter(
424+
where: WhereExpressionBuilder,
425+
cursors: CursorParam,
426+
dbType: string | undefined,
427+
operator: string,
428+
context: CursorContext,
429+
): void {
430+
where.orWhere(
431+
this.buildComparisonClause({
432+
dbType,
433+
columnName: context.primaryColumnName,
434+
paramName: context.primaryParamName,
435+
operator,
436+
}),
437+
cursors,
438+
);
439+
}
440+
441+
private applyCompositeCursorFilter(
442+
where: WhereExpressionBuilder,
443+
cursors: CursorParam,
444+
dbType: string | undefined,
445+
operator: string,
446+
context: CursorContext,
447+
): void {
448+
const {
449+
primaryColumnName,
450+
primaryParamName,
451+
secondaryColumnName,
452+
secondaryParamName,
453+
} = context;
454+
if (!secondaryColumnName || !secondaryParamName) {
455+
throw new Error('Pagination secondary context is not configured');
456+
}
457+
458+
where.orWhere(
459+
this.buildComparisonClause({
460+
dbType,
461+
columnName: primaryColumnName,
462+
paramName: primaryParamName,
463+
operator,
464+
}),
465+
cursors,
466+
);
467+
468+
// Lexicographic cursor compare: primary equals, then compare secondary key.
469+
where.orWhere(
470+
new Brackets((nestedWhere) => {
471+
nestedWhere.where(
472+
this.buildComparisonClause({
473+
dbType,
474+
columnName: primaryColumnName,
475+
paramName: primaryParamName,
476+
operator: '=',
477+
}),
478+
cursors,
479+
);
480+
nestedWhere.andWhere(
481+
this.buildComparisonClause({
482+
dbType,
483+
columnName: secondaryColumnName,
484+
paramName: secondaryParamName,
485+
operator,
486+
}),
487+
cursors,
488+
);
489+
}),
490+
);
491+
}
492+
294493
private toPagingResult<Entity>(entities: Entity[]): PagingResult<Entity> {
295494
return {
296495
data: entities,

0 commit comments

Comments
 (0)