Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/app/api/stream/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ export async function GET(req: NextRequest) {
let lastCheckedSyncedAt: string | null = null;
let lastCheckedUnreadCount: number | null = null;

let isClosed = false;

const stream = new ReadableStream({
start(controller) {
const checkData = async () => {
if (isClosed) return;
try {
const { data: goals } = await supabaseAdmin
.from("goals")
Expand Down Expand Up @@ -89,7 +92,7 @@ export async function GET(req: NextRequest) {
lastCheckedUnreadCount = currentUnreadCount;
}

if (hasChanges) {
if (hasChanges && !isClosed) {
controller.enqueue(`data: ${JSON.stringify(payload)}\n\n`);
}
} catch (error) {
Expand All @@ -101,12 +104,17 @@ export async function GET(req: NextRequest) {
// guaranteed to be in place before any async work begins. This prevents
// a race where abort() fires before the listener is attached.
const interval = setInterval(() => {
checkData();
if (!isClosed) checkData();
}, POLL_INTERVAL_MS);

req.signal.addEventListener("abort", () => {
isClosed = true;
clearInterval(interval);
controller.close();
try {
controller.close();
} catch (e) {
// ignore already closed
}

// Decrement the connection counter so the slot becomes available again.
const remaining = activeStreamConnections.get(userId) ?? 1;
Expand Down
11 changes: 5 additions & 6 deletions test/sse-stream-route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,15 @@ vi.mock("@/lib/supabase", () => ({

function makeRequest(): NextRequest {
const controller = new AbortController();
return new NextRequest("http://localhost/api/stream", {
signal: controller.signal,
});
const req = new NextRequest("http://localhost/api/stream");
Object.defineProperty(req, 'signal', { value: controller.signal });
return req;
}

function makeAbortableRequest(): { req: NextRequest; abort: () => void } {
const controller = new AbortController();
const req = new NextRequest("http://localhost/api/stream", {
signal: controller.signal,
});
const req = new NextRequest("http://localhost/api/stream");
Object.defineProperty(req, 'signal', { value: controller.signal });
return { req, abort: () => controller.abort() };
}

Expand Down
Loading