-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathplanner.rb
More file actions
470 lines (403 loc) · 19 KB
/
planner.rb
File metadata and controls
470 lines (403 loc) · 19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
# frozen_string_literal: true
require_relative "planner/step"
module GraphQL
module Stitching
# Planner partitions request selections by best-fit graph locations,
# and provides a query plan with sequential execution steps.
class Planner
SUPERGRAPH_LOCATIONS = [Supergraph::SUPERGRAPH_LOCATION].freeze
ROOT_INDEX = 0
# Pairs a location with the list of selections assigned to it.
# Used while planning mutations to batch consecutive root fields
# that share a location.
class ScopePartition
  # Location that the partition's selections are assigned to.
  attr_reader :location

  # Selections collected for this partition; appended to in place by the planner.
  attr_reader :selections

  def initialize(location:, selections:)
    @location, @selections = location, selections
  end
end
# Prepares planner state for a single request.
# The supergraph is read from the request; the step counter starts at ROOT_INDEX,
# no steps are registered yet, and the error list is created lazily on first error.
def initialize(request)
  @request, @supergraph = request, request.supergraph
  @planning_index = ROOT_INDEX
  @steps_by_entrypoint = {}
  @errors = nil
end
# Runs the planning passes and packages the outcome as a Plan.
# Root entrypoints are built first (which recursively plans nested steps),
# then selections aimed at abstract resolvers are wrapped in typed fragments.
# Claims and errors fall back to EMPTY_ARRAY when none are present.
def perform
  build_root_entrypoints
  expand_abstract_resolvers

  plan_ops = steps.map!(&:to_plan_op)
  plan_claims = @request.claims&.to_a || EMPTY_ARRAY
  plan_errors = @errors || EMPTY_ARRAY

  Plan.new(ops: plan_ops, claims: plan_claims, errors: plan_errors)
end
# Returns a new array of every registered step, ordered by ascending index.
def steps
  @steps_by_entrypoint.each_value.sort_by(&:index)
end
private
# **
# Algorithm:
#
# A) Group all root selections by their preferred entrypoint locations.
# A.1) Group query fields by location for parallel execution.
# A.2) Partition mutation fields by consecutive location for serial execution.
# A.3) Permit exactly one subscription field.
#
# B) Extract contiguous selections for each entrypoint location.
# B.1) Selections on interface types that do not belong to the interface at the
# entrypoint location are expanded into concrete type fragments prior to extraction.
# B.2) Filter the selection tree down to just fields of the entrypoint location.
# Adjoining selections not available here get split off into new entrypoints (C).
# B.3) Collect all variable definitions used within the filtered selection.
# These specify which request variables to pass along with each step.
# B.4) Add a `__typename` export to abstracts and types that implement fragments.
# This provides resolved type information used during execution.
#
# C) Delegate adjoining selections to new entrypoint locations.
# C.1) Distribute unique fields among their required locations.
# C.2) Distribute non-unique fields among locations that were added during C.1.
# C.3) Distribute remaining fields among locations weighted by greatest availability.
#
# D) Create paths routing to new entrypoint locations via resolver queries.
# D.1) Types joining through multiple keys route using A* search.
# D.2) Types joining through a single key route via quick location match.
# (D.2 is an optional optimization of D.1)
#
# E) Translate resolver pathways into new entrypoints.
# E.1) Add the key of each resolver query into the prior location's selection set.
# E.2) Add a planner step for each new entrypoint location, then extract it (B).
#
# F) Wrap concrete selections targeting abstract resolvers in typed fragments.
# **
# Adds a planning step for fetching and inserting data into the aggregate result.
#
# Calls that describe the same entrypoint (same parent step index, location,
# parent type, resolver key and path) are coalesced: the first call creates the
# step, and later calls append their extracted selections to it.
#
# location       - location that will serve the step.
# parent_index   - index of the step that this one runs after.
# parent_type    - type that the selections are made on; must respond to `graphql_name`.
# selections     - selections to plan; when non-empty they are filtered through
#                  `extract_locale_selections`, which may register further steps.
# variables      - hash that collects variable definitions used by the selections.
# path           - response path segments leading to this step's insertion point.
# operation_type - operation type of the step (defaults to QUERY_OP).
# resolver       - resolver used to reach this location, or nil.
#
# Returns the newly created Step, or the existing Step that absorbed the selections.
def add_step(
  location:,
  parent_index:,
  parent_type:,
  selections:,
  variables: {},
  path: [],
  operation_type: QUERY_OP,
  resolver: nil
)
  # coalesce repeat parameters into a single entrypoint
  entrypoint = String.new
  entrypoint << parent_index.to_s << "/" << location << "/" << parent_type.graphql_name
  entrypoint << "/" << (resolver&.key&.to_s || "") << "/#"
  path.each { entrypoint << "/" << _1 }

  step = @steps_by_entrypoint[entrypoint]

  # Selections merged into an existing step are fetched by that step, so anything
  # split off beneath them must be sequenced after the step itself. Using the
  # step's parent here would plan nested steps beside the step that exports their keys.
  next_index = step ? step.index : (@planning_index += 1)

  unless selections.empty?
    selections = extract_locale_selections(location, parent_type, next_index, selections, path, variables)
  end

  if step.nil?
    @steps_by_entrypoint[entrypoint] = Step.new(
      index: next_index,
      after: parent_index,
      location: location,
      parent_type: parent_type,
      operation_type: operation_type,
      selections: selections,
      variables: variables,
      path: path,
      resolver: resolver,
    )
  else
    # NOTE(review): variable definitions collected into `variables` during this call
    # are not merged into the existing step; confirm whether they should be.
    step.selections.concat(selections)
    step
  end
