Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class AsyncPipeline(_BasePipeline):
subject to potential breaking changes in future releases
"""

_client: AsyncClient

def __init__(self, client: AsyncClient, *stages: stages.Stage):
"""
Initializes an asynchronous Pipeline.
Expand Down Expand Up @@ -102,6 +104,9 @@ async def execute(
explain_metrics will be available on the returned list.
additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
These options will take precedence over method argument if there is a conflict (e.g. explain_options)

Raises:
google.api_core.exceptions.GoogleAPIError: If there is a backend error.
"""
kwargs = {k: v for k, v in locals().items() if k != "self"}
stream = AsyncPipelineStream(PipelineResult, self, **kwargs)
Expand Down Expand Up @@ -134,6 +139,9 @@ def stream(
explain_metrics will be available on the returned generator.
additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
These options will take precedence over method argument if there is a conflict (e.g. explain_options)

Raises:
google.api_core.exceptions.GoogleAPIError: If there is a backend error.
"""
kwargs = {k: v for k, v in locals().items() if k != "self"}
return AsyncPipelineStream(PipelineResult, self, **kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

from __future__ import annotations

from typing import TYPE_CHECKING, Sequence
from typing import TYPE_CHECKING, Sequence, TypeVar, Type


from google.cloud.firestore_v1 import pipeline_stages as stages
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
Expand All @@ -25,6 +26,8 @@
Expression,
Field,
Selectable,
FunctionExpression,
_PipelineValueExpression,
)
from google.cloud.firestore_v1.types.pipeline import (
StructuredPipeline as StructuredPipeline_pb,
Expand All @@ -35,6 +38,8 @@
from google.cloud.firestore_v1.async_client import AsyncClient
from google.cloud.firestore_v1.client import Client

_T = TypeVar("_T", bound="_BasePipeline")


class _BasePipeline:
"""
Expand All @@ -44,7 +49,7 @@ class _BasePipeline:
Use `client.pipeline()` to create pipeline instances.
"""

def __init__(self, client: Client | AsyncClient):
def __init__(self, client: Client | AsyncClient | None):
"""
Initializes a new pipeline.

Expand All @@ -59,8 +64,8 @@ def __init__(self, client: Client | AsyncClient):

@classmethod
def _create_with_stages(
cls, client: Client | AsyncClient, *stages
) -> _BasePipeline:
cls: Type[_T], client: Client | AsyncClient | None, *stages
) -> _T:
"""
Initializes a new pipeline with the given stages.

Expand Down Expand Up @@ -90,6 +95,51 @@ def _to_pb(self, **options) -> StructuredPipeline_pb:
options=options,
)

def to_array_expression(self) -> Expression:
    """Wrap this pipeline as an expression producing an array of results.

    Intended for embedding 1:N subqueries into stages such as ``add_fields``:
    every document produced by the inner pipeline becomes one element of the
    resulting array value.

    Example:
        >>> # Get a list of all reviewer names for each book
        >>> db.pipeline().collection("books").define(Field.of("id").as_("book_id")).add_fields(
        ...     db.pipeline().collection("reviews")
        ...     .where(Field.of("book_id").equal(Variable("book_id")))
        ...     .select(Field.of("reviewer").as_("name"))
        ...     .to_array_expression().as_("reviewers")
        ... )

    Returns:
        An :class:`Expression` representing the execution of this pipeline.
    """
    # Wrap self so the whole pipeline can travel as an expression argument.
    wrapped = _PipelineValueExpression(self)
    return FunctionExpression("array", [wrapped])

def to_scalar_expression(self) -> Expression:
    """Wrap this pipeline as an expression producing a single scalar result.

    Use for 1:1 lookups or aggregations where the subquery is expected to
    yield exactly one value or object.

    **Result Unwrapping:**
    For simpler access, scalar subqueries producing a single field automatically unwrap that value to the
    top level, ignoring the inner alias. If the subquery returns multiple fields, they are preserved as a map.

    Example:
        >>> # Calculate average rating for each restaurant using a subquery
        >>> db.pipeline().collection("restaurants").define(Field.of("id").as_("rid")).add_fields(
        ...     db.pipeline().collection("reviews")
        ...     .where(Field.of("restaurant_id").equal(Variable("rid")))
        ...     .aggregate(AggregateFunction.average("rating").as_("value"))
        ...     .to_scalar_expression().as_("average_rating")
        ... )

    **Runtime Validation:**
    The runtime will validate that the result set contains exactly one item. It returns an error if the result has more than one item, and evaluates to `null` if the pipeline has zero results.

    Returns:
        An :class:`Expression` representing the execution of this pipeline.
    """
    # Wrap self so the whole pipeline can travel as an expression argument.
    wrapped = _PipelineValueExpression(self)
    return FunctionExpression("scalar", [wrapped])

def _append(self, new_stage):
"""
Create a new Pipeline object with a new stage appended
Expand Down Expand Up @@ -391,9 +441,17 @@ def union(self, other: "_BasePipeline") -> "_BasePipeline":
Args:
other: The other `Pipeline` whose results will be unioned with this one.

Raises:
ValueError: If the `other` pipeline is a relative pipeline (e.g. created without a client).

Returns:
A new Pipeline object with this stage appended to the stage list
"""
if other._client is None:
raise ValueError(
"Union only supports combining root pipelines, doesn't support relative scope Pipeline "
"like relative subcollection pipeline"
)
return self._append(stages.Union(other))

def unnest(
Expand Down Expand Up @@ -610,3 +668,53 @@ def distinct(self, *fields: str | Selectable) -> "_BasePipeline":
A new Pipeline object with this stage appended to the stage list
"""
return self._append(stages.Distinct(*fields))

def define(self, *aliased_expressions: AliasedExpression) -> "_BasePipeline":
    """Bind one or more expressions to named Variables for later stages.

    Each binding is an :class:`AliasedExpression` pairing an expression with
    an alias; subsequent stages or inner subqueries can read the value back
    via ``Variable(alias)``.

    Example:
        >>> db.pipeline().collection("products").define(
        ...     Field.of("price").multiply(0.9).as_("discountedPrice"),
        ...     Field.of("stock").add(10).as_("newStock")
        ... ).where(
        ...     Variable("discountedPrice").less_than(100)
        ... ).select(Field.of("name"), Variable("newStock"))

    Args:
        *aliased_expressions: One or more :class:`AliasedExpression` defining the Variable names and values.

    Returns:
        A new Pipeline object with this stage appended to the stage list.
    """
    stage = stages.Define(*aliased_expressions)
    return self._append(stage)


class SubPipeline(_BasePipeline):
    """
    A pipeline scoped to a subcollection, created without a database client.
    Cannot be executed directly; it must be used as a subquery within another pipeline.
    """

    # Shared message for both execute() and stream() failure paths.
    _EXECUTE_ERROR_MSG = (
        "This pipeline was created without a database (e.g., as a subcollection pipeline) and "
        "cannot be executed directly. It can only be used as part of another pipeline."
    )

    def _reject_direct_run(self):
        # Single failure path shared by execute()/stream(); always raises.
        raise RuntimeError(self._EXECUTE_ERROR_MSG)

    def execute(self, *args, **kwargs):
        """
        Raises:
            RuntimeError: Always, as a subcollection pipeline cannot be executed directly.
        """
        self._reject_direct_run()

    def stream(self, *args, **kwargs):
        """
        Raises:
            RuntimeError: Always, as a subcollection pipeline cannot be streamed directly.
        """
        self._reject_direct_run()
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class Pipeline(_BasePipeline):
subject to potential breaking changes in future releases.
"""

_client: Client

def __init__(self, client: Client, *stages: stages.Stage):
"""
Initializes a Pipeline.
Expand Down Expand Up @@ -99,6 +101,9 @@ def execute(
explain_metrics will be available on the returned list.
additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
These options will take precedence over method argument if there is a conflict (e.g. explain_options)

Raises:
google.api_core.exceptions.GoogleAPIError: If there is a backend error.
"""
kwargs = {k: v for k, v in locals().items() if k != "self"}
stream = PipelineStream(PipelineResult, self, **kwargs)
Expand Down Expand Up @@ -131,6 +136,9 @@ def stream(
explain_metrics will be available on the returned generator.
additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
These options will take precedence over method argument if there is a conflict (e.g. explain_options)

Raises:
google.api_core.exceptions.GoogleAPIError: If there is a backend error.
"""
kwargs = {k: v for k, v in locals().items() if k != "self"}
return PipelineStream(PipelineResult, self, **kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
Generic,
Sequence,
TypeVar,
)

if TYPE_CHECKING:
from google.cloud.firestore_v1.base_pipeline import _BasePipeline

from google.cloud.firestore_v1._helpers import GeoPoint, decode_value, encode_value
from google.cloud.firestore_v1.types.document import Value
from google.cloud.firestore_v1.types.document import Pipeline as Pipeline_pb
from google.cloud.firestore_v1.types.query import StructuredQuery as Query_pb
from google.cloud.firestore_v1.vector import Vector

Expand Down Expand Up @@ -258,6 +263,26 @@ def __get__(self, instance, owner):
else:
return self.instance_func.__get__(instance, owner)

@expose_as_static
def get_field(self, key: Expression | str) -> "Expression":
    """Create an expression that reads a named entry from a map- or
    document-valued expression.

    Example:
        >>> # Access the 'city' field from the 'address' map field.
        >>> Field.of("address").get_field("city")
        >>> # Create a map and access a field from it.
        >>> Map({"foo": "bar"}).get_field("foo")

    Args:
        key: The key of the field to access.

    Returns:
        A new `Expression` representing the value of the field.
    """
    # Plain strings are promoted to constant expressions before wrapping.
    key_expr = self._cast_to_expr_or_convert_to_constant(key)
    return FunctionExpression("get_field", [self, key_expr])

@expose_as_static
def add(self, other: Expression | float) -> "Expression":
"""Creates an expression that adds this expression to another expression or constant.
Expand Down Expand Up @@ -2709,6 +2734,17 @@ def _from_query_filter_pb(filter_pb, client):
raise TypeError(f"Unexpected filter type: {type(filter_pb)}")


class _PipelineValueExpression(Expression):
    """Internal adapter that lets a whole pipeline stand in as an expression value."""

    def __init__(self, pipeline: "_BasePipeline"):
        # Wrapped pipeline; serialized lazily when _to_pb() is called.
        self.pipeline = pipeline

    def _to_pb(self) -> Value:
        # Serialize each stage, then embed the result as a pipeline-typed Value.
        stage_pbs = [stage._to_pb() for stage in self.pipeline.stages]
        return Value(pipeline_value=Pipeline_pb(stages=stage_pbs))


class Array(FunctionExpression):
"""
Creates an expression that creates a Firestore array value from an input list.
Expand Down Expand Up @@ -2889,3 +2925,42 @@ class Rand(FunctionExpression):

def __init__(self):
super().__init__("rand", [], use_infix_repr=False)


class Variable(Expression):
    """
    Expression that reads the value of a variable bound via `Pipeline.define`.

    Example:
        >>> # Define a variable "discountedPrice" and use it in a filter
        >>> db.pipeline().collection("products").define(
        ...     Field.of("price").multiply(0.9).as_("discountedPrice")
        ... ).where(Variable("discountedPrice").less_than(100))

    Args:
        name: The name of the variable to retrieve.
    """

    def __init__(self, name: str):
        # Name of the previously-bound variable to dereference at runtime.
        self.name = name

    def _to_pb(self) -> Value:
        # Encoded as a variable reference rather than a literal value.
        return Value(variable_reference_value=self.name)


class CurrentDocument(FunctionExpression):
    """
    Expression representing the document currently being processed.

    Acts as a handle so the entire document can be bound to a variable or
    passed as-is to a function or subquery.

    Example:
        >>> # Define the current document as a variable "doc"
        >>> db.pipeline().collection("books").define(
        ...     CurrentDocument().as_("doc")
        ... ).select(Variable("doc").get_field("title"))
    """

    def __init__(self):
        # Zero-argument function expression named "current_document".
        super().__init__("current_document", [])
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
from google.cloud.firestore_v1.async_transaction import AsyncTransaction
from google.cloud.firestore_v1.base_client import BaseClient
from google.cloud.firestore_v1.base_document import BaseDocumentReference
from google.cloud.firestore_v1.base_pipeline import _BasePipeline
from google.cloud.firestore_v1.client import Client
from google.cloud.firestore_v1.pipeline import Pipeline
from google.cloud.firestore_v1.pipeline_expressions import Constant
Expand Down Expand Up @@ -190,7 +189,7 @@ def __init__(
):
# public
self.transaction = transaction
self.pipeline: _BasePipeline = pipeline
self.pipeline: Pipeline | AsyncPipeline = pipeline
self.execution_time: Timestamp | None = None
# private
self._client: Client | AsyncClient = pipeline._client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from google.cloud.firestore_v1 import pipeline_stages as stages
from google.cloud.firestore_v1._helpers import DOCUMENT_PATH_DELIMITER
from google.cloud.firestore_v1.base_pipeline import _BasePipeline
from google.cloud.firestore_v1.base_pipeline import _BasePipeline, SubPipeline

if TYPE_CHECKING: # pragma: NO COVER
from google.cloud.firestore_v1.async_client import AsyncClient
Expand Down Expand Up @@ -168,6 +168,31 @@ def literals(
*documents: One or more documents to be returned by this stage. Each can be a `dict`
of values of `Expression` or `CONSTANT_TYPE` types.
Returns:
A new Pipeline object with this stage appended to the stage list.
A new pipeline instance targeting the specified literal documents
"""
return self._create_pipeline(stages.Literals(*documents))

def subcollection(self, path: str) -> SubPipeline:
    """Start a pipeline scoped to a subcollection of the current document.

    The returned pipeline is intended solely for use as a subquery inside a
    parent pipeline.

    **Note:** A pipeline created with `subcollection` cannot be executed directly using
    `execute()`. It must be used within a parent pipeline.

    Example:
        >>> db.pipeline().collection("books").add_fields(
        ...     db.pipeline().subcollection("reviews")
        ...     .aggregate(AggregateFunction.average("rating").as_("avg_rating"))
        ...     .to_scalar_expression().as_("average_rating")
        ... )

    Args:
        path: The path of the subcollection.

    Returns:
        A new pipeline instance targeting the specified subcollection
    """
    # No client is attached (None): the result is only valid as a subquery.
    stage = stages.Subcollection(path)
    return SubPipeline._create_with_stages(None, stage)
Loading
Loading