Skip to content

Commit a2106c0

Browse files
committed
Priority keys
1 parent 02f09b2 commit a2106c0

16 files changed

Lines changed: 102 additions & 19 deletions

File tree

examples/bin/worker

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ worker.register_workflow(MetadataWorkflow)
5454
worker.register_workflow(ParentCloseWorkflow)
5555
worker.register_workflow(ParentIdReuseWorkflow)
5656
worker.register_workflow(ParentWorkflow)
57+
worker.register_workflow(PriorityWorkflow)
5758
worker.register_workflow(ProcessFileWorkflow)
5859
worker.register_workflow(QueryWorkflow)
5960
worker.register_workflow(QuickTimeoutWorkflow)
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
require 'workflows/priority_workflow'
2+
3+
describe PriorityWorkflow do
4+
subject { described_class }
5+
6+
it 'executes with priority' do
7+
workflow_id = SecureRandom.uuid
8+
run_id = Temporal.start_workflow(
9+
PriorityWorkflow,
10+
true,
11+
options: { workflow_id: workflow_id, priority_key: 4 },
12+
)
13+
Temporal.await_workflow_result(
14+
PriorityWorkflow,
15+
workflow_id: workflow_id,
16+
run_id: run_id,
17+
)
18+
execution_info = Temporal.fetch_workflow_execution_info('ruby-samples', workflow_id, run_id)
19+
expect(execution_info.priority_key).to eq(4)
20+
end
21+
end
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
require 'activities/hello_world_activity'
2+
3+
class PriorityWorkflow < Temporal::Workflow
4+
def execute(important = false)
5+
return HelloWorldActivity.execute!('somebody', options: { priority_key: important ? 1 : 5 })
6+
end
7+
end

lib/temporal/client.rb

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def initialize(config)
4242
# @option options [Hash] :headers
4343
# @option options [Hash] :search_attributes
4444
# @option options [Integer] :start_delay determines the amount of seconds to wait before initiating a Workflow
45+
# @option options [Integer] :priority_key
4546
#
4647
# @return [String] workflow's run ID
4748
def start_workflow(workflow, *input, options: {}, **args)
@@ -68,7 +69,8 @@ def start_workflow(workflow, *input, options: {}, **args)
6869
headers: config.header_propagator_chain.inject(execution_options.headers),
6970
memo: execution_options.memo,
7071
search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes),
71-
start_delay: execution_options.start_delay
72+
start_delay: execution_options.start_delay,
73+
priority_key: options[:priority_key]
7274
)
7375
else
7476
raise ArgumentError, 'If signal_input is provided, you must also provide signal_name' if signal_name.nil?
@@ -88,7 +90,8 @@ def start_workflow(workflow, *input, options: {}, **args)
8890
search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes),
8991
signal_name: signal_name,
9092
signal_input: signal_input,
91-
start_delay: execution_options.start_delay
93+
start_delay: execution_options.start_delay,
94+
priority_key: options[:priority_key]
9295
)
9396
end
9497

@@ -112,6 +115,7 @@ def start_workflow(workflow, *input, options: {}, **args)
112115
# @option options [Hash] :timeouts check Temporal::Configuration::DEFAULT_TIMEOUTS
113116
# @option options [Hash] :headers
114117
# @option options [Hash] :search_attributes
118+
# @option options [Integer] :priority_key
115119
#
116120
# @return [String] workflow's run ID
117121
def schedule_workflow(workflow, cron_schedule, *input, options: {}, **args)
@@ -137,6 +141,7 @@ def schedule_workflow(workflow, cron_schedule, *input, options: {}, **args)
137141
cron_schedule: cron_schedule,
138142
memo: execution_options.memo,
139143
search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes),
144+
priority_key: options[:priority_key]
140145
)
141146

142147
response.run_id

lib/temporal/connection/grpc.rb

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ def start_workflow_execution(
121121
cron_schedule: nil,
122122
memo: nil,
123123
search_attributes: nil,
124-
start_delay: nil
124+
start_delay: nil,
125+
priority_key: nil
125126
)
126127
request = Temporalio::Api::WorkflowService::V1::StartWorkflowExecutionRequest.new(
127128
identity: identity,
@@ -149,6 +150,9 @@ def start_workflow_execution(
149150
),
150151
search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new(
151152
indexed_fields: converter.to_payload_map_without_codec(search_attributes || {})
153+
),
154+
priority: priority_key.nil? ? nil : Temporalio::Api::Common::V1::Priority.new(
155+
priority_key: priority_key
152156
)
153157
)
154158

