11import { DurableObject } from "cloudflare:workers" ;
22import { Container , getContainer } from "@cloudflare/containers" ;
3+ import {
4+ notify ,
5+ extractRequestContext ,
6+ serializeContext ,
7+ parseContext ,
8+ OBS_CONTEXT_HEADER ,
9+ type CloseReason ,
10+ type RequestContext ,
11+ } from "./observability" ;
312
413// --- Type Definitions ---
514interface WebSocketMessage {
@@ -14,6 +23,7 @@ export interface Env {
1423 GMOD_DEV : DurableObjectNamespace ;
1524 QUEUE_DO : DurableObjectNamespace < QueueDO > ;
1625 LOG_BUCKET : R2Bucket ;
26+ DISCORD_WEBHOOK_URL ?: string ;
1727}
1828
1929// --- Session Time Limits ---
@@ -25,6 +35,8 @@ const ACTIVITY_PING_INTERVAL = 30 * 1000; // 30 seconds
2535
2636// --- Container / Session Manager Base Class ---
2737export class BaseSession extends Container < Env > {
38+ protected branch : string = "public" ;
39+
2840 sessionState : "NEW" | "PROVISIONING" | "ACTIVE" | "CLOSED" ;
2941 browserSockets : Set < WebSocket > ;
3042 containerSocket : WebSocket | null ;
@@ -37,6 +49,10 @@ export class BaseSession extends Container<Env> {
3749 sessionDuration ?: number ;
3850 extensionGranted = false ;
3951
52+ // In-memory only: on DO hibernation we lose geo/ISP fields in the end
53+ // embed, which is cosmetic — never a correctness concern.
54+ protected obsContext ?: RequestContext ;
55+
4056 constructor ( ctx : DurableObjectState , env : Env ) {
4157 // Pass the container config to the super constructor.
4258 super ( ctx , env , {
@@ -74,6 +90,14 @@ export class BaseSession extends Container<Env> {
7490 // This is how we solve the race condition.
7591 if ( url . pathname === "/ws/browser" && this . sessionState === "NEW" ) {
7692 this . sessionState = "PROVISIONING" ;
93+
94+ this . obsContext = parseContext ( request . headers . get ( OBS_CONTEXT_HEADER ) ) ;
95+ void notify . sessionStarted ( this . env , {
96+ sessionId,
97+ branch : this . branch ,
98+ context : this . obsContext ?? { ip : "unknown" } ,
99+ } ) ;
100+
77101 // Start the container, but don't wait for it to finish.
78102 void this . startContainer ( sessionId ) ;
79103 }
@@ -95,8 +119,18 @@ export class BaseSession extends Container<Env> {
95119 ( ws as any ) . accept ( ) ;
96120
97121 if ( this . containerSocket || this . sessionState !== "PROVISIONING" ) {
98- console . warn ( `Agent connection attempted in unexpected state: ${ this . sessionState } . Already had containerSocket?: ${ ! ! this . containerSocket } ` ) ;
122+ const warnMsg = `Agent connection attempted in unexpected state: ${ this . sessionState } . Already had containerSocket?: ${ ! ! this . containerSocket } ` ;
123+ console . warn ( warnMsg ) ;
99124 // TODO: Why is this firing inappropriately for non-public branches?
125+ // Webhook is wired up to collect diagnostic data; drop it once
126+ // the root cause is known.
127+ void notify . error ( this . env , {
128+ where : "handleAgentConnection: unexpected state" ,
129+ error : new Error ( warnMsg ) ,
130+ sessionId : this . ctx . id . name ,
131+ branch : this . branch ,
132+ context : this . obsContext ,
133+ } ) ;
100134 // ws.close(1013, "Duplicate or unexpected agent connection.");
101135 // return;
102136 }
@@ -113,8 +147,8 @@ export class BaseSession extends Container<Env> {
113147 this . ctx . storage . setAlarm ( Date . now ( ) + ACTIVITY_PING_INTERVAL ) ;
114148
115149 ws . addEventListener ( "message" , this . onAgentMessage ) ;
116- ws . addEventListener ( "close" , ( ) => this . closeSession ( ) ) ;
117- ws . addEventListener ( "error" , ( ) => this . closeSession ( ) ) ;
150+ ws . addEventListener ( "close" , ( ) => this . closeSession ( "agent_ws_close" ) ) ;
151+ ws . addEventListener ( "error" , ( ) => this . closeSession ( "agent_ws_error" ) ) ;
118152 }
119153
120154 handleBrowserWebSocket ( ws : WebSocket , sessionId : string ) {
@@ -181,8 +215,15 @@ export class BaseSession extends Container<Env> {
181215 } catch ( e ) {
182216 console . error ( "Container Start Error:" , e ) ;
183217 const errorMessage = `\u001b[31mFailed to start container: ${ e instanceof Error ? e . message : String ( e ) } \u001b[0m` ;
184- this . broadcastToBrowsers ( "LOGS" , [ errorMessage ] ) ;
185- await this . closeSession ( ) ;
218+ this . broadcastToBrowsers ( "LOGS" , [ errorMessage ] ) ;
219+ void notify . error ( this . env , {
220+ where : "startContainer" ,
221+ error : e ,
222+ sessionId,
223+ branch : this . branch ,
224+ context : this . obsContext ,
225+ } ) ;
226+ await this . closeSession ( "container_start_failed" ) ;
186227 }
187228 }
188229
@@ -204,12 +245,19 @@ export class BaseSession extends Container<Env> {
204245 void this . startContainer ( sessionId ) ;
205246 return ;
206247 }
207- await this . closeSession ( ) ;
248+ await this . closeSession ( "container_stopped" ) ;
208249 }
209250
210251 override async onError ( error : unknown ) : Promise < void > {
211252 console . error ( "Container Error:" , error ) ;
212- await this . closeSession ( ) ;
253+ void notify . error ( this . env , {
254+ where : "Container.onError" ,
255+ error,
256+ sessionId : this . ctx . id . name ,
257+ branch : this . branch ,
258+ context : this . obsContext ,
259+ } ) ;
260+ await this . closeSession ( "container_error" ) ;
213261 }
214262
215263 onAgentMessage = ( msg : MessageEvent ) => {
@@ -233,7 +281,7 @@ export class BaseSession extends Container<Env> {
233281 break ;
234282 case "AGENT_SHUTDOWN" :
235283 this . broadcastToBrowsers ( "LOGS" , [ "\u001b[31mAgent is shutting down...\u001b[0m" ] ) ;
236- this . closeSession ( ) ;
284+ void this . closeSession ( "agent_shutdown" ) ;
237285 break ;
238286 default :
239287 console . warn ( `Unknown message type from agent: ${ message . type } ` ) ;
@@ -254,7 +302,7 @@ export class BaseSession extends Container<Env> {
254302 if ( message . type === "CLOSE_SESSION" ) {
255303 if ( this . sessionState === "ACTIVE" ) {
256304 console . log ( "Browser requested session close" ) ;
257- this . closeSession ( ) ;
305+ void this . closeSession ( "clean" ) ;
258306 }
259307 return ;
260308 }
@@ -306,15 +354,16 @@ export class BaseSession extends Container<Env> {
306354 this . broadcastToBrowsers ( "SESSION_TIMER" , this . sessionTimerPayload ( ) ) ;
307355 }
308356
309- async closeSession ( ) {
357+ async closeSession ( reason : CloseReason ) {
310358 const stack = new Error ( ) . stack ;
311- console . trace ( "Closing session" , this . sessionState ) ;
359+ console . trace ( "Closing session" , this . sessionState , "reason:" , reason ) ;
312360 console . log ( stack )
313361
314362 if ( this . sessionState === "CLOSED" ) return ;
315363 this . sessionState = "CLOSED" ;
364+ const endedAt = Date . now ( ) ;
316365 if ( this . sessionMetadata ) {
317- this . sessionMetadata . endedAt = Date . now ( ) ;
366+ this . sessionMetadata . endedAt = endedAt ;
318367 }
319368
320369 if ( this . containerSocket ) {
@@ -334,6 +383,18 @@ export class BaseSession extends Container<Env> {
334383 catch ( e ) { console . error ( "Error stopping container:" , e ) ; }
335384
336385 await this . notifyQueueManagerOfClosure ( ) ;
386+
387+ void notify . sessionEnded ( this . env , {
388+ sessionId : this . ctx . id . name ?? "unknown" ,
389+ branch : this . branch ,
390+ reason,
391+ startedAt : this . sessionMetadata ?. startedAt ,
392+ endedAt,
393+ scriptCount : this . scriptCount ,
394+ logLineCount : this . logLineCount ,
395+ extensionGranted : this . extensionGranted ,
396+ context : this . obsContext ,
397+ } ) ;
337398 }
338399
339400 async notifyQueueManagerOfClosure ( ) {
@@ -357,7 +418,7 @@ export class BaseSession extends Container<Env> {
357418 if ( this . sessionState === "ACTIVE" ) {
358419 if ( this . sessionEndTime && Date . now ( ) >= this . sessionEndTime ) {
359420 console . log ( "[alarm] session time expired, closing" ) ;
360- await this . closeSession ( ) ;
421+ await this . closeSession ( "timer_expired" ) ;
361422 return ;
362423 }
363424 this . renewActivityTimeout ( ) ;
@@ -383,6 +444,13 @@ export class BaseSession extends Container<Env> {
383444 } catch ( e ) {
384445 console . error ( `Failed to flush logs for DO ${ this . ctx . id . name ! } :` , e ) ;
385446 this . logBuffer . unshift ( ...logsToFlush . trim ( ) . split ( "\n" ) ) ;
447+ void notify . error ( this . env , {
448+ where : "flushLogsToR2" ,
449+ error : e ,
450+ sessionId : this . ctx . id . name ,
451+ branch : this . branch ,
452+ context : this . obsContext ,
453+ } ) ;
386454 }
387455 }
388456
@@ -409,6 +477,13 @@ export class BaseSession extends Container<Env> {
409477 this . scriptBuffer = { } ;
410478 } catch ( e ) {
411479 console . error ( `Failed to flush session data for DO ${ this . ctx . id . name ! } :` , e ) ;
480+ void notify . error ( this . env , {
481+ where : "flushSessionToR2" ,
482+ error : e ,
483+ sessionId : this . ctx . id . name ,
484+ branch : this . branch ,
485+ context : this . obsContext ,
486+ } ) ;
412487 }
413488 }
414489
@@ -430,10 +505,18 @@ export class BaseSession extends Container<Env> {
430505}
431506
432507
433- export class GmodPublic extends BaseSession { }
434- export class GmodSixtyFour extends BaseSession { }
435- export class GmodPrerelease extends BaseSession { }
436- export class GmodDev extends BaseSession { }
508+ export class GmodPublic extends BaseSession {
509+ protected override branch = "public" ;
510+ }
511+ export class GmodSixtyFour extends BaseSession {
512+ protected override branch = "sixty-four" ;
513+ }
514+ export class GmodPrerelease extends BaseSession {
515+ protected override branch = "prerelease" ;
516+ }
517+ export class GmodDev extends BaseSession {
518+ protected override branch = "dev" ;
519+ }
437520
438521const MAX_SESSIONS_PER_IP = 2 ;
439522
@@ -502,6 +585,13 @@ export class QueueDO extends DurableObject<Env> {
502585 const clientIP = rawIP !== "unknown" ? await hashIP ( rawIP ) : "unknown" ;
503586
504587 if ( clientIP !== "unknown" && this . activeSessionCountForIP ( clientIP ) >= MAX_SESSIONS_PER_IP ) {
588+ const obsContext = parseContext ( request . headers . get ( OBS_CONTEXT_HEADER ) ) ;
589+ void notify . warning ( this . env , {
590+ title : "IP rate limit hit" ,
591+ description : `Tried to open a **${ sessionType } ** session past the per-IP limit of **${ MAX_SESSIONS_PER_IP } **.` ,
592+ context : obsContext ,
593+ } ) ;
594+
505595 return new Response ( JSON . stringify ( {
506596 status : "IP_LIMIT" ,
507597 limit : MAX_SESSIONS_PER_IP ,
@@ -638,9 +728,11 @@ export default {
638728 async fetch ( request : Request , env : Env ) : Promise < Response > {
639729 const url = new URL ( request . url ) ;
640730
731+ const forwarded = withObsHeader ( request ) ;
732+
641733 if ( url . pathname . startsWith ( "/api/" ) ) {
642734 const queueDO = env . QUEUE_DO . get ( env . QUEUE_DO . idFromName ( "global-queue" ) ) ;
643- return queueDO . fetch ( request ) ;
735+ return queueDO . fetch ( forwarded ) ;
644736 }
645737
646738 if ( url . pathname . startsWith ( "/ws/" ) ) {
@@ -660,10 +752,26 @@ export default {
660752
661753 const sessionDOId = sessionBinding . idFromName ( sessionId ) ;
662754 const stub = sessionBinding . get ( sessionDOId ) ;
663- return stub . fetch ( request ) ;
755+ return stub . fetch ( forwarded ) ;
664756 }
665757
666758 return new Response ( "Not Found" , { status : 404 } ) ;
667759 } ,
668760} ;
669761
762+ // Runs on the real request path — must never throw. Worst case: we log
763+ // and forward the unmodified request (context just won't be attached).
764+ function withObsHeader ( request : Request ) : Request {
765+ try {
766+ const obsContext = extractRequestContext ( request ) ;
767+ const headers = new Headers ( request . headers ) ;
768+ headers . set ( OBS_CONTEXT_HEADER , serializeContext ( obsContext ) ) ;
769+ return new Request ( request , { headers } ) ;
770+ } catch ( e ) {
771+ const name = e instanceof Error ? e . name : typeof e ;
772+ const msg = e instanceof Error ? e . message : String ( e ) ;
773+ console . error ( `[obs] withObsHeader failed, forwarding without context: ${ name } : ${ msg } ` ) ;
774+ return request ;
775+ }
776+ }
777+
0 commit comments