Skip to content

Commit ce13cb8

Browse files
committed
refactor: update Pusher integration and environment variable validation
- Replaced Pusher client and server imports with a unified publishMessage function for better abstraction. - Updated environment variable validation to require non-empty values for Pusher-related keys. - Removed deprecated Pusher client and server files to streamline the codebase.
1 parent 311c7f2 commit ce13cb8

10 files changed

Lines changed: 148 additions & 132 deletions

File tree

src/app/api/socket/auth/route.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { pusherServer } from "@/lib/pusher.server";
1+
import { pusherServer } from "@/lib/pusher/pusher.server";
22
import { authID } from "@/backend/services/session.actions";
33
import { NextRequest, NextResponse } from "next/server";
44

src/backend/services/comment.action.ts

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { and, eq, inArray } from "sqlkit";
1010
import { CommentPresentation } from "../models/domain-models";
1111
import { inngest } from "@/lib/inngest";
1212
import { assertCommentResourceExists } from "./notifications.payload";
13-
import { pusherServer } from "@/lib/pusher.server";
13+
import { publishMessage } from "@/lib/pusher/pusher.server";
1414

1515
const sql = String.raw;
1616

@@ -69,13 +69,11 @@ export const createMyComment = async (
6969
console.error("[inngest] Failed to send notification event:", err);
7070
});
7171

72-
pusherServer
73-
?.trigger(
74-
`resource.${resource_type}.${resource_id}`,
75-
"comment.created",
76-
{ scope: "comments" },
77-
)
78-
.catch(() => {});
72+
void publishMessage(
73+
`resource.${resource_type}.${resource_id}`,
74+
"comment.created",
75+
{ scope: "comments" },
76+
);
7977

8078
return created?.rows?.[0];
8179
};
@@ -106,13 +104,11 @@ export const updateMyComment = async (
106104
data: { body: input.body, updated_at: new Date() },
107105
});
108106

109-
pusherServer
110-
?.trigger(
111-
`resource.${existing.resource_type}.${existing.resource_id}`,
112-
"comment.updated",
113-
{ scope: "comments" },
114-
)
115-
.catch(() => {});
107+
void publishMessage(
108+
`resource.${existing.resource_type}.${existing.resource_id}`,
109+
"comment.updated",
110+
{ scope: "comments" },
111+
);
116112

117113
return { success: true as const, data: { id: input.id } };
118114
} catch (error) {
@@ -166,13 +162,11 @@ export const deleteMyComment = async (
166162
where: inArray("id", ids),
167163
});
168164

169-
pusherServer
170-
?.trigger(
171-
`resource.${root.resource_type}.${root.resource_id}`,
172-
"comment.deleted",
173-
{ scope: "comments" },
174-
)
175-
.catch(() => {});
165+
void publishMessage(
166+
`resource.${root.resource_type}.${root.resource_id}`,
167+
"comment.deleted",
168+
{ scope: "comments" },
169+
);
176170

