Skip to content

Commit 3338a22

Browse files
Copilotshoaibsharif
andcommitted
feat: realtime UI updates via Pusher/Soketi + TanStack Query invalidation (Phase 1 & 2)
Agent-Logs-Url: https://github.com/techdiary-dev/techdiary.dev/sessions/b073d05d-7457-4406-b59a-cafb8dac4c4d Co-authored-by: shoaibsharif <29075110+shoaibsharif@users.noreply.github.com>
1 parent 4e98ab7 commit 3338a22

10 files changed

Lines changed: 243 additions & 2 deletions

File tree

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@
6060
"modern-screenshot": "^4.6.8",
6161
"next": "^16.2.1",
6262
"pg": "^8.14.1",
63+
"pusher": "^5.3.3",
64+
"pusher-js": "^8.5.0",
6365
"react": "^19",
6466
"react-advanced-cropper": "^0.20.1",
6567
"react-dom": "^19",

src/app/api/pusher/auth/route.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { pusherServer } from "@/lib/pusher.server";
2+
import { authID } from "@/backend/services/session.actions";
3+
import { NextRequest, NextResponse } from "next/server";
4+
5+
/**
6+
* Pusher private-channel auth endpoint.
7+
* Pusher-js POSTs `socket_id` and `channel_name` as application/x-www-form-urlencoded.
8+
* We verify the user's session and only sign subscriptions to the caller's own channel.
9+
*/
10+
export async function POST(req: NextRequest) {
11+
if (!pusherServer) {
12+
return new NextResponse("Realtime not configured", { status: 503 });
13+
}
14+
15+
const userId = await authID();
16+
if (!userId) {
17+
return new NextResponse("Unauthorized", { status: 401 });
18+
}
19+
20+
const body = await req.text();
21+
const params = new URLSearchParams(body);
22+
const socketId = params.get("socket_id") ?? "";
23+
const channel = params.get("channel_name") ?? "";
24+
25+
// Only allow subscribing to the caller's own user channel
26+
if (channel !== `private-user.${userId}`) {
27+
return new NextResponse("Forbidden", { status: 403 });
28+
}
29+
30+
const authResponse = pusherServer.authorizeChannel(socketId, channel);
31+
return NextResponse.json(authResponse);
32+
}

src/backend/services/comment.action.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { and, eq, inArray } from "sqlkit";
1010
import { CommentPresentation } from "../models/domain-models";
1111
import { inngest } from "@/lib/inngest";
1212
import { assertCommentResourceExists } from "./notifications.payload";
13+
import { pusherServer } from "@/lib/pusher.server";
1314

1415
const sql = String.raw;
1516

@@ -68,6 +69,14 @@ export const createMyComment = async (
6869
console.error("[inngest] Failed to send notification event:", err);
6970
});
7071

72+
pusherServer
73+
?.trigger(
74+
`resource.${resource_type}.${resource_id}`,
75+
"comment.created",
76+
{ scope: "comments" },
77+
)
78+
.catch(() => {});
79+
7180
return created?.rows?.[0];
7281
};
7382

@@ -97,6 +106,14 @@ export const updateMyComment = async (
97106
data: { body: input.body, updated_at: new Date() },
98107
});
99108

109+
pusherServer
110+
?.trigger(
111+
`resource.${existing.resource_type}.${existing.resource_id}`,
112+
"comment.updated",
113+
{ scope: "comments" },
114+
)
115+
.catch(() => {});
116+
100117
return { success: true as const, data: { id: input.id } };
101118
} catch (error) {
102119
return handleActionException(error);
@@ -149,6 +166,14 @@ export const deleteMyComment = async (
149166
where: inArray("id", ids),
150167
});
151168