end
# Records an "unauthorized" error for the given response path.
# The error list is created on first use. Returns the error list.
def add_unauthorized(path)
  (@errors ||= []).push(Plan::Error.new(code: "unauthorized", path: path))
end
# A) Group all root selections by their preferred entrypoint locations.
#
# Dispatches on the request's operation type:
# - queries are grouped by location so each location gets one root step;
# - mutations are partitioned by consecutive location and chained so each
#   partition runs after the previous one;
# - subscriptions accept a single root field.
#
# Root fields with no location mapping fall back to SUPERGRAPH_LOCATIONS in
# every branch.
#
# Raises DocumentError for an unknown operation type, or when a subscription
# selects more than one root field.
def build_root_entrypoints
  parent_type = @request.query.root_type_for_operation(@request.operation.operation_type)
  root_field_locations = @supergraph.locations_by_type_and_field[parent_type.graphql_name]

  case @request.operation.operation_type
  when QUERY_OP
    # A.1) Group query fields by location for parallel execution.
    selections_by_location = Hash.new { |h, k| h[k] = [] }

    each_field_in_scope(parent_type, @request.operation.selections) do |node|
      locations = root_field_locations[node.name] || SUPERGRAPH_LOCATIONS
      selections_by_location[locations.first] << node
    end

    selections_by_location.each do |location, selections|
      add_step(
        location: location,
        parent_index: ROOT_INDEX,
        parent_type: parent_type,
        selections: selections,
        operation_type: QUERY_OP,
      )
    end

  when MUTATION_OP
    # A.2) Partition mutation fields by consecutive location for serial execution.
    partitions = []

    each_field_in_scope(parent_type, @request.operation.selections) do |node|
      # Same fallback as the query and subscription branches: an unmapped root
      # field (such as `__typename`) must not crash on `nil.first`.
      locations = root_field_locations[node.name] || SUPERGRAPH_LOCATIONS
      next_location = locations.first

      if partitions.empty? || partitions.last.location != next_location
        partitions << ScopePartition.new(location: next_location, selections: [])
      end
      partitions.last.selections << node
    end

    # Each partition's step index becomes the parent of the next partition.
    partitions.reduce(ROOT_INDEX) do |parent_index, partition|
      add_step(
        location: partition.location,
        parent_index: parent_index,
        parent_type: parent_type,
        selections: partition.selections,
        operation_type: MUTATION_OP,
      ).index
    end

  when SUBSCRIPTION_OP
    # A.3) Permit exactly one subscription field.
    each_field_in_scope(parent_type, @request.operation.selections) do |node|
      # A step already being registered means this is a second root field.
      raise DocumentError.new("root field") unless @steps_by_entrypoint.empty?

      locations = root_field_locations[node.name] || SUPERGRAPH_LOCATIONS
      add_step(
        location: locations.first,
        parent_index: ROOT_INDEX,
        parent_type: parent_type,
        selections: [node],
        operation_type: SUBSCRIPTION_OP,
      )
    end

  else
    raise DocumentError.new("operation type")
  end
