From 39abeea81096f17248e9e10a5f2ee96646c3b873 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 26 Mar 2026 19:57:58 -0700 Subject: [PATCH 01/17] initial commit --- .../cloud/firestore_v1/base_pipeline.py | 72 +++++++++++++++++++ .../firestore_v1/pipeline_expressions.py | 50 +++++++++++++ .../cloud/firestore_v1/pipeline_source.py | 22 ++++++ .../cloud/firestore_v1/pipeline_stages.py | 23 ++++++ .../tests/system/pipeline_e2e/subqueries.yaml | 57 +++++++++++++++ .../tests/system/test_pipeline_acceptance.py | 6 ++ 6 files changed, 230 insertions(+) create mode 100644 packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py index fac7f8bc4bce..615cf03e9b59 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py @@ -25,6 +25,8 @@ Expression, Field, Selectable, + FunctionExpression, + _PipelineValueExpression, ) from google.cloud.firestore_v1.types.pipeline import ( StructuredPipeline as StructuredPipeline_pb, @@ -90,6 +92,51 @@ def _to_pb(self, **options) -> StructuredPipeline_pb: options=options, ) + def to_array_expression(self) -> Expression: + """ + Converts this Pipeline into an expression that evaluates to an array of results. + Used for embedding 1:N subqueries into stages like `addFields`. + + Example: + >>> # Get a list of all reviewer names for each book + >>> db.pipeline().collection("books").define(Field.of("id").as_("book_id")).add_fields( + ... db.pipeline().collection("reviews") + ... .where(Field.of("book_id").equal(Variable("book_id"))) + ... .select(Field.of("reviewer").as_("name")) + ... .to_array_expression().as_("reviewers") + ... ) + + Returns: + An :class:`Expression` representing the execution of this pipeline. + """ + return FunctionExpression("array", [_PipelineValueExpression(self)]) + + def to_scalar_expression(self) -> Expression: + """ + Converts this Pipeline into an expression that evaluates to a single scalar result. + Used for 1:1 lookups or Aggregations when the subquery is expected to return a single value or object. + + **Result Unwrapping:** + For simpler access, scalar subqueries producing a single field automatically unwrap that value to the + top level, ignoring the inner alias. If the subquery returns multiple fields, they are preserved as a map. + + Example: + >>> # Calculate average rating for each restaurant using a subquery + >>> db.pipeline().collection("restaurants").define(Field.of("id").as_("rid")).add_fields( + ... db.pipeline().collection("reviews") + ... .where(Field.of("restaurant_id").equal(Variable("rid"))) + ... .aggregate(AggregateFunction.average("rating").as_("value")) + ... .to_scalar_expression().as_("average_rating") + ... ) + + Raises: + RuntimeError: If the result set contains more than one item. If the pipeline has zero results, it evaluates to `null` instead of raising an error. + + Returns: + An :class:`Expression` representing the execution of this pipeline. + """ + return FunctionExpression("scalar", [_PipelineValueExpression(self)]) + def _append(self, new_stage): """ Create a new Pipeline object with a new stage appended @@ -610,3 +657,28 @@ def distinct(self, *fields: str | Selectable) -> "_BasePipeline": A new Pipeline object with this stage appended to the stage list """ return self._append(stages.Distinct(*fields)) + + def define(self, *aliased_expressions: AliasedExpression) -> "_BasePipeline": + """ + Binds one or more expressions to Variables that can be accessed in subsequent stages + or inner subqueries using `Variable`. + + Each Variable is defined using an :class:`AliasedExpression`, which pairs an expression with + a name (alias). + + Example: + >>> db.pipeline().collection("products").define( + ... Field.of("price").multiply(0.9).as_("discountedPrice"), + ... Field.of("stock").add(10).as_("newStock") + ... ).where( + ... Variable("discountedPrice").less_than(100) + ... ).select(Field.of("name"), Variable("newStock")) + + Args: + *aliased_expressions: One or more :class:`AliasedExpression` defining the Variable names and values. + + Returns: + A new Pipeline object with this stage appended to the stage list. + """ + return self._append(stages.Define(*aliased_expressions)) + diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py index 1833d2ffbac0..b58ae6834b41 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py @@ -2634,3 +2634,53 @@ class Rand(FunctionExpression): def __init__(self): super().__init__("rand", [], use_infix_repr=False) + + +class Variable(Expression): + """ + Creates an expression that retrieves the value of a variable bound via `Pipeline.define`. + + Example: + >>> # Define a variable "discountedPrice" and use it in a filter + >>> db.pipeline().collection("products").define( + ... Field.of("price").multiply(0.9).as_("discountedPrice") + ... ).where(Variable("discountedPrice").less_than(100)) + + Args: + name: The name of the variable to retrieve. + """ + + def __init__(self, name: str): + self.name = name + + def _to_pb(self) -> Value: + return Value(variable_reference_value=self.name) + + +class _PipelineValueExpression(Expression): + """Internal wrapper to represent a pipeline as an expression.""" + + def __init__(self, pipeline): + self.pipeline = pipeline + + def _to_pb(self) -> Value: + return Value(pipeline_value=self.pipeline._to_pb()) + + +class CurrentDocument(FunctionExpression): + """ + Creates an expression that represents the current document being processed. + + This acts as a handle, allowing you to bind the entire document to a variable or pass the + document itself to a function or subquery. + + Example: + >>> # Define the current document as a variable "doc" + >>> db.pipeline().collection("books").define( + ... CurrentDocument().as_("doc") + ... ).select(Variable("doc").get_field("title")) + """ + + def __init__(self): + super().__init__("current_document", []) + diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py index 7075797b3d57..a8ee5230dfcb 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py @@ -171,3 +171,25 @@ def literals( A new Pipeline object with this stage appended to the stage list. """ return self._create_pipeline(stages.Literals(*documents)) + + def subcollection(self, path: str) -> PipelineType: + """ + Creates a new Pipeline targeted at a subcollection relative to the current document context. + + This is used inside stages like `addFields` to query physically nested subcollections + without manually joining on IDs. + + Example: + >>> db.pipeline().collection("books").add_fields( + ... db.pipeline().subcollection("reviews") + ... .aggregate(AggregateFunction.average("rating").as_("avg_rating")) + ... .to_scalar_expression().as_("average_rating") + ... ) + + Args: + path: The path of the subcollection. + + Returns: + A new :class:`Pipeline` instance scoped to the subcollection. + """ + return self._create_pipeline(stages.Subcollection(path)) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_stages.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_stages.py index cac9c70d4b99..d1f5a459833d 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_stages.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_stages.py @@ -494,3 +494,26 @@ def __init__(self, condition: BooleanExpression): def _pb_args(self): return [self.condition._to_pb()] + + +class Define(Stage): + """Binds one or more expressions to variables.""" + + def __init__(self, *expressions: AliasedExpression): + super().__init__("let") + self.expressions = list(expressions) + + def _pb_args(self) -> list[Value]: + return [Selectable._to_value(self.expressions)] + + +class Subcollection(Stage): + """Targets a subcollection relative to the current document.""" + + def __init__(self, path: str): + super().__init__("subcollection") + self.path = path + + def _pb_args(self) -> list[Value]: + return [encode_value(self.path)] + diff --git a/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml b/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml new file mode 100644 index 000000000000..3228bbf998e5 --- /dev/null +++ b/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml @@ -0,0 +1,57 @@ +tests: + - description: array_subquery_with_variable + pipeline: + - Collection: publishers + - Where: + - Field.of.equal: + - publisherId + - pub1 + - Define: + - Field.of.as_: + - publisherId + - pub_id + - AddFields: + - Pipeline.to_array_expression: + - Collection: books + - Where: + - Field.of.equal: + - publisherId + - Variable: pub_id + as_: books + - Select: + - Field: name + - Field: books + assert_results: + - name: Publisher 1 + books: + - title: The Hitchhiker's Guide to the Galaxy + author: Douglas Adams + - title: Pride and Prejudice + author: Jane Austen + + - description: scalar_subquery_with_current_document + pipeline: + - Collection: books + - Where: + - Field.of.equal: + - title + - 1984 + - Define: + - CurrentDocument.as_: doc + - AddFields: + - Pipeline.to_scalar_expression: + - Collection: reviews + - Where: + - Field.of.equal: + - bookId + - Variable.get_field: + - doc + - __name__ + - Aggregate: + - AggregateFunction.average.as_: + - rating + - avg_rating + as_: average_rating + assert_results: + - title: 1984 + average_rating: 4.5 diff --git a/packages/google-cloud-firestore/tests/system/test_pipeline_acceptance.py b/packages/google-cloud-firestore/tests/system/test_pipeline_acceptance.py index f1fd1326c765..9023ea389aab 100644 --- a/packages/google-cloud-firestore/tests/system/test_pipeline_acceptance.py +++ b/packages/google-cloud-firestore/tests/system/test_pipeline_acceptance.py @@ -258,6 +258,12 @@ def _parse_expressions(client, yaml_element: Any): # find Pipeline objects for Union expressions other_ppl = yaml_element["Pipeline"] return parse_pipeline(client, other_ppl) + elif len(yaml_element) == 1 and list(yaml_element)[0] == "Pipeline.to_array_expression": + other_ppl = yaml_element["Pipeline.to_array_expression"] + return parse_pipeline(client, other_ppl).to_array_expression() + elif len(yaml_element) == 1 and list(yaml_element)[0] == "Pipeline.to_scalar_expression": + other_ppl = yaml_element["Pipeline.to_scalar_expression"] + return parse_pipeline(client, other_ppl).to_scalar_expression() else: # otherwise, return dict return { From 479d1cd7401afd986d399662d924530682699a14 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 26 Mar 2026 20:06:19 -0700 Subject: [PATCH 02/17] update docstring --- .../google/cloud/firestore_v1/pipeline_source.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py index a8ee5230dfcb..5b16f7b67926 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py @@ -174,10 +174,13 @@ def literals( def subcollection(self, path: str) -> PipelineType: """ - Creates a new Pipeline targeted at a subcollection relative to the current document context. + Initializes a pipeline scoped to a subcollection. - This is used inside stages like `addFields` to query physically nested subcollections - without manually joining on IDs. + This method allows you to start a new pipeline that operates on a subcollection of the + current document. It is intended to be used as a subquery. + + **Note:** A pipeline created with `subcollection` cannot be executed directly using + `execute()`. It must be used within a parent pipeline. Example: >>> db.pipeline().collection("books").add_fields( From 351bd022c65a1f2083574a7e08ef359b36e5d95e Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 26 Mar 2026 21:22:36 -0700 Subject: [PATCH 03/17] fixed formatting --- .../cloud/firestore_v1/base_pipeline.py | 1 - .../firestore_v1/pipeline_expressions.py | 6 +- .../cloud/firestore_v1/pipeline_stages.py | 1 - .../tests/system/pipeline_e2e/subqueries.yaml | 90 ++++++++++--------- .../tests/system/test_pipeline_acceptance.py | 12 ++- 5 files changed, 62 insertions(+), 48 deletions(-) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py index 615cf03e9b59..328e64155155 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py @@ -681,4 +681,3 @@ def define(self, *aliased_expressions: AliasedExpression) -> "_BasePipeline": A new Pipeline object with this stage appended to the stage list. """ return self._append(stages.Define(*aliased_expressions)) - diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py index b58ae6834b41..a28ddeffc390 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py @@ -30,7 +30,7 @@ ) from google.cloud.firestore_v1._helpers import GeoPoint, decode_value, encode_value -from google.cloud.firestore_v1.types.document import Value +from google.cloud.firestore_v1.types.document import Value, Pipeline from google.cloud.firestore_v1.types.query import StructuredQuery as Query_pb from google.cloud.firestore_v1.vector import Vector @@ -2664,7 +2664,8 @@ def __init__(self, pipeline): self.pipeline = pipeline def _to_pb(self) -> Value: - return Value(pipeline_value=self.pipeline._to_pb()) + pipeline_pb = Pipeline(stages=[s._to_pb() for s in self.pipeline.stages]) + return Value(pipeline_value=pipeline_pb) class CurrentDocument(FunctionExpression): @@ -2683,4 +2684,3 @@ class CurrentDocument(FunctionExpression): def __init__(self): super().__init__("current_document", []) - diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_stages.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_stages.py index d1f5a459833d..7a2d8d3b08af 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_stages.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_stages.py @@ -516,4 +516,3 @@ def __init__(self, path: str): def _pb_args(self) -> list[Value]: return [encode_value(self.path)] - diff --git a/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml b/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml index 3228bbf998e5..ac2c854b84e8 100644 --- a/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml +++ b/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml @@ -1,57 +1,65 @@ tests: - description: array_subquery_with_variable pipeline: - - Collection: publishers + - Collection: books - Where: - - Field.of.equal: - - publisherId - - pub1 + - FunctionExpression.equal: + - Field: title + - Constant: "1984" - Define: - - Field.of.as_: - - publisherId - - pub_id + - AliasedExpression: + - Field: genre + - target_genre - AddFields: - - Pipeline.to_array_expression: - - Collection: books - - Where: - - Field.of.equal: - - publisherId - - Variable: pub_id - as_: books + - AliasedExpression: + - Pipeline.to_array_expression: + - Collection: books + - Where: + - FunctionExpression.equal: + - Field: genre + - Variable: target_genre + - Select: + - Field: title + - same_genre_books - Select: - - Field: name - - Field: books + - Field: title + - Field: same_genre_books assert_results: - - name: Publisher 1 - books: - - title: The Hitchhiker's Guide to the Galaxy - author: Douglas Adams - - title: Pride and Prejudice - author: Jane Austen + - title: "1984" + same_genre_books: + - "1984" + - "The Handmaid's Tale" - description: scalar_subquery_with_current_document pipeline: - Collection: books - Where: - - Field.of.equal: - - title - - 1984 + - FunctionExpression.equal: + - Field: title + - Constant: "1984" - Define: - - CurrentDocument.as_: doc + - AliasedExpression: + - CurrentDocument: + - doc - AddFields: - - Pipeline.to_scalar_expression: - - Collection: reviews - - Where: - - Field.of.equal: - - bookId - - Variable.get_field: - - doc - - __name__ - - Aggregate: - - AggregateFunction.average.as_: - - rating - - avg_rating - as_: average_rating + - AliasedExpression: + - Pipeline.to_scalar_expression: + - Collection: books + - Where: + - FunctionExpression.equal: + - Field: genre + - FunctionExpression.map_get: + - Variable: doc + - Constant: "genre" + - Aggregate: + - AliasedExpression: + - FunctionExpression.average: + - Field: rating + - avg_rating + - average_rating + - Select: + - Field: title + - Field: average_rating assert_results: - - title: 1984 - average_rating: 4.5 + - title: "1984" + average_rating: 4.15 diff --git a/packages/google-cloud-firestore/tests/system/test_pipeline_acceptance.py b/packages/google-cloud-firestore/tests/system/test_pipeline_acceptance.py index 9023ea389aab..81bad2e8c3cf 100644 --- a/packages/google-cloud-firestore/tests/system/test_pipeline_acceptance.py +++ b/packages/google-cloud-firestore/tests/system/test_pipeline_acceptance.py @@ -258,10 +258,16 @@ def _parse_expressions(client, yaml_element: Any): # find Pipeline objects for Union expressions other_ppl = yaml_element["Pipeline"] return parse_pipeline(client, other_ppl) - elif len(yaml_element) == 1 and list(yaml_element)[0] == "Pipeline.to_array_expression": + elif ( + len(yaml_element) == 1 + and list(yaml_element)[0] == "Pipeline.to_array_expression" + ): other_ppl = yaml_element["Pipeline.to_array_expression"] return parse_pipeline(client, other_ppl).to_array_expression() - elif len(yaml_element) == 1 and list(yaml_element)[0] == "Pipeline.to_scalar_expression": + elif ( + len(yaml_element) == 1 + and list(yaml_element)[0] == "Pipeline.to_scalar_expression" + ): other_ppl = yaml_element["Pipeline.to_scalar_expression"] return parse_pipeline(client, other_ppl).to_scalar_expression() else: @@ -292,6 +298,8 @@ def _apply_yaml_args_to_callable(callable_obj, client, yaml_args): ): # yaml has an array of arguments. Treat as args return callable_obj(*_parse_expressions(client, yaml_args)) + elif yaml_args is None: + return callable_obj() else: # yaml has a single argument return callable_obj(_parse_expressions(client, yaml_args)) From b8651561df6a19c5752739b1154ad5e61b8cb604 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 26 Mar 2026 21:45:21 -0700 Subject: [PATCH 04/17] added assert proto blocks --- .../tests/system/pipeline_e2e/subqueries.yaml | 115 ++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml b/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml index ac2c854b84e8..7eed48ea8e73 100644 --- a/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml +++ b/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml @@ -29,6 +29,59 @@ tests: same_genre_books: - "1984" - "The Handmaid's Tale" + assert_proto: + pipeline: + stages: + - args: + - referenceValue: /books + name: collection + - args: + - functionValue: + args: + - fieldReferenceValue: title + - stringValue: '1984' + name: equal + name: where + - args: + - mapValue: + fields: + target_genre: + fieldReferenceValue: genre + name: let + - args: + - mapValue: + fields: + same_genre_books: + functionValue: + args: + - pipelineValue: + stages: + - args: + - referenceValue: /books + name: collection + - args: + - functionValue: + args: + - fieldReferenceValue: genre + - variableReferenceValue: target_genre + name: equal + name: where + - args: + - mapValue: + fields: + title: + fieldReferenceValue: title + name: select + name: array + name: add_fields + - args: + - mapValue: + fields: + same_genre_books: + fieldReferenceValue: same_genre_books + title: + fieldReferenceValue: title + name: select - description: scalar_subquery_with_current_document pipeline: @@ -63,3 +116,65 @@ tests: assert_results: - title: "1984" average_rating: 4.15 + assert_proto: + pipeline: + stages: + - args: + - referenceValue: /books + name: collection + - args: + - functionValue: + args: + - fieldReferenceValue: title + - stringValue: '1984' + name: equal + name: where + - args: + - mapValue: + fields: + doc: + functionValue: + name: current_document + name: let + - args: + - mapValue: + fields: + average_rating: + functionValue: + args: + - pipelineValue: + stages: + - args: + - referenceValue: /books + name: collection + - args: + - functionValue: + args: + - fieldReferenceValue: genre + - functionValue: + args: + - variableReferenceValue: doc + - stringValue: genre + name: map_get + name: equal + name: where + - args: + - mapValue: + fields: + avg_rating: + functionValue: + args: + - fieldReferenceValue: rating + name: average + - mapValue: {} + name: aggregate + name: scalar + name: add_fields + - args: + - mapValue: + fields: + average_rating: + fieldReferenceValue: average_rating + title: + fieldReferenceValue: title + name: select From 6f37a5b763331ec21f6bdd50317ec41132219f0c Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 26 Mar 2026 21:53:47 -0700 Subject: [PATCH 05/17] fixed bug in tests --- .../google/cloud/firestore_v1/pipeline_expressions.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py index a28ddeffc390..f119c091d424 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py @@ -30,7 +30,8 @@ ) from google.cloud.firestore_v1._helpers import GeoPoint, decode_value, encode_value -from google.cloud.firestore_v1.types.document import Value, Pipeline +from google.cloud.firestore_v1.types.document import Value +from google.cloud.firestore_v1.types.document import Pipeline as Pipeline_pb from google.cloud.firestore_v1.types.query import StructuredQuery as Query_pb from google.cloud.firestore_v1.vector import Vector @@ -2664,7 +2665,7 @@ def __init__(self, pipeline): self.pipeline = pipeline def _to_pb(self) -> Value: - pipeline_pb = Pipeline(stages=[s._to_pb() for s in self.pipeline.stages]) + pipeline_pb = Pipeline_pb(stages=[s._to_pb() for s in self.pipeline.stages]) return Value(pipeline_value=pipeline_pb) From d94d96771675418918895546b8467965f8bc97db Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 26 Mar 2026 22:07:07 -0700 Subject: [PATCH 06/17] Apply suggestion from @gemini-code-assist[bot] Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../google/cloud/firestore_v1/pipeline_expressions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py index f119c091d424..7bd5f6a60691 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py @@ -2661,7 +2661,7 @@ def _to_pb(self) -> Value: class _PipelineValueExpression(Expression): """Internal wrapper to represent a pipeline as an expression.""" - def __init__(self, pipeline): + def __init__(self, pipeline: "google.cloud.firestore_v1.base_pipeline._BasePipeline"): self.pipeline = pipeline def _to_pb(self) -> Value: From 5666fcb22e701e4c1ba403e7aa66dbe6e2ee54a9 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 31 Mar 2026 11:33:07 -0700 Subject: [PATCH 07/17] added get_field --- .../firestore_v1/pipeline_expressions.py | 18 +++ .../tests/system/pipeline_e2e/map.yaml | 108 ++++++++++++++++++ .../unit/v1/test_pipeline_expressions.py | 10 ++ 3 files changed, 136 insertions(+) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py index 7bd5f6a60691..25b26e77975d 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py @@ -163,6 +163,24 @@ def __get__(self, instance, owner): else: return self.instance_func.__get__(instance, owner) + @expose_as_static + def get_field(self, key: Expression | str) -> "Expression": + """Accesses a field/property of the expression that evaluates to a Map or Document. + + Example: + >>> # Access the 'city' field from the 'address' map field. + >>> Field.of("address").get_field("city") + + Args: + key: The key of the field to access. + + Returns: + A new `Expression` representing the value of the field. + """ + return FunctionExpression( + "get_field", [self, self._cast_to_expr_or_convert_to_constant(key)] + ) + @expose_as_static def add(self, other: Expression | float) -> "Expression": """Creates an expression that adds this expression to another expression or constant. diff --git a/packages/google-cloud-firestore/tests/system/pipeline_e2e/map.yaml b/packages/google-cloud-firestore/tests/system/pipeline_e2e/map.yaml index 0338c570b63e..257aef6c7228 100644 --- a/packages/google-cloud-firestore/tests/system/pipeline_e2e/map.yaml +++ b/packages/google-cloud-firestore/tests/system/pipeline_e2e/map.yaml @@ -444,3 +444,111 @@ tests: args: - fieldReferenceValue: awards name: select + - description: testGetField + pipeline: + - Collection: books + - Sort: + - Ordering: + - Field: published + - DESCENDING + - Select: + - AliasedExpression: + - FunctionExpression.get_field: + - Field: awards + - hugo + - "hugoAward" + - Field: title + - Where: + - FunctionExpression.equal: + - Field: hugoAward + - Constant: true + assert_results: + - hugoAward: true + title: The Hitchhiker's Guide to the Galaxy + - hugoAward: true + title: Dune + assert_proto: + pipeline: + stages: + - args: + - referenceValue: /books + name: collection + - args: + - mapValue: + fields: + direction: + stringValue: descending + expression: + fieldReferenceValue: published + name: sort + - args: + - mapValue: + fields: + hugoAward: + functionValue: + args: + - fieldReferenceValue: awards + - stringValue: hugo + name: get_field + title: + fieldReferenceValue: title + name: select + - args: + - functionValue: + args: + - fieldReferenceValue: hugoAward + - booleanValue: true + name: equal + name: where + - description: testGetFieldWithField + pipeline: + - Collection: books + - Where: + - FunctionExpression.equal: + - Field: title + - Constant: "Dune" + - AddFields: + - AliasedExpression: + - Constant: "hugo" + - "award_name" + - Select: + - AliasedExpression: + - FunctionExpression.get_field: + - Field: awards + - Field: award_name + - "hugoAward" + - Field: title + assert_results: + - hugoAward: true + title: Dune + assert_proto: + pipeline: + stages: + - args: + - referenceValue: /books + name: collection + - args: + - functionValue: + args: + - fieldReferenceValue: title + - stringValue: "Dune" + name: equal + name: where + - args: + - mapValue: + fields: + award_name: + stringValue: hugo + name: add_fields + - args: + - mapValue: + fields: + hugoAward: + functionValue: + args: + - fieldReferenceValue: awards + - fieldReferenceValue: award_name + name: get_field + title: + fieldReferenceValue: title + name: select diff --git a/packages/google-cloud-firestore/tests/unit/v1/test_pipeline_expressions.py b/packages/google-cloud-firestore/tests/unit/v1/test_pipeline_expressions.py index 12a41aeaa069..6f5150edacfd 100644 --- a/packages/google-cloud-firestore/tests/unit/v1/test_pipeline_expressions.py +++ b/packages/google-cloud-firestore/tests/unit/v1/test_pipeline_expressions.py @@ -1466,6 +1466,16 @@ def test_vector_length(self): infix_instance = arg1.vector_length() assert infix_instance == instance + def test_get_field(self): + arg1 = self._make_arg("Map") + arg2 = self._make_arg("Key") + instance = Expression.get_field(arg1, arg2) + assert instance.name == "get_field" + assert instance.params == [arg1, arg2] + assert repr(instance) == "Map.get_field(Key)" + infix_instance = arg1.get_field(arg2) + assert infix_instance == instance + def test_add(self): arg1 = self._make_arg("Left") arg2 = self._make_arg("Right") From a08409ebeef25d70ed5a56677838db0b71ef7f6d Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 31 Mar 2026 11:40:26 -0700 Subject: [PATCH 08/17] fixed error docstring; added test case --- .../google/cloud/firestore_v1/base_pipeline.py | 4 ++-- .../tests/system/pipeline_e2e/subqueries.yaml | 11 +++++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py index 328e64155155..2c1bd2865ebb 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py @@ -129,8 +129,8 @@ def to_scalar_expression(self) -> Expression: ... .to_scalar_expression().as_("average_rating") ... ) - Raises: - RuntimeError: If the result set contains more than one item. If the pipeline has zero results, it evaluates to `null` instead of raising an error. + **Runtime Validation:** + The runtime will validate that the result set contains exactly one item. It returns an error if the result has more than one item, and evaluates to `null` if the pipeline has zero results. Returns: An :class:`Expression` representing the execution of this pipeline. diff --git a/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml b/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml index 7eed48ea8e73..4a236953d7c3 100644 --- a/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml +++ b/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml @@ -178,3 +178,14 @@ tests: title: fieldReferenceValue: title name: select + + - description: scalar_subquery_multiple_items_error + pipeline: + - Collection: books + - Limit: 1 + - AddFields: + - AliasedExpression: + - Pipeline.to_scalar_expression: + - Collection: books + - bad_scalar + assert_error: ".*more than one item.*" From 15a778940acdf0ad3c5b75e0d8712d81843bc574 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 31 Mar 2026 11:57:39 -0700 Subject: [PATCH 09/17] updated docstrings --- .../google/cloud/firestore_v1/async_pipeline.py | 6 ++++++ .../google/cloud/firestore_v1/pipeline.py | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/async_pipeline.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/async_pipeline.py index 4e7d07cb6874..0436c4848677 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/async_pipeline.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/async_pipeline.py @@ -102,6 +102,9 @@ async def execute( explain_metrics will be available on the returned list. additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. These options will take precedence over method argument if there is a conflict (e.g. explain_options) + + Raises: + google.api_core.exceptions.GoogleAPIError: If there is a backend error. """ kwargs = {k: v for k, v in locals().items() if k != "self"} stream = AsyncPipelineStream(PipelineResult, self, **kwargs) @@ -134,6 +137,9 @@ def stream( explain_metrics will be available on the returned generator. additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. These options will take precedence over method argument if there is a conflict (e.g. explain_options) + + Raises: + google.api_core.exceptions.GoogleAPIError: If there is a backend error. """ kwargs = {k: v for k, v in locals().items() if k != "self"} return AsyncPipelineStream(PipelineResult, self, **kwargs) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline.py index 1103c195c882..075bcdc82a89 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline.py @@ -99,6 +99,9 @@ def execute( explain_metrics will be available on the returned list. additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. These options will take precedence over method argument if there is a conflict (e.g. explain_options) + + Raises: + google.api_core.exceptions.GoogleAPIError: If there is a backend error. """ kwargs = {k: v for k, v in locals().items() if k != "self"} stream = PipelineStream(PipelineResult, self, **kwargs) @@ -131,6 +134,9 @@ def stream( explain_metrics will be available on the returned generator. additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. These options will take precedence over method argument if there is a conflict (e.g. explain_options) + + Raises: + google.api_core.exceptions.GoogleAPIError: If there is a backend error. """ kwargs = {k: v for k, v in locals().items() if k != "self"} return PipelineStream(PipelineResult, self, **kwargs) From 6c9b65df9e8dcc33d3a8baacb5112ee175e0734b Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 31 Mar 2026 12:34:30 -0700 Subject: [PATCH 10/17] added new SubPipeline class --- .../cloud/firestore_v1/async_pipeline.py | 2 + .../cloud/firestore_v1/base_pipeline.py | 44 +++++++++++++++++-- .../google/cloud/firestore_v1/pipeline.py | 2 + .../firestore_v1/pipeline_expressions.py | 6 ++- .../cloud/firestore_v1/pipeline_result.py | 2 +- .../cloud/firestore_v1/pipeline_source.py | 8 ++-- packages/google-cloud-firestore/mypy.ini | 10 ++--- .../tests/unit/v1/test_async_pipeline.py | 6 +++ .../tests/unit/v1/test_pipeline.py | 20 +++++++++ 9 files changed, 85 insertions(+), 15 deletions(-) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/async_pipeline.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/async_pipeline.py index 0436c4848677..70bff213d555 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/async_pipeline.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/async_pipeline.py @@ -66,6 +66,8 @@ class AsyncPipeline(_BasePipeline): subject to potential breaking changes in future releases """ + _client: AsyncClient + def __init__(self, client: AsyncClient, *stages: stages.Stage): """ Initializes an asynchronous Pipeline. diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py index 2c1bd2865ebb..dc826c9363b2 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py @@ -14,7 +14,9 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Sequence +from typing import TYPE_CHECKING, Sequence, TypeVar, Type + +_T = TypeVar("_T", bound="_BasePipeline") from google.cloud.firestore_v1 import pipeline_stages as stages from google.cloud.firestore_v1.base_vector_query import DistanceMeasure @@ -46,7 +48,7 @@ class _BasePipeline: Use `client.pipeline()` to create pipeline instances. """ - def __init__(self, client: Client | AsyncClient): + def __init__(self, client: Client | AsyncClient | None): """ Initializes a new pipeline. @@ -61,8 +63,8 @@ def __init__(self, client: Client | AsyncClient): @classmethod def _create_with_stages( - cls, client: Client | AsyncClient, *stages - ) -> _BasePipeline: + cls: Type[_T], client: Client | AsyncClient | None, *stages + ) -> _T: """ Initializes a new pipeline with the given stages. @@ -438,9 +440,17 @@ def union(self, other: "_BasePipeline") -> "_BasePipeline": Args: other: The other `Pipeline` whose results will be unioned with this one. + Raises: + ValueError: If the `other` pipeline is a relative pipeline (e.g. created without a client). + Returns: A new Pipeline object with this stage appended to the stage list """ + if other._client is None: + raise ValueError( + "Union only supports combining root pipelines, doesn't support relative scope Pipeline " + "like relative subcollection pipeline" + ) return self._append(stages.Union(other)) def unnest( @@ -681,3 +691,29 @@ def define(self, *aliased_expressions: AliasedExpression) -> "_BasePipeline": A new Pipeline object with this stage appended to the stage list. """ return self._append(stages.Define(*aliased_expressions)) + + +class _SubPipeline(_BasePipeline): + """ + A pipeline scoped to a subcollection, created without a database client. + Cannot be executed directly; it must be used as a subquery within another pipeline. + """ + + _EXECUTE_ERROR_MSG = ( + "This pipeline was created without a database (e.g., as a subcollection pipeline) and " + "cannot be executed directly. It can only be used as part of another pipeline." + ) + + def execute(self, *args, **kwargs): + """ + Raises: + RuntimeError: Always, as a subcollection pipeline cannot be executed directly. + """ + raise RuntimeError(self._EXECUTE_ERROR_MSG) + + def stream(self, *args, **kwargs): + """ + Raises: + RuntimeError: Always, as a subcollection pipeline cannot be streamed directly. + """ + raise RuntimeError(self._EXECUTE_ERROR_MSG) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline.py index 075bcdc82a89..570c16389c2b 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline.py @@ -63,6 +63,8 @@ class Pipeline(_BasePipeline): subject to potential breaking changes in future releases. """ + _client: Client + def __init__(self, client: Client, *stages: stages.Stage): """ Initializes a Pipeline. diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py index 25b26e77975d..612a28a156eb 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py @@ -23,12 +23,16 @@ from abc import ABC, abstractmethod from enum import Enum from typing import ( + TYPE_CHECKING, Any, Generic, Sequence, TypeVar, ) +if TYPE_CHECKING: + from google.cloud.firestore_v1.base_pipeline import _BasePipeline + from google.cloud.firestore_v1._helpers import GeoPoint, decode_value, encode_value from google.cloud.firestore_v1.types.document import Value from google.cloud.firestore_v1.types.document import Pipeline as Pipeline_pb @@ -2679,7 +2683,7 @@ def _to_pb(self) -> Value: class _PipelineValueExpression(Expression): """Internal wrapper to represent a pipeline as an expression.""" - def __init__(self, pipeline: "google.cloud.firestore_v1.base_pipeline._BasePipeline"): + def __init__(self, pipeline: "_BasePipeline"): self.pipeline = pipeline def _to_pb(self) -> Value: diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_result.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_result.py index 432c1dd8206d..1add0e53d54c 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_result.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_result.py @@ -190,7 +190,7 @@ def __init__( ): # public self.transaction = transaction - self.pipeline: _BasePipeline = pipeline + self.pipeline: Pipeline | AsyncPipeline = pipeline self.execution_time: Timestamp | None = None # private self._client: Client | AsyncClient = pipeline._client diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py index 5b16f7b67926..61497e4b482a 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py @@ -23,7 +23,7 @@ from google.cloud.firestore_v1 import pipeline_stages as stages from google.cloud.firestore_v1._helpers import DOCUMENT_PATH_DELIMITER -from google.cloud.firestore_v1.base_pipeline import _BasePipeline +from google.cloud.firestore_v1.base_pipeline import _BasePipeline, _SubPipeline if TYPE_CHECKING: # pragma: NO COVER from google.cloud.firestore_v1.async_client import AsyncClient @@ -172,7 +172,7 @@ def literals( """ return self._create_pipeline(stages.Literals(*documents)) - def subcollection(self, path: str) -> PipelineType: + def subcollection(self, path: str) -> _SubPipeline: """ Initializes a pipeline scoped to a subcollection. @@ -193,6 +193,6 @@ def subcollection(self, path: str) -> PipelineType: path: The path of the subcollection. Returns: - A new :class:`Pipeline` instance scoped to the subcollection. + A new :class:`_SubPipeline` instance scoped to the subcollection. """ - return self._create_pipeline(stages.Subcollection(path)) + return _SubPipeline._create_with_stages(None, stages.Subcollection(path)) diff --git a/packages/google-cloud-firestore/mypy.ini b/packages/google-cloud-firestore/mypy.ini index e0e0da2e9e40..f16b72c620ba 100644 --- a/packages/google-cloud-firestore/mypy.ini +++ b/packages/google-cloud-firestore/mypy.ini @@ -3,13 +3,13 @@ python_version = 3.14 namespace_packages = True ignore_missing_imports = False -# TODO(https://github.com/googleapis/gapic-generator-python/issues/2563): -# Dependencies that historically lacks py.typed markers -[mypy-google.iam.*] -ignore_missing_imports = True - # Helps mypy navigate the 'google' namespace more reliably in 3.10+ explicit_package_bases = True # Performance: reuse results from previous runs to speed up 'nox' incremental = True + +# TODO(https://github.com/googleapis/gapic-generator-python/issues/2563): +# Dependencies that historically lacks py.typed markers +[mypy-google.iam.*] +ignore_missing_imports = True diff --git a/packages/google-cloud-firestore/tests/unit/v1/test_async_pipeline.py b/packages/google-cloud-firestore/tests/unit/v1/test_async_pipeline.py index 4059355c687d..9899cf1f6c1d 100644 --- a/packages/google-cloud-firestore/tests/unit/v1/test_async_pipeline.py +++ b/packages/google-cloud-firestore/tests/unit/v1/test_async_pipeline.py @@ -448,3 +448,9 @@ def test_async_pipeline_aggregate_with_groups(): assert isinstance(result_ppl.stages[0], stages.Aggregate) assert list(result_ppl.stages[0].groups) == [Field.of("author")] assert list(result_ppl.stages[0].accumulators) == [Field.of("title")] + +def test_async_pipeline_union_relative_error(): + start_ppl = _make_async_pipeline(client=mock.Mock()) + other_ppl = _make_async_pipeline(client=None) + with pytest.raises(ValueError, match="Union only supports combining root pipelines"): + start_ppl.union(other_ppl) diff --git a/packages/google-cloud-firestore/tests/unit/v1/test_pipeline.py b/packages/google-cloud-firestore/tests/unit/v1/test_pipeline.py index 5953398709a3..90603fcd031c 100644 --- a/packages/google-cloud-firestore/tests/unit/v1/test_pipeline.py +++ b/packages/google-cloud-firestore/tests/unit/v1/test_pipeline.py @@ -437,3 +437,23 @@ def test_pipeline_aggregate_with_groups(): assert isinstance(result_ppl.stages[0], stages.Aggregate) assert list(result_ppl.stages[0].groups) == [Field.of("author")] assert list(result_ppl.stages[0].accumulators) == [Field.of("title")] + +def test_pipeline_union_relative_error(): + start_ppl = _make_pipeline(client=mock.Mock()) + other_ppl = _make_pipeline(client=None) + with pytest.raises(ValueError, match="Union only supports combining root pipelines"): + start_ppl.union(other_ppl) + +def test_subpipeline_execute_error(): + from google.cloud.firestore_v1.base_pipeline import _SubPipeline + + ppl = _SubPipeline._create_with_stages(None) + with pytest.raises(RuntimeError, match="This pipeline was created without a database"): + ppl.execute() + +def test_subpipeline_stream_error(): + from google.cloud.firestore_v1.base_pipeline import _SubPipeline + + ppl = _SubPipeline._create_with_stages(None) + with pytest.raises(RuntimeError, match="This pipeline was created without a database"): + ppl.stream() From 604e10d8bbca56330e47a188539a7e736e7bb658 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 31 Mar 2026 12:35:48 -0700 Subject: [PATCH 11/17] male SubPipeline public --- .../google/cloud/firestore_v1/base_pipeline.py | 2 +- .../google/cloud/firestore_v1/pipeline_source.py | 8 ++++---- .../google-cloud-firestore/tests/unit/v1/test_pipeline.py | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py index dc826c9363b2..e36c98cea6fd 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py @@ -693,7 +693,7 @@ def define(self, *aliased_expressions: AliasedExpression) -> "_BasePipeline": return self._append(stages.Define(*aliased_expressions)) -class _SubPipeline(_BasePipeline): +class SubPipeline(_BasePipeline): """ A pipeline scoped to a subcollection, created without a database client. Cannot be executed directly; it must be used as a subquery within another pipeline. diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py index 61497e4b482a..f63139be57cf 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py @@ -23,7 +23,7 @@ from google.cloud.firestore_v1 import pipeline_stages as stages from google.cloud.firestore_v1._helpers import DOCUMENT_PATH_DELIMITER -from google.cloud.firestore_v1.base_pipeline import _BasePipeline, _SubPipeline +from google.cloud.firestore_v1.base_pipeline import _BasePipeline, SubPipeline if TYPE_CHECKING: # pragma: NO COVER from google.cloud.firestore_v1.async_client import AsyncClient @@ -172,7 +172,7 @@ def literals( """ return self._create_pipeline(stages.Literals(*documents)) - def subcollection(self, path: str) -> _SubPipeline: + def subcollection(self, path: str) -> SubPipeline: """ Initializes a pipeline scoped to a subcollection. @@ -193,6 +193,6 @@ def subcollection(self, path: str) -> _SubPipeline: path: The path of the subcollection. Returns: - A new :class:`_SubPipeline` instance scoped to the subcollection. + A new :class:`SubPipeline` instance scoped to the subcollection. """ - return _SubPipeline._create_with_stages(None, stages.Subcollection(path)) + return SubPipeline._create_with_stages(None, stages.Subcollection(path)) diff --git a/packages/google-cloud-firestore/tests/unit/v1/test_pipeline.py b/packages/google-cloud-firestore/tests/unit/v1/test_pipeline.py index 90603fcd031c..4c05a043ba90 100644 --- a/packages/google-cloud-firestore/tests/unit/v1/test_pipeline.py +++ b/packages/google-cloud-firestore/tests/unit/v1/test_pipeline.py @@ -445,15 +445,15 @@ def test_pipeline_union_relative_error(): start_ppl.union(other_ppl) def test_subpipeline_execute_error(): - from google.cloud.firestore_v1.base_pipeline import _SubPipeline + from google.cloud.firestore_v1.base_pipeline import SubPipeline - ppl = _SubPipeline._create_with_stages(None) + ppl = SubPipeline._create_with_stages(None) with pytest.raises(RuntimeError, match="This pipeline was created without a database"): ppl.execute() def test_subpipeline_stream_error(): - from google.cloud.firestore_v1.base_pipeline import _SubPipeline + from google.cloud.firestore_v1.base_pipeline import SubPipeline - ppl = _SubPipeline._create_with_stages(None) + ppl = SubPipeline._create_with_stages(None) with pytest.raises(RuntimeError, match="This pipeline was created without a database"): ppl.stream() From 1f9e70caac9cfeadf46a419d836aacf3cbeda456 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 31 Mar 2026 12:41:06 -0700 Subject: [PATCH 12/17] fixed lint --- .../google/cloud/firestore_v1/base_pipeline.py | 3 ++- .../google/cloud/firestore_v1/pipeline_result.py | 1 - .../tests/unit/v1/test_async_pipeline.py | 5 ++++- .../tests/unit/v1/test_pipeline.py | 15 ++++++++++++--- 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py index e36c98cea6fd..aa5db35a41dc 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py @@ -16,7 +16,6 @@ from typing import TYPE_CHECKING, Sequence, TypeVar, Type -_T = TypeVar("_T", bound="_BasePipeline") from google.cloud.firestore_v1 import pipeline_stages as stages from google.cloud.firestore_v1.base_vector_query import DistanceMeasure @@ -39,6 +38,8 @@ from google.cloud.firestore_v1.async_client import AsyncClient from google.cloud.firestore_v1.client import Client +_T = TypeVar("_T", bound="_BasePipeline") + class _BasePipeline: """ diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_result.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_result.py index 1add0e53d54c..7bf17bed40e4 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_result.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_result.py @@ -49,7 +49,6 @@ from google.cloud.firestore_v1.async_transaction import AsyncTransaction from google.cloud.firestore_v1.base_client import BaseClient from google.cloud.firestore_v1.base_document import BaseDocumentReference - from google.cloud.firestore_v1.base_pipeline import _BasePipeline from google.cloud.firestore_v1.client import Client from google.cloud.firestore_v1.pipeline import Pipeline from google.cloud.firestore_v1.pipeline_expressions import Constant diff --git a/packages/google-cloud-firestore/tests/unit/v1/test_async_pipeline.py b/packages/google-cloud-firestore/tests/unit/v1/test_async_pipeline.py index 9899cf1f6c1d..402ab42ee732 100644 --- a/packages/google-cloud-firestore/tests/unit/v1/test_async_pipeline.py +++ b/packages/google-cloud-firestore/tests/unit/v1/test_async_pipeline.py @@ -449,8 +449,11 @@ def test_async_pipeline_aggregate_with_groups(): assert list(result_ppl.stages[0].groups) == [Field.of("author")] assert list(result_ppl.stages[0].accumulators) == [Field.of("title")] + def test_async_pipeline_union_relative_error(): start_ppl = _make_async_pipeline(client=mock.Mock()) other_ppl = _make_async_pipeline(client=None) - with pytest.raises(ValueError, match="Union only supports combining root pipelines"): + with pytest.raises( + ValueError, match="Union only supports combining root pipelines" + ): start_ppl.union(other_ppl) diff --git a/packages/google-cloud-firestore/tests/unit/v1/test_pipeline.py b/packages/google-cloud-firestore/tests/unit/v1/test_pipeline.py index 4c05a043ba90..82d89a12f978 100644 --- a/packages/google-cloud-firestore/tests/unit/v1/test_pipeline.py +++ b/packages/google-cloud-firestore/tests/unit/v1/test_pipeline.py @@ -438,22 +438,31 @@ def test_pipeline_aggregate_with_groups(): assert list(result_ppl.stages[0].groups) == [Field.of("author")] assert list(result_ppl.stages[0].accumulators) == [Field.of("title")] + def test_pipeline_union_relative_error(): start_ppl = _make_pipeline(client=mock.Mock()) other_ppl = _make_pipeline(client=None) - with pytest.raises(ValueError, match="Union only supports combining root pipelines"): + with pytest.raises( + ValueError, match="Union only supports combining root pipelines" + ): start_ppl.union(other_ppl) + def test_subpipeline_execute_error(): from google.cloud.firestore_v1.base_pipeline import SubPipeline ppl = SubPipeline._create_with_stages(None) - with pytest.raises(RuntimeError, match="This pipeline was created without a database"): + with pytest.raises( + RuntimeError, match="This pipeline was created without a database" + ): ppl.execute() + def test_subpipeline_stream_error(): from google.cloud.firestore_v1.base_pipeline import SubPipeline ppl = SubPipeline._create_with_stages(None) - with pytest.raises(RuntimeError, match="This pipeline was created without a database"): + with pytest.raises( + RuntimeError, match="This pipeline was created without a database" + ): ppl.stream() From 9e4cb07e0cf275798477359a5e3a3933dade88cb Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 31 Mar 2026 12:41:27 -0700 Subject: [PATCH 13/17] added e2e test for failed union --- .../tests/system/pipeline_e2e/general.yaml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/packages/google-cloud-firestore/tests/system/pipeline_e2e/general.yaml b/packages/google-cloud-firestore/tests/system/pipeline_e2e/general.yaml index 1b03beb4a3c1..3a59fbc847ee 100644 --- a/packages/google-cloud-firestore/tests/system/pipeline_e2e/general.yaml +++ b/packages/google-cloud-firestore/tests/system/pipeline_e2e/general.yaml @@ -743,4 +743,11 @@ tests: fields: res: fieldReferenceValue: res - name: select \ No newline at end of file + name: select + - description: union_subpipeline_error + pipeline: + - Collection: books + - Union: + - Pipeline: + - Subcollection: reviews + assert_error: ".*root pipelines.*" \ No newline at end of file From b1f87efcf9c8693c30b1de7523b4ae9b7c5bc2d9 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 31 Mar 2026 12:48:33 -0700 Subject: [PATCH 14/17] updating docstrings --- .../google/cloud/firestore_v1/pipeline_expressions.py | 2 ++ .../google/cloud/firestore_v1/pipeline_source.py | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py index 612a28a156eb..b98ba574c79a 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py @@ -174,6 +174,8 @@ def get_field(self, key: Expression | str) -> "Expression": Example: >>> # Access the 'city' field from the 'address' map field. >>> Field.of("address").get_field("city") + >>> # Create a map and access a field from it. + >>> Map({"foo": "bar"}).get_field("foo") Args: key: The key of the field to access. diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py index f63139be57cf..9319c34beae8 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py @@ -168,7 +168,7 @@ def literals( *documents: One or more documents to be returned by this stage. Each can be a `dict` of values of `Expression` or `CONSTANT_TYPE` types. Returns: - A new Pipeline object with this stage appended to the stage list. + A new pipeline instance targeting the specified literal documents """ return self._create_pipeline(stages.Literals(*documents)) @@ -193,6 +193,6 @@ def subcollection(self, path: str) -> SubPipeline: path: The path of the subcollection. Returns: - A new :class:`SubPipeline` instance scoped to the subcollection. + A new pipeline instance targeting the specified subcollection """ return SubPipeline._create_with_stages(None, stages.Subcollection(path)) From 6f9d8dd0fd025e40983e462429be5ad75a568cfc Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 31 Mar 2026 12:55:50 -0700 Subject: [PATCH 15/17] reordered file --- .../firestore_v1/pipeline_expressions.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py index e706b1a87ee2..630258f9cadd 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py @@ -2734,6 +2734,17 @@ def _from_query_filter_pb(filter_pb, client): raise TypeError(f"Unexpected filter type: {type(filter_pb)}") +class _PipelineValueExpression(Expression): + """Internal wrapper to represent a pipeline as an expression.""" + + def __init__(self, pipeline: "_BasePipeline"): + self.pipeline = pipeline + + def _to_pb(self) -> Value: + pipeline_pb = Pipeline_pb(stages=[s._to_pb() for s in self.pipeline.stages]) + return Value(pipeline_value=pipeline_pb) + + class Array(FunctionExpression): """ Creates an expression that creates a Firestore array value from an input list. @@ -2937,17 +2948,6 @@ def _to_pb(self) -> Value: return Value(variable_reference_value=self.name) -class _PipelineValueExpression(Expression): - """Internal wrapper to represent a pipeline as an expression.""" - - def __init__(self, pipeline: "_BasePipeline"): - self.pipeline = pipeline - - def _to_pb(self) -> Value: - pipeline_pb = Pipeline_pb(stages=[s._to_pb() for s in self.pipeline.stages]) - return Value(pipeline_value=pipeline_pb) - - class CurrentDocument(FunctionExpression): """ Creates an expression that represents the current document being processed. From cc296ac67e1daca57022ccab66c713db9af000b4 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 31 Mar 2026 12:56:02 -0700 Subject: [PATCH 16/17] added test case --- .../tests/system/pipeline_e2e/map.yaml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/packages/google-cloud-firestore/tests/system/pipeline_e2e/map.yaml b/packages/google-cloud-firestore/tests/system/pipeline_e2e/map.yaml index caeaaa8ddbb0..93b8fc20d1c3 100644 --- a/packages/google-cloud-firestore/tests/system/pipeline_e2e/map.yaml +++ b/packages/google-cloud-firestore/tests/system/pipeline_e2e/map.yaml @@ -494,3 +494,16 @@ tests: assert_results: - hugoAward: true title: Dune + - description: testGetFieldWithMap + pipeline: + - Collection: books + - Limit: 1 + - Select: + - AliasedExpression: + - FunctionExpression.get_field: + - Map: + elements: {"foo": "bar"} + - "foo" + - "foo_value" + assert_results: + - foo_value: "bar" From d49efd4308303f1f3b051e121f94da6c5610f3fc Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 31 Mar 2026 13:09:39 -0700 Subject: [PATCH 17/17] updating test yaml --- .../tests/system/pipeline_e2e/general.yaml | 2 +- .../tests/system/pipeline_e2e/logical.yaml | 3 ++- .../tests/system/pipeline_e2e/map.yaml | 8 ++------ .../tests/system/pipeline_e2e/subqueries.yaml | 2 +- 4 files changed, 6 insertions(+), 9 deletions(-) diff --git a/packages/google-cloud-firestore/tests/system/pipeline_e2e/general.yaml b/packages/google-cloud-firestore/tests/system/pipeline_e2e/general.yaml index b0b466571e58..4063d8b971ca 100644 --- a/packages/google-cloud-firestore/tests/system/pipeline_e2e/general.yaml +++ b/packages/google-cloud-firestore/tests/system/pipeline_e2e/general.yaml @@ -771,4 +771,4 @@ tests: - Union: - Pipeline: - Subcollection: reviews - assert_error: ".*root pipelines.*" + assert_error: ".*start of a nested pipeline.*" diff --git a/packages/google-cloud-firestore/tests/system/pipeline_e2e/logical.yaml b/packages/google-cloud-firestore/tests/system/pipeline_e2e/logical.yaml index a3de3a4a07fe..d9f96cd3cd65 100644 --- a/packages/google-cloud-firestore/tests/system/pipeline_e2e/logical.yaml +++ b/packages/google-cloud-firestore/tests/system/pipeline_e2e/logical.yaml @@ -712,7 +712,8 @@ tests: - Where: - FunctionExpression.equal: - Field: value - - Constant: null + - Constant: + value: null - Select: - AliasedExpression: - FunctionExpression.if_null: diff --git a/packages/google-cloud-firestore/tests/system/pipeline_e2e/map.yaml b/packages/google-cloud-firestore/tests/system/pipeline_e2e/map.yaml index 93b8fc20d1c3..2166e14a5776 100644 --- a/packages/google-cloud-firestore/tests/system/pipeline_e2e/map.yaml +++ b/packages/google-cloud-firestore/tests/system/pipeline_e2e/map.yaml @@ -162,7 +162,8 @@ tests: - FunctionExpression.map_set: - Field: awards - "hugo" - - Constant: null + - Constant: + value: null - "awards_set" assert_results: - awards_set: @@ -468,11 +469,6 @@ tests: - FunctionExpression.equal: - Field: hugoAward - Constant: true - assert_results: - - hugoAward: true - title: The Hitchhiker's Guide to the Galaxy - - hugoAward: true - title: Dune - description: testGetFieldWithField pipeline: - Collection: books diff --git a/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml b/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml index 4a236953d7c3..8d7efe8c633b 100644 --- a/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml +++ b/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml @@ -188,4 +188,4 @@ tests: - Pipeline.to_scalar_expression: - Collection: books - bad_scalar - assert_error: ".*more than one item.*" + assert_error: ".*multiple results.*"