169+
pusherServer
170+
?.trigger(
171+
`resource.${root.resource_type}.${root.resource_id}`,
172+
"comment.deleted",
173+
{ scope: "comments" },
174+
)
175+
.catch(() => {});
176+
152177
return { success: true as const, data: { id: input.id } };
153178
} catch (error) {
154179
return handleActionException(error);

src/components/comment-section.tsx

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import {
1818
Trash2,
1919
} from "lucide-react";
2020
import Link from "next/link";
21-
import React, { useMemo, useState } from "react";
21+
import React, { useMemo, useState, useEffect } from "react";
2222
import { useImmer } from "use-immer";
2323
import { useLoginPopup } from "./app-login-popup";
2424
import ResourceReaction from "./ResourceReaction";
@@ -38,6 +38,7 @@ import { Button } from "./ui/button";
3838
import { Skeleton } from "./ui/skeleton";
3939
import { Textarea } from "./ui/textarea";
4040
import getFileUrl from "@/utils/getFileUrl";
41+
import { getPusherClient } from "@/lib/pusher.client";
4142

4243
const Context = React.createContext<
4344
{ mutatingId?: string; setMutatingId: (id?: string) => void } | undefined
@@ -155,6 +156,28 @@ export const CommentSection = (props: {
155156
refetchOnReconnect: false,
156157
});
157158

159+
// Phase 2: subscribe to the resource's public Pusher channel and
160+
// invalidate the comments query when any mutation arrives from another client.
161+
useEffect(() => {
162+
const pusher = getPusherClient();
163+
if (!pusher) return;
164+
165+
const channelName = `resource.${props.resource_type}.${props.resource_id}`;
166+
const channel = pusher.subscribe(channelName);
167+
const invalidate = () => {
168+
queryClient.invalidateQueries({
169+
queryKey: ["comments", props.resource_id, props.resource_type],
170+
});
171+
};
172+
const events = ["comment.created", "comment.updated", "comment.deleted"];
173+
events.forEach((event) => channel.bind(event, invalidate));
174+
175+
return () => {
176+
events.forEach((event) => channel.unbind(event, invalidate));
177+
pusher.unsubscribe(channelName);
178+
};
179+
}, [props.resource_id, props.resource_type, queryClient]);
180+
158181
const generated_comment_id = () => crypto.randomUUID();
159182
const listQueryKey = [
160183
"comments",

src/components/providers/CommonProviders.tsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { ThemeProvider } from "./theme-provider";
1111
import { AppConfirmProvider } from "../app-confirm";
1212
import { AppAlertProvider } from "../app-alert";
1313
import { AppLoginPopupProvider } from "../app-login-popup";
14+
import { RealtimeProvider } from "./RealtimeProvider";
1415

1516
type Props = PropsWithChildren<{
1617
initialTheme?: ThemePreference;
@@ -34,7 +35,7 @@ const CommonProviders: React.FC<Props> = ({
3435
initialTheme={initialTheme}
3536
migrateThemeFromLocalStorage={migrateThemeFromLocalStorage}
3637
>
37-
{children}
38+
<RealtimeProvider>{children}</RealtimeProvider>
3839
</ThemeProvider>
3940
</AppLoginPopupProvider>
4041
</Suspense>
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
"use client";
2+
3+
import { getPusherClient } from "@/lib/pusher.client";
4+
import { useSession } from "@/store/session.atom";
5+
import { useQueryClient } from "@tanstack/react-query";
6+
import React, { PropsWithChildren, useEffect } from "react";
7+
8+
/**
9+
* Subscribes to the authenticated user's private Pusher channel
10+
* (`private-user.{userId}`) and invalidates TanStack Query caches
11+
* when realtime events arrive.
12+
*
13+
* Phase 1 — Notifications:
14+
* event `notification.new` → invalidate `my-notifications` and
15+
* `unread-notification-count`.
16+
*
17+
* This component is a no-op when Pusher is not configured
18+
* (NEXT_PUBLIC_PUSHER_APP_KEY is absent) or when the user is not
19+
* signed in.
20+
*/
21+
export function RealtimeProvider({ children }: PropsWithChildren) {
22+
const session = useSession();
23+
const queryClient = useQueryClient();
24+
25+
const userId = session?.user?.id;
26+
27+
useEffect(() => {
28+
if (!userId) return;
29+
30+
const pusher = getPusherClient();
31+
if (!pusher) return;
32+
33+
const channelName = `private-user.${userId}`;
34+
const channel = pusher.subscribe(channelName);
35+
36+
channel.bind("notification.new", () => {
37+
queryClient.invalidateQueries({ queryKey: ["my-notifications"] });
38+
queryClient.invalidateQueries({ queryKey: ["unread-notification-count"] });
39+
});
40+
41+
return () => {
42+
channel.unbind_all();
43+
pusher.unsubscribe(channelName);
44+
};
45+
}, [userId, queryClient]);
46+
47+
return <>{children}</>;
48+
}

src/env.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,26 @@ export const env = createEnv({
2525
// Inngest
2626
INNGEST_EVENT_KEY: z.string().optional(),
2727
INNGEST_SIGNING_KEY: z.string().optional(),
28+
29+
// Pusher / Soketi (server-side)
30+
PUSHER_APP_ID: z.string().optional(),
31+
PUSHER_APP_KEY: z.string().optional(),
32+
PUSHER_APP_SECRET: z.string().optional(),
33+
PUSHER_HOST: z.string().optional(),
34+
PUSHER_PORT: z.string().optional(),
35+
PUSHER_USE_TLS: z.string().optional(),
36+
PUSHER_CLUSTER: z.string().optional(),
2837
},
2938
client: {
3039
NEXT_PUBLIC_MEILISEARCH_API_HOST: z.url(),
3140
NEXT_PUBLIC_MEILISEARCH_SEARCH_API_KEY: z.string(),
41+
42+
// Pusher / Soketi (client-side)
43+
NEXT_PUBLIC_PUSHER_APP_KEY: z.string().optional(),
44+
NEXT_PUBLIC_PUSHER_HOST: z.string().optional(),
45+
NEXT_PUBLIC_PUSHER_PORT: z.string().optional(),
46+
NEXT_PUBLIC_PUSHER_FORCE_TLS: z.string().optional(),
47+
NEXT_PUBLIC_PUSHER_CLUSTER: z.string().optional(),
3248
},
3349
runtimeEnv: {
3450
NODE_ENV: process.env.NODE_ENV,
@@ -54,6 +70,20 @@ export const env = createEnv({
5470

5571
INNGEST_EVENT_KEY: process.env.INNGEST_EVENT_KEY,
5672
INNGEST_SIGNING_KEY: process.env.INNGEST_SIGNING_KEY,
73+
74+
PUSHER_APP_ID: process.env.PUSHER_APP_ID,
75+
PUSHER_APP_KEY: process.env.PUSHER_APP_KEY,
76+
PUSHER_APP_SECRET: process.env.PUSHER_APP_SECRET,
77+
PUSHER_HOST: process.env.PUSHER_HOST,
78+
PUSHER_PORT: process.env.PUSHER_PORT,
79+
PUSHER_USE_TLS: process.env.PUSHER_USE_TLS,
80+
PUSHER_CLUSTER: process.env.PUSHER_CLUSTER,
81+
82+
NEXT_PUBLIC_PUSHER_APP_KEY: process.env.NEXT_PUBLIC_PUSHER_APP_KEY,
83+
NEXT_PUBLIC_PUSHER_HOST: process.env.NEXT_PUBLIC_PUSHER_HOST,
84+
NEXT_PUBLIC_PUSHER_PORT: process.env.NEXT_PUBLIC_PUSHER_PORT,
85+
NEXT_PUBLIC_PUSHER_FORCE_TLS: process.env.NEXT_PUBLIC_PUSHER_FORCE_TLS,
86+
NEXT_PUBLIC_PUSHER_CLUSTER: process.env.NEXT_PUBLIC_PUSHER_CLUSTER,
5787
},
5888
onValidationError(issues: readonly StandardSchemaV1.Issue[]) {
5989
console.error("❌ Invalid environment variables:", issues);

src/lib/inngest.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
import { persistenceRepository } from "@/backend/persistence/persistence-repositories";
88
import { ActionException } from "@/backend/services/RepositoryException";
99
import { buildPersistableNotification } from "@/backend/services/notifications.payload";
10+
import { pusherServer } from "@/lib/pusher.server";
1011

1112
const notificationPayloadSchema = z.object({
1213
article_id: z.string().optional(),
@@ -176,6 +177,18 @@ export const persistNotificationFn = inngest.createFunction(
176177
},
177178
]);
178179

180+
// Broadcast a lightweight signal so the recipient's browser can invalidate
181+
// its TanStack Query caches without polling.
182+
if (pusherServer) {
183+
await pusherServer
184+
.trigger(`private-user.${data.recipient_id}`, "notification.new", {
185+
scope: "notifications",
186+
})
187+
.catch(() => {
188+
// Publishing is best-effort; never let a Pusher failure break notification delivery.
189+
});
190+
}
191+
179192
return { success: true };
180193
},
181194
);

src/lib/pusher.client.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import Pusher from "pusher-js";
2+
import { env } from "@/env";
3+
4+
let _pusherClient: Pusher | null = null;
5+
6+
/**
7+
* Returns a shared Pusher/Soketi client instance.
8+
* Returns null when the public app key is not configured.
9+
*/
10+
export function getPusherClient(): Pusher | null {
11+
if (typeof window === "undefined") return null;
12+
13+
const key = env.NEXT_PUBLIC_PUSHER_APP_KEY;
14+
if (!key) return null;
15+
16+
if (_pusherClient) return _pusherClient;
17+
18+
const forceTLS = env.NEXT_PUBLIC_PUSHER_FORCE_TLS !== "false";
19+
const port = env.NEXT_PUBLIC_PUSHER_PORT
20+
? Number(env.NEXT_PUBLIC_PUSHER_PORT)
21+
: undefined;
22+
23+
// pusher-js always requires cluster; use empty string as placeholder when
24+
// connecting to a self-hosted Soketi/compatible broker via wsHost.
25+
_pusherClient = new Pusher(key, {
26+
cluster: env.NEXT_PUBLIC_PUSHER_CLUSTER ?? "mt1",
27+
authEndpoint: "/api/pusher/auth",
28+
...(env.NEXT_PUBLIC_PUSHER_HOST && {
29+
wsHost: env.NEXT_PUBLIC_PUSHER_HOST,
30+
wsPort: port,
31+
wssPort: port,
32+
enabledTransports: ["ws", "wss"],
33+
}),
34+
forceTLS,
35+
});
36+
37+
return _pusherClient;
38+
}

src/lib/pusher.server.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import Pusher from "pusher";
2+
import { env } from "@/env";
3+
4+
/**
5+
* Lazy singleton for the server-side Pusher/Soketi client.
6+
* Returns null when Pusher is not configured (no PUSHER_APP_ID etc.),
7+
* so callers can gracefully skip publishing in non-realtime environments.
8+
*/
9+
function createPusherServer(): Pusher | null {
10+
const { PUSHER_APP_ID, PUSHER_APP_KEY, PUSHER_APP_SECRET } = env;
11+
if (!PUSHER_APP_ID || !PUSHER_APP_KEY || !PUSHER_APP_SECRET) {
12+
return null;
13+
}
14+
15+
const base = {
16+
appId: PUSHER_APP_ID,
17+
key: PUSHER_APP_KEY,
18+
secret: PUSHER_APP_SECRET,
19+
useTLS: env.PUSHER_USE_TLS !== "false",
20+
};
21+
22+
if (env.PUSHER_HOST) {
23+
return new Pusher({ ...base, host: env.PUSHER_HOST, port: env.PUSHER_PORT });
24+
}
25+
26+
return new Pusher({ ...base, cluster: env.PUSHER_CLUSTER ?? "mt1" });
27+
}
28+
29+
export const pusherServer = createPusherServer();

0 commit comments

Comments
 (0)