@@ -186,6 +186,178 @@ def _sql_literal(value: Any) -> str:
186186 return "'" + s .replace ("'" , "''" ) + "'"
187187
188188
189+ def _pick_case_insensitive_value (item : Any , * keys : str ) -> Any :
190+ if not isinstance (item , dict ):
191+ return None
192+ for key in keys :
193+ if key in item and item [key ] is not None :
194+ return item [key ]
195+ key_lc = str (key or "" ).strip ().lower ()
196+ for actual_key , actual_value in item .items ():
197+ if str (actual_key or "" ).strip ().lower () == key_lc and actual_value is not None :
198+ return actual_value
199+ return None
200+
201+
202+ def _table_exists_case_insensitive (conn : sqlite3 .Connection , table_name : str ) -> bool :
203+ try :
204+ row = conn .execute (
205+ "SELECT name FROM sqlite_master WHERE type='table' AND lower(name)=lower(?) LIMIT 1" ,
206+ (str (table_name or "" ).strip (),),
207+ ).fetchone ()
208+ return bool (row )
209+ except Exception :
210+ return False
211+
212+
213+ def _ensure_output_name2id_table (conn : sqlite3 .Connection ) -> bool :
214+ if _table_exists_case_insensitive (conn , "Name2Id" ):
215+ return True
216+ try :
217+ conn .execute (
218+ """
219+ CREATE TABLE IF NOT EXISTS Name2Id (
220+ user_name TEXT,
221+ is_session INTEGER DEFAULT 1
222+ )
223+ """
224+ )
225+ conn .commit ()
226+ return True
227+ except Exception :
228+ return False
229+
230+
231+ def _best_effort_upsert_output_name2id_rows (
232+ conn : sqlite3 .Connection ,
233+ * ,
234+ account_name : str ,
235+ rows : list [dict [str , Any ]],
236+ ) -> bool :
237+ if not rows :
238+ return _table_exists_case_insensitive (conn , "Name2Id" )
239+ if not _ensure_output_name2id_table (conn ):
240+ return False
241+ try :
242+ conn .execute (
243+ "INSERT OR IGNORE INTO Name2Id(user_name, is_session) VALUES (?, ?)" ,
244+ (str (account_name or "" ).strip (), 1 ),
245+ )
246+ except Exception :
247+ pass
248+
249+ wrote = False
250+ for row in rows :
251+ try :
252+ rid = int (row .get ("real_sender_id" ) or 0 )
253+ except Exception :
254+ rid = 0
255+ username = str (row .get ("sender_username" ) or "" ).strip ()
256+ if rid <= 0 or not username :
257+ continue
258+ try :
259+ conn .execute (
260+ "INSERT OR IGNORE INTO Name2Id(rowid, user_name, is_session) VALUES (?, ?, ?)" ,
261+ (rid , username , 1 ),
262+ )
263+ wrote = True
264+ except Exception :
265+ continue
266+
267+ if wrote :
268+ try :
269+ conn .commit ()
270+ except Exception :
271+ return False
272+ return True
273+
274+
275+ def _sync_output_name2id_from_live (
276+ conn : sqlite3 .Connection ,
277+ * ,
278+ rt_conn : Any ,
279+ msg_db_path_real : Path ,
280+ ) -> dict [str , Any ]:
281+ if not _ensure_output_name2id_table (conn ):
282+ return {"status" : "missing_local_table" , "rows" : 0 }
283+
284+ local_row = conn .execute ("SELECT COUNT(1) AS c, COALESCE(MAX(rowid), 0) AS mx FROM Name2Id" ).fetchone ()
285+ try :
286+ local_count = int ((local_row ["c" ] if isinstance (local_row , sqlite3 .Row ) else local_row [0 ]) or 0 )
287+ except Exception :
288+ local_count = 0
289+ try :
290+ local_max = int ((local_row ["mx" ] if isinstance (local_row , sqlite3 .Row ) else local_row [1 ]) or 0 )
291+ except Exception :
292+ local_max = 0
293+
294+ sql_stats = "SELECT COUNT(1) AS c, COALESCE(MAX(rowid), 0) AS mx FROM Name2Id"
295+ with rt_conn .lock :
296+ live_stats_rows = _wcdb_exec_query (rt_conn .handle , kind = "message" , path = str (msg_db_path_real ), sql = sql_stats )
297+
298+ live_stats = live_stats_rows [0 ] if live_stats_rows and isinstance (live_stats_rows [0 ], dict ) else {}
299+ try :
300+ live_count = int (_pick_case_insensitive_value (live_stats , "c" , "count" ) or 0 )
301+ except Exception :
302+ live_count = 0
303+ try :
304+ live_max = int (_pick_case_insensitive_value (live_stats , "mx" , "max_rowid" , "max" ) or 0 )
305+ except Exception :
306+ live_max = 0
307+
308+ if local_count == live_count and local_max == live_max :
309+ return {
310+ "status" : "up_to_date" ,
311+ "rows" : int (local_count ),
312+ "localCount" : int (local_count ),
313+ "liveCount" : int (live_count ),
314+ "localMax" : int (local_max ),
315+ "liveMax" : int (live_max ),
316+ }
317+
318+ sql_rows = "SELECT rowid AS rowid, user_name AS user_name, COALESCE(is_session, 1) AS is_session FROM Name2Id ORDER BY rowid ASC"
319+ with rt_conn .lock :
320+ live_rows = _wcdb_exec_query (rt_conn .handle , kind = "message" , path = str (msg_db_path_real ), sql = sql_rows )
321+
322+ values : list [tuple [int , str , int ]] = []
323+ seen_rowids : set [int ] = set ()
324+ for item in live_rows :
325+ if not isinstance (item , dict ):
326+ continue
327+ try :
328+ rid = int (_pick_case_insensitive_value (item , "rowid" ) or 0 )
329+ except Exception :
330+ rid = 0
331+ username = str (_pick_case_insensitive_value (item , "user_name" , "username" ) or "" ).strip ()
332+ try :
333+ is_session = int (_pick_case_insensitive_value (item , "is_session" ) or 0 )
334+ except Exception :
335+ is_session = 0
336+ if rid <= 0 or not username or rid in seen_rowids :
337+ continue
338+ seen_rowids .add (rid )
339+ values .append ((rid , username , is_session ))
340+
341+ if live_count > 0 and not values :
342+ raise ValueError ("Live Name2Id rows could not be decoded." )
343+
344+ conn .execute ("DELETE FROM Name2Id" )
345+ if values :
346+ conn .executemany (
347+ "INSERT INTO Name2Id(rowid, user_name, is_session) VALUES (?, ?, ?)" ,
348+ values ,
349+ )
350+ conn .commit ()
351+ return {
352+ "status" : "refreshed" ,
353+ "rows" : int (len (values )),
354+ "localCount" : int (local_count ),
355+ "liveCount" : int (live_count ),
356+ "localMax" : int (local_max ),
357+ "liveMax" : int (live_max ),
358+ }
359+
360+
189361def _normalize_edit_value (col : str , value : Any , * , from_snapshot : bool = False ) -> Any :
190362 c = str (col or "" ).strip ().lower ()
191363 if value is None :
@@ -1271,6 +1443,7 @@ def sync_chat_realtime_messages(
12711443 # Some sessions may not exist in the decrypted snapshot yet; create the missing Msg_<md5> table
12721444 # so we can insert the realtime rows and make `/api/chat/messages` work after switching off realtime.
12731445 msg_db_path , table_name = _ensure_decrypted_message_table (account_dir , username )
1446+ msg_db_path_real , _res_db_path_real = _resolve_db_storage_message_paths (account_dir , msg_db_path .stem )
12741447 logger .info (
12751448 "[%s] resolved decrypted table account=%s username=%s db=%s table=%s" ,
12761449 trace_id ,
@@ -1283,6 +1456,34 @@ def sync_chat_realtime_messages(
12831456 msg_conn = sqlite3 .connect (str (msg_db_path ))
12841457 msg_conn .row_factory = sqlite3 .Row
12851458 try :
1459+ name2id_synced = False
1460+ try :
1461+ sync_t0 = time .perf_counter ()
1462+ name2id_result = _sync_output_name2id_from_live (
1463+ msg_conn ,
1464+ rt_conn = rt_conn ,
1465+ msg_db_path_real = msg_db_path_real ,
1466+ )
1467+ sync_ms = (time .perf_counter () - sync_t0 ) * 1000.0
1468+ name2id_synced = str (name2id_result .get ("status" ) or "" ) in {"up_to_date" , "refreshed" }
1469+ logger .info (
1470+ "[%s] Name2Id sync account=%s db=%s status=%s rows=%s ms=%.1f" ,
1471+ trace_id ,
1472+ account_dir .name ,
1473+ msg_db_path .stem ,
1474+ str (name2id_result .get ("status" ) or "" ),
1475+ int (name2id_result .get ("rows" ) or 0 ),
1476+ sync_ms ,
1477+ )
1478+ except Exception as e :
1479+ logger .warning (
1480+ "[%s] Name2Id sync failed account=%s db=%s error=%s" ,
1481+ trace_id ,
1482+ account_dir .name ,
1483+ msg_db_path .stem ,
1484+ str (e ),
1485+ )
1486+
12861487 quoted_table = _quote_ident (table_name )
12871488 row = msg_conn .execute (f"SELECT MAX(local_id) AS mx FROM { quoted_table } " ).fetchone ()
12881489 try :
@@ -1425,42 +1626,12 @@ def normalize(item: dict[str, Any]) -> dict[str, Any]:
14251626
14261627 inserted = 0
14271628 backfilled = 0
1428- if new_rows :
1429- # Best-effort: keep Name2Id updated so decrypted queries can resolve sender usernames.
1430- # Rowid mapping is important (message.real_sender_id joins Name2Id.rowid).
1431- try :
1432- has_name2id = bool (
1433- msg_conn .execute (
1434- "SELECT name FROM sqlite_master WHERE type='table' AND lower(name)=lower('Name2Id') LIMIT 1"
1435- ).fetchone ()
1436- )
1437- except Exception :
1438- has_name2id = False
1439-
1440- if has_name2id :
1441- try :
1442- msg_conn .execute (
1443- "INSERT OR IGNORE INTO Name2Id(user_name, is_session) VALUES (?, ?)" ,
1444- (str (account_dir .name ), 1 ),
1445- )
1446- except Exception :
1447- pass
1448-
1449- for r in new_rows :
1450- try :
1451- rid = int (r .get ("real_sender_id" ) or 0 )
1452- except Exception :
1453- rid = 0
1454- su = str (r .get ("sender_username" ) or "" ).strip ()
1455- if rid <= 0 or not su :
1456- continue
1457- try :
1458- msg_conn .execute (
1459- "INSERT OR IGNORE INTO Name2Id(rowid, user_name, is_session) VALUES (?, ?, ?)" ,
1460- (rid , su , 1 ),
1461- )
1462- except Exception :
1463- continue
1629+ if new_rows and (not name2id_synced ):
1630+ _best_effort_upsert_output_name2id_rows (
1631+ msg_conn ,
1632+ account_name = account_dir .name ,
1633+ rows = new_rows ,
1634+ )
14641635
14651636 # Insert older -> newer to keep sqlite btree locality similar to existing data.
14661637 values = [tuple (r .get (c ) for c in insert_cols ) for r in reversed (new_rows )]
@@ -1658,6 +1829,30 @@ def _sync_chat_realtime_messages_for_table(
16581829 msg_conn = sqlite3 .connect (str (msg_db_path ))
16591830 msg_conn .row_factory = sqlite3 .Row
16601831 try :
1832+ msg_db_path_real , _res_db_path_real = _resolve_db_storage_message_paths (account_dir , msg_db_path .stem )
1833+ name2id_synced = False
1834+ try :
1835+ name2id_result = _sync_output_name2id_from_live (
1836+ msg_conn ,
1837+ rt_conn = rt_conn ,
1838+ msg_db_path_real = msg_db_path_real ,
1839+ )
1840+ name2id_synced = str (name2id_result .get ("status" ) or "" ) in {"up_to_date" , "refreshed" }
1841+ logger .info (
1842+ "[realtime] Name2Id sync account=%s db=%s status=%s rows=%s" ,
1843+ account_dir .name ,
1844+ msg_db_path .stem ,
1845+ str (name2id_result .get ("status" ) or "" ),
1846+ int (name2id_result .get ("rows" ) or 0 ),
1847+ )
1848+ except Exception as e :
1849+ logger .warning (
1850+ "[realtime] Name2Id sync failed account=%s db=%s error=%s" ,
1851+ account_dir .name ,
1852+ msg_db_path .stem ,
1853+ str (e ),
1854+ )
1855+
16611856 quoted_table = _quote_ident (table_name )
16621857 row = msg_conn .execute (f"SELECT MAX(local_id) AS mx FROM { quoted_table } " ).fetchone ()
16631858 try :
@@ -1796,40 +1991,12 @@ def normalize(item: dict[str, Any]) -> dict[str, Any]:
17961991
17971992 inserted = 0
17981993 backfilled = 0
1799- if new_rows :
1800- try :
1801- has_name2id = bool (
1802- msg_conn .execute (
1803- "SELECT name FROM sqlite_master WHERE type='table' AND lower(name)=lower('Name2Id') LIMIT 1"
1804- ).fetchone ()
1805- )
1806- except Exception :
1807- has_name2id = False
1808-
1809- if has_name2id :
1810- try :
1811- msg_conn .execute (
1812- "INSERT OR IGNORE INTO Name2Id(user_name, is_session) VALUES (?, ?)" ,
1813- (str (account_dir .name ), 1 ),
1814- )
1815- except Exception :
1816- pass
1817-
1818- for r in new_rows :
1819- try :
1820- rid = int (r .get ("real_sender_id" ) or 0 )
1821- except Exception :
1822- rid = 0
1823- su = str (r .get ("sender_username" ) or "" ).strip ()
1824- if rid <= 0 or not su :
1825- continue
1826- try :
1827- msg_conn .execute (
1828- "INSERT OR IGNORE INTO Name2Id(rowid, user_name, is_session) VALUES (?, ?, ?)" ,
1829- (rid , su , 1 ),
1830- )
1831- except Exception :
1832- continue
1994+ if new_rows and (not name2id_synced ):
1995+ _best_effort_upsert_output_name2id_rows (
1996+ msg_conn ,
1997+ account_name = account_dir .name ,
1998+ rows = new_rows ,
1999+ )
18332000
18342001 values = [tuple (r .get (c ) for c in insert_cols ) for r in reversed (new_rows )]
18352002 insert_t0 = time .perf_counter ()
0 commit comments