end
# Yields each authorized field found directly in the given scope.
# Inline fragments and fragment spreads are followed only when they are untyped
# (inline fragments) or typed with exactly the parent type's name.
# Unauthorized fields are not yielded; they are recorded via `add_unauthorized`
# under their alias, or their name when no alias is set.
#
# Raises DocumentError for any other kind of selection node.
def each_field_in_scope(parent_type, input_selections, &block)
  input_selections.each do |selection|
    case selection
    when GraphQL::Language::Nodes::Field
      unless @request.authorized?(parent_type.graphql_name, selection.name)
        add_unauthorized([selection.alias || selection.name])
        next
      end
      yield(selection)

    when GraphQL::Language::Nodes::InlineFragment
      type_condition = selection.type
      if type_condition.nil? || parent_type.graphql_name == type_condition.name
        each_field_in_scope(parent_type, selection.selections, &block)
      end

    when GraphQL::Language::Nodes::FragmentSpread
      definition = @request.fragment_definitions[selection.name]
      if parent_type.graphql_name == definition.type.name
        each_field_in_scope(parent_type, definition.selections, &block)
      end

    else
      raise DocumentError.new("selection node type")
    end
  end
end
# B) Contiguous selections are extracted for each entrypoint location.
#
# Walks `input_selections` made on `parent_type` and keeps only what
# `current_location` can serve, recursing into composite fields and fragments.
# Fields that are not available at `current_location` are collected and handed
# off to new steps (C-E), with the resolver keys needed to reach them added to
# the selections kept here.
#
# current_location  - location being extracted for.
# parent_type       - type that `input_selections` are made on.
# parent_index      - index of the step that owns these selections; passed as the
#                     parent of any steps split off from this scope.
# input_selections  - selection nodes to filter.
# path              - response path to this scope; mutated in place while
#                     recursing (push/pop) and duplicated when given to a new step.
# locale_variables  - hash collecting variable definitions used by kept selections.
# locale_selections - output array; supplied by the caller when a fragment shares
#                     the parent's scope so its fields merge directly into it.
#
# Returns `locale_selections`.
# Raises StitchingError when a field alias uses the reserved export prefix,
# and DocumentError for an unrecognized selection node.
def extract_locale_selections(
current_location,
parent_type,
parent_index,
input_selections,
path,
locale_variables,
locale_selections = []
)
# B.1) Expand selections on interface types that do not belong to this location.
input_selections = expand_interface_selections(current_location, parent_type, input_selections)
# B.2) Filter the selection tree down to just fields of the entrypoint location.
# Adjoining selections not available here get split off into new entrypoints (C).
# remote_selections stays nil until the first remote field is found.
remote_selections = nil
requires_typename = parent_type.kind.abstract?
input_selections.each do |node|
case node
when GraphQL::Language::Nodes::Field
# Aliases using the export prefix are rejected, except for the shared typename export node itself.
if node.alias&.start_with?(TypeResolver::EXPORT_PREFIX) && node.object_id != TypeResolver::TYPENAME_EXPORT_NODE.object_id
raise StitchingError, %(Alias "#{node.alias}" is not allowed because "#{TypeResolver::EXPORT_PREFIX}" is a reserved prefix.)
elsif node.name == TYPENAME
# `__typename` is kept as-is without a location lookup.
locale_selections << node
next
elsif !@request.authorized?(parent_type.graphql_name, node.name)
# Unauthorized fields are dropped and reported; a typename export is requested for this scope.
requires_typename = true
add_unauthorized([*path, node.alias || node.name])
next
end
possible_locations = @supergraph.locations_by_type_and_field[parent_type.graphql_name][node.name] || SUPERGRAPH_LOCATIONS
unless possible_locations.include?(current_location)
# Not served here: defer to delegation (C) after the loop.
remote_selections ||= []
remote_selections << node
next
end
# B.3) Collect all variable definitions used within the filtered selection.
extract_node_variables(node, locale_variables)
schema_fields = @supergraph.memoized_schema_fields(parent_type.graphql_name)
field_type = schema_fields[node.name].type.unwrap
if Util.is_leaf_type?(field_type)
locale_selections << node
else
# Composite field: recurse with the field's response key appended to the path,
# then keep a copy of the node carrying only the locally-available sub-selections.
path.push(node.alias || node.name)
selection_set = extract_locale_selections(current_location, field_type, parent_index, node.selections, path, locale_variables)
path.pop
locale_selections << node.merge(selections: selection_set)
end
when GraphQL::Language::Nodes::InlineFragment
# An untyped inline fragment applies to the parent type.
fragment_type = node.type ? @supergraph.memoized_schema_types[node.type.name] : parent_type
# Skip fragments whose type is not present at this location.
next unless @supergraph.locations_by_type[fragment_type.graphql_name].include?(current_location)
# Same-scope fragments are flattened into the parent's selections;
# differently-typed fragments are kept as fragments with their own selection set.
is_same_scope = fragment_type == parent_type
selection_set = is_same_scope ? locale_selections : []
extract_locale_selections(current_location, fragment_type, parent_index, node.selections, path, locale_variables, selection_set)
unless is_same_scope
locale_selections << node.merge(selections: selection_set)
requires_typename = true
end
when GraphQL::Language::Nodes::FragmentSpread
fragment = @request.fragment_definitions[node.name]
next unless @supergraph.locations_by_type[fragment.type.name].include?(current_location)
# Unlike inline fragments, a followed spread always requests a typename export.
requires_typename = true
fragment_type = @supergraph.memoized_schema_types[fragment.type.name]
is_same_scope = fragment_type == parent_type
selection_set = is_same_scope ? locale_selections : []
extract_locale_selections(current_location, fragment_type, parent_index, fragment.selections, path, locale_variables, selection_set)
unless is_same_scope
# The spread is replaced by an inline fragment holding the extracted selections.
locale_selections << GraphQL::Language::Nodes::InlineFragment.new(type: fragment.type, selections: selection_set)
end
else
raise DocumentError.new("selection node type")
end
end
# B.4) Add a `__typename` export to abstracts and types that implement
# fragments so that resolved type information is available during execution.
if requires_typename && !locale_selections.include?(TypeResolver::TYPENAME_EXPORT_NODE)
locale_selections << TypeResolver::TYPENAME_EXPORT_NODE
end
if remote_selections
# C) Delegate adjoining selections to new entrypoint locations.
remote_selections_by_location = delegate_remote_selections(parent_type, remote_selections)
# D) Create paths routing to new entrypoint locations via resolver queries.
routes = @supergraph.route_type_to_locations(parent_type.graphql_name, current_location, remote_selections_by_location.each_key)
# E) Translate resolver pathways into new entrypoints.
routes.each_value do |route|
# The reduce threads selections through the route: it starts with this scope's
# selections, and each added step's selections become the target for the next
# resolver's key exports.
route.reduce(locale_selections) do |parent_selections, resolver|
# E.1) Add the key of each resolver query into the prior location's selection set.
parent_selections.push(*resolver.key.export_nodes) if resolver.key
# Deduplicate export-key fields by alias; every other node is kept by identity.
parent_selections.uniq! do |node|
export_node = node.is_a?(GraphQL::Language::Nodes::Field) && TypeResolver.export_key?(node.alias)
export_node ? node.alias : node.object_id
end
# E.2) Add a planner step for each new entrypoint location.
# Locations that are only passed through on the way to another get an empty selection list.
# NOTE(review): every resolver in a route is added with this scope's parent_index,
# including later hops — confirm that is the intended sequencing for multi-hop routes.
add_step(
location: resolver.location,
parent_index: parent_index,
parent_type: parent_type,
selections: remote_selections_by_location[resolver.location] || [],
path: path.dup,
resolver: resolver.key ? resolver : nil,
).selections
end
end
end
locale_selections
end
# B.1) Selections on interface types that do not belong to the interface at the
# entrypoint location are expanded into concrete type fragments prior to extraction.
#
# Non-interface parents are returned untouched. For interfaces, a new array is
# returned: nodes the local interface can serve come first in their original
# order, followed by one inline fragment per possible type available at
# `current_location`. Every such fragment shares the same list of relocated fields.
def expand_interface_selections(current_location, parent_type, input_selections)
  return input_selections unless parent_type.kind.interface?

  local_fields = @supergraph.fields_by_type_and_location[parent_type.graphql_name][current_location]

  # Split off fields (other than `__typename`) that the local interface does not define.
  foreign_fields, kept_selections = input_selections.partition do |node|
    node.is_a?(GraphQL::Language::Nodes::Field) &&
      node.name != TYPENAME &&
      !local_fields.include?(node.name)
  end

  unless foreign_fields.empty?
    @request.query.possible_types(parent_type).each do |concrete_type|
      concrete_name = concrete_type.graphql_name
      next unless @supergraph.locations_by_type[concrete_name].include?(current_location)

      kept_selections << GraphQL::Language::Nodes::InlineFragment.new(
        type: GraphQL::Language::Nodes::TypeName.new(name: concrete_name),
        selections: foreign_fields,
      )
    end
  end

  kept_selections
