Skip to content

Commit 73815b7

Browse files
Handle disconnects and reconnects
1 parent 352baea commit 73815b7

1 file changed

Lines changed: 176 additions & 26 deletions

File tree

packages/firebase_data_connect/firebase_data_connect/lib/src/network/websocket_transport.dart

Lines changed: 176 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,24 @@ part of 'transport_library.dart';
1616

1717
/// WebSocketTransport makes requests out to the streaming endpoints of the configured backend,
1818
/// multiplexing multiple subscriptions and unary operations over a single WebSocket connection.
19+
20+
class _PendingUnary {
21+
final Completer<ServerResponse> completer;
22+
final String operationName;
23+
final Map<String, dynamic>? variables;
24+
final bool isMutation;
25+
26+
_PendingUnary(this.completer, this.operationName, this.variables, this.isMutation);
27+
}
28+
29+
class _PendingSubscription {
30+
final String operationId;
31+
final String queryName;
32+
final Map<String, dynamic>? variables;
33+
34+
_PendingSubscription(this.operationId, this.queryName, this.variables);
35+
}
36+
1937
class WebSocketTransport implements DataConnectTransport {
2038
/// Initializes necessary protocol and port.
2139
WebSocketTransport(
@@ -41,11 +59,12 @@ class WebSocketTransport implements DataConnectTransport {
4159
_currentUid = auth?.currentUser?.uid;
4260
_authSubscription = auth?.idTokenChanges().listen((user) async {
4361
final newUid = user?.uid;
44-
// Don't disconnect if auth state changes from not logged in to logged in.
45-
// Only disconnect if logged in user changes.
46-
if (_currentUid != null && _currentUid != newUid) {
62+
// Disconnect and reconnect on any fundamental user change (login, logout, switch).
63+
if (_currentUid != newUid) {
4764
_disconnect();
65+
_scheduleReconnect();
4866
} else if (newUid != null && isConnected) {
67+
// Token refreshed for the same user, push the new token natively down the socket.
4968
try {
5069
final token = await user?.getIdToken();
5170
final request = StreamRequest(
@@ -88,15 +107,29 @@ class WebSocketTransport implements DataConnectTransport {
88107
StreamSubscription? _channelSubscription;
89108

90109
// Active listeners for stream subscriptions mapped by requestId.
91-
final Map<String, List<StreamController<ServerResponse>>> _streamListeners =
92-
{};
110+
final Map<String, List<StreamController<ServerResponse>>> _streamListeners = {};
111+
112+
// Pending information for subscriptions mapped by requestId.
113+
final Map<String, _PendingSubscription> _pendingSubscriptions = {};
93114

94115
// Active completers for unary operations mapped by requestId.
95-
final Map<String, List<Completer<ServerResponse>>> _unaryListeners = {};
116+
final Map<String, List<_PendingUnary>> _unaryListeners = {};
96117

97118
// Active subscriptions mapped by operationId => requestId.
98119
final Map<String, String> _activeSubscriptions = {};
99120

121+
bool _isReconnecting = false;
122+
bool _isIdleDisconnect = false;
123+
int _reconnectAttempts = 0;
124+
125+
void _checkIdleAndDisconnect() {
126+
if (_streamListeners.isEmpty && _unaryListeners.isEmpty) {
127+
_isIdleDisconnect = true;
128+
_disconnect();
129+
}
130+
}
131+
132+
100133
final Random _random = Random();
101134
static const String _chars = 'abcdefghijklmnopqrstuvwxyz0123456789';
102135

@@ -172,10 +205,13 @@ class WebSocketTransport implements DataConnectTransport {
172205
}
173206

174207
if (_unaryListeners.containsKey(requestId)) {
175-
final completers = _unaryListeners.remove(requestId)!;
176-
for (final completer in completers) {
177-
completer.complete(serverResponse);
208+
final pendings = _unaryListeners.remove(requestId)!;
209+
for (final p in pendings) {
210+
if (!p.completer.isCompleted) {
211+
p.completer.complete(serverResponse);
212+
}
178213
}
214+
_checkIdleAndDisconnect();
179215
}
180216

181217
if (_streamListeners.containsKey(requestId)) {
@@ -186,6 +222,8 @@ class WebSocketTransport implements DataConnectTransport {
186222
}
187223
_streamListeners.remove(requestId);
188224
_activeSubscriptions.removeWhere((key, value) => value == requestId);
225+
_pendingSubscriptions.remove(requestId);
226+
_checkIdleAndDisconnect();
189227
} else {
190228
for (final controller in controllers) {
191229
controller.add(serverResponse);
@@ -198,39 +236,145 @@ class WebSocketTransport implements DataConnectTransport {
198236
}
199237
}
200238

201-
void _onError(dynamic error) {
202-
final e =
203-
DataConnectError(DataConnectErrorCode.other, 'WebSocket error: $error');
204-
for (final completers in _unaryListeners.values) {
205-
for (final completer in completers) {
206-
completer.completeError(e);
239+
void _clearState([DataConnectError? error]) {
240+
final e = error ?? DataConnectError(DataConnectErrorCode.other, 'WebSocket connection closed.');
241+
for (final pendings in _unaryListeners.values) {
242+
for (final p in pendings) {
243+
if (!p.completer.isCompleted) {
244+
p.completer.completeError(e);
245+
}
207246
}
208247
}
209248
for (final controllers in _streamListeners.values) {
210249
for (final controller in controllers) {
211250
controller.addError(e);
251+
controller.close();
212252
}
213253
}
214254
_unaryListeners.clear();
215255
_streamListeners.clear();
216256
_activeSubscriptions.clear();
257+
_pendingSubscriptions.clear();
258+
_isReconnecting = false;
259+
_reconnectAttempts = 0;
260+
}
261+
262+
void _scheduleReconnect() {
263+
if (_isReconnecting) return;
264+
_isReconnecting = true;
265+
266+
if (_reconnectAttempts >= 10) {
267+
_clearState(DataConnectError(DataConnectErrorCode.other, 'Network disconnected after max attempts.'));
268+
return;
269+
}
270+
271+
final delay = min(1000 * pow(2, _reconnectAttempts).toInt(), 30000);
272+
Future.delayed(Duration(milliseconds: delay), () {
273+
_performReconnect();
274+
});
275+
}
276+
277+
Future<void> _performReconnect() async {
278+
_channel?.sink.close();
217279
_channel = null;
280+
_reconnectAttempts++;
281+
282+
String? authToken;
283+
try {
284+
authToken = await auth?.currentUser?.getIdToken();
285+
} catch (_) {
286+
// If fetching token fails, continue unauthenticated.
287+
authToken = null;
288+
}
289+
290+
try {
291+
if (appCheck != null) {
292+
await appCheck!.getToken();
293+
}
294+
} catch (_) {
295+
// Ignored: continue without AppCheck token if it fails.
296+
}
297+
298+
try {
299+
await _ensureConnected(authToken);
300+
301+
_reconnectAttempts = 0;
302+
_isReconnecting = false;
303+
304+
// Resubscribe active subscriptions
305+
for (final sub in _pendingSubscriptions.values) {
306+
final reqId = _activeSubscriptions[sub.operationId];
307+
if (reqId == null) continue;
308+
final headers = _buildHeaders(authToken, appCheck == null ? null : await appCheck!.getToken());
309+
final request = StreamRequest(
310+
authToken: authToken,
311+
appCheckToken: headers['X-Firebase-AppCheck'],
312+
requestId: reqId,
313+
requestKind: RequestKind.subscribe,
314+
subscribe: ExecuteRequest(sub.queryName, sub.variables),
315+
headers: headers,
316+
);
317+
_channel!.sink.add(jsonEncode(request.toJson()));
318+
}
319+
320+
// Replay queries, fail mutations
321+
final unariesToReplay = <String, List<_PendingUnary>>{};
322+
for (final entry in _unaryListeners.entries) {
323+
final reqId = entry.key;
324+
final kept = <_PendingUnary>[];
325+
for (final p in entry.value) {
326+
if (p.isMutation) {
327+
p.completer.completeError(DataConnectError(DataConnectErrorCode.other, 'Network reconnected; mutations cannot be safely retried.'));
328+
} else {
329+
kept.add(p);
330+
final headers = _buildHeaders(authToken, appCheck == null ? null : await appCheck!.getToken());
331+
final request = StreamRequest(
332+
authToken: authToken,
333+
appCheckToken: headers['X-Firebase-AppCheck'],
334+
requestId: reqId,
335+
requestKind: RequestKind.execute,
336+
execute: ExecuteRequest(p.operationName, p.variables),
337+
headers: headers,
338+
);
339+
_channel!.sink.add(jsonEncode(request.toJson()));
340+
}
341+
}
342+
if (kept.isNotEmpty) {
343+
unariesToReplay[reqId] = kept;
344+
}
345+
}
346+
_unaryListeners.clear();
347+
_unaryListeners.addAll(unariesToReplay);
348+
} catch (e) {
349+
_scheduleReconnect();
350+
}
351+
}
352+
353+
void _onError(dynamic error) {
354+
developer.log('WebSocket error: $error');
355+
_channel = null;
356+
if (!_isIdleDisconnect) {
357+
_scheduleReconnect();
358+
} else {
359+
_clearState();
360+
_isIdleDisconnect = false;
361+
}
218362
}
219363

220364
void _disconnect() {
221365
_channel?.sink.close();
366+
_channel = null;
222367
}
223368

224369
void _onDone() {
370+
developer.log('WebSocket connection closed.');
225371
_channel = null;
226-
for (final controllers in _streamListeners.values) {
227-
for (final controller in controllers) {
228-
controller.close();
229-
}
372+
if (!_isIdleDisconnect) {
373+
_scheduleReconnect();
374+
} else {
375+
_clearState();
376+
_isIdleDisconnect = false;
230377
}
231-
_unaryListeners.clear();
232-
_streamListeners.clear();
233-
_activeSubscriptions.clear();
234378
}
235379

236380
@override
@@ -242,7 +386,7 @@ class WebSocketTransport implements DataConnectTransport {
242386
String? authToken,
243387
) async {
244388
return _invokeUnary(queryName, deserializer, serializer, vars, authToken,
245-
RequestKind.execute);
389+
RequestKind.execute, false);
246390
}
247391

248392
@override
@@ -254,7 +398,7 @@ class WebSocketTransport implements DataConnectTransport {
254398
String? authToken,
255399
) async {
256400
return _invokeUnary(queryName, deserializer, serializer, vars, authToken,
257-
RequestKind.execute);
401+
RequestKind.execute, true);
258402
}
259403

260404
Future<ServerResponse> _invokeUnary<Data, Variables>(
@@ -264,6 +408,7 @@ class WebSocketTransport implements DataConnectTransport {
264408
Variables? vars,
265409
String? authToken,
266410
RequestKind requestKind,
411+
bool isMutation,
267412
) async {
268413
await _ensureConnected(authToken);
269414

@@ -273,7 +418,9 @@ class WebSocketTransport implements DataConnectTransport {
273418

274419
if (_activeSubscriptions.containsKey(operationId)) {
275420
final existingRequestId = _activeSubscriptions[operationId]!;
276-
_unaryListeners.putIfAbsent(existingRequestId, () => []).add(completer);
421+
Map<String, dynamic>? variablesMap;
422+
if (vars != null && serializer != null) variablesMap = jsonDecode(serializer(vars));
423+
_unaryListeners.putIfAbsent(existingRequestId, () => []).add(_PendingUnary(completer, operationName, variablesMap, isMutation));
277424

278425
String? appCheckToken;
279426
try {
@@ -298,12 +445,12 @@ class WebSocketTransport implements DataConnectTransport {
298445
}
299446

300447
final requestId = _generateRequestId(operationId);
301-
_unaryListeners.putIfAbsent(requestId, () => []).add(completer);
302448

303449
Map<String, dynamic>? variables;
304450
if (vars != null && serializer != null) {
305451
variables = json.decode(serializer(vars));
306452
}
453+
_unaryListeners.putIfAbsent(requestId, () => []).add(_PendingUnary(completer, operationName, variables, isMutation));
307454

308455
String? appCheckToken;
309456
try {
@@ -360,6 +507,7 @@ class WebSocketTransport implements DataConnectTransport {
360507
if (vars != null && serializer != null) {
361508
variables = json.decode(serializer(vars));
362509
}
510+
_pendingSubscriptions[requestId] = _PendingSubscription(operationId, queryName, variables);
363511

364512
String? appCheckToken;
365513
try {
@@ -391,6 +539,7 @@ class WebSocketTransport implements DataConnectTransport {
391539
if (listeners.isEmpty) {
392540
_streamListeners.remove(requestId);
393541
_activeSubscriptions.remove(operationId);
542+
_pendingSubscriptions.remove(requestId);
394543

395544
if (_channel != null) {
396545
final cancelReq = StreamRequest(
@@ -400,6 +549,7 @@ class WebSocketTransport implements DataConnectTransport {
400549
);
401550
_channel!.sink.add(jsonEncode(cancelReq.toJson()));
402551
}
552+
_checkIdleAndDisconnect();
403553
}
404554
}
405555
},

0 commit comments

Comments
 (0)