@@ -382,7 +386,8 @@ def signal_with_start_workflow_execution(
382386
cron_schedule: nil,
383387
memo: nil,
384388
search_attributes: nil,
385-
start_delay: nil
389+
start_delay: nil,
390+
priority_key: nil
386391
)
387392
proto_header_fields = if headers.nil?
388393
converter.to_payload_map({})
@@ -422,6 +427,9 @@ def signal_with_start_workflow_execution(
422427
),
423428
search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new(
424429
indexed_fields: converter.to_payload_map_without_codec(search_attributes || {})
430+
),
431+
priority: priority_key.nil? ? nil : Temporalio::Api::Common::V1::Priority.new(
432+
priority_key: priority_key
425433
)
426434
)
427435

lib/temporal/connection/serializer/schedule_activity.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ def to_proto
1919
start_to_close_timeout: object.timeouts[:start_to_close],
2020
heartbeat_timeout: object.timeouts[:heartbeat],
2121
retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy, converter).to_proto,
22-
header: serialize_headers(object.headers)
22+
header: serialize_headers(object.headers),
23+
priority: object.priority_key.nil? ? nil : Temporalio::Api::Common::V1::Priority.new(
24+
priority_key: object.priority_key
25+
)
2326
)
2427
)
2528
end

lib/temporal/connection/serializer/start_child_workflow.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ def to_proto
3232
memo: serialize_memo(object.memo),
3333
workflow_id_reuse_policy: Temporal::Connection::Serializer::WorkflowIdReusePolicy.new(object.workflow_id_reuse_policy, converter).to_proto,
3434
search_attributes: serialize_search_attributes(object.search_attributes),
35+
priority: object.priority_key.nil? ? nil : Temporalio::Api::Common::V1::Priority.new(
36+
priority_key: object.priority_key
37+
)
3538
)
3639
)
3740
end

lib/temporal/workflow/command.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ module Temporal
22
class Workflow
33
module Command
44
# TODO: Move these classes into their own directories under workflow/command/*
5-
ScheduleActivity = Struct.new(:activity_type, :activity_id, :input, :task_queue, :retry_policy, :timeouts, :headers, keyword_init: true)
6-
StartChildWorkflow = Struct.new(:workflow_type, :workflow_id, :input, :namespace, :task_queue, :retry_policy, :parent_close_policy, :timeouts, :headers, :cron_schedule, :memo, :workflow_id_reuse_policy, :search_attributes, keyword_init: true)
5+
ScheduleActivity = Struct.new(:activity_type, :activity_id, :input, :task_queue, :retry_policy, :timeouts, :headers, :priority_key, keyword_init: true)
6+
StartChildWorkflow = Struct.new(:workflow_type, :workflow_id, :input, :namespace, :task_queue, :retry_policy, :parent_close_policy, :timeouts, :headers, :cron_schedule, :memo, :workflow_id_reuse_policy, :search_attributes, :priority_key, keyword_init: true)
77
ContinueAsNew = Struct.new(:workflow_type, :task_queue, :input, :timeouts, :retry_policy, :headers, :memo, :search_attributes, keyword_init: true)
88
RequestActivityCancellation = Struct.new(:activity_id, keyword_init: true)
99
RecordMarker = Struct.new(:name, :details, keyword_init: true)

lib/temporal/workflow/context.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ def execute_activity(activity_class, *input, **args)
8888
task_queue: execution_options.task_queue,
8989
retry_policy: execution_options.retry_policy,
9090
timeouts: execution_options.timeouts,
91-
headers: config.header_propagator_chain.inject(execution_options.headers)
91+
headers: config.header_propagator_chain.inject(execution_options.headers),
92+
priority_key: options[:priority_key]
9293
)
9394

9495
target, cancelation_id = schedule_command(command)
@@ -150,6 +151,7 @@ def execute_workflow(workflow_class, *input, **args)
150151
memo: execution_options.memo,
151152
workflow_id_reuse_policy: workflow_id_reuse_policy,
152153
search_attributes: Helpers.process_search_attributes(execution_options.search_attributes),
154+
priority_key: options[:priority_key]
153155
)
154156

155157
target, cancelation_id = schedule_command(command)

lib/temporal/workflow/execution_info.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
module Temporal
44
class Workflow
55
class ExecutionInfo < Struct.new(:workflow, :workflow_id, :run_id, :start_time, :close_time, :status,
6-
:history_length, :memo, :search_attributes, keyword_init: true)
6+
:history_length, :memo, :search_attributes, :priority_key, keyword_init: true)
77
STATUSES = [
88
Temporal::Workflow::Status::RUNNING,
99
Temporal::Workflow::Status::COMPLETED,
@@ -25,7 +25,8 @@ def self.generate_from(response, converter)
2525
status: Temporal::Workflow::Status::API_STATUS_MAP.fetch(response.status),
2626
history_length: response.history_length,
2727
memo: converter.from_payload_map(response.memo.fields),
28-
search_attributes: search_attributes
28+
search_attributes: search_attributes,
29+
priority_key: response.priority&.priority_key
2930
).freeze
3031
end
3132

0 commit comments

Comments
 (0)