177171
return { success: true as const, data: { id: input.id } };
178172
} catch (error) {

src/components/comment-section.tsx

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import { Button } from "./ui/button";
3838
import { Skeleton } from "./ui/skeleton";
3939
import { Textarea } from "./ui/textarea";
4040
import getFileUrl from "@/utils/getFileUrl";
41-
import { getPusherClient } from "@/lib/pusher.client";
41+
import { listenChannel } from "@/lib/pusher/pusher.client";
4242

4343
const Context = React.createContext<
4444
{ mutatingId?: string; setMutatingId: (id?: string) => void } | undefined
@@ -159,23 +159,17 @@ export const CommentSection = (props: {
159159
// Phase 2: subscribe to the resource's public Pusher channel and
160160
// invalidate the comments query when any mutation arrives from another client.
161161
useEffect(() => {
162-
const pusher = getPusherClient();
163-
if (!pusher) return;
164-
165162
const channelName = `resource.${props.resource_type}.${props.resource_id}`;
166-
const channel = pusher.subscribe(channelName);
167163
const invalidate = () => {
168164
queryClient.invalidateQueries({
169165
queryKey: ["comments", props.resource_id, props.resource_type],
170166
});
171167
};
172-
const events = ["comment.created", "comment.updated", "comment.deleted"];
173-
events.forEach((event) => channel.bind(event, invalidate));
174-
175-
return () => {
176-
events.forEach((event) => channel.unbind(event, invalidate));
177-
pusher.unsubscribe(channelName);
178-
};
168+
return listenChannel(channelName, {
169+
"comment.created": invalidate,
170+
"comment.updated": invalidate,
171+
"comment.deleted": invalidate,
172+
});
179173
}, [props.resource_id, props.resource_type, queryClient]);
180174

181175
const generated_comment_id = () => crypto.randomUUID();

src/components/providers/RealtimeProvider.tsx

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"use client";
22

3-
import { getPusherClient } from "@/lib/pusher.client";
3+
import { listenChannel } from "@/lib/pusher/pusher.client";
44
import { useSession } from "@/store/session.atom";
55
import { useQueryClient } from "@tanstack/react-query";
66
import React, { PropsWithChildren, useEffect } from "react";
@@ -27,24 +27,13 @@ export function RealtimeProvider({ children }: PropsWithChildren) {
2727
useEffect(() => {
2828
if (!userId) return;
2929

30-
const pusher = getPusherClient();
31-
console.log("pusher", pusher);
32-
if (!pusher) return;
33-
3430
const channelName = `private-user.${userId}`;
35-
const channel = pusher.subscribe(channelName);
36-
37-
channel.bind("notification.new", () => {
31+
return listenChannel(channelName, "notification.new", () => {
3832
queryClient.invalidateQueries({ queryKey: ["my-notifications"] });
3933
queryClient.invalidateQueries({
4034
queryKey: ["unread-notification-count"],
4135
});
4236
});
43-
44-
return () => {
45-
channel.unbind_all();
46-
pusher.unsubscribe(channelName);
47-
};
4837
}, [userId, queryClient]);
4938

5039
return <>{children}</>;

src/env.ts

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,18 @@ export const env = createEnv({
2525
INNGEST_SIGNING_KEY: z.string().optional(),
2626

2727
// Pusher / Soketi (server-side)
28-
PUSHER_APP_ID: z.string().optional(),
29-
PUSHER_APP_KEY: z.string().optional(),
30-
PUSHER_APP_SECRET: z.string().optional(),
31-
PUSHER_HOST: z.string().optional(),
32-
PUSHER_PORT: z.string().optional(),
33-
PUSHER_USE_TLS: z.string().optional(),
34-
PUSHER_CLUSTER: z.string().optional(),
28+
PUSHER_WS_HOST: z.string().min(1),
29+
PUSHER_APP_ID: z.string().min(1),
30+
PUSHER_APP_KEY: z.string().min(1),
31+
PUSHER_APP_SECRET: z.string().min(1),
3532
},
3633
client: {
3734
NEXT_PUBLIC_MEILISEARCH_API_HOST: z.url(),
3835
NEXT_PUBLIC_MEILISEARCH_SEARCH_API_KEY: z.string(),
3936

4037
// Pusher / Soketi (client-side)
41-
NEXT_PUBLIC_PUSHER_APP_KEY: z.string().optional(),
42-
NEXT_PUBLIC_PUSHER_HOST: z.string().optional(),
43-
NEXT_PUBLIC_PUSHER_PORT: z.string().optional(),
44-
NEXT_PUBLIC_PUSHER_FORCE_TLS: z.string().optional(),
45-
NEXT_PUBLIC_PUSHER_CLUSTER: z.string().optional(),
38+
NEXT_PUBLIC_PUSHER_APP_KEY: z.string().min(1),
39+
NEXT_PUBLIC_PUSHER_WS_HOST: z.string().min(1),
4640
},
4741
runtimeEnv: {
4842
NODE_ENV: process.env.NODE_ENV,
@@ -70,16 +64,10 @@ export const env = createEnv({
7064
PUSHER_APP_ID: process.env.PUSHER_APP_ID,
7165
PUSHER_APP_KEY: process.env.PUSHER_APP_KEY,
7266
PUSHER_APP_SECRET: process.env.PUSHER_APP_SECRET,
73-
PUSHER_HOST: process.env.PUSHER_HOST,
74-
PUSHER_PORT: process.env.PUSHER_PORT,
75-
PUSHER_USE_TLS: process.env.PUSHER_USE_TLS,
76-
PUSHER_CLUSTER: process.env.PUSHER_CLUSTER,
67+
PUSHER_WS_HOST: process.env.PUSHER_WS_HOST,
7768

7869
NEXT_PUBLIC_PUSHER_APP_KEY: process.env.NEXT_PUBLIC_PUSHER_APP_KEY,
79-
NEXT_PUBLIC_PUSHER_HOST: process.env.NEXT_PUBLIC_PUSHER_HOST,
80-
NEXT_PUBLIC_PUSHER_PORT: process.env.NEXT_PUBLIC_PUSHER_PORT,
81-
NEXT_PUBLIC_PUSHER_FORCE_TLS: process.env.NEXT_PUBLIC_PUSHER_FORCE_TLS,
82-
NEXT_PUBLIC_PUSHER_CLUSTER: process.env.NEXT_PUBLIC_PUSHER_CLUSTER,
70+
NEXT_PUBLIC_PUSHER_WS_HOST: process.env.NEXT_PUBLIC_PUSHER_WS_HOST,
8371
},
8472
onValidationError(issues: readonly StandardSchemaV1.Issue[]) {
8573
console.error("❌ Invalid environment variables:", issues);

src/lib/inngest.ts

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
import { persistenceRepository } from "@/backend/persistence/persistence-repositories";
88
import { ActionException } from "@/backend/services/RepositoryException";
99
import { buildPersistableNotification } from "@/backend/services/notifications.payload";
10-
import { pusherServer } from "@/lib/pusher.server";
10+
import { publishMessage } from "@/lib/pusher/pusher.server";
1111
import { deleteExpiredArticles } from "@/backend/services/article-cleanup-service";
1212

1313
const notificationPayloadSchema = z.object({
@@ -191,15 +191,9 @@ export const persistNotificationFn = inngest.createFunction(
191191

192192
// Broadcast a lightweight signal so the recipient's browser can invalidate
193193
// its TanStack Query caches without polling.
194-
if (pusherServer) {
195-
await pusherServer
196-
.trigger(`private-user.${data.recipient_id}`, "notification.new", {
197-
scope: "notifications",
198-
})
199-
.catch(() => {
200-
// Publishing is best-effort; never let a Pusher failure break notification delivery.
201-
});
202-
}
194+
await publishMessage(`private-user.${data.recipient_id}`, "notification.new", {
195+
scope: "notifications",
196+
});
203197

204198
return { success: true };
205199
},

src/lib/pusher.client.ts

Lines changed: 0 additions & 25 deletions
This file was deleted.

src/lib/pusher.server.ts

Lines changed: 0 additions & 29 deletions
This file was deleted.

src/lib/pusher/pusher.client.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import Pusher from "pusher-js";
2+
import { env } from "@/env";
3+
4+
let _pusherClient: Pusher | null = null;
5+
6+
function getPusherClient(): Pusher | null {
7+
if (typeof window === "undefined") return null;
8+
9+
if (_pusherClient) return _pusherClient;
10+
11+
const key = env.NEXT_PUBLIC_PUSHER_APP_KEY;
12+
if (!key) return null;
13+
14+
// pusher-js always requires cluster; use empty string as placeholder when
15+
// connecting to a self-hosted Soketi/compatible broker via wsHost.
16+
_pusherClient = new Pusher(key, {
17+
cluster: "mt1",
18+
authEndpoint: "/api/socket/auth",
19+
wsHost: env.NEXT_PUBLIC_PUSHER_WS_HOST,
20+
enabledTransports: ["ws", "wss"],
21+
});
22+
23+
return _pusherClient;
24+
}
25+
26+
type EventHandlers = Record<string, () => void>;
27+
28+
export function listenChannel(
29+
channel: string,
30+
handlers: EventHandlers,
31+
): () => void;
32+
export function listenChannel(
33+
channel: string,
34+
event: string,
35+
handler: () => void,
36+
): () => void;
37+
export function listenChannel(
38+
channel: string,
39+
eventOrHandlers: string | EventHandlers,
40+
handler?: () => void,
41+
): () => void {
42+
const handlers: EventHandlers =
43+
typeof eventOrHandlers === "string" && handler !== undefined
44+
? { [eventOrHandlers]: handler }
45+
: (eventOrHandlers as EventHandlers);
46+
47+
const pusher = getPusherClient();
48+
if (!pusher) {
49+
return () => {};
50+
}
51+
52+
const ch = pusher.subscribe(channel);
53+
for (const [event, fn] of Object.entries(handlers)) {
54+
ch.bind(event, fn);
55+
}
56+
57+
return () => {
58+
for (const [event, fn] of Object.entries(handlers)) {
59+
ch.unbind(event, fn);
60+
}
61+
pusher.unsubscribe(channel);
62+
};
63+
}
64+
65+
/** Server-side publish: `publishMessage` in `./pusher.server` (cannot run in the browser). */

src/lib/pusher/pusher.server.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import Pusher from "pusher";
2+
import { env } from "@/env";
3+
4+
/**
5+
* Lazy singleton for the server-side Pusher/Soketi client.
6+
* Returns null when Pusher is not configured (no PUSHER_APP_ID etc.),
7+
* so callers can gracefully skip publishing in non-realtime environments.
8+
*/
9+
function createPusherServer(): Pusher | null {
10+
const { PUSHER_APP_ID, PUSHER_APP_KEY, PUSHER_APP_SECRET } = env;
11+
if (!PUSHER_APP_ID || !PUSHER_APP_KEY || !PUSHER_APP_SECRET) {
12+
return null;
13+
}
14+
15+
return new Pusher({
16+
host: env.PUSHER_WS_HOST,
17+
appId: PUSHER_APP_ID,
18+
key: PUSHER_APP_KEY,
19+
secret: PUSHER_APP_SECRET,
20+
});
21+
}
22+
23+
export const pusherServer = createPusherServer();
24+
25+
/**
26+
* Best-effort publish. No-ops when Pusher is not configured; swallows errors
27+
* so callers (e.g. Inngest) are not broken by broker failures.
28+
*/
29+
export async function publishMessage(
30+
channel: string,
31+
event: "comment.created" | "comment.updated" | "comment.deleted",
32+
data: Record<string, unknown> = {},
33+
): Promise<void> {
34+
console.log(`
35+
[pusher] Publishing message to channel ${channel} with event ${event} and data ${JSON.stringify(data)}
36+
`);
37+
38+
pusherServer
39+
?.trigger(channel, event, data)
40+
.then((data) => {
41+
console.log("[pusher] Published message successfully");
42+
})
43+
.catch((err) => {
44+
console.error("[pusher] Failed to publish message:", JSON.stringify(err));
45+
});
46+
}

0 commit comments

Comments
 (0)