11import { DurableObject } from "cloudflare:workers" ;
22import type { SessionType } from "@glua/shared" ;
33import { MAX_SESSIONS_PER_IP , VALID_SESSION_TYPES } from "@glua/shared" ;
4- import { CAPACITY , QUEUE_TIMING } from "../constants" ;
4+ import { CAPACITY , QUEUE_TIMING , SESSION_TIMING } from "../constants" ;
55import type { Env } from "../env" ;
66import { type CapacitySnapshot , notify , OBS_CONTEXT_HEADER , parseContext } from "../observability" ;
77import { hashIP } from "../utils" ;
@@ -15,7 +15,9 @@ import {
1515 stripResolvedPrefix ,
1616 stripSessionPrefix ,
1717} from "./storage-keys" ;
18- import type { QueueEntry , ResolvedTicket } from "./types" ;
18+ import type { ActiveSession , QueueEntry , ResolvedTicket } from "./types" ;
19+
20+ const STALE_SESSION_CUTOFF = SESSION_TIMING . hardLimit + 60_000 ;
1921
2022/**
2123 * Global singleton that manages session allocation and the waiting queue
@@ -27,24 +29,25 @@ import type { QueueEntry, ResolvedTicket } from "./types";
2729 * Keyed as `idFromName("global-queue")` — there's exactly one of these
2830 */
2931export class SessionManager extends DurableObject < Env > {
30- private activeSessions : Map < string , SessionType > ;
31- private sessionIPs : Map < string , string > ;
32+ private activeSessions : Map < string , ActiveSession > ;
3233 private waitingQueue : QueueEntry [ ] ;
3334 private resolvedTickets : Map < string , ResolvedTicket > ;
3435
3536 constructor ( ctx : DurableObjectState , env : Env ) {
3637 super ( ctx , env ) ;
3738 this . activeSessions = new Map ( ) ;
38- this . sessionIPs = new Map ( ) ;
3939 this . waitingQueue = [ ] ;
4040 this . resolvedTickets = new Map ( ) ;
4141
4242 ctx . blockConcurrencyWhile ( async ( ) => {
43- const stored = await ctx . storage . list < { type : SessionType ; ip : string } > ( { prefix : SESSION_PREFIX } ) ;
43+ const stored = await ctx . storage . list < Partial < ActiveSession > > ( { prefix : SESSION_PREFIX } ) ;
4444 for ( const [ key , value ] of stored ) {
4545 const sessionId = stripSessionPrefix ( key ) ;
46- this . activeSessions . set ( sessionId , value . type ) ;
47- this . sessionIPs . set ( sessionId , value . ip ) ;
46+ this . activeSessions . set ( sessionId , {
47+ type : ( value . type ?? "public" ) as SessionType ,
48+ ip : value . ip ?? "unknown" ,
49+ createdAt : value . createdAt ?? 0 ,
50+ } ) ;
4851 }
4952
5053 const now = Date . now ( ) ;
@@ -115,10 +118,11 @@ export class SessionManager extends DurableObject<Env> {
115118 return Response . json ( { error : "Missing 'message' string in body" } , { status : 400 } ) ;
116119 }
117120
121+ this . pruneStaleSessions ( ) ;
118122 const entries = Array . from ( this . activeSessions . entries ( ) ) ;
119123 const results = await Promise . allSettled (
120- entries . map ( async ( [ sessionId , type ] ) => {
121- const binding = this . bindingForType ( type ) ;
124+ entries . map ( async ( [ sessionId , entry ] ) => {
125+ const binding = this . bindingForType ( entry . type ) ;
122126 if ( ! binding ) return ;
123127 const stub = binding . get ( binding . idFromName ( sessionId ) ) ;
124128 await stub . fetch ( "http://do/internal/broadcast" , {
@@ -246,11 +250,11 @@ export class SessionManager extends DurableObject<Env> {
246250
247251 private async handleSessionClosed ( request : Request ) : Promise < Response > {
248252 const { sessionId } = await request . json < { sessionId : string } > ( ) ;
249- const closedType = this . activeSessions . get ( sessionId ) ;
253+ const closedEntry = this . activeSessions . get ( sessionId ) ;
250254 await this . removeSession ( sessionId ) ;
251255
252- if ( closedType ) {
253- const idx = this . waitingQueue . findIndex ( ( w ) => w . sessionType === closedType ) ;
256+ if ( closedEntry ) {
257+ const idx = this . waitingQueue . findIndex ( ( w ) => w . sessionType === closedEntry . type ) ;
254258 if ( idx !== - 1 ) {
255259 const nextInLine = this . waitingQueue . splice ( idx , 1 ) [ 0 ] ;
256260 const newSessionId = crypto . randomUUID ( ) ;
@@ -275,9 +279,11 @@ export class SessionManager extends DurableObject<Env> {
275279 return Response . json ( { status : "not-found" } , { status : 404 } ) ;
276280 }
277281
278- if ( this . activeSessions . has ( sessionId ) ) {
279- const sessionType = this . activeSessions . get ( sessionId ) ! ;
280- return Response . json ( { status : "active" , sessionType } ) ;
282+ this . pruneStaleSessions ( ) ;
283+
284+ const active = this . activeSessions . get ( sessionId ) ;
285+ if ( active ) {
286+ return Response . json ( { status : "active" , sessionType : active . type } ) ;
281287 }
282288
283289 const logKey = `sessions/${ sessionId } /logs.log` ;
@@ -306,26 +312,28 @@ export class SessionManager extends DurableObject<Env> {
306312
307313 private activeSessionCountForIP ( ip : string ) : number {
308314 let count = 0 ;
309- for ( const [ sessionId , sessionIp ] of this . sessionIPs ) {
310- if ( sessionIp === ip && this . activeSessions . has ( sessionId ) ) count ++ ;
315+ for ( const entry of this . activeSessions . values ( ) ) {
316+ if ( entry . ip === ip ) count ++ ;
311317 }
312318 return count ;
313319 }
314320
315321 private activeCountForType ( type : string ) : number {
316322 let count = 0 ;
317- for ( const t of this . activeSessions . values ( ) ) {
318- if ( t === type ) count ++ ;
323+ for ( const entry of this . activeSessions . values ( ) ) {
324+ if ( entry . type === type ) count ++ ;
319325 }
320326 return count ;
321327 }
322328
323329 private hasCapacity ( type : SessionType ) : boolean {
330+ this . pruneStaleSessions ( ) ;
324331 const max = CAPACITY . maxPerType [ type ] ?? 2 ;
325332 return this . activeCountForType ( type ) < max && this . activeSessions . size < CAPACITY . maxTotal ;
326333 }
327334
328335 private snapshotFor ( branch : string ) : CapacitySnapshot {
336+ this . pruneStaleSessions ( ) ;
329337 return {
330338 branch,
331339 branchUsed : this . activeCountForType ( branch ) ,
@@ -336,17 +344,29 @@ export class SessionManager extends DurableObject<Env> {
336344 } ;
337345 }
338346
347+ private pruneStaleSessions ( ) : void {
348+ const cutoff = Date . now ( ) - STALE_SESSION_CUTOFF ;
349+ const stale : string [ ] = [ ] ;
350+ for ( const [ sessionId , entry ] of this . activeSessions ) {
351+ if ( entry . createdAt < cutoff ) stale . push ( sessionId ) ;
352+ }
353+ if ( stale . length === 0 ) return ;
354+ for ( const sessionId of stale ) {
355+ this . activeSessions . delete ( sessionId ) ;
356+ }
357+ this . notifyAsync ( this . ctx . storage . delete ( stale . map ( sessionKey ) ) . then ( ( ) => undefined ) ) ;
358+ }
359+
339360 // ── Storage helpers ──
340361
341362 private async persistSession ( sessionId : string , type : SessionType , ip : string ) {
342- this . activeSessions . set ( sessionId , type ) ;
343- this . sessionIPs . set ( sessionId , ip ) ;
344- await this . ctx . storage . put ( sessionKey ( sessionId ) , { type , ip } ) ;
363+ const entry : ActiveSession = { type, ip , createdAt : Date . now ( ) } ;
364+ this . activeSessions . set ( sessionId , entry ) ;
365+ await this . ctx . storage . put ( sessionKey ( sessionId ) , entry ) ;
345366 }
346367
347368 private async removeSession ( sessionId : string ) {
348369 this . activeSessions . delete ( sessionId ) ;
349- this . sessionIPs . delete ( sessionId ) ;
350370 await this . ctx . storage . delete ( sessionKey ( sessionId ) ) ;
351371 }
352372
0 commit comments