From 156427ee33e8b11253a64a9f2497fe4fdfd019cb Mon Sep 17 00:00:00 2001 From: mantrakp04 Date: Thu, 21 May 2026 13:58:00 -0700 Subject: [PATCH 1/3] Move internal metrics queries to ClickHouse replica Switches loadTotalUsers, loadAuthOverview, and loadRecentlyActiveUsers to read from the ClickHouse analytics tables instead of hitting Postgres directly, and routes the remaining Postgres reads through $replica(). --- .../app/api/latest/internal/metrics/route.tsx | 229 ++++++++++++------ 1 file changed, 160 insertions(+), 69 deletions(-) diff --git a/apps/backend/src/app/api/latest/internal/metrics/route.tsx b/apps/backend/src/app/api/latest/internal/metrics/route.tsx index faf8a0df6c..669e7993ad 100644 --- a/apps/backend/src/app/api/latest/internal/metrics/route.tsx +++ b/apps/backend/src/app/api/latest/internal/metrics/route.tsx @@ -219,7 +219,7 @@ async function loadActiveUsersByCountry( if (allIds.size === 0) return {}; const prisma = await getPrismaClientForTenancy(tenancy); - const dbUsers = await prisma.projectUser.findMany({ + const dbUsers = await prisma.$replica().projectUser.findMany({ where: { tenancyId: tenancy.id, projectUserId: { in: Array.from(allIds) }, @@ -313,32 +313,46 @@ async function loadLiveUsersCount( } async function loadTotalUsers(tenancy: Tenancy, now: Date, includeAnonymous: boolean = false): Promise { - const schema = await getPrismaSchemaForTenancy(tenancy); - const prisma = await getPrismaClientForTenancy(tenancy); - return (await prisma.$replica().$queryRaw<{ date: Date, dailyUsers: bigint, cumUsers: bigint }[]>` - WITH date_series AS ( - SELECT GENERATE_SERIES( - ${now}::date - INTERVAL '30 days', - ${now}::date, - '1 day' - ) - AS registration_day - ) - SELECT - ds.registration_day AS "date", - COALESCE(COUNT(pu."projectUserId"), 0) AS "dailyUsers", - SUM(COALESCE(COUNT(pu."projectUserId"), 0)) OVER (ORDER BY ds.registration_day) AS "cumUsers" - FROM date_series ds - LEFT JOIN ${sqlQuoteIdent(schema)}."ProjectUser" pu - ON DATE(COALESCE(pu."signedUpAt", pu."createdAt")) = ds.registration_day - AND pu."tenancyId" = ${tenancy.id}::UUID - AND (${includeAnonymous} OR pu."isAnonymous" = false) - GROUP BY ds.registration_day - ORDER BY ds.registration_day - `).map((x) => ({ - date: x.date.toISOString().split('T')[0], - activity: Number(x.dailyUsers), - })); + const { since, untilExclusive } = getMetricsWindowBounds(now); + const clickhouseClient = getClickhouseAdminClient(); + + const result = await clickhouseClient.query({ + query: ` + SELECT + toDate(signed_up_at) AS day, + count() AS daily_users + FROM analytics_internal.users FINAL + WHERE project_id = {projectId:String} + AND branch_id = {branchId:String} + AND sync_is_deleted = 0 + AND signed_up_at >= {since:DateTime} + AND signed_up_at < {untilExclusive:DateTime} + AND ({includeAnonymous:UInt8} = 1 OR is_anonymous = 0) + GROUP BY day + ORDER BY day + `, + query_params: { + projectId: tenancy.project.id, + branchId: tenancy.branchId, + since: formatClickhouseDateTimeParam(since), + untilExclusive: formatClickhouseDateTimeParam(untilExclusive), + includeAnonymous: includeAnonymous ? 1 : 0, + }, + format: "JSONEachRow", + }); + const rows = await result.json() as { day: string, daily_users: string | number }[]; + + const countByDay = new Map(); + for (const row of rows) { + countByDay.set(row.day.split('T')[0], Number(row.daily_users)); + } + + const out: DataPoints = []; + for (let i = 0; i <= METRICS_WINDOW_DAYS; i++) { + const dayKey = new Date(since.getTime() + i * ONE_DAY_MS).toISOString().split('T')[0]; + out.push({ date: dayKey, activity: countByDay.get(dayKey) ?? 0 }); + } + return out; } async function loadDailyActiveUsers(tenancy: Tenancy, now: Date, includeAnonymous: boolean = false) { @@ -525,21 +539,77 @@ async function loadLoginMethods(tenancy: Tenancy): Promise<{ method: string, cou `; } +const RECENTLY_ACTIVE_USERS_LIMIT = 5; + async function loadRecentlyActiveUsers(tenancy: Tenancy, includeAnonymous: boolean = false): Promise { + const { since, untilExclusive } = getMetricsWindowBounds(new Date()); + const clickhouseClient = getClickhouseAdminClient(); + + let orderedUserIds: string[] = []; + try { + const result = await clickhouseClient.query({ + query: ` + SELECT + assumeNotNull(user_id) AS user_id, + max(event_at) AS last_active + FROM analytics_internal.events + WHERE event_type = '$token-refresh' + AND project_id = {projectId:String} + AND branch_id = {branchId:String} + AND user_id IS NOT NULL + AND event_at >= {since:DateTime} + AND event_at < {untilExclusive:DateTime} + AND ({includeAnonymous:UInt8} = 1 OR coalesce(CAST(data.is_anonymous, 'Nullable(UInt8)'), 0) = 0) + GROUP BY user_id + ORDER BY last_active DESC + LIMIT {limit:UInt32} + `, + query_params: { + projectId: tenancy.project.id, + branchId: tenancy.branchId, + since: formatClickhouseDateTimeParam(since), + untilExclusive: formatClickhouseDateTimeParam(untilExclusive), + includeAnonymous: includeAnonymous ? 1 : 0, + limit: RECENTLY_ACTIVE_USERS_LIMIT, + }, + format: "JSONEachRow", + }); + const rows = await result.json() as { user_id: string, last_active: string }[]; + orderedUserIds = rows + .map((r) => normalizeUuidFromEvent(r.user_id)) + .filter((id): id is string => id != null); + } catch (error) { + if (!(error instanceof ClickHouseError)) { + throw error; + } + captureError("internal-metrics-recently-active-users-clickhouse-fallback", new StackAssertionError( + "Falling back to empty recently-active users due to ClickHouse query failure.", + { + cause: error, + projectId: tenancy.project.id, + branchId: tenancy.branchId, + }, + )); + return []; + } + + if (orderedUserIds.length === 0) return []; + const prisma = await getPrismaClientForTenancy(tenancy); - const dbUsers = await prisma.projectUser.findMany({ + const dbUsers = await prisma.$replica().projectUser.findMany({ where: { tenancyId: tenancy.id, + projectUserId: { in: orderedUserIds }, ...(!includeAnonymous ? { isAnonymous: false } : {}), }, - orderBy: { - lastActiveAt: 'desc', - }, - take: 5, include: userFullInclude, }); - return dbUsers.map((user) => userPrismaToCrud(user, tenancy.config)); + const byId = new Map(dbUsers.map((u) => [u.projectUserId, u])); + return orderedUserIds + .map((id) => byId.get(id)) + .filter((u): u is NonNullable => u != null) + .map((user) => userPrismaToCrud(user, tenancy.config)); } // Fallback visitor counts derived purely from `$token-refresh` events so the @@ -1354,48 +1424,69 @@ async function loadAnalyticsOverview(tenancy: Tenancy, now: Date, includeAnonymo // ── Auth Extra Aggregates ──────────────────────────────────────────────────── async function loadAuthOverview(tenancy: Tenancy, includeAnonymous: boolean, now: Date) { - const schema = await getPrismaSchemaForTenancy(tenancy); - const prisma = await getPrismaClientForTenancy(tenancy); + const clickhouseClient = getClickhouseAdminClient(); - const [counts, dailyActiveUsersSplit, dailyActiveTeamsSplit, mau] = await Promise.all([ - prisma.$replica().$queryRaw<[{ - total_users: number, - verified_non_anonymous_users: number, - anonymous_users: number, - total_teams: number, - }]>` - SELECT - (SELECT COUNT(*)::int - FROM ${sqlQuoteIdent(schema)}."ProjectUser" - WHERE "tenancyId" = ${tenancy.id}::UUID) AS total_users, - (SELECT COUNT(*)::int - FROM ${sqlQuoteIdent(schema)}."ProjectUser" pu - WHERE pu."tenancyId" = ${tenancy.id}::UUID - AND pu."isAnonymous" = false - AND EXISTS ( - SELECT 1 FROM ${sqlQuoteIdent(schema)}."ContactChannel" cc - WHERE cc."tenancyId" = pu."tenancyId" - AND cc."projectUserId" = pu."projectUserId" - AND cc."type" = 'EMAIL'::"ContactChannelType" - AND cc."isVerified" = true - )) AS verified_non_anonymous_users, - (SELECT COUNT(*)::int - FROM ${sqlQuoteIdent(schema)}."ProjectUser" - WHERE "tenancyId" = ${tenancy.id}::UUID - AND "isAnonymous" = true) AS anonymous_users, - (SELECT COUNT(*)::int - FROM ${sqlQuoteIdent(schema)}."Team" - WHERE "tenancyId" = ${tenancy.id}::UUID) AS total_teams - `, + const [usersRow, teamsRow, dailyActiveUsersSplit, dailyActiveTeamsSplit, mau] = await Promise.all([ + clickhouseClient.query({ + query: ` + SELECT + countIf(sync_is_deleted = 0) AS total_users, + countIf(sync_is_deleted = 0 AND is_anonymous = 1) AS anonymous_users, + countIf( + sync_is_deleted = 0 + AND is_anonymous = 0 + AND id IN ( + SELECT user_id + FROM analytics_internal.contact_channels FINAL + WHERE project_id = {projectId:String} + AND branch_id = {branchId:String} + AND sync_is_deleted = 0 + AND type = 'EMAIL' + AND is_verified = 1 + ) + ) AS verified_non_anonymous_users + FROM analytics_internal.users FINAL + WHERE project_id = {projectId:String} + AND branch_id = {branchId:String} + `, + query_params: { + projectId: tenancy.project.id, + branchId: tenancy.branchId, + }, + format: "JSONEachRow", + }).then(async (r) => { + const rows = await r.json() as [{ + total_users: string | number, + anonymous_users: string | number, + verified_non_anonymous_users: string | number, + }]; + return rows[0]; + }), + clickhouseClient.query({ + query: ` + SELECT countIf(sync_is_deleted = 0) AS total_teams + FROM analytics_internal.teams FINAL + WHERE project_id = {projectId:String} + AND branch_id = {branchId:String} + `, + query_params: { + projectId: tenancy.project.id, + branchId: tenancy.branchId, + }, + format: "JSONEachRow", + }).then(async (r) => { + const rows = await r.json() as [{ total_teams: string | number }]; + return rows[0]; + }), loadDailyActiveUsersSplit(tenancy, now, includeAnonymous), loadDailyActiveTeamsSplit(tenancy, now), loadMonthlyActiveUsers(tenancy, now, includeAnonymous), ]); - const totalUsers = Number(counts[0].total_users); - const verifiedNonAnonymousUsers = Number(counts[0].verified_non_anonymous_users); - const anonymousUsers = Number(counts[0].anonymous_users); - const totalTeams = Number(counts[0].total_teams); + const totalUsers = Number(usersRow.total_users); + const verifiedNonAnonymousUsers = Number(usersRow.verified_non_anonymous_users); + const anonymousUsers = Number(usersRow.anonymous_users); + const totalTeams = Number(teamsRow.total_teams); const nonAnonymousTotal = totalUsers - anonymousUsers; // total_users_filtered respects the includeAnonymous query flag so the // handler can use it directly without a separate count round trip. From db86c34fdcaf6198df099885466ca28d0b0a364d Mon Sep 17 00:00:00 2001 From: mantrakp04 Date: Thu, 21 May 2026 14:07:43 -0700 Subject: [PATCH 2/3] Refactor loadRecentlyActiveUsers to streamline database queries Removed ClickHouse query fallback and directly utilized Prisma for fetching recently active users. The function now orders results by last active date and limits the output to the top 5 users, improving performance and simplifying error handling. --- .../app/api/latest/internal/metrics/route.tsx | 66 ++----------------- 1 file changed, 5 insertions(+), 61 deletions(-) diff --git a/apps/backend/src/app/api/latest/internal/metrics/route.tsx b/apps/backend/src/app/api/latest/internal/metrics/route.tsx index 669e7993ad..492a82de1e 100644 --- a/apps/backend/src/app/api/latest/internal/metrics/route.tsx +++ b/apps/backend/src/app/api/latest/internal/metrics/route.tsx @@ -539,77 +539,21 @@ async function loadLoginMethods(tenancy: Tenancy): Promise<{ method: string, cou `; } -const RECENTLY_ACTIVE_USERS_LIMIT = 5; - async function loadRecentlyActiveUsers(tenancy: Tenancy, includeAnonymous: boolean = false): Promise { - const { since, untilExclusive } = getMetricsWindowBounds(new Date()); - const clickhouseClient = getClickhouseAdminClient(); - - let orderedUserIds: string[] = []; - try { - const result = await clickhouseClient.query({ - query: ` - SELECT - assumeNotNull(user_id) AS user_id, - max(event_at) AS last_active - FROM analytics_internal.events - WHERE event_type = '$token-refresh' - AND project_id = {projectId:String} - AND branch_id = {branchId:String} - AND user_id IS NOT NULL - AND event_at >= {since:DateTime} - AND event_at < {untilExclusive:DateTime} - AND ({includeAnonymous:UInt8} = 1 OR coalesce(CAST(data.is_anonymous, 'Nullable(UInt8)'), 0) = 0) - GROUP BY user_id - ORDER BY last_active DESC - LIMIT {limit:UInt32} - `, - query_params: { - projectId: tenancy.project.id, - branchId: tenancy.branchId, - since: formatClickhouseDateTimeParam(since), - untilExclusive: formatClickhouseDateTimeParam(untilExclusive), - includeAnonymous: includeAnonymous ? 1 : 0, - limit: RECENTLY_ACTIVE_USERS_LIMIT, - }, - format: "JSONEachRow", - }); - const rows = await result.json() as { user_id: string, last_active: string }[]; - orderedUserIds = rows - .map((r) => normalizeUuidFromEvent(r.user_id)) - .filter((id): id is string => id != null); - } catch (error) { - if (!(error instanceof ClickHouseError)) { - throw error; - } - captureError("internal-metrics-recently-active-users-clickhouse-fallback", new StackAssertionError( - "Falling back to empty recently-active users due to ClickHouse query failure.", - { - cause: error, - projectId: tenancy.project.id, - branchId: tenancy.branchId, - }, - )); - return []; - } - - if (orderedUserIds.length === 0) return []; - const prisma = await getPrismaClientForTenancy(tenancy); const dbUsers = await prisma.$replica().projectUser.findMany({ where: { tenancyId: tenancy.id, - projectUserId: { in: orderedUserIds }, ...(!includeAnonymous ? { isAnonymous: false } : {}), }, + orderBy: { + lastActiveAt: 'desc', + }, + take: 5, include: userFullInclude, }); - const byId = new Map(dbUsers.map((u) => [u.projectUserId, u])); - return orderedUserIds - .map((id) => byId.get(id)) - .filter((u): u is NonNullable => u != null) - .map((user) => userPrismaToCrud(user, tenancy.config)); + return dbUsers.map((user) => userPrismaToCrud(user, tenancy.config)); } // Fallback visitor counts derived purely from `$token-refresh` events so the From 9738d0452d1bcc56fd861cb2ccd85b0960aabdf6 Mon Sep 17 00:00:00 2001 From: mantrakp04 Date: Thu, 21 May 2026 14:20:58 -0700 Subject: [PATCH 3/3] Fix remaining getClickhouseAdminClient references in metrics route --- apps/backend/src/app/api/latest/internal/metrics/route.tsx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/backend/src/app/api/latest/internal/metrics/route.tsx b/apps/backend/src/app/api/latest/internal/metrics/route.tsx index 9872203897..f2b5e717d0 100644 --- a/apps/backend/src/app/api/latest/internal/metrics/route.tsx +++ b/apps/backend/src/app/api/latest/internal/metrics/route.tsx @@ -321,7 +321,7 @@ async function loadLiveUsersCount( async function loadTotalUsers(tenancy: Tenancy, now: Date, includeAnonymous: boolean = false): Promise { const { since, untilExclusive } = getMetricsWindowBounds(now); - const clickhouseClient = getClickhouseAdminClient(); + const clickhouseClient = getClickhouseAdminClientForMetrics(); const result = await clickhouseClient.query({ query: ` @@ -1386,7 +1386,7 @@ async function loadAnalyticsOverview(tenancy: Tenancy, now: Date, includeAnonymo // ── Auth Extra Aggregates ──────────────────────────────────────────────────── async function loadAuthOverview(tenancy: Tenancy, includeAnonymous: boolean, now: Date) { - const clickhouseClient = getClickhouseAdminClient(); + const clickhouseClient = getClickhouseAdminClientForMetrics(); const [usersRow, teamsRow, dailyActiveUsersSplit, dailyActiveTeamsSplit, mau] = await Promise.all([ clickhouseClient.query({