Skip to content

Commit 10cd5d9

Browse files
Add pipelining option to distributed keynote benchmark
1 parent b98c68c commit 10cd5d9

7 files changed

Lines changed: 117 additions & 20 deletions

File tree

templates/keynote-2/DEVELOP.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,8 @@ Notes:
264264

265265
- `--warmup-seconds` is the unmeasured warmup period. Generators submit requests during warmup, but those transactions are excluded from TPS.
266266
- `--window-seconds` is the measured interval.
267+
- `--pipelined 1` enables request pipelining. Omit it or pass `--pipelined 0` to stay in closed-loop mode, one request at a time.
268+
- `--max-inflight-per-connection` caps the number of in-flight requests each connection may have when pipelining is enabled. The default is `8`.
267269
- `--verify 1` preserves the existing benchmark semantics by running one verification pass centrally after the epoch completes.
268270
- The coordinator derives the HTTP metrics endpoint from `--stdb-url` by switching to `http://` or `https://` and appending `/v1/metrics`.
269271
- For a real multi-machine run, change `--bind 127.0.0.1` to `--bind 0.0.0.0` so remote generators can reach the coordinator.
@@ -351,6 +353,7 @@ The result contains:
351353

352354
- participating generator IDs
353355
- total participating connections
356+
- whether the epoch used closed-loop or pipelined load, and the per-connection in-flight cap
354357
- committed transaction delta from the server metrics endpoint
355358
- measured window duration
356359
- computed TPS
@@ -361,7 +364,7 @@ The result contains:
361364
- Start the coordinator before the generators.
362365
- Generators begin submitting requests when the coordinator enters `warmup`, not when the measured window begins.
363366
- Throughput is measured only from the committed transaction counter delta recorded after warmup, so warmup transactions are excluded.
364-
- For this distributed TypeScript mode, each connection runs closed-loop with one request at a time. There is no pipelining in this flow.
367+
- Distributed TypeScript mode defaults to closed-loop, one request at a time per connection. Enable pipelining on the coordinator with `--pipelined 1`, and all generators will follow that setting for the epoch.
365368
- Late generators are allowed to register and become ready while an epoch is already running, but they only participate in the next epoch.
366369
- The coordinator does not use heartbeats. It includes generators that most recently reported `ready`.
367370
- If a participating generator dies and never sends `/stopped`, the epoch result is written with an `error`, and that generator remains `running` in coordinator status until you restart it and let it register again.

templates/keynote-2/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ This architectural difference means SpacetimeDB can execute transactions in micr
149149
### Client Pipelining
150150

151151
The benchmark supports **pipelining** for all clients - sending multiple requests without waiting for responses. This maximizes throughput by keeping connections saturated.
152+
The distributed TypeScript SpacetimeDB flow also supports coordinator-controlled pipelining via `bench-dist-coordinator -- --pipelined 1`, with `--max-inflight-per-connection` to cap outstanding requests per connection.
152153

153154
### Confirmed Reads (`withConfirmedReads`)
154155

templates/keynote-2/src/distributed/control.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ function printState(state: CoordinatorState): void {
1818
`phase=${state.phase} epoch=${state.currentEpoch ?? '-'} label=${state.currentLabel ?? '-'}`,
1919
);
2020
console.log(
21-
`test=${state.test} connector=${state.connector} participants=${state.participants.length}`,
21+
`test=${state.test} connector=${state.connector} participants=${state.participants.length} mode=${state.loadOptions.pipelined ? `pipelined/${state.loadOptions.maxInflightPerConnection}` : 'closed-loop'}`,
2222
);
2323

