Skip to content

Commit b6a3540

Browse files
committed
Add capacity snapshot
1 parent e77d0e1 commit b6a3540

4 files changed

Lines changed: 123 additions & 17 deletions

File tree

web/src/index.ts

Lines changed: 69 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
serializeContext,
77
parseContext,
88
OBS_CONTEXT_HEADER,
9+
type CapacitySnapshot,
910
type CloseReason,
1011
type RequestContext,
1112
} from "./observability";
@@ -53,6 +54,18 @@ export class BaseSession extends Container<Env> {
5354
// embed, which is cosmetic — never a correctness concern.
5455
protected obsContext?: RequestContext;
5556

57+
private async fetchCapacitySnapshot(): Promise<CapacitySnapshot | undefined> {
58+
try {
59+
const queueDO = this.env.QUEUE_DO.get(this.env.QUEUE_DO.idFromName("global-queue"));
60+
const res = await queueDO.fetch(`http://do/internal/capacity?branch=${encodeURIComponent(this.branch)}`);
61+
if (!res.ok) return undefined;
62+
return await res.json<CapacitySnapshot>();
63+
} catch (e) {
64+
console.error("[obs] fetchCapacitySnapshot failed:", e);
65+
return undefined;
66+
}
67+
}
68+
5669
constructor(ctx: DurableObjectState, env: Env) {
5770
// Pass the container config to the super constructor.
5871
super(ctx, env, {
@@ -92,11 +105,15 @@ export class BaseSession extends Container<Env> {
92105
this.sessionState = "PROVISIONING";
93106

94107
this.obsContext = parseContext(request.headers.get(OBS_CONTEXT_HEADER));
95-
void notify.sessionStarted(this.env, {
96-
sessionId,
97-
branch: this.branch,
98-
context: this.obsContext ?? { ip: "unknown" },
99-
});
108+
void (async () => {
109+
const capacity = await this.fetchCapacitySnapshot();
110+
await notify.sessionStarted(this.env, {
111+
sessionId,
112+
branch: this.branch,
113+
context: this.obsContext ?? { ip: "unknown" },
114+
capacity,
115+
});
116+
})();
100117

101118
// Start the container, but don't wait for it to finish.
102119
void this.startContainer(sessionId);
@@ -384,17 +401,21 @@ export class BaseSession extends Container<Env> {
384401

385402
await this.notifyQueueManagerOfClosure();
386403

387-
void notify.sessionEnded(this.env, {
388-
sessionId: this.ctx.id.name ?? "unknown",
389-
branch: this.branch,
390-
reason,
391-
startedAt: this.sessionMetadata?.startedAt,
392-
endedAt,
393-
scriptCount: this.scriptCount,
394-
logLineCount: this.logLineCount,
395-
extensionGranted: this.extensionGranted,
396-
context: this.obsContext,
397-
});
404+
void (async () => {
405+
const capacity = await this.fetchCapacitySnapshot();
406+
await notify.sessionEnded(this.env, {
407+
sessionId: this.ctx.id.name ?? "unknown",
408+
branch: this.branch,
409+
reason,
410+
startedAt: this.sessionMetadata?.startedAt,
411+
endedAt,
412+
scriptCount: this.scriptCount,
413+
logLineCount: this.logLineCount,
414+
extensionGranted: this.extensionGranted,
415+
context: this.obsContext,
416+
capacity,
417+
});
418+
})();
398419
}
399420

400421
async notifyQueueManagerOfClosure() {
@@ -576,9 +597,31 @@ export class QueueDO extends DurableObject<Env> {
576597
return this.activeCountForType(type) < max && this.activeSessions.size < this.maxTotalSessions;
577598
}
578599

600+
private snapshotFor(branch: string): CapacitySnapshot {
601+
return {
602+
branch,
603+
branchUsed: this.activeCountForType(branch),
604+
branchMax: this.maxPerType[branch] ?? 2,
605+
totalUsed: this.activeSessions.size,
606+
totalMax: this.maxTotalSessions,
607+
queueDepth: this.waitingQueue.length,
608+
};
609+
}
610+
579611
async fetch(request: Request): Promise<Response> {
580612
const url = new URL(request.url);
581613

614+
// Internal-only: the worker entrypoint routes /api/* and /ws/* to DOs,
615+
// so /internal/* paths are unreachable externally (404 at the worker)
616+
// but work over DO stub-to-stub calls, which bypass the entrypoint.
617+
if (url.pathname === "/internal/capacity") {
618+
const branch = url.searchParams.get("branch");
619+
if (!branch) return new Response("Missing branch", { status: 400 });
620+
return new Response(JSON.stringify(this.snapshotFor(branch)), {
621+
headers: { "Content-Type": "application/json" },
622+
});
623+
}
624+
582625
if (url.pathname === "/api/request-session") {
583626
const sessionType = url.searchParams.get("type") || "public";
584627
const rawIP = request.headers.get("CF-Connecting-IP") || "unknown";
@@ -613,6 +656,16 @@ export class QueueDO extends DurableObject<Env> {
613656
}});
614657
const position = this.waitingQueue.filter(w => w.sessionType === sessionType).length;
615658

659+
const obsContext = parseContext(request.headers.get(OBS_CONTEXT_HEADER));
660+
if (obsContext) {
661+
void notify.queueEntered(this.env, {
662+
sessionType,
663+
position,
664+
capacity: this.snapshotFor(sessionType),
665+
context: obsContext,
666+
});
667+
}
668+
616669
return new Response(JSON.stringify({
617670
status: "QUEUED",
618671
ticketId,

web/src/observability/embeds.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import type {
2+
CapacitySnapshot,
23
CloseReason,
34
ErrorEvent,
5+
QueueEnteredEvent,
46
RequestContext,
57
SessionEndedEvent,
68
SessionStartedEvent,
@@ -107,6 +109,9 @@ const networkFields = (ctx: RequestContext): DiscordEmbedField[] => [
107109
{ name: "Network", value: ispLink(ctx), inline: true },
108110
];
109111

112+
const capacityLine = (c: CapacitySnapshot, opts?: { skipQueue: boolean }): string =>
113+
`${c.branch} ${c.branchUsed}/${c.branchMax} · total ${c.totalUsed}/${c.totalMax}${!opts?.skipQueue && c.queueDepth > 0 ? ` · queue ${c.queueDepth}` : ""}`;
114+
110115
const contextSubtext = (ctx: RequestContext | undefined): string | undefined => {
111116
if (!ctx) return undefined;
112117

@@ -139,6 +144,7 @@ export function buildSessionStartedEmbed(e: SessionStartedEvent): DiscordEmbed {
139144
`### ${locationHeader(ctx)}`,
140145
sub(`started ${relativeTimestamp()}${ctx.colo ? ` · edge ${code(ctx.colo)}` : ""}`),
141146
];
147+
if (e.capacity) lines.push(sub(capacityLine(e.capacity)));
142148

143149
const fields: DiscordEmbedField[] = [
144150
{ name: "Branch", value: `${emoji} ${code(label)}`, inline: true },
@@ -175,6 +181,7 @@ export function buildSessionEndedEmbed(e: SessionEndedEvent): DiscordEmbed {
175181
`### ${ctx ? locationHeader(ctx) : "🌐 Unknown location"}`,
176182
sub(`ended ${relativeTimestamp()}${durationMs !== undefined ? ` · ran for **${duration(durationMs)}**` : ""}`),
177183
];
184+
if (e.capacity) lines.push(sub(capacityLine(e.capacity)));
178185

179186
const fields: DiscordEmbedField[] = [
180187
{ name: "Branch", value: `${emoji} ${code(label)}`, inline: true },
@@ -236,6 +243,30 @@ export function buildErrorEmbed(e: ErrorEvent): DiscordEmbed {
236243
};
237244
}
238245

246+
export function buildQueueEnteredEmbed(e: QueueEnteredEvent): DiscordEmbed {
247+
const { emoji, label } = branchMeta(e.sessionType);
248+
const ctx = e.context;
249+
250+
const lines: string[] = [
251+
`### ${locationHeader(ctx)}`,
252+
sub(`position ${code("#" + e.position)} · ${capacityLine(e.capacity, { skipQueue: true })}`),
253+
];
254+
255+
const fields: DiscordEmbedField[] = [
256+
{ name: "Branch", value: `${emoji} ${code(label)}`, inline: true },
257+
...networkFields(ctx),
258+
];
259+
260+
return {
261+
title: "⏳ Queue entered",
262+
description: truncate(lines.join("\n"), LIMIT_DESCRIPTION),
263+
color: COLORS.info,
264+
timestamp: new Date().toISOString(),
265+
fields,
266+
footer: FOOTER,
267+
};
268+
}
269+
239270
export function buildWarningEmbed(e: WarningEvent): DiscordEmbed {
240271
const lines: string[] = [
241272
`## ⚠️ ${truncate(e.title, 120)}`,

web/src/observability/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
import { post } from "./webhook";
22
import {
33
buildErrorEmbed,
4+
buildQueueEnteredEmbed,
45
buildSessionEndedEmbed,
56
buildSessionStartedEmbed,
67
buildWarningEmbed,
78
} from "./embeds";
89
import type {
910
ErrorEvent,
11+
QueueEnteredEvent,
1012
SessionEndedEvent,
1113
SessionStartedEvent,
1214
WarningEvent,
1315
} from "./types";
1416

15-
export type { CloseReason, RequestContext } from "./types";
17+
export type { CapacitySnapshot, CloseReason, RequestContext } from "./types";
1618
export { extractRequestContext, serializeContext, parseContext, OBS_CONTEXT_HEADER } from "./context";
1719

1820
type Env = { DISCORD_WEBHOOK_URL?: string };
@@ -22,6 +24,8 @@ export const notify = {
2224
post(env, { embeds: [buildSessionStartedEmbed(event)] }),
2325
sessionEnded: (env: Env, event: SessionEndedEvent) =>
2426
post(env, { embeds: [buildSessionEndedEmbed(event)] }),
27+
queueEntered: (env: Env, event: QueueEnteredEvent) =>
28+
post(env, { embeds: [buildQueueEnteredEmbed(event)] }),
2529
error: (env: Env, event: ErrorEvent) =>
2630
post(env, { embeds: [buildErrorEmbed(event)] }),
2731
warning: (env: Env, event: WarningEvent) =>

web/src/observability/types.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,20 @@ export interface RequestContext {
1919
userAgent?: string;
2020
}
2121

22+
export interface CapacitySnapshot {
23+
branch: string;
24+
branchUsed: number;
25+
branchMax: number;
26+
totalUsed: number;
27+
totalMax: number;
28+
queueDepth: number;
29+
}
30+
2231
export interface SessionStartedEvent {
2332
sessionId: string;
2433
branch: string;
2534
context: RequestContext;
35+
capacity?: CapacitySnapshot;
2636
}
2737

2838
export interface SessionEndedEvent {
@@ -35,6 +45,14 @@ export interface SessionEndedEvent {
3545
logLineCount: number;
3646
extensionGranted: boolean;
3747
context?: RequestContext;
48+
capacity?: CapacitySnapshot;
49+
}
50+
51+
export interface QueueEnteredEvent {
52+
sessionType: string;
53+
position: number;
54+
capacity: CapacitySnapshot;
55+
context: RequestContext;
3856
}
3957

4058
export interface ErrorEvent {

0 commit comments

Comments
 (0)