From 0072a0437ed968cc4121c9d98a995f795a5e6db5 Mon Sep 17 00:00:00 2001 From: ellisandrews-toast Date: Wed, 10 Jun 2026 17:56:14 -0400 Subject: [PATCH 1/4] Resolve nested sourced_from fields into update targets on the root indexed type --- .../schema_definition/indexing/index.rb | 7 +- .../indexing/nested_update_target_resolver.rb | 197 +++++++++++ .../indexing/relationship_chain_resolver.rb | 13 + .../indexing/sourced_field_params_resolver.rb | 66 ++++ .../sourced_from_update_targets_resolver.rb | 104 +++--- .../indexing/update_target_resolver.rb | 49 +-- .../schema_definition/indexing/index.rbs | 2 +- .../nested_update_target_resolver.rbs | 58 ++++ .../indexing/relationship_chain_resolver.rbs | 2 + .../sourced_field_params_resolver.rbs | 19 ++ .../sourced_from_update_targets_resolver.rbs | 12 +- .../indexing/update_target_resolver.rbs | 16 +- .../index_definitions_by_name_spec.rb | 2 +- .../update_targets_spec.rb | 321 +++++++++++++----- 14 files changed, 693 insertions(+), 175 deletions(-) create mode 100644 elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rb create mode 100644 elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_field_params_resolver.rb create mode 100644 elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rbs create mode 100644 elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_field_params_resolver.rbs diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/index.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/index.rb index 3618ba386..061f6e5a1 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/index.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/index.rb @@ -283,10 +283,11 @@ def runtime_metadata ) end - # Registers a resolved `parent_relationship` chain on this index. + # Records the path segments that navigate from this index's documents down to a nested element that + # receives `sourced_from` data, keyed by the qualified relationship backing those fields. # @api private - def register_resolved_relationship_chain(resolved_chain) - sourced_from_nested_paths_by_qualified_relationship[resolved_chain.qualified_relationship] = resolved_chain.sourced_from_nested_paths + def register_sourced_from_nested_paths(qualified_relationship, nested_paths) + sourced_from_nested_paths_by_qualified_relationship[qualified_relationship] = nested_paths end private diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rb new file mode 100644 index 000000000..7dac9c380 --- /dev/null +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rb @@ -0,0 +1,197 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/schema_artifacts/runtime_metadata/params" +require "elastic_graph/schema_artifacts/runtime_metadata/sourced_from_nested_params" +require "elastic_graph/schema_definition/indexing/sourced_field_params_resolver" +require "elastic_graph/schema_definition/indexing/update_target_factory" + +module ElasticGraph + module SchemaDefinition + module Indexing + # Responsible for resolving a nested `parent_relationship` chain and a set of `sourced_from` + # fields into an `UpdateTarget` that instructs the indexer to update a nested element within + # the root indexed type when a source event arrives. This is the nested analog of + # `UpdateTargetResolver`, which handles the top-level (non-nested) `sourced_from` case. + # + # @private + class NestedUpdateTargetResolver + include SourcedFieldParamsResolver + + def initialize( + object_type:, + sourced_fields:, + resolved_chain:, + field_path_resolver:, + schema_def_state: + ) + @object_type = object_type + @sourced_fields = sourced_fields + @resolved_chain = resolved_chain + @field_path_resolver = field_path_resolver + @schema_def_state = schema_def_state + end + + # Resolves the chain and `sourced_fields` into an `UpdateTarget` on the root indexed type, + # validating everything along the way. + # + # Returns a tuple of the `update_target` (if valid) and a list of errors. + def resolve + relationship_errors = validate_relationship + field_params, field_params_errors = resolve_sourced_field_params + routing_value_source, routing_error = resolve_field_source(RoutingSourceAdapter) + rollover_timestamp_value_source, rollover_timestamp_error = resolve_field_source(RolloverTimestampSourceAdapter) + has_had_multiple_sources_errors = validate_has_had_multiple_sources + + all_errors = relationship_errors + field_params_errors + has_had_multiple_sources_errors + + [routing_error, rollover_timestamp_error].compact + + if all_errors.empty? + update_target = UpdateTargetFactory.new_normal_indexing_update_target( + type: root_type.name, + relationship: resolved_chain.qualified_relationship, + id_source: root_relationship.foreign_key, + top_level_fields_params: {}, + sourced_from_nested_params: SchemaArtifacts::RuntimeMetadata::SourcedFromNestedParams.new( + field_params: field_params, + path_identifier_params: build_path_identifier_params + ), + routing_value_source: routing_value_source, + rollover_timestamp_value_source: rollover_timestamp_value_source + ) + end + + [update_target, all_errors] + end + + private + + # @dynamic object_type, sourced_fields, resolved_chain, field_path_resolver, schema_def_state + attr_reader :object_type, :sourced_fields, :resolved_chain, :field_path_resolver, :schema_def_state + + # The leaf relationship the chain was resolved from — the one backing this type's `sourced_from` fields. + def relationship + resolved_chain.leaf_relationship + end + + def root_relationship + resolved_chain.root_relationship + end + + def root_type + root_relationship.parent_type + end + + def root_index + resolved_chain.root_index + end + + def related_type + @related_type ||= schema_def_state.object_types_by_name.fetch(relationship.related_type.unwrap_non_null.name) + end + + # Applies validations specific to relationships backing nested `sourced_from` fields. + def validate_relationship + errors = [] # : ::Array[::String] + + if relationship.many? + errors << "`#{object_type.name}.#{relationship.name}` is a `relates_to_many` relationship, but nested " \ + "`sourced_from` is only supported on a `relates_to_one` relationship." + end + + errors + end + + # Builds the params identifying which nested element to update: one entry per list segment in the + # chain, pulling the matching value from the segment's foreign key on the source event. Object + # segments have no ambiguity, so they contribute no identifier. + def build_path_identifier_params + resolved_chain.path_segments.filter_map do |segment| + source_field = segment.source_field_name + next unless source_field + + param = SchemaArtifacts::RuntimeMetadata::DynamicParam.new( + source_path: source_field, + cardinality: :one + ) + + [source_field, param] + end.to_h + end + + # Resolves `routing_value_source` and `rollover_timestamp_value_source` against the root + # relationship and root index, using an `adapter` for the differences between the two cases. + # + # Returns a tuple of the resolved source (if successful) and an error (if invalid). + def resolve_field_source(adapter) + field_source_graphql_path_string = adapter.get_field_source(root_relationship, root_index) do |local_need| + # The update is triggered by the leaf relationship's source events (`relationship`), but routing and + # rollover are resolved through — and `equivalent_field` is configured on — the root relationship. + error = "Cannot update `#{root_type.name}` documents with nested data from related `#{relationship.name}` " \ + "events, because #{adapter.cannot_update_reason(root_type, root_relationship.name)}. To fix it, add a call " \ + "like this to the `#{root_type.name}.#{root_relationship.name}` relationship definition: `rel.equivalent_field " \ + "\"[#{related_type.name} field]\", locally_named: \"#{local_need}\"`." + + return [nil, error] + end + + if field_source_graphql_path_string + field_path = field_path_resolver.resolve_public_path(related_type, field_source_graphql_path_string) do |parent_field| + !parent_field.type.list? + end + + [field_path&.path_in_index, nil] + else + [nil, nil] + end + end + + # Validates that `has_had_multiple_sources!` has been configured on the root index, since nested + # `sourced_from` makes the root index multi-sourced. + def validate_has_had_multiple_sources + return [] if root_index.has_had_multiple_sources_flag + + ["Type `#{root_type.name}` has nested `sourced_from` fields (via `#{object_type.name}.#{relationship.name}`) but " \ + "its index `#{root_index.name}` has not been configured with `has_had_multiple_sources!`. To resolve this, add " \ + "`i.has_had_multiple_sources!` within the `t.index \"#{root_index.name}\"` block. This flag is required because " \ + "indices with multiple sources can contain incomplete documents, and ElasticGraph needs to know this to apply " \ + "proper filtering. Once set, this flag should remain even if you later remove all `sourced_from` fields, as the " \ + "index may still contain historical incomplete documents."] + end + + # Adapter for the `routing_value_source` case for use by `resolve_field_source`. + # + # @private + module RoutingSourceAdapter + def self.get_field_source(relationship, index, &block) + relationship.routing_value_source_for_index(index, &block) + end + + def self.cannot_update_reason(root_type, relationship_name) + "`#{root_type.name}` uses custom shard routing but we don't know what `#{relationship_name}` field to use " \ + "to route the `#{root_type.name}` update requests" + end + end + + # Adapter for the `rollover_timestamp_value_source` case for use by `resolve_field_source`. + # + # @private + module RolloverTimestampSourceAdapter + def self.get_field_source(relationship, index, &block) + relationship.rollover_timestamp_value_source_for_index(index, &block) + end + + def self.cannot_update_reason(root_type, relationship_name) + "`#{root_type.name}` uses a rollover index but we don't know what `#{relationship_name}` timestamp field to use " \ + "to select an index for the `#{root_type.name}` update requests" + end + end + end + end + end +end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rb index 1c7c6f772..623adba22 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rb @@ -20,6 +20,19 @@ class ResolvedRelationshipChain < ::Data.define( :leaf_relationship, # Relationship the chain was resolved from — backs `sourced_from` field(s) :path_segments # Array - the embedding fields to descend, ordered root-to-leaf ) + # The index the chain terminates at — where the root indexed type's documents (and their nested + # elements) live, and where the chain's navigation path is registered. The chain always terminates at + # an indexed type (enforced when it is resolved), so this is never `nil`. + def root_index + root_relationship.parent_type.index_def # : Index + end + + # Records this chain's navigation path on its root index, so the painless script can locate the + # nested element to update at index time. + def register_on_root_index + root_index.register_sourced_from_nested_paths(qualified_relationship, sourced_from_nested_paths) + end + # The leaf relationship name qualified by its embedding-field path (hence unique per resolved chain) def qualified_relationship (path_segments.map { |segment| segment.field.name_in_index } + [leaf_relationship.name_in_index]).join(".") diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_field_params_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_field_params_resolver.rb new file mode 100644 index 000000000..2f806f0e1 --- /dev/null +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_field_params_resolver.rb @@ -0,0 +1,66 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/schema_artifacts/runtime_metadata/params" + +module ElasticGraph + module SchemaDefinition + module Indexing + # Shared logic for resolving a set of `sourced_from` fields into the params map that pulls each field's + # value from its source path on the related type. Used by both `UpdateTargetResolver` (top-level + # `sourced_from`) and `NestedUpdateTargetResolver` (nested `sourced_from`), which resolve the same way + # but build different update targets around the result. + # + # Hosts must provide `object_type`, `related_type`, `sourced_fields`, and `field_path_resolver`. + # + # @private + module SourcedFieldParamsResolver + # Resolves `sourced_fields` into a `[field_name_in_index => DynamicParam]` map, validating each field + # against its source. Returns a tuple of the params map and a list of any errors. + def resolve_sourced_field_params + errors = [] # : ::Array[::String] + + field_params = sourced_fields.filter_map do |field| + field_source = field.source # : SchemaElements::FieldSource + + referenced_field_path = field_path_resolver.resolve_public_path(related_type, field_source.field_path) do |parent_field| + !parent_field.type.list? + end + + if referenced_field_path.nil? + explanation = + if field_source.field_path.include?(".") + "could not be resolved: some parts do not exist on their respective types as non-list fields" + else + "does not exist as an indexing field" + end + + errors << "`#{object_type.name}.#{field.name}` has an invalid `sourced_from` argument: `#{related_type.name}.#{field_source.field_path}` #{explanation}." + nil + elsif referenced_field_path.type.unwrap_non_null != field.type.unwrap_non_null + errors << "The type of `#{object_type.name}.#{field.name}` is `#{field.type}`, but the type of its source (`#{related_type.name}.#{field_source.field_path}`) is `#{referenced_field_path.type}`. These must agree to use `sourced_from`." + nil + elsif field.type.non_null? + errors << "The type of `#{object_type.name}.#{field.name}` (`#{field.type}`) is not nullable, but this is not allowed for `sourced_from` fields since the value will be `null` before the related type's event is ingested." + nil + else + param = SchemaArtifacts::RuntimeMetadata::DynamicParam.new( + source_path: referenced_field_path.path_in_index, + cardinality: :one + ) + + [field.name_in_index, param] + end + end.to_h + + [field_params, errors] + end + end + end + end +end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rb index 3e1769e46..c1a0e1110 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rb @@ -7,6 +7,7 @@ # frozen_string_literal: true require "elastic_graph/errors" +require "elastic_graph/schema_definition/indexing/nested_update_target_resolver" require "elastic_graph/schema_definition/indexing/relationship_chain_resolver" require "elastic_graph/schema_definition/indexing/relationship_resolver" require "elastic_graph/schema_definition/indexing/update_target_resolver" @@ -46,12 +47,23 @@ def resolve private def resolve_for_type(object_type, &error_reporter) - fields_with_sources_by_relationship_name = sourced_fields_by_relationship_name(object_type) + resolve_top_level_update_targets(object_type, &error_reporter) + + resolve_nested_update_targets(object_type, &error_reporter) + end + + # Resolves the update targets for this type's own `sourced_from` fields (the non-nested case): one per + # relationship that backs `sourced_from` fields, each keyed by the related type that publishes the events. + def resolve_top_level_update_targets(object_type, &error_reporter) + # Skip unindexed types: they produce no top-level targets, and resolving `fields_with_sources` on an + # unindexed apollo `_Entity` union can raise when entity types share a field name across mapping types. + empty_fields_by_relationship = {} # : ::Hash[::String, ::Array[SchemaElements::Field]] + sourced_fields_by_relationship_name = + object_type.own_index_def ? group_sourced_fields_by_relationship_name(object_type) : empty_fields_by_relationship defined_relationships = object_type.relationships_by_name.keys - results = (defined_relationships | fields_with_sources_by_relationship_name.keys).filter_map do |relationship_name| + (defined_relationships | sourced_fields_by_relationship_name.keys).filter_map do |relationship_name| empty_fields = [] # : ::Array[SchemaElements::Field] - sourced_fields = fields_with_sources_by_relationship_name.fetch(relationship_name) { empty_fields } + sourced_fields = sourced_fields_by_relationship_name.fetch(relationship_name) { empty_fields } relationship_resolver = RelationshipResolver.new( schema_def_state: @schema_def_state, object_type: object_type, @@ -63,18 +75,22 @@ def resolve_for_type(object_type, &error_reporter) yield :relationship, relationship_error if relationship_error if object_type.own_index_def && resolved_relationship && sourced_fields.any? - resolve_update_target(object_type, resolved_relationship, sourced_fields, &error_reporter) + resolve_top_level_update_target(object_type, resolved_relationship, sourced_fields, &error_reporter) end end + end - # Resolve any `parent_relationship` chains on this type, surfacing configuration errors and - # registering the resolved path segments on the root index. - resolve_relationship_chains(object_type, &error_reporter) - - results + # Resolves the update targets for this type's nested `sourced_from` fields: each `parent_relationship` + # chain that backs `sourced_from` fields registers its navigation path on the root index and produces a + # nested update target on the root indexed type. + def resolve_nested_update_targets(object_type, &error_reporter) + resolve_sourced_fields_and_chains(object_type, &error_reporter).filter_map do |sourced_fields, resolved_chain| + resolved_chain.register_on_root_index + resolve_nested_update_target(object_type, sourced_fields, resolved_chain, &error_reporter) + end end - def resolve_update_target(object_type, resolved_relationship, sourced_fields) + def resolve_top_level_update_target(object_type, resolved_relationship, sourced_fields) update_target_resolver = UpdateTargetResolver.new( object_type: object_type, resolved_relationship: resolved_relationship, @@ -98,46 +114,52 @@ def resolve_update_target(object_type, resolved_relationship, sourced_fields) [resolved_relationship.related_type.name, update_target] if update_target end - def resolve_relationship_chains(object_type) - relationships_with_parent_ref = object_type.relationships_by_name.each_value.select(&:parent_ref) - return if relationships_with_parent_ref.empty? + def group_sourced_fields_by_relationship_name(object_type) + object_type.fields_with_sources.group_by { |f| (_ = f.source).relationship_name } + end - # The set of relationship names this type's `sourced_from` fields draw their data through. We use - # `fields_with_sources` (rather than `sourced_fields_by_relationship_name`) because it works for - # non-indexed (embedded) types — which is exactly where nested `sourced_from` fields live. - relationship_names_with_sourced_fields = object_type.fields_with_sources.map { |f| (_ = f.source).relationship_name }.to_set + # Resolves every `parent_relationship` chain on this type, surfacing configuration errors. Returns + # `[sourced_fields, resolved_chain]` for each chain that resolved cleanly *and* backs `sourced_from` + # fields — the pairings that produce nested update targets. + def resolve_sourced_fields_and_chains(object_type) + relationships_with_parent_ref = object_type.relationships_by_name.each_value.select(&:parent_ref) + empty_results = [] # : ::Array[[::Array[SchemaElements::Field], ResolvedRelationshipChain]] + return empty_results if relationships_with_parent_ref.empty? - chain_resolver = RelationshipChainResolver.new(schema_def_state: @schema_def_state) + sourced_fields_by_relationship_name = group_sourced_fields_by_relationship_name(object_type) - relationships_with_parent_ref.each do |relationship| - # Every `parent_relationship` chain is resolved so its configuration errors are surfaced, even for - # relationships that don't themselves back any `sourced_from` field. - resolved_chain, chain_errors = chain_resolver.resolve(relationship) + relationships_with_parent_ref.filter_map do |relationship| + # Resolve every chain (even those backing no `sourced_from` field) so configuration errors surface. + resolved_chain, chain_errors = relationship_chain_resolver.resolve(relationship) chain_errors.each { |error| yield :sourced_field, error } next unless resolved_chain - # Only register paths for relationships that back `sourced_from` fields. A pure link in a longer - # chain has no nested data of its own; its navigation lives in the leaf relationship's chain. - next unless relationship_names_with_sourced_fields.include?(relationship.name) - - # Register the resolved chain on its root index, which records the path segments keyed by the - # chain's qualified relationship. - root_index = resolved_chain.root_relationship.parent_type.index_def # : Index - root_index.register_resolved_relationship_chain(resolved_chain) + # Only chains backing `sourced_from` fields produce a target; pure links carry no nested data. + sourced_fields = sourced_fields_by_relationship_name[relationship.name] + [sourced_fields, resolved_chain] if sourced_fields end end - def sourced_fields_by_relationship_name(object_type) - if object_type.own_index_def.nil? - # For now, only indexed types can have `sourced_from` fields, and resolving `fields_with_sources` on an unindexed union type - # such as `_Entity` when we are using apollo can lead to exceptions when multiple entity types have the same field name - # that use different mapping types. - {} # : ::Hash[::String, ::Array[SchemaElements::Field]] - else - object_type - .fields_with_sources - .group_by { |f| (_ = f.source).relationship_name } - end + # A single resolver shared across all object types so its per-parent-type field cache survives the + # whole resolve pass — parent types recur across chains from different leaf types. + def relationship_chain_resolver + @relationship_chain_resolver ||= RelationshipChainResolver.new(schema_def_state: @schema_def_state) + end + + def resolve_nested_update_target(object_type, sourced_fields, resolved_chain) + nested_update_target_resolver = NestedUpdateTargetResolver.new( + object_type: object_type, + sourced_fields: sourced_fields, + resolved_chain: resolved_chain, + field_path_resolver: @schema_def_state.field_path_resolver, + schema_def_state: @schema_def_state + ) + + update_target, errors = nested_update_target_resolver.resolve + errors.each { |error| yield :sourced_field, error } + + # The update target lives on the source type — its events drive updates to the nested element. + [resolved_chain.leaf_relationship.related_type.name, update_target] if update_target end def raise_if_errors(sourced_field_errors, relationship_errors) diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver.rb index 3f456d330..56c36654d 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver.rb @@ -7,6 +7,7 @@ # frozen_string_literal: true require "elastic_graph/schema_artifacts/runtime_metadata/params" +require "elastic_graph/schema_definition/indexing/sourced_field_params_resolver" require "elastic_graph/schema_definition/indexing/update_target_factory" module ElasticGraph @@ -18,6 +19,8 @@ module Indexing # # @private class UpdateTargetResolver + include SourcedFieldParamsResolver + def initialize( object_type:, resolved_relationship:, @@ -36,7 +39,7 @@ def initialize( # Returns a tuple of the `update_target` (if valid), and a list of errors. def resolve relationship_errors = validate_relationship - top_level_fields_params, top_level_fields_params_errors = resolve_top_level_fields_params + top_level_fields_params, top_level_fields_params_errors = resolve_sourced_field_params routing_value_source, routing_error = resolve_field_source(RoutingSourceAdapter) rollover_timestamp_value_source, rollover_timestamp_error = resolve_field_source(RolloverTimestampSourceAdapter) equivalent_field_errors = resolved_relationship.relationship.validate_equivalent_fields(field_path_resolver) @@ -90,47 +93,9 @@ def relationship_error_prefix "`#{object_type.name}.#{resolved_relationship.relationship_name}` #{sourced_fields_description}" end - # Resolves the `sourced_fields` into a top-level fields params map, validating them along the way. - # - # Returns a tuple of the top-level fields params and a list of any errors that occurred during resolution. - def resolve_top_level_fields_params - related_type = resolved_relationship.related_type - errors = [] # : ::Array[::String] - - top_level_fields_params = sourced_fields.filter_map do |field| - field_source = field.source # : SchemaElements::FieldSource - - referenced_field_path = field_path_resolver.resolve_public_path(related_type, field_source.field_path) do |parent_field| - !parent_field.type.list? - end - - if referenced_field_path.nil? - explanation = - if field_source.field_path.include?(".") - "could not be resolved: some parts do not exist on their respective types as non-list fields" - else - "does not exist as an indexing field" - end - - errors << "`#{object_type.name}.#{field.name}` has an invalid `sourced_from` argument: `#{related_type.name}.#{field_source.field_path}` #{explanation}." - nil - elsif referenced_field_path.type.unwrap_non_null != field.type.unwrap_non_null - errors << "The type of `#{object_type.name}.#{field.name}` is `#{field.type}`, but the type of it's source (`#{related_type.name}.#{field_source.field_path}`) is `#{referenced_field_path.type}`. These must agree to use `sourced_from`." - nil - elsif field.type.non_null? - errors << "The type of `#{object_type.name}.#{field.name}` (`#{field.type}`) is not nullable, but this is not allowed for `sourced_from` fields since the value will be `null` before the related type's event is ingested." - nil - else - param = SchemaArtifacts::RuntimeMetadata::DynamicParam.new( - source_path: referenced_field_path.path_in_index, - cardinality: :one - ) - - [field.name_in_index, param] - end - end.to_h - - [top_level_fields_params, errors] + # The related type whose source events feed this update target — where `sourced_from` fields are resolved. + def related_type + resolved_relationship.related_type end # Helper method that assists with resolving `routing_value_source` and `rollover_timestamp_value_source`. diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/index.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/index.rbs index 39ae5957c..1f3eef1da 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/index.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/index.rbs @@ -14,7 +14,7 @@ module ElasticGraph def to_index_config: () -> ::Hash[::String, untyped] def to_index_template_config: () -> ::Hash[::String, untyped] def runtime_metadata: () -> SchemaArtifacts::RuntimeMetadata::IndexDefinition - def register_resolved_relationship_chain: (ResolvedRelationshipChain) -> void + def register_sourced_from_nested_paths: (::String, ::Array[SchemaArtifacts::RuntimeMetadata::sourcedFromNestedPathSegment]) -> void private diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rbs new file mode 100644 index 000000000..a3fe78135 --- /dev/null +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rbs @@ -0,0 +1,58 @@ +module ElasticGraph + module SchemaDefinition + module Indexing + class NestedUpdateTargetResolver + include SourcedFieldParamsResolver + + def initialize: ( + object_type: indexableType, + sourced_fields: ::Array[SchemaElements::Field], + resolved_chain: ResolvedRelationshipChain, + field_path_resolver: SchemaElements::FieldPath::Resolver, + schema_def_state: State, + ) -> void + + def resolve: () -> [SchemaArtifacts::RuntimeMetadata::UpdateTarget?, ::Array[::String]] + + # Public to satisfy the `_SourcedFieldParamsHost` interface required by `SourcedFieldParamsResolver`. + attr_reader object_type: indexableType + attr_reader sourced_fields: ::Array[SchemaElements::Field] + attr_reader field_path_resolver: SchemaElements::FieldPath::Resolver + + @related_type: indexableType + def related_type: () -> indexableType + + private + + attr_reader resolved_chain: ResolvedRelationshipChain + attr_reader schema_def_state: State + + def relationship: () -> SchemaElements::Relationship + def root_relationship: () -> SchemaElements::Relationship + def root_type: () -> indexableType + def root_index: () -> Index + + def validate_relationship: () -> ::Array[::String] + def build_path_identifier_params: () -> ::Hash[::String, SchemaArtifacts::RuntimeMetadata::DynamicParam] + def resolve_field_source: (_FieldSourceAdapter) -> [::String?, ::String?] + def validate_has_had_multiple_sources: () -> ::Array[::String] + + interface _FieldSourceAdapter + def get_field_source: (SchemaElements::Relationship, Index) { + (::String) -> bot + } -> ::String? + + def cannot_update_reason: (indexableType, ::String) -> ::String + end + + module RoutingSourceAdapter + extend _FieldSourceAdapter + end + + module RolloverTimestampSourceAdapter + extend _FieldSourceAdapter + end + end + end + end +end diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rbs index 75b80ece0..67a4e3371 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rbs @@ -14,6 +14,8 @@ module ElasticGraph end class ResolvedRelationshipChain < ResolvedRelationshipChainSuperType + def root_index: () -> Index + def register_on_root_index: () -> void def qualified_relationship: () -> ::String def sourced_from_nested_paths: () -> ::Array[SchemaArtifacts::RuntimeMetadata::sourcedFromNestedPathSegment] end diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_field_params_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_field_params_resolver.rbs new file mode 100644 index 000000000..7e826896e --- /dev/null +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_field_params_resolver.rbs @@ -0,0 +1,19 @@ +module ElasticGraph + module SchemaDefinition + module Indexing + interface _SourcedFieldParamsHost + def object_type: () -> indexableType + def related_type: () -> indexableType + def sourced_fields: () -> ::Array[SchemaElements::Field] + def field_path_resolver: () -> SchemaElements::FieldPath::Resolver + end + + module SourcedFieldParamsResolver : _SourcedFieldParamsHost + def resolve_sourced_field_params: () -> [ + ::Hash[::String, SchemaArtifacts::RuntimeMetadata::DynamicParam], + ::Array[::String] + ] + end + end + end +end diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rbs index c9c2ca72b..4b5bdca9e 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rbs @@ -9,10 +9,16 @@ module ElasticGraph private + @relationship_chain_resolver: RelationshipChainResolver + def resolve_for_type: (indexableType) { (::Symbol, ::String) -> void } -> ::Array[[::String, SchemaArtifacts::RuntimeMetadata::UpdateTarget]] - def resolve_update_target: (indexableType, ResolvedRelationship, ::Array[SchemaElements::Field]) { (::Symbol, ::String) -> void } -> [::String, SchemaArtifacts::RuntimeMetadata::UpdateTarget]? - def resolve_relationship_chains: (indexableType) { (::Symbol, ::String) -> void } -> void - def sourced_fields_by_relationship_name: (indexableType) -> ::Hash[::String, ::Array[SchemaElements::Field]] + def resolve_top_level_update_targets: (indexableType) { (::Symbol, ::String) -> void } -> ::Array[[::String, SchemaArtifacts::RuntimeMetadata::UpdateTarget]] + def resolve_nested_update_targets: (indexableType) { (::Symbol, ::String) -> void } -> ::Array[[::String, SchemaArtifacts::RuntimeMetadata::UpdateTarget]] + def resolve_top_level_update_target: (indexableType, ResolvedRelationship, ::Array[SchemaElements::Field]) { (::Symbol, ::String) -> void } -> [::String, SchemaArtifacts::RuntimeMetadata::UpdateTarget]? + def group_sourced_fields_by_relationship_name: (indexableType) -> ::Hash[::String, ::Array[SchemaElements::Field]] + def resolve_sourced_fields_and_chains: (indexableType) { (::Symbol, ::String) -> void } -> ::Array[[::Array[SchemaElements::Field], ResolvedRelationshipChain]] + def relationship_chain_resolver: () -> RelationshipChainResolver + def resolve_nested_update_target: (indexableType, ::Array[SchemaElements::Field], ResolvedRelationshipChain) { (::Symbol, ::String) -> void } -> [::String, SchemaArtifacts::RuntimeMetadata::UpdateTarget]? def raise_if_errors: (::Array[::String], ::Array[::String]) -> void end end diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver.rbs index 668331f7c..2b456ac3c 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver.rbs @@ -2,6 +2,8 @@ module ElasticGraph module SchemaDefinition module Indexing class UpdateTargetResolver + include SourcedFieldParamsResolver + def initialize: ( object_type: indexableType, resolved_relationship: ResolvedRelationship, @@ -11,21 +13,19 @@ module ElasticGraph def resolve: () -> [SchemaArtifacts::RuntimeMetadata::UpdateTarget?, ::Array[::String]] - private - + # Public to satisfy the `_SourcedFieldParamsHost` interface required by `SourcedFieldParamsResolver`. attr_reader object_type: indexableType - attr_reader resolved_relationship: ResolvedRelationship attr_reader sourced_fields: ::Array[SchemaElements::Field] attr_reader field_path_resolver: SchemaElements::FieldPath::Resolver + def related_type: () -> indexableType + + private + + attr_reader resolved_relationship: ResolvedRelationship def validate_relationship: () -> ::Array[::String] def relationship_error_prefix: () -> ::String - def resolve_top_level_fields_params: () -> [ - ::Hash[::String, SchemaArtifacts::RuntimeMetadata::DynamicParam], - ::Array[::String] - ] - def resolve_field_source: (_FieldSourceAdapter) -> [::String?, ::String?] interface _FieldSourceAdapter diff --git a/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/index_definitions_by_name_spec.rb b/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/index_definitions_by_name_spec.rb index fa39065f9..ee9c300c5 100644 --- a/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/index_definitions_by_name_spec.rb +++ b/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/index_definitions_by_name_spec.rb @@ -982,7 +982,7 @@ def build_count_paths_from_mapping(mapping) f.mapping type: "object" end t.relates_to_many "statLines", "StatLine", via: "leagueId", dir: :in, indexing_only: true - t.index "leagues" + t.index("leagues") { |i| i.has_had_multiple_sources! } end s.object_type "Team" do |t| diff --git a/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb b/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb index 7cf9c3a43..eb5a04ead 100644 --- a/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb +++ b/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb @@ -1196,8 +1196,8 @@ def expect_widget_update_target_with( end end }.to raise_error a_string_including( - "1. The type of `Widget.workspace_name` is `DateTime`, but the type of it's source (`WidgetWorkspace.name`) is `String`. These must agree to use `sourced_from`.", - "2. The type of `Widget.workspace_created_at` is `String`, but the type of it's source (`WidgetWorkspace.created_at`) is `DateTime`. These must agree to use `sourced_from`." + "1. The type of `Widget.workspace_name` is `DateTime`, but the type of its source (`WidgetWorkspace.name`) is `String`. These must agree to use `sourced_from`.", + "2. The type of `Widget.workspace_created_at` is `String`, but the type of its source (`WidgetWorkspace.created_at`) is `DateTime`. These must agree to use `sourced_from`." ) end @@ -1255,8 +1255,8 @@ def expect_widget_update_target_with( end end }.to raise_error a_string_including( - "1. The type of `Widget.workspace_name` is `String`, but the type of it's source (`WidgetWorkspace.name`) is `[String]`. These must agree to use `sourced_from`.", - "2. The type of `Widget.workspace_created_at` is `[DateTime]`, but the type of it's source (`WidgetWorkspace.created_at`) is `DateTime`. These must agree to use `sourced_from`." + "1. The type of `Widget.workspace_name` is `String`, but the type of its source (`WidgetWorkspace.name`) is `[String]`. These must agree to use `sourced_from`.", + "2. The type of `Widget.workspace_created_at` is `[DateTime]`, but the type of its source (`WidgetWorkspace.created_at`) is `DateTime`. These must agree to use `sourced_from`." ) end @@ -1298,35 +1298,178 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) end end - describe "`parent_relationship` validations" do - it "does not raise an error for a valid `parent_relationship` configuration" do - expect { nested_sourced_from_schema }.not_to raise_error + describe "nested `sourced_from` update targets" do + it "dumps a nested update target on the source type, keyed by the qualified relationship" do + targets = nested_update_targets_by_relationship(nested_sourced_from_schema(fetch: "StatLine")) + + expect(targets.keys).to contain_exactly("players.statLine") + target = targets.fetch("players.statLine") + expect(target.type).to eq "Team" + expect(target.relationship).to eq "players.statLine" + expect(target.script_id).to eq(INDEX_DATA_UPDATE_SCRIPT_ID) + expect(target.id_source).to eq "teamId" + expect(target.routing_value_source).to eq(nil) + expect(target.rollover_timestamp_value_source).to eq(nil) + expect(target.top_level_fields_params).to eq({}) + expect(target.sourced_from_nested_params.field_params).to eq( + "goals" => dynamic_param_with(source_path: "goals", cardinality: :one) + ) + expect(target.sourced_from_nested_params.path_identifier_params).to eq( + "playerId" => dynamic_param_with(source_path: "playerId", cardinality: :one) + ) end - it "works with a non-list (object) embedding field" do - expect { - nested_sourced_from_schema(players_field: "Player!") - }.not_to raise_error + it "omits `path_identifier_params` for a non-list (object) embedding field, since there is no element to match" do + metadata = nested_sourced_from_schema(fetch: "StatLine", players_field: "Player!") + target = nested_update_targets_by_relationship(metadata).fetch("players.statLine") + + expect(target.sourced_from_nested_params.path_identifier_params).to eq({}) end - it "accepts an explicit `parent_field_name:` to identify the embedding field" do - expect { - nested_sourced_from_schema( - on_player_relationship: ->(r) { r.parent_relationship "Team", "statLines", parent_field_name: "players" } + context "on a root type that uses custom routing" do + it "determines the `routing_value_source` from an `equivalent_field` configured on the root relationship" do + metadata = nested_sourced_from_schema( + fetch: "StatLine", + on_team: ->(t) { t.field "team_owner_id", "ID!" }, + on_teams_index: ->(i) { i.route_with "team_owner_id" }, + on_statlines_relationship: ->(r) { + # A nested routing source forces the field-path resolver to descend through the parent field. + r.equivalent_field "stats.owner_id", locally_named: "team_owner_id" + }, + on_statline: ->(t) { t.field "stats", "StatLineStats" } + ) do |s| + s.object_type "StatLineStats" do |t| + t.field "owner_id", "ID" + end + end + + target = nested_update_targets_by_relationship(metadata).fetch("players.statLine") + expect(target.routing_value_source).to eq "stats.owner_id" + end + + it "leaves `routing_value_source` nil when the equivalent field is a `graphql_only` field with no indexing path" do + metadata = nested_sourced_from_schema( + fetch: "StatLine", + on_team: ->(t) { t.field "team_owner_id", "ID!" }, + on_teams_index: ->(i) { i.route_with "team_owner_id" }, + on_statlines_relationship: ->(r) { r.equivalent_field "owner_id", locally_named: "team_owner_id" }, + on_statline: ->(t) { t.field "owner_id", "ID", graphql_only: true } + ) + + target = nested_update_targets_by_relationship(metadata).fetch("players.statLine") + expect(target.routing_value_source).to eq(nil) + end + + it "raises a clear error when no `equivalent_field` is configured for the custom routing field" do + expect { + nested_sourced_from_schema( + on_team: ->(t) { t.field "team_owner_id", "ID!" }, + on_teams_index: ->(i) { i.route_with "team_owner_id" } + ) + }.to raise_error Errors::SchemaError, a_string_including( + "Cannot update `Team` documents with nested data from related `statLine` events", + "`Team` uses custom shard routing but we don't know what `statLines` field to use to route the `Team` update requests", + "add a call like this to the `Team.statLines` relationship definition", + '`rel.equivalent_field "[StatLine field]", locally_named: "team_owner_id"`' + ) + end + end + + context "on a root type that uses a rollover index" do + it "determines the `rollover_timestamp_value_source` from an `equivalent_field` configured on the root relationship" do + metadata = nested_sourced_from_schema( + fetch: "StatLine", + on_team: ->(t) { t.field "team_created_at", "DateTime" }, + on_teams_index: ->(i) { i.rollover :yearly, "team_created_at" }, + on_statlines_relationship: ->(r) { r.equivalent_field "created_at", locally_named: "team_created_at" }, + on_statline: ->(t) { t.field "created_at", "DateTime" } + ) + + target = nested_update_targets_by_relationship(metadata).fetch("players.statLine") + expect(target.rollover_timestamp_value_source).to eq "created_at" + end + + it "raises a clear error when no `equivalent_field` is configured for the rollover timestamp field" do + expect { + nested_sourced_from_schema( + on_team: ->(t) { t.field "team_created_at", "DateTime" }, + on_teams_index: ->(i) { i.rollover :yearly, "team_created_at" } + ) + }.to raise_error Errors::SchemaError, a_string_including( + "Cannot update `Team` documents with nested data from related `statLine` events", + "`Team` uses a rollover index but we don't know what `statLines` timestamp field to use to select an index for the `Team` update requests", + "add a call like this to the `Team.statLines` relationship definition", + '`rel.equivalent_field "[StatLine field]", locally_named: "team_created_at"`' + ) + end + end + + describe "validations" do + it "raises an error when the root index has not been configured with `has_had_multiple_sources!`" do + expect { + nested_sourced_from_schema(multiple_sources: false) + }.to raise_error Errors::SchemaError, a_string_including( + "Type `Team` has nested `sourced_from` fields (via `Player.statLine`) but its index `teams` has not been configured with `has_had_multiple_sources!`" + ) + end + + it "raises an error when the leaf relationship is `relates_to_many`", :dont_validate_graphql_schema do + expect { + object_type_metadata_for "Team" do |s| + s.object_type "Team" do |t| + t.field "id", "ID!" + t.field "players", "[Player!]!" do |f| + f.mapping type: "object" + end + t.relates_to_many "statLines", "StatLine", via: "teamId", dir: :in, indexing_only: true + t.index("teams") { |i| i.has_had_multiple_sources! } + end + + s.object_type "Player" do |t| + t.field "id", "ID!" + t.field "goals", "Int" do |f| + f.sourced_from "statLines", "goals" + end + t.relates_to_many "statLines", "StatLine", via: "playerId", dir: :in, indexing_only: true do |r| + r.parent_relationship "Team", "statLines" + end + end + + s.object_type "StatLine" do |t| + t.field "id", "ID!" + t.field "teamId", "ID" + t.field "playerId", "ID" + t.field "goals", "Int" + t.index "stat_lines" + end + end + }.to raise_error Errors::SchemaError, a_string_including( + "`Player.statLines` is a `relates_to_many` relationship, but nested `sourced_from` is only supported on a `relates_to_one` relationship." ) - }.not_to raise_error + end + end + end + + describe "`parent_relationship` validations" do + it "accepts an explicit `parent_field_name:` to identify the embedding field" do + metadata = nested_sourced_from_schema( + fetch: "StatLine", + on_player_relationship: ->(r) { r.parent_relationship "Team", "statLines", parent_field_name: "players" } + ) + + expect(nested_update_targets_by_relationship(metadata).keys).to contain_exactly("players.statLine") end it "discovers an embedding field declared with `indexing_only: true`" do # An `indexing_only: true` field is absent from `graphql_fields_by_name` but present in # `indexing_fields_by_name_in_index`, so this only resolves when the latter is used. - expect { - nested_sourced_from_schema(players_field: nil, on_team: ->(t) { - t.field "players", "[Player!]!", indexing_only: true do |f| - f.mapping type: "object" - end - }) - }.not_to raise_error + metadata = nested_sourced_from_schema(fetch: "StatLine", players_field: nil, on_team: ->(t) { + t.field "players", "[Player!]!", indexing_only: true do |f| + f.mapping type: "object" + end + }) + + expect(nested_update_targets_by_relationship(metadata).keys).to contain_exactly("players.statLine") end it "raises an error when `parent_relationship` is called twice on the same relationship" do @@ -1393,16 +1536,16 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) # League -> Team -> Player -> GameAppearance is a 4-level `parent_relationship` chain (leaf # to root: GameAppearance's relationship walks up through Player and Team to the indexed # League). All four relationships relate to the same `StatLine` source type, so the chain is - # valid. This can only pass if `resolve_chain` actually recurses through every level. - expect { - object_type_metadata_for "League" do |s| + # valid. This can only resolve correctly if `resolve_chain` recurses through every level. + metadata = + object_type_metadata_for "StatLine" do |s| s.object_type "League" do |t| t.field "id", "ID!" t.field "teams", "[Team!]!" do |f| f.mapping type: "object" end t.relates_to_many "statLines", "StatLine", via: "leagueId", dir: :in, indexing_only: true - t.index "leagues" + t.index("leagues") { |i| i.has_had_multiple_sources! } end s.object_type "Team" do |t| @@ -1445,7 +1588,10 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) t.index "stat_lines" end end - }.not_to raise_error + + # The qualified relationship spans every embedding field from the root index down to the leaf, + # proving the chain recursed through all four levels. + expect(nested_update_targets_by_relationship(metadata).keys).to contain_exactly("teams.players.gameAppearances.statLine") end it "raises an error when the parent type does not exist" do @@ -1526,16 +1672,19 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) end it "uses `parent_field_name:` to disambiguate when multiple embedding fields exist" do - expect { - nested_sourced_from_schema( - on_team: ->(t) { - t.field "bench_players", "[Player!]!" do |f| - f.mapping type: "object" - end - }, - on_player_relationship: ->(r) { r.parent_relationship "Team", "statLines", parent_field_name: "bench_players" } - ) - }.not_to raise_error + metadata = nested_sourced_from_schema( + fetch: "StatLine", + on_team: ->(t) { + t.field "bench_players", "[Player!]!" do |f| + f.mapping type: "object" + end + }, + on_player_relationship: ->(r) { r.parent_relationship "Team", "statLines", parent_field_name: "bench_players" } + ) + + # The qualified relationship reflects `bench_players`, confirming disambiguation chose that field + # rather than the also-eligible `players`. + expect(nested_update_targets_by_relationship(metadata).keys).to contain_exactly("bench_players.statLine") end it "raises an error when an explicit `parent_field_name:` references a non-existent field" do @@ -1547,59 +1696,79 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) "`Player.statLine` references field `Team.nonExistentField` via `parent_relationship`, but that field does not exist." ) end + end - def nested_sourced_from_schema( - on_team: nil, - on_player_relationship: ->(r) { r.parent_relationship "Team", "statLines" }, - player_indexing_only: true, - players_field: "[Player!]!", - index_teams: true, - index_players: false - ) - object_type_metadata_for "Team" do |s| - s.object_type "Team" do |t| - t.field "id", "ID!" - - if players_field - t.field "players", players_field do |f| - f.mapping type: "object" if players_field.start_with?("[") - end - end + # Projects a source type's update targets to a `{qualified_relationship => target}` map, excluding the + # type's own `__self` target — leaving one entry per `parent_relationship` chain backing `sourced_from`. + def nested_update_targets_by_relationship(metadata) + metadata + .update_targets + .reject { |t| t.relationship == SELF_RELATIONSHIP_NAME } + .to_h { |t| [t.relationship, t] } + end - t.relates_to_many "statLines", "StatLine", via: "teamId", dir: :in, indexing_only: true - on_team&.call(t) + def nested_sourced_from_schema( + fetch: "Team", + on_team: nil, + on_statlines_relationship: nil, + on_player_relationship: ->(r) { r.parent_relationship "Team", "statLines" }, + player_indexing_only: true, + players_field: "[Player!]!", + index_teams: true, + index_players: false, + multiple_sources: true, + on_teams_index: nil, + on_statline: nil + ) + object_type_metadata_for fetch do |s| + s.object_type "Team" do |t| + t.field "id", "ID!" - if index_teams - t.index("teams") { |i| i.has_had_multiple_sources! } + if players_field + t.field "players", players_field do |f| + f.mapping type: "object" if players_field.start_with?("[") end end - s.object_type "Player" do |t| - t.field "id", "ID!" + t.relates_to_many "statLines", "StatLine", via: "teamId", dir: :in, indexing_only: true do |r| + on_statlines_relationship&.call(r) + end + on_team&.call(t) - t.field "goals", "Int" do |f| - f.sourced_from "statLine", "goals" + if index_teams + t.index("teams") do |i| + i.has_had_multiple_sources! if multiple_sources + on_teams_index&.call(i) end + end + end - t.relates_to_one "statLine", "StatLine", via: "playerId", dir: :in, indexing_only: player_indexing_only do |r| - on_player_relationship.call(r) - end + s.object_type "Player" do |t| + t.field "id", "ID!" - if index_players - t.index("players") { |i| i.has_had_multiple_sources! } - end + t.field "goals", "Int" do |f| + f.sourced_from "statLine", "goals" + end + + t.relates_to_one "statLine", "StatLine", via: "playerId", dir: :in, indexing_only: player_indexing_only do |r| + on_player_relationship.call(r) end - s.object_type "StatLine" do |t| - t.field "id", "ID!" - t.field "teamId", "ID" - t.field "playerId", "ID" - t.field "goals", "Int" - t.index "stat_lines" + if index_players + t.index("players") { |i| i.has_had_multiple_sources! } end + end - yield s if block_given? + s.object_type "StatLine" do |t| + t.field "id", "ID!" + t.field "teamId", "ID" + t.field "playerId", "ID" + t.field "goals", "Int" + on_statline&.call(t) + t.index "stat_lines" end + + yield s if block_given? end end From 781f5979fd32d2d8fef7ad11929c1020c8100343 Mon Sep 17 00:00:00 2001 From: ellisandrews-toast Date: Thu, 11 Jun 2026 14:52:05 -0400 Subject: [PATCH 2/4] Address PR review: share update-target resolution logic and standardize sourced_from validation --- AGENTS.md | 1 + .../indexing/nested_update_target_resolver.rb | 151 +++++---------- .../indexing/relationship_chain_resolver.rb | 62 ++++-- .../indexing/sourced_field_params_resolver.rb | 66 ------- .../sourced_from_update_targets_resolver.rb | 16 +- .../top_level_update_target_resolver.rb | 110 +++++++++++ .../indexing/update_target_factory.rb | 6 +- .../indexing/update_target_resolver.rb | 160 ---------------- .../update_target_resolver_support.rb | 178 +++++++++++++++++ .../nested_update_target_resolver.rbs | 34 +--- .../indexing/relationship_chain_resolver.rbs | 20 +- .../sourced_field_params_resolver.rbs | 19 -- ...s => top_level_update_target_resolver.rbs} | 34 +--- .../indexing/update_target_factory.rbs | 4 +- .../update_target_resolver_support.rbs | 47 +++++ .../update_targets_spec.rb | 181 ++++++++++++------ 16 files changed, 578 insertions(+), 511 deletions(-) delete mode 100644 elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_field_params_resolver.rb create mode 100644 elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/top_level_update_target_resolver.rb delete mode 100644 elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver.rb create mode 100644 elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver_support.rb delete mode 100644 elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_field_params_resolver.rbs rename elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/{update_target_resolver.rbs => top_level_update_target_resolver.rbs} (52%) create mode 100644 elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver_support.rbs diff --git a/AGENTS.md b/AGENTS.md index 141992018..ca449b19d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -209,6 +209,7 @@ Custom gems can be added via `Gemfile-custom` (see `Gemfile-custom.example`), th - Use `expect_to_return_non_nil_values_from_all_attributes` to test wrapper classes (like `WarehouseLambda`, `GraphQL`, `Indexer`, etc.). This automatically exercises every zero-argument method and verifies all dependencies are built successfully. - Use the `:capture_logs` RSpec tag instead of logger test doubles for verifying log output. Access logs with `logged_jsons_of_type(message_type)`. - Use `build_*` helper methods from `spec/support/builds_*.rb` to construct test objects. These helpers provide sensible defaults while allowing selective overrides for testing specific scenarios. +- Only tag a spec with `:dont_validate_graphql_schema` when it's actually required for the test to pass under `VALIDATE_GRAPHQL_SCHEMAS=1` (the tag skips that validation). First try to fix the test's schema so it produces a valid GraphQL schema while still exercising what the test is meant to exercise; reach for the tag only when that isn't possible. Verify by running the spec with `VALIDATE_GRAPHQL_SCHEMAS=1`. ## Important Patterns diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rb index 7dac9c380..47e20828b 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rb @@ -8,21 +8,21 @@ require "elastic_graph/schema_artifacts/runtime_metadata/params" require "elastic_graph/schema_artifacts/runtime_metadata/sourced_from_nested_params" -require "elastic_graph/schema_definition/indexing/sourced_field_params_resolver" require "elastic_graph/schema_definition/indexing/update_target_factory" +require "elastic_graph/schema_definition/indexing/update_target_resolver_support" module ElasticGraph module SchemaDefinition module Indexing - # Responsible for resolving a nested `parent_relationship` chain and a set of `sourced_from` - # fields into an `UpdateTarget` that instructs the indexer to update a nested element within - # the root indexed type when a source event arrives. This is the nested analog of - # `UpdateTargetResolver`, which handles the top-level (non-nested) `sourced_from` case. + # Resolves a relationship and a set of `sourced_from` fields into an `UpdateTarget` that instructs the + # indexer how to update a type from the related type's source events. This handles the *nested* case, + # where the `sourced_from` fields live on a type embedded within an indexed type (reached via a + # `parent_relationship` chain) and the target updates the root indexed type the embedded type nests + # within. (The *top-level* case—`sourced_from` fields directly on an indexed type—is handled by + # `TopLevelUpdateTargetResolver`.) # # @private class NestedUpdateTargetResolver - include SourcedFieldParamsResolver - def initialize( object_type:, sourced_fields:, @@ -42,13 +42,21 @@ def initialize( # # Returns a tuple of the `update_target` (if valid) and a list of errors. def resolve - relationship_errors = validate_relationship - field_params, field_params_errors = resolve_sourced_field_params - routing_value_source, routing_error = resolve_field_source(RoutingSourceAdapter) - rollover_timestamp_value_source, rollover_timestamp_error = resolve_field_source(RolloverTimestampSourceAdapter) - has_had_multiple_sources_errors = validate_has_had_multiple_sources - - all_errors = relationship_errors + field_params_errors + has_had_multiple_sources_errors + + relationship_errors = validate_relationships + field_params, field_params_errors = UpdateTargetResolverSupport.resolve_sourced_field_params( + object_type: object_type, + related_type: related_type, + sourced_fields: sourced_fields, + field_path_resolver: field_path_resolver + ) + routing_value_source, routing_error = resolve_field_source(UpdateTargetResolverSupport::RoutingSourceAdapter) + rollover_timestamp_value_source, rollover_timestamp_error = resolve_field_source(UpdateTargetResolverSupport::RolloverTimestampSourceAdapter) + # Routing/rollover values resolve from `equivalent_field`s on the root relationship, so they are + # validated there (matching how `TopLevelUpdateTargetResolver` validates its own relationship). + equivalent_field_errors = root_relationship.validate_equivalent_fields(field_path_resolver) + has_had_multiple_sources_errors = UpdateTargetResolverSupport.validate_has_had_multiple_sources(root_index, root_type, relationship) + + all_errors = relationship_errors + field_params_errors + equivalent_field_errors + has_had_multiple_sources_errors + [routing_error, rollover_timestamp_error].compact if all_errors.empty? @@ -56,10 +64,9 @@ def resolve type: root_type.name, relationship: resolved_chain.qualified_relationship, id_source: root_relationship.foreign_key, - top_level_fields_params: {}, sourced_from_nested_params: SchemaArtifacts::RuntimeMetadata::SourcedFromNestedParams.new( field_params: field_params, - path_identifier_params: build_path_identifier_params + path_identifier_params: resolved_chain.path_identifier_params ), routing_value_source: routing_value_source, rollover_timestamp_value_source: rollover_timestamp_value_source @@ -95,101 +102,31 @@ def related_type @related_type ||= schema_def_state.object_types_by_name.fetch(relationship.related_type.unwrap_non_null.name) end - # Applies validations specific to relationships backing nested `sourced_from` fields. - def validate_relationship - errors = [] # : ::Array[::String] - - if relationship.many? - errors << "`#{object_type.name}.#{relationship.name}` is a `relates_to_many` relationship, but nested " \ - "`sourced_from` is only supported on a `relates_to_one` relationship." - end - - errors - end - - # Builds the params identifying which nested element to update: one entry per list segment in the - # chain, pulling the matching value from the segment's foreign key on the source event. Object - # segments have no ambiguity, so they contribute no identifier. - def build_path_identifier_params - resolved_chain.path_segments.filter_map do |segment| - source_field = segment.source_field_name - next unless source_field - - param = SchemaArtifacts::RuntimeMetadata::DynamicParam.new( - source_path: source_field, - cardinality: :one - ) + # Applies validations on the relationships backing nested `sourced_from` fields. Only the leaf must be + # `relates_to_one` (it's where a value is sourced through), but every relationship in the chain joins on + # a foreign key that routes the source event, so each must be routable (inbound foreign key, no filter). + def validate_relationships + leaf_prefix = UpdateTargetResolverSupport.relationship_error_prefix(relationship, sourced_fields) - [source_field, param] - end.to_h - end - - # Resolves `routing_value_source` and `rollover_timestamp_value_source` against the root - # relationship and root index, using an `adapter` for the differences between the two cases. - # - # Returns a tuple of the resolved source (if successful) and an error (if invalid). - def resolve_field_source(adapter) - field_source_graphql_path_string = adapter.get_field_source(root_relationship, root_index) do |local_need| - # The update is triggered by the leaf relationship's source events (`relationship`), but routing and - # rollover are resolved through — and `equivalent_field` is configured on — the root relationship. - error = "Cannot update `#{root_type.name}` documents with nested data from related `#{relationship.name}` " \ - "events, because #{adapter.cannot_update_reason(root_type, root_relationship.name)}. To fix it, add a call " \ - "like this to the `#{root_type.name}.#{root_relationship.name}` relationship definition: `rel.equivalent_field " \ - "\"[#{related_type.name} field]\", locally_named: \"#{local_need}\"`." - - return [nil, error] - end - - if field_source_graphql_path_string - field_path = field_path_resolver.resolve_public_path(related_type, field_source_graphql_path_string) do |parent_field| - !parent_field.type.list? + UpdateTargetResolverSupport.validate_relationship_cardinality(relationship, error_prefix: leaf_prefix) + + resolved_chain.relationships.flat_map do |chain_relationship| + error_prefix = UpdateTargetResolverSupport.relationship_error_prefix(chain_relationship, sourced_fields) + UpdateTargetResolverSupport.validate_relationship_routability(chain_relationship, error_prefix: error_prefix) end - - [field_path&.path_in_index, nil] - else - [nil, nil] - end - end - - # Validates that `has_had_multiple_sources!` has been configured on the root index, since nested - # `sourced_from` makes the root index multi-sourced. - def validate_has_had_multiple_sources - return [] if root_index.has_had_multiple_sources_flag - - ["Type `#{root_type.name}` has nested `sourced_from` fields (via `#{object_type.name}.#{relationship.name}`) but " \ - "its index `#{root_index.name}` has not been configured with `has_had_multiple_sources!`. To resolve this, add " \ - "`i.has_had_multiple_sources!` within the `t.index \"#{root_index.name}\"` block. This flag is required because " \ - "indices with multiple sources can contain incomplete documents, and ElasticGraph needs to know this to apply " \ - "proper filtering. Once set, this flag should remain even if you later remove all `sourced_from` fields, as the " \ - "index may still contain historical incomplete documents."] - end - - # Adapter for the `routing_value_source` case for use by `resolve_field_source`. - # - # @private - module RoutingSourceAdapter - def self.get_field_source(relationship, index, &block) - relationship.routing_value_source_for_index(index, &block) - end - - def self.cannot_update_reason(root_type, relationship_name) - "`#{root_type.name}` uses custom shard routing but we don't know what `#{relationship_name}` field to use " \ - "to route the `#{root_type.name}` update requests" - end end - # Adapter for the `rollover_timestamp_value_source` case for use by `resolve_field_source`. - # - # @private - module RolloverTimestampSourceAdapter - def self.get_field_source(relationship, index, &block) - relationship.rollover_timestamp_value_source_for_index(index, &block) - end - - def self.cannot_update_reason(root_type, relationship_name) - "`#{root_type.name}` uses a rollover index but we don't know what `#{relationship_name}` timestamp field to use " \ - "to select an index for the `#{root_type.name}` update requests" - end + # Resolves a routing/rollover field source via the shared helper, supplying the root type, index, and + # relationship — the update target updates the root indexed type via the root relationship, so routing + # and rollover (and the `equivalent_field` config) are resolved there. + def resolve_field_source(adapter) + UpdateTargetResolverSupport.resolve_field_source( + adapter, + relationship: root_relationship, + index_def: root_index, + related_type: related_type, + field_path_resolver: field_path_resolver, + updated_type: root_type + ) end end end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rb index 623adba22..47c175c95 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rb @@ -7,7 +7,9 @@ # frozen_string_literal: true require "elastic_graph/errors" +require "elastic_graph/schema_artifacts/runtime_metadata/params" require "elastic_graph/schema_artifacts/runtime_metadata/sourced_from_nested_path_segment" +require "elastic_graph/support/memoizable_data" module ElasticGraph module SchemaDefinition @@ -15,11 +17,20 @@ module Indexing # The result of resolving a relationship chain. # # @private - class ResolvedRelationshipChain < ::Data.define( - :root_relationship, # Relationship the chain terminated at on the root indexed type - :leaf_relationship, # Relationship the chain was resolved from — backs `sourced_from` field(s) - :path_segments # Array - the embedding fields to descend, ordered root-to-leaf + class ResolvedRelationshipChain < Support::MemoizableData.define( + :relationships, # Array - every relationship in the chain, ordered root-to-leaf + :path_segments # Array - the embedding fields to descend, ordered root-to-leaf ) + # The relationship the chain terminated at on the root indexed type. + def root_relationship + relationships.first + end + + # The relationship the chain was resolved from — backs the `sourced_from` field(s). + def leaf_relationship + relationships.last + end + # The index the chain terminates at — where the root indexed type's documents (and their nested # elements) live, and where the chain's navigation path is registered. The chain always terminates at # an indexed type (enforced when it is resolved), so this is never `nil`. @@ -35,14 +46,15 @@ def register_on_root_index # The leaf relationship name qualified by its embedding-field path (hence unique per resolved chain) def qualified_relationship - (path_segments.map { |segment| segment.field.name_in_index } + [leaf_relationship.name_in_index]).join(".") + @qualified_relationship ||= + (path_segments.map { |segment| segment.field.name_in_index } + [leaf_relationship.name_in_index]).join(".") end # The runtime-metadata segments the painless script uses to navigate this chain: a `ListPathSegment` for # each list embedding field (carrying the source field that matches the element) and an `ObjectPathSegment` # for each object embedding field. def sourced_from_nested_paths - path_segments.map do |segment| + @sourced_from_nested_paths ||= path_segments.map do |segment| if (source_field = segment.source_field_name) SchemaArtifacts::RuntimeMetadata::ListPathSegment.new( field: segment.field.name_in_index, @@ -55,6 +67,19 @@ def sourced_from_nested_paths end end end + + # The params identifying which nested element to update at each level: one entry per list segment, + # pulling the matching value from the segment's foreign key on the source event. Object segments have no + # ambiguity, so they contribute no identifier. + def path_identifier_params + @path_identifier_params ||= path_segments.filter_map do |segment| + source_field = segment.source_field_name + next unless source_field + + param = SchemaArtifacts::RuntimeMetadata::DynamicParam.new(source_path: source_field, cardinality: :one) + [source_field, param] + end.to_h + end end # Describes how to navigate from a parent type into a nested child element. @@ -95,11 +120,11 @@ def initialize(schema_def_state:) def resolve(starting_relationship) errors = [] # : ::Array[::String] path_segments = [] # : ::Array[PathSegment] - visited_relationships = Set[starting_relationship] + relationships = [] # : ::Array[SchemaElements::Relationship] # resolve_chain returns the chain's root relationship (the one with no parent_ref), or nil # if it hit an error walking the chain (in which case the error is already recorded). - root_relationship = resolve_chain(starting_relationship, path_segments, errors, visited_relationships) + root_relationship = resolve_chain(starting_relationship, path_segments, relationships, errors) return [nil, errors] unless root_relationship # A valid chain must terminate at a relationship defined on an indexed type. @@ -112,8 +137,7 @@ def resolve(starting_relationship) end resolved_chain = ResolvedRelationshipChain.new( - root_relationship: root_relationship, - leaf_relationship: starting_relationship, + relationships: relationships.reverse, # reverse so root-to-leaf order path_segments: path_segments.reverse # reverse so root-to-leaf order ) @@ -122,25 +146,27 @@ def resolve(starting_relationship) private - # Recursively walks from leaf to root, building path segments in reverse. Returns the root - # relationship (the one with no parent_ref) on success, or nil if an error was encountered. - def resolve_chain(current_rel, path_segments, errors, visited_relationships) + # Recursively walks from leaf to root, collecting relationships and building path segments in reverse. + # Returns the root relationship (the one with no parent_ref) on success, or nil if an error was + # encountered. + def resolve_chain(current_rel, path_segments, relationships, errors) + relationships << current_rel + parent_ref = current_rel.parent_ref return current_rel unless parent_ref - parent_rel = resolve_parent_ref(current_rel, parent_ref, errors, visited_relationships) + parent_rel = resolve_parent_ref(current_rel, parent_ref, relationships, errors) return nil unless parent_rel build_path_segment(current_rel, parent_rel.parent_type, path_segments, errors) return nil if errors.any? - visited_relationships.add(parent_rel) - resolve_chain(parent_rel, path_segments, errors, visited_relationships) + resolve_chain(parent_rel, path_segments, relationships, errors) end # Resolves a parent_ref into the concrete parent relationship. # Returns the parent relationship on success, or appends to errors and returns nil. - def resolve_parent_ref(current_rel, ref, errors, visited_relationships) + def resolve_parent_ref(current_rel, ref, relationships, errors) unless current_rel.indexing_only errors << "#{rel_description(current_rel)} uses `parent_relationship` but is not declared with " \ "`indexing_only: true`. Relationships with `parent_relationship` must be indexing-only." @@ -162,7 +188,7 @@ def resolve_parent_ref(current_rel, ref, errors, visited_relationships) return nil end - if visited_relationships.include?(parent_rel) + if relationships.include?(parent_rel) errors << "#{rel_description(current_rel)} creates a circular `parent_relationship` chain " \ "— `#{parent_type.name}.#{ref.relationship_name}` was already visited. The chain must terminate at a root indexed type." return nil diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_field_params_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_field_params_resolver.rb deleted file mode 100644 index 2f806f0e1..000000000 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_field_params_resolver.rb +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright 2024 - 2026 Block, Inc. -# -# Use of this source code is governed by an MIT-style -# license that can be found in the LICENSE file or at -# https://opensource.org/licenses/MIT. -# -# frozen_string_literal: true - -require "elastic_graph/schema_artifacts/runtime_metadata/params" - -module ElasticGraph - module SchemaDefinition - module Indexing - # Shared logic for resolving a set of `sourced_from` fields into the params map that pulls each field's - # value from its source path on the related type. Used by both `UpdateTargetResolver` (top-level - # `sourced_from`) and `NestedUpdateTargetResolver` (nested `sourced_from`), which resolve the same way - # but build different update targets around the result. - # - # Hosts must provide `object_type`, `related_type`, `sourced_fields`, and `field_path_resolver`. - # - # @private - module SourcedFieldParamsResolver - # Resolves `sourced_fields` into a `[field_name_in_index => DynamicParam]` map, validating each field - # against its source. Returns a tuple of the params map and a list of any errors. - def resolve_sourced_field_params - errors = [] # : ::Array[::String] - - field_params = sourced_fields.filter_map do |field| - field_source = field.source # : SchemaElements::FieldSource - - referenced_field_path = field_path_resolver.resolve_public_path(related_type, field_source.field_path) do |parent_field| - !parent_field.type.list? - end - - if referenced_field_path.nil? - explanation = - if field_source.field_path.include?(".") - "could not be resolved: some parts do not exist on their respective types as non-list fields" - else - "does not exist as an indexing field" - end - - errors << "`#{object_type.name}.#{field.name}` has an invalid `sourced_from` argument: `#{related_type.name}.#{field_source.field_path}` #{explanation}." - nil - elsif referenced_field_path.type.unwrap_non_null != field.type.unwrap_non_null - errors << "The type of `#{object_type.name}.#{field.name}` is `#{field.type}`, but the type of its source (`#{related_type.name}.#{field_source.field_path}`) is `#{referenced_field_path.type}`. These must agree to use `sourced_from`." - nil - elsif field.type.non_null? - errors << "The type of `#{object_type.name}.#{field.name}` (`#{field.type}`) is not nullable, but this is not allowed for `sourced_from` fields since the value will be `null` before the related type's event is ingested." - nil - else - param = SchemaArtifacts::RuntimeMetadata::DynamicParam.new( - source_path: referenced_field_path.path_in_index, - cardinality: :one - ) - - [field.name_in_index, param] - end - end.to_h - - [field_params, errors] - end - end - end - end -end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rb index c1a0e1110..eed82f0b2 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rb @@ -10,7 +10,7 @@ require "elastic_graph/schema_definition/indexing/nested_update_target_resolver" require "elastic_graph/schema_definition/indexing/relationship_chain_resolver" require "elastic_graph/schema_definition/indexing/relationship_resolver" -require "elastic_graph/schema_definition/indexing/update_target_resolver" +require "elastic_graph/schema_definition/indexing/top_level_update_target_resolver" module ElasticGraph module SchemaDefinition @@ -91,26 +91,16 @@ def resolve_nested_update_targets(object_type, &error_reporter) end def resolve_top_level_update_target(object_type, resolved_relationship, sourced_fields) - update_target_resolver = UpdateTargetResolver.new( + top_level_update_target_resolver = TopLevelUpdateTargetResolver.new( object_type: object_type, resolved_relationship: resolved_relationship, sourced_fields: sourced_fields, field_path_resolver: @schema_def_state.field_path_resolver ) - update_target, errors = update_target_resolver.resolve + update_target, errors = top_level_update_target_resolver.resolve errors.each { |error| yield :sourced_field, error } - # Validate that has_had_multiple_sources! has been called when sourced_from is used - index_def = object_type.own_index_def # : Index - unless index_def.has_had_multiple_sources_flag - yield :sourced_field, "Type `#{object_type.name}` uses `sourced_from` fields but its index `#{index_def.name}` " \ - "has not been configured with `has_had_multiple_sources!`. To resolve this, add `i.has_had_multiple_sources!` within the " \ - "`t.index \"#{index_def.name}\"` block. This flag is required because indices with multiple sources can contain " \ - "incomplete documents, and ElasticGraph needs to know this to apply proper filtering. Once set, this flag should remain even " \ - "if you later remove all `sourced_from` fields, as the index may still contain historical incomplete documents." - end - [resolved_relationship.related_type.name, update_target] if update_target end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/top_level_update_target_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/top_level_update_target_resolver.rb new file mode 100644 index 000000000..f7a7d7dea --- /dev/null +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/top_level_update_target_resolver.rb @@ -0,0 +1,110 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/schema_artifacts/runtime_metadata/params" +require "elastic_graph/schema_definition/indexing/update_target_factory" +require "elastic_graph/schema_definition/indexing/update_target_resolver_support" + +module ElasticGraph + module SchemaDefinition + module Indexing + # Resolves a relationship and a set of `sourced_from` fields into an `UpdateTarget` that instructs the + # indexer how to update a type from the related type's source events. This handles the *top-level* case, + # where the `sourced_from` fields live directly on an indexed type and the target updates that same + # indexed type. (The *nested* case—`sourced_from` fields on a type embedded within an indexed type—is + # handled by `NestedUpdateTargetResolver`.) + # + # @private + class TopLevelUpdateTargetResolver + def initialize( + object_type:, + resolved_relationship:, + sourced_fields:, + field_path_resolver: + ) + @object_type = object_type + @resolved_relationship = resolved_relationship + @sourced_fields = sourced_fields + @field_path_resolver = field_path_resolver + end + + # Resolves the `object_type`, `resolved_relationship`, and `sourced_fields` into an `UpdateTarget`, validating + # that everything is defined correctly. + # + # Returns a tuple of the `update_target` (if valid), and a list of errors. + def resolve + relationship_errors = validate_relationship + top_level_fields_params, top_level_fields_params_errors = UpdateTargetResolverSupport.resolve_sourced_field_params( + object_type: object_type, + related_type: related_type, + sourced_fields: sourced_fields, + field_path_resolver: field_path_resolver + ) + routing_value_source, routing_error = resolve_field_source(UpdateTargetResolverSupport::RoutingSourceAdapter) + rollover_timestamp_value_source, rollover_timestamp_error = resolve_field_source(UpdateTargetResolverSupport::RolloverTimestampSourceAdapter) + equivalent_field_errors = resolved_relationship.relationship.validate_equivalent_fields(field_path_resolver) + index_def = object_type.own_index_def # : Index + has_had_multiple_sources_errors = UpdateTargetResolverSupport.validate_has_had_multiple_sources( + index_def, object_type, resolved_relationship.relationship + ) + + all_errors = relationship_errors + top_level_fields_params_errors + equivalent_field_errors + + has_had_multiple_sources_errors + [routing_error, rollover_timestamp_error].compact + + if all_errors.empty? + update_target = UpdateTargetFactory.new_normal_indexing_update_target( + type: object_type.name, + relationship: resolved_relationship.relationship_name, + id_source: resolved_relationship.relation_metadata.foreign_key, + top_level_fields_params: top_level_fields_params, + routing_value_source: routing_value_source, + rollover_timestamp_value_source: rollover_timestamp_value_source + ) + end + + [update_target, all_errors] + end + + private + + # @dynamic object_type, resolved_relationship, sourced_fields, field_path_resolver + attr_reader :object_type, :resolved_relationship, :sourced_fields, :field_path_resolver + + # Applies additional validations (beyond what `RelationshipResolver` applies) on relationships that are + # used by `sourced_from` fields. + def validate_relationship + relationship = resolved_relationship.relationship + error_prefix = UpdateTargetResolverSupport.relationship_error_prefix(relationship, sourced_fields) + + UpdateTargetResolverSupport.validate_relationship_cardinality(relationship, error_prefix: error_prefix) + + UpdateTargetResolverSupport.validate_relationship_routability(relationship, error_prefix: error_prefix) + end + + # The related type whose source events feed this update target — where `sourced_from` fields are resolved. + def related_type + resolved_relationship.related_type + end + + # Resolves a routing/rollover field source via the shared helper, supplying the top-level type, index, + # and relationship. + def resolve_field_source(adapter) + index_def = object_type.own_index_def # : Index + + UpdateTargetResolverSupport.resolve_field_source( + adapter, + relationship: resolved_relationship.relationship, + index_def: index_def, + related_type: related_type, + field_path_resolver: field_path_resolver, + updated_type: object_type + ) + end + end + end + end +end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_factory.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_factory.rb index ea4e267d2..2f279fb68 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_factory.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_factory.rb @@ -6,6 +6,8 @@ # # frozen_string_literal: true +require "elastic_graph/schema_artifacts/runtime_metadata/sourced_from_nested_params" + module ElasticGraph module SchemaDefinition module Indexing @@ -16,10 +18,10 @@ def self.new_normal_indexing_update_target( type:, relationship:, id_source:, - top_level_fields_params:, routing_value_source:, rollover_timestamp_value_source:, - sourced_from_nested_params: + top_level_fields_params: {}, + sourced_from_nested_params: SchemaArtifacts::RuntimeMetadata::SourcedFromNestedParams::EMPTY ) SchemaArtifacts::RuntimeMetadata::UpdateTarget.new( type: type, diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver.rb deleted file mode 100644 index 56c36654d..000000000 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver.rb +++ /dev/null @@ -1,160 +0,0 @@ -# Copyright 2024 - 2026 Block, Inc. -# -# Use of this source code is governed by an MIT-style -# license that can be found in the LICENSE file or at -# https://opensource.org/licenses/MIT. -# -# frozen_string_literal: true - -require "elastic_graph/schema_artifacts/runtime_metadata/params" -require "elastic_graph/schema_definition/indexing/sourced_field_params_resolver" -require "elastic_graph/schema_definition/indexing/update_target_factory" - -module ElasticGraph - module SchemaDefinition - module Indexing - # Responsible for resolving a relationship and a set of `sourced_from` fields into an `UpdateTarget` - # that contains the instructions for how the primary type should be updated from the related type's - # source events. - # - # @private - class UpdateTargetResolver - include SourcedFieldParamsResolver - - def initialize( - object_type:, - resolved_relationship:, - sourced_fields:, - field_path_resolver: - ) - @object_type = object_type - @resolved_relationship = resolved_relationship - @sourced_fields = sourced_fields - @field_path_resolver = field_path_resolver - end - - # Resolves the `object_type`, `resolved_relationship`, and `sourced_fields` into an `UpdateTarget`, validating - # that everything is defined correctly. - # - # Returns a tuple of the `update_target` (if valid), and a list of errors. - def resolve - relationship_errors = validate_relationship - top_level_fields_params, top_level_fields_params_errors = resolve_sourced_field_params - routing_value_source, routing_error = resolve_field_source(RoutingSourceAdapter) - rollover_timestamp_value_source, rollover_timestamp_error = resolve_field_source(RolloverTimestampSourceAdapter) - equivalent_field_errors = resolved_relationship.relationship.validate_equivalent_fields(field_path_resolver) - - all_errors = relationship_errors + top_level_fields_params_errors + equivalent_field_errors + [routing_error, rollover_timestamp_error].compact - - if all_errors.empty? - update_target = UpdateTargetFactory.new_normal_indexing_update_target( - type: object_type.name, - relationship: resolved_relationship.relationship_name, - id_source: resolved_relationship.relation_metadata.foreign_key, - top_level_fields_params: top_level_fields_params, - sourced_from_nested_params: SchemaArtifacts::RuntimeMetadata::SourcedFromNestedParams::EMPTY, - routing_value_source: routing_value_source, - rollover_timestamp_value_source: rollover_timestamp_value_source - ) - end - - [update_target, all_errors] - end - - private - - # @dynamic object_type, resolved_relationship, sourced_fields, field_path_resolver - attr_reader :object_type, :resolved_relationship, :sourced_fields, :field_path_resolver - - # Applies additional validations (beyond what `RelationshipResolver` applies) on relationships that are - # used by `sourced_from` fields. - def validate_relationship - errors = [] # : ::Array[::String] - - if resolved_relationship.relationship.many? - errors << "#{relationship_error_prefix} is a `relates_to_many` relationship, but `sourced_from` is only supported on a `relates_to_one` relationship." - end - - relation_metadata = resolved_relationship.relation_metadata - if relation_metadata.direction == :out - errors << "#{relationship_error_prefix} has an outbound foreign key (`dir: :out`), but `sourced_from` is only supported via inbound foreign key (`dir: :in`) relationships." - end - - unless relation_metadata.additional_filter.empty? - errors << "#{relationship_error_prefix} is a `relationship` using an `additional_filter` but `sourced_from` is not supported on relationships with `additional_filter`." - end - - errors - end - - # Helper method for building the prefix of relationship-related error messages. - def relationship_error_prefix - sourced_fields_description = "(referenced from `sourced_from` on field(s): #{sourced_fields.map { |f| "`#{f.name}`" }.join(", ")})" - "`#{object_type.name}.#{resolved_relationship.relationship_name}` #{sourced_fields_description}" - end - - # The related type whose source events feed this update target — where `sourced_from` fields are resolved. - def related_type - resolved_relationship.related_type - end - - # Helper method that assists with resolving `routing_value_source` and `rollover_timestamp_value_source`. - # Uses an `adapter` for the differences in these two cases. - # - # Returns a tuple of the resolved source (if successful) and an error (if invalid). - def resolve_field_source(adapter) - index_def = object_type.own_index_def # : Index - - field_source_graphql_path_string = adapter.get_field_source(resolved_relationship.relationship, index_def) do |local_need| - relationship_name = resolved_relationship.relationship_name - - error = "Cannot update `#{object_type.name}` documents with data from related `#{relationship_name}` events, " \ - "because #{adapter.cannot_update_reason(object_type, relationship_name)}. To fix it, add a call like this to the " \ - "`#{object_type.name}.#{relationship_name}` relationship definition: `rel.equivalent_field " \ - "\"[#{resolved_relationship.related_type.name} field]\", locally_named: \"#{local_need}\"`." - - return [nil, error] - end - - if field_source_graphql_path_string - field_path = field_path_resolver.resolve_public_path(resolved_relationship.related_type, field_source_graphql_path_string) do |parent_field| - !parent_field.type.list? - end - - [field_path&.path_in_index, nil] - else - [nil, nil] - end - end - - # Adapter for the `routing_value_source` case for use by `resolve_field_source`. - # - # @private - module RoutingSourceAdapter - def self.get_field_source(relationship, index, &block) - relationship.routing_value_source_for_index(index, &block) - end - - def self.cannot_update_reason(object_type, relationship_name) - "`#{object_type.name}` uses custom shard routing but we don't know what `#{relationship_name}` field to use " \ - "to route the `#{object_type.name}` update requests" - end - end - - # Adapter for the `rollover_timestamp_value_source` case for use by `resolve_field_source`. - # - # @private - module RolloverTimestampSourceAdapter - def self.get_field_source(relationship, index, &block) - relationship.rollover_timestamp_value_source_for_index(index, &block) - end - - def self.cannot_update_reason(object_type, relationship_name) - "`#{object_type.name}` uses a rollover index but we don't know what `#{relationship_name}` timestamp field to use " \ - "to select an index for the `#{object_type.name}` update requests" - end - end - end - end - end -end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver_support.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver_support.rb new file mode 100644 index 000000000..930b8574d --- /dev/null +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver_support.rb @@ -0,0 +1,178 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/schema_artifacts/runtime_metadata/params" + +module ElasticGraph + module SchemaDefinition + module Indexing + # Shared logic used by both `TopLevelUpdateTargetResolver` (top-level `sourced_from`) and + # `NestedUpdateTargetResolver` (nested `sourced_from`) to build their `UpdateTarget`s. The two resolvers + # differ in *which* type, relationship, and index they operate on, but resolve `sourced_from` fields, + # routing, and rollover the same way—so that common logic lives here. + # + # @private + module UpdateTargetResolverSupport + # Resolves `sourced_fields` into a `[field_name_in_index => DynamicParam]` map, validating each field + # against its source on `related_type`. Returns a tuple of the params map and a list of any errors. + def self.resolve_sourced_field_params(object_type:, related_type:, sourced_fields:, field_path_resolver:) + errors = [] # : ::Array[::String] + + field_params = sourced_fields.filter_map do |field| + field_source = field.source # : SchemaElements::FieldSource + + referenced_field_path = field_path_resolver.resolve_public_path(related_type, field_source.field_path) do |parent_field| + !parent_field.type.list? + end + + if referenced_field_path.nil? + explanation = + if field_source.field_path.include?(".") + "could not be resolved: some parts do not exist on their respective types as non-list fields" + else + "does not exist as an indexing field" + end + + errors << "`#{object_type.name}.#{field.name}` has an invalid `sourced_from` argument: `#{related_type.name}.#{field_source.field_path}` #{explanation}." + nil + elsif referenced_field_path.type.unwrap_non_null != field.type.unwrap_non_null + errors << "The type of `#{object_type.name}.#{field.name}` is `#{field.type}`, but the type of its source (`#{related_type.name}.#{field_source.field_path}`) is `#{referenced_field_path.type}`. These must agree to use `sourced_from`." + nil + elsif field.type.non_null? + errors << "The type of `#{object_type.name}.#{field.name}` (`#{field.type}`) is not nullable, but this is not allowed for `sourced_from` fields since the value will be `null` before the related type's event is ingested." + nil + else + param = SchemaArtifacts::RuntimeMetadata::DynamicParam.new( + source_path: referenced_field_path.path_in_index, + cardinality: :one + ) + + [field.name_in_index, param] + end + end.to_h + + [field_params, errors] + end + + # Resolves the `routing_value_source` or `rollover_timestamp_value_source` for an `UpdateTarget`, using + # an `adapter` for the differences between the two cases. The value is drawn from `relationship`'s + # `equivalent_field` configuration and resolved to an indexing path on `related_type`. `index_def` is the + # index needing the value, and `updated_type` is the indexed type being updated (used in error messages). + # + # Returns a tuple of the resolved source (if successful) and an error (if invalid). + def self.resolve_field_source(adapter, relationship:, index_def:, related_type:, field_path_resolver:, updated_type:) + field_source_graphql_path_string = adapter.get_field_source(relationship, index_def) do |local_need| + error = "Cannot update `#{updated_type.name}` documents with data from related `#{relationship.name}` events, " \ + "because #{adapter.cannot_update_reason(updated_type, relationship.name)}. To fix it, add a call like this to the " \ + "`#{updated_type.name}.#{relationship.name}` relationship definition: `rel.equivalent_field " \ + "\"[#{related_type.name} field]\", locally_named: \"#{local_need}\"`." + + return [nil, error] + end + + if field_source_graphql_path_string + field_path = field_path_resolver.resolve_public_path(related_type, field_source_graphql_path_string) do |parent_field| + !parent_field.type.list? + end + + [field_path&.path_in_index, nil] + else + [nil, nil] + end + end + + # Validates that `relationship` is `relates_to_one`, since a `sourced_from` field copies a value from a + # single source record and a `relates_to_many` relationship has no single value to copy. `error_prefix` + # identifies the offending relationship (and the `sourced_from` fields that depend on it) in the message. + # + # Returns a list of any errors found. + def self.validate_relationship_cardinality(relationship, error_prefix:) + return [] unless relationship.many? + + ["#{error_prefix} is a `relates_to_many` relationship, but `sourced_from` is only supported on a " \ + "`relates_to_one` relationship."] + end + + # Validates that `relationship` can route `sourced_from` source events to the documents they update: it + # must use an inbound foreign key (so the event carries the key) and no `additional_filter` (which the + # `sourced_from` update path ignores, so a filtered relationship would silently mismatch). `error_prefix` + # identifies the offending relationship (and the `sourced_from` fields that depend on it) in the messages. + # + # Returns a list of any errors found. + def self.validate_relationship_routability(relationship, error_prefix:) + errors = [] # : ::Array[::String] + relation_metadata = relationship.runtime_metadata # : SchemaArtifacts::RuntimeMetadata::Relation + + if relation_metadata.direction == :out + errors << "#{error_prefix} has an outbound foreign key (`dir: :out`), but `sourced_from` is only " \ + "supported via inbound foreign key (`dir: :in`) relationships." + end + + unless relation_metadata.additional_filter.empty? + errors << "#{error_prefix} uses an `additional_filter`, but `sourced_from` is not supported on " \ + "relationships with `additional_filter`." + end + + errors + end + + # Builds the prefix of a relationship-related `sourced_from` error: the `Type.relationship` description, + # followed by the `sourced_from` fields that depend on it (so the author knows what's affected). Only + # called when there are `sourced_fields` (a relationship with none produces no update target to validate). + def self.relationship_error_prefix(relationship, sourced_fields) + fields_description = sourced_fields.map { |f| "`#{f.name}`" }.join(", ") + "`#{relationship.parent_type.name}.#{relationship.name}` (referenced from `sourced_from` on field(s): #{fields_description})" + end + + # Validates that `index` (which `type` writes to, sourcing data via `relationship`) has been configured + # with `has_had_multiple_sources!`. `sourced_from` makes an index multi-sourced, and ElasticGraph needs + # that flag to filter incomplete documents correctly. + # + # Returns a list of any errors found. + def self.validate_has_had_multiple_sources(index, type, relationship) + return [] if index.has_had_multiple_sources_flag + + ["Type `#{type.name}` has `sourced_from` fields (via `#{relationship.parent_type.name}.#{relationship.name}`) but " \ + "its index `#{index.name}` has not been configured with `has_had_multiple_sources!`. To resolve this, add " \ + "`i.has_had_multiple_sources!` within the `t.index \"#{index.name}\"` block. This flag is required because " \ + "indices with multiple sources can contain incomplete documents, and ElasticGraph needs to know this to apply " \ + "proper filtering. Once set, this flag should remain even if you later remove all `sourced_from` fields, as the " \ + "index may still contain historical incomplete documents."] + end + + # Adapter for the `routing_value_source` case for use by `resolve_field_source`. + # + # @private + module RoutingSourceAdapter + def self.get_field_source(relationship, index, &block) + relationship.routing_value_source_for_index(index, &block) + end + + def self.cannot_update_reason(updated_type, relationship_name) + "`#{updated_type.name}` uses custom shard routing but we don't know what `#{relationship_name}` field to use " \ + "to route the `#{updated_type.name}` update requests" + end + end + + # Adapter for the `rollover_timestamp_value_source` case for use by `resolve_field_source`. + # + # @private + module RolloverTimestampSourceAdapter + def self.get_field_source(relationship, index, &block) + relationship.rollover_timestamp_value_source_for_index(index, &block) + end + + def self.cannot_update_reason(updated_type, relationship_name) + "`#{updated_type.name}` uses a rollover index but we don't know what `#{relationship_name}` timestamp field to use " \ + "to select an index for the `#{updated_type.name}` update requests" + end + end + end + end + end +end diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rbs index a3fe78135..e3f24a780 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rbs @@ -2,8 +2,6 @@ module ElasticGraph module SchemaDefinition module Indexing class NestedUpdateTargetResolver - include SourcedFieldParamsResolver - def initialize: ( object_type: indexableType, sourced_fields: ::Array[SchemaElements::Field], @@ -14,44 +12,24 @@ module ElasticGraph def resolve: () -> [SchemaArtifacts::RuntimeMetadata::UpdateTarget?, ::Array[::String]] - # Public to satisfy the `_SourcedFieldParamsHost` interface required by `SourcedFieldParamsResolver`. + private + attr_reader object_type: indexableType attr_reader sourced_fields: ::Array[SchemaElements::Field] attr_reader field_path_resolver: SchemaElements::FieldPath::Resolver + attr_reader resolved_chain: ResolvedRelationshipChain + attr_reader schema_def_state: State @related_type: indexableType def related_type: () -> indexableType - private - - attr_reader resolved_chain: ResolvedRelationshipChain - attr_reader schema_def_state: State - def relationship: () -> SchemaElements::Relationship def root_relationship: () -> SchemaElements::Relationship def root_type: () -> indexableType def root_index: () -> Index - def validate_relationship: () -> ::Array[::String] - def build_path_identifier_params: () -> ::Hash[::String, SchemaArtifacts::RuntimeMetadata::DynamicParam] - def resolve_field_source: (_FieldSourceAdapter) -> [::String?, ::String?] - def validate_has_had_multiple_sources: () -> ::Array[::String] - - interface _FieldSourceAdapter - def get_field_source: (SchemaElements::Relationship, Index) { - (::String) -> bot - } -> ::String? - - def cannot_update_reason: (indexableType, ::String) -> ::String - end - - module RoutingSourceAdapter - extend _FieldSourceAdapter - end - - module RolloverTimestampSourceAdapter - extend _FieldSourceAdapter - end + def validate_relationships: () -> ::Array[::String] + def resolve_field_source: (UpdateTargetResolverSupport::_FieldSourceAdapter) -> [::String?, ::String?] end end end diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rbs index 67a4e3371..c88a6f208 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rbs @@ -2,22 +2,26 @@ module ElasticGraph module SchemaDefinition module Indexing class ResolvedRelationshipChainSuperType - attr_reader root_relationship: SchemaElements::Relationship - attr_reader leaf_relationship: SchemaElements::Relationship + attr_reader relationships: ::Array[SchemaElements::Relationship] attr_reader path_segments: ::Array[PathSegment] def initialize: ( - root_relationship: SchemaElements::Relationship, - leaf_relationship: SchemaElements::Relationship, + relationships: ::Array[SchemaElements::Relationship], path_segments: ::Array[PathSegment] ) -> void end class ResolvedRelationshipChain < ResolvedRelationshipChainSuperType + def root_relationship: () -> SchemaElements::Relationship + def leaf_relationship: () -> SchemaElements::Relationship def root_index: () -> Index def register_on_root_index: () -> void + @qualified_relationship: ::String def qualified_relationship: () -> ::String + @sourced_from_nested_paths: ::Array[SchemaArtifacts::RuntimeMetadata::sourcedFromNestedPathSegment] def sourced_from_nested_paths: () -> ::Array[SchemaArtifacts::RuntimeMetadata::sourcedFromNestedPathSegment] + @path_identifier_params: ::Hash[::String, SchemaArtifacts::RuntimeMetadata::DynamicParam] + def path_identifier_params: () -> ::Hash[::String, SchemaArtifacts::RuntimeMetadata::DynamicParam] end class PathSegment @@ -42,15 +46,15 @@ module ElasticGraph def resolve_chain: ( SchemaElements::Relationship, ::Array[PathSegment], - ::Array[::String], - ::Set[SchemaElements::Relationship] + ::Array[SchemaElements::Relationship], + ::Array[::String] ) -> SchemaElements::Relationship? def resolve_parent_ref: ( SchemaElements::Relationship, SchemaElements::Relationship::ParentRef, - ::Array[::String], - ::Set[SchemaElements::Relationship] + ::Array[SchemaElements::Relationship], + ::Array[::String] ) -> SchemaElements::Relationship? def build_path_segment: ( diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_field_params_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_field_params_resolver.rbs deleted file mode 100644 index 7e826896e..000000000 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_field_params_resolver.rbs +++ /dev/null @@ -1,19 +0,0 @@ -module ElasticGraph - module SchemaDefinition - module Indexing - interface _SourcedFieldParamsHost - def object_type: () -> indexableType - def related_type: () -> indexableType - def sourced_fields: () -> ::Array[SchemaElements::Field] - def field_path_resolver: () -> SchemaElements::FieldPath::Resolver - end - - module SourcedFieldParamsResolver : _SourcedFieldParamsHost - def resolve_sourced_field_params: () -> [ - ::Hash[::String, SchemaArtifacts::RuntimeMetadata::DynamicParam], - ::Array[::String] - ] - end - end - end -end diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/top_level_update_target_resolver.rbs similarity index 52% rename from elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver.rbs rename to elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/top_level_update_target_resolver.rbs index 2b456ac3c..c6d190283 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/top_level_update_target_resolver.rbs @@ -1,9 +1,7 @@ module ElasticGraph module SchemaDefinition module Indexing - class UpdateTargetResolver - include SourcedFieldParamsResolver - + class TopLevelUpdateTargetResolver def initialize: ( object_type: indexableType, resolved_relationship: ResolvedRelationship, @@ -13,36 +11,16 @@ module ElasticGraph def resolve: () -> [SchemaArtifacts::RuntimeMetadata::UpdateTarget?, ::Array[::String]] - # Public to satisfy the `_SourcedFieldParamsHost` interface required by `SourcedFieldParamsResolver`. - attr_reader object_type: indexableType - attr_reader sourced_fields: ::Array[SchemaElements::Field] - attr_reader field_path_resolver: SchemaElements::FieldPath::Resolver - def related_type: () -> indexableType - private + attr_reader object_type: indexableType attr_reader resolved_relationship: ResolvedRelationship + attr_reader sourced_fields: ::Array[SchemaElements::Field] + attr_reader field_path_resolver: SchemaElements::FieldPath::Resolver def validate_relationship: () -> ::Array[::String] - def relationship_error_prefix: () -> ::String - - def resolve_field_source: (_FieldSourceAdapter) -> [::String?, ::String?] - - interface _FieldSourceAdapter - def get_field_source: (SchemaElements::Relationship, Index) { - (::String) -> bot - } -> ::String? - - def cannot_update_reason: (indexableType, ::String) -> ::String - end - - module RoutingSourceAdapter - extend _FieldSourceAdapter - end - - module RolloverTimestampSourceAdapter - extend _FieldSourceAdapter - end + def related_type: () -> indexableType + def resolve_field_source: (UpdateTargetResolverSupport::_FieldSourceAdapter) -> [::String?, ::String?] end end end diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_factory.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_factory.rbs index b8769fe76..025f7d51a 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_factory.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_factory.rbs @@ -6,10 +6,10 @@ module ElasticGraph type: ::String, relationship: ::String, id_source: ::String, - top_level_fields_params: SchemaArtifacts::RuntimeMetadata::paramsHash, routing_value_source: ::String?, rollover_timestamp_value_source: ::String?, - sourced_from_nested_params: SchemaArtifacts::RuntimeMetadata::SourcedFromNestedParams + ?top_level_fields_params: SchemaArtifacts::RuntimeMetadata::paramsHash, + ?sourced_from_nested_params: SchemaArtifacts::RuntimeMetadata::SourcedFromNestedParams ) -> SchemaArtifacts::RuntimeMetadata::UpdateTarget private diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver_support.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver_support.rbs new file mode 100644 index 000000000..ecfa7eacd --- /dev/null +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver_support.rbs @@ -0,0 +1,47 @@ +module ElasticGraph + module SchemaDefinition + module Indexing + module UpdateTargetResolverSupport + def self.resolve_sourced_field_params: ( + object_type: indexableType, + related_type: indexableType, + sourced_fields: ::Array[SchemaElements::Field], + field_path_resolver: SchemaElements::FieldPath::Resolver + ) -> [ + ::Hash[::String, SchemaArtifacts::RuntimeMetadata::DynamicParam], + ::Array[::String] + ] + + def self.resolve_field_source: ( + _FieldSourceAdapter, + relationship: SchemaElements::Relationship, + index_def: Index, + related_type: indexableType, + field_path_resolver: SchemaElements::FieldPath::Resolver, + updated_type: indexableType + ) -> [::String?, ::String?] + + def self.validate_relationship_cardinality: (SchemaElements::Relationship, error_prefix: ::String) -> ::Array[::String] + def self.validate_relationship_routability: (SchemaElements::Relationship, error_prefix: ::String) -> ::Array[::String] + def self.relationship_error_prefix: (SchemaElements::Relationship, ::Array[SchemaElements::Field]) -> ::String + def self.validate_has_had_multiple_sources: (Index, indexableType, SchemaElements::Relationship) -> ::Array[::String] + + interface _FieldSourceAdapter + def get_field_source: (SchemaElements::Relationship, Index) { + (::String) -> bot + } -> ::String? + + def cannot_update_reason: (indexableType, ::String) -> ::String + end + + module RoutingSourceAdapter + extend _FieldSourceAdapter + end + + module RolloverTimestampSourceAdapter + extend _FieldSourceAdapter + end + end + end + end +end diff --git a/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb b/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb index eb5a04ead..49a1fb1e2 100644 --- a/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb +++ b/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb @@ -451,7 +451,8 @@ def expect_widget_update_target_with( id_source:, top_level_fields_params:, relationship:, routing_value_source: nil, - rollover_timestamp_value_source: nil + rollover_timestamp_value_source: nil, + sourced_from_nested_params: SchemaArtifacts::RuntimeMetadata::SourcedFromNestedParams::EMPTY ) expect(update_targets.count { |t| t.type == "Widget" }).to eq(1) widget_target = update_targets.find { |t| t.type == "Widget" } @@ -464,6 +465,7 @@ def expect_widget_update_target_with( expect(widget_target.routing_value_source).to eq(routing_value_source) expect(widget_target.rollover_timestamp_value_source).to eq(rollover_timestamp_value_source) expect(widget_target.top_level_fields_params).to eq(top_level_fields_params) + expect(widget_target.sourced_from_nested_params).to eq(sourced_from_nested_params) expect(widget_target.metadata_params).to eq(standard_metadata_params(relationship: relationship)) end @@ -491,7 +493,7 @@ def expect_widget_update_target_with( end end }.to raise_error Errors::SchemaError, a_string_including( - "Type `Widget` uses `sourced_from` fields but its index `widgets` has not been configured with `has_had_multiple_sources!`", + "Type `Widget` has `sourced_from` fields (via `Widget.workspace`) but its index `widgets` has not been configured with `has_had_multiple_sources!`", "To resolve this, add `i.has_had_multiple_sources!` within the `t.index \"widgets\"` block", "This flag is required because indices with multiple sources can contain incomplete documents", "Once set, this flag should remain even if you later remove all `sourced_from` fields" @@ -537,7 +539,7 @@ def expect_widget_update_target_with( f.sourced_from "workspace", "created_at" end end - }.to raise_error_about_workspace_relationship("is a `relationship` using an `additional_filter` but `sourced_from` is not supported on relationships with `additional_filter`.") + }.to raise_error_about_workspace_relationship("uses an `additional_filter`, but `sourced_from` is not supported on relationships with `additional_filter`.") end it "raises an error if the referenced relationship is not defined" do @@ -1300,36 +1302,32 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) describe "nested `sourced_from` update targets" do it "dumps a nested update target on the source type, keyed by the qualified relationship" do - targets = nested_update_targets_by_relationship(nested_sourced_from_schema(fetch: "StatLine")) - - expect(targets.keys).to contain_exactly("players.statLine") - target = targets.fetch("players.statLine") - expect(target.type).to eq "Team" - expect(target.relationship).to eq "players.statLine" - expect(target.script_id).to eq(INDEX_DATA_UPDATE_SCRIPT_ID) - expect(target.id_source).to eq "teamId" - expect(target.routing_value_source).to eq(nil) - expect(target.rollover_timestamp_value_source).to eq(nil) - expect(target.top_level_fields_params).to eq({}) - expect(target.sourced_from_nested_params.field_params).to eq( - "goals" => dynamic_param_with(source_path: "goals", cardinality: :one) - ) - expect(target.sourced_from_nested_params.path_identifier_params).to eq( - "playerId" => dynamic_param_with(source_path: "playerId", cardinality: :one) - ) + expect_statline_update_target_with(nested_sourced_from_schema) end it "omits `path_identifier_params` for a non-list (object) embedding field, since there is no element to match" do - metadata = nested_sourced_from_schema(fetch: "StatLine", players_field: "Player!") - target = nested_update_targets_by_relationship(metadata).fetch("players.statLine") + expect_statline_update_target_with(nested_sourced_from_schema(players_field: "Player!"), path_identifier_params: {}) + end + + it "bundles every `sourced_from` field on the nested type into `field_params`" do + metadata = nested_sourced_from_schema( + on_player: ->(t) { + t.field "assists", "Int" do |f| + f.sourced_from "statLine", "assists" + end + }, + on_statline: ->(t) { t.field "assists", "Int" } + ) - expect(target.sourced_from_nested_params.path_identifier_params).to eq({}) + expect_statline_update_target_with(metadata, field_params: { + "goals" => dynamic_param_with(source_path: "goals", cardinality: :one), + "assists" => dynamic_param_with(source_path: "assists", cardinality: :one) + }) end context "on a root type that uses custom routing" do it "determines the `routing_value_source` from an `equivalent_field` configured on the root relationship" do metadata = nested_sourced_from_schema( - fetch: "StatLine", on_team: ->(t) { t.field "team_owner_id", "ID!" }, on_teams_index: ->(i) { i.route_with "team_owner_id" }, on_statlines_relationship: ->(r) { @@ -1343,21 +1341,20 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) end end - target = nested_update_targets_by_relationship(metadata).fetch("players.statLine") - expect(target.routing_value_source).to eq "stats.owner_id" + expect_statline_update_target_with(metadata, routing_value_source: "stats.owner_id") end - it "leaves `routing_value_source` nil when the equivalent field is a `graphql_only` field with no indexing path" do - metadata = nested_sourced_from_schema( - fetch: "StatLine", - on_team: ->(t) { t.field "team_owner_id", "ID!" }, - on_teams_index: ->(i) { i.route_with "team_owner_id" }, - on_statlines_relationship: ->(r) { r.equivalent_field "owner_id", locally_named: "team_owner_id" }, - on_statline: ->(t) { t.field "owner_id", "ID", graphql_only: true } + it "raises a clear error when the equivalent field is a `graphql_only` field with no indexing path" do + expect { + nested_sourced_from_schema( + on_team: ->(t) { t.field "team_owner_id", "ID!" }, + on_teams_index: ->(i) { i.route_with "team_owner_id" }, + on_statlines_relationship: ->(r) { r.equivalent_field "owner_id", locally_named: "team_owner_id" }, + on_statline: ->(t) { t.field "owner_id", "ID", graphql_only: true } + ) + }.to raise_error Errors::SchemaError, a_string_including( + "`StatLine.owner_id` (referenced from an `equivalent_field` defined on `Team.statLines`) does not exist" ) - - target = nested_update_targets_by_relationship(metadata).fetch("players.statLine") - expect(target.routing_value_source).to eq(nil) end it "raises a clear error when no `equivalent_field` is configured for the custom routing field" do @@ -1367,7 +1364,7 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) on_teams_index: ->(i) { i.route_with "team_owner_id" } ) }.to raise_error Errors::SchemaError, a_string_including( - "Cannot update `Team` documents with nested data from related `statLine` events", + "Cannot update `Team` documents with data from related `statLines` events", "`Team` uses custom shard routing but we don't know what `statLines` field to use to route the `Team` update requests", "add a call like this to the `Team.statLines` relationship definition", '`rel.equivalent_field "[StatLine field]", locally_named: "team_owner_id"`' @@ -1378,15 +1375,13 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) context "on a root type that uses a rollover index" do it "determines the `rollover_timestamp_value_source` from an `equivalent_field` configured on the root relationship" do metadata = nested_sourced_from_schema( - fetch: "StatLine", on_team: ->(t) { t.field "team_created_at", "DateTime" }, on_teams_index: ->(i) { i.rollover :yearly, "team_created_at" }, on_statlines_relationship: ->(r) { r.equivalent_field "created_at", locally_named: "team_created_at" }, on_statline: ->(t) { t.field "created_at", "DateTime" } ) - target = nested_update_targets_by_relationship(metadata).fetch("players.statLine") - expect(target.rollover_timestamp_value_source).to eq "created_at" + expect_statline_update_target_with(metadata, rollover_timestamp_value_source: "created_at") end it "raises a clear error when no `equivalent_field` is configured for the rollover timestamp field" do @@ -1396,7 +1391,7 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) on_teams_index: ->(i) { i.rollover :yearly, "team_created_at" } ) }.to raise_error Errors::SchemaError, a_string_including( - "Cannot update `Team` documents with nested data from related `statLine` events", + "Cannot update `Team` documents with data from related `statLines` events", "`Team` uses a rollover index but we don't know what `statLines` timestamp field to use to select an index for the `Team` update requests", "add a call like this to the `Team.statLines` relationship definition", '`rel.equivalent_field "[StatLine field]", locally_named: "team_created_at"`' @@ -1405,15 +1400,31 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) end describe "validations" do + it "raises an error when the nested `sourced_from` field does not exist on the sourced type" do + expect { + nested_sourced_from_schema(player_goals_source: "nonexistent") + }.to raise_error Errors::SchemaError, a_string_including( + "`Player.goals` has an invalid `sourced_from` argument: `StatLine.nonexistent` does not exist as an indexing field." + ) + end + + it "raises an error when the nested `sourced_from` field's type does not match its source's type" do + expect { + nested_sourced_from_schema(player_goals_type: "String") + }.to raise_error Errors::SchemaError, a_string_including( + "The type of `Player.goals` is `String`, but the type of its source (`StatLine.goals`) is `Int`. These must agree to use `sourced_from`." + ) + end + it "raises an error when the root index has not been configured with `has_had_multiple_sources!`" do expect { nested_sourced_from_schema(multiple_sources: false) }.to raise_error Errors::SchemaError, a_string_including( - "Type `Team` has nested `sourced_from` fields (via `Player.statLine`) but its index `teams` has not been configured with `has_had_multiple_sources!`" + "Type `Team` has `sourced_from` fields (via `Player.statLine`) but its index `teams` has not been configured with `has_had_multiple_sources!`" ) end - it "raises an error when the leaf relationship is `relates_to_many`", :dont_validate_graphql_schema do + it "raises an error when the leaf relationship is `relates_to_many`" do expect { object_type_metadata_for "Team" do |s| s.object_type "Team" do |t| @@ -1444,7 +1455,7 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) end end }.to raise_error Errors::SchemaError, a_string_including( - "`Player.statLines` is a `relates_to_many` relationship, but nested `sourced_from` is only supported on a `relates_to_one` relationship." + "`Player.statLines` (referenced from `sourced_from` on field(s): `goals`) is a `relates_to_many` relationship, but `sourced_from` is only supported on a `relates_to_one` relationship." ) end end @@ -1453,23 +1464,22 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) describe "`parent_relationship` validations" do it "accepts an explicit `parent_field_name:` to identify the embedding field" do metadata = nested_sourced_from_schema( - fetch: "StatLine", on_player_relationship: ->(r) { r.parent_relationship "Team", "statLines", parent_field_name: "players" } ) - expect(nested_update_targets_by_relationship(metadata).keys).to contain_exactly("players.statLine") + expect_statline_update_target_with(metadata) end it "discovers an embedding field declared with `indexing_only: true`" do # An `indexing_only: true` field is absent from `graphql_fields_by_name` but present in # `indexing_fields_by_name_in_index`, so this only resolves when the latter is used. - metadata = nested_sourced_from_schema(fetch: "StatLine", players_field: nil, on_team: ->(t) { + metadata = nested_sourced_from_schema(players_field: nil, on_team: ->(t) { t.field "players", "[Player!]!", indexing_only: true do |f| f.mapping type: "object" end }) - expect(nested_update_targets_by_relationship(metadata).keys).to contain_exactly("players.statLine") + expect_statline_update_target_with(metadata) end it "raises an error when `parent_relationship` is called twice on the same relationship" do @@ -1639,6 +1649,27 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) ) end + it "raises an error when a relationship in the chain uses an outbound foreign key" do + expect { + nested_sourced_from_schema(player_statline_dir: :out, player_statline_via: "statLineId") + }.to raise_error Errors::SchemaError, a_string_including( + "`Player.statLine` (referenced from `sourced_from` on field(s): `goals`) has an outbound foreign key (`dir: :out`), but `sourced_from` is only supported via inbound foreign key (`dir: :in`) relationships." + ) + end + + it "raises an error when a relationship in the chain uses an `additional_filter`" do + expect { + nested_sourced_from_schema( + on_player_relationship: ->(r) { + r.parent_relationship "Team", "statLines" + r.additional_filter status: "active" + } + ) + }.to raise_error Errors::SchemaError, a_string_including( + "`Player.statLine` (referenced from `sourced_from` on field(s): `goals`) uses an `additional_filter`, but `sourced_from` is not supported on relationships with `additional_filter`." + ) + end + it "raises an error when the chain terminates at a non-indexed type" do expect { nested_sourced_from_schema(index_teams: false) @@ -1673,7 +1704,6 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) it "uses `parent_field_name:` to disambiguate when multiple embedding fields exist" do metadata = nested_sourced_from_schema( - fetch: "StatLine", on_team: ->(t) { t.field "bench_players", "[Player!]!" do |f| f.mapping type: "object" @@ -1684,7 +1714,7 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) # The qualified relationship reflects `bench_players`, confirming disambiguation chose that field # rather than the also-eligible `players`. - expect(nested_update_targets_by_relationship(metadata).keys).to contain_exactly("bench_players.statLine") + expect_statline_update_target_with(metadata, relationship: "bench_players.statLine") end it "raises an error when an explicit `parent_field_name:` references a non-existent field" do @@ -1703,13 +1733,39 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) def nested_update_targets_by_relationship(metadata) metadata .update_targets - .reject { |t| t.relationship == SELF_RELATIONSHIP_NAME } .to_h { |t| [t.relationship, t] } + .except(SELF_RELATIONSHIP_NAME) + end + + # Asserts the full nested `UpdateTarget` `StatLine` events produce for `Team`, with defaults for the + # common `nested_sourced_from_schema` case so each test overrides only the attributes it exercises. + def expect_statline_update_target_with( + metadata, + relationship: "players.statLine", + routing_value_source: nil, + rollover_timestamp_value_source: nil, + field_params: {"goals" => dynamic_param_with(source_path: "goals", cardinality: :one)}, + path_identifier_params: {"playerId" => dynamic_param_with(source_path: "playerId", cardinality: :one)} + ) + targets = nested_update_targets_by_relationship(metadata) + expect(targets.keys).to contain_exactly(relationship) + + target = targets.fetch(relationship) + expect(target.type).to eq "Team" + expect(target.relationship).to eq relationship + expect(target.script_id).to eq(INDEX_DATA_UPDATE_SCRIPT_ID) + expect(target.id_source).to eq "teamId" + expect(target.routing_value_source).to eq(routing_value_source) + expect(target.rollover_timestamp_value_source).to eq(rollover_timestamp_value_source) + expect(target.top_level_fields_params).to eq({}) + expect(target.sourced_from_nested_params.field_params).to eq(field_params) + expect(target.sourced_from_nested_params.path_identifier_params).to eq(path_identifier_params) + expect(target.metadata_params).to eq(standard_metadata_params(relationship: relationship)) end def nested_sourced_from_schema( - fetch: "Team", on_team: nil, + on_player: nil, on_statlines_relationship: nil, on_player_relationship: ->(r) { r.parent_relationship "Team", "statLines" }, player_indexing_only: true, @@ -1718,9 +1774,14 @@ def nested_sourced_from_schema( index_players: false, multiple_sources: true, on_teams_index: nil, - on_statline: nil + on_statline: nil, + player_goals_type: "Int", + player_goals_source: "goals", + player_statline_dir: :in, + player_statline_via: "playerId" ) - object_type_metadata_for fetch do |s| + # `StatLine` is the source type, so its metadata carries the nested update targets we assert on. + object_type_metadata_for "StatLine" do |s| s.object_type "Team" do |t| t.field "id", "ID!" @@ -1730,9 +1791,7 @@ def nested_sourced_from_schema( end end - t.relates_to_many "statLines", "StatLine", via: "teamId", dir: :in, indexing_only: true do |r| - on_statlines_relationship&.call(r) - end + t.relates_to_many "statLines", "StatLine", via: "teamId", dir: :in, indexing_only: true, &on_statlines_relationship on_team&.call(t) if index_teams @@ -1746,14 +1805,16 @@ def nested_sourced_from_schema( s.object_type "Player" do |t| t.field "id", "ID!" - t.field "goals", "Int" do |f| - f.sourced_from "statLine", "goals" + t.field "goals", player_goals_type do |f| + f.sourced_from "statLine", player_goals_source end - t.relates_to_one "statLine", "StatLine", via: "playerId", dir: :in, indexing_only: player_indexing_only do |r| + t.relates_to_one "statLine", "StatLine", via: player_statline_via, dir: player_statline_dir, indexing_only: player_indexing_only do |r| on_player_relationship.call(r) end + on_player&.call(t) + if index_players t.index("players") { |i| i.has_had_multiple_sources! } end From 754a05461eef5aa2de9507b618bc3cbe128aff6c Mon Sep 17 00:00:00 2001 From: ellisandrews-toast Date: Thu, 11 Jun 2026 15:25:23 -0400 Subject: [PATCH 3/4] Add nested sourced_from tests for nested source paths and name_in_index wiring --- .../update_targets_spec.rb | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb b/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb index 49a1fb1e2..4192cee39 100644 --- a/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb +++ b/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb @@ -1325,6 +1325,37 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) }) end + it "resolves a `sourced_from` field from a nested path on the source type" do + metadata = nested_sourced_from_schema( + player_goals_source: "stats.goals", + on_statline: ->(t) { t.field "stats", "StatLineStats" } + ) do |s| + s.object_type "StatLineStats" do |t| + t.field "goals", "Int" + end + end + + expect_statline_update_target_with(metadata, field_params: { + "goals" => dynamic_param_with(source_path: "stats.goals", cardinality: :one) + }) + end + + it "keys `field_params` by the `sourced_from` field's `name_in_index`" do + metadata = nested_sourced_from_schema( + on_player: ->(t) { + t.field "assists", "Int", name_in_index: "assists_in_index" do |f| + f.sourced_from "statLine", "assists" + end + }, + on_statline: ->(t) { t.field "assists", "Int" } + ) + + expect_statline_update_target_with(metadata, field_params: { + "goals" => dynamic_param_with(source_path: "goals", cardinality: :one), + "assists_in_index" => dynamic_param_with(source_path: "assists", cardinality: :one) + }) + end + context "on a root type that uses custom routing" do it "determines the `routing_value_source` from an `equivalent_field` configured on the root relationship" do metadata = nested_sourced_from_schema( From a2e964df4145a466f8234a86b7f17efc930c18b7 Mon Sep 17 00:00:00 2001 From: ellisandrews-toast Date: Thu, 11 Jun 2026 19:48:23 -0400 Subject: [PATCH 4/4] Address PR feedback --- .../indexing/relationship_chain_resolver.rb | 72 +--------------- .../indexing/resolved_relationship_chain.rb | 85 +++++++++++++++++++ .../sourced_from_update_targets_resolver.rb | 1 + .../indexing/relationship_chain_resolver.rbs | 23 ----- .../indexing/resolved_relationship_chain.rbs | 28 ++++++ .../update_targets_spec.rb | 15 ++++ 6 files changed, 130 insertions(+), 94 deletions(-) create mode 100644 elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/resolved_relationship_chain.rb create mode 100644 elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/resolved_relationship_chain.rbs diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rb index 47c175c95..c6f64e7a4 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rb @@ -7,81 +7,11 @@ # frozen_string_literal: true require "elastic_graph/errors" -require "elastic_graph/schema_artifacts/runtime_metadata/params" -require "elastic_graph/schema_artifacts/runtime_metadata/sourced_from_nested_path_segment" -require "elastic_graph/support/memoizable_data" +require "elastic_graph/schema_definition/indexing/resolved_relationship_chain" module ElasticGraph module SchemaDefinition module Indexing - # The result of resolving a relationship chain. - # - # @private - class ResolvedRelationshipChain < Support::MemoizableData.define( - :relationships, # Array - every relationship in the chain, ordered root-to-leaf - :path_segments # Array - the embedding fields to descend, ordered root-to-leaf - ) - # The relationship the chain terminated at on the root indexed type. - def root_relationship - relationships.first - end - - # The relationship the chain was resolved from — backs the `sourced_from` field(s). - def leaf_relationship - relationships.last - end - - # The index the chain terminates at — where the root indexed type's documents (and their nested - # elements) live, and where the chain's navigation path is registered. The chain always terminates at - # an indexed type (enforced when it is resolved), so this is never `nil`. - def root_index - root_relationship.parent_type.index_def # : Index - end - - # Records this chain's navigation path on its root index, so the painless script can locate the - # nested element to update at index time. - def register_on_root_index - root_index.register_sourced_from_nested_paths(qualified_relationship, sourced_from_nested_paths) - end - - # The leaf relationship name qualified by its embedding-field path (hence unique per resolved chain) - def qualified_relationship - @qualified_relationship ||= - (path_segments.map { |segment| segment.field.name_in_index } + [leaf_relationship.name_in_index]).join(".") - end - - # The runtime-metadata segments the painless script uses to navigate this chain: a `ListPathSegment` for - # each list embedding field (carrying the source field that matches the element) and an `ObjectPathSegment` - # for each object embedding field. - def sourced_from_nested_paths - @sourced_from_nested_paths ||= path_segments.map do |segment| - if (source_field = segment.source_field_name) - SchemaArtifacts::RuntimeMetadata::ListPathSegment.new( - field: segment.field.name_in_index, - source_field: source_field - ) - else - SchemaArtifacts::RuntimeMetadata::ObjectPathSegment.new( - field: segment.field.name_in_index - ) - end - end - end - - # The params identifying which nested element to update at each level: one entry per list segment, - # pulling the matching value from the segment's foreign key on the source event. Object segments have no - # ambiguity, so they contribute no identifier. - def path_identifier_params - @path_identifier_params ||= path_segments.filter_map do |segment| - source_field = segment.source_field_name - next unless source_field - - param = SchemaArtifacts::RuntimeMetadata::DynamicParam.new(source_path: source_field, cardinality: :one) - [source_field, param] - end.to_h - end - end - # Describes how to navigate from a parent type into a nested child element. # For list fields, `source_field_name` identifies which element to update: the element # whose `id` matches `event[source_field_name]`. We implicitly match on the `id` field diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/resolved_relationship_chain.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/resolved_relationship_chain.rb new file mode 100644 index 000000000..1f6dc6fc8 --- /dev/null +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/resolved_relationship_chain.rb @@ -0,0 +1,85 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/schema_artifacts/runtime_metadata/params" +require "elastic_graph/schema_artifacts/runtime_metadata/sourced_from_nested_path_segment" +require "elastic_graph/support/memoizable_data" + +module ElasticGraph + module SchemaDefinition + module Indexing + # The result of resolving a relationship chain. + # + # @private + class ResolvedRelationshipChain < Support::MemoizableData.define( + :relationships, # Array - every relationship in the chain, ordered root-to-leaf + :path_segments # Array - the embedding fields to descend, ordered root-to-leaf + ) + # The relationship the chain terminated at on the root indexed type. + def root_relationship + relationships.first + end + + # The relationship the chain was resolved from — backs the `sourced_from` field(s). + def leaf_relationship + relationships.last + end + + # The index the chain terminates at — where the root indexed type's documents (and their nested + # elements) live, and where the chain's navigation path is registered. The chain always terminates at + # an indexed type (enforced when it is resolved), so this is never `nil`. + def root_index + root_relationship.parent_type.index_def # : Index + end + + # Records this chain's navigation path on its root index, so the painless script can locate the + # nested element to update at index time. + def register_on_root_index + root_index.register_sourced_from_nested_paths(qualified_relationship, sourced_from_nested_paths) + end + + # The leaf relationship name qualified by its embedding-field path (hence unique per resolved chain) + def qualified_relationship + @qualified_relationship ||= + (path_segments.map { |segment| segment.field.name_in_index } + [leaf_relationship.name_in_index]).join(".") + end + + # The runtime-metadata segments the painless script uses to navigate this chain: a `ListPathSegment` for + # each list embedding field (carrying the source field that matches the element) and an `ObjectPathSegment` + # for each object embedding field. + def sourced_from_nested_paths + @sourced_from_nested_paths ||= path_segments.map do |segment| + if (source_field = segment.source_field_name) + SchemaArtifacts::RuntimeMetadata::ListPathSegment.new( + field: segment.field.name_in_index, + source_field: source_field + ) + else + SchemaArtifacts::RuntimeMetadata::ObjectPathSegment.new( + field: segment.field.name_in_index + ) + end + end + end + + # The params identifying which nested element to update at each level: one entry per list segment, + # pulling the matching value from the segment's foreign key on the source event. Object segments have no + # ambiguity, so they contribute no identifier. + def path_identifier_params + @path_identifier_params ||= path_segments.filter_map do |segment| + source_field = segment.source_field_name + next unless source_field + + param = SchemaArtifacts::RuntimeMetadata::DynamicParam.new(source_path: source_field, cardinality: :one) + [source_field, param] + end.to_h + end + end + end + end +end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rb index eed82f0b2..e695df3ac 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rb @@ -10,6 +10,7 @@ require "elastic_graph/schema_definition/indexing/nested_update_target_resolver" require "elastic_graph/schema_definition/indexing/relationship_chain_resolver" require "elastic_graph/schema_definition/indexing/relationship_resolver" +require "elastic_graph/schema_definition/indexing/resolved_relationship_chain" require "elastic_graph/schema_definition/indexing/top_level_update_target_resolver" module ElasticGraph diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rbs index c88a6f208..9538dff3b 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rbs @@ -1,29 +1,6 @@ module ElasticGraph module SchemaDefinition module Indexing - class ResolvedRelationshipChainSuperType - attr_reader relationships: ::Array[SchemaElements::Relationship] - attr_reader path_segments: ::Array[PathSegment] - - def initialize: ( - relationships: ::Array[SchemaElements::Relationship], - path_segments: ::Array[PathSegment] - ) -> void - end - - class ResolvedRelationshipChain < ResolvedRelationshipChainSuperType - def root_relationship: () -> SchemaElements::Relationship - def leaf_relationship: () -> SchemaElements::Relationship - def root_index: () -> Index - def register_on_root_index: () -> void - @qualified_relationship: ::String - def qualified_relationship: () -> ::String - @sourced_from_nested_paths: ::Array[SchemaArtifacts::RuntimeMetadata::sourcedFromNestedPathSegment] - def sourced_from_nested_paths: () -> ::Array[SchemaArtifacts::RuntimeMetadata::sourcedFromNestedPathSegment] - @path_identifier_params: ::Hash[::String, SchemaArtifacts::RuntimeMetadata::DynamicParam] - def path_identifier_params: () -> ::Hash[::String, SchemaArtifacts::RuntimeMetadata::DynamicParam] - end - class PathSegment attr_reader field: SchemaElements::Field attr_reader source_field_name: ::String? diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/resolved_relationship_chain.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/resolved_relationship_chain.rbs new file mode 100644 index 000000000..b80953630 --- /dev/null +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/resolved_relationship_chain.rbs @@ -0,0 +1,28 @@ +module ElasticGraph + module SchemaDefinition + module Indexing + class ResolvedRelationshipChainSuperType + attr_reader relationships: ::Array[SchemaElements::Relationship] + attr_reader path_segments: ::Array[PathSegment] + + def initialize: ( + relationships: ::Array[SchemaElements::Relationship], + path_segments: ::Array[PathSegment] + ) -> void + end + + class ResolvedRelationshipChain < ResolvedRelationshipChainSuperType + def root_relationship: () -> SchemaElements::Relationship + def leaf_relationship: () -> SchemaElements::Relationship + def root_index: () -> Index + def register_on_root_index: () -> void + @qualified_relationship: ::String + def qualified_relationship: () -> ::String + @sourced_from_nested_paths: ::Array[SchemaArtifacts::RuntimeMetadata::sourcedFromNestedPathSegment] + def sourced_from_nested_paths: () -> ::Array[SchemaArtifacts::RuntimeMetadata::sourcedFromNestedPathSegment] + @path_identifier_params: ::Hash[::String, SchemaArtifacts::RuntimeMetadata::DynamicParam] + def path_identifier_params: () -> ::Hash[::String, SchemaArtifacts::RuntimeMetadata::DynamicParam] + end + end + end +end diff --git a/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb b/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb index 4192cee39..07557cd8a 100644 --- a/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb +++ b/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb @@ -1701,6 +1701,21 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) ) end + it "raises an error when a non-leaf (root) relationship in the chain uses an `additional_filter`" do + # Routability (inbound FK, no `additional_filter`) must hold for *every* relationship in the chain, + # not just the leaf that backs the `sourced_from` field: each link routes the source event up to the + # root document, so a filtered/outbound link anywhere would silently mismatch. Here we break the + # *root* relationship (`Team.statLines`) — the leaf (`Player.statLine`) remains valid — to prove the + # chain-wide validation covers non-leaf relationships and isn't narrowed to the leaf. + expect { + nested_sourced_from_schema( + on_statlines_relationship: ->(r) { r.additional_filter status: "active" } + ) + }.to raise_error Errors::SchemaError, a_string_including( + "`Team.statLines` (referenced from `sourced_from` on field(s): `goals`) uses an `additional_filter`, but `sourced_from` is not supported on relationships with `additional_filter`." + ) + end + it "raises an error when the chain terminates at a non-indexed type" do expect { nested_sourced_from_schema(index_teams: false)