Skip to content

Commit 4fb3ddf

Browse files
Add support for more timeouts to Nexus operations (#390)
1 parent c40a658 commit 4fb3ddf

9 files changed

Lines changed: 91 additions & 3 deletions

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ jobs:
101101
TEMPORAL_CLOUD_OPS_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
102102
TEMPORAL_CLOUD_OPS_TEST_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}
103103
TEMPORAL_CLOUD_OPS_TEST_API_VERSION: 2024-05-13-00
104+
104105
run: bundle exec rake TESTOPTS="--verbose"
105106

106107
- name: Deploy docs

temporalio/lib/temporalio/internal/worker/workflow_instance/nexus_client.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ def initialize(endpoint:, service:, outbound:) # rubocop:disable Lint/MissingSup
1717
@outbound = outbound
1818
end
1919

20-
def start_operation(operation, arg, schedule_to_close_timeout: nil, cancellation_type: nil, summary: nil,
20+
def start_operation(operation, arg, schedule_to_close_timeout: nil, schedule_to_start_timeout: nil,
21+
start_to_close_timeout: nil, cancellation_type: nil, summary: nil,
2122
cancellation: Workflow.cancellation, arg_hint: nil, result_hint: nil)
2223
@outbound.start_nexus_operation(
2324
Temporalio::Worker::Interceptor::Workflow::StartNexusOperationInput.new(
@@ -26,6 +27,8 @@ def start_operation(operation, arg, schedule_to_close_timeout: nil, cancellation
2627
operation: operation.to_s,
2728
arg:,
2829
schedule_to_close_timeout:,
30+
schedule_to_start_timeout:,
31+
start_to_close_timeout:,
2932
cancellation_type:,
3033
summary:,
3134
cancellation:,

temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,8 @@ def start_nexus_operation(input)
446446
operation: input.operation,
447447
input: @instance.payload_converter.to_payload(input.arg, hint: input.arg_hint),
448448
schedule_to_close_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_close_timeout),
449+
schedule_to_start_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_start_timeout),
450+
start_to_close_timeout: ProtoUtils.seconds_to_duration(input.start_to_close_timeout),
449451
nexus_header: input.headers,
450452
cancellation_type: input.cancellation_type
451453
),

temporalio/lib/temporalio/worker/interceptor.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,8 @@ def handle_update(input)
309309
:operation,
310310
:arg,
311311
:schedule_to_close_timeout,
312+
:schedule_to_start_timeout,
313+
:start_to_close_timeout,
312314
:cancellation_type,
313315
:summary,
314316
:cancellation,

temporalio/lib/temporalio/workflow/nexus_client.rb

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ def service
2828
# @param operation [Symbol, String] Operation name.
2929
# @param arg [Object] Argument for the operation.
3030
# @param schedule_to_close_timeout [Float, nil] Total timeout for the operation in seconds.
31+
# @param schedule_to_start_timeout [Float, nil] Timeout in seconds for the operation to start executing. If the
32+
# operation has not started within this window, a SCHEDULE_TO_START timeout error is raised.
33+
# @param start_to_close_timeout [Float, nil] Timeout in seconds for an async operation to complete after it has
34+
# started. If the operation does not complete within this window, a START_TO_CLOSE timeout error is raised.
3135
# @param cancellation_type [NexusOperationCancellationType] How the operation will react to cancellation.
3236
# @param summary [String, nil] Optional summary for the operation (appears in UI/CLI).
3337
# @param cancellation [Cancellation] Cancellation for the operation.
@@ -38,6 +42,8 @@ def start_operation(
3842
operation,
3943
arg,
4044
schedule_to_close_timeout: nil,
45+
schedule_to_start_timeout: nil,
46+
start_to_close_timeout: nil,
4147
cancellation_type: NexusOperationCancellationType::WAIT_CANCELLATION_COMPLETED,
4248
summary: nil,
4349
cancellation: Workflow.cancellation,
@@ -54,6 +60,10 @@ def start_operation(
5460
# @param operation [Symbol, String] Operation name.
5561
# @param arg [Object] Argument for the operation.
5662
# @param schedule_to_close_timeout [Float, nil] Total timeout for the operation in seconds.
63+
# @param schedule_to_start_timeout [Float, nil] Timeout in seconds for the operation to start executing. If the
64+
# operation has not started within this window, a SCHEDULE_TO_START timeout error is raised.
65+
# @param start_to_close_timeout [Float, nil] Timeout in seconds for an async operation to complete after it has
66+
# started. If the operation does not complete within this window, a START_TO_CLOSE timeout error is raised.
5767
# @param cancellation_type [NexusOperationCancellationType] How the operation will react to cancellation.
5868
# @param summary [String, nil] Optional summary for the operation (appears in UI/CLI).
5969
# @param cancellation [Cancellation] Cancellation for the operation.
@@ -65,15 +75,17 @@ def execute_operation(
6575
operation,
6676
arg,
6777
schedule_to_close_timeout: nil,
78+
schedule_to_start_timeout: nil,
79+
start_to_close_timeout: nil,
6880
cancellation_type: NexusOperationCancellationType::WAIT_CANCELLATION_COMPLETED,
6981
summary: nil,
7082
cancellation: Workflow.cancellation,
7183
arg_hint: nil,
7284
result_hint: nil
7385
)
7486
start_operation(
75-
operation, arg, schedule_to_close_timeout:, cancellation_type:, summary:, cancellation:,
76-
arg_hint:, result_hint:
87+
operation, arg, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:,
88+
cancellation_type:, summary:, cancellation:, arg_hint:, result_hint:
7789
).result
7890
end
7991
end

temporalio/sig/temporalio/worker/interceptor.rbs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,8 @@ module Temporalio
318318
attr_reader operation: String
319319
attr_reader arg: Object?
320320
attr_reader schedule_to_close_timeout: duration?
321+
attr_reader schedule_to_start_timeout: duration?
322+
attr_reader start_to_close_timeout: duration?
321323
attr_reader cancellation_type: Temporalio::Workflow::NexusOperationCancellationType::enum
322324
attr_reader summary: String?
323325
attr_reader cancellation: Cancellation
@@ -331,6 +333,8 @@ module Temporalio
331333
operation: String,
332334
arg: Object?,
333335
schedule_to_close_timeout: duration?,
336+
schedule_to_start_timeout: duration?,
337+
start_to_close_timeout: duration?,
334338
cancellation_type: Temporalio::Workflow::NexusOperationCancellationType::enum,
335339
summary: String?,
336340
cancellation: Cancellation,

temporalio/sig/temporalio/workflow/nexus_client.rbs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ module Temporalio
88
Symbol | String operation,
99
Object? arg,
1010
?schedule_to_close_timeout: duration?,
11+
?schedule_to_start_timeout: duration?,
12+
?start_to_close_timeout: duration?,
1113
?cancellation_type: NexusOperationCancellationType::enum,
1214
?summary: String?,
1315
?cancellation: Cancellation,
@@ -19,6 +21,8 @@ module Temporalio
1921
Symbol | String operation,
2022
Object? arg,
2123
?schedule_to_close_timeout: duration?,
24+
?schedule_to_start_timeout: duration?,
25+
?start_to_close_timeout: duration?,
2226
?cancellation_type: NexusOperationCancellationType::enum,
2327
?summary: String?,
2428
?cancellation: Cancellation,

temporalio/test/test.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ def initialize
147147
if target_host.empty?
148148
@server = Temporalio::Testing::WorkflowEnvironment.start_local(
149149
logger: Logger.new($stdout),
150+
dev_server_download_version: 'v1.6.1-server-1.31.0-150.0',
150151
dev_server_extra_args: [
151152
# Allow continue as new to be immediate
152153
'--dynamic-config-value', 'history.workflowIdReuseMinimalInterval="0s"',

temporalio/test/worker_workflow_nexus_test.rb

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,65 @@ def test_nexus_operation_summary_in_history
426426
end
427427
end
428428

429+
# Test schedule_to_start_timeout: operation never picked up, times out before starting
430+
class NexusScheduleToStartTimeoutWorkflow < Temporalio::Workflow::Definition
431+
def execute(endpoint)
432+
client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service')
433+
client.execute_operation(
434+
'workflow-operation',
435+
{ 'action' => 'wait-for-cancel' },
436+
schedule_to_start_timeout: 0.1
437+
)
438+
end
439+
end
440+
441+
def test_nexus_schedule_to_start_timeout
442+
# Create a nexus endpoint pointing to a task queue with no worker so no one
443+
# picks up the nexus task, triggering a SCHEDULE_TO_START timeout.
444+
nexus_task_queue = "tq-#{SecureRandom.uuid}"
445+
endpoint_name = "nexus-endpoint-#{nexus_task_queue}"
446+
endpoint = env.server.create_nexus_endpoint(name: endpoint_name, task_queue: nexus_task_queue)
447+
448+
begin
449+
err = assert_raises(Temporalio::Error::WorkflowFailedError) do
450+
execute_workflow(NexusScheduleToStartTimeoutWorkflow, endpoint_name)
451+
end
452+
453+
assert_instance_of Temporalio::Error::NexusOperationError, err.cause
454+
assert_instance_of Temporalio::Error::TimeoutError, err.cause.cause
455+
assert_equal Temporalio::Error::TimeoutError::TimeoutType::SCHEDULE_TO_START, err.cause.cause.type
456+
ensure
457+
env.server.delete_nexus_endpoint(endpoint)
458+
end
459+
end
460+
461+
# Test start_to_close_timeout: operation starts (async token returned) but never completes
462+
class NexusStartToCloseTimeoutWorkflow < Temporalio::Workflow::Definition
463+
def execute(endpoint)
464+
client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service')
465+
handle = client.start_operation(
466+
'workflow-operation',
467+
{ 'action' => 'wait-for-cancel' },
468+
start_to_close_timeout: 0.1
469+
)
470+
handle.result
471+
end
472+
end
473+
474+
def test_nexus_start_to_close_timeout
475+
env.with_kitchen_sink_worker(nexus: true) do |task_queue|
476+
endpoint = "nexus-endpoint-#{task_queue}"
477+
478+
err = assert_raises(Temporalio::Error::WorkflowFailedError) do
479+
execute_workflow(NexusStartToCloseTimeoutWorkflow, endpoint)
480+
end
481+
482+
assert_instance_of Temporalio::Error::NexusOperationError, err.cause
483+
assert_instance_of Temporalio::Error::TimeoutError, err.cause.cause
484+
assert_equal Temporalio::Error::TimeoutError::TimeoutType::START_TO_CLOSE, err.cause.cause.type
485+
end
486+
end
487+
429488
class NexusOperationTracingWorkflow < Temporalio::Workflow::Definition
430489
def execute(endpoint)
431490
client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service')

0 commit comments

Comments
 (0)