Skip to content

Commit 8a7febc

Browse files
authored
feat(firestore): Implement Search pipeline stage and related expressions (#14383)
**Summary**

This PR implements the `search` pipeline stage and its associated expressions in the Go Firestore SDK, providing parity with the Java (googleapis/java-firestore#2346) and Node.js (googleapis/google-cloud-node#7824) SDKs for full-text and geospatial search capabilities within Firestore Pipelines.

**Key Changes**

* **Search Pipeline Stage**:
  * Implemented the `Search` stage in `Pipeline` with a functional options pattern.
  * Introduced `WithSearchQuery`, `WithSearchSort`, `WithSearchAddFields`, and `WithSearchRetrievalDepth` options.
  * Ensured protobuf encoding places the `query` within the `Options` map (not `Args`) to align with backend requirements and the Java SDK implementation.
* **Search Expressions**:
  * Implemented `DocumentMatches(query string)` and `Matches(field, query string)` for full-text search.
  * Implemented `GeoDistance(field, location)` for geospatial distance calculations.
  * Implemented `Score()` to retrieve the search topicality score.
  * Implemented `Snippet(field, query)` for highlighted search result snippets.
* **Fluent API**:
  * Added `GeoDistance(location)`, `Matches(query)`, and `Snippet(query)` methods to the `Expression` interface to support fluent chaining (e.g., `FieldOf("location").GeoDistance(loc)`).
* **Testing**:
  * Added comprehensive unit tests in `pipeline_test.go` and `pipeline_function_test.go`.
  * Verified backend request structure consistency via JSON dump comparisons.

**Usage Example**

```go
pipeline := client.Pipeline().
	Collection("restaurants").
	Search(
		WithSearchQuery(DocumentMatches("waffles OR pancakes")),
		WithSearchSort(Descending(Score())),
		WithSearchAddFields(Snippet("menu", "waffles").As("highlight")),
	)
```
1 parent 7be0887 commit 8a7febc

9 files changed

Lines changed: 963 additions & 133 deletions

firestore/integration_test.go

Lines changed: 286 additions & 130 deletions
Large diffs are not rendered by default.

firestore/pipeline.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ func (RawOptions) isLiteralsOption() {}
184184

185185
func (RawOptions) isDefineOption() {}
186186

187+
func (RawOptions) isSearchOption() {}
188+
187189
func (r RawOptions) apply(eo *executeSettings) {
188190
if eo.RawOptions == nil {
189191
eo.RawOptions = make(map[string]any)
@@ -1125,6 +1127,120 @@ func (p *Pipeline) FindNearest(vectorField any, queryVector any, measure Pipelin
11251127
return p.append(stage)
11261128
}
11271129

1130+
// SearchOption is an option for a Search pipeline stage.
1131+
//
1132+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
1133+
// regardless of any other documented package stability guarantees.
1134+
type SearchOption interface {
1135+
StageOption
1136+
isSearchOption()
1137+
}
1138+
1139+
type funcSearchOption struct {
1140+
f func(map[string]any)
1141+
}
1142+
1143+
func (fso *funcSearchOption) applyStage(so map[string]any) {
1144+
fso.f(so)
1145+
}
1146+
1147+
func (*funcSearchOption) isSearchOption() {}
1148+
1149+
func newFuncSearchOption(f func(map[string]any)) *funcSearchOption {
1150+
return &funcSearchOption{
1151+
f: f,
1152+
}
1153+
}
1154+
1155+
// WithSearchQuery specifies the search query that will be used to query and score documents by the search stage.
1156+
// It can be a string (automatically wrapped in DocumentMatches) or a BooleanExpression.
1157+
//
1158+
// Example:
1159+
//
1160+
// client.Pipeline().Collection("restaurants").Search(
1161+
// WithSearchQuery("waffles"),
1162+
// )
1163+
//
1164+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
1165+
// regardless of any other documented package stability guarantees.
1166+
func WithSearchQuery(query any) SearchOption {
1167+
return newFuncSearchOption(func(so map[string]any) {
1168+
so["query"] = query
1169+
})
1170+
}
1171+
1172+
// WithSearchSort specifies how the returned documents are sorted. One or more orderings are required.
1173+
//
1174+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
1175+
// regardless of any other documented package stability guarantees.
1176+
func WithSearchSort(orders ...Ordering) SearchOption {
1177+
return newFuncSearchOption(func(so map[string]any) {
1178+
t, ok := so["sort"].([]Ordering)
1179+
if !ok {
1180+
t = []Ordering{}
1181+
}
1182+
so["sort"] = append(t, orders...)
1183+
})
1184+
}
1185+
1186+
// WithSearchAddFields specifies the fields to add to each document.
1187+
//
1188+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
1189+
// regardless of any other documented package stability guarantees.
1190+
func WithSearchAddFields(fields ...Selectable) SearchOption {
1191+
return newFuncSearchOption(func(so map[string]any) {
1192+
t, ok := so["add_fields"].([]Selectable)
1193+
if !ok {
1194+
t = []Selectable{}
1195+
}
1196+
so["add_fields"] = append(t, fields...)
1197+
})
1198+
}
1199+
1200+
// WithSearchRetrievalDepth specifies the maximum number of documents to retrieve. Documents will be retrieved in the
1201+
// pre-sort order specified by the search index.
1202+
//
1203+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
1204+
// regardless of any other documented package stability guarantees.
1205+
func WithSearchRetrievalDepth(depth int64) SearchOption {
1206+
return newFuncSearchOption(func(so map[string]any) {
1207+
so["retrieval_depth"] = depth
1208+
})
1209+
}
1210+
1211+
// Search adds a search stage to the Pipeline.
1212+
// This must be the first stage of the pipeline.
1213+
// A limited set of expressions is supported in the search stage.
1214+
// Use [WithSearchQuery] to specify the search query.
1215+
//
1216+
// Example:
1217+
//
1218+
// client.Pipeline().Collection("restaurants").Search(
1219+
// WithSearchQuery(DocumentMatches("waffles OR pancakes")),
1220+
// WithSearchSort(Descending(Score())),
1221+
// WithSearchRetrievalDepth(10),
1222+
// )
1223+
//
1224+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
1225+
// regardless of any other documented package stability guarantees.
1226+
func (p *Pipeline) Search(opts ...SearchOption) *Pipeline {
1227+
if p.err != nil {
1228+
return p
1229+
}
1230+
options := make(map[string]any)
1231+
for _, opt := range opts {
1232+
if opt != nil {
1233+
opt.applyStage(options)
1234+
}
1235+
}
1236+
stage, err := newSearchStage(options)
1237+
if err != nil {
1238+
p.err = err
1239+
return p
1240+
}
1241+
return p.append(stage)
1242+
}
1243+
11281244
// RawStage adds a generic stage to the pipeline.
11291245
// This method provides a flexible way to extend the pipeline's functionality by adding custom stages.
11301246
//

firestore/pipeline_expression.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package firestore
1616

1717
import (
1818
pb "cloud.google.com/go/firestore/apiv1/firestorepb"
19+
"google.golang.org/genproto/googleapis/type/latlng"
1920
)
2021

2122
// Selectable is an interface for expressions that can be selected in a pipeline.
@@ -508,6 +509,22 @@ type Expression interface {
508509
// Descending creates an ordering expression for descending order.
509510
Descending() Ordering
510511

512+
// GeoDistance creates an expression that evaluates to the distance in meters between the location in the expression and the query location.
513+
//
514+
// The parameter 'location' is the query location.
515+
//
516+
// Example:
517+
//
518+
// client.Pipeline().Collection("restaurants").
519+
// Search(
520+
// WithSearchQuery("waffles"),
521+
// WithSearchSort(Ascending(FieldOf("location").GeoDistance(&latlng.LatLng{Latitude: 37.0, Longitude: -122.0}))),
522+
// )
523+
//
524+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
525+
// regardless of any other documented package stability guarantees.
526+
GeoDistance(location *latlng.LatLng) Expression
527+
511528
// As assigns an alias to an expression.
512529
// Aliases are useful for renaming fields in the output of a stage.
513530
As(alias string) *AliasedExpression
@@ -760,6 +777,10 @@ func (b *baseExpression) VectorLength() Expression { return Vector
760777
func (b *baseExpression) Ascending() Ordering { return Ascending(b) }
761778
func (b *baseExpression) Descending() Ordering { return Descending(b) }
762779

780+
func (b *baseExpression) GeoDistance(location *latlng.LatLng) Expression {
781+
return GeoDistance(b, location)
782+
}
783+
763784
func (b *baseExpression) As(alias string) *AliasedExpression {
764785
return newAliasedExpr(b, alias)
765786
}

firestore/pipeline_function.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"fmt"
1919

2020
pb "cloud.google.com/go/firestore/apiv1/firestorepb"
21+
"google.golang.org/genproto/googleapis/type/latlng"
2122
)
2223

2324
// FunctionExpression represents Firestore [Pipeline] functions, which can be evaluated within pipeline
@@ -1154,3 +1155,57 @@ func RegexFindAll(exprOrField any, pattern any) Expression {
11541155
func Rand() Expression {
11551156
return newBaseFunction("rand", nil)
11561157
}
1158+
1159+
// DocumentMatches creates a boolean expression that performs a full-text search on all indexed search fields in the document.
1160+
//
1161+
// This Expression can only be used within a Search stage.
1162+
//
1163+
// Example:
1164+
//
1165+
// client.Pipeline().Collection("restaurants").
1166+
// Search(WithSearchQuery(DocumentMatches("waffles OR pancakes")))
1167+
//
1168+
// - query: Define the search query using the search domain-specific language (DSL).
1169+
//
1170+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
1171+
// regardless of any other documented package stability guarantees.
1172+
func DocumentMatches(query string) BooleanExpression {
1173+
return &baseBooleanExpression{baseFunction: newBaseFunction("document_matches", []Expression{ConstantOf(query)})}
1174+
}
1175+
1176+
// GeoDistance creates an expression that evaluates to the distance in meters between the location in the specified field and the query location.
1177+
//
1178+
// This Expression can only be used within a Search stage.
1179+
//
1180+
// Example:
1181+
//
1182+
// client.Pipeline().Collection("restaurants").
1183+
// Search(
1184+
// WithSearchQuery("waffles"),
1185+
// WithSearchSort(Ascending(GeoDistance("location", &latlng.LatLng{Latitude: 37.0, Longitude: -122.0}))),
1186+
// )
1187+
//
1188+
// - field: Specifies the field in the document which contains the GeoPoint for distance computation. It can be a field path string, [FieldPath] or [Expression].
1189+
// - location: Compute distance to this GeoPoint.
1190+
//
1191+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
1192+
// regardless of any other documented package stability guarantees.
1193+
func GeoDistance(field any, location *latlng.LatLng) Expression {
1194+
return newBaseFunction("geo_distance", []Expression{asFieldExpr(field), ConstantOf(location)})
1195+
}
1196+
1197+
// Score creates an expression that evaluates to the search score that reflects the topicality of the document to all of the text
1198+
// predicates (for example: DocumentMatches) in the search query.
1199+
//
1200+
// This Expression can only be used within a Search stage.
1201+
//
1202+
// Example:
1203+
//
1204+
// client.Pipeline().Collection("restaurants").
1205+
// Search(WithSearchQuery("waffles"), WithSearchSort(Descending(Score())))
1206+
//
1207+
// Experimental: Firestore Pipelines is currently in preview and is subject to potential breaking changes in future versions,
1208+
// regardless of any other documented package stability guarantees.
1209+
func Score() Expression {
1210+
return newBaseFunction("score", nil)
1211+
}

firestore/pipeline_function_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919

2020
pb "cloud.google.com/go/firestore/apiv1/firestorepb"
2121
"cloud.google.com/go/internal/testutil"
22+
"google.golang.org/genproto/googleapis/type/latlng"
2223
)
2324

2425
func TestTruncFunctions(t *testing.T) {
@@ -1118,3 +1119,76 @@ func TestGetFieldVariations(t *testing.T) {
11181119
t.Fatal("expected expr6 not to be nil")
11191120
}
11201121
}
1122+
1123+
func TestSearchFunctions(t *testing.T) {
1124+
// 1. DocumentMatches
1125+
// 2. GeoDistance
1126+
// 3. Score
1127+
// 4. GeoDistance method
1128+
// Not covered by the cases below: Matches method
1129+
testcases := []struct {
1130+
desc string
1131+
expr Expression
1132+
want *pb.Value
1133+
}{
1134+
{
1135+
desc: "DocumentMatches",
1136+
expr: DocumentMatches("waffles"),
1137+
want: &pb.Value{ValueType: &pb.Value_FunctionValue{
1138+
FunctionValue: &pb.Function{
1139+
Name: "document_matches",
1140+
Args: []*pb.Value{
1141+
{ValueType: &pb.Value_StringValue{StringValue: "waffles"}},
1142+
},
1143+
},
1144+
}},
1145+
},
1146+
{
1147+
desc: "GeoDistance",
1148+
expr: GeoDistance("location", &latlng.LatLng{Latitude: 37.0, Longitude: -122.0}),
1149+
want: &pb.Value{ValueType: &pb.Value_FunctionValue{
1150+
FunctionValue: &pb.Function{
1151+
Name: "geo_distance",
1152+
Args: []*pb.Value{
1153+
{ValueType: &pb.Value_FieldReferenceValue{FieldReferenceValue: "location"}},
1154+
{ValueType: &pb.Value_GeoPointValue{GeoPointValue: &latlng.LatLng{Latitude: 37.0, Longitude: -122.0}}},
1155+
},
1156+
},
1157+
}},
1158+
},
1159+
{
1160+
desc: "Score",
1161+
expr: Score(),
1162+
want: &pb.Value{ValueType: &pb.Value_FunctionValue{
1163+
FunctionValue: &pb.Function{
1164+
Name: "score",
1165+
},
1166+
}},
1167+
},
1168+
{
1169+
desc: "GeoDistance method",
1170+
expr: FieldOf("location").GeoDistance(&latlng.LatLng{Latitude: 37.0, Longitude: -122.0}),
1171+
want: &pb.Value{ValueType: &pb.Value_FunctionValue{
1172+
FunctionValue: &pb.Function{
1173+
Name: "geo_distance",
1174+
Args: []*pb.Value{
1175+
{ValueType: &pb.Value_FieldReferenceValue{FieldReferenceValue: "location"}},
1176+
{ValueType: &pb.Value_GeoPointValue{GeoPointValue: &latlng.LatLng{Latitude: 37.0, Longitude: -122.0}}},
1177+
},
1178+
},
1179+
}},
1180+
},
1181+
}
1182+
1183+
for _, tc := range testcases {
1184+
t.Run(tc.desc, func(t *testing.T) {
1185+
got, err := tc.expr.toProto()
1186+
if err != nil {
1187+
t.Fatalf("toProto() failed: %v", err)
1188+
}
1189+
if diff := testutil.Diff(got, tc.want); diff != "" {
1190+
t.Errorf("toProto() returned diff (-got +want): %s", diff)
1191+
}
1192+
})
1193+
}
1194+
}

firestore/pipeline_integration_test.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -856,7 +856,6 @@ func TestIntegration_PipelineStages(t *testing.T) {
856856
}
857857
})
858858
t.Run("Update", func(t *testing.T) {
859-
t.Skip("Skipping test until feature is available in PROD")
860859
updateIter := client.Pipeline().Collection(coll.ID).
861860
Where(Equal(FieldOf("author.country"), "UK")).
862861
Update(WithUpdateTransformations(ConstantOf("Active").As("status"))).
@@ -878,7 +877,6 @@ func TestIntegration_PipelineStages(t *testing.T) {
878877
}
879878
})
880879
t.Run("Delete", func(t *testing.T) {
881-
t.Skip("Skipping test until feature is available in PROD")
882880
deleteIter := client.Pipeline().Collection(coll.ID).Where(Equal(FieldOf("title"), "The Great Gatsby")).Delete().Execute(ctx).Results()
883881
defer deleteIter.Stop()
884882
_, err := deleteIter.GetAll()
@@ -903,7 +901,6 @@ func TestIntegration_PipelineFunctions(t *testing.T) {
903901
t.Run("arrayFuncs", arrayFuncs)
904902
t.Run("stringFuncs", stringFuncs)
905903
t.Run("vectorFuncs", vectorFuncs)
906-
907904
t.Run("timestampFuncs", timestampFuncs)
908905
t.Run("arithmeticFuncs", arithmeticFuncs)
909906
t.Run("aggregateFuncs", aggregateFuncs)
@@ -945,6 +942,8 @@ func aggregationFuncs(t *testing.T) {
945942
Aggregate(Accumulators(
946943
First("val").As("first_val"),
947944
Last("val").As("last_val"),
945+
Maximum("val").As("max_val"),
946+
Minimum("val").As("min_val"),
948947
ArrayAgg("val").As("all_vals"),
949948
ArrayAggDistinct("val").As("distinct_vals"),
950949
CountDistinct("val").As("distinct_count_val"),
@@ -961,6 +960,13 @@ func aggregationFuncs(t *testing.T) {
961960

962961
data := res.Data()
963962

963+
if data["max_val"] != int64(2) {
964+
t.Errorf("got max_val %v, want 2", data["max_val"])
965+
}
966+
if data["min_val"] != int64(1) {
967+
t.Errorf("got min_val %v, want 1", data["min_val"])
968+
}
969+
964970
// Check ArrayAgg "all_vals" -> [1, 2, 1] (order irrelevant)
965971
allValsRaw, ok := data["all_vals"].([]interface{})
966972
if !ok {
@@ -1326,6 +1332,11 @@ func objectFuncs(t *testing.T) {
13261332
pipeline: client.Pipeline().Collection(coll.ID).Select(Fields(MapGet("m1", "a").As("value"))),
13271333
want: map[string]interface{}{"value": int64(1)},
13281334
},
1335+
{
1336+
name: "GetField",
1337+
pipeline: client.Pipeline().Collection(coll.ID).Select(Fields(GetField("m1", "a").As("value"))),
1338+
want: map[string]interface{}{"value": int64(1)},
1339+
},
13291340
{
13301341
name: "MapMerge",
13311342
pipeline: client.Pipeline().Collection(coll.ID).Select(Fields(MapMerge("m1", FieldOf("m2")).As("merged"))),
@@ -2427,6 +2438,13 @@ func comparisonFuncs(t *testing.T) {
24272438
Where(NotEqual("a", 2)),
24282439
want: []map[string]interface{}{doc1want},
24292440
},
2441+
{
2442+
name: "GreaterThan",
2443+
pipeline: client.Pipeline().
2444+
Collection(coll.ID).
2445+
Where(GreaterThan("a", 1)),
2446+
want: []map[string]interface{}{{"a": int64(2), "b": int64(2), "c": int64(-3), "d": float64(4.5), "e": float64(-5.5), "timestamp": now.Truncate(time.Microsecond)}},
2447+
},
24302448
{
24312449
name: "LessThan",
24322450
pipeline: client.Pipeline().

0 commit comments

Comments
 (0)