From 39f6ecf1fb52e28580e4239369b7823f1a6e50eb Mon Sep 17 00:00:00 2001 From: SamTV12345 <40429738+samtv12345@users.noreply.github.com> Date: Sat, 30 May 2026 15:19:12 +0200 Subject: [PATCH] perf(postgres): batch doBulk + named prepared statements - doBulk collapses N round-trips into one on the native-upsert path via a single multi-row INSERT ... VALUES ($1,$2),($3,$4),... ON CONFLICT DO UPDATE. The function-based fallback (old PG / CockroachDB without native upsert) is preserved for single-row and multi-row cases. - Named prepared statements on the three hottest single-row queries (get, set, remove) so PG parses + plans them once per connection. Split out of #994 to keep one backend per PR (AGENTS.md). Co-Authored-By: Claude Opus 4.8 (1M context) --- databases/postgres_db.ts | 112 +++++++++++++++++++++++++-------------- 1 file changed, 72 insertions(+), 40 deletions(-) diff --git a/databases/postgres_db.ts b/databases/postgres_db.ts index 91c9aa2c..9d66fd1e 100644 --- a/databases/postgres_db.ts +++ b/databases/postgres_db.ts @@ -115,15 +115,18 @@ export default class extends AbstractDatabase { } get(key: string, callback: (err: Error | null, value: any) => {}) { - this.db.query("SELECT value FROM store WHERE key=$1", [key], (err, results) => { - let value = null; + this.db.query( + { name: "ueberdb_get", text: "SELECT value FROM store WHERE key=$1", values: [key] }, + (err, results) => { + let value = null; - if (!err && results.rows.length === 1) { - value = results.rows[0].value; - } + if (!err && results.rows.length === 1) { + value = results.rows[0].value; + } - callback(err, value); - }); + callback(err, value); + }, + ); } findKeys(key: string, notKey: string, callback: (err: Error | null, value: any) => {}) { @@ -188,53 +191,82 @@ export default class extends AbstractDatabase { const val = "" as any; callback(Error("Your Key can only be 100 chars"), val); } else if (this.upsertStatement != null) { - this.db.query(this.upsertStatement, [key, value], callback); + const name = this.upsertStatement.startsWith("INSERT INTO store(key, value) VALUES") + ? "ueberdb_set_native" + : "ueberdb_set_function"; + this.db.query({ name, text: this.upsertStatement, values: [key, value] }, callback); + } else { + // upsertStatement is only unset before init() has finished detecting the upsert + // method. Fail fast: the CacheAndBufferLayer promisifies this callback and awaits it, + // so silently returning here would leave that promise pending forever. + callback(Error("PostgreSQL driver not initialised: call init() before set()"), null as any); } } remove(key: string, callback: () => {}) { - this.db.query("DELETE FROM store WHERE key=$1", [key], callback); + this.db.query( + { name: "ueberdb_remove", text: "DELETE FROM store WHERE key=$1", values: [key] }, + callback, + ); } - doBulk(bulk: BulkObject[], callback: () => {}) { - const replaceVALs = []; - let removeSQL = "DELETE FROM store WHERE key IN ("; - const removeVALs: string[] = []; - - let removeCount = 0; - - for (const i in bulk) { - if (bulk[i].type === "set") { - replaceVALs.push([bulk[i].key, bulk[i].value]); - } else if (bulk[i].type === "remove") { - if (removeCount !== 0) removeSQL += ","; - removeCount += 1; - - removeSQL += `$${removeCount}`; - removeVALs.push(bulk[i].key); - } - } - - removeSQL += ");"; - + doBulk(bulk: BulkObject[], callback: (err?: Error | null) => void) { if (!this.upsertStatement) { + // See set(): never return without settling the callback, or the promisified + // wrapper in CacheAndBufferLayer hangs. + callback(Error("PostgreSQL driver not initialised: call init() before doBulk()")); return; } - const functions: any = replaceVALs.map( - (v) => (cb: () => {}) => this.db.query(this.upsertStatement as string, v as string[], cb), - ); + const setOps: Array<[string, string]> = []; + const removeKeys: string[] = []; + + for (const op of bulk) { + if (op.type === "set") setOps.push([op.key, op.value!]); + else if (op.type === "remove") removeKeys.push(op.key); + } - const removeFunction = (callback: () => {}) => { - if (!(removeVALs.length < 1)) { - this.db.query(removeSQL, removeVALs, callback); + const isNativeUpsert = this.upsertStatement.startsWith("INSERT INTO store(key, value) VALUES"); + // async.parallel expects (err?: Error | null) on its callbacks; pg.query callbacks supply + // (err: Error). Wrap each query so the error type assigns cleanly without an `any` cast. + type AsyncTaskCb = (err?: Error | null) => void; + const tasks: Array<(cb: AsyncTaskCb) => void> = []; + + if (setOps.length > 0) { + if (isNativeUpsert && setOps.length > 1) { + // Build a single multi-row VALUES list with positional params. + const valuesSql: string[] = []; + const params: string[] = []; + let i = 1; + for (const [k, v] of setOps) { + valuesSql.push(`($${i++},$${i++})`); + params.push(k, v); + } + const sql = + `INSERT INTO store(key, value) VALUES ${valuesSql.join(",")} ` + + `ON CONFLICT (key) DO UPDATE SET value = excluded.value`; + tasks.push((cb) => { + this.db.query(sql, params, (err) => cb(err)); + }); } else { - callback(); + // Fallback: per-row via the existing upsertStatement (function-based, or single-row native). + for (const [k, v] of setOps) { + tasks.push((cb) => { + this.db.query(this.upsertStatement as string, [k, v], (err) => cb(err)); + }); + } } - }; - functions.push(removeFunction); + } + + if (removeKeys.length > 0) { + const placeholders = removeKeys.map((_, idx) => `$${idx + 1}`).join(","); + const sql = `DELETE FROM store WHERE key IN (${placeholders})`; + tasks.push((cb) => { + this.db.query(sql, removeKeys, (err) => cb(err)); + }); + } - async.parallel(functions, callback); + async.parallel(tasks, callback); } close(callback: () => {}) {