diff --git a/AGENTS.md b/AGENTS.md index 141992018..ca449b19d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -209,6 +209,7 @@ Custom gems can be added via `Gemfile-custom` (see `Gemfile-custom.example`), th - Use `expect_to_return_non_nil_values_from_all_attributes` to test wrapper classes (like `WarehouseLambda`, `GraphQL`, `Indexer`, etc.). This automatically exercises every zero-argument method and verifies all dependencies are built successfully. - Use the `:capture_logs` RSpec tag instead of logger test doubles for verifying log output. Access logs with `logged_jsons_of_type(message_type)`. - Use `build_*` helper methods from `spec/support/builds_*.rb` to construct test objects. These helpers provide sensible defaults while allowing selective overrides for testing specific scenarios. +- Only tag a spec with `:dont_validate_graphql_schema` when it's actually required for the test to pass under `VALIDATE_GRAPHQL_SCHEMAS=1` (the tag skips that validation). First try to fix the test's schema so it produces a valid GraphQL schema while still exercising what the test is meant to exercise; reach for the tag only when that isn't possible. Verify by running the spec with `VALIDATE_GRAPHQL_SCHEMAS=1`. ## Important Patterns diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/index.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/index.rb index 3618ba386..061f6e5a1 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/index.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/index.rb @@ -283,10 +283,11 @@ def runtime_metadata ) end - # Registers a resolved `parent_relationship` chain on this index. + # Records the path segments that navigate from this index's documents down to a nested element that + # receives `sourced_from` data, keyed by the qualified relationship backing those fields. 
# @api private - def register_resolved_relationship_chain(resolved_chain) - sourced_from_nested_paths_by_qualified_relationship[resolved_chain.qualified_relationship] = resolved_chain.sourced_from_nested_paths + def register_sourced_from_nested_paths(qualified_relationship, nested_paths) + sourced_from_nested_paths_by_qualified_relationship[qualified_relationship] = nested_paths end private diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rb new file mode 100644 index 000000000..47e20828b --- /dev/null +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rb @@ -0,0 +1,134 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/schema_artifacts/runtime_metadata/params" +require "elastic_graph/schema_artifacts/runtime_metadata/sourced_from_nested_params" +require "elastic_graph/schema_definition/indexing/update_target_factory" +require "elastic_graph/schema_definition/indexing/update_target_resolver_support" + +module ElasticGraph + module SchemaDefinition + module Indexing + # Resolves a relationship and a set of `sourced_from` fields into an `UpdateTarget` that instructs the + # indexer how to update a type from the related type's source events. This handles the *nested* case, + # where the `sourced_from` fields live on a type embedded within an indexed type (reached via a + # `parent_relationship` chain) and the target updates the root indexed type the embedded type nests + # within. (The *top-level* case—`sourced_from` fields directly on an indexed type—is handled by + # `TopLevelUpdateTargetResolver`.) 
+ # + # @private + class NestedUpdateTargetResolver + def initialize( + object_type:, + sourced_fields:, + resolved_chain:, + field_path_resolver:, + schema_def_state: + ) + @object_type = object_type + @sourced_fields = sourced_fields + @resolved_chain = resolved_chain + @field_path_resolver = field_path_resolver + @schema_def_state = schema_def_state + end + + # Resolves the chain and `sourced_fields` into an `UpdateTarget` on the root indexed type, + # validating everything along the way. + # + # Returns a tuple of the `update_target` (if valid) and a list of errors. + def resolve + relationship_errors = validate_relationships + field_params, field_params_errors = UpdateTargetResolverSupport.resolve_sourced_field_params( + object_type: object_type, + related_type: related_type, + sourced_fields: sourced_fields, + field_path_resolver: field_path_resolver + ) + routing_value_source, routing_error = resolve_field_source(UpdateTargetResolverSupport::RoutingSourceAdapter) + rollover_timestamp_value_source, rollover_timestamp_error = resolve_field_source(UpdateTargetResolverSupport::RolloverTimestampSourceAdapter) + # Routing/rollover values resolve from `equivalent_field`s on the root relationship, so they are + # validated there (matching how `TopLevelUpdateTargetResolver` validates its own relationship). + equivalent_field_errors = root_relationship.validate_equivalent_fields(field_path_resolver) + has_had_multiple_sources_errors = UpdateTargetResolverSupport.validate_has_had_multiple_sources(root_index, root_type, relationship) + + all_errors = relationship_errors + field_params_errors + equivalent_field_errors + has_had_multiple_sources_errors + + [routing_error, rollover_timestamp_error].compact + + if all_errors.empty? 
+ update_target = UpdateTargetFactory.new_normal_indexing_update_target( + type: root_type.name, + relationship: resolved_chain.qualified_relationship, + id_source: root_relationship.foreign_key, + sourced_from_nested_params: SchemaArtifacts::RuntimeMetadata::SourcedFromNestedParams.new( + field_params: field_params, + path_identifier_params: resolved_chain.path_identifier_params + ), + routing_value_source: routing_value_source, + rollover_timestamp_value_source: rollover_timestamp_value_source + ) + end + + [update_target, all_errors] + end + + private + + # @dynamic object_type, sourced_fields, resolved_chain, field_path_resolver, schema_def_state + attr_reader :object_type, :sourced_fields, :resolved_chain, :field_path_resolver, :schema_def_state + + # The leaf relationship the chain was resolved from — the one backing this type's `sourced_from` fields. + def relationship + resolved_chain.leaf_relationship + end + + def root_relationship + resolved_chain.root_relationship + end + + def root_type + root_relationship.parent_type + end + + def root_index + resolved_chain.root_index + end + + def related_type + @related_type ||= schema_def_state.object_types_by_name.fetch(relationship.related_type.unwrap_non_null.name) + end + + # Applies validations on the relationships backing nested `sourced_from` fields. Only the leaf must be + # `relates_to_one` (it's where a value is sourced through), but every relationship in the chain joins on + # a foreign key that routes the source event, so each must be routable (inbound foreign key, no filter). 
+ def validate_relationships + leaf_prefix = UpdateTargetResolverSupport.relationship_error_prefix(relationship, sourced_fields) + + UpdateTargetResolverSupport.validate_relationship_cardinality(relationship, error_prefix: leaf_prefix) + + resolved_chain.relationships.flat_map do |chain_relationship| + error_prefix = UpdateTargetResolverSupport.relationship_error_prefix(chain_relationship, sourced_fields) + UpdateTargetResolverSupport.validate_relationship_routability(chain_relationship, error_prefix: error_prefix) + end + end + + # Resolves a routing/rollover field source via the shared helper, supplying the root type, index, and + # relationship — the update target updates the root indexed type via the root relationship, so routing + # and rollover (and the `equivalent_field` config) are resolved there. + def resolve_field_source(adapter) + UpdateTargetResolverSupport.resolve_field_source( + adapter, + relationship: root_relationship, + index_def: root_index, + related_type: related_type, + field_path_resolver: field_path_resolver, + updated_type: root_type + ) + end + end + end + end +end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rb index 1c7c6f772..c6f64e7a4 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rb @@ -7,43 +7,11 @@ # frozen_string_literal: true require "elastic_graph/errors" -require "elastic_graph/schema_artifacts/runtime_metadata/sourced_from_nested_path_segment" +require "elastic_graph/schema_definition/indexing/resolved_relationship_chain" module ElasticGraph module SchemaDefinition module Indexing - # The result of resolving a relationship chain. 
- # - # @private - class ResolvedRelationshipChain < ::Data.define( - :root_relationship, # Relationship the chain terminated at on the root indexed type - :leaf_relationship, # Relationship the chain was resolved from — backs `sourced_from` field(s) - :path_segments # Array - the embedding fields to descend, ordered root-to-leaf - ) - # The leaf relationship name qualified by its embedding-field path (hence unique per resolved chain) - def qualified_relationship - (path_segments.map { |segment| segment.field.name_in_index } + [leaf_relationship.name_in_index]).join(".") - end - - # The runtime-metadata segments the painless script uses to navigate this chain: a `ListPathSegment` for - # each list embedding field (carrying the source field that matches the element) and an `ObjectPathSegment` - # for each object embedding field. - def sourced_from_nested_paths - path_segments.map do |segment| - if (source_field = segment.source_field_name) - SchemaArtifacts::RuntimeMetadata::ListPathSegment.new( - field: segment.field.name_in_index, - source_field: source_field - ) - else - SchemaArtifacts::RuntimeMetadata::ObjectPathSegment.new( - field: segment.field.name_in_index - ) - end - end - end - end - # Describes how to navigate from a parent type into a nested child element. # For list fields, `source_field_name` identifies which element to update: the element # whose `id` matches `event[source_field_name]`. We implicitly match on the `id` field @@ -82,11 +50,11 @@ def initialize(schema_def_state:) def resolve(starting_relationship) errors = [] # : ::Array[::String] path_segments = [] # : ::Array[PathSegment] - visited_relationships = Set[starting_relationship] + relationships = [] # : ::Array[SchemaElements::Relationship] # resolve_chain returns the chain's root relationship (the one with no parent_ref), or nil # if it hit an error walking the chain (in which case the error is already recorded). 
- root_relationship = resolve_chain(starting_relationship, path_segments, errors, visited_relationships) + root_relationship = resolve_chain(starting_relationship, path_segments, relationships, errors) return [nil, errors] unless root_relationship # A valid chain must terminate at a relationship defined on an indexed type. @@ -99,8 +67,7 @@ def resolve(starting_relationship) end resolved_chain = ResolvedRelationshipChain.new( - root_relationship: root_relationship, - leaf_relationship: starting_relationship, + relationships: relationships.reverse, # reverse so root-to-leaf order path_segments: path_segments.reverse # reverse so root-to-leaf order ) @@ -109,25 +76,27 @@ def resolve(starting_relationship) private - # Recursively walks from leaf to root, building path segments in reverse. Returns the root - # relationship (the one with no parent_ref) on success, or nil if an error was encountered. - def resolve_chain(current_rel, path_segments, errors, visited_relationships) + # Recursively walks from leaf to root, collecting relationships and building path segments in reverse. + # Returns the root relationship (the one with no parent_ref) on success, or nil if an error was + # encountered. + def resolve_chain(current_rel, path_segments, relationships, errors) + relationships << current_rel + parent_ref = current_rel.parent_ref return current_rel unless parent_ref - parent_rel = resolve_parent_ref(current_rel, parent_ref, errors, visited_relationships) + parent_rel = resolve_parent_ref(current_rel, parent_ref, relationships, errors) return nil unless parent_rel build_path_segment(current_rel, parent_rel.parent_type, path_segments, errors) return nil if errors.any? - visited_relationships.add(parent_rel) - resolve_chain(parent_rel, path_segments, errors, visited_relationships) + resolve_chain(parent_rel, path_segments, relationships, errors) end # Resolves a parent_ref into the concrete parent relationship. 
# Returns the parent relationship on success, or appends to errors and returns nil. - def resolve_parent_ref(current_rel, ref, errors, visited_relationships) + def resolve_parent_ref(current_rel, ref, relationships, errors) unless current_rel.indexing_only errors << "#{rel_description(current_rel)} uses `parent_relationship` but is not declared with " \ "`indexing_only: true`. Relationships with `parent_relationship` must be indexing-only." @@ -149,7 +118,7 @@ def resolve_parent_ref(current_rel, ref, errors, visited_relationships) return nil end - if visited_relationships.include?(parent_rel) + if relationships.include?(parent_rel) errors << "#{rel_description(current_rel)} creates a circular `parent_relationship` chain " \ "— `#{parent_type.name}.#{ref.relationship_name}` was already visited. The chain must terminate at a root indexed type." return nil diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/resolved_relationship_chain.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/resolved_relationship_chain.rb new file mode 100644 index 000000000..1f6dc6fc8 --- /dev/null +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/resolved_relationship_chain.rb @@ -0,0 +1,85 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/schema_artifacts/runtime_metadata/params" +require "elastic_graph/schema_artifacts/runtime_metadata/sourced_from_nested_path_segment" +require "elastic_graph/support/memoizable_data" + +module ElasticGraph + module SchemaDefinition + module Indexing + # The result of resolving a relationship chain. 
+ # + # @private + class ResolvedRelationshipChain < Support::MemoizableData.define( + :relationships, # Array of SchemaElements::Relationship - every relationship in the chain, ordered root-to-leaf + :path_segments # Array of PathSegment - the embedding fields to descend, ordered root-to-leaf + ) + # The relationship the chain terminated at on the root indexed type. + def root_relationship + relationships.first + end + + # The relationship the chain was resolved from — backs the `sourced_from` field(s). + def leaf_relationship + relationships.last + end + + # The index the chain terminates at — where the root indexed type's documents (and their nested + # elements) live, and where the chain's navigation path is registered. The chain always terminates at + # an indexed type (enforced when it is resolved), so this is never `nil`. + def root_index + root_relationship.parent_type.index_def # : Index + end + + # Records this chain's navigation path on its root index, so the painless script can locate the + # nested element to update at index time. + def register_on_root_index + root_index.register_sourced_from_nested_paths(qualified_relationship, sourced_from_nested_paths) + end + + # The leaf relationship name qualified by its embedding-field path (hence unique per resolved chain) + def qualified_relationship + @qualified_relationship ||= + (path_segments.map { |segment| segment.field.name_in_index } + [leaf_relationship.name_in_index]).join(".") + end + + # The runtime-metadata segments the painless script uses to navigate this chain: a `ListPathSegment` for + # each list embedding field (carrying the source field that matches the element) and an `ObjectPathSegment` + # for each object embedding field. 
+ def sourced_from_nested_paths + @sourced_from_nested_paths ||= path_segments.map do |segment| + if (source_field = segment.source_field_name) + SchemaArtifacts::RuntimeMetadata::ListPathSegment.new( + field: segment.field.name_in_index, + source_field: source_field + ) + else + SchemaArtifacts::RuntimeMetadata::ObjectPathSegment.new( + field: segment.field.name_in_index + ) + end + end + end + + # The params identifying which nested element to update at each level: one entry per list segment, + # pulling the matching value from the segment's foreign key on the source event. Object segments have no + # ambiguity, so they contribute no identifier. + def path_identifier_params + @path_identifier_params ||= path_segments.filter_map do |segment| + source_field = segment.source_field_name + next unless source_field + + param = SchemaArtifacts::RuntimeMetadata::DynamicParam.new(source_path: source_field, cardinality: :one) + [source_field, param] + end.to_h + end + end + end + end +end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rb index 3e1769e46..e695df3ac 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rb @@ -7,9 +7,11 @@ # frozen_string_literal: true require "elastic_graph/errors" +require "elastic_graph/schema_definition/indexing/nested_update_target_resolver" require "elastic_graph/schema_definition/indexing/relationship_chain_resolver" require "elastic_graph/schema_definition/indexing/relationship_resolver" -require "elastic_graph/schema_definition/indexing/update_target_resolver" +require "elastic_graph/schema_definition/indexing/resolved_relationship_chain" +require 
"elastic_graph/schema_definition/indexing/top_level_update_target_resolver" module ElasticGraph module SchemaDefinition @@ -46,12 +48,23 @@ def resolve private def resolve_for_type(object_type, &error_reporter) - fields_with_sources_by_relationship_name = sourced_fields_by_relationship_name(object_type) + resolve_top_level_update_targets(object_type, &error_reporter) + + resolve_nested_update_targets(object_type, &error_reporter) + end + + # Resolves the update targets for this type's own `sourced_from` fields (the non-nested case): one per + # relationship that backs `sourced_from` fields, each keyed by the related type that publishes the events. + def resolve_top_level_update_targets(object_type, &error_reporter) + # Skip unindexed types: they produce no top-level targets, and resolving `fields_with_sources` on an + # unindexed apollo `_Entity` union can raise when entity types share a field name across mapping types. + empty_fields_by_relationship = {} # : ::Hash[::String, ::Array[SchemaElements::Field]] + sourced_fields_by_relationship_name = + object_type.own_index_def ? 
group_sourced_fields_by_relationship_name(object_type) : empty_fields_by_relationship defined_relationships = object_type.relationships_by_name.keys - results = (defined_relationships | fields_with_sources_by_relationship_name.keys).filter_map do |relationship_name| + (defined_relationships | sourced_fields_by_relationship_name.keys).filter_map do |relationship_name| empty_fields = [] # : ::Array[SchemaElements::Field] - sourced_fields = fields_with_sources_by_relationship_name.fetch(relationship_name) { empty_fields } + sourced_fields = sourced_fields_by_relationship_name.fetch(relationship_name) { empty_fields } relationship_resolver = RelationshipResolver.new( schema_def_state: @schema_def_state, object_type: object_type, @@ -63,81 +76,81 @@ def resolve_for_type(object_type, &error_reporter) yield :relationship, relationship_error if relationship_error if object_type.own_index_def && resolved_relationship && sourced_fields.any? - resolve_update_target(object_type, resolved_relationship, sourced_fields, &error_reporter) + resolve_top_level_update_target(object_type, resolved_relationship, sourced_fields, &error_reporter) end end + end - # Resolve any `parent_relationship` chains on this type, surfacing configuration errors and - # registering the resolved path segments on the root index. - resolve_relationship_chains(object_type, &error_reporter) - - results + # Resolves the update targets for this type's nested `sourced_from` fields: each `parent_relationship` + # chain that backs `sourced_from` fields registers its navigation path on the root index and produces a + # nested update target on the root indexed type. 
+ def resolve_nested_update_targets(object_type, &error_reporter) + resolve_sourced_fields_and_chains(object_type, &error_reporter).filter_map do |sourced_fields, resolved_chain| + resolved_chain.register_on_root_index + resolve_nested_update_target(object_type, sourced_fields, resolved_chain, &error_reporter) + end end - def resolve_update_target(object_type, resolved_relationship, sourced_fields) - update_target_resolver = UpdateTargetResolver.new( + def resolve_top_level_update_target(object_type, resolved_relationship, sourced_fields) + top_level_update_target_resolver = TopLevelUpdateTargetResolver.new( object_type: object_type, resolved_relationship: resolved_relationship, sourced_fields: sourced_fields, field_path_resolver: @schema_def_state.field_path_resolver ) - update_target, errors = update_target_resolver.resolve + update_target, errors = top_level_update_target_resolver.resolve errors.each { |error| yield :sourced_field, error } - # Validate that has_had_multiple_sources! has been called when sourced_from is used - index_def = object_type.own_index_def # : Index - unless index_def.has_had_multiple_sources_flag - yield :sourced_field, "Type `#{object_type.name}` uses `sourced_from` fields but its index `#{index_def.name}` " \ - "has not been configured with `has_had_multiple_sources!`. To resolve this, add `i.has_had_multiple_sources!` within the " \ - "`t.index \"#{index_def.name}\"` block. This flag is required because indices with multiple sources can contain " \ - "incomplete documents, and ElasticGraph needs to know this to apply proper filtering. Once set, this flag should remain even " \ - "if you later remove all `sourced_from` fields, as the index may still contain historical incomplete documents." 
- end - [resolved_relationship.related_type.name, update_target] if update_target end - def resolve_relationship_chains(object_type) - relationships_with_parent_ref = object_type.relationships_by_name.each_value.select(&:parent_ref) - return if relationships_with_parent_ref.empty? + def group_sourced_fields_by_relationship_name(object_type) + object_type.fields_with_sources.group_by { |f| (_ = f.source).relationship_name } + end - # The set of relationship names this type's `sourced_from` fields draw their data through. We use - # `fields_with_sources` (rather than `sourced_fields_by_relationship_name`) because it works for - # non-indexed (embedded) types — which is exactly where nested `sourced_from` fields live. - relationship_names_with_sourced_fields = object_type.fields_with_sources.map { |f| (_ = f.source).relationship_name }.to_set + # Resolves every `parent_relationship` chain on this type, surfacing configuration errors. Returns + # `[sourced_fields, resolved_chain]` for each chain that resolved cleanly *and* backs `sourced_from` + # fields — the pairings that produce nested update targets. + def resolve_sourced_fields_and_chains(object_type) + relationships_with_parent_ref = object_type.relationships_by_name.each_value.select(&:parent_ref) + empty_results = [] # : ::Array[[::Array[SchemaElements::Field], ResolvedRelationshipChain]] + return empty_results if relationships_with_parent_ref.empty? - chain_resolver = RelationshipChainResolver.new(schema_def_state: @schema_def_state) + sourced_fields_by_relationship_name = group_sourced_fields_by_relationship_name(object_type) - relationships_with_parent_ref.each do |relationship| - # Every `parent_relationship` chain is resolved so its configuration errors are surfaced, even for - # relationships that don't themselves back any `sourced_from` field. 
- resolved_chain, chain_errors = chain_resolver.resolve(relationship) + relationships_with_parent_ref.filter_map do |relationship| + # Resolve every chain (even those backing no `sourced_from` field) so configuration errors surface. + resolved_chain, chain_errors = relationship_chain_resolver.resolve(relationship) chain_errors.each { |error| yield :sourced_field, error } next unless resolved_chain - # Only register paths for relationships that back `sourced_from` fields. A pure link in a longer - # chain has no nested data of its own; its navigation lives in the leaf relationship's chain. - next unless relationship_names_with_sourced_fields.include?(relationship.name) - - # Register the resolved chain on its root index, which records the path segments keyed by the - # chain's qualified relationship. - root_index = resolved_chain.root_relationship.parent_type.index_def # : Index - root_index.register_resolved_relationship_chain(resolved_chain) + # Only chains backing `sourced_from` fields produce a target; pure links carry no nested data. + sourced_fields = sourced_fields_by_relationship_name[relationship.name] + [sourced_fields, resolved_chain] if sourced_fields end end - def sourced_fields_by_relationship_name(object_type) - if object_type.own_index_def.nil? - # For now, only indexed types can have `sourced_from` fields, and resolving `fields_with_sources` on an unindexed union type - # such as `_Entity` when we are using apollo can lead to exceptions when multiple entity types have the same field name - # that use different mapping types. - {} # : ::Hash[::String, ::Array[SchemaElements::Field]] - else - object_type - .fields_with_sources - .group_by { |f| (_ = f.source).relationship_name } - end + # A single resolver shared across all object types so its per-parent-type field cache survives the + # whole resolve pass — parent types recur across chains from different leaf types. 
+ def relationship_chain_resolver + @relationship_chain_resolver ||= RelationshipChainResolver.new(schema_def_state: @schema_def_state) + end + + def resolve_nested_update_target(object_type, sourced_fields, resolved_chain) + nested_update_target_resolver = NestedUpdateTargetResolver.new( + object_type: object_type, + sourced_fields: sourced_fields, + resolved_chain: resolved_chain, + field_path_resolver: @schema_def_state.field_path_resolver, + schema_def_state: @schema_def_state + ) + + update_target, errors = nested_update_target_resolver.resolve + errors.each { |error| yield :sourced_field, error } + + # The update target lives on the source type — its events drive updates to the nested element. + [resolved_chain.leaf_relationship.related_type.name, update_target] if update_target end def raise_if_errors(sourced_field_errors, relationship_errors) diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/top_level_update_target_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/top_level_update_target_resolver.rb new file mode 100644 index 000000000..f7a7d7dea --- /dev/null +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/top_level_update_target_resolver.rb @@ -0,0 +1,110 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/schema_artifacts/runtime_metadata/params" +require "elastic_graph/schema_definition/indexing/update_target_factory" +require "elastic_graph/schema_definition/indexing/update_target_resolver_support" + +module ElasticGraph + module SchemaDefinition + module Indexing + # Resolves a relationship and a set of `sourced_from` fields into an `UpdateTarget` that instructs the + # indexer how to update a type from the related type's source events. 
This handles the *top-level* case, + # where the `sourced_from` fields live directly on an indexed type and the target updates that same + # indexed type. (The *nested* case—`sourced_from` fields on a type embedded within an indexed type—is + # handled by `NestedUpdateTargetResolver`.) + # + # @private + class TopLevelUpdateTargetResolver + def initialize( + object_type:, + resolved_relationship:, + sourced_fields:, + field_path_resolver: + ) + @object_type = object_type + @resolved_relationship = resolved_relationship + @sourced_fields = sourced_fields + @field_path_resolver = field_path_resolver + end + + # Resolves the `object_type`, `resolved_relationship`, and `sourced_fields` into an `UpdateTarget`, validating + # that everything is defined correctly. + # + # Returns a tuple of the `update_target` (if valid), and a list of errors. + def resolve + relationship_errors = validate_relationship + top_level_fields_params, top_level_fields_params_errors = UpdateTargetResolverSupport.resolve_sourced_field_params( + object_type: object_type, + related_type: related_type, + sourced_fields: sourced_fields, + field_path_resolver: field_path_resolver + ) + routing_value_source, routing_error = resolve_field_source(UpdateTargetResolverSupport::RoutingSourceAdapter) + rollover_timestamp_value_source, rollover_timestamp_error = resolve_field_source(UpdateTargetResolverSupport::RolloverTimestampSourceAdapter) + equivalent_field_errors = resolved_relationship.relationship.validate_equivalent_fields(field_path_resolver) + index_def = object_type.own_index_def # : Index + has_had_multiple_sources_errors = UpdateTargetResolverSupport.validate_has_had_multiple_sources( + index_def, object_type, resolved_relationship.relationship + ) + + all_errors = relationship_errors + top_level_fields_params_errors + equivalent_field_errors + + has_had_multiple_sources_errors + [routing_error, rollover_timestamp_error].compact + + if all_errors.empty? 
+ update_target = UpdateTargetFactory.new_normal_indexing_update_target( + type: object_type.name, + relationship: resolved_relationship.relationship_name, + id_source: resolved_relationship.relation_metadata.foreign_key, + top_level_fields_params: top_level_fields_params, + routing_value_source: routing_value_source, + rollover_timestamp_value_source: rollover_timestamp_value_source + ) + end + + [update_target, all_errors] + end + + private + + # @dynamic object_type, resolved_relationship, sourced_fields, field_path_resolver + attr_reader :object_type, :resolved_relationship, :sourced_fields, :field_path_resolver + + # Applies additional validations (beyond what `RelationshipResolver` applies) on relationships that are + # used by `sourced_from` fields. + def validate_relationship + relationship = resolved_relationship.relationship + error_prefix = UpdateTargetResolverSupport.relationship_error_prefix(relationship, sourced_fields) + + UpdateTargetResolverSupport.validate_relationship_cardinality(relationship, error_prefix: error_prefix) + + UpdateTargetResolverSupport.validate_relationship_routability(relationship, error_prefix: error_prefix) + end + + # The related type whose source events feed this update target — where `sourced_from` fields are resolved. + def related_type + resolved_relationship.related_type + end + + # Resolves a routing/rollover field source via the shared helper, supplying the top-level type, index, + # and relationship. 
+ def resolve_field_source(adapter) + index_def = object_type.own_index_def # : Index + + UpdateTargetResolverSupport.resolve_field_source( + adapter, + relationship: resolved_relationship.relationship, + index_def: index_def, + related_type: related_type, + field_path_resolver: field_path_resolver, + updated_type: object_type + ) + end + end + end + end +end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_factory.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_factory.rb index ea4e267d2..2f279fb68 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_factory.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_factory.rb @@ -6,6 +6,8 @@ # # frozen_string_literal: true +require "elastic_graph/schema_artifacts/runtime_metadata/sourced_from_nested_params" + module ElasticGraph module SchemaDefinition module Indexing @@ -16,10 +18,10 @@ def self.new_normal_indexing_update_target( type:, relationship:, id_source:, - top_level_fields_params:, routing_value_source:, rollover_timestamp_value_source:, - sourced_from_nested_params: + top_level_fields_params: {}, + sourced_from_nested_params: SchemaArtifacts::RuntimeMetadata::SourcedFromNestedParams::EMPTY ) SchemaArtifacts::RuntimeMetadata::UpdateTarget.new( type: type, diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver.rb deleted file mode 100644 index 3f456d330..000000000 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver.rb +++ /dev/null @@ -1,195 +0,0 @@ -# Copyright 2024 - 2026 Block, Inc. 
-# -# Use of this source code is governed by an MIT-style -# license that can be found in the LICENSE file or at -# https://opensource.org/licenses/MIT. -# -# frozen_string_literal: true - -require "elastic_graph/schema_artifacts/runtime_metadata/params" -require "elastic_graph/schema_definition/indexing/update_target_factory" - -module ElasticGraph - module SchemaDefinition - module Indexing - # Responsible for resolving a relationship and a set of `sourced_from` fields into an `UpdateTarget` - # that contains the instructions for how the primary type should be updated from the related type's - # source events. - # - # @private - class UpdateTargetResolver - def initialize( - object_type:, - resolved_relationship:, - sourced_fields:, - field_path_resolver: - ) - @object_type = object_type - @resolved_relationship = resolved_relationship - @sourced_fields = sourced_fields - @field_path_resolver = field_path_resolver - end - - # Resolves the `object_type`, `resolved_relationship`, and `sourced_fields` into an `UpdateTarget`, validating - # that everything is defined correctly. - # - # Returns a tuple of the `update_target` (if valid), and a list of errors. - def resolve - relationship_errors = validate_relationship - top_level_fields_params, top_level_fields_params_errors = resolve_top_level_fields_params - routing_value_source, routing_error = resolve_field_source(RoutingSourceAdapter) - rollover_timestamp_value_source, rollover_timestamp_error = resolve_field_source(RolloverTimestampSourceAdapter) - equivalent_field_errors = resolved_relationship.relationship.validate_equivalent_fields(field_path_resolver) - - all_errors = relationship_errors + top_level_fields_params_errors + equivalent_field_errors + [routing_error, rollover_timestamp_error].compact - - if all_errors.empty? 
- update_target = UpdateTargetFactory.new_normal_indexing_update_target( - type: object_type.name, - relationship: resolved_relationship.relationship_name, - id_source: resolved_relationship.relation_metadata.foreign_key, - top_level_fields_params: top_level_fields_params, - sourced_from_nested_params: SchemaArtifacts::RuntimeMetadata::SourcedFromNestedParams::EMPTY, - routing_value_source: routing_value_source, - rollover_timestamp_value_source: rollover_timestamp_value_source - ) - end - - [update_target, all_errors] - end - - private - - # @dynamic object_type, resolved_relationship, sourced_fields, field_path_resolver - attr_reader :object_type, :resolved_relationship, :sourced_fields, :field_path_resolver - - # Applies additional validations (beyond what `RelationshipResolver` applies) on relationships that are - # used by `sourced_from` fields. - def validate_relationship - errors = [] # : ::Array[::String] - - if resolved_relationship.relationship.many? - errors << "#{relationship_error_prefix} is a `relates_to_many` relationship, but `sourced_from` is only supported on a `relates_to_one` relationship." - end - - relation_metadata = resolved_relationship.relation_metadata - if relation_metadata.direction == :out - errors << "#{relationship_error_prefix} has an outbound foreign key (`dir: :out`), but `sourced_from` is only supported via inbound foreign key (`dir: :in`) relationships." - end - - unless relation_metadata.additional_filter.empty? - errors << "#{relationship_error_prefix} is a `relationship` using an `additional_filter` but `sourced_from` is not supported on relationships with `additional_filter`." - end - - errors - end - - # Helper method for building the prefix of relationship-related error messages. 
- def relationship_error_prefix - sourced_fields_description = "(referenced from `sourced_from` on field(s): #{sourced_fields.map { |f| "`#{f.name}`" }.join(", ")})" - "`#{object_type.name}.#{resolved_relationship.relationship_name}` #{sourced_fields_description}" - end - - # Resolves the `sourced_fields` into a top-level fields params map, validating them along the way. - # - # Returns a tuple of the top-level fields params and a list of any errors that occurred during resolution. - def resolve_top_level_fields_params - related_type = resolved_relationship.related_type - errors = [] # : ::Array[::String] - - top_level_fields_params = sourced_fields.filter_map do |field| - field_source = field.source # : SchemaElements::FieldSource - - referenced_field_path = field_path_resolver.resolve_public_path(related_type, field_source.field_path) do |parent_field| - !parent_field.type.list? - end - - if referenced_field_path.nil? - explanation = - if field_source.field_path.include?(".") - "could not be resolved: some parts do not exist on their respective types as non-list fields" - else - "does not exist as an indexing field" - end - - errors << "`#{object_type.name}.#{field.name}` has an invalid `sourced_from` argument: `#{related_type.name}.#{field_source.field_path}` #{explanation}." - nil - elsif referenced_field_path.type.unwrap_non_null != field.type.unwrap_non_null - errors << "The type of `#{object_type.name}.#{field.name}` is `#{field.type}`, but the type of it's source (`#{related_type.name}.#{field_source.field_path}`) is `#{referenced_field_path.type}`. These must agree to use `sourced_from`." - nil - elsif field.type.non_null? - errors << "The type of `#{object_type.name}.#{field.name}` (`#{field.type}`) is not nullable, but this is not allowed for `sourced_from` fields since the value will be `null` before the related type's event is ingested." 
- nil - else - param = SchemaArtifacts::RuntimeMetadata::DynamicParam.new( - source_path: referenced_field_path.path_in_index, - cardinality: :one - ) - - [field.name_in_index, param] - end - end.to_h - - [top_level_fields_params, errors] - end - - # Helper method that assists with resolving `routing_value_source` and `rollover_timestamp_value_source`. - # Uses an `adapter` for the differences in these two cases. - # - # Returns a tuple of the resolved source (if successful) and an error (if invalid). - def resolve_field_source(adapter) - index_def = object_type.own_index_def # : Index - - field_source_graphql_path_string = adapter.get_field_source(resolved_relationship.relationship, index_def) do |local_need| - relationship_name = resolved_relationship.relationship_name - - error = "Cannot update `#{object_type.name}` documents with data from related `#{relationship_name}` events, " \ - "because #{adapter.cannot_update_reason(object_type, relationship_name)}. To fix it, add a call like this to the " \ - "`#{object_type.name}.#{relationship_name}` relationship definition: `rel.equivalent_field " \ - "\"[#{resolved_relationship.related_type.name} field]\", locally_named: \"#{local_need}\"`." - - return [nil, error] - end - - if field_source_graphql_path_string - field_path = field_path_resolver.resolve_public_path(resolved_relationship.related_type, field_source_graphql_path_string) do |parent_field| - !parent_field.type.list? - end - - [field_path&.path_in_index, nil] - else - [nil, nil] - end - end - - # Adapter for the `routing_value_source` case for use by `resolve_field_source`. 
- # - # @private - module RoutingSourceAdapter - def self.get_field_source(relationship, index, &block) - relationship.routing_value_source_for_index(index, &block) - end - - def self.cannot_update_reason(object_type, relationship_name) - "`#{object_type.name}` uses custom shard routing but we don't know what `#{relationship_name}` field to use " \ - "to route the `#{object_type.name}` update requests" - end - end - - # Adapter for the `rollover_timestamp_value_source` case for use by `resolve_field_source`. - # - # @private - module RolloverTimestampSourceAdapter - def self.get_field_source(relationship, index, &block) - relationship.rollover_timestamp_value_source_for_index(index, &block) - end - - def self.cannot_update_reason(object_type, relationship_name) - "`#{object_type.name}` uses a rollover index but we don't know what `#{relationship_name}` timestamp field to use " \ - "to select an index for the `#{object_type.name}` update requests" - end - end - end - end - end -end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver_support.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver_support.rb new file mode 100644 index 000000000..930b8574d --- /dev/null +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_resolver_support.rb @@ -0,0 +1,178 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/schema_artifacts/runtime_metadata/params" + +module ElasticGraph + module SchemaDefinition + module Indexing + # Shared logic used by both `TopLevelUpdateTargetResolver` (top-level `sourced_from`) and + # `NestedUpdateTargetResolver` (nested `sourced_from`) to build their `UpdateTarget`s. 
The two resolvers + # differ in *which* type, relationship, and index they operate on, but resolve `sourced_from` fields, + # routing, and rollover the same way—so that common logic lives here. + # + # @private + module UpdateTargetResolverSupport + # Resolves `sourced_fields` into a `[field_name_in_index => DynamicParam]` map, validating each field + # against its source on `related_type`. Returns a tuple of the params map and a list of any errors. + def self.resolve_sourced_field_params(object_type:, related_type:, sourced_fields:, field_path_resolver:) + errors = [] # : ::Array[::String] + + field_params = sourced_fields.filter_map do |field| + field_source = field.source # : SchemaElements::FieldSource + + referenced_field_path = field_path_resolver.resolve_public_path(related_type, field_source.field_path) do |parent_field| + !parent_field.type.list? + end + + if referenced_field_path.nil? + explanation = + if field_source.field_path.include?(".") + "could not be resolved: some parts do not exist on their respective types as non-list fields" + else + "does not exist as an indexing field" + end + + errors << "`#{object_type.name}.#{field.name}` has an invalid `sourced_from` argument: `#{related_type.name}.#{field_source.field_path}` #{explanation}." + nil + elsif referenced_field_path.type.unwrap_non_null != field.type.unwrap_non_null + errors << "The type of `#{object_type.name}.#{field.name}` is `#{field.type}`, but the type of its source (`#{related_type.name}.#{field_source.field_path}`) is `#{referenced_field_path.type}`. These must agree to use `sourced_from`." + nil + elsif field.type.non_null? + errors << "The type of `#{object_type.name}.#{field.name}` (`#{field.type}`) is not nullable, but this is not allowed for `sourced_from` fields since the value will be `null` before the related type's event is ingested." 
+ nil + else + param = SchemaArtifacts::RuntimeMetadata::DynamicParam.new( + source_path: referenced_field_path.path_in_index, + cardinality: :one + ) + + [field.name_in_index, param] + end + end.to_h + + [field_params, errors] + end + + # Resolves the `routing_value_source` or `rollover_timestamp_value_source` for an `UpdateTarget`, using + # an `adapter` for the differences between the two cases. The value is drawn from `relationship`'s + # `equivalent_field` configuration and resolved to an indexing path on `related_type`. `index_def` is the + # index needing the value, and `updated_type` is the indexed type being updated (used in error messages). + # + # Returns a tuple of the resolved source (if successful) and an error (if invalid). + def self.resolve_field_source(adapter, relationship:, index_def:, related_type:, field_path_resolver:, updated_type:) + field_source_graphql_path_string = adapter.get_field_source(relationship, index_def) do |local_need| + error = "Cannot update `#{updated_type.name}` documents with data from related `#{relationship.name}` events, " \ + "because #{adapter.cannot_update_reason(updated_type, relationship.name)}. To fix it, add a call like this to the " \ + "`#{updated_type.name}.#{relationship.name}` relationship definition: `rel.equivalent_field " \ + "\"[#{related_type.name} field]\", locally_named: \"#{local_need}\"`." + + return [nil, error] + end + + if field_source_graphql_path_string + field_path = field_path_resolver.resolve_public_path(related_type, field_source_graphql_path_string) do |parent_field| + !parent_field.type.list? + end + + [field_path&.path_in_index, nil] + else + [nil, nil] + end + end + + # Validates that `relationship` is `relates_to_one`, since a `sourced_from` field copies a value from a + # single source record and a `relates_to_many` relationship has no single value to copy. `error_prefix` + # identifies the offending relationship (and the `sourced_from` fields that depend on it) in the message. 
+ # + # Returns a list of any errors found. + def self.validate_relationship_cardinality(relationship, error_prefix:) + return [] unless relationship.many? + + ["#{error_prefix} is a `relates_to_many` relationship, but `sourced_from` is only supported on a " \ + "`relates_to_one` relationship."] + end + + # Validates that `relationship` can route `sourced_from` source events to the documents they update: it + # must use an inbound foreign key (so the event carries the key) and no `additional_filter` (which the + # `sourced_from` update path ignores, so a filtered relationship would silently mismatch). `error_prefix` + # identifies the offending relationship (and the `sourced_from` fields that depend on it) in the messages. + # + # Returns a list of any errors found. + def self.validate_relationship_routability(relationship, error_prefix:) + errors = [] # : ::Array[::String] + relation_metadata = relationship.runtime_metadata # : SchemaArtifacts::RuntimeMetadata::Relation + + if relation_metadata.direction == :out + errors << "#{error_prefix} has an outbound foreign key (`dir: :out`), but `sourced_from` is only " \ + "supported via inbound foreign key (`dir: :in`) relationships." + end + + unless relation_metadata.additional_filter.empty? + errors << "#{error_prefix} uses an `additional_filter`, but `sourced_from` is not supported on " \ + "relationships with `additional_filter`." + end + + errors + end + + # Builds the prefix of a relationship-related `sourced_from` error: the `Type.relationship` description, + # followed by the `sourced_from` fields that depend on it (so the author knows what's affected). Only + # called when there are `sourced_fields` (a relationship with none produces no update target to validate). 
+ def self.relationship_error_prefix(relationship, sourced_fields) + fields_description = sourced_fields.map { |f| "`#{f.name}`" }.join(", ") + "`#{relationship.parent_type.name}.#{relationship.name}` (referenced from `sourced_from` on field(s): #{fields_description})" + end + + # Validates that `index` (which `type` writes to, sourcing data via `relationship`) has been configured + # with `has_had_multiple_sources!`. `sourced_from` makes an index multi-sourced, and ElasticGraph needs + # that flag to filter incomplete documents correctly. + # + # Returns a list of any errors found. + def self.validate_has_had_multiple_sources(index, type, relationship) + return [] if index.has_had_multiple_sources_flag + + ["Type `#{type.name}` has `sourced_from` fields (via `#{relationship.parent_type.name}.#{relationship.name}`) but " \ + "its index `#{index.name}` has not been configured with `has_had_multiple_sources!`. To resolve this, add " \ + "`i.has_had_multiple_sources!` within the `t.index \"#{index.name}\"` block. This flag is required because " \ + "indices with multiple sources can contain incomplete documents, and ElasticGraph needs to know this to apply " \ + "proper filtering. Once set, this flag should remain even if you later remove all `sourced_from` fields, as the " \ + "index may still contain historical incomplete documents."] + end + + # Adapter for the `routing_value_source` case for use by `resolve_field_source`. + # + # @private + module RoutingSourceAdapter + def self.get_field_source(relationship, index, &block) + relationship.routing_value_source_for_index(index, &block) + end + + def self.cannot_update_reason(updated_type, relationship_name) + "`#{updated_type.name}` uses custom shard routing but we don't know what `#{relationship_name}` field to use " \ + "to route the `#{updated_type.name}` update requests" + end + end + + # Adapter for the `rollover_timestamp_value_source` case for use by `resolve_field_source`. 
+ # + # @private + module RolloverTimestampSourceAdapter + def self.get_field_source(relationship, index, &block) + relationship.rollover_timestamp_value_source_for_index(index, &block) + end + + def self.cannot_update_reason(updated_type, relationship_name) + "`#{updated_type.name}` uses a rollover index but we don't know what `#{relationship_name}` timestamp field to use " \ + "to select an index for the `#{updated_type.name}` update requests" + end + end + end + end + end +end diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/index.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/index.rbs index 39ae5957c..1f3eef1da 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/index.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/index.rbs @@ -14,7 +14,7 @@ module ElasticGraph def to_index_config: () -> ::Hash[::String, untyped] def to_index_template_config: () -> ::Hash[::String, untyped] def runtime_metadata: () -> SchemaArtifacts::RuntimeMetadata::IndexDefinition - def register_resolved_relationship_chain: (ResolvedRelationshipChain) -> void + def register_sourced_from_nested_paths: (::String, ::Array[SchemaArtifacts::RuntimeMetadata::sourcedFromNestedPathSegment]) -> void private diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rbs new file mode 100644 index 000000000..e3f24a780 --- /dev/null +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rbs @@ -0,0 +1,36 @@ +module ElasticGraph + module SchemaDefinition + module Indexing + class NestedUpdateTargetResolver + def initialize: ( + object_type: indexableType, + sourced_fields: ::Array[SchemaElements::Field], + resolved_chain: 
ResolvedRelationshipChain, + field_path_resolver: SchemaElements::FieldPath::Resolver, + schema_def_state: State, + ) -> void + + def resolve: () -> [SchemaArtifacts::RuntimeMetadata::UpdateTarget?, ::Array[::String]] + + private + + attr_reader object_type: indexableType + attr_reader sourced_fields: ::Array[SchemaElements::Field] + attr_reader field_path_resolver: SchemaElements::FieldPath::Resolver + attr_reader resolved_chain: ResolvedRelationshipChain + attr_reader schema_def_state: State + + @related_type: indexableType + def related_type: () -> indexableType + + def relationship: () -> SchemaElements::Relationship + def root_relationship: () -> SchemaElements::Relationship + def root_type: () -> indexableType + def root_index: () -> Index + + def validate_relationships: () -> ::Array[::String] + def resolve_field_source: (UpdateTargetResolverSupport::_FieldSourceAdapter) -> [::String?, ::String?] + end + end + end +end diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rbs index 75b80ece0..9538dff3b 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/relationship_chain_resolver.rbs @@ -1,23 +1,6 @@ module ElasticGraph module SchemaDefinition module Indexing - class ResolvedRelationshipChainSuperType - attr_reader root_relationship: SchemaElements::Relationship - attr_reader leaf_relationship: SchemaElements::Relationship - attr_reader path_segments: ::Array[PathSegment] - - def initialize: ( - root_relationship: SchemaElements::Relationship, - leaf_relationship: SchemaElements::Relationship, - path_segments: ::Array[PathSegment] - ) -> void - end - - class ResolvedRelationshipChain < ResolvedRelationshipChainSuperType - def 
qualified_relationship: () -> ::String - def sourced_from_nested_paths: () -> ::Array[SchemaArtifacts::RuntimeMetadata::sourcedFromNestedPathSegment] - end - class PathSegment attr_reader field: SchemaElements::Field attr_reader source_field_name: ::String? @@ -40,15 +23,15 @@ module ElasticGraph def resolve_chain: ( SchemaElements::Relationship, ::Array[PathSegment], - ::Array[::String], - ::Set[SchemaElements::Relationship] + ::Array[SchemaElements::Relationship], + ::Array[::String] ) -> SchemaElements::Relationship? def resolve_parent_ref: ( SchemaElements::Relationship, SchemaElements::Relationship::ParentRef, - ::Array[::String], - ::Set[SchemaElements::Relationship] + ::Array[SchemaElements::Relationship], + ::Array[::String] ) -> SchemaElements::Relationship? def build_path_segment: ( diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/resolved_relationship_chain.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/resolved_relationship_chain.rbs new file mode 100644 index 000000000..b80953630 --- /dev/null +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/resolved_relationship_chain.rbs @@ -0,0 +1,28 @@ +module ElasticGraph + module SchemaDefinition + module Indexing + class ResolvedRelationshipChainSuperType + attr_reader relationships: ::Array[SchemaElements::Relationship] + attr_reader path_segments: ::Array[PathSegment] + + def initialize: ( + relationships: ::Array[SchemaElements::Relationship], + path_segments: ::Array[PathSegment] + ) -> void + end + + class ResolvedRelationshipChain < ResolvedRelationshipChainSuperType + def root_relationship: () -> SchemaElements::Relationship + def leaf_relationship: () -> SchemaElements::Relationship + def root_index: () -> Index + def register_on_root_index: () -> void + @qualified_relationship: ::String + def qualified_relationship: () -> ::String + @sourced_from_nested_paths: 
::Array[SchemaArtifacts::RuntimeMetadata::sourcedFromNestedPathSegment] + def sourced_from_nested_paths: () -> ::Array[SchemaArtifacts::RuntimeMetadata::sourcedFromNestedPathSegment] + @path_identifier_params: ::Hash[::String, SchemaArtifacts::RuntimeMetadata::DynamicParam] + def path_identifier_params: () -> ::Hash[::String, SchemaArtifacts::RuntimeMetadata::DynamicParam] + end + end + end +end diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rbs index c9c2ca72b..4b5bdca9e 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_from_update_targets_resolver.rbs @@ -9,10 +9,16 @@ module ElasticGraph private + @relationship_chain_resolver: RelationshipChainResolver + def resolve_for_type: (indexableType) { (::Symbol, ::String) -> void } -> ::Array[[::String, SchemaArtifacts::RuntimeMetadata::UpdateTarget]] - def resolve_update_target: (indexableType, ResolvedRelationship, ::Array[SchemaElements::Field]) { (::Symbol, ::String) -> void } -> [::String, SchemaArtifacts::RuntimeMetadata::UpdateTarget]? 
- def resolve_relationship_chains: (indexableType) { (::Symbol, ::String) -> void } -> void - def sourced_fields_by_relationship_name: (indexableType) -> ::Hash[::String, ::Array[SchemaElements::Field]] + def resolve_top_level_update_targets: (indexableType) { (::Symbol, ::String) -> void } -> ::Array[[::String, SchemaArtifacts::RuntimeMetadata::UpdateTarget]] + def resolve_nested_update_targets: (indexableType) { (::Symbol, ::String) -> void } -> ::Array[[::String, SchemaArtifacts::RuntimeMetadata::UpdateTarget]] + def resolve_top_level_update_target: (indexableType, ResolvedRelationship, ::Array[SchemaElements::Field]) { (::Symbol, ::String) -> void } -> [::String, SchemaArtifacts::RuntimeMetadata::UpdateTarget]? + def group_sourced_fields_by_relationship_name: (indexableType) -> ::Hash[::String, ::Array[SchemaElements::Field]] + def resolve_sourced_fields_and_chains: (indexableType) { (::Symbol, ::String) -> void } -> ::Array[[::Array[SchemaElements::Field], ResolvedRelationshipChain]] + def relationship_chain_resolver: () -> RelationshipChainResolver + def resolve_nested_update_target: (indexableType, ::Array[SchemaElements::Field], ResolvedRelationshipChain) { (::Symbol, ::String) -> void } -> [::String, SchemaArtifacts::RuntimeMetadata::UpdateTarget]? 
def raise_if_errors: (::Array[::String], ::Array[::String]) -> void end end diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/top_level_update_target_resolver.rbs similarity index 50% rename from elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver.rbs rename to elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/top_level_update_target_resolver.rbs index 668331f7c..c6d190283 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/top_level_update_target_resolver.rbs @@ -1,7 +1,7 @@ module ElasticGraph module SchemaDefinition module Indexing - class UpdateTargetResolver + class TopLevelUpdateTargetResolver def initialize: ( object_type: indexableType, resolved_relationship: ResolvedRelationship, @@ -19,30 +19,8 @@ module ElasticGraph attr_reader field_path_resolver: SchemaElements::FieldPath::Resolver def validate_relationship: () -> ::Array[::String] - def relationship_error_prefix: () -> ::String - - def resolve_top_level_fields_params: () -> [ - ::Hash[::String, SchemaArtifacts::RuntimeMetadata::DynamicParam], - ::Array[::String] - ] - - def resolve_field_source: (_FieldSourceAdapter) -> [::String?, ::String?] - - interface _FieldSourceAdapter - def get_field_source: (SchemaElements::Relationship, Index) { - (::String) -> bot - } -> ::String? - - def cannot_update_reason: (indexableType, ::String) -> ::String - end - - module RoutingSourceAdapter - extend _FieldSourceAdapter - end - - module RolloverTimestampSourceAdapter - extend _FieldSourceAdapter - end + def related_type: () -> indexableType + def resolve_field_source: (UpdateTargetResolverSupport::_FieldSourceAdapter) -> [::String?, ::String?] 
end end end diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_factory.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_factory.rbs index b8769fe76..025f7d51a 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_factory.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_factory.rbs @@ -6,10 +6,10 @@ module ElasticGraph type: ::String, relationship: ::String, id_source: ::String, - top_level_fields_params: SchemaArtifacts::RuntimeMetadata::paramsHash, routing_value_source: ::String?, rollover_timestamp_value_source: ::String?, - sourced_from_nested_params: SchemaArtifacts::RuntimeMetadata::SourcedFromNestedParams + ?top_level_fields_params: SchemaArtifacts::RuntimeMetadata::paramsHash, + ?sourced_from_nested_params: SchemaArtifacts::RuntimeMetadata::SourcedFromNestedParams ) -> SchemaArtifacts::RuntimeMetadata::UpdateTarget private diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver_support.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver_support.rbs new file mode 100644 index 000000000..ecfa7eacd --- /dev/null +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_resolver_support.rbs @@ -0,0 +1,47 @@ +module ElasticGraph + module SchemaDefinition + module Indexing + module UpdateTargetResolverSupport + def self.resolve_sourced_field_params: ( + object_type: indexableType, + related_type: indexableType, + sourced_fields: ::Array[SchemaElements::Field], + field_path_resolver: SchemaElements::FieldPath::Resolver + ) -> [ + ::Hash[::String, SchemaArtifacts::RuntimeMetadata::DynamicParam], + ::Array[::String] + ] + + def self.resolve_field_source: ( + _FieldSourceAdapter, + relationship: SchemaElements::Relationship, + 
index_def: Index, + related_type: indexableType, + field_path_resolver: SchemaElements::FieldPath::Resolver, + updated_type: indexableType + ) -> [::String?, ::String?] + + def self.validate_relationship_cardinality: (SchemaElements::Relationship, error_prefix: ::String) -> ::Array[::String] + def self.validate_relationship_routability: (SchemaElements::Relationship, error_prefix: ::String) -> ::Array[::String] + def self.relationship_error_prefix: (SchemaElements::Relationship, ::Array[SchemaElements::Field]) -> ::String + def self.validate_has_had_multiple_sources: (Index, indexableType, SchemaElements::Relationship) -> ::Array[::String] + + interface _FieldSourceAdapter + def get_field_source: (SchemaElements::Relationship, Index) { + (::String) -> bot + } -> ::String? + + def cannot_update_reason: (indexableType, ::String) -> ::String + end + + module RoutingSourceAdapter + extend _FieldSourceAdapter + end + + module RolloverTimestampSourceAdapter + extend _FieldSourceAdapter + end + end + end + end +end diff --git a/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/index_definitions_by_name_spec.rb b/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/index_definitions_by_name_spec.rb index fa39065f9..ee9c300c5 100644 --- a/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/index_definitions_by_name_spec.rb +++ b/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/index_definitions_by_name_spec.rb @@ -982,7 +982,7 @@ def build_count_paths_from_mapping(mapping) f.mapping type: "object" end t.relates_to_many "statLines", "StatLine", via: "leagueId", dir: :in, indexing_only: true - t.index "leagues" + t.index("leagues") { |i| i.has_had_multiple_sources! 
} end s.object_type "Team" do |t| diff --git a/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb b/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb index 7cf9c3a43..07557cd8a 100644 --- a/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb +++ b/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/runtime_metadata/object_types_by_name/update_targets_spec.rb @@ -451,7 +451,8 @@ def expect_widget_update_target_with( id_source:, top_level_fields_params:, relationship:, routing_value_source: nil, - rollover_timestamp_value_source: nil + rollover_timestamp_value_source: nil, + sourced_from_nested_params: SchemaArtifacts::RuntimeMetadata::SourcedFromNestedParams::EMPTY ) expect(update_targets.count { |t| t.type == "Widget" }).to eq(1) widget_target = update_targets.find { |t| t.type == "Widget" } @@ -464,6 +465,7 @@ def expect_widget_update_target_with( expect(widget_target.routing_value_source).to eq(routing_value_source) expect(widget_target.rollover_timestamp_value_source).to eq(rollover_timestamp_value_source) expect(widget_target.top_level_fields_params).to eq(top_level_fields_params) + expect(widget_target.sourced_from_nested_params).to eq(sourced_from_nested_params) expect(widget_target.metadata_params).to eq(standard_metadata_params(relationship: relationship)) end @@ -491,7 +493,7 @@ def expect_widget_update_target_with( end end }.to raise_error Errors::SchemaError, a_string_including( - "Type `Widget` uses `sourced_from` fields but its index `widgets` has not been configured with `has_had_multiple_sources!`", + "Type `Widget` has `sourced_from` fields (via `Widget.workspace`) but its index `widgets` has not been configured with `has_had_multiple_sources!`", "To resolve this, add 
`i.has_had_multiple_sources!` within the `t.index \"widgets\"` block", "This flag is required because indices with multiple sources can contain incomplete documents", "Once set, this flag should remain even if you later remove all `sourced_from` fields" @@ -537,7 +539,7 @@ def expect_widget_update_target_with( f.sourced_from "workspace", "created_at" end end - }.to raise_error_about_workspace_relationship("is a `relationship` using an `additional_filter` but `sourced_from` is not supported on relationships with `additional_filter`.") + }.to raise_error_about_workspace_relationship("uses an `additional_filter`, but `sourced_from` is not supported on relationships with `additional_filter`.") end it "raises an error if the referenced relationship is not defined" do @@ -1196,8 +1198,8 @@ def expect_widget_update_target_with( end end }.to raise_error a_string_including( - "1. The type of `Widget.workspace_name` is `DateTime`, but the type of it's source (`WidgetWorkspace.name`) is `String`. These must agree to use `sourced_from`.", - "2. The type of `Widget.workspace_created_at` is `String`, but the type of it's source (`WidgetWorkspace.created_at`) is `DateTime`. These must agree to use `sourced_from`." + "1. The type of `Widget.workspace_name` is `DateTime`, but the type of its source (`WidgetWorkspace.name`) is `String`. These must agree to use `sourced_from`.", + "2. The type of `Widget.workspace_created_at` is `String`, but the type of its source (`WidgetWorkspace.created_at`) is `DateTime`. These must agree to use `sourced_from`." ) end @@ -1255,8 +1257,8 @@ def expect_widget_update_target_with( end end }.to raise_error a_string_including( - "1. The type of `Widget.workspace_name` is `String`, but the type of it's source (`WidgetWorkspace.name`) is `[String]`. These must agree to use `sourced_from`.", - "2. The type of `Widget.workspace_created_at` is `[DateTime]`, but the type of it's source (`WidgetWorkspace.created_at`) is `DateTime`. 
These must agree to use `sourced_from`." + "1. The type of `Widget.workspace_name` is `String`, but the type of its source (`WidgetWorkspace.name`) is `[String]`. These must agree to use `sourced_from`.", + "2. The type of `Widget.workspace_created_at` is `[DateTime]`, but the type of its source (`WidgetWorkspace.created_at`) is `DateTime`. These must agree to use `sourced_from`." ) end @@ -1298,35 +1300,217 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) end end - describe "`parent_relationship` validations" do - it "does not raise an error for a valid `parent_relationship` configuration" do - expect { nested_sourced_from_schema }.not_to raise_error + describe "nested `sourced_from` update targets" do + it "dumps a nested update target on the source type, keyed by the qualified relationship" do + expect_statline_update_target_with(nested_sourced_from_schema) end - it "works with a non-list (object) embedding field" do - expect { - nested_sourced_from_schema(players_field: "Player!") - }.not_to raise_error + it "omits `path_identifier_params` for a non-list (object) embedding field, since there is no element to match" do + expect_statline_update_target_with(nested_sourced_from_schema(players_field: "Player!"), path_identifier_params: {}) end - it "accepts an explicit `parent_field_name:` to identify the embedding field" do - expect { - nested_sourced_from_schema( - on_player_relationship: ->(r) { r.parent_relationship "Team", "statLines", parent_field_name: "players" } + it "bundles every `sourced_from` field on the nested type into `field_params`" do + metadata = nested_sourced_from_schema( + on_player: ->(t) { + t.field "assists", "Int" do |f| + f.sourced_from "statLine", "assists" + end + }, + on_statline: ->(t) { t.field "assists", "Int" } + ) + + expect_statline_update_target_with(metadata, field_params: { + "goals" => dynamic_param_with(source_path: "goals", cardinality: :one), + "assists" => dynamic_param_with(source_path: 
"assists", cardinality: :one) + }) + end + + it "resolves a `sourced_from` field from a nested path on the source type" do + metadata = nested_sourced_from_schema( + player_goals_source: "stats.goals", + on_statline: ->(t) { t.field "stats", "StatLineStats" } + ) do |s| + s.object_type "StatLineStats" do |t| + t.field "goals", "Int" + end + end + + expect_statline_update_target_with(metadata, field_params: { + "goals" => dynamic_param_with(source_path: "stats.goals", cardinality: :one) + }) + end + + it "keys `field_params` by the `sourced_from` field's `name_in_index`" do + metadata = nested_sourced_from_schema( + on_player: ->(t) { + t.field "assists", "Int", name_in_index: "assists_in_index" do |f| + f.sourced_from "statLine", "assists" + end + }, + on_statline: ->(t) { t.field "assists", "Int" } + ) + + expect_statline_update_target_with(metadata, field_params: { + "goals" => dynamic_param_with(source_path: "goals", cardinality: :one), + "assists_in_index" => dynamic_param_with(source_path: "assists", cardinality: :one) + }) + end + + context "on a root type that uses custom routing" do + it "determines the `routing_value_source` from an `equivalent_field` configured on the root relationship" do + metadata = nested_sourced_from_schema( + on_team: ->(t) { t.field "team_owner_id", "ID!" }, + on_teams_index: ->(i) { i.route_with "team_owner_id" }, + on_statlines_relationship: ->(r) { + # A nested routing source forces the field-path resolver to descend through the parent field. 
+ r.equivalent_field "stats.owner_id", locally_named: "team_owner_id" + }, + on_statline: ->(t) { t.field "stats", "StatLineStats" } + ) do |s| + s.object_type "StatLineStats" do |t| + t.field "owner_id", "ID" + end + end + + expect_statline_update_target_with(metadata, routing_value_source: "stats.owner_id") + end + + it "raises a clear error when the equivalent field is a `graphql_only` field with no indexing path" do + expect { + nested_sourced_from_schema( + on_team: ->(t) { t.field "team_owner_id", "ID!" }, + on_teams_index: ->(i) { i.route_with "team_owner_id" }, + on_statlines_relationship: ->(r) { r.equivalent_field "owner_id", locally_named: "team_owner_id" }, + on_statline: ->(t) { t.field "owner_id", "ID", graphql_only: true } + ) + }.to raise_error Errors::SchemaError, a_string_including( + "`StatLine.owner_id` (referenced from an `equivalent_field` defined on `Team.statLines`) does not exist" + ) + end + + it "raises a clear error when no `equivalent_field` is configured for the custom routing field" do + expect { + nested_sourced_from_schema( + on_team: ->(t) { t.field "team_owner_id", "ID!" 
}, + on_teams_index: ->(i) { i.route_with "team_owner_id" } + ) + }.to raise_error Errors::SchemaError, a_string_including( + "Cannot update `Team` documents with data from related `statLines` events", + "`Team` uses custom shard routing but we don't know what `statLines` field to use to route the `Team` update requests", + "add a call like this to the `Team.statLines` relationship definition", + '`rel.equivalent_field "[StatLine field]", locally_named: "team_owner_id"`' + ) + end + end + + context "on a root type that uses a rollover index" do + it "determines the `rollover_timestamp_value_source` from an `equivalent_field` configured on the root relationship" do + metadata = nested_sourced_from_schema( + on_team: ->(t) { t.field "team_created_at", "DateTime" }, + on_teams_index: ->(i) { i.rollover :yearly, "team_created_at" }, + on_statlines_relationship: ->(r) { r.equivalent_field "created_at", locally_named: "team_created_at" }, + on_statline: ->(t) { t.field "created_at", "DateTime" } + ) + + expect_statline_update_target_with(metadata, rollover_timestamp_value_source: "created_at") + end + + it "raises a clear error when no `equivalent_field` is configured for the rollover timestamp field" do + expect { + nested_sourced_from_schema( + on_team: ->(t) { t.field "team_created_at", "DateTime" }, + on_teams_index: ->(i) { i.rollover :yearly, "team_created_at" } + ) + }.to raise_error Errors::SchemaError, a_string_including( + "Cannot update `Team` documents with data from related `statLines` events", + "`Team` uses a rollover index but we don't know what `statLines` timestamp field to use to select an index for the `Team` update requests", + "add a call like this to the `Team.statLines` relationship definition", + '`rel.equivalent_field "[StatLine field]", locally_named: "team_created_at"`' + ) + end + end + + describe "validations" do + it "raises an error when the nested `sourced_from` field does not exist on the sourced type" do + expect { + 
nested_sourced_from_schema(player_goals_source: "nonexistent") + }.to raise_error Errors::SchemaError, a_string_including( + "`Player.goals` has an invalid `sourced_from` argument: `StatLine.nonexistent` does not exist as an indexing field." + ) + end + + it "raises an error when the nested `sourced_from` field's type does not match its source's type" do + expect { + nested_sourced_from_schema(player_goals_type: "String") + }.to raise_error Errors::SchemaError, a_string_including( + "The type of `Player.goals` is `String`, but the type of its source (`StatLine.goals`) is `Int`. These must agree to use `sourced_from`." + ) + end + + it "raises an error when the root index has not been configured with `has_had_multiple_sources!`" do + expect { + nested_sourced_from_schema(multiple_sources: false) + }.to raise_error Errors::SchemaError, a_string_including( + "Type `Team` has `sourced_from` fields (via `Player.statLine`) but its index `teams` has not been configured with `has_had_multiple_sources!`" + ) + end + + it "raises an error when the leaf relationship is `relates_to_many`" do + expect { + object_type_metadata_for "Team" do |s| + s.object_type "Team" do |t| + t.field "id", "ID!" + t.field "players", "[Player!]!" do |f| + f.mapping type: "object" + end + t.relates_to_many "statLines", "StatLine", via: "teamId", dir: :in, indexing_only: true + t.index("teams") { |i| i.has_had_multiple_sources! } + end + + s.object_type "Player" do |t| + t.field "id", "ID!" + t.field "goals", "Int" do |f| + f.sourced_from "statLines", "goals" + end + t.relates_to_many "statLines", "StatLine", via: "playerId", dir: :in, indexing_only: true do |r| + r.parent_relationship "Team", "statLines" + end + end + + s.object_type "StatLine" do |t| + t.field "id", "ID!" 
+ t.field "teamId", "ID" + t.field "playerId", "ID" + t.field "goals", "Int" + t.index "stat_lines" + end + end + }.to raise_error Errors::SchemaError, a_string_including( + "`Player.statLines` (referenced from `sourced_from` on field(s): `goals`) is a `relates_to_many` relationship, but `sourced_from` is only supported on a `relates_to_one` relationship." ) - }.not_to raise_error + end + end + end + + describe "`parent_relationship` validations" do + it "accepts an explicit `parent_field_name:` to identify the embedding field" do + metadata = nested_sourced_from_schema( + on_player_relationship: ->(r) { r.parent_relationship "Team", "statLines", parent_field_name: "players" } + ) + + expect_statline_update_target_with(metadata) end it "discovers an embedding field declared with `indexing_only: true`" do # An `indexing_only: true` field is absent from `graphql_fields_by_name` but present in # `indexing_fields_by_name_in_index`, so this only resolves when the latter is used. - expect { - nested_sourced_from_schema(players_field: nil, on_team: ->(t) { - t.field "players", "[Player!]!", indexing_only: true do |f| - f.mapping type: "object" - end - }) - }.not_to raise_error + metadata = nested_sourced_from_schema(players_field: nil, on_team: ->(t) { + t.field "players", "[Player!]!", indexing_only: true do |f| + f.mapping type: "object" + end + }) + + expect_statline_update_target_with(metadata) end it "raises an error when `parent_relationship` is called twice on the same relationship" do @@ -1393,16 +1577,16 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) # League -> Team -> Player -> GameAppearance is a 4-level `parent_relationship` chain (leaf # to root: GameAppearance's relationship walks up through Player and Team to the indexed # League). All four relationships relate to the same `StatLine` source type, so the chain is - # valid. This can only pass if `resolve_chain` actually recurses through every level. 
- expect { - object_type_metadata_for "League" do |s| + # valid. This can only resolve correctly if `resolve_chain` recurses through every level. + metadata = + object_type_metadata_for "StatLine" do |s| s.object_type "League" do |t| t.field "id", "ID!" t.field "teams", "[Team!]!" do |f| f.mapping type: "object" end t.relates_to_many "statLines", "StatLine", via: "leagueId", dir: :in, indexing_only: true - t.index "leagues" + t.index("leagues") { |i| i.has_had_multiple_sources! } end s.object_type "Team" do |t| @@ -1445,7 +1629,10 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) t.index "stat_lines" end end - }.not_to raise_error + + # The qualified relationship spans every embedding field from the root index down to the leaf, + # proving the chain recursed through all four levels. + expect(nested_update_targets_by_relationship(metadata).keys).to contain_exactly("teams.players.gameAppearances.statLine") end it "raises an error when the parent type does not exist" do @@ -1493,6 +1680,42 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) ) end + it "raises an error when a relationship in the chain uses an outbound foreign key" do + expect { + nested_sourced_from_schema(player_statline_dir: :out, player_statline_via: "statLineId") + }.to raise_error Errors::SchemaError, a_string_including( + "`Player.statLine` (referenced from `sourced_from` on field(s): `goals`) has an outbound foreign key (`dir: :out`), but `sourced_from` is only supported via inbound foreign key (`dir: :in`) relationships." 
+ ) + end + + it "raises an error when a relationship in the chain uses an `additional_filter`" do + expect { + nested_sourced_from_schema( + on_player_relationship: ->(r) { + r.parent_relationship "Team", "statLines" + r.additional_filter status: "active" + } + ) + }.to raise_error Errors::SchemaError, a_string_including( + "`Player.statLine` (referenced from `sourced_from` on field(s): `goals`) uses an `additional_filter`, but `sourced_from` is not supported on relationships with `additional_filter`." + ) + end + + it "raises an error when a non-leaf (root) relationship in the chain uses an `additional_filter`" do + # Routability (inbound FK, no `additional_filter`) must hold for *every* relationship in the chain, + # not just the leaf that backs the `sourced_from` field: each link routes the source event up to the + # root document, so a filtered/outbound link anywhere would silently mismatch. Here we break the + # *root* relationship (`Team.statLines`) — the leaf (`Player.statLine`) remains valid — to prove the + # chain-wide validation covers non-leaf relationships and isn't narrowed to the leaf. + expect { + nested_sourced_from_schema( + on_statlines_relationship: ->(r) { r.additional_filter status: "active" } + ) + }.to raise_error Errors::SchemaError, a_string_including( + "`Team.statLines` (referenced from `sourced_from` on field(s): `goals`) uses an `additional_filter`, but `sourced_from` is not supported on relationships with `additional_filter`." + ) + end + it "raises an error when the chain terminates at a non-indexed type" do expect { nested_sourced_from_schema(index_teams: false) @@ -1526,16 +1749,18 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) end it "uses `parent_field_name:` to disambiguate when multiple embedding fields exist" do - expect { - nested_sourced_from_schema( - on_team: ->(t) { - t.field "bench_players", "[Player!]!" 
do |f| - f.mapping type: "object" - end - }, - on_player_relationship: ->(r) { r.parent_relationship "Team", "statLines", parent_field_name: "bench_players" } - ) - }.not_to raise_error + metadata = nested_sourced_from_schema( + on_team: ->(t) { + t.field "bench_players", "[Player!]!" do |f| + f.mapping type: "object" + end + }, + on_player_relationship: ->(r) { r.parent_relationship "Team", "statLines", parent_field_name: "bench_players" } + ) + + # The qualified relationship reflects `bench_players`, confirming disambiguation chose that field + # rather than the also-eligible `players`. + expect_statline_update_target_with(metadata, relationship: "bench_players.statLine") end it "raises an error when an explicit `parent_field_name:` references a non-existent field" do @@ -1547,59 +1772,110 @@ def raise_error_about_workspace_relationship(details, sourced_fields: true) "`Player.statLine` references field `Team.nonExistentField` via `parent_relationship`, but that field does not exist." ) end + end - def nested_sourced_from_schema( - on_team: nil, - on_player_relationship: ->(r) { r.parent_relationship "Team", "statLines" }, - player_indexing_only: true, - players_field: "[Player!]!", - index_teams: true, - index_players: false - ) - object_type_metadata_for "Team" do |s| - s.object_type "Team" do |t| - t.field "id", "ID!" + # Projects a source type's update targets to a `{qualified_relationship => target}` map, excluding the + # type's own `__self` target — leaving one entry per `parent_relationship` chain backing `sourced_from`. 
+ def nested_update_targets_by_relationship(metadata) + metadata + .update_targets + .to_h { |t| [t.relationship, t] } + .except(SELF_RELATIONSHIP_NAME) + end - if players_field - t.field "players", players_field do |f| - f.mapping type: "object" if players_field.start_with?("[") - end - end + # Asserts the full nested `UpdateTarget` `StatLine` events produce for `Team`, with defaults for the + # common `nested_sourced_from_schema` case so each test overrides only the attributes it exercises. + def expect_statline_update_target_with( + metadata, + relationship: "players.statLine", + routing_value_source: nil, + rollover_timestamp_value_source: nil, + field_params: {"goals" => dynamic_param_with(source_path: "goals", cardinality: :one)}, + path_identifier_params: {"playerId" => dynamic_param_with(source_path: "playerId", cardinality: :one)} + ) + targets = nested_update_targets_by_relationship(metadata) + expect(targets.keys).to contain_exactly(relationship) + + target = targets.fetch(relationship) + expect(target.type).to eq "Team" + expect(target.relationship).to eq relationship + expect(target.script_id).to eq(INDEX_DATA_UPDATE_SCRIPT_ID) + expect(target.id_source).to eq "teamId" + expect(target.routing_value_source).to eq(routing_value_source) + expect(target.rollover_timestamp_value_source).to eq(rollover_timestamp_value_source) + expect(target.top_level_fields_params).to eq({}) + expect(target.sourced_from_nested_params.field_params).to eq(field_params) + expect(target.sourced_from_nested_params.path_identifier_params).to eq(path_identifier_params) + expect(target.metadata_params).to eq(standard_metadata_params(relationship: relationship)) + end - t.relates_to_many "statLines", "StatLine", via: "teamId", dir: :in, indexing_only: true - on_team&.call(t) + def nested_sourced_from_schema( + on_team: nil, + on_player: nil, + on_statlines_relationship: nil, + on_player_relationship: ->(r) { r.parent_relationship "Team", "statLines" }, + player_indexing_only: true, 
+ players_field: "[Player!]!", + index_teams: true, + index_players: false, + multiple_sources: true, + on_teams_index: nil, + on_statline: nil, + player_goals_type: "Int", + player_goals_source: "goals", + player_statline_dir: :in, + player_statline_via: "playerId" + ) + # `StatLine` is the source type, so its metadata carries the nested update targets we assert on. + object_type_metadata_for "StatLine" do |s| + s.object_type "Team" do |t| + t.field "id", "ID!" - if index_teams - t.index("teams") { |i| i.has_had_multiple_sources! } + if players_field + t.field "players", players_field do |f| + f.mapping type: "object" if players_field.start_with?("[") end end - s.object_type "Player" do |t| - t.field "id", "ID!" + t.relates_to_many "statLines", "StatLine", via: "teamId", dir: :in, indexing_only: true, &on_statlines_relationship + on_team&.call(t) - t.field "goals", "Int" do |f| - f.sourced_from "statLine", "goals" + if index_teams + t.index("teams") do |i| + i.has_had_multiple_sources! if multiple_sources + on_teams_index&.call(i) end + end + end - t.relates_to_one "statLine", "StatLine", via: "playerId", dir: :in, indexing_only: player_indexing_only do |r| - on_player_relationship.call(r) - end + s.object_type "Player" do |t| + t.field "id", "ID!" - if index_players - t.index("players") { |i| i.has_had_multiple_sources! } - end + t.field "goals", player_goals_type do |f| + f.sourced_from "statLine", player_goals_source end - s.object_type "StatLine" do |t| - t.field "id", "ID!" - t.field "teamId", "ID" - t.field "playerId", "ID" - t.field "goals", "Int" - t.index "stat_lines" + t.relates_to_one "statLine", "StatLine", via: player_statline_via, dir: player_statline_dir, indexing_only: player_indexing_only do |r| + on_player_relationship.call(r) end - yield s if block_given? + on_player&.call(t) + + if index_players + t.index("players") { |i| i.has_had_multiple_sources! } + end end + + s.object_type "StatLine" do |t| + t.field "id", "ID!" 
+ t.field "teamId", "ID" + t.field "playerId", "ID" + t.field "goals", "Int" + on_statline&.call(t) + t.index "stat_lines" + end + + yield s if block_given? end end