From 1b5e838361bf221156d84b1dd20695e34016fb88 Mon Sep 17 00:00:00 2001 From: aXenDeveloper Date: Mon, 1 Jun 2026 22:43:36 +0200 Subject: [PATCH 1/2] feat: Add websockets --- apps/api/package.json | 3 + apps/api/src/index.ts | 11 +- apps/api/src/vitnode.api.config.ts | 3 +- apps/docs/content/docs/dev/meta.json | 1 + apps/docs/content/docs/dev/redis.mdx | 15 + apps/docs/content/docs/dev/websocket.mdx | 353 ++++++++++++++++++ packages/vitnode/src/api/lib/module.ts | 6 +- packages/vitnode/src/api/lib/plugin.ts | 8 + packages/vitnode/src/api/lib/websocket.ts | 66 ++++ .../src/api/middlewares/global.middleware.ts | 14 + .../src/api/modules/admin/admin.module.ts | 3 +- .../admin/routes/notifications.route.ts | 59 +++ .../core/dashboard/dashboard-admin-view.tsx | 14 +- .../send-notification/mutation-api.ts | 25 ++ .../send-notification/send-notification.tsx | 60 +++ .../password-reset/password-reset-view.tsx | 2 +- .../src/views/auth/sign-in/sign-in-view.tsx | 2 +- .../sign-up/components/password-input.tsx | 11 +- .../src/views/auth/sign-up/sign-up-view.tsx | 2 +- .../vitnode/src/views/layouts/root-layout.tsx | 5 +- .../src/views/layouts/theme/layout.tsx | 10 +- .../layouts/theme/notification-listener.tsx | 38 ++ .../layouts/theme/web-socket-auth-sync.tsx | 28 ++ packages/vitnode/src/ws/handle.ts | 107 ++++++ packages/vitnode/src/ws/manager.ts | 237 ++++++++++++ packages/vitnode/src/ws/notifications.ts | 28 ++ packages/vitnode/src/ws/provider.tsx | 122 ++++++ packages/vitnode/src/ws/registry.ts | 75 ++++ packages/vitnode/src/ws/types.ts | 73 ++++ packages/vitnode/src/ws/use-websocket.ts | 63 ++++ pnpm-lock.yaml | 33 +- 31 files changed, 1458 insertions(+), 19 deletions(-) create mode 100644 apps/docs/content/docs/dev/redis.mdx create mode 100644 apps/docs/content/docs/dev/websocket.mdx create mode 100644 packages/vitnode/src/api/lib/websocket.ts create mode 100644 packages/vitnode/src/api/modules/admin/routes/notifications.route.ts create mode 100644 packages/vitnode/src/views/admin/views/core/dashboard/send-notification/mutation-api.ts create mode 100644 packages/vitnode/src/views/admin/views/core/dashboard/send-notification/send-notification.tsx create mode 100644 packages/vitnode/src/views/layouts/theme/notification-listener.tsx create mode 100644 packages/vitnode/src/views/layouts/theme/web-socket-auth-sync.tsx create mode 100644 packages/vitnode/src/ws/handle.ts create mode 100644 packages/vitnode/src/ws/manager.ts create mode 100644 packages/vitnode/src/ws/notifications.ts create mode 100644 packages/vitnode/src/ws/provider.tsx create mode 100644 packages/vitnode/src/ws/registry.ts create mode 100644 packages/vitnode/src/ws/types.ts create mode 100644 packages/vitnode/src/ws/use-websocket.ts diff --git a/apps/api/package.json b/apps/api/package.json index 560cc3e7b..205f99d21 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -24,6 +24,7 @@ "react": "^19.2.5", "react-dom": "^19.2.5", "use-intl": "^4.11.0", + "ws": "^8.21.0", "zod": "^4.4.1" }, "devDependencies": { @@ -32,8 +33,10 @@ "@types/node": "^25.6.0", "@types/react": "^19.2.14", "@types/react-dom": "^19.2.3", + "@types/ws": "^8.18.1", "@vitnode/config": "workspace:*", "@vitnode/nodemailer": "workspace:*", + "@vitnode/blog": "workspace:*", "dotenv": "^17.4.2", "eslint": "^10.2.1", "react-email": "^6.1.5", diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 62084e46c..363476bdc 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -1,6 +1,8 @@ -import { serve } from "@hono/node-server"; +import { serve, upgradeWebSocket } from "@hono/node-server"; import { OpenAPIHono } from "@hono/zod-openapi"; import { VitNodeAPI } from "@vitnode/core/api/config"; +import { handleVitNodeWebSocket } from "@vitnode/core/ws/handle"; +import { WebSocketServer } from "ws"; import { vitNodeApiConfig } from "./vitnode.api.config.js"; @@ -11,10 +13,17 @@ VitNodeAPI({ vitNodeApiConfig, }); +const wss = new WebSocketServer({ noServer: true }); + +app.get("/ws", upgradeWebSocket(handleVitNodeWebSocket())); + serve( { fetch: app.fetch, port: 8080, + websocket: { + server: wss, + }, }, info => { const initMessage = "\x1b[34m[VitNode]\x1b[0m"; diff --git a/apps/api/src/vitnode.api.config.ts b/apps/api/src/vitnode.api.config.ts index 29274f43b..eb918358b 100644 --- a/apps/api/src/vitnode.api.config.ts +++ b/apps/api/src/vitnode.api.config.ts @@ -1,3 +1,4 @@ +import { blogApiPlugin } from "@vitnode/blog/config.api"; import { buildApiConfig } from "@vitnode/core/vitnode.config"; import { NodemailerEmailAdapter } from "@vitnode/nodemailer"; import { config } from "dotenv"; @@ -11,7 +12,7 @@ export const POSTGRES_URL = process.env.POSTGRES_URL ?? "postgresql://root:root@localhost:5432/vitnode"; export const vitNodeApiConfig = buildApiConfig({ - plugins: [], + plugins: [blogApiPlugin()], pathToMessages: async path => await import(`./locales/${path}`), dbProvider: drizzle({ connection: POSTGRES_URL, diff --git a/apps/docs/content/docs/dev/meta.json b/apps/docs/content/docs/dev/meta.json index 86c671ceb..69c79a5ff 100644 --- a/apps/docs/content/docs/dev/meta.json +++ b/apps/docs/content/docs/dev/meta.json @@ -19,6 +19,7 @@ "email", "sso", "cron", + "websocket", "---Frontend---", "layouts-and-pages", "admin-page", diff --git a/apps/docs/content/docs/dev/redis.mdx b/apps/docs/content/docs/dev/redis.mdx new file mode 100644 index 000000000..ef08ba8cb --- /dev/null +++ b/apps/docs/content/docs/dev/redis.mdx @@ -0,0 +1,15 @@ +--- +title: Redis +description: Enhance your application's performance and scalability. +--- + + + We're working hard to bring you the best documentation experience. + + +## Going to production - websockets + +The connection list lives in memory on a single server. If you run **several +server instances** behind a load balancer, a `broadcast` or `sendToUser` only +reaches the clients on the same instance. To fix that, share the messages +between instances with a pub/sub like **Redis**. diff --git a/apps/docs/content/docs/dev/websocket.mdx b/apps/docs/content/docs/dev/websocket.mdx new file mode 100644 index 000000000..d2146f5ab --- /dev/null +++ b/apps/docs/content/docs/dev/websocket.mdx @@ -0,0 +1,353 @@ +--- +title: WebSocket +description: Add real-time features — live updates and notifications — with one shared connection. +--- + +import { Tab, Tabs } from "fumadocs-ui/components/tabs"; + +| Cloud | Self-Hosted | Links | +| ---------------- | ------------ | ---------------------------------------------------------------- | +| ❌ Not Supported | ✅ Supported | [Hono WebSocket Helper](https://hono.dev/docs/helpers/websocket) | + +A normal request works one way: the browser asks, the server answers, done. A +**WebSocket** keeps the line open both ways, so the server can push data to the +browser the moment something happens — no refreshing, no polling. + +Use it for things that should feel instant: live lists, presence, chat, and +notifications. + +## How it works + +VitNode takes care of the tricky parts for you: + +- **One connection.** The browser opens a single socket at `/ws`, and every + feature shares it. You never manage sockets by hand. +- **Channels.** Each message is tagged with a channel id so it reaches the + right place. An id looks like `{pluginId}_{module}_{name}`. +- **Shared across tabs.** Open your app in five tabs and there is still just + **one** connection. One tab becomes the "leader" and the others ride along, + so every tab sees the same messages with no duplicates. +- **Knows who is signed in.** The connection is tied to the logged-in user. It + automatically re-connects when they sign in or out, dropping to a "guest" + connection on sign-out. + + + The client provider is already mounted for you in the root layout, so the + connection is live on every page — there is nothing to wire up on the + frontend. + + +## Setup + +You only do this once, in your API entrypoint. + + + +### Add the WebSocket server + +The socket is served by your runtime, so install it there. + + + + + +Install the `ws` package: + +```bash title="Terminal" +pnpm add ws && pnpm add -D @types/ws +``` + +```ts title="src/index.ts" +import { serve } from "@hono/node-server"; +import { WebSocketServer } from "ws"; + +const wss = new WebSocketServer({ noServer: true }); + +serve({ + fetch: app.fetch, + port: 8080, + websocket: { server: wss }, // [!code ++] +}); +``` + + + + + +Bun has WebSockets built in: + +```ts title="src/index.ts" +import { websocket } from "hono/bun"; + +export default { + fetch: app.fetch, + websocket, // [!code ++] +}; +``` + + + + + + + +### Add the handler + +`handleVitNodeWebSocket` wires the connection into VitNode. It is the same on +every runtime — only the `upgradeWebSocket` import differs. + + + +```ts title="src/index.ts" tab="Node.js" +import { serve, upgradeWebSocket } from "@hono/node-server"; // [!code ++] +import { handleVitNodeWebSocket } from "@vitnode/core/ws/handle"; // [!code ++] + +app.get("/ws", upgradeWebSocket(handleVitNodeWebSocket())); // [!code ++] +``` + +```ts title="src/index.ts" tab="Bun" +import { upgradeWebSocket, websocket } from "hono/bun"; // [!code ++] +import { handleVitNodeWebSocket } from "@vitnode/core/ws/handle"; // [!code ++] + +app.get("/ws", upgradeWebSocket(handleVitNodeWebSocket())); // [!code ++] +``` + + + +That's it — your app now has real-time. Next, let's send a message. + + + + +## Send and receive + +Let's build a tiny **echo**: the client sends a message, the server sends it +right back. It shows the full round-trip in four small steps. + + + +### Describe the messages (the channel) + +A **channel** is a shared contract: it holds the id and the two message +types — what the client **sends** and what it **receives**. Put it in its own +file so both the server and the client can import it. + +```ts title="realtime/echo.channel.ts" +import { createWebSocketChannel } from "@vitnode/core/ws/types"; + +import { CONFIG_PLUGIN } from "@/const"; + +export interface EchoClientMessage { + message: string; +} + +export interface EchoServerMessage { + reply: string; +} + +// Full id: `@vitnode/blog_chat_echo` +export const echoChannel = createWebSocketChannel< + EchoClientMessage, // what the client sends + EchoServerMessage // what the client gets back +>({ + pluginId: CONFIG_PLUGIN.pluginId, + module: "chat", + id: "echo", +}); +``` + + + +### Handle it on the server + +`buildWebSocket` runs your `onMessage` whenever a client sends to this channel. +You get `data` (already parsed and typed) and `send` to reply on the same +channel. You also get `c` — the same context as a route, so you can reach the +database, the user, the logger, and more. + +```ts title="api/modules/chat/echo.ws.ts" +import { buildWebSocket } from "@vitnode/core/api/lib/websocket"; + +import type { + EchoClientMessage, + EchoServerMessage, +} from "@/realtime/echo.channel"; + +export const echoWebSocket = buildWebSocket< + EchoClientMessage, + EchoServerMessage +>({ + id: "echo", + description: "Replies with whatever it receives.", + onMessage: ({ data, send }) => { + send({ reply: `You said: ${data.message}` }); + }, +}); +``` + + + +### Register it in a module + +Add it to the module's `webSockets`, the same way you add routes. + +```ts title="api/modules/chat/chat.module.ts" +import { buildModule } from "@vitnode/core/api/lib/module"; + +import { CONFIG_PLUGIN } from "@/const"; + +import { echoWebSocket } from "./echo.ws"; // [!code ++] + +export const chatModule = buildModule({ + pluginId: CONFIG_PLUGIN.pluginId, + name: "chat", + routes: [], + webSockets: [echoWebSocket], // [!code ++] +}); +``` + + + +### Use it in a component + +`useVitNodeWebSocket` connects the component to the channel. Because it knows +the channel, `send` and the `onMessage` `data` are fully typed — no casting. + +```tsx title="views/echo.tsx" +"use client"; + +import { useVitNodeWebSocket } from "@vitnode/core/ws/use-websocket"; +import React from "react"; + +import { echoChannel } from "@/realtime/echo.channel"; + +export const Echo = () => { + const [reply, setReply] = React.useState(""); + + const { send, readyState } = useVitNodeWebSocket(echoChannel, { + onMessage: data => setReply(data.reply), // data is EchoServerMessage + }); + + return ( + + ); +}; +``` + + + + + + The subscription lives with the component: it starts on mount and stops on + unmount. So a view only receives messages while it is on screen — leave the + page and it goes quiet on its own. + + +## Broadcast to everyone + +Want to push the same update to **every** open browser? Use `broadcast` from any +route or handler. A great use is keeping a list fresh: broadcast whenever the +data changes, and every client refreshes. + +First, a "broadcast-only" channel. Since clients never send to it, its send type +is `never`: + +```ts title="realtime/posts.channel.ts" +import { createWebSocketChannel } from "@vitnode/core/ws/types"; + +import { CONFIG_PLUGIN } from "@/const"; + +export interface PostsChange { + action: "created" | "deleted" | "updated"; +} + +export const postsChannel = createWebSocketChannel({ + pluginId: CONFIG_PLUGIN.pluginId, + module: "posts", + id: "changes", +}); +``` + +Then broadcast after you change something. The payload is type-checked against +the channel: + +```ts title="api/modules/posts/routes/create.route.ts" +import { postsChannel } from "@/realtime/posts.channel"; + +// ...inside the handler, after creating the post: +c.get("realtime").broadcast(postsChannel, { action: "created" }); +``` + +On the client, listen and refresh the page when it fires: + +```tsx title="views/live-posts.tsx" +"use client"; + +import { useRouter } from "@vitnode/core/lib/navigation"; +import { useVitNodeWebSocket } from "@vitnode/core/ws/use-websocket"; + +import { postsChannel } from "@/realtime/posts.channel"; + +export const LivePosts = () => { + const router = useRouter(); + + useVitNodeWebSocket(postsChannel, { + onMessage: () => router.refresh(), + }); + + return null; +}; +``` + + + `broadcast` reaches everyone, so use it only for non-sensitive signals like + "the list changed". For private data, send to one user instead (below). + + +## Send to one user + +To reach **one person** — on every browser where they are signed in — use +`sendToUser`. VitNode knows which user owns each connection (from their sign-in +cookie), so the message never leaks to anyone else. + +This powers the built-in notification system. Send one from any handler: + +```ts title="server: notify a user" +import { notificationsChannel } from "@vitnode/core/ws/notifications"; + +c.get("realtime").sendToUser(userId, notificationsChannel, { + title: "Welcome back 👋", + description: "You have 3 new messages.", + type: "info", +}); +``` + +Show it however you like — here as a toast with `sonner`: + +```tsx title="notification-listener.tsx" +"use client"; + +import { notificationsChannel } from "@vitnode/core/ws/notifications"; +import { useVitNodeWebSocket } from "@vitnode/core/ws/use-websocket"; +import { toast } from "sonner"; + +export const NotificationListener = () => { + useVitNodeWebSocket(notificationsChannel, { + onMessage: n => toast(n.title, { description: n.description }), + }); + + return null; +}; +``` + + + The notification channel, this listener, and an admin dashboard button to send + one are all built in. A user only receives notifications while signed in — a + guest connection has no one to deliver to. + diff --git a/packages/vitnode/src/api/lib/module.ts b/packages/vitnode/src/api/lib/module.ts index 79c239f63..30af2f7af 100644 --- a/packages/vitnode/src/api/lib/module.ts +++ b/packages/vitnode/src/api/lib/module.ts @@ -2,6 +2,7 @@ import { OpenAPIHono } from "@hono/zod-openapi"; import type { BuildCronReturn } from "./cron"; import type { Route } from "./route"; +import type { BuildWebSocketReturn } from "./websocket"; export interface BuildModuleType { plugin: Plugin; @@ -19,6 +20,7 @@ export interface BaseBuildModuleReturn< name: M; pluginId: P; routes: Routes; + webSockets: BuildWebSocketReturn[]; } export interface BuildModuleReturn< @@ -41,12 +43,14 @@ export function buildModule< name, modules, cronJobs = [], + webSockets = [], }: { cronJobs?: BuildCronReturn[]; modules?: Modules; name: M; pluginId: P; routes: Routes; + webSockets?: BuildWebSocketReturn[]; }): BuildModuleReturn { const hono = new OpenAPIHono(); @@ -62,5 +66,5 @@ export function buildModule< }); } - return { routes, pluginId, hono, name, modules, cronJobs }; + return { routes, pluginId, hono, name, modules, cronJobs, webSockets }; } diff --git a/packages/vitnode/src/api/lib/plugin.ts b/packages/vitnode/src/api/lib/plugin.ts index 1e4c5b2e2..8c27aaf76 100644 --- a/packages/vitnode/src/api/lib/plugin.ts +++ b/packages/vitnode/src/api/lib/plugin.ts @@ -2,6 +2,7 @@ import { OpenAPIHono } from "@hono/zod-openapi"; import type { CronJobConfig } from "./cron"; import type { BuildModuleReturn } from "./module"; +import type { WebSocketConfig } from "./websocket"; import { checkPluginId } from "./check-plugin-id"; @@ -9,6 +10,7 @@ export interface BuildPluginApiReturn { cronJobs: Omit[]; hono: OpenAPIHono; pluginId: string; + webSockets: Omit[]; } export function buildApiPlugin

({ @@ -23,17 +25,23 @@ export function buildApiPlugin

({ const hono = new OpenAPIHono(); const cronJobs: BuildPluginApiReturn["cronJobs"] = []; + const webSockets: BuildPluginApiReturn["webSockets"] = []; modules.forEach(handler => { hono.route(`/${handler.name}`, handler.hono); handler.cronJobs?.forEach(cron => { cronJobs.push({ ...cron, module: handler.name }); }); + + handler.webSockets?.forEach(webSocket => { + webSockets.push({ ...webSocket, module: handler.name }); + }); }); return { pluginId, hono, cronJobs, + webSockets, }; } diff --git a/packages/vitnode/src/api/lib/websocket.ts b/packages/vitnode/src/api/lib/websocket.ts new file mode 100644 index 000000000..e60ed6b27 --- /dev/null +++ b/packages/vitnode/src/api/lib/websocket.ts @@ -0,0 +1,66 @@ +import type { Context } from "hono"; +import type { WSContext } from "hono/ws"; + +import type { EnvVitNode } from "../middlewares/global.middleware"; + +export { getWebSocketId } from "@/ws/types"; + +/** + * WebSocket context for controlling the connection. Use it to `send`, `close` + * or read the `readyState`. The underlying runtime socket is exposed on + * `ws.raw` and is left generic so the same handler works across runtimes + * (node.js `ws`, Bun, ...). + */ +export type VitNodeWSContext = WSContext; + +/** + * Arguments passed to a registered WebSocket's `onMessage` handler. + * + * - `TReceive` - the payload the client sends for this socket. + * - `TSend` - the payload the server sends back to the client. + */ +export interface VitNodeWSMessageParams { + /** Hono request context - reach the db, the user, the logger, etc. */ + c: Context; + /** The payload sent by the client for this socket. */ + data: TReceive; + /** Send a payload back to this client on the same socket id. */ + send: (data: TSend) => void; + /** Low-level WebSocket context (`send`, `close`, `readyState`, ...). */ + ws: VitNodeWSContext; +} + +/** + * A registered WebSocket. The generic argument types are only used at the + * definition site (via {@link buildWebSocket}); once stored they are erased so + * sockets with different message shapes can live in the same array. + */ +export interface BuildWebSocketReturn { + description?: string; + /** + * The part of the id chosen by the developer. The full id a client targets + * is `{pluginId}_{module}_{id}`. + */ + id: string; + onMessage: (params: VitNodeWSMessageParams) => Promise | void; +} + +/** + * A registered WebSocket together with the plugin and module it belongs to. + * The `module` is filled in when the module is built and the `pluginId` when + * the plugin is built, mirroring how cron jobs are wired up. + */ +export interface WebSocketConfig extends BuildWebSocketReturn { + module: string; + pluginId: string; +} + +export function buildWebSocket(webSocket: { + description?: string; + id: string; + onMessage: ( + params: VitNodeWSMessageParams, + ) => Promise | void; +}): BuildWebSocketReturn { + return webSocket as BuildWebSocketReturn; +} diff --git a/packages/vitnode/src/api/middlewares/global.middleware.ts b/packages/vitnode/src/api/middlewares/global.middleware.ts index 981300229..9c7bc2b01 100644 --- a/packages/vitnode/src/api/middlewares/global.middleware.ts +++ b/packages/vitnode/src/api/middlewares/global.middleware.ts @@ -3,13 +3,16 @@ import type { Context, Env, Next } from "hono"; import { HTTPException } from "hono/http-exception"; import type { VitNodeApiConfig, VitNodeConfig } from "@/vitnode.config"; +import type { VitNodeRealtime } from "@/ws/registry"; import { EmailModel, type EmailModelSendArgs } from "@/api/models/email"; import { SessionModel } from "@/api/models/session"; import { SessionAdminModel } from "@/api/models/session-admin"; import { CONFIG } from "@/lib/config"; +import { realtime } from "@/ws/registry"; import type { BuildCronReturn } from "../lib/cron"; +import type { WebSocketConfig } from "../lib/websocket"; import type { SSOApiPlugin } from "../models/sso"; import { @@ -62,6 +65,7 @@ export interface EnvVariablesVitNode { }; pathToMessages: (path: string) => Promise<{ default: object }>; plugins: { id: string }[]; + webSockets: WebSocketConfig[]; }; db: Pick["dbProvider"]; email: { @@ -72,6 +76,7 @@ export interface EnvVariablesVitNode { plugin: { id: string; }; + realtime: VitNodeRealtime; user: null | { avatarColor: string; birthday: Date | null; @@ -119,6 +124,13 @@ export const globalMiddleware = ({ })), ); + const webSocketsMetadata: WebSocketConfig[] = plugins.flatMap(plugin => + plugin.webSockets.map(webSocket => ({ + ...webSocket, + pluginId: plugin.pluginId, + })), + ); + const ipHeaderKeys = [ "x-forwarded-for", "x-real-ip", @@ -154,6 +166,7 @@ export const globalMiddleware = ({ c.set("ipAddress", ipAddress ?? "127.0.0.1"); c.set("db", dbProvider); c.set("email", new EmailModel(c)); + c.set("realtime", realtime); c.set("core", { pathToMessages, @@ -176,6 +189,7 @@ export const globalMiddleware = ({ cronSecret: CONFIG.cronJobSecret, plugins: pluginsMetadata, cron: cronMetadata, + webSockets: webSocketsMetadata, }); const user = await new SessionModel(c).getUser(); diff --git a/packages/vitnode/src/api/modules/admin/admin.module.ts b/packages/vitnode/src/api/modules/admin/admin.module.ts index 2c9e61ea4..8f56c4612 100644 --- a/packages/vitnode/src/api/modules/admin/admin.module.ts +++ b/packages/vitnode/src/api/modules/admin/admin.module.ts @@ -3,13 +3,14 @@ import { CONFIG_PLUGIN } from "@/config"; import { advancedAdminModule } from "./advanced/advanced.admin.module"; import { debugAdminModule } from "./debug/debug.admin.module"; +import { sendNotificationRoute } from "./routes/notifications.route"; import { sessionAdminRoute } from "./routes/session.route"; import { usersAdminModule } from "./users/users.admin.module"; export const adminModule = buildModule({ pluginId: CONFIG_PLUGIN.pluginId, name: "admin", - routes: [sessionAdminRoute], + routes: [sessionAdminRoute, sendNotificationRoute], modules: [usersAdminModule, debugAdminModule, advancedAdminModule], cronJobs: [], }); diff --git a/packages/vitnode/src/api/modules/admin/routes/notifications.route.ts b/packages/vitnode/src/api/modules/admin/routes/notifications.route.ts new file mode 100644 index 000000000..f77856237 --- /dev/null +++ b/packages/vitnode/src/api/modules/admin/routes/notifications.route.ts @@ -0,0 +1,59 @@ +import { HTTPException } from "hono/http-exception"; +import { z } from "zod"; + +import { buildRoute } from "@/api/lib/route"; +import { CONFIG_PLUGIN } from "@/config"; +import { notificationsChannel } from "@/ws/notifications"; + +export const zodSendNotificationSchema = z.object({ + description: z.string().optional(), + title: z.string().min(1, "Title is required"), + type: z.enum(["error", "info", "success", "warning"]).optional(), + userId: z.number(), +}); + +export const sendNotificationRoute = buildRoute({ + pluginId: CONFIG_PLUGIN.pluginId, + route: { + method: "post", + description: "Send a notification to a user", + path: "/notifications/send", + request: { + body: { + content: { + "application/json": { + schema: zodSendNotificationSchema, + }, + }, + }, + }, + responses: { + 200: { + content: { + "application/json": { + schema: z.object({ success: z.boolean() }), + }, + }, + description: "Notification sent", + }, + 403: { + description: "Access Denied", + }, + }, + }, + handler: c => { + const admin = c.get("admin")?.user; + if (!admin) throw new HTTPException(403); + + const { userId, title, description, type } = c.req.valid("json"); + + // Delivered only to that user's connections, across all their browsers. + c.get("realtime").sendToUser(userId, notificationsChannel, { + description, + title, + type, + }); + + return c.json({ success: true }); + }, +}); diff --git a/packages/vitnode/src/views/admin/views/core/dashboard/dashboard-admin-view.tsx b/packages/vitnode/src/views/admin/views/core/dashboard/dashboard-admin-view.tsx index 067fb3038..d9a296c2b 100644 --- a/packages/vitnode/src/views/admin/views/core/dashboard/dashboard-admin-view.tsx +++ b/packages/vitnode/src/views/admin/views/core/dashboard/dashboard-admin-view.tsx @@ -6,11 +6,13 @@ import { HeaderContent } from "@/components/ui/header-content"; import { getSessionAdminApi } from "@/lib/api/get-session-admin-api"; import { CONFIG } from "@/lib/config"; +import { SendNotificationAction } from "./send-notification/send-notification"; + export const DashboardAdminView = async () => { const session = await getSessionAdminApi(); const t = await getTranslations("admin.dashboard"); if (!session) return null; - const { vitnode_version } = session; + const { user, vitnode_version } = session; return (

@@ -30,7 +32,15 @@ export const DashboardAdminView = async () => { } /> -
Dashboard Admin
+ +
+

Send a notification

+

+ Pushes a toast to the user in real time, on every browser where they + are signed in. Defaults to your own user id. +

+ +
); }; diff --git a/packages/vitnode/src/views/admin/views/core/dashboard/send-notification/mutation-api.ts b/packages/vitnode/src/views/admin/views/core/dashboard/send-notification/mutation-api.ts new file mode 100644 index 000000000..cee5258e5 --- /dev/null +++ b/packages/vitnode/src/views/admin/views/core/dashboard/send-notification/mutation-api.ts @@ -0,0 +1,25 @@ +"use server"; + +import type { z } from "zod"; + +import type { zodSendNotificationSchema } from "@/api/modules/admin/routes/notifications.route"; + +import { adminModule } from "@/api/modules/admin/admin.module"; +import { fetcher } from "@/lib/fetcher"; + +export const sendNotificationMutation = async ( + body: z.infer, +) => { + const res = await fetcher(adminModule, { + path: "/notifications/send", + method: "post", + module: "admin", + args: { + body, + }, + }); + + if (!res.ok) { + return { error: await res.text() }; + } +}; diff --git a/packages/vitnode/src/views/admin/views/core/dashboard/send-notification/send-notification.tsx b/packages/vitnode/src/views/admin/views/core/dashboard/send-notification/send-notification.tsx new file mode 100644 index 000000000..c76bedc63 --- /dev/null +++ b/packages/vitnode/src/views/admin/views/core/dashboard/send-notification/send-notification.tsx @@ -0,0 +1,60 @@ +"use client"; + +import { SendIcon } from "lucide-react"; +import React from "react"; +import { toast } from "sonner"; + +import { Button } from "@/components/ui/button"; +import { Input } from "@/components/ui/input"; + +import { sendNotificationMutation } from "./mutation-api"; + +export const SendNotificationAction = ({ + defaultUserId, +}: { + defaultUserId: number; +}) => { + const [userId, setUserId] = React.useState(String(defaultUserId)); + const [title, setTitle] = React.useState("Hello from the admin 👋"); + const [isPending, startTransition] = React.useTransition(); + + const onSend = () => { + startTransition(async () => { + const res = await sendNotificationMutation({ + title, + type: "info", + userId: Number(userId), + }); + + if (res?.error) { + toast.error("Failed to send notification", { description: res.error }); + + return; + } + + toast.success("Notification sent"); + }); + }; + + return ( +
+ + + +
+ ); +}; diff --git a/packages/vitnode/src/views/auth/password-reset/password-reset-view.tsx b/packages/vitnode/src/views/auth/password-reset/password-reset-view.tsx index 421e8fc77..bfb08cd52 100644 --- a/packages/vitnode/src/views/auth/password-reset/password-reset-view.tsx +++ b/packages/vitnode/src/views/auth/password-reset/password-reset-view.tsx @@ -19,7 +19,7 @@ export const PasswordResetView = async ({ if (!isEmail) notFound(); return ( -
+
{token && userId ? ( { return ( -
+
diff --git a/packages/vitnode/src/views/auth/sign-up/components/password-input.tsx b/packages/vitnode/src/views/auth/sign-up/components/password-input.tsx index 15178ee5d..a1d37568b 100644 --- a/packages/vitnode/src/views/auth/sign-up/components/password-input.tsx +++ b/packages/vitnode/src/views/auth/sign-up/components/password-input.tsx @@ -16,10 +16,13 @@ import { export const PasswordInput = ({ label, + labelRight, // eslint-disable-next-line @typescript-eslint/no-unused-vars - description: _, + description: _description, + // eslint-disable-next-line @typescript-eslint/no-unused-vars + itemParams: _itemParams, field, - otherProps: { maxLength, minLength, pattern }, + otherProps: { isOptional, maxLength, minLength, pattern }, ...props }: ItemAutoFormComponentProps & Omit, "type">) => { @@ -47,7 +50,9 @@ export const PasswordInput = ({ return ( <> - {label} + + {label} + diff --git a/packages/vitnode/src/views/auth/sign-up/sign-up-view.tsx b/packages/vitnode/src/views/auth/sign-up/sign-up-view.tsx index 4743820b6..d3e316527 100644 --- a/packages/vitnode/src/views/auth/sign-up/sign-up-view.tsx +++ b/packages/vitnode/src/views/auth/sign-up/sign-up-view.tsx @@ -19,7 +19,7 @@ export const SignUpView = async () => { return ( -
+
diff --git a/packages/vitnode/src/views/layouts/root-layout.tsx b/packages/vitnode/src/views/layouts/root-layout.tsx index 30b8453f8..cf7f46039 100644 --- a/packages/vitnode/src/views/layouts/root-layout.tsx +++ b/packages/vitnode/src/views/layouts/root-layout.tsx @@ -6,6 +6,7 @@ import { setRequestLocale } from "next-intl/server"; import type { VitNodeConfig } from "@/vitnode.config"; import { I18nProvider } from "@/components/i18n-provider"; +import { VitNodeWebSocketProvider } from "@/ws/provider"; import { RootProvider } from "./provider"; @@ -40,7 +41,9 @@ export const RootLayout = async ({ return ( - {children} + + {children} + ); }; diff --git a/packages/vitnode/src/views/layouts/theme/layout.tsx b/packages/vitnode/src/views/layouts/theme/layout.tsx index 3e167e9a3..266323233 100644 --- a/packages/vitnode/src/views/layouts/theme/layout.tsx +++ b/packages/vitnode/src/views/layouts/theme/layout.tsx @@ -1,8 +1,12 @@ +import { getSessionApi } from "@/lib/api/get-session-api"; + import type { VitNodeConfig } from "../../../vitnode.config"; import { HeaderLayout } from "./header/header"; +import { NotificationListener } from "./notification-listener"; +import { WebSocketAuthSync } from "./web-socket-auth-sync"; -export const ThemeLayout = ({ +export const ThemeLayout = async ({ children, logo, vitNodeConfig, @@ -10,8 +14,12 @@ export const ThemeLayout = ({ children: React.ReactNode; vitNodeConfig: VitNodeConfig; }) => { + const { user } = await getSessionApi(); + return ( <> + + {" "}
{children}
diff --git a/packages/vitnode/src/views/layouts/theme/notification-listener.tsx b/packages/vitnode/src/views/layouts/theme/notification-listener.tsx new file mode 100644 index 000000000..3fb4208f1 --- /dev/null +++ b/packages/vitnode/src/views/layouts/theme/notification-listener.tsx @@ -0,0 +1,38 @@ +"use client"; + +import { toast } from "sonner"; + +import { notificationsChannel } from "@/ws/notifications"; +import { useVitNodeWebSocket } from "@/ws/use-websocket"; + +/** + * Listens on the current user's notification channel and shows a sonner toast + * for each notification. Renders nothing. The connection is shared across the + * user's tabs/browsers, so notifications appear wherever they are signed in. + */ +export const NotificationListener = () => { + useVitNodeWebSocket(notificationsChannel, { + onMessage: notification => { + const options = { description: notification.description }; + + switch (notification.type) { + case "error": + toast.error(notification.title, options); + break; + case "info": + toast.info(notification.title, options); + break; + case "success": + toast.success(notification.title, options); + break; + case "warning": + toast.warning(notification.title, options); + break; + default: + toast(notification.title, options); + } + }, + }); + + return null; +}; diff --git a/packages/vitnode/src/views/layouts/theme/web-socket-auth-sync.tsx b/packages/vitnode/src/views/layouts/theme/web-socket-auth-sync.tsx new file mode 100644 index 000000000..54a1f8cb2 --- /dev/null +++ b/packages/vitnode/src/views/layouts/theme/web-socket-auth-sync.tsx @@ -0,0 +1,28 @@ +"use client"; + +// The user id comes from the server render, so the reconnect must be driven by +// the prop changing (sign-in/sign-out) rather than a client event handler. +/* eslint-disable react-you-might-not-need-an-effect/no-event-handler */ +import React from "react"; + +import { useVitNodeWebSocketContext } from "@/ws/provider"; + +/** + * Reconnects the shared WebSocket whenever the signed-in user changes, so the + * long-lived connection re-authenticates from the current session cookie - + * e.g. it drops to "guest" right after sign-out (and picks up the user again + * after sign-in). Renders nothing. + */ +export const WebSocketAuthSync = ({ userId }: { userId: null | number }) => { + const { reconnect } = useVitNodeWebSocketContext(); + const previousUserIdRef = React.useRef(userId); + + React.useEffect(() => { + if (previousUserIdRef.current === userId) return; + + previousUserIdRef.current = userId; + reconnect(); + }, [userId, reconnect]); + + return null; +}; diff --git a/packages/vitnode/src/ws/handle.ts b/packages/vitnode/src/ws/handle.ts new file mode 100644 index 000000000..99d11e581 --- /dev/null +++ b/packages/vitnode/src/ws/handle.ts @@ -0,0 +1,107 @@ +import type { Context } from "hono"; +import type { WSEvents } from "hono/ws"; + +import type { EnvVitNode } from "@/api/middlewares/global.middleware"; +import type { VitNodeWSMessage } from "@/ws/types"; + +import { wsRegistry } from "@/ws/registry"; +import { getWebSocketId } from "@/ws/types"; + +/** + * Parse an incoming raw WebSocket message into a {@link VitNodeWSMessage} + * envelope. Returns `undefined` for anything that is not a JSON object + * carrying a string `id` (e.g. plain text or binary frames). + */ +const parseMessage = (raw: unknown): undefined | VitNodeWSMessage => { + if (typeof raw !== "string") return undefined; + + try { + const parsed: unknown = JSON.parse(raw); + + if ( + typeof parsed === "object" && + parsed !== null && + "id" in parsed && + typeof (parsed as Record).id === "string" + ) { + return parsed as VitNodeWSMessage; + } + } catch { + /* ignore non-JSON messages */ + } + + return undefined; +}; + +/** + * Handle VitNode WebSockets for any Hono runtime that exposes + * `upgradeWebSocket` (node.js `@hono/node-server`, Bun, ...). + * + * A single `/ws` connection is multiplexed across every registered socket. + * Each message is an envelope `{ id, data }` where `id` is + * `{pluginId}_{module}_{id}`. On every message this handler looks up the + * registered WebSocket whose composed id matches and calls its `onMessage` + * handler with the parsed `data` (and a `send` helper that wraps the reply in + * the same envelope). The registered handlers receive the request + * `Context`, so they can reach the database, the authenticated + * user, the logger, etc. + * + * The optional `createEvents` callback lets you add connection-level behavior + * (`onOpen`, `onClose`, `onError`, or a fallback `onMessage` for messages that + * do not match any registered socket) with access to the typed context. + * + * @example + * ```ts + * app.get("/ws", upgradeWebSocket(handleVitNodeWebSocket())); + * ``` + */ +export function handleVitNodeWebSocket( + createEvents?: (c: Context) => Promise | WSEvents, +) { + return async (c: Context): Promise => { + const inline = await createEvents?.(c); + const registered = c.get("core").webSockets; + // Authenticated from the session cookie sent with the upgrade request, so + // the server can target this connection's user (e.g. for notifications). + const userId = c.get("user")?.id ?? null; + + return { + onOpen: (event, ws) => { + // Track the connection so the server can push messages to it. + wsRegistry.add(ws, userId); + inline?.onOpen?.(event, ws); + }, + onClose: (event, ws) => { + wsRegistry.remove(ws); + inline?.onClose?.(event, ws); + }, + onError: inline?.onError, + onMessage: (event, ws) => { + const message = parseMessage(event.data); + const target = message + ? registered.find( + webSocket => getWebSocketId(webSocket) === message.id, + ) + : undefined; + + if (message && target) { + const send = (data: unknown) => { + ws.send( + JSON.stringify({ + id: message.id, + data, + } satisfies VitNodeWSMessage), + ); + }; + + void target.onMessage({ c, data: message.data, send, ws }); + + return; + } + + // Fall back to the connection-level handler for unmatched messages. + inline?.onMessage?.(event, ws); + }, + }; + }; +} diff --git a/packages/vitnode/src/ws/manager.ts b/packages/vitnode/src/ws/manager.ts new file mode 100644 index 000000000..e6bfaa7f0 --- /dev/null +++ b/packages/vitnode/src/ws/manager.ts @@ -0,0 +1,237 @@ +import type { VitNodeWSMessage } from "./types"; + +/** + * Messages exchanged between tabs of the same origin over the + * {@link BroadcastChannel}. + */ +type CrossTabMessage = + | { payload: number; type: "ready-state" } + | { payload: VitNodeWSMessage; type: "message" } + | { payload: VitNodeWSMessage; type: "send" } + | { type: "reconnect" } + | { type: "request-state" }; + +const LEADER_LOCK = "vitnode-ws-leader"; +const CHANNEL_NAME = "vitnode-ws"; +const RECONNECT_DELAY = 2000; + +export interface WebSocketManagerOptions { + /** Called once per logical server message, in every tab. */ + onMessage: (message: VitNodeWSMessage) => void; + /** Called whenever the shared connection's `readyState` changes. */ + onReadyStateChange: (state: number) => void; + url: string; +} + +export interface WebSocketManager { + destroy: () => void; + getReadyState: () => number; + /** + * Drop the shared connection and open a fresh one. The new socket + * re-runs the handshake, so the server re-reads the session cookie - use + * this after sign-in/sign-out so the connection's user changes. + */ + reconnect: () => void; + send: (message: VitNodeWSMessage) => void; +} + +/** + * Manages a single WebSocket connection shared across every tab of the same + * origin. + * + * One tab is elected "leader" via the Web Locks API and owns the actual socket; + * the other tabs are "followers". Outgoing messages from followers are relayed + * to the leader, and every incoming server message is fanned out to all tabs + * over a {@link BroadcastChannel}. This means data is shared across tabs with + * exactly one connection and no duplicated messages. + * + * When the leader tab closes, its Web Lock is released and a follower is + * automatically promoted, re-opening the socket. + * + * If the browser lacks the Web Locks API or `BroadcastChannel`, the manager + * gracefully falls back to a per-tab connection. + */ +export const createWebSocketManager = ({ + url, + onMessage, + onReadyStateChange, +}: WebSocketManagerOptions): WebSocketManager => { + let socket: null | WebSocket = null; + let isLeader = false; + let isDestroyed = false; + let readyState = 0; // WebSocket.CONNECTING + let releaseLeadership: (() => void) | undefined; + let reconnectTimeout: ReturnType | undefined; + const outgoingQueue: VitNodeWSMessage[] = []; + + const channel = + typeof BroadcastChannel === "undefined" + ? null + : new BroadcastChannel(CHANNEL_NAME); + + const post = (message: CrossTabMessage) => channel?.postMessage(message); + + const setReadyState = (state: number) => { + readyState = state; + onReadyStateChange(state); + }; + + const sendOverSocket = (message: VitNodeWSMessage) => { + if (socket?.readyState === WebSocket.OPEN) { + socket.send(JSON.stringify(message)); + } else { + // Buffer until the leader's socket is open. + outgoingQueue.push(message); + } + }; + + // --- Leader: owns the real socket --------------------------------------- + const openSocket = () => { + if (isDestroyed) return; + + const ws = new WebSocket(url); + socket = ws; + + ws.onopen = () => { + setReadyState(ws.readyState); + post({ payload: ws.readyState, type: "ready-state" }); + outgoingQueue + .splice(0) + .forEach(message => ws.send(JSON.stringify(message))); + }; + + ws.onmessage = (event: MessageEvent) => { + if (typeof event.data !== "string") return; + + let message: VitNodeWSMessage; + try { + message = JSON.parse(event.data) as VitNodeWSMessage; + } catch { + return; + } + + onMessage(message); // this (leader) tab + post({ payload: message, type: "message" }); // every other tab + }; + + ws.onclose = () => { + setReadyState(ws.readyState); + post({ payload: ws.readyState, type: "ready-state" }); + socket = null; + if (!isDestroyed && isLeader) { + reconnectTimeout = setTimeout(openSocket, RECONNECT_DELAY); + } + }; + }; + + const becomeLeader = () => { + if (isDestroyed) return; + isLeader = true; + openSocket(); + }; + + // Drop the current socket (without triggering its auto-reconnect) and open a + // fresh one immediately, so the handshake re-authenticates with the latest + // cookies. Only the leader owns a socket. + const reconnectLeader = () => { + if (!isLeader || isDestroyed) return; + if (reconnectTimeout) clearTimeout(reconnectTimeout); + + if (socket) { + socket.onclose = null; + socket.onerror = null; + socket.onmessage = null; + socket.onopen = null; + socket.close(); + socket = null; + } + + openSocket(); + }; + + // --- Leadership election via Web Locks ---------------------------------- + const requestLeadership = () => { + if (typeof navigator === "undefined" || !navigator.locks || !channel) { + // No cross-tab primitives available: run a standalone connection. + becomeLeader(); + + return; + } + + void navigator.locks + .request(LEADER_LOCK, { mode: "exclusive" }, async () => { + becomeLeader(); + // Hold the lock (and leadership) until the manager is destroyed or the + // tab is closed, at which point another tab is promoted. + await new Promise(resolve => { + releaseLeadership = resolve; + }); + }) + .catch(() => { + becomeLeader(); + }); + }; + + // --- Cross-tab message handling (leader + followers) -------------------- + if (channel) { + channel.onmessage = (event: MessageEvent) => { + const message = event.data; + + switch (message.type) { + case "message": + onMessage(message.payload); + break; + case "ready-state": + if (!isLeader) setReadyState(message.payload); + break; + case "reconnect": + if (isLeader) reconnectLeader(); + break; + case "request-state": + if (isLeader) post({ payload: readyState, type: "ready-state" }); + break; + case "send": + if (isLeader) sendOverSocket(message.payload); + break; + } + }; + } + + const send = (message: VitNodeWSMessage) => { + if (isLeader || !channel) { + sendOverSocket(message); + } else { + // Ask the leader tab to send on our behalf. + post({ payload: message, type: "send" }); + } + }; + + const reconnect = () => { + if (isLeader || !channel) { + reconnectLeader(); + } else { + // Ask the leader tab (which owns the socket) to reconnect. + post({ type: "reconnect" }); + } + }; + + requestLeadership(); + // In case we joined while another tab already owns an open connection. + post({ type: "request-state" }); + + const destroy = () => { + isDestroyed = true; + if (reconnectTimeout) clearTimeout(reconnectTimeout); + socket?.close(); + socket = null; + releaseLeadership?.(); + channel?.close(); + }; + + return { + destroy, + getReadyState: () => readyState, + reconnect, + send, + }; +}; diff --git a/packages/vitnode/src/ws/notifications.ts b/packages/vitnode/src/ws/notifications.ts new file mode 100644 index 000000000..0f4a7dee5 --- /dev/null +++ b/packages/vitnode/src/ws/notifications.ts @@ -0,0 +1,28 @@ +import { CONFIG_PLUGIN } from "@/config"; + +import { createWebSocketChannel } from "./types"; + +export type VitNodeNotificationType = "error" | "info" | "success" | "warning"; + +/** A notification pushed to a single user over the WebSocket. */ +export interface VitNodeNotification { + description?: string; + title: string; + type?: VitNodeNotificationType; +} + +/** + * Per-user notification channel. Every user subscribes to the same id, but the + * server delivers each notification only to the target user's connections (see + * `realtime.sendToUser`), so notifications never leak to other users. + * + * Public id: `@vitnode/core_notifications_inbox`. + */ +export const notificationsChannel = createWebSocketChannel< + never, + VitNodeNotification +>({ + id: "inbox", + module: "notifications", + pluginId: CONFIG_PLUGIN.pluginId, +}); diff --git a/packages/vitnode/src/ws/provider.tsx b/packages/vitnode/src/ws/provider.tsx new file mode 100644 index 000000000..9d4f2dbac --- /dev/null +++ b/packages/vitnode/src/ws/provider.tsx @@ -0,0 +1,122 @@ +"use client"; + +import React from "react"; + +import { CONFIG } from "@/lib/config"; + +import type { WebSocketManager } from "./manager"; +import type { VitNodeWSMessage } from "./types"; + +import { createWebSocketManager } from "./manager"; + +type Listener = (data: unknown) => void; + +interface VitNodeWebSocketContextValue { + readyState: number; + reconnect: () => void; + send: (id: string, data: unknown) => void; + subscribe: (id: string, listener: Listener) => () => void; +} + +const VitNodeWebSocketContext = + React.createContext(null); + +export const useVitNodeWebSocketContext = (): VitNodeWebSocketContextValue => { + const context = React.use(VitNodeWebSocketContext); + if (!context) { + throw new Error( + "useVitNodeWebSocket must be used within a .", + ); + } + + return context; +}; + +const getWebSocketUrl = (): string => { + const url = new URL("/api/ws", CONFIG.api); + url.protocol = url.protocol === "https:" ? "wss:" : "ws:"; + + return url.toString(); +}; + +/** + * Opens a single, always-on WebSocket connection to `/api/ws` and multiplexes + * it across every {@link useVitNodeWebSocket} subscriber by message id. + * + * The connection is shared across all tabs of the same origin (one leader tab + * owns the socket, the rest relay through it), so messages are delivered to + * every tab. See {@link createWebSocketManager}. + */ +export const VitNodeWebSocketProvider = ({ + children, +}: { + children: React.ReactNode; +}) => { + const listenersRef = React.useRef>>(new Map()); + const managerRef = React.useRef(null); + const pendingSendRef = React.useRef([]); + const [readyState, setReadyState] = React.useState( + WebSocket.CONNECTING, + ); + + const dispatch = React.useCallback((message: VitNodeWSMessage) => { + listenersRef.current + .get(message.id) + ?.forEach(listener => listener(message.data)); + }, []); + + React.useEffect(() => { + const manager = createWebSocketManager({ + url: getWebSocketUrl(), + onMessage: dispatch, + onReadyStateChange: setReadyState, + }); + managerRef.current = manager; + // Flush anything sent before the manager was ready. + pendingSendRef.current.splice(0).forEach(message => manager.send(message)); + + return () => { + manager.destroy(); + managerRef.current = null; + }; + }, [dispatch]); + + const subscribe = React.useCallback((id: string, listener: Listener) => { + const listeners = listenersRef.current.get(id) ?? new Set(); + listeners.add(listener); + listenersRef.current.set(id, listeners); + + return () => { + listeners.delete(listener); + if (listeners.size === 0) { + listenersRef.current.delete(id); + } + }; + }, []); + + const send = React.useCallback((id: string, data: unknown) => { + const message: VitNodeWSMessage = { data, id }; + const manager = managerRef.current; + + if (manager) { + manager.send(message); + } else { + pendingSendRef.current.push(message); + } + }, []); + + const reconnect = React.useCallback(() => { + managerRef.current?.reconnect(); + }, []); + + const value = React.useMemo( + () => ({ readyState, reconnect, send, subscribe }), + [readyState, reconnect, send, subscribe], + ); + + return ( + + {children} + + ); +}; diff --git a/packages/vitnode/src/ws/registry.ts b/packages/vitnode/src/ws/registry.ts new file mode 100644 index 000000000..5f391a205 --- /dev/null +++ b/packages/vitnode/src/ws/registry.ts @@ -0,0 +1,75 @@ +import type { WSContext } from "hono/ws"; + +import type { VitNodeWSChannel, VitNodeWSMessage } from "./types"; + +/** + * In-memory registry of the WebSocket connections open on the current server + * process, used to push messages to clients. Each connection is tagged with the + * id of the authenticated user it belongs to (or `null` when anonymous), so a + * payload can be delivered to a single user across all of their browsers. + * + * Note: this is per-process. For a horizontally-scaled deployment, back it with + * a shared pub/sub (e.g. Redis) so messages also reach clients on other + * instances. + */ +const connections = new Map(); + +const WS_OPEN = 1; // WSContext.readyState when the socket is open. + +const sendTo = (ws: WSContext, id: string, data: unknown): void => { + if (ws.readyState === WS_OPEN) { + ws.send(JSON.stringify({ data, id } satisfies VitNodeWSMessage)); + } +}; + +export const wsRegistry = { + add: (ws: WSContext, userId: null | number): void => { + connections.set(ws, userId); + }, + /** Send `{ id, data }` to every open connection. */ + broadcast: (id: string, data: unknown): void => { + connections.forEach((_userId, ws) => sendTo(ws, id, data)); + }, + remove: (ws: WSContext): void => { + connections.delete(ws); + }, + /** Send `{ id, data }` only to the connections owned by `userId`. */ + sendToUser: (userId: number, id: string, data: unknown): void => { + connections.forEach((connectionUserId, ws) => { + if (connectionUserId === userId) sendTo(ws, id, data); + }); + }, +}; + +export interface VitNodeRealtime { + /** + * Push a payload to **every** client subscribed to `channel`. Delivered to + * the matching {@link useVitNodeWebSocket} subscribers (only views currently + * on screen react). Use for non-sensitive "data changed" signals. + */ + broadcast: ( + channel: VitNodeWSChannel, + data: Receive, + ) => void; + /** + * Push a payload only to the connections of a single user, across all of + * their browsers/devices. Use for per-user data such as notifications. + */ + sendToUser: ( + userId: number, + channel: VitNodeWSChannel, + data: Receive, + ) => void; +} + +/** + * The realtime helper exposed on the request context as `c.get("realtime")`. + */ +export const realtime: VitNodeRealtime = { + broadcast: (channel, data) => { + wsRegistry.broadcast(channel.id, data); + }, + sendToUser: (userId, channel, data) => { + wsRegistry.sendToUser(userId, channel.id, data); + }, +}; diff --git a/packages/vitnode/src/ws/types.ts b/packages/vitnode/src/ws/types.ts new file mode 100644 index 000000000..55233a7a1 --- /dev/null +++ b/packages/vitnode/src/ws/types.ts @@ -0,0 +1,73 @@ +/** + * Shared WebSocket types used by both the server (`@vitnode/core/ws/handle`, + * `@vitnode/core/api/lib/websocket`) and the client (`@vitnode/core/ws/provider`, + * `@vitnode/core/ws/use-websocket`). This module must stay free of any + * server-only imports so it is safe to bundle on the client. + */ + +/** + * Envelope for every message exchanged over the VitNode WebSocket. + * + * The single `/ws` connection is multiplexed: each message carries the `id` of + * the socket it targets (`{pluginId}_{module}_{id}`), so the server can + * dispatch it to the right handler and the client can route it to the right + * subscriber. + */ +export interface VitNodeWSMessage { + data: TData; + id: string; +} + +/** + * A typed contract for a WebSocket channel, shared between the server handler + * and the client hook so both sides agree on the id and the message shapes. + * + * - `Send` - what the client sends to the server. + * - `Receive` - what the client receives from the server. + */ +export interface VitNodeWSChannel { + /** + * Phantom field - never present at runtime. It only exists to carry the + * `Send`/`Receive` types so they can be inferred from a channel value. + */ + __types?: { + receive: Receive; + send: Send; + }; + id: string; +} + +/** + * Compose the public id a client uses to target a WebSocket: + * `{pluginId}_{module}_{id}`. + */ +export const getWebSocketId = (parts: { + id: string; + module: string; + pluginId: string; +}): string => `${parts.pluginId}_${parts.module}_${parts.id}`; + +/** + * Define a typed {@link VitNodeWSChannel} that both the server and the client + * can import as the single source of truth for a socket's id and message + * shapes. + * + * @example + * ```ts + * export const echoChannel = createWebSocketChannel({ + * pluginId: "@vitnode/blog", + * module: "posts", + * id: "echo", + * }); + * ``` + */ +export const createWebSocketChannel = < + Send = unknown, + Receive = unknown, +>(parts: { + id: string; + module: string; + pluginId: string; +}): VitNodeWSChannel => ({ + id: getWebSocketId(parts), +}); diff --git a/packages/vitnode/src/ws/use-websocket.ts b/packages/vitnode/src/ws/use-websocket.ts new file mode 100644 index 000000000..88ec1633b --- /dev/null +++ b/packages/vitnode/src/ws/use-websocket.ts @@ -0,0 +1,63 @@ +"use client"; + +import React from "react"; + +import type { VitNodeWSChannel } from "./types"; + +import { useVitNodeWebSocketContext } from "./provider"; + +export interface UseVitNodeWebSocketReturn { + /** + * The current `readyState` of the shared connection (`0` CONNECTING, `1` + * OPEN, `2` CLOSING, `3` CLOSED). + */ + readyState: number; + /** Send a typed payload to this socket. */ + send: (data: Send) => void; +} + +/** + * Subscribe to a VitNode WebSocket over the shared `/ws` connection. + * + * Pass a {@link VitNodeWSChannel} (recommended - it carries the id and the + * message types) or a raw id string. Messages are routed by id, so only the + * payloads addressed to this channel reach `options.onMessage`. + * + * @example + * ```tsx + * const { send, readyState } = useVitNodeWebSocket(echoChannel, { + * onMessage: data => setMessages(prev => [...prev, data]), + * }); + * ``` + */ +export function useVitNodeWebSocket( + channel: string | VitNodeWSChannel, + options?: { + onMessage?: (data: Receive) => void; + }, +): UseVitNodeWebSocketReturn { + const { + readyState, + send: contextSend, + subscribe, + } = useVitNodeWebSocketContext(); + const id = typeof channel === "string" ? channel : channel.id; + + // Keep the latest handler in a ref so subscribing does not depend on it. + const onMessageRef = React.useRef(options?.onMessage); + React.useEffect(() => { + onMessageRef.current = options?.onMessage; + }); + + React.useEffect( + () => subscribe(id, data => onMessageRef.current?.(data as Receive)), + [id, subscribe], + ); + + const send = React.useCallback( + (data: Send) => contextSend(id, data), + [contextSend, id], + ); + + return { readyState, send }; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9f7347810..e1ddf46e6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -55,7 +55,7 @@ importers: version: 4.12.21 next-intl: specifier: ^4.11.0 - version: 4.12.0(next@16.2.6(@playwright/test@1.60.0)(babel-plugin-react-compiler@1.0.0)(react-dom@19.2.6(react@19.2.6))(react@19.2.6))(react@19.2.6)(typescript@6.0.3) + version: 4.12.0(next@16.2.6(@babel/core@7.29.0)(@playwright/test@1.60.0)(babel-plugin-react-compiler@1.0.0)(react-dom@19.2.6(react@19.2.6))(react@19.2.6))(react@19.2.6)(typescript@6.0.3) react: specifier: ^19.2.5 version: 19.2.6 @@ -65,6 +65,9 @@ importers: use-intl: specifier: ^4.11.0 version: 4.12.0(react@19.2.6) + ws: + specifier: ^8.21.0 + version: 8.21.0 zod: specifier: ^4.4.1 version: 4.4.3 @@ -84,6 +87,12 @@ importers: '@types/react-dom': specifier: ^19.2.3 version: 19.2.3(@types/react@19.2.14) + '@types/ws': + specifier: ^8.18.1 + version: 8.18.1 + '@vitnode/blog': + specifier: workspace:* + version: link:../../plugins/blog '@vitnode/config': specifier: workspace:* version: link:../../packages/config @@ -152,7 +161,7 @@ importers: version: 16.2.6(@babel/core@7.29.0)(@playwright/test@1.60.0)(babel-plugin-react-compiler@1.0.0)(react-dom@19.2.6(react@19.2.6))(react@19.2.6) next-intl: specifier: ^4.11.0 - version: 4.12.0(next@16.2.6(@playwright/test@1.60.0)(babel-plugin-react-compiler@1.0.0)(react-dom@19.2.6(react@19.2.6))(react@19.2.6))(react@19.2.6)(typescript@6.0.3) + version: 4.12.0(next@16.2.6(@babel/core@7.29.0)(@playwright/test@1.60.0)(babel-plugin-react-compiler@1.0.0)(react-dom@19.2.6(react@19.2.6))(react@19.2.6))(react@19.2.6)(typescript@6.0.3) node-cron: specifier: ^4.2.1 version: 4.2.1 @@ -597,7 +606,7 @@ importers: version: 16.2.6(@babel/core@7.29.0)(@playwright/test@1.60.0)(babel-plugin-react-compiler@1.0.0)(react-dom@19.2.6(react@19.2.6))(react@19.2.6) next-intl: specifier: ^4.12.0 - version: 4.12.0(next@16.2.6(@playwright/test@1.60.0)(babel-plugin-react-compiler@1.0.0)(react-dom@19.2.6(react@19.2.6))(react@19.2.6))(react@19.2.6)(typescript@6.0.3) + version: 4.12.0(next@16.2.6(@babel/core@7.29.0)(@playwright/test@1.60.0)(babel-plugin-react-compiler@1.0.0)(react-dom@19.2.6(react@19.2.6))(react@19.2.6))(react@19.2.6)(typescript@6.0.3) react: specifier: ^19.2.6 version: 19.2.6 @@ -666,7 +675,7 @@ importers: version: 16.2.6(@babel/core@7.29.0)(@playwright/test@1.60.0)(babel-plugin-react-compiler@1.0.0)(react-dom@19.2.6(react@19.2.6))(react@19.2.6) next-intl: specifier: ^4.11.0 - version: 4.12.0(next@16.2.6(@playwright/test@1.60.0)(babel-plugin-react-compiler@1.0.0)(react-dom@19.2.6(react@19.2.6))(react@19.2.6))(react@19.2.6)(typescript@6.0.3) + version: 4.12.0(next@16.2.6(@babel/core@7.29.0)(@playwright/test@1.60.0)(babel-plugin-react-compiler@1.0.0)(react-dom@19.2.6(react@19.2.6))(react@19.2.6))(react@19.2.6)(typescript@6.0.3) react: specifier: ^19.2.5 version: 19.2.6 @@ -9037,6 +9046,18 @@ packages: utf-8-validate: optional: true + ws@8.21.0: + resolution: {integrity: sha512-Vsp28b7DRcimFQvrqu2Wek3z1iYxDCWqHYB8Qsnk/S4RfaCQzPGPyBNuVjJV3cd6UiKtUtp6sNM77gWvzcCH+g==} + engines: {node: '>=10.0.0'} + peerDependencies: + bufferutil: ^4.0.1 + utf-8-validate: '>=5.0.2' + peerDependenciesMeta: + bufferutil: + optional: true + utf-8-validate: + optional: true + wsl-utils@0.3.1: resolution: {integrity: sha512-g/eziiSUNBSsdDJtCLB8bdYEUMj4jR7AGeUo96p/3dTafgjHhpF4RiCFPiRILwjQoDXx5MqkBr4fwWtR3Ky4Wg==} engines: {node: '>=20'} @@ -15556,7 +15577,7 @@ snapshots: next-intl-swc-plugin-extractor@4.12.0: {} - next-intl@4.12.0(next@16.2.6(@playwright/test@1.60.0)(babel-plugin-react-compiler@1.0.0)(react-dom@19.2.6(react@19.2.6))(react@19.2.6))(react@19.2.6)(typescript@6.0.3): + next-intl@4.12.0(next@16.2.6(@babel/core@7.29.0)(@playwright/test@1.60.0)(babel-plugin-react-compiler@1.0.0)(react-dom@19.2.6(react@19.2.6))(react@19.2.6))(react@19.2.6)(typescript@6.0.3): dependencies: '@formatjs/intl-localematcher': 0.8.8 '@parcel/watcher': 2.5.6 @@ -17651,6 +17672,8 @@ snapshots: ws@8.18.3: {} + ws@8.21.0: {} + wsl-utils@0.3.1: dependencies: is-wsl: 3.1.1 From 0e3236c2493c48cb4726d90778986e22c26fe574 Mon Sep 17 00:00:00 2001 From: aXenDeveloper Date: Mon, 1 Jun 2026 22:53:47 +0200 Subject: [PATCH 2/2] =?UTF-8?q?fix(ws):=20=F0=9F=94=84=20handle=20WebSocke?= =?UTF-8?q?t=20errors=20and=20improve=20connection=20handling?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vitnode/src/views/layouts/theme/layout.tsx | 3 ++- packages/vitnode/src/ws/handle.ts | 14 ++++++++++++-- packages/vitnode/src/ws/provider.tsx | 2 +- packages/vitnode/src/ws/registry.ts | 6 +++--- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/packages/vitnode/src/views/layouts/theme/layout.tsx b/packages/vitnode/src/views/layouts/theme/layout.tsx index 266323233..ccff32d13 100644 --- a/packages/vitnode/src/views/layouts/theme/layout.tsx +++ b/packages/vitnode/src/views/layouts/theme/layout.tsx @@ -14,7 +14,8 @@ export const ThemeLayout = async ({ children: React.ReactNode; vitNodeConfig: VitNodeConfig; }) => { - const { user } = await getSessionApi(); + const session = await getSessionApi(); + const user = session?.user ?? null; return ( <> diff --git a/packages/vitnode/src/ws/handle.ts b/packages/vitnode/src/ws/handle.ts index 99d11e581..0be8549c5 100644 --- a/packages/vitnode/src/ws/handle.ts +++ b/packages/vitnode/src/ws/handle.ts @@ -60,7 +60,7 @@ export function handleVitNodeWebSocket( ) { return async (c: Context): Promise => { const inline = await createEvents?.(c); - const registered = c.get("core").webSockets; + const registered = c.get("core")?.webSockets ?? []; // Authenticated from the session cookie sent with the upgrade request, so // the server can target this connection's user (e.g. for notifications). const userId = c.get("user")?.id ?? null; @@ -94,7 +94,17 @@ export function handleVitNodeWebSocket( ); }; - void target.onMessage({ c, data: message.data, send, ws }); + try { + Promise.resolve( + target.onMessage({ c, data: message.data, send, ws }), + ).catch((error: unknown) => { + // eslint-disable-next-line no-console + console.error("WebSocket handler error:", error); + }); + } catch (error) { + // eslint-disable-next-line no-console + console.error("WebSocket handler error:", error); + } return; } diff --git a/packages/vitnode/src/ws/provider.tsx b/packages/vitnode/src/ws/provider.tsx index 9d4f2dbac..083b47260 100644 --- a/packages/vitnode/src/ws/provider.tsx +++ b/packages/vitnode/src/ws/provider.tsx @@ -56,7 +56,7 @@ export const VitNodeWebSocketProvider = ({ const managerRef = React.useRef(null); const pendingSendRef = React.useRef([]); const [readyState, setReadyState] = React.useState( - WebSocket.CONNECTING, + typeof WebSocket !== "undefined" ? WebSocket.CONNECTING : 0, ); const dispatch = React.useCallback((message: VitNodeWSMessage) => { diff --git a/packages/vitnode/src/ws/registry.ts b/packages/vitnode/src/ws/registry.ts index 5f391a205..3b12af2e3 100644 --- a/packages/vitnode/src/ws/registry.ts +++ b/packages/vitnode/src/ws/registry.ts @@ -14,11 +14,11 @@ import type { VitNodeWSChannel, VitNodeWSMessage } from "./types"; */ const connections = new Map(); -const WS_OPEN = 1; // WSContext.readyState when the socket is open. - const sendTo = (ws: WSContext, id: string, data: unknown): void => { - if (ws.readyState === WS_OPEN) { + try { ws.send(JSON.stringify({ data, id } satisfies VitNodeWSMessage)); + } catch (_error) { + wsRegistry.remove(ws); } };