end
# B.3) Collect all variable definitions used within the filtered selection.
# These specify which request variables to pass along with each step.
#
# Visits every argument of `node_with_args`, plus the arguments of its
# directives when the node responds to `directives`. Each referenced variable's
# definition is copied from the request into `variable_definitions`, keyed by
# variable name; existing entries are left as they are.
#
# node_with_args       - any node responding to `arguments` (field, directive, input object).
# variable_definitions - hash mutated in place with the collected definitions.
def extract_node_variables(node_with_args, variable_definitions)
  node_with_args.arguments.each do |argument|
    extract_value_variables(argument.value, variable_definitions)
  end

  if node_with_args.respond_to?(:directives)
    node_with_args.directives.each do |directive|
      extract_node_variables(directive, variable_definitions)
    end
  end
end

# Collects variables referenced anywhere inside a single argument value.
# List literals arrive as Ruby arrays and may nest variables, input objects, or
# further lists, so they are walked recursively; plain scalars are ignored.
def extract_value_variables(value, variable_definitions)
  case value
  when Array
    value.each { |item| extract_value_variables(item, variable_definitions) }
  when GraphQL::Language::Nodes::InputObject
    extract_node_variables(value, variable_definitions)
  when GraphQL::Language::Nodes::VariableIdentifier
    variable_definitions[value.name] ||= @request.variable_definitions[value.name]
  end
