@@ -79,25 +79,40 @@ def _on_message_handler(self, client, userdata, message):
7979 log .exception ("Failed parsing MQTT data on %s as JSON" , message .topic )
8080 return
8181
82+ log .debug ("MQTT payload: %s, %s" , message .topic , j )
83+
8284 if message .topic == "/t_ms" :
8385 # Update sync_token when received
8486 # This is received in the first message after we've created a messenger
8587 # sync queue.
8688 if "syncToken" in j and "firstDeltaSeqId" in j :
8789 self ._sync_token = j ["syncToken" ]
8890 self ._sequence_id = j ["firstDeltaSeqId" ]
91+ return
8992
9093 # Update last sequence id when received
9194 if "lastIssuedSeqId" in j :
9295 self ._sequence_id = j ["lastIssuedSeqId" ]
9396
9497 if "errorCode" in j :
95- # Known types: ERROR_QUEUE_OVERFLOW | ERROR_QUEUE_NOT_FOUND
96- # 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00'
97- log .error ("MQTT error code %s received" , j ["errorCode" ])
98- # TODO: Consider resetting the sync_token and sequence ID here?
99-
100- log .debug ("MQTT payload: %s, %s" , message .topic , j )
98+ error = j ["errorCode" ]
99+ # TODO: 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00'
100+ if error in ("ERROR_QUEUE_NOT_FOUND" , "ERROR_QUEUE_OVERFLOW" ):
101+ # ERROR_QUEUE_NOT_FOUND means that the queue was deleted, since too
102+ # much time passed, or that it was simply missing
103+ # ERROR_QUEUE_OVERFLOW means that the sequence id was too small, so
104+ # the desired events could not be retrieved
105+ log .error (
106+ "The MQTT listener was disconnected for too long,"
107+ " events may have been lost"
108+ )
109+ self ._sync_token = None
110+ self ._sequence_id = self ._fetch_sequence_id (self ._state )
111+ self ._messenger_queue_publish ()
112+ # TODO: Signal to the user that they should reload their data!
113+ return
114+ log .error ("MQTT error code %s received" , error )
115+ return
101116
102117 # Call the external callback
103118 self ._on_message (message .topic , j )
@@ -129,6 +144,9 @@ def _on_connect_handler(self, client, userdata, flags, rc):
129144 if rc != 0 :
130145 return # Don't try to send publish if the connection failed
131146
147+ self ._messenger_queue_publish ()
148+
149+ def _messenger_queue_publish (self ):
132150 # configure receiving messages.
133151 payload = {
134152 "sync_api_version" : 10 ,
0 commit comments