@@ -2,7 +2,7 @@ import { DestroyRef, Injectable, Optional } from '@angular/core';
22import { filter , race , take , timer } from 'rxjs' ;
33import { AppError } from 'xforge-common/exception-handling.service' ;
44import { FileService } from './file.service' ;
5- import { DocSubscriberInfo , FETCH_WITHOUT_SUBSCRIBE , RealtimeDoc } from './models/realtime-doc' ;
5+ import { DocSubscriberInfo , DocSubscription , FETCH_WITHOUT_SUBSCRIBE , RealtimeDoc } from './models/realtime-doc' ;
66import { RealtimeQuery } from './models/realtime-query' ;
77import { OfflineStore } from './offline-store' ;
88import { QueryParameters } from './query-parameters' ;
@@ -13,6 +13,10 @@ function getDocKey(collection: string, id: string): string {
1313 return `${ collection } :${ id } ` ;
1414}
1515
16+ export function getCollectionFromId ( id : string ) : string {
17+ return id . split ( ':' ) [ 0 ] ;
18+ }
19+
1620/**
1721 * A no-op DestroyRef that is not associated with any component.
1822 * This may be useful to satisfy a subscribe query in testing or if a query is not associated with a component.
@@ -21,6 +25,8 @@ export const noopDestroyRef: DestroyRef = {
2125 onDestroy : _callback => ( ) => { }
2226} ;
2327
28+ const NEVER_RESOLVING_DOC_SUBSCRIPTION = new DocSubscription ( 'FETCH_WITHOUT_SUBSCRIBE' , noopDestroyRef ) ;
29+
2430/**
2531 * The realtime service is responsible for retrieving and mutating realtime data models. This service transparently
2632 * manages the interaction between three data sources: a memory cache, a local database (IndexedDB), and a realtime
@@ -49,6 +55,32 @@ export class RealtimeService {
4955 const doc : RealtimeDoc | undefined = this . docs . get ( getDocKey ( collection , docId ) ) ;
5056 await doc ?. updateOfflineData ( ) ;
5157 } ) ;
58+ this . scheduleNextGarbageCollection ( ) ;
59+ }
60+
61+ runGarbageCollection ( ) : void {
62+ let disposedDocs = 0 ;
63+ const initialDocsCount = this . docs . size ;
64+ for ( const doc of this . docs . values ( ) ) {
65+ if ( doc . activeDocSubscriptionsCount === 0 ) {
66+ void doc . dispose ( ) ;
67+ disposedDocs ++ ;
68+ }
69+ }
70+ const remainingDocsCount = initialDocsCount - disposedDocs ;
71+ console . log ( `Garbage collection: disposed ${ disposedDocs } documents. ${ remainingDocsCount } documents remaining.` ) ;
72+
73+ this . scheduleNextGarbageCollection ( ) ;
74+ }
75+
76+ scheduleNextGarbageCollection ( ) : void {
77+ setTimeout ( ( ) => {
78+ if ( window . requestIdleCallback == null ) {
79+ this . runGarbageCollection ( ) ;
80+ } else {
81+ window . requestIdleCallback ( ( ) => this . runGarbageCollection ( ) , { timeout : 5000 } ) ;
82+ }
83+ } , 1000 ) ;
5284 }
5385
5486 get totalDocCount ( ) : number {
@@ -70,7 +102,7 @@ export class RealtimeService {
70102 [ key : string ] : { docs : number ; subscribers : number ; activeDocSubscriptionsCount : number } ;
71103 } = { } ;
72104 for ( const [ id , doc ] of this . docs . entries ( ) ) {
73- const collection = id . split ( ':' ) [ 0 ] ;
105+ const collection = getCollectionFromId ( id ) ;
74106 countsByCollection [ collection ] ??= { docs : 0 , subscribers : 0 , activeDocSubscriptionsCount : 0 } ;
75107 countsByCollection [ collection ] . docs ++ ;
76108 countsByCollection [ collection ] . subscribers += doc . docSubscriptionsCount ;
@@ -82,7 +114,7 @@ export class RealtimeService {
82114 get subscriberCountsByContext ( ) : { [ key : string ] : { [ key : string ] : { all : number ; active : number } } } {
83115 const countsByContext : { [ key : string ] : { [ key : string ] : { all : number ; active : number } } } = { } ;
84116 for ( const [ id , doc ] of this . docs . entries ( ) ) {
85- const collection = id . split ( ':' ) [ 0 ] ;
117+ const collection = getCollectionFromId ( id ) ;
86118 countsByContext [ collection ] ??= { } ;
87119 for ( const subscriber of doc . docSubscriptions ) {
88120 countsByContext [ collection ] [ subscriber . callerContext ] ??= { all : 0 , active : 0 } ;
@@ -112,7 +144,11 @@ export class RealtimeService {
112144 }
113145 this . docs . set ( key , doc ) ;
114146 }
115- if ( subscriber !== FETCH_WITHOUT_SUBSCRIBE ) doc . addSubscriber ( subscriber ) ;
147+ if ( subscriber !== FETCH_WITHOUT_SUBSCRIBE ) {
148+ doc . addSubscriber ( subscriber ) ;
149+ } else {
150+ doc . addSubscriber ( NEVER_RESOLVING_DOC_SUBSCRIPTION ) ;
151+ }
116152
117153 return doc as T ;
118154 }
@@ -138,18 +174,6 @@ export class RealtimeService {
138174 return doc ;
139175 }
140176
141- // /**
142- // * Gets the real-time doc with the specified id without subscribing to remote changes (well, it actually does
143- // * subscribe to remote changes, but marks it as not needing to be kept around).
144- // *
145- // * @param {string } collection The collection name.
146- // * @param {string } id The id.
147- // * @returns {Promise<T> } The real-time doc.
148- // */
149- // async fetch<T extends RealtimeDoc>(collection: string, id: string): Promise<T> {
150- // return await this.subscribe<T>(collection, id, FETCH_WITHOUT_SUBSCRIBE);
151- // }
152-
153177 async onlineFetch < T extends RealtimeDoc > ( collection : string , id : string , subscriber : DocSubscriberInfo ) : Promise < T > {
154178 const doc = this . get < T > ( collection , id , subscriber ) ;
155179 await doc . onlineFetch ( ) ;
0 commit comments