@@ -38,35 +38,35 @@ class Client {
3838 if ( ! / ^ w s s ? : \/ \/ / . test ( url ) ) {
3939 url = `ws://${ url } ` ;
4040 }
41-
41+
4242 this . reqId = - 1 ;
4343 this . autoReconnect = autoReconnect ;
4444 this . enableTimeSync = true ; // Time synchronization is enabled by default.
45-
45+
4646 this . isOpen = false ;
4747 this . queuedRequests = { } ;
4848 this . storedPromises = { } ;
4949 this . nameToId = { } ;
5050 this . idToName = { } ;
51-
51+
5252 // Mapping for signal types (in case we need to interpret values).
5353 this . nameToType = { } ;
54-
54+
5555 // Time-diff related
5656 this . timeDiff = 0 ;
5757 this . timeReceived = null ;
5858 this . lastTimeRequest = Date . now ( ) / 1000 ;
5959 this . haveSentQueuedReq = false ;
6060 this . roundTripTimes = { } ;
61-
61+
6262 // Initialize the cache for sender tags and pending tag requests.
6363 this . senderTags = { } ; // Cache for event sender tags (keyed by sender)
6464 this . pendingSenderTags = { } ; // Holds pending promises for sender tags
65-
65+
6666 // Create the WebSocket connection
6767 this . ws = this . _connect ( url ) ;
6868 }
69-
69+
7070
7171 /**
7272 * Enable or disable time synchronization with the server.
@@ -298,42 +298,42 @@ class Client {
298298 * @param {Object } query - A simple plain object representing the EventQuery.
299299 * @returns {Promise<Array> } Resolves with an array of event objects.
300300 */
301- // Modified requestEvents() to wait for missing sender tag info.
302- requestEvents ( query ) {
303- this . _timeRequest ( ) ;
304- const requestId = this . _getRequestId ( ) ;
305- const eventQuery = this . _buildEventQuery ( query ) ;
306- if ( ! this . isOpen ) {
307- this . queuedRequests [ requestId ] = { type : "events" , query : eventQuery } ;
308- } else {
309- this . _sendEventsRequest ( requestId , eventQuery ) ;
310- }
311- return new Promise ( ( resolve , reject ) => {
312- this . storedPromises [ requestId ] = { resolve, reject } ;
313- } )
314- . then ( events => {
315- // Collect the unique sender names from events that lack cached tags.
316- const missingSenders = Array . from ( new Set (
317- events
318- . filter ( evt => ! this . senderTags [ evt . sender ] )
319- . map ( evt => evt . sender )
320- ) ) ;
321-
322- if ( missingSenders . length === 0 ) {
323- return events ;
301+ // Modified requestEvents() to wait for missing sender tag info.
302+ requestEvents ( query ) {
303+ this . _timeRequest ( ) ;
304+ const requestId = this . _getRequestId ( ) ;
305+ const eventQuery = this . _buildEventQuery ( query ) ;
306+ if ( ! this . isOpen ) {
307+ this . queuedRequests [ requestId ] = { type : "events" , query : eventQuery } ;
308+ } else {
309+ this . _sendEventsRequest ( requestId , eventQuery ) ;
324310 }
325- // Request tag info for all missing senders.
326- return Promise . all (
327- missingSenders . map ( sender => this . getSenderTags ( sender ) )
328- ) . then ( ( ) => {
329- // Attach tags to events after tag info is available.
330- events . forEach ( evt => {
331- evt . tags = this . senderTags [ evt . sender ] ;
311+ return new Promise ( ( resolve , reject ) => {
312+ this . storedPromises [ requestId ] = { resolve, reject } ;
313+ } )
314+ . then ( events => {
315+ // Collect the unique sender names from events that lack cached tags.
316+ const missingSenders = Array . from ( new Set (
317+ events
318+ . filter ( evt => ! this . senderTags [ evt . sender ] )
319+ . map ( evt => evt . sender )
320+ ) ) ;
321+
322+ if ( missingSenders . length === 0 ) {
323+ return events ;
324+ }
325+ // Request tag info for all missing senders.
326+ return Promise . all (
327+ missingSenders . map ( sender => this . getSenderTags ( sender ) )
328+ ) . then ( ( ) => {
329+ // Attach tags to events after tag info is available.
330+ events . forEach ( evt => {
331+ evt . tags = this . senderTags [ evt . sender ] ;
332+ } ) ;
333+ return events ;
334+ } ) ;
332335 } ) ;
333- return events ;
334- } ) ;
335- } ) ;
336- }
336+ }
337337
338338 /**
339339 * Request a count of events that match the given query.
@@ -437,18 +437,18 @@ requestEvents(query) {
437437 return s ;
438438 }
439439
440- /**
441- * Retrieves the tags associated with a given sender.
442- *
443- * This method checks if the tags for the specified sender are already cached. If so, it returns a
444- * resolved promise with the cached tags. Otherwise, it initializes a pending promise for the sender,
445- * sends a request for the sender's tags using `_sendEventSenderTagsRequest`, and returns a promise that
446- * resolves when the tags are received. If no response is received within 5000 ms, it falls back to resolving
447- * with an empty object.
448- *
449- * @param {string } sender - The identifier of the event sender.
450- * @returns {Promise<Object> } A promise that resolves with an object representing the tags for the sender.
451- */
440+ /**
441+ * Retrieves the tags associated with a given sender.
442+ *
443+ * This method checks if the tags for the specified sender are already cached. If so, it returns a
444+ * resolved promise with the cached tags. Otherwise, it initializes a pending promise for the sender,
445+ * sends a request for the sender's tags using `_sendEventSenderTagsRequest`, and returns a promise that
446+ * resolves when the tags are received. If no response is received within 5000 ms, it falls back to resolving
447+ * with an empty object.
448+ *
449+ * @param {string } sender - The identifier of the event sender.
450+ * @returns {Promise<Object> } A promise that resolves with an object representing the tags for the sender.
451+ */
452452 getSenderTags ( sender ) {
453453 if ( this . senderTags && this . senderTags [ sender ] ) {
454454 return Promise . resolve ( this . senderTags [ sender ] ) ;
@@ -458,16 +458,8 @@ requestEvents(query) {
458458 this . pendingSenderTags [ sender ] = [ ] ;
459459 this . _sendEventSenderTagsRequest ( sender ) ;
460460 }
461- return new Promise ( resolve => {
462- this . pendingSenderTags [ sender ] . push ( resolve ) ;
463- // Increase timeout to 5000 ms to wait longer for tag info.
464- setTimeout ( ( ) => {
465- if ( this . pendingSenderTags [ sender ] ) {
466- this . senderTags [ sender ] = { } ; // Fallback to empty object.
467- this . pendingSenderTags [ sender ] . forEach ( fn => fn ( { } ) ) ;
468- delete this . pendingSenderTags [ sender ] ;
469- }
470- } , 5000 ) ;
461+ return new Promise ( ( resolve , reject ) => {
462+ this . pendingSenderTags [ sender ] . push ( { resolve, reject } ) ;
471463 } ) ;
472464 }
473465
@@ -497,14 +489,20 @@ requestEvents(query) {
497489 if ( ! error ) {
498490 error = new Error ( "Something went wrong" ) ;
499491 }
500- if ( ! this . autoReconnect ) {
501- for ( const key in this . storedPromises ) {
502- this . storedPromises [ key ] . reject ( error ) ;
503- }
504- this . storedPromises = { } ;
505- this . queuedRequests = { } ;
492+ // Reject all stored promises.
493+ for ( const key in this . storedPromises ) {
494+ this . storedPromises [ key ] . reject ( error ) ;
495+ }
496+ this . storedPromises = { } ;
497+ this . queuedRequests = { } ;
498+
499+ // Reject any pending sender tag promises.
500+ for ( const sender in this . pendingSenderTags ) {
501+ this . pendingSenderTags [ sender ] . forEach ( promiseObj => promiseObj . reject ( error ) ) ;
502+ delete this . pendingSenderTags [ sender ] ;
506503 }
507504 }
505+
508506
509507 _onClose ( ws ) {
510508 this . isOpen = false ;
@@ -663,7 +661,7 @@ requestEvents(query) {
663661 }
664662 break ;
665663 }
666-
664+
667665
668666 case Container . Type . eCountEventsResponse : {
669667 if ( this . storedPromises [ data . countEventsResponse . requestId ] ) {
@@ -673,7 +671,7 @@ requestEvents(query) {
673671 }
674672 break ;
675673 }
676-
674+
677675 case Container . Type . eEventSenderTagsResponse : {
678676 // Get the mapping of sender names to TagMap objects.
679677 const tagsMapping = data . eventSenderTagsResponse . senderTags ;
@@ -683,13 +681,14 @@ requestEvents(query) {
683681 this . senderTags [ sender ] = tags ;
684682 // Resolve any pending promises waiting for tags for this sender.
685683 if ( this . pendingSenderTags [ sender ] ) {
686- this . pendingSenderTags [ sender ] . forEach ( resolveFn => resolveFn ( tags ) ) ;
684+ this . pendingSenderTags [ sender ] . forEach ( promiseObj => promiseObj . resolve ( tags ) ) ;
687685 delete this . pendingSenderTags [ sender ] ;
688686 }
689687 }
690688 break ;
691689 }
692-
690+
691+
693692 default :
694693 console . error ( "Unknown message type" , data . messageType ) ;
695694 }
@@ -945,7 +944,7 @@ requestEvents(query) {
945944 container . countEventsRequest = { requestId, query } ;
946945 const buffer = Container . encode ( container ) . finish ( ) ;
947946 this . ws . send ( buffer ) ;
948- }
947+ }
949948
950949 _sendEventSenderTagsRequest ( sender ) {
951950 const container = Container . create ( ) ;
0 commit comments