Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ Custom gems can be added via `Gemfile-custom` (see `Gemfile-custom.example`), th
- Use `expect_to_return_non_nil_values_from_all_attributes` to test wrapper classes (like `WarehouseLambda`, `GraphQL`, `Indexer`, etc.). This automatically exercises every zero-argument method and verifies all dependencies are built successfully.
- Use the `:capture_logs` RSpec tag instead of logger test doubles for verifying log output. Access logs with `logged_jsons_of_type(message_type)`.
- Use `build_*` helper methods from `spec/support/builds_*.rb` to construct test objects. These helpers provide sensible defaults while allowing selective overrides for testing specific scenarios.
- Only tag a spec with `:dont_validate_graphql_schema` when it's actually required for the test to pass under `VALIDATE_GRAPHQL_SCHEMAS=1` (the tag skips that validation). First try to fix the test's schema so it produces a valid GraphQL schema while still exercising what the test is meant to exercise; reach for the tag only when that isn't possible. Verify by running the spec with `VALIDATE_GRAPHQL_SCHEMAS=1`.

## Important Patterns

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,11 @@ def runtime_metadata
)
end

# Registers a resolved `parent_relationship` chain on this index.
# Records the path segments that navigate from this index's documents down to a nested element that
# receives `sourced_from` data, keyed by the qualified relationship backing those fields.
# @api private
def register_resolved_relationship_chain(resolved_chain)
sourced_from_nested_paths_by_qualified_relationship[resolved_chain.qualified_relationship] = resolved_chain.sourced_from_nested_paths
def register_sourced_from_nested_paths(qualified_relationship, nested_paths)
sourced_from_nested_paths_by_qualified_relationship[qualified_relationship] = nested_paths
end

private
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

require "elastic_graph/schema_artifacts/runtime_metadata/params"
require "elastic_graph/schema_artifacts/runtime_metadata/sourced_from_nested_params"
require "elastic_graph/schema_definition/indexing/update_target_factory"
require "elastic_graph/schema_definition/indexing/update_target_resolver_support"

module ElasticGraph
module SchemaDefinition
module Indexing
# Resolves a relationship and a set of `sourced_from` fields into an `UpdateTarget` that instructs the
# indexer how to update a type from the related type's source events. This handles the *nested* case,
# where the `sourced_from` fields live on a type embedded within an indexed type (reached via a
# `parent_relationship` chain) and the target updates the root indexed type the embedded type nests
# within. (The *top-level* case—`sourced_from` fields directly on an indexed type—is handled by
# `TopLevelUpdateTargetResolver`.)
#
# @private
class NestedUpdateTargetResolver
def initialize(
object_type:,
sourced_fields:,
resolved_chain:,
field_path_resolver:,
schema_def_state:
)
@object_type = object_type
@sourced_fields = sourced_fields
@resolved_chain = resolved_chain
@field_path_resolver = field_path_resolver
@schema_def_state = schema_def_state
end

# Resolves the chain and `sourced_fields` into an `UpdateTarget` on the root indexed type,
# validating everything along the way.
#
# Returns a tuple of the `update_target` (if valid) and a list of errors.
def resolve
relationship_errors = validate_relationships
field_params, field_params_errors = UpdateTargetResolverSupport.resolve_sourced_field_params(
object_type: object_type,
related_type: related_type,
sourced_fields: sourced_fields,
field_path_resolver: field_path_resolver
)
routing_value_source, routing_error = resolve_field_source(UpdateTargetResolverSupport::RoutingSourceAdapter)
rollover_timestamp_value_source, rollover_timestamp_error = resolve_field_source(UpdateTargetResolverSupport::RolloverTimestampSourceAdapter)
# Routing/rollover values resolve from `equivalent_field`s on the root relationship, so they are
# validated there (matching how `TopLevelUpdateTargetResolver` validates its own relationship).
equivalent_field_errors = root_relationship.validate_equivalent_fields(field_path_resolver)
has_had_multiple_sources_errors = UpdateTargetResolverSupport.validate_has_had_multiple_sources(root_index, root_type, relationship)

all_errors = relationship_errors + field_params_errors + equivalent_field_errors + has_had_multiple_sources_errors +
[routing_error, rollover_timestamp_error].compact

if all_errors.empty?
update_target = UpdateTargetFactory.new_normal_indexing_update_target(
type: root_type.name,
relationship: resolved_chain.qualified_relationship,
id_source: root_relationship.foreign_key,
sourced_from_nested_params: SchemaArtifacts::RuntimeMetadata::SourcedFromNestedParams.new(
field_params: field_params,
path_identifier_params: resolved_chain.path_identifier_params
),
routing_value_source: routing_value_source,
rollover_timestamp_value_source: rollover_timestamp_value_source
)
end

[update_target, all_errors]
end

