diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/async_pipeline.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/async_pipeline.py index 4e7d07cb6874..70bff213d555 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/async_pipeline.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/async_pipeline.py @@ -66,6 +66,8 @@ class AsyncPipeline(_BasePipeline): subject to potential breaking changes in future releases """ + _client: AsyncClient + def __init__(self, client: AsyncClient, *stages: stages.Stage): """ Initializes an asynchronous Pipeline. @@ -102,6 +104,9 @@ async def execute( explain_metrics will be available on the returned list. additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. These options will take precedence over method argument if there is a conflict (e.g. explain_options) + + Raises: + google.api_core.exceptions.GoogleAPIError: If there is a backend error. """ kwargs = {k: v for k, v in locals().items() if k != "self"} stream = AsyncPipelineStream(PipelineResult, self, **kwargs) @@ -134,6 +139,9 @@ def stream( explain_metrics will be available on the returned generator. additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. These options will take precedence over method argument if there is a conflict (e.g. explain_options) + + Raises: + google.api_core.exceptions.GoogleAPIError: If there is a backend error. 
""" kwargs = {k: v for k, v in locals().items() if k != "self"} return AsyncPipelineStream(PipelineResult, self, **kwargs) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py index fac7f8bc4bce..aa5db35a41dc 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py @@ -14,7 +14,8 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Sequence +from typing import TYPE_CHECKING, Sequence, TypeVar, Type + from google.cloud.firestore_v1 import pipeline_stages as stages from google.cloud.firestore_v1.base_vector_query import DistanceMeasure @@ -25,6 +26,8 @@ Expression, Field, Selectable, + FunctionExpression, + _PipelineValueExpression, ) from google.cloud.firestore_v1.types.pipeline import ( StructuredPipeline as StructuredPipeline_pb, @@ -35,6 +38,8 @@ from google.cloud.firestore_v1.async_client import AsyncClient from google.cloud.firestore_v1.client import Client +_T = TypeVar("_T", bound="_BasePipeline") + class _BasePipeline: """ @@ -44,7 +49,7 @@ class _BasePipeline: Use `client.pipeline()` to create pipeline instances. """ - def __init__(self, client: Client | AsyncClient): + def __init__(self, client: Client | AsyncClient | None): """ Initializes a new pipeline. @@ -59,8 +64,8 @@ def __init__(self, client: Client | AsyncClient): @classmethod def _create_with_stages( - cls, client: Client | AsyncClient, *stages - ) -> _BasePipeline: + cls: Type[_T], client: Client | AsyncClient | None, *stages + ) -> _T: """ Initializes a new pipeline with the given stages. @@ -90,6 +95,51 @@ def _to_pb(self, **options) -> StructuredPipeline_pb: options=options, ) + def to_array_expression(self) -> Expression: + """ + Converts this Pipeline into an expression that evaluates to an array of results. 
+ Used for embedding 1:N subqueries into stages like `addFields`. + + Example: + >>> # Get a list of all reviewer names for each book + >>> db.pipeline().collection("books").define(Field.of("id").as_("book_id")).add_fields( + ... db.pipeline().collection("reviews") + ... .where(Field.of("book_id").equal(Variable("book_id"))) + ... .select(Field.of("reviewer").as_("name")) + ... .to_array_expression().as_("reviewers") + ... ) + + Returns: + An :class:`Expression` representing the execution of this pipeline. + """ + return FunctionExpression("array", [_PipelineValueExpression(self)]) + + def to_scalar_expression(self) -> Expression: + """ + Converts this Pipeline into an expression that evaluates to a single scalar result. + Used for 1:1 lookups or Aggregations when the subquery is expected to return a single value or object. + + **Result Unwrapping:** + For simpler access, scalar subqueries producing a single field automatically unwrap that value to the + top level, ignoring the inner alias. If the subquery returns multiple fields, they are preserved as a map. + + Example: + >>> # Calculate average rating for each restaurant using a subquery + >>> db.pipeline().collection("restaurants").define(Field.of("id").as_("rid")).add_fields( + ... db.pipeline().collection("reviews") + ... .where(Field.of("restaurant_id").equal(Variable("rid"))) + ... .aggregate(AggregateFunction.average("rating").as_("value")) + ... .to_scalar_expression().as_("average_rating") + ... ) + + **Runtime Validation:** + The runtime will validate that the result set contains exactly one item. It returns an error if the result has more than one item, and evaluates to `null` if the pipeline has zero results. + + Returns: + An :class:`Expression` representing the execution of this pipeline. 
+ """ + return FunctionExpression("scalar", [_PipelineValueExpression(self)]) + def _append(self, new_stage): """ Create a new Pipeline object with a new stage appended @@ -391,9 +441,17 @@ def union(self, other: "_BasePipeline") -> "_BasePipeline": Args: other: The other `Pipeline` whose results will be unioned with this one. + Raises: + ValueError: If the `other` pipeline is a relative pipeline (e.g. created without a client). + Returns: A new Pipeline object with this stage appended to the stage list """ + if other._client is None: + raise ValueError( + "Union only supports combining root pipelines, doesn't support relative scope Pipeline " + "like relative subcollection pipeline" + ) return self._append(stages.Union(other)) def unnest( @@ -610,3 +668,53 @@ def distinct(self, *fields: str | Selectable) -> "_BasePipeline": A new Pipeline object with this stage appended to the stage list """ return self._append(stages.Distinct(*fields)) + + def define(self, *aliased_expressions: AliasedExpression) -> "_BasePipeline": + """ + Binds one or more expressions to Variables that can be accessed in subsequent stages + or inner subqueries using `Variable`. + + Each Variable is defined using an :class:`AliasedExpression`, which pairs an expression with + a name (alias). + + Example: + >>> db.pipeline().collection("products").define( + ... Field.of("price").multiply(0.9).as_("discountedPrice"), + ... Field.of("stock").add(10).as_("newStock") + ... ).where( + ... Variable("discountedPrice").less_than(100) + ... ).select(Field.of("name"), Variable("newStock")) + + Args: + *aliased_expressions: One or more :class:`AliasedExpression` defining the Variable names and values. + + Returns: + A new Pipeline object with this stage appended to the stage list. + """ + return self._append(stages.Define(*aliased_expressions)) + + +class SubPipeline(_BasePipeline): + """ + A pipeline scoped to a subcollection, created without a database client. 
+ Cannot be executed directly; it must be used as a subquery within another pipeline. + """ + + _EXECUTE_ERROR_MSG = ( + "This pipeline was created without a database (e.g., as a subcollection pipeline) and " + "cannot be executed directly. It can only be used as part of another pipeline." + ) + + def execute(self, *args, **kwargs): + """ + Raises: + RuntimeError: Always, as a subcollection pipeline cannot be executed directly. + """ + raise RuntimeError(self._EXECUTE_ERROR_MSG) + + def stream(self, *args, **kwargs): + """ + Raises: + RuntimeError: Always, as a subcollection pipeline cannot be streamed directly. + """ + raise RuntimeError(self._EXECUTE_ERROR_MSG) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline.py index 1103c195c882..570c16389c2b 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline.py @@ -63,6 +63,8 @@ class Pipeline(_BasePipeline): subject to potential breaking changes in future releases. """ + _client: Client + def __init__(self, client: Client, *stages: stages.Stage): """ Initializes a Pipeline. @@ -99,6 +101,9 @@ def execute( explain_metrics will be available on the returned list. additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. These options will take precedence over method argument if there is a conflict (e.g. explain_options) + + Raises: + google.api_core.exceptions.GoogleAPIError: If there is a backend error. """ kwargs = {k: v for k, v in locals().items() if k != "self"} stream = PipelineStream(PipelineResult, self, **kwargs) @@ -131,6 +136,9 @@ def stream( explain_metrics will be available on the returned generator. additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. 
These options will take precedence over method argument if there is a conflict (e.g. explain_options) + + Raises: + google.api_core.exceptions.GoogleAPIError: If there is a backend error. """ kwargs = {k: v for k, v in locals().items() if k != "self"} return PipelineStream(PipelineResult, self, **kwargs) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py index 969ddf2794a5..630258f9cadd 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py @@ -23,14 +23,19 @@ from abc import ABC, abstractmethod from enum import Enum from typing import ( + TYPE_CHECKING, Any, Generic, Sequence, TypeVar, ) +if TYPE_CHECKING: + from google.cloud.firestore_v1.base_pipeline import _BasePipeline + from google.cloud.firestore_v1._helpers import GeoPoint, decode_value, encode_value from google.cloud.firestore_v1.types.document import Value +from google.cloud.firestore_v1.types.document import Pipeline as Pipeline_pb from google.cloud.firestore_v1.types.query import StructuredQuery as Query_pb from google.cloud.firestore_v1.vector import Vector @@ -258,6 +263,26 @@ def __get__(self, instance, owner): else: return self.instance_func.__get__(instance, owner) + @expose_as_static + def get_field(self, key: Expression | str) -> "Expression": + """Accesses a field/property of the expression that evaluates to a Map or Document. + + Example: + >>> # Access the 'city' field from the 'address' map field. + >>> Field.of("address").get_field("city") + >>> # Create a map and access a field from it. + >>> Map({"foo": "bar"}).get_field("foo") + + Args: + key: The key of the field to access. + + Returns: + A new `Expression` representing the value of the field. 
+ """ + return FunctionExpression( + "get_field", [self, self._cast_to_expr_or_convert_to_constant(key)] + ) + @expose_as_static def add(self, other: Expression | float) -> "Expression": """Creates an expression that adds this expression to another expression or constant. @@ -2709,6 +2734,17 @@ def _from_query_filter_pb(filter_pb, client): raise TypeError(f"Unexpected filter type: {type(filter_pb)}") +class _PipelineValueExpression(Expression): + """Internal wrapper to represent a pipeline as an expression.""" + + def __init__(self, pipeline: "_BasePipeline"): + self.pipeline = pipeline + + def _to_pb(self) -> Value: + pipeline_pb = Pipeline_pb(stages=[s._to_pb() for s in self.pipeline.stages]) + return Value(pipeline_value=pipeline_pb) + + class Array(FunctionExpression): """ Creates an expression that creates a Firestore array value from an input list. @@ -2889,3 +2925,42 @@ class Rand(FunctionExpression): def __init__(self): super().__init__("rand", [], use_infix_repr=False) + + +class Variable(Expression): + """ + Creates an expression that retrieves the value of a variable bound via `Pipeline.define`. + + Example: + >>> # Define a variable "discountedPrice" and use it in a filter + >>> db.pipeline().collection("products").define( + ... Field.of("price").multiply(0.9).as_("discountedPrice") + ... ).where(Variable("discountedPrice").less_than(100)) + + Args: + name: The name of the variable to retrieve. + """ + + def __init__(self, name: str): + self.name = name + + def _to_pb(self) -> Value: + return Value(variable_reference_value=self.name) + + +class CurrentDocument(FunctionExpression): + """ + Creates an expression that represents the current document being processed. + + This acts as a handle, allowing you to bind the entire document to a variable or pass the + document itself to a function or subquery. + + Example: + >>> # Define the current document as a variable "doc" + >>> db.pipeline().collection("books").define( + ... 
CurrentDocument().as_("doc") + ... ).select(Variable("doc").get_field("title")) + """ + + def __init__(self): + super().__init__("current_document", []) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_result.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_result.py index 432c1dd8206d..7bf17bed40e4 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_result.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_result.py @@ -49,7 +49,6 @@ from google.cloud.firestore_v1.async_transaction import AsyncTransaction from google.cloud.firestore_v1.base_client import BaseClient from google.cloud.firestore_v1.base_document import BaseDocumentReference - from google.cloud.firestore_v1.base_pipeline import _BasePipeline from google.cloud.firestore_v1.client import Client from google.cloud.firestore_v1.pipeline import Pipeline from google.cloud.firestore_v1.pipeline_expressions import Constant @@ -190,7 +189,7 @@ def __init__( ): # public self.transaction = transaction - self.pipeline: _BasePipeline = pipeline + self.pipeline: Pipeline | AsyncPipeline = pipeline self.execution_time: Timestamp | None = None # private self._client: Client | AsyncClient = pipeline._client diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py index 7075797b3d57..9319c34beae8 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py @@ -23,7 +23,7 @@ from google.cloud.firestore_v1 import pipeline_stages as stages from google.cloud.firestore_v1._helpers import DOCUMENT_PATH_DELIMITER -from google.cloud.firestore_v1.base_pipeline import _BasePipeline +from google.cloud.firestore_v1.base_pipeline import _BasePipeline, SubPipeline if TYPE_CHECKING: # pragma: NO COVER from 
google.cloud.firestore_v1.async_client import AsyncClient @@ -168,6 +168,31 @@ def literals( *documents: One or more documents to be returned by this stage. Each can be a `dict` of values of `Expression` or `CONSTANT_TYPE` types. Returns: - A new Pipeline object with this stage appended to the stage list. + A new pipeline instance targeting the specified literal documents """ return self._create_pipeline(stages.Literals(*documents)) + + def subcollection(self, path: str) -> SubPipeline: + """ + Initializes a pipeline scoped to a subcollection. + + This method allows you to start a new pipeline that operates on a subcollection of the + current document. It is intended to be used as a subquery. + + **Note:** A pipeline created with `subcollection` cannot be executed directly using + `execute()`. It must be used within a parent pipeline. + + Example: + >>> db.pipeline().collection("books").add_fields( + ... db.pipeline().subcollection("reviews") + ... .aggregate(AggregateFunction.average("rating").as_("avg_rating")) + ... .to_scalar_expression().as_("average_rating") + ... ) + + Args: + path: The path of the subcollection. 
+ + Returns: + A new pipeline instance targeting the specified subcollection + """ + return SubPipeline._create_with_stages(None, stages.Subcollection(path)) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_stages.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_stages.py index cac9c70d4b99..7a2d8d3b08af 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_stages.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_stages.py @@ -494,3 +494,25 @@ def __init__(self, condition: BooleanExpression): def _pb_args(self): return [self.condition._to_pb()] + + +class Define(Stage): + """Binds one or more expressions to variables.""" + + def __init__(self, *expressions: AliasedExpression): + super().__init__("let") + self.expressions = list(expressions) + + def _pb_args(self) -> list[Value]: + return [Selectable._to_value(self.expressions)] + + +class Subcollection(Stage): + """Targets a subcollection relative to the current document.""" + + def __init__(self, path: str): + super().__init__("subcollection") + self.path = path + + def _pb_args(self) -> list[Value]: + return [encode_value(self.path)] diff --git a/packages/google-cloud-firestore/mypy.ini b/packages/google-cloud-firestore/mypy.ini index e0e0da2e9e40..f16b72c620ba 100644 --- a/packages/google-cloud-firestore/mypy.ini +++ b/packages/google-cloud-firestore/mypy.ini @@ -3,13 +3,13 @@ python_version = 3.14 namespace_packages = True ignore_missing_imports = False -# TODO(https://github.com/googleapis/gapic-generator-python/issues/2563): -# Dependencies that historically lacks py.typed markers -[mypy-google.iam.*] -ignore_missing_imports = True - # Helps mypy navigate the 'google' namespace more reliably in 3.10+ explicit_package_bases = True # Performance: reuse results from previous runs to speed up 'nox' incremental = True + +# TODO(https://github.com/googleapis/gapic-generator-python/issues/2563): +# Dependencies 
that historically lack py.typed markers +[mypy-google.iam.*] +ignore_missing_imports = True diff --git a/packages/google-cloud-firestore/tests/system/pipeline_e2e/general.yaml b/packages/google-cloud-firestore/tests/system/pipeline_e2e/general.yaml index b14e636d1ebc..4063d8b971ca 100644 --- a/packages/google-cloud-firestore/tests/system/pipeline_e2e/general.yaml +++ b/packages/google-cloud-firestore/tests/system/pipeline_e2e/general.yaml @@ -765,3 +765,10 @@ tests: res: fieldReferenceValue: res name: select + - description: union_subpipeline_error + pipeline: + - Collection: books + - Union: + - Pipeline: + - Subcollection: reviews + assert_error: ".*start of a nested pipeline.*" diff --git a/packages/google-cloud-firestore/tests/system/pipeline_e2e/logical.yaml b/packages/google-cloud-firestore/tests/system/pipeline_e2e/logical.yaml index a3de3a4a07fe..d9f96cd3cd65 100644 --- a/packages/google-cloud-firestore/tests/system/pipeline_e2e/logical.yaml +++ b/packages/google-cloud-firestore/tests/system/pipeline_e2e/logical.yaml @@ -712,7 +712,8 @@ tests: - Where: - FunctionExpression.equal: - Field: value - - Constant: null + - Constant: + value: null - Select: - AliasedExpression: - FunctionExpression.if_null: diff --git a/packages/google-cloud-firestore/tests/system/pipeline_e2e/map.yaml b/packages/google-cloud-firestore/tests/system/pipeline_e2e/map.yaml index 220282b1a8de..2166e14a5776 100644 --- a/packages/google-cloud-firestore/tests/system/pipeline_e2e/map.yaml +++ b/packages/google-cloud-firestore/tests/system/pipeline_e2e/map.yaml @@ -162,7 +162,8 @@ tests: - FunctionExpression.map_set: - Field: awards - "hugo" - - Constant: null + - Constant: + value: null - "awards_set" assert_results: - awards_set: @@ -450,37 +451,55 @@ tests: - award_entries: k: nebula v: true - assert_proto: - pipeline: - stages: - - args: - - referenceValue: /books - name: collection - - args: - - functionValue: - args: - - fieldReferenceValue: title - - stringValue: "Dune" - name: 
equal - name: where - - args: - - mapValue: - fields: - award_entries: - functionValue: - name: map_entries - args: - - fieldReferenceValue: awards - name: select - - args: - - fieldReferenceValue: award_entries - - fieldReferenceValue: award_entries - name: unnest - - args: - - mapValue: - fields: - direction: - stringValue: ascending - expression: - fieldReferenceValue: award_entries.k - name: sort + - description: testGetField + pipeline: + - Collection: books + - Sort: + - Ordering: + - Field: published + - DESCENDING + - Select: + - AliasedExpression: + - FunctionExpression.get_field: + - Field: awards + - hugo + - "hugoAward" + - Field: title + - Where: + - FunctionExpression.equal: + - Field: hugoAward + - Constant: true + - description: testGetFieldWithField + pipeline: + - Collection: books + - Where: + - FunctionExpression.equal: + - Field: title + - Constant: "Dune" + - AddFields: + - AliasedExpression: + - Constant: "hugo" + - "award_name" + - Select: + - AliasedExpression: + - FunctionExpression.get_field: + - Field: awards + - Field: award_name + - "hugoAward" + - Field: title + assert_results: + - hugoAward: true + title: Dune + - description: testGetFieldWithMap + pipeline: + - Collection: books + - Limit: 1 + - Select: + - AliasedExpression: + - FunctionExpression.get_field: + - Map: + elements: {"foo": "bar"} + - "foo" + - "foo_value" + assert_results: + - foo_value: "bar" diff --git a/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml b/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml new file mode 100644 index 000000000000..8d7efe8c633b --- /dev/null +++ b/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml @@ -0,0 +1,191 @@ +tests: + - description: array_subquery_with_variable + pipeline: + - Collection: books + - Where: + - FunctionExpression.equal: + - Field: title + - Constant: "1984" + - Define: + - AliasedExpression: + - Field: genre + - target_genre + - AddFields: + - 
AliasedExpression: + - Pipeline.to_array_expression: + - Collection: books + - Where: + - FunctionExpression.equal: + - Field: genre + - Variable: target_genre + - Select: + - Field: title + - same_genre_books + - Select: + - Field: title + - Field: same_genre_books + assert_results: + - title: "1984" + same_genre_books: + - "1984" + - "The Handmaid's Tale" + assert_proto: + pipeline: + stages: + - args: + - referenceValue: /books + name: collection + - args: + - functionValue: + args: + - fieldReferenceValue: title + - stringValue: '1984' + name: equal + name: where + - args: + - mapValue: + fields: + target_genre: + fieldReferenceValue: genre + name: let + - args: + - mapValue: + fields: + same_genre_books: + functionValue: + args: + - pipelineValue: + stages: + - args: + - referenceValue: /books + name: collection + - args: + - functionValue: + args: + - fieldReferenceValue: genre + - variableReferenceValue: target_genre + name: equal + name: where + - args: + - mapValue: + fields: + title: + fieldReferenceValue: title + name: select + name: array + name: add_fields + - args: + - mapValue: + fields: + same_genre_books: + fieldReferenceValue: same_genre_books + title: + fieldReferenceValue: title + name: select + + - description: scalar_subquery_with_current_document + pipeline: + - Collection: books + - Where: + - FunctionExpression.equal: + - Field: title + - Constant: "1984" + - Define: + - AliasedExpression: + - CurrentDocument: + - doc + - AddFields: + - AliasedExpression: + - Pipeline.to_scalar_expression: + - Collection: books + - Where: + - FunctionExpression.equal: + - Field: genre + - FunctionExpression.map_get: + - Variable: doc + - Constant: "genre" + - Aggregate: + - AliasedExpression: + - FunctionExpression.average: + - Field: rating + - avg_rating + - average_rating + - Select: + - Field: title + - Field: average_rating + assert_results: + - title: "1984" + average_rating: 4.15 + assert_proto: + pipeline: + stages: + - args: + - referenceValue: 
/books + name: collection + - args: + - functionValue: + args: + - fieldReferenceValue: title + - stringValue: '1984' + name: equal + name: where + - args: + - mapValue: + fields: + doc: + functionValue: + name: current_document + name: let + - args: + - mapValue: + fields: + average_rating: + functionValue: + args: + - pipelineValue: + stages: + - args: + - referenceValue: /books + name: collection + - args: + - functionValue: + args: + - fieldReferenceValue: genre + - functionValue: + args: + - variableReferenceValue: doc + - stringValue: genre + name: map_get + name: equal + name: where + - args: + - mapValue: + fields: + avg_rating: + functionValue: + args: + - fieldReferenceValue: rating + name: average + - mapValue: {} + name: aggregate + name: scalar + name: add_fields + - args: + - mapValue: + fields: + average_rating: + fieldReferenceValue: average_rating + title: + fieldReferenceValue: title + name: select + + - description: scalar_subquery_multiple_items_error + pipeline: + - Collection: books + - Limit: 1 + - AddFields: + - AliasedExpression: + - Pipeline.to_scalar_expression: + - Collection: books + - bad_scalar + assert_error: ".*multiple results.*" diff --git a/packages/google-cloud-firestore/tests/system/test_pipeline_acceptance.py b/packages/google-cloud-firestore/tests/system/test_pipeline_acceptance.py index afff43ac6950..4619130dfee8 100644 --- a/packages/google-cloud-firestore/tests/system/test_pipeline_acceptance.py +++ b/packages/google-cloud-firestore/tests/system/test_pipeline_acceptance.py @@ -260,6 +260,18 @@ def _parse_expressions(client, yaml_element: Any): # find Pipeline objects for Union expressions other_ppl = yaml_element["Pipeline"] return parse_pipeline(client, other_ppl) + elif ( + len(yaml_element) == 1 + and list(yaml_element)[0] == "Pipeline.to_array_expression" + ): + other_ppl = yaml_element["Pipeline.to_array_expression"] + return parse_pipeline(client, other_ppl).to_array_expression() + elif ( + len(yaml_element) == 1 + 
and list(yaml_element)[0] == "Pipeline.to_scalar_expression" + ): + other_ppl = yaml_element["Pipeline.to_scalar_expression"] + return parse_pipeline(client, other_ppl).to_scalar_expression() else: # otherwise, return dict return { @@ -288,6 +300,8 @@ def _apply_yaml_args_to_callable(callable_obj, client, yaml_args): ): # yaml has an array of arguments. Treat as args return callable_obj(*_parse_expressions(client, yaml_args)) + elif yaml_args is None: + return callable_obj() else: # yaml has a single argument return callable_obj(_parse_expressions(client, yaml_args)) diff --git a/packages/google-cloud-firestore/tests/unit/v1/test_async_pipeline.py b/packages/google-cloud-firestore/tests/unit/v1/test_async_pipeline.py index 4059355c687d..402ab42ee732 100644 --- a/packages/google-cloud-firestore/tests/unit/v1/test_async_pipeline.py +++ b/packages/google-cloud-firestore/tests/unit/v1/test_async_pipeline.py @@ -448,3 +448,12 @@ def test_async_pipeline_aggregate_with_groups(): assert isinstance(result_ppl.stages[0], stages.Aggregate) assert list(result_ppl.stages[0].groups) == [Field.of("author")] assert list(result_ppl.stages[0].accumulators) == [Field.of("title")] + + +def test_async_pipeline_union_relative_error(): + start_ppl = _make_async_pipeline(client=mock.Mock()) + other_ppl = _make_async_pipeline(client=None) + with pytest.raises( + ValueError, match="Union only supports combining root pipelines" + ): + start_ppl.union(other_ppl) diff --git a/packages/google-cloud-firestore/tests/unit/v1/test_pipeline.py b/packages/google-cloud-firestore/tests/unit/v1/test_pipeline.py index 5953398709a3..82d89a12f978 100644 --- a/packages/google-cloud-firestore/tests/unit/v1/test_pipeline.py +++ b/packages/google-cloud-firestore/tests/unit/v1/test_pipeline.py @@ -437,3 +437,32 @@ def test_pipeline_aggregate_with_groups(): assert isinstance(result_ppl.stages[0], stages.Aggregate) assert list(result_ppl.stages[0].groups) == [Field.of("author")] assert 
list(result_ppl.stages[0].accumulators) == [Field.of("title")] + + +def test_pipeline_union_relative_error(): + start_ppl = _make_pipeline(client=mock.Mock()) + other_ppl = _make_pipeline(client=None) + with pytest.raises( + ValueError, match="Union only supports combining root pipelines" + ): + start_ppl.union(other_ppl) + + +def test_subpipeline_execute_error(): + from google.cloud.firestore_v1.base_pipeline import SubPipeline + + ppl = SubPipeline._create_with_stages(None) + with pytest.raises( + RuntimeError, match="This pipeline was created without a database" + ): + ppl.execute() + + +def test_subpipeline_stream_error(): + from google.cloud.firestore_v1.base_pipeline import SubPipeline + + ppl = SubPipeline._create_with_stages(None) + with pytest.raises( + RuntimeError, match="This pipeline was created without a database" + ): + ppl.stream() diff --git a/packages/google-cloud-firestore/tests/unit/v1/test_pipeline_expressions.py b/packages/google-cloud-firestore/tests/unit/v1/test_pipeline_expressions.py index 98db3c3a8f17..b285e8e4b614 100644 --- a/packages/google-cloud-firestore/tests/unit/v1/test_pipeline_expressions.py +++ b/packages/google-cloud-firestore/tests/unit/v1/test_pipeline_expressions.py @@ -1466,6 +1466,16 @@ def test_vector_length(self): infix_instance = arg1.vector_length() assert infix_instance == instance + def test_get_field(self): + arg1 = self._make_arg("Map") + arg2 = self._make_arg("Key") + instance = Expression.get_field(arg1, arg2) + assert instance.name == "get_field" + assert instance.params == [arg1, arg2] + assert repr(instance) == "Map.get_field(Key)" + infix_instance = arg1.get_field(arg2) + assert infix_instance == instance + def test_add(self): arg1 = self._make_arg("Left") arg2 = self._make_arg("Right")