@@ -18,7 +18,8 @@ export class FirestoreWatcher {
1818 private retryCount = 0 ;
1919 private readonly maxRetries : number = 10 ; // Increased retries
2020 private readonly retryDelay : number = 1000 ; // 1 second
21- private isFirstSnapshot = true ; // Skip the initial snapshot that contains all existing documents
21+ private watcherStartTime : number = Date . now ( ) ; // Track when watcher starts
22+ private firstSnapshotReceived = false ; // Track if we've received the first snapshot
2223
2324 // Track processed document IDs to prevent duplicates
2425 private processedIds = new Set < string > ( ) ;
@@ -57,23 +58,53 @@ export class FirestoreWatcher {
5758
5859 // Reset stopped flag when starting
5960 this . stopped = false ;
61+
62+ // Reset watcher start time
63+ this . watcherStartTime = Date . now ( ) ;
64+ this . firstSnapshotReceived = false ;
6065
6166 try {
62- // Set up real-time listener (only for new changes, not existing documents)
67+ // Set up real-time listener
6368 this . unsubscribe = this . collection . onSnapshot (
6469 async ( snapshot ) => {
6570 // Update last snapshot time for health monitoring
6671 this . lastSnapshotTime = Date . now ( ) ;
6772
68- // Skip the first snapshot which contains all existing documents
69- if ( this . isFirstSnapshot ) {
70- console . log ( `Skipping initial snapshot for ${ collectionPath } (contains all existing documents)` ) ;
71- this . isFirstSnapshot = false ;
73+ // On first snapshot, only skip documents that were created/modified BEFORE watcher started
74+ // This ensures we don't miss any new documents created right as the watcher starts
75+ if ( ! this . firstSnapshotReceived ) {
76+ console . log ( `First snapshot received for ${ collectionPath } with ${ snapshot . size } documents` ) ;
77+ this . firstSnapshotReceived = true ;
78+
79+ // Process only documents modified AFTER watcher start time
80+ const recentChanges = snapshot . docChanges ( ) . filter ( ( change ) => {
81+ const doc = change . doc ;
82+ const data = doc . data ( ) ;
83+
84+ // Check if document was modified after watcher started
85+ // Use updatedAt if available, otherwise createdAt
86+ const timestamp = data . updatedAt || data . createdAt ;
87+ if ( timestamp && timestamp . toMillis ) {
88+ const docTime = timestamp . toMillis ( ) ;
89+ return docTime >= this . watcherStartTime ;
90+ }
91+
92+ // If no timestamp, process it to be safe
93+ return true ;
94+ } ) ;
95+
96+ if ( recentChanges . length > 0 ) {
97+ console . log ( `Processing ${ recentChanges . length } recent changes from first snapshot` ) ;
98+ await this . processChanges ( recentChanges ) ;
99+ } else {
100+ console . log ( `No recent changes in first snapshot, skipping` ) ;
101+ }
102+
103+ this . retryCount = 0 ;
72104 return ;
73105 }
74106
75- // Don't skip snapshots - queue them instead to handle large databases
76- // Process snapshot asynchronously without blocking new snapshots
107+ // For subsequent snapshots, process all changes normally
77108 this . processSnapshot ( snapshot ) . catch ( ( error ) => {
78109 console . error ( "Error processing snapshot:" , error ) ;
79110 this . handleError ( error ) ;
@@ -205,8 +236,9 @@ export class FirestoreWatcher {
205236 this . unsubscribe = null ;
206237 }
207238
208- // Reset first snapshot flag
209- this . isFirstSnapshot = true ;
239+ // Reset watcher state
240+ this . watcherStartTime = Date . now ( ) ;
241+ this . firstSnapshotReceived = false ;
210242 this . lastSnapshotTime = Date . now ( ) ;
211243
212244 // Reset reconnection attempt counter on successful reconnect
@@ -323,8 +355,9 @@ export class FirestoreWatcher {
323355 this . unsubscribe = null ;
324356 }
325357
326- // Reset first snapshot flag when restarting
327- this . isFirstSnapshot = true ;
358+ // Reset watcher state when restarting
359+ this . watcherStartTime = Date . now ( ) ;
360+ this . firstSnapshotReceived = false ;
328361 this . lastSnapshotTime = Date . now ( ) ;
329362
330363 try {
@@ -343,8 +376,10 @@ export class FirestoreWatcher {
343376 }
344377 }
345378
346- private async processSnapshot ( snapshot : QuerySnapshot ) : Promise < void > {
347- const changes = snapshot . docChanges ( ) ;
379+ /**
380+ * Processes an array of document changes
381+ */
382+ private async processChanges ( changes : DocumentChange [ ] ) : Promise < void > {
348383 const collectionPath =
349384 this . collection instanceof CollectionReference
350385 ? this . collection . path
@@ -412,6 +447,12 @@ export class FirestoreWatcher {
412447 await Promise . all ( processPromises ) ;
413448 }
414449
450+ private async processSnapshot ( snapshot : QuerySnapshot ) : Promise < void > {
451+ const changes = snapshot . docChanges ( ) ;
452+ await this . processChanges ( changes ) ;
453+ }
454+
455+
415456 private async handleCreateOrUpdate (
416457 doc : FirebaseFirestore . QueryDocumentSnapshot < DocumentData > ,
417458 data : DocumentData
0 commit comments