private

# @dynamic object_type, sourced_fields, resolved_chain, field_path_resolver, schema_def_state
attr_reader :object_type, :sourced_fields, :resolved_chain, :field_path_resolver, :schema_def_state

# The leaf relationship the chain was resolved from — the one backing this type's `sourced_from` fields.
def relationship
resolved_chain.leaf_relationship
end

def root_relationship
resolved_chain.root_relationship
end

def root_type
root_relationship.parent_type
end

def root_index
resolved_chain.root_index
end

def related_type
@related_type ||= schema_def_state.object_types_by_name.fetch(relationship.related_type.unwrap_non_null.name)
end

# Applies validations on the relationships backing nested `sourced_from` fields. Only the leaf must be
# `relates_to_one` (it's where a value is sourced through), but every relationship in the chain joins on
# a foreign key that routes the source event, so each must be routable (inbound foreign key, no filter).
def validate_relationships
leaf_prefix = UpdateTargetResolverSupport.relationship_error_prefix(relationship, sourced_fields)

UpdateTargetResolverSupport.validate_relationship_cardinality(relationship, error_prefix: leaf_prefix) +
resolved_chain.relationships.flat_map do |chain_relationship|
error_prefix = UpdateTargetResolverSupport.relationship_error_prefix(chain_relationship, sourced_fields)
UpdateTargetResolverSupport.validate_relationship_routability(chain_relationship, error_prefix: error_prefix)
end
end

# Resolves a routing/rollover field source via the shared helper, supplying the root type, index, and
# relationship — the update target updates the root indexed type via the root relationship, so routing
# and rollover (and the `equivalent_field` config) are resolved there.
def resolve_field_source(adapter)
UpdateTargetResolverSupport.resolve_field_source(
adapter,
relationship: root_relationship,
index_def: root_index,
related_type: related_type,
field_path_resolver: field_path_resolver,
updated_type: root_type
)
end
Comment thread
myronmarston marked this conversation as resolved.
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,11 @@
# frozen_string_literal: true

require "elastic_graph/errors"
require "elastic_graph/schema_artifacts/runtime_metadata/sourced_from_nested_path_segment"
require "elastic_graph/schema_definition/indexing/resolved_relationship_chain"

module ElasticGraph
module SchemaDefinition
module Indexing
# The result of resolving a relationship chain.
#
# @private
class ResolvedRelationshipChain < ::Data.define(
:root_relationship, # Relationship the chain terminated at on the root indexed type
:leaf_relationship, # Relationship the chain was resolved from — backs `sourced_from` field(s)
:path_segments # Array<PathSegment> - the embedding fields to descend, ordered root-to-leaf
)
# The leaf relationship name qualified by its embedding-field path (hence unique per resolved chain)
def qualified_relationship
(path_segments.map { |segment| segment.field.name_in_index } + [leaf_relationship.name_in_index]).join(".")
end

# The runtime-metadata segments the painless script uses to navigate this chain: a `ListPathSegment` for
# each list embedding field (carrying the source field that matches the element) and an `ObjectPathSegment`
# for each object embedding field.
def sourced_from_nested_paths
path_segments.map do |segment|
if (source_field = segment.source_field_name)
SchemaArtifacts::RuntimeMetadata::ListPathSegment.new(
field: segment.field.name_in_index,
source_field: source_field
)
else
SchemaArtifacts::RuntimeMetadata::ObjectPathSegment.new(
field: segment.field.name_in_index
)
end
end
end
end

# Describes how to navigate from a parent type into a nested child element.
# For list fields, `source_field_name` identifies which element to update: the element
# whose `id` matches `event[source_field_name]`. We implicitly match on the `id` field
Expand Down Expand Up @@ -82,11 +50,11 @@ def initialize(schema_def_state:)
def resolve(starting_relationship)
errors = [] # : ::Array[::String]
path_segments = [] # : ::Array[PathSegment]
visited_relationships = Set[starting_relationship]
relationships = [] # : ::Array[SchemaElements::Relationship]
Comment thread
myronmarston marked this conversation as resolved.

# resolve_chain returns the chain's root relationship (the one with no parent_ref), or nil
# if it hit an error walking the chain (in which case the error is already recorded).
root_relationship = resolve_chain(starting_relationship, path_segments, errors, visited_relationships)
root_relationship = resolve_chain(starting_relationship, path_segments, relationships, errors)
return [nil, errors] unless root_relationship

# A valid chain must terminate at a relationship defined on an indexed type.
Expand All @@ -99,8 +67,7 @@ def resolve(starting_relationship)
end

resolved_chain = ResolvedRelationshipChain.new(
root_relationship: root_relationship,
leaf_relationship: starting_relationship,
relationships: relationships.reverse, # reverse so root-to-leaf order
path_segments: path_segments.reverse # reverse so root-to-leaf order
)