end
# C) Delegate adjoining selections to new entrypoint locations.
#
# Assigns each remote field on `parent_type` to exactly one location and returns
# a hash of location => fields. Fields claimed during the first two passes are
# removed from `remote_selections` in place.
def delegate_remote_selections(parent_type, remote_selections)
  locations_for = @supergraph.locations_by_type_and_field[parent_type.graphql_name]
  delegated = {}

  # C.1) Distribute unique fields among their required locations.
  remote_selections.reject! do |field|
    candidates = locations_for[field.name]
    next false unless candidates.length == 1

    (delegated[candidates.first] ||= []) << field
    true
  end

  # C.2) Distribute non-unique fields among locations that were added during C.1.
  unless delegated.empty? || remote_selections.empty?
    claimed_locations = Set.new(delegated.each_key)

    remote_selections.reject! do |field|
      target = locations_for[field.name].find { |location| claimed_locations.include?(location) }
      next false unless target

      delegated[target] << field
      true
    end
  end

  # C.3) Distribute remaining fields among locations weighted by greatest availability.
  unless remote_selections.empty?
    # Count how many of the leftover fields each location could serve.
    demand = Hash.new(0)
    remote_selections.each do |field|
      locations_for[field.name].each { |location| demand[location] += 1 }
    end

    remote_selections.each do |field|
      candidates = locations_for[field.name]
      winner = candidates.max_by { |location| demand[location] } || candidates.first
      (delegated[winner] ||= []) << field
    end
  end

  delegated
end
# F) Wrap concrete selections targeting abstract resolvers in typed fragments.
#
# Applies to steps whose resolver type is abstract and differs from the step's
# parent type. Their top-level field selections are moved, in place, into a
# single inline fragment typed as the parent type and appended after the
# remaining selections. Steps with no top-level fields are left unchanged.
def expand_abstract_resolvers
  @steps_by_entrypoint.each_value do |step|
    resolver = step.resolver
    next unless resolver

    resolver_type = @supergraph.memoized_schema_types[resolver.type_name]
    next if !resolver_type.kind.abstract? || resolver_type == step.parent_type

    concrete_fields = step.selections.grep(GraphQL::Language::Nodes::Field)
    next if concrete_fields.empty?

    step.selections.reject! { |node| node.is_a?(GraphQL::Language::Nodes::Field) }
    step.selections << GraphQL::Language::Nodes::InlineFragment.new(
      type: GraphQL::Language::Nodes::TypeName.new(name: step.parent_type.graphql_name),
      selections: concrete_fields,
    )
  end
end
end
end
end