2424
if (state.generators.length === 0) {
@@ -35,7 +35,7 @@ function printState(state: CoordinatorState): void {
3535
if (state.lastResult) {
3636
console.log('last_result:');
3737
console.log(
38-
` epoch=${state.lastResult.epoch} tps=${state.lastResult.tps.toFixed(2)} delta=${state.lastResult.committedDelta} verification=${state.lastResult.verification}${state.lastResult.error ? ` error=${state.lastResult.error}` : ''}`,
38+
` epoch=${state.lastResult.epoch} mode=${state.lastResult.loadOptions.pipelined ? `pipelined/${state.lastResult.loadOptions.maxInflightPerConnection}` : 'closed-loop'} tps=${state.lastResult.tps.toFixed(2)} delta=${state.lastResult.committedDelta} verification=${state.lastResult.verification}${state.lastResult.error ? ` error=${state.lastResult.error}` : ''}`,
3939
);
4040
}
4141
}

templates/keynote-2/src/distributed/coordinator.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
import type {
2121
CoordinatorPhase,
2222
CoordinatorState,
23+
DistributedLoadOptions,
2324
EpochResult,
2425
GeneratorLocalState,
2526
GeneratorSnapshot,
@@ -103,6 +104,7 @@ async function runVerification(url: string, moduleName: string): Promise<void> {
103104
class DistributedCoordinator {
104105
private readonly testName: string;
105106
private readonly connectorName: string;
107+
private readonly loadOptions: DistributedLoadOptions;
106108
private readonly warmupMs: number;
107109
private readonly windowMs: number;
108110
private readonly verifyAfterEpoch: boolean;
@@ -121,6 +123,7 @@ class DistributedCoordinator {
121123
constructor(opts: {
122124
testName: string;
123125
connectorName: string;
126+
loadOptions: DistributedLoadOptions;
124127
warmupMs: number;
125128
windowMs: number;
126129
verifyAfterEpoch: boolean;
@@ -131,6 +134,7 @@ class DistributedCoordinator {
131134
}) {
132135
this.testName = opts.testName;
133136
this.connectorName = opts.connectorName;
137+
this.loadOptions = opts.loadOptions;
134138
this.warmupMs = opts.warmupMs;
135139
this.windowMs = opts.windowMs;
136140
this.verifyAfterEpoch = opts.verifyAfterEpoch;
@@ -159,6 +163,7 @@ class DistributedCoordinator {
159163
participants: this.currentEpoch?.participantIds ?? [],
160164
test: this.testName,
161165
connector: this.connectorName,
166+
loadOptions: this.loadOptions,
162167
generators,
163168
lastResult: this.lastResult,
164169
};
@@ -365,6 +370,7 @@ class DistributedCoordinator {
365370
label: activeEpoch.label,
366371
test: this.testName,
367372
connector: this.connectorName,
373+
loadOptions: this.loadOptions,
368374
warmupSeconds: this.warmupMs / 1000,
369375
windowSeconds: this.windowMs / 1000,
370376
actualWindowSeconds,
@@ -438,11 +444,23 @@ async function main(): Promise<void> {
438444
const resultsDir = getStringFlag(flags, 'results-dir', defaultResultsDir);
439445
const warmupSeconds = getNumberFlag(flags, 'warmup-seconds', 15);
440446
const windowSeconds = getNumberFlag(flags, 'window-seconds', 60);
447+
const pipelined = getBoolFlag(flags, 'pipelined', false);
448+
const maxInflightPerConnection = getNumberFlag(
449+
flags,
450+
'max-inflight-per-connection',
451+
8,
452+
);
441453
const stopAckTimeoutSeconds = getNumberFlag(
442454
flags,
443455
'stop-ack-timeout-seconds',
444456
60,
445457
);
458+
if (
459+
!Number.isInteger(maxInflightPerConnection) ||
460+
maxInflightPerConnection < 1
461+
) {
462+
throw new Error('--max-inflight-per-connection must be an integer >= 1');
463+
}
446464
const verifyAfterEpoch = getBoolFlag(flags, 'verify', false);
447465
const rawStdbUrl = getStringFlag(
448466
flags,
@@ -460,6 +478,10 @@ async function main(): Promise<void> {
460478
const coordinator = new DistributedCoordinator({
461479
testName,
462480
connectorName,
481+
loadOptions: {
482+
pipelined,
483+
maxInflightPerConnection,
484+
},
463485
warmupMs: warmupSeconds * 1000,
464486
windowMs: windowSeconds * 1000,
465487
verifyAfterEpoch,
@@ -530,7 +552,7 @@ async function main(): Promise<void> {
530552
});
531553

532554
console.log(
533-
`[coordinator] listening on http://${bind}:${port} test=${testName} connector=${connectorName} warmup=${warmupSeconds}s window=${windowSeconds}s verify=${verifyAfterEpoch ? 'on' : 'off'} stdb=${stdbUrl}`,
555+
`[coordinator] listening on http://${bind}:${port} test=${testName} connector=${connectorName} mode=${pipelined ? `pipelined/${maxInflightPerConnection}` : 'closed-loop'} warmup=${warmupSeconds}s window=${windowSeconds}s verify=${verifyAfterEpoch ? 'on' : 'off'} stdb=${stdbUrl}`,
534556
);
535557
}
536558

templates/keynote-2/src/distributed/generator.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,11 @@ async function main(): Promise<void> {
149149
state.currentEpoch != null &&
150150
state.participants.includes(id)
151151
) {
152-
console.log(`[generator ${id}] starting epoch ${state.currentEpoch}`);
153-
await session.startEpoch(state.currentEpoch);
152+
const { pipelined, maxInflightPerConnection } = state.loadOptions;
153+
console.log(
154+
`[generator ${id}] starting epoch ${state.currentEpoch} mode=${pipelined ? `pipelined/${maxInflightPerConnection}` : 'closed-loop'}`,
155+
);
156+
await session.startEpoch(state.currentEpoch, state.loadOptions);
154157
activeEpoch = state.currentEpoch;
155158
}
156159
} else if (!shouldKeepRunning) {

templates/keynote-2/src/distributed/loadSession.ts

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { ReducerConnector } from '../core/connectors.ts';
22
import { performance } from 'node:perf_hooks';
33
import { pickTwoDistinct, zipfSampler } from '../core/zipf.ts';
4+
import type { DistributedLoadOptions } from './protocol.ts';
45

56
const OP_TIMEOUT_MS = Number(process.env.BENCH_OP_TIMEOUT_MS ?? '15000');
67
const DEFAULT_PRECOMPUTED_TRANSFER_PAIRS = 10_000_000;
@@ -67,6 +68,7 @@ type RunState = {
6768
epoch: number;
6869
stopRequested: boolean;
6970
workerPromises: Promise<void>[];
71+
loadOptions: DistributedLoadOptions;
7072
};
7173

7274
export class LoadSession {
@@ -137,7 +139,7 @@ export class LoadSession {
137139
}
138140
}
139141

140-
async startEpoch(epoch: number): Promise<void> {
142+
async startEpoch(epoch: number, loadOptions: DistributedLoadOptions): Promise<void> {
141143
if (this.openedConnections !== this.concurrency) {
142144
throw new Error(
143145
`Cannot start epoch ${epoch}: expected ${this.concurrency} open connections, got ${this.openedConnections}`,
@@ -153,9 +155,15 @@ export class LoadSession {
153155
epoch,
154156
stopRequested: false,
155157
workerPromises: [],
158+
loadOptions,
156159
};
157160
this.runState = runState;
158161

162+
const mode = loadOptions.pipelined
163+
? `pipelined max_inflight=${loadOptions.maxInflightPerConnection}`
164+
: 'closed-loop';
165+
console.log(`[distributed] starting epoch ${epoch} in ${mode} mode`);
166+
159167
runState.workerPromises = this.conns.map((conn, workerIndex) => {
160168
if (!conn) {
161169
throw new Error(`Connection ${workerIndex} not open`);
@@ -213,13 +221,23 @@ export class LoadSession {
213221
workerIndex: number,
214222
runState: RunState,
215223
): Promise<void> {
224+
const nextTransferPair = this.makeTransferPairPicker(workerIndex);
225+
if (!runState.loadOptions.pipelined) {
226+
await this.closedLoopWorker(conn, workerIndex, runState, nextTransferPair);
227+
return;
228+
}
229+
230+
await this.pipelinedWorker(conn, workerIndex, runState, nextTransferPair);
231+
}
232+
233+
private makeTransferPairPicker(workerIndex: number): () => [number, number] {
216234
const pairsPerWorker = Math.max(
217235
1,
218236
Math.floor(this.pairs.count / this.concurrency),
219237
);
220238
let pairIndex = workerIndex * pairsPerWorker;
221239

222-
const nextTransferPair = (): [number, number] => {
240+
return (): [number, number] => {
223241
if (pairIndex >= this.pairs.count) {
224242
pairIndex = 0;
225243
}
@@ -229,22 +247,65 @@ export class LoadSession {
229247
pairIndex++;
230248
return [from, to];
231249
};
250+
}
232251

252+
private async closedLoopWorker(
253+
conn: ReducerConnector,
254+
workerIndex: number,
255+
runState: RunState,
256+
nextTransferPair: () => [number, number],
257+
): Promise<void> {
233258
while (!runState.stopRequested) {
234-
const [from, to] = nextTransferPair();
259+
await this.runTransfer(conn, workerIndex, nextTransferPair);
260+
}
261+
}
235262

236-
try {
237-
await withOpTimeout(
238-
this.scenario(conn, from, to, 1),
239-
`[distributed] worker ${workerIndex} transfer ${from}->${to}`,
263+
private async pipelinedWorker(
264+
conn: ReducerConnector,
265+
workerIndex: number,
266+
runState: RunState,
267+
nextTransferPair: () => [number, number],
268+
): Promise<void> {
269+
const maxInflight = runState.loadOptions.maxInflightPerConnection;
270+
const inflight = new Set<Promise<void>>();
271+
272+
const launchTransfer = () => {
273+
const transfer = this.runTransfer(conn, workerIndex, nextTransferPair);
274+
inflight.add(transfer);
275+
transfer.finally(() => {
276+
inflight.delete(transfer);
277+
});
278+
};
279+
280+
while (!runState.stopRequested) {
281+
if (inflight.size < maxInflight) {
282+
launchTransfer();
283+
} else {
284+
await Promise.race(inflight);
285+
}
286+
}
287+
288+
await Promise.all(inflight);
289+
}
290+
291+
private async runTransfer(
292+
conn: ReducerConnector,
293+
workerIndex: number,
294+
nextTransferPair: () => [number, number],
295+
): Promise<void> {
296+
const [from, to] = nextTransferPair();
297+
298+
try {
299+
await withOpTimeout(
300+
this.scenario(conn, from, to, 1),
301+
`[distributed] worker ${workerIndex} transfer ${from}->${to}`,
302+
);
303+
} catch (err) {
304+
if (process.env.LOG_ERRORS === '1') {
305+
const msg = err instanceof Error ? err.message : String(err);
306+
console.warn(
307+
`[distributed] worker ${workerIndex} failed ${from}->${to}: ${msg}`,
240308
);
241-
} catch (err) {
242-
if (process.env.LOG_ERRORS === '1') {
243-
const msg = err instanceof Error ? err.message : String(err);
244-
console.warn(
245-
`[distributed] worker ${workerIndex} failed ${from}->${to}: ${msg}`,
246-
);
247-
}
248309
}
249310
}
250311
}

templates/keynote-2/src/distributed/protocol.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@ export type GeneratorLocalState = 'registered' | 'ready' | 'running';
22

33
export type CoordinatorPhase = 'idle' | 'warmup' | 'measure' | 'stop';
44

5+
export type DistributedLoadOptions = {
6+
pipelined: boolean;
7+
maxInflightPerConnection: number;
8+
};
9+
510
export type GeneratorSnapshot = {
611
id: string;
712
hostname: string;
@@ -16,6 +21,7 @@ export type EpochResult = {
1621
label: string | null;
1722
test: string;
1823
connector: string;
24+
loadOptions: DistributedLoadOptions;
1925
warmupSeconds: number;
2026
windowSeconds: number;
2127
actualWindowSeconds: number;
@@ -39,6 +45,7 @@ export type CoordinatorState = {
3945
participants: string[];
4046
test: string;
4147
connector: string;
48+
loadOptions: DistributedLoadOptions;
4249
generators: GeneratorSnapshot[];
4350
lastResult: EpochResult | null;
4451
};

0 commit comments

Comments
 (0)