@@ -402,4 +402,318 @@ describe("runStaleSweepOnce — testcontainers", () => {
402402 }
403403 } ,
404404 ) ;
405+
406+ redisTest (
407+ "state survives process restart: a second state instance picks up the cursor and counts" ,
408+ { timeout : 30_000 } ,
409+ async ( { redisOptions } ) => {
410+ // This is the headline reason the sweep state is durable in Redis
411+ // instead of process-local — a webapp restart mid-cycle must not
412+ // re-emit the gauge as fresh-zero for previously-flagged envs nor
413+ // restart the cursor walk from scratch. Simulated here by closing
414+ // state1 (its Redis client quits cleanly) and constructing state2
415+ // against the same Redis. The cursor + counts that state1 wrote
416+ // are visible to state2 on its first tick.
417+ const buffer = new MollifierBuffer ( { redisOptions } ) ;
418+ const state1 = new MollifierStaleSweepState ( { redisOptions } ) ;
419+ try {
420+ await buffer . accept ( {
421+ runId : "run_a" ,
422+ envId : "env_a" ,
423+ orgId : "org_a" ,
424+ payload : JSON . stringify ( SNAPSHOT ) ,
425+ } ) ;
426+ await buffer . accept ( {
427+ runId : "run_b" ,
428+ envId : "env_b" ,
429+ orgId : "org_b" ,
430+ payload : JSON . stringify ( SNAPSHOT ) ,
431+ } ) ;
432+ const futureNow = Date . now ( ) + 5 * 60 * 1000 ;
433+ const cfg = { staleThresholdMs : 60 * 1000 , maxOrgsPerPass : 1 } ;
434+ const spies1 = spyDeps ( ) ;
435+
436+ // Tick 1 with state1: visits 1 of 2 orgs.
437+ await runStaleSweepOnce ( cfg , {
438+ ...spies1 . deps ,
439+ getBuffer : ( ) => buffer ,
440+ state : state1 ,
441+ now : ( ) => futureNow ,
442+ } ) ;
443+ expect ( spies1 . snapshots [ 0 ] . size ) . toBe ( 1 ) ;
444+ } finally {
445+ // Simulate webapp restart: state1's Redis client closes cleanly.
446+ await state1 . close ( ) ;
447+ }
448+
449+ // New process boots, constructs a fresh state pointing at the
450+ // same Redis. The cycle's frozen org_list, the cursor, and the
451+ // counts hash are all preserved — state2 picks up at the second
452+ // org of the cycle.
453+ const state2 = new MollifierStaleSweepState ( { redisOptions } ) ;
454+ try {
455+ const futureNow = Date . now ( ) + 5 * 60 * 1000 ;
456+ const cfg = { staleThresholdMs : 60 * 1000 , maxOrgsPerPass : 1 } ;
457+ const spies2 = spyDeps ( ) ;
458+
459+ await runStaleSweepOnce ( cfg , {
460+ ...spies2 . deps ,
461+ getBuffer : ( ) => buffer ,
462+ state : state2 ,
463+ now : ( ) => futureNow ,
464+ } ) ;
465+ // Snapshot now has BOTH envs: the one tick 1 flagged (still in
466+ // the counts hash from state1) plus the one tick 2 just flagged.
467+ // A non-durable design would show only the second.
468+ expect ( spies2 . snapshots [ 0 ] . size ) . toBe ( 2 ) ;
469+ } finally {
470+ await state2 . close ( ) ;
471+ await buffer . close ( ) ;
472+ }
473+ } ,
474+ ) ;
475+
476+ redisTest (
477+ "cycle wrap rebuilds the org list, so orgs that joined mid-cycle get visited on the next cycle" ,
478+ { timeout : 30_000 } ,
479+ async ( { redisOptions } ) => {
480+ // The docstring promises "orgs joining mid-cycle wait until the
481+ // next cycle to be visited." The mechanism is rebuildOrgList at
482+ // cursor=0: a fresh snapshot of buffer.listOrgs() replaces the
483+ // previous frozen LIST. Verified here by adding a third org
484+ // between cycles and asserting it shows up only in the next
485+ // cycle's snapshot.
486+ const buffer = new MollifierBuffer ( { redisOptions } ) ;
487+ const state = new MollifierStaleSweepState ( { redisOptions } ) ;
488+ try {
489+ await buffer . accept ( {
490+ runId : "run_init_a" ,
491+ envId : "env_init_a" ,
492+ orgId : "org_init_a" ,
493+ payload : JSON . stringify ( SNAPSHOT ) ,
494+ } ) ;
495+ await buffer . accept ( {
496+ runId : "run_init_b" ,
497+ envId : "env_init_b" ,
498+ orgId : "org_init_b" ,
499+ payload : JSON . stringify ( SNAPSHOT ) ,
500+ } ) ;
501+ const futureNow = Date . now ( ) + 5 * 60 * 1000 ;
502+ const spies = spyDeps ( ) ;
503+ const cfg = { staleThresholdMs : 60 * 1000 , maxOrgsPerPass : 10 } ;
504+ const baseDeps = {
505+ ...spies . deps ,
506+ getBuffer : ( ) => buffer ,
507+ state,
508+ now : ( ) => futureNow ,
509+ } ;
510+
511+ // Tick 1: cycle 1. Visits both initial orgs; cursor wraps to 0.
512+ await runStaleSweepOnce ( cfg , baseDeps ) ;
513+ expect ( spies . snapshots [ 0 ] . size ) . toBe ( 2 ) ;
514+
515+ // Mid-flight: a third org joins the buffer. It must NOT have
516+ // been part of cycle 1's frozen LIST.
517+ await buffer . accept ( {
518+ runId : "run_mid" ,
519+ envId : "env_mid" ,
520+ orgId : "org_mid" ,
521+ payload : JSON . stringify ( SNAPSHOT ) ,
522+ } ) ;
523+
524+ // Tick 2: cycle 2 begins (cursor was 0 after tick 1's wrap).
525+ // rebuildOrgList captures all 3 orgs; this tick visits all 3.
526+ const r2 = await runStaleSweepOnce ( cfg , baseDeps ) ;
527+ expect ( r2 . orgsScanned ) . toBe ( 3 ) ;
528+ expect ( spies . snapshots [ 1 ] . size ) . toBe ( 3 ) ;
529+ expect ( spies . snapshots [ 1 ] . has ( "env_mid" ) ) . toBe ( true ) ;
530+ } finally {
531+ await state . close ( ) ;
532+ await buffer . close ( ) ;
533+ }
534+ } ,
535+ ) ;
536+
537+ redisTest (
538+ "empty buffer (no orgs) advances cleanly with zero work and an empty snapshot" ,
539+ { timeout : 30_000 } ,
540+ async ( { redisOptions } ) => {
541+ // `mollifier:orgs` is empty (no entries ever accepted, or every
542+ // entry has been drained). The sweep must handle the boundary:
543+ // rebuildOrgList with [], readOrgListSlice returns total=0,
544+ // the org loop is skipped, and the cursor stays at 0 instead of
545+ // tripping the wrap math.
546+ const buffer = new MollifierBuffer ( { redisOptions } ) ;
547+ const state = new MollifierStaleSweepState ( { redisOptions } ) ;
548+ try {
549+ const spies = spyDeps ( ) ;
550+ const result = await runStaleSweepOnce (
551+ { staleThresholdMs : 60 * 1000 , maxOrgsPerPass : 10 } ,
552+ { ...spies . deps , getBuffer : ( ) => buffer , state } ,
553+ ) ;
554+ expect ( result ) . toEqual ( {
555+ orgsScanned : 0 ,
556+ envsScanned : 0 ,
557+ entriesScanned : 0 ,
558+ staleCount : 0 ,
559+ } ) ;
560+ expect ( spies . snapshots ) . toHaveLength ( 1 ) ;
561+ expect ( spies . snapshots [ 0 ] . size ) . toBe ( 0 ) ;
562+ // Cursor stayed at 0 — nothing to advance through.
563+ expect ( await state . readCursor ( ) ) . toBe ( 0 ) ;
564+ } finally {
565+ await state . close ( ) ;
566+ await buffer . close ( ) ;
567+ }
568+ } ,
569+ ) ;
570+
571+ redisTest (
572+ "buffer-null branch wipes the durable state so a re-enable starts fresh" ,
573+ { timeout : 30_000 } ,
574+ async ( { redisOptions } ) => {
575+ // The unit test above asserts the snapshot is empty when the
576+ // buffer is null, but doesn't verify the durable state was
577+ // actually cleared. Without clearAll the next re-enable would
578+ // resume on a stale cursor + carry over a stale counts hash.
579+ const buffer = new MollifierBuffer ( { redisOptions } ) ;
580+ const state = new MollifierStaleSweepState ( { redisOptions } ) ;
581+ try {
582+ await buffer . accept ( {
583+ runId : "run_seed" ,
584+ envId : "env_seed" ,
585+ orgId : "org_seed" ,
586+ payload : JSON . stringify ( SNAPSHOT ) ,
587+ } ) ;
588+ const futureNow = Date . now ( ) + 5 * 60 * 1000 ;
589+ const cfg = { staleThresholdMs : 60 * 1000 , maxOrgsPerPass : 10 } ;
590+ const spies = spyDeps ( ) ;
591+
592+ // Tick 1: populate state.
593+ await runStaleSweepOnce ( cfg , {
594+ ...spies . deps ,
595+ getBuffer : ( ) => buffer ,
596+ state,
597+ now : ( ) => futureNow ,
598+ } ) ;
599+ expect ( spies . snapshots [ 0 ] . size ) . toBe ( 1 ) ;
600+ expect ( ( await state . readAllEnvStaleCounts ( ) ) . size ) . toBe ( 1 ) ;
601+
602+ // Tick 2: mollifier flips OFF — getBuffer returns null. The
603+ // sweep must clear the durable state.
604+ await runStaleSweepOnce ( cfg , {
605+ ...spies . deps ,
606+ getBuffer : ( ) => null ,
607+ state,
608+ } ) ;
609+ expect ( spies . snapshots [ 1 ] . size ) . toBe ( 0 ) ;
610+ expect ( ( await state . readAllEnvStaleCounts ( ) ) . size ) . toBe ( 0 ) ;
611+ expect ( await state . readCursor ( ) ) . toBe ( 0 ) ;
612+ } finally {
613+ await state . close ( ) ;
614+ await buffer . close ( ) ;
615+ }
616+ } ,
617+ ) ;
618+ } ) ;
619+
620+ describe ( "MollifierStaleSweepState — direct unit tests" , ( ) => {
621+ redisTest ( "readCursor returns 0 when the key is absent" , { timeout : 20_000 } , async ( { redisOptions } ) => {
622+ const state = new MollifierStaleSweepState ( { redisOptions } ) ;
623+ try {
624+ expect ( await state . readCursor ( ) ) . toBe ( 0 ) ;
625+ } finally {
626+ await state . close ( ) ;
627+ }
628+ } ) ;
629+
630+ redisTest (
631+ "writeCursor + readCursor round-trip; readCursor parses a non-numeric value as 0" ,
632+ { timeout : 20_000 } ,
633+ async ( { redisOptions } ) => {
634+ const state = new MollifierStaleSweepState ( { redisOptions } ) ;
635+ try {
636+ await state . writeCursor ( 42 ) ;
637+ expect ( await state . readCursor ( ) ) . toBe ( 42 ) ;
638+
639+ // Defensive: a corrupted/garbage value must not throw or
640+ // propagate NaN into the sweep's cursor arithmetic.
641+ await state [ "redis" ] . set ( "mollifier:stale_sweep:cursor" , "not-a-number" ) ;
642+ expect ( await state . readCursor ( ) ) . toBe ( 0 ) ;
643+ } finally {
644+ await state . close ( ) ;
645+ }
646+ } ,
647+ ) ;
648+
649+ redisTest (
650+ "rebuildOrgList replaces the previous list (DEL + RPUSH, in order)" ,
651+ { timeout : 20_000 } ,
652+ async ( { redisOptions } ) => {
653+ const state = new MollifierStaleSweepState ( { redisOptions } ) ;
654+ try {
655+ await state . rebuildOrgList ( [ "org_a" , "org_b" , "org_c" ] ) ;
656+ let slice = await state . readOrgListSlice ( 0 , 10 ) ;
657+ expect ( slice . total ) . toBe ( 3 ) ;
658+ expect ( slice . orgs ) . toEqual ( [ "org_a" , "org_b" , "org_c" ] ) ;
659+
660+ // Replacement, not append.
661+ await state . rebuildOrgList ( [ "org_x" ] ) ;
662+ slice = await state . readOrgListSlice ( 0 , 10 ) ;
663+ expect ( slice . total ) . toBe ( 1 ) ;
664+ expect ( slice . orgs ) . toEqual ( [ "org_x" ] ) ;
665+
666+ // Empty rebuild leaves the list empty (DEL fires, no RPUSH).
667+ await state . rebuildOrgList ( [ ] ) ;
668+ slice = await state . readOrgListSlice ( 0 , 10 ) ;
669+ expect ( slice . total ) . toBe ( 0 ) ;
670+ expect ( slice . orgs ) . toEqual ( [ ] ) ;
671+ } finally {
672+ await state . close ( ) ;
673+ }
674+ } ,
675+ ) ;
676+
677+ redisTest (
678+ "setEnvStaleCount HSETs when count > 0 and HDELs when count === 0" ,
679+ { timeout : 20_000 } ,
680+ async ( { redisOptions } ) => {
681+ const state = new MollifierStaleSweepState ( { redisOptions } ) ;
682+ try {
683+ await state . setEnvStaleCount ( "env_a" , 3 ) ;
684+ await state . setEnvStaleCount ( "env_b" , 1 ) ;
685+ let counts = await state . readAllEnvStaleCounts ( ) ;
686+ expect ( Object . fromEntries ( counts ) ) . toEqual ( { env_a : 3 , env_b : 1 } ) ;
687+
688+ // Zero clears the field (HDEL), not stores 0.
689+ await state . setEnvStaleCount ( "env_a" , 0 ) ;
690+ counts = await state . readAllEnvStaleCounts ( ) ;
691+ expect ( Object . fromEntries ( counts ) ) . toEqual ( { env_b : 1 } ) ;
692+ expect ( counts . has ( "env_a" ) ) . toBe ( false ) ;
693+ } finally {
694+ await state . close ( ) ;
695+ }
696+ } ,
697+ ) ;
698+
699+ redisTest (
700+ "clearAll DELs cursor, org_list, and counts in one call" ,
701+ { timeout : 20_000 } ,
702+ async ( { redisOptions } ) => {
703+ const state = new MollifierStaleSweepState ( { redisOptions } ) ;
704+ try {
705+ await state . writeCursor ( 7 ) ;
706+ await state . rebuildOrgList ( [ "org_a" , "org_b" ] ) ;
707+ await state . setEnvStaleCount ( "env_a" , 5 ) ;
708+
709+ await state . clearAll ( ) ;
710+
711+ expect ( await state . readCursor ( ) ) . toBe ( 0 ) ;
712+ expect ( ( await state . readOrgListSlice ( 0 , 10 ) ) . total ) . toBe ( 0 ) ;
713+ expect ( ( await state . readAllEnvStaleCounts ( ) ) . size ) . toBe ( 0 ) ;
714+ } finally {
715+ await state . close ( ) ;
716+ }
717+ } ,
718+ ) ;
405719} ) ;
0 commit comments