Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 37 additions & 35 deletions apps/webapp/app/db.server.ts
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Prisma $on('query') events may not fire with client engine + driver adapter

Both the primary and replica clients register $on('query', ...) handlers for query performance monitoring (queryPerformanceMonitor.onQuery). With Prisma 7's engineType = 'client' and the PrismaPg driver adapter, query-level events may behave differently or not fire at all compared to the binary engine. If query events are no longer emitted, the queryPerformanceMonitor and verbose Prisma logging (VERBOSE_PRISMA_LOGS) would silently stop working. This should be tested to confirm query events still fire with the new engine.

(Refers to lines 228-231)

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
type PrismaTransactionClient,
type PrismaTransactionOptions,
} from "@trigger.dev/database";
import { PrismaPg } from "@prisma/adapter-pg";
import { createHash } from "node:crypto";
import invariant from "tiny-invariant";
import { z } from "zod";
import { env } from "./env.server";
Expand Down Expand Up @@ -109,21 +111,30 @@ function getClient() {
const { DATABASE_URL } = process.env;
invariant(typeof DATABASE_URL === "string", "DATABASE_URL env var not set");

const databaseUrl = extendQueryParams(DATABASE_URL, {
connection_limit: env.DATABASE_CONNECTION_LIMIT.toString(),
pool_timeout: env.DATABASE_POOL_TIMEOUT.toString(),
connection_timeout: env.DATABASE_CONNECTION_TIMEOUT.toString(),
application_name: env.SERVICE_NAME,
});
const databaseUrl = new URL(DATABASE_URL);

// Set application_name as a query param on the connection string (pg understands this)
databaseUrl.searchParams.set("application_name", env.SERVICE_NAME);

console.log(`🔌 setting up prisma client to ${redactUrlSecrets(databaseUrl)}`);

const client = new PrismaClient({
datasources: {
db: {
url: databaseUrl.href,
},
const adapter = new PrismaPg(
{
connectionString: databaseUrl.href,
max: env.DATABASE_CONNECTION_LIMIT,
idleTimeoutMillis: env.DATABASE_CONNECTION_TIMEOUT * 1000,
connectionTimeoutMillis: env.DATABASE_CONNECTION_TIMEOUT * 1000,
},
{
// Generate deterministic prepared statement names from query SQL so PostgreSQL
// can reuse cached query plans. Without this, every query uses an anonymous
// prepared statement that PG must parse and plan from scratch each time.
statementNameGenerator: (query) => `p_${createHash("sha256").update(query.sql).digest("hex").slice(0, 16)}`,
}
);
Comment on lines +121 to +134
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 DATABASE_POOL_TIMEOUT env var silently unused — pool wait timeout behavior removed

The DATABASE_POOL_TIMEOUT env var (default 60s, defined at apps/webapp/app/env.server.ts:60) is still parsed but never passed to the new PrismaPg adapter configuration. The old code passed it as pool_timeout to Prisma's connection string, which controlled how long a client waits for a free connection when the pool is at capacity. The pg.Pool used by PrismaPg has no equivalent pool wait timeout — when all connections are busy, new requests queue indefinitely. Under high database load (all max connections checked out), this could cause unbounded request queuing and cascading failures, whereas previously these requests would fail cleanly after 60s.

Prompt for agents
The PrismaPg adapter is configured with pg.Pool options (max, idleTimeoutMillis, connectionTimeoutMillis) but the old DATABASE_POOL_TIMEOUT (default 60s) behavior is lost. pg.Pool does not have a built-in pool wait timeout — when all connections are busy, new callers block indefinitely. To restore the old behavior, you can either:

1. Use the 'pg-pool' option 'allowExitOnIdle' combined with a wrapper that adds a timeout to pool.connect(), or
2. Set the 'statement_timeout' and/or 'idle_in_transaction_session_timeout' on the PostgreSQL connection string, or
3. Most directly: pg.Pool doesn't have a pool-level wait timeout, but you could implement one by wrapping the adapter or by using a pg.Pool subclass that rejects after a timeout when waiting in the queue.

The same fix needs to be applied to both the primary client (getClient function around line 121) and the replica client (getReplicaClient function around line 252). Also consider removing or deprecating DATABASE_POOL_TIMEOUT from env.server.ts if it's no longer applicable, to avoid confusing self-hosting users who set it expecting it to have an effect.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


const client = new PrismaClient({
adapter,
log: [
// events
{
Expand Down Expand Up @@ -233,21 +244,25 @@ function getReplicaClient() {
return;
}

const replicaUrl = extendQueryParams(env.DATABASE_READ_REPLICA_URL, {
connection_limit: env.DATABASE_CONNECTION_LIMIT.toString(),
pool_timeout: env.DATABASE_POOL_TIMEOUT.toString(),
connection_timeout: env.DATABASE_CONNECTION_TIMEOUT.toString(),
application_name: env.SERVICE_NAME,
});
const replicaUrl = new URL(env.DATABASE_READ_REPLICA_URL);
replicaUrl.searchParams.set("application_name", env.SERVICE_NAME);

console.log(`🔌 setting up read replica connection to ${redactUrlSecrets(replicaUrl)}`);

const replicaClient = new PrismaClient({
datasources: {
db: {
url: replicaUrl.href,
},
const adapter = new PrismaPg(
{
connectionString: replicaUrl.href,
max: env.DATABASE_CONNECTION_LIMIT,
idleTimeoutMillis: env.DATABASE_CONNECTION_TIMEOUT * 1000,
connectionTimeoutMillis: env.DATABASE_CONNECTION_TIMEOUT * 1000,
},
{
statementNameGenerator: (query) => `p_${createHash("sha256").update(query.sql).digest("hex").slice(0, 16)}`,
}
);

const replicaClient = new PrismaClient({
adapter,
log: [
// events
{
Expand Down Expand Up @@ -350,19 +365,6 @@ function getReplicaClient() {
return replicaClient;
}

function extendQueryParams(hrefOrUrl: string | URL, queryParams: Record<string, string>) {
const url = new URL(hrefOrUrl);
const query = url.searchParams;

for (const [key, val] of Object.entries(queryParams)) {
query.set(key, val);
}

url.search = query.toString();

return url;
}

function redactUrlSecrets(hrefOrUrl: string | URL) {
const url = new URL(hrefOrUrl);
url.password = "";
Expand Down
8 changes: 1 addition & 7 deletions apps/webapp/app/routes/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { LoaderFunctionArgs } from "@remix-run/server-runtime";
import { prisma } from "~/db.server";
import { metricsRegister } from "~/metrics.server";

export async function loader({ request }: LoaderFunctionArgs) {
Expand All @@ -13,14 +12,9 @@ export async function loader({ request }: LoaderFunctionArgs) {
}
}

// We need to remove empty lines from the prisma metrics, grafana doesn't like them
const prismaMetrics = (await prisma.$metrics.prometheus()).replace(/^\s*[\r\n]/gm, "");
const coreMetrics = await metricsRegister.metrics();

// Order matters, core metrics end with `# EOF`, prisma metrics don't
const metrics = prismaMetrics + coreMetrics;

return new Response(metrics, {
return new Response(coreMetrics, {
headers: {
"Content-Type": metricsRegister.contentType,
},
Expand Down
211 changes: 0 additions & 211 deletions apps/webapp/app/v3/tracer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ import { LoggerSpanExporter } from "./telemetry/loggerExporter.server";
import { CompactMetricExporter } from "./telemetry/compactMetricExporter.server";
import { logger } from "~/services/logger.server";
import { flattenAttributes } from "@trigger.dev/core/v3";
import { prisma } from "~/db.server";
import { metricsRegister } from "~/metrics.server";
import type { Prisma } from "@trigger.dev/database";
import { performance } from "node:perf_hooks";

export const SEMINTATTRS_FORCE_RECORDING = "forceRecording";
Expand Down Expand Up @@ -330,221 +328,12 @@ function setupMetrics() {

const meter = meterProvider.getMeter("trigger.dev", "3.3.12");

configurePrismaMetrics({ meter });
configureNodejsMetrics({ meter });
configureHostMetrics({ meterProvider });

return meter;
}

function configurePrismaMetrics({ meter }: { meter: Meter }) {
// Counters
const queriesTotal = meter.createObservableCounter("db.client.queries.total", {
description: "Total number of Prisma Client queries executed",
unit: "queries",
});
const datasourceQueriesTotal = meter.createObservableCounter("db.datasource.queries.total", {
description: "Total number of datasource queries executed",
unit: "queries",
});
const connectionsOpenedTotal = meter.createObservableCounter("db.pool.connections.opened.total", {
description: "Total number of pool connections opened",
unit: "connections",
});
const connectionsClosedTotal = meter.createObservableCounter("db.pool.connections.closed.total", {
description: "Total number of pool connections closed",
unit: "connections",
});

// Gauges
const queriesActive = meter.createObservableGauge("db.client.queries.active", {
description: "Number of currently active Prisma Client queries",
unit: "queries",
});
const queriesWait = meter.createObservableGauge("db.client.queries.wait", {
description: "Number of queries currently waiting for a connection",
unit: "queries",
});
const totalGauge = meter.createObservableGauge("db.pool.connections.total", {
description: "Open Prisma-pool connections",
unit: "connections",
});
const busyGauge = meter.createObservableGauge("db.pool.connections.busy", {
description: "Connections currently executing queries",
unit: "connections",
});
const freeGauge = meter.createObservableGauge("db.pool.connections.free", {
description: "Idle (free) connections in the pool",
unit: "connections",
});

// Histogram statistics as gauges
const queriesWaitTimeCount = meter.createObservableGauge("db.client.queries.wait_time.count", {
description: "Number of wait time observations",
unit: "observations",
});
const queriesWaitTimeSum = meter.createObservableGauge("db.client.queries.wait_time.sum", {
description: "Total wait time across all observations",
unit: "ms",
});
const queriesWaitTimeMean = meter.createObservableGauge("db.client.queries.wait_time.mean", {
description: "Average wait time for a connection",
unit: "ms",
});

const queriesDurationCount = meter.createObservableGauge("db.client.queries.duration.count", {
description: "Number of query duration observations",
unit: "observations",
});
const queriesDurationSum = meter.createObservableGauge("db.client.queries.duration.sum", {
description: "Total query duration across all observations",
unit: "ms",
});
const queriesDurationMean = meter.createObservableGauge("db.client.queries.duration.mean", {
description: "Average duration of Prisma Client queries",
unit: "ms",
});

const datasourceQueriesDurationCount = meter.createObservableGauge(
"db.datasource.queries.duration.count",
{
description: "Number of datasource query duration observations",
unit: "observations",
}
);
const datasourceQueriesDurationSum = meter.createObservableGauge(
"db.datasource.queries.duration.sum",
{
description: "Total datasource query duration across all observations",
unit: "ms",
}
);
const datasourceQueriesDurationMean = meter.createObservableGauge(
"db.datasource.queries.duration.mean",
{
description: "Average duration of datasource queries",
unit: "ms",
}
);

// Single helper so we hit Prisma only once per scrape ---------------------
async function readPrismaMetrics() {
const metrics = await prisma.$metrics.json();

// Extract counter values
const counters: Record<string, number> = {};
for (const counter of metrics.counters) {
counters[counter.key] = counter.value;
}

// Extract gauge values
const gauges: Record<string, number> = {};
for (const gauge of metrics.gauges) {
gauges[gauge.key] = gauge.value;
}

// Extract histogram values
const histograms: Record<string, Prisma.MetricHistogram> = {};
for (const histogram of metrics.histograms) {
histograms[histogram.key] = histogram.value;
}

return {
counters: {
queriesTotal: counters["prisma_client_queries_total"] ?? 0,
datasourceQueriesTotal: counters["prisma_datasource_queries_total"] ?? 0,
connectionsOpenedTotal: counters["prisma_pool_connections_opened_total"] ?? 0,
connectionsClosedTotal: counters["prisma_pool_connections_closed_total"] ?? 0,
},
gauges: {
queriesActive: gauges["prisma_client_queries_active"] ?? 0,
queriesWait: gauges["prisma_client_queries_wait"] ?? 0,
connectionsOpen: gauges["prisma_pool_connections_open"] ?? 0,
connectionsBusy: gauges["prisma_pool_connections_busy"] ?? 0,
connectionsIdle: gauges["prisma_pool_connections_idle"] ?? 0,
},
histograms: {
queriesWait: histograms["prisma_client_queries_wait_histogram_ms"],
queriesDuration: histograms["prisma_client_queries_duration_histogram_ms"],
datasourceQueriesDuration: histograms["prisma_datasource_queries_duration_histogram_ms"],
},
};
}

meter.addBatchObservableCallback(
async (res) => {
const { counters, gauges, histograms } = await readPrismaMetrics();

// Observe counters
res.observe(queriesTotal, counters.queriesTotal);
res.observe(datasourceQueriesTotal, counters.datasourceQueriesTotal);
res.observe(connectionsOpenedTotal, counters.connectionsOpenedTotal);
res.observe(connectionsClosedTotal, counters.connectionsClosedTotal);

// Observe gauges
res.observe(queriesActive, gauges.queriesActive);
res.observe(queriesWait, gauges.queriesWait);
res.observe(totalGauge, gauges.connectionsOpen);
res.observe(busyGauge, gauges.connectionsBusy);
res.observe(freeGauge, gauges.connectionsIdle);

// Observe histogram statistics as gauges
if (histograms.queriesWait) {
res.observe(queriesWaitTimeCount, histograms.queriesWait.count);
res.observe(queriesWaitTimeSum, histograms.queriesWait.sum);
res.observe(
queriesWaitTimeMean,
histograms.queriesWait.count > 0
? histograms.queriesWait.sum / histograms.queriesWait.count
: 0
);
}

if (histograms.queriesDuration) {
res.observe(queriesDurationCount, histograms.queriesDuration.count);
res.observe(queriesDurationSum, histograms.queriesDuration.sum);
res.observe(
queriesDurationMean,
histograms.queriesDuration.count > 0
? histograms.queriesDuration.sum / histograms.queriesDuration.count
: 0
);
}

if (histograms.datasourceQueriesDuration) {
res.observe(datasourceQueriesDurationCount, histograms.datasourceQueriesDuration.count);
res.observe(datasourceQueriesDurationSum, histograms.datasourceQueriesDuration.sum);
res.observe(
datasourceQueriesDurationMean,
histograms.datasourceQueriesDuration.count > 0
? histograms.datasourceQueriesDuration.sum / histograms.datasourceQueriesDuration.count
: 0
);
}
},
[
queriesTotal,
datasourceQueriesTotal,
connectionsOpenedTotal,
connectionsClosedTotal,
queriesActive,
queriesWait,
totalGauge,
busyGauge,
freeGauge,
queriesWaitTimeCount,
queriesWaitTimeSum,
queriesWaitTimeMean,
queriesDurationCount,
queriesDurationSum,
queriesDurationMean,
datasourceQueriesDurationCount,
datasourceQueriesDurationSum,
datasourceQueriesDurationMean,
]
);
}

function configureNodejsMetrics({ meter }: { meter: Meter }) {
if (!env.INTERNAL_OTEL_NODEJS_METRICS_ENABLED) {
return;
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
"@opentelemetry/sdk-trace-node": "2.0.1",
"@opentelemetry/semantic-conventions": "1.36.0",
"@popperjs/core": "^2.11.8",
"@prisma/instrumentation": "^6.14.0",
"@prisma/instrumentation": "^7.7.0",
"@radix-ui/react-accordion": "^1.2.11",
"@radix-ui/react-alert-dialog": "^1.0.4",
"@radix-ui/react-dialog": "^1.0.3",
Expand Down
10 changes: 3 additions & 7 deletions apps/webapp/test/runsReplicationBenchmark.producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

import { PrismaClient } from "@trigger.dev/database";
import { PrismaPg } from "@prisma/adapter-pg";
import { performance } from "node:perf_hooks";

interface ProducerConfig {
Expand Down Expand Up @@ -91,13 +92,8 @@ function generateError() {
}

async function runProducer(config: ProducerConfig) {
const prisma = new PrismaClient({
datasources: {
db: {
url: config.postgresUrl,
},
},
});
const adapter = new PrismaPg(config.postgresUrl);
const prisma = new PrismaClient({ adapter });

try {
console.log(
Expand Down
Loading
Loading