Expand All @@ -109,25 +76,27 @@ def resolve(starting_relationship)

private

# Recursively walks from leaf to root, building path segments in reverse. Returns the root
# relationship (the one with no parent_ref) on success, or nil if an error was encountered.
def resolve_chain(current_rel, path_segments, errors, visited_relationships)
# Recursively walks from leaf to root, collecting relationships and building path segments in reverse.
# Returns the root relationship (the one with no parent_ref) on success, or nil if an error was
# encountered.
def resolve_chain(current_rel, path_segments, relationships, errors)
relationships << current_rel

parent_ref = current_rel.parent_ref
return current_rel unless parent_ref

parent_rel = resolve_parent_ref(current_rel, parent_ref, errors, visited_relationships)
parent_rel = resolve_parent_ref(current_rel, parent_ref, relationships, errors)
return nil unless parent_rel

build_path_segment(current_rel, parent_rel.parent_type, path_segments, errors)
return nil if errors.any?

visited_relationships.add(parent_rel)
resolve_chain(parent_rel, path_segments, errors, visited_relationships)
resolve_chain(parent_rel, path_segments, relationships, errors)
end

# Resolves a parent_ref into the concrete parent relationship.
# Returns the parent relationship on success, or appends to errors and returns nil.
def resolve_parent_ref(current_rel, ref, errors, visited_relationships)
def resolve_parent_ref(current_rel, ref, relationships, errors)
unless current_rel.indexing_only
errors << "#{rel_description(current_rel)} uses `parent_relationship` but is not declared with " \
"`indexing_only: true`. Relationships with `parent_relationship` must be indexing-only."
Expand All @@ -149,7 +118,7 @@ def resolve_parent_ref(current_rel, ref, errors, visited_relationships)
return nil
end

if visited_relationships.include?(parent_rel)
if relationships.include?(parent_rel)
errors << "#{rel_description(current_rel)} creates a circular `parent_relationship` chain " \
"— `#{parent_type.name}.#{ref.relationship_name}` was already visited. The chain must terminate at a root indexed type."
return nil
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

require "elastic_graph/schema_artifacts/runtime_metadata/params"
require "elastic_graph/schema_artifacts/runtime_metadata/sourced_from_nested_path_segment"
require "elastic_graph/support/memoizable_data"

module ElasticGraph
module SchemaDefinition
module Indexing
# The result of resolving a relationship chain.
#
# @private
class ResolvedRelationshipChain < Support::MemoizableData.define(
:relationships, # Array<Relationship> - every relationship in the chain, ordered root-to-leaf
:path_segments # Array<PathSegment> - the embedding fields to descend, ordered root-to-leaf
)
# The relationship the chain terminated at on the root indexed type.
def root_relationship
relationships.first
end

# The relationship the chain was resolved from — backs the `sourced_from` field(s).
def leaf_relationship
relationships.last
end

# The index the chain terminates at — where the root indexed type's documents (and their nested
# elements) live, and where the chain's navigation path is registered. The chain always terminates at
# an indexed type (enforced when it is resolved), so this is never `nil`.
def root_index
root_relationship.parent_type.index_def # : Index
end

# Records this chain's navigation path on its root index, so the painless script can locate the
# nested element to update at index time.
def register_on_root_index
root_index.register_sourced_from_nested_paths(qualified_relationship, sourced_from_nested_paths)
end

# The leaf relationship name qualified by its embedding-field path (hence unique per resolved chain)
def qualified_relationship
@qualified_relationship ||=
(path_segments.map { |segment| segment.field.name_in_index } + [leaf_relationship.name_in_index]).join(".")
end

# The runtime-metadata segments the painless script uses to navigate this chain: a `ListPathSegment` for
# each list embedding field (carrying the source field that matches the element) and an `ObjectPathSegment`
# for each object embedding field.
def sourced_from_nested_paths
@sourced_from_nested_paths ||= path_segments.map do |segment|
if (source_field = segment.source_field_name)
SchemaArtifacts::RuntimeMetadata::ListPathSegment.new(
field: segment.field.name_in_index,
source_field: source_field
)
else
SchemaArtifacts::RuntimeMetadata::ObjectPathSegment.new(
field: segment.field.name_in_index
)
end
end
end

# The params identifying which nested element to update at each level: one entry per list segment,
# pulling the matching value from the segment's foreign key on the source event. Object segments have no
# ambiguity, so they contribute no identifier.
def path_identifier_params
@path_identifier_params ||= path_segments.filter_map do |segment|
source_field = segment.source_field_name
next unless source_field

param = SchemaArtifacts::RuntimeMetadata::DynamicParam.new(source_path: source_field, cardinality: :one)
[source_field, param]
end.to_h
end
end
end
end
end
Loading