1- // Copyright 2024 Google LLC
1+ // Copyright 2026 Google LLC
22//
33// Licensed under the Apache License, Version 2.0 (the "License");
44// you may not use this file except in compliance with the License.
@@ -53,15 +53,43 @@ abstract class OperationRef<Data, Variables> {
5353 );
5454 Variables ? variables;
5555 String operationName;
56- DataConnectTransport _transport;
56+ final DataConnectTransport _transport;
5757 Deserializer <Data > deserializer;
5858 Serializer <Variables > serializer;
5959 String ? _lastToken;
6060
6161 FirebaseDataConnect dataConnect;
6262
63- Future <OperationResult <Data , Variables >> execute (
64- {QueryFetchPolicy fetchPolicy = QueryFetchPolicy .preferCache});
63+ static dynamic _sortKeys (dynamic value) {
64+ if (value is Map ) {
65+ final sortedMap = < String , dynamic > {};
66+ final sortedKeys = value.keys.toList ()..sort ();
67+ for (final key in sortedKeys) {
68+ sortedMap[key.toString ()] = _sortKeys (value[key]);
69+ }
70+ return sortedMap;
71+ } else if (value is List ) {
72+ return value.map (_sortKeys).toList ();
73+ }
74+ return value;
75+ }
76+
77+ static String createOperationId <Variables >(String operationName,
78+ Variables ? vars, Serializer <Variables >? serializer) {
79+ if (vars != null && serializer != null ) {
80+ try {
81+ final decoded = jsonDecode (serializer (vars));
82+ final sortedStr = jsonEncode (_sortKeys (decoded));
83+ return '$operationName ::$sortedStr ' ;
84+ } catch (_) {
85+ return '$operationName ::${serializer (vars )}' ;
86+ }
87+ } else {
88+ return operationName;
89+ }
90+ }
91+
92+ Future <OperationResult <Data , Variables >> execute ();
6593
6694 Future <bool > _shouldRetry () async {
6795 String ? newToken;
@@ -184,15 +212,6 @@ class QueryManager {
184212 return streamController;
185213 }
186214
187- static String createQueryId <QueryVariables >(String queryName,
188- QueryVariables ? vars, Serializer <QueryVariables > varSerializer) {
189- if (vars != null ) {
190- return '$queryName ::${varSerializer (vars )}' ;
191- } else {
192- return queryName;
193- }
194- }
195-
196215 void dispose () {
197216 _impactedQueriesSubscription? .cancel ();
198217 }
@@ -216,7 +235,7 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
216235 variables,
217236 );
218237
219- QueryManager _queryManager;
238+ final QueryManager _queryManager;
220239
221240 @override
222241 Future <QueryResult <Data , Variables >> execute (
@@ -240,7 +259,7 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
240259 }
241260
242261 String get _queryId =>
243- QueryManager . createQueryId (operationName, variables, serializer);
262+ OperationRef . createOperationId (operationName, variables, serializer);
244263
245264 Future <QueryResult <Data , Variables >> _executeFromCache (
246265 QueryFetchPolicy fetchPolicy) async {
@@ -311,9 +330,58 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
311330 Stream <QueryResult <Data , Variables >> subscribe () {
312331 _streamController ?? = _queryManager.addQuery (this );
313332
314- execute ();
333+ final stream =
334+ _streamController! .stream.cast <QueryResult <Data , Variables >>();
335+
336+ // Return the stream to the caller, then execute fetches
337+ Future .microtask (() {
338+ if (dataConnect.cacheManager != null ) {
339+ _executeFromCache (QueryFetchPolicy .cacheOnly)
340+ .then ((_) {})
341+ .catchError ((err) {
342+ log ("Error fetching from cache during subscribe $err " );
343+ // Ignore cache misses here, server stream will provide latest data
344+ });
345+ }
346+
347+ // Initiate Web Socket stream
348+ _streamFromServer ();
349+ });
350+
351+ return stream;
352+ }
353+
354+ void _streamFromServer () async {
355+ bool shouldRetry = await _shouldRetry ();
356+ try {
357+ final stream = _transport.invokeStreamQuery <Data , Variables >(
358+ operationName,
359+ deserializer,
360+ serializer,
361+ variables,
362+ _lastToken,
363+ );
364+
365+ await for (final serverResponse in stream) {
366+ if (dataConnect.cacheManager != null ) {
367+ await dataConnect.cacheManager! .update (_queryId, serverResponse);
368+ }
369+ Data typedData = _convertBodyJsonToData (serverResponse.data);
315370
316- return _streamController! .stream.cast <QueryResult <Data , Variables >>();
371+ QueryResult <Data , Variables > res =
372+ QueryResult (dataConnect, typedData, DataSource .server, this );
373+ publishResultToStream (res);
374+ }
375+ } on DataConnectError catch (e) {
376+ if (shouldRetry &&
377+ e.code == DataConnectErrorCode .unauthorized.toString ()) {
378+ _streamFromServer ();
379+ } else {
380+ publishErrorToStream (e);
381+ }
382+ } catch (e) {
383+ publishErrorToStream (e as Error );
384+ }
317385 }
318386
319387 void publishResultToStream (QueryResult <Data , Variables > result) {
@@ -322,7 +390,7 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
322390 }
323391 }
324392
325- void publishErrorToStream (Error err) {
393+ void publishErrorToStream (Object err) {
326394 if (_streamController != null ) {
327395 _streamController? .addError (err);
328396 }
@@ -331,24 +399,16 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
331399
332400class MutationRef <Data , Variables > extends OperationRef <Data , Variables > {
333401 MutationRef (
334- FirebaseDataConnect dataConnect,
335- String operationName,
336- DataConnectTransport transport,
337- Deserializer <Data > deserializer,
338- Serializer <Variables > serializer,
339- Variables ? variables,
340- ) : super (
341- dataConnect,
342- operationName,
343- transport,
344- deserializer,
345- serializer,
346- variables,
347- );
402+ super .dataConnect,
403+ super .operationName,
404+ super .transport,
405+ super .deserializer,
406+ super .serializer,
407+ super .variables,
408+ );
348409
349410 @override
350- Future <OperationResult <Data , Variables >> execute (
351- {QueryFetchPolicy fetchPolicy = QueryFetchPolicy .serverOnly}) async {
411+ Future <OperationResult <Data , Variables >> execute () async {
352412 bool shouldRetry = await _shouldRetry ();
353413 try {
354414 // Logic below is duplicated due to the fact that `executeOperation` returns
0 commit comments