Skip to content

Commit 6e8b2fb

Browse files
authored
Add activity heartbeat sample. (#36)
Closes #6
1 parent abcbfc9 commit 6e8b2fb

8 files changed

Lines changed: 192 additions & 1 deletion

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Prerequisites:
1919
## Samples
2020

2121
<!-- Keep this list in alphabetical order -->
22+
* [activity_heartbeating](activity_heartbeating) - Demonstrate activity heartbeating and proper cancellation handling.
2223
* [activity_simple](activity_simple) - Simple workflow that calls two activities.
2324
* [activity_worker](activity_worker) - Use Ruby activities from a workflow in another language.
2425
* [client_mtls](client_mtls) - Demonstrates how to use mutual TLS (mTLS) authentication with the Temporal Ruby SDK.

activity_heartbeating/README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Activity Heartbeating and Cancellation
2+
3+
This sample demonstrates activity heartbeating and proper cancellation handling. The activity reports progress via heartbeats, which allows it to resume from the last reported progress if it fails and is retried. The sample also demonstrates how to properly handle cancellation in activities.
4+
5+
To run, first see [README.md](../README.md) for prerequisites. Then, in another terminal, start the Ruby worker
6+
from this directory:
7+
8+
bundle exec ruby worker.rb
9+
10+
Finally in another terminal, execute the workflow from this directory:
11+
12+
bundle exec ruby starter.rb
13+
14+
The workflow will start the activity that simulates work by tracking progress from 1 to 100. After 15 seconds, the client will cancel the workflow, which will cancel the activity. The activity will detect the cancellation, log it, and raise a cancellation error.
15+
16+
## What to look for
17+
18+
This sample showcases several important concepts:
19+
20+
1. **Activity Heartbeating**: The activity reports its progress using `context.heartbeat(progress)`. If the activity fails and is retried, it will resume from the last reported progress rather than starting over.
21+
22+
2. **Heartbeat Timeout**: The workflow sets a heartbeat timeout of 3 seconds for the activity, which means the server will consider the activity failed if it doesn't receive a heartbeat for more than 3 seconds.
23+
24+
3. **Cancellation Handling**: The activity checks for cancellation after each progress increment using `context.cancellation.canceled?`. When cancelled, it properly raises a `Temporalio::Error::CanceledError`.
25+
26+
4. **Cancellation Type**: The workflow uses `cancellation_type: :wait_cancellation_completed` to ensure the workflow doesn't proceed until the activity has acknowledged and processed the cancellation.
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/activity'
4+
5+
module ActivityHeartbeating
6+
module MyActivities
7+
# Activity that demonstrates progress tracking with heartbeating and cancellation
8+
class FakeProgress < Temporalio::Activity::Definition
9+
def execute(sleep_interval = 1.0)
10+
context = Temporalio::Activity::Context.current
11+
12+
begin
13+
# Allow for resuming from heartbeat details if available
14+
starting_point = context.info.heartbeat_details.first || 1
15+
16+
context.logger.info("Starting activity at progress: #{starting_point}")
17+
18+
(starting_point..100).each do |progress|
19+
# Sleep for the interval - checking cancellation after sleep
20+
sleep(sleep_interval)
21+
22+
context.logger.info("Progress: #{progress}")
23+
context.heartbeat(progress)
24+
end
25+
26+
context.logger.info('Fake progress activity completed')
27+
rescue Temporalio::Error::CanceledError
28+
# This catches the cancel just for demonstration, you usually don't want to catch it
29+
context.logger.info('Handling cancellation')
30+
raise # Re-raise to properly cancel the activity
31+
end
32+
end
33+
end
34+
end
35+
end
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/workflow'
4+
require_relative 'my_activities'
5+
6+
module ActivityHeartbeating
7+
class MyWorkflow < Temporalio::Workflow::Definition
8+
def execute
9+
# Execute the activity with a 5-minute timeout and 3-second heartbeat timeout
10+
Temporalio::Workflow.execute_activity(
11+
MyActivities::FakeProgress,
12+
1.0, # 1 second sleep interval
13+
start_to_close_timeout: 5 * 60,
14+
heartbeat_timeout: 3,
15+
# Wait for activity cancellation completion
16+
cancellation_type: Temporalio::Workflow::ActivityCancellationType::WAIT_CANCELLATION_COMPLETED
17+
)
18+
rescue Temporalio::Error::ActivityError => e
19+
# This catches the cancel just for demonstration, you usually don't want to catch it
20+
if e.cause.is_a?(Temporalio::Error::CanceledError)
21+
Temporalio::Workflow.logger.info('Workflow cancelled along with its activity')
22+
end
23+
raise # Re-raise to properly cancel the workflow
24+
end
25+
end
26+
end

activity_heartbeating/starter.rb

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/client'
4+
require_relative 'my_workflow'
5+
6+
# Create a client
7+
client = Temporalio::Client.connect('localhost:7233', 'default')
8+
9+
workflow_id = 'activity-heartbeating-workflow-id'
10+
task_queue = 'activity-heartbeating-sample'
11+
12+
# Start workflow
13+
puts 'Starting workflow'
14+
handle = client.start_workflow(
15+
ActivityHeartbeating::MyWorkflow,
16+
id: workflow_id,
17+
task_queue: task_queue
18+
)
19+
20+
puts "Workflow started with ID: #{workflow_id}"
21+
puts 'Waiting 15 seconds before cancelling workflow...'
22+
23+
# Wait some time to let the activity make progress
24+
sleep 15
25+
26+
# Cancel the workflow
27+
puts 'Cancelling workflow...'
28+
handle.cancel
29+
30+
begin
31+
# Wait for result (which will fail with cancellation)
32+
result = handle.result
33+
puts "Workflow completed with result: #{result}"
34+
rescue Temporalio::Error::WorkflowFailedError => e
35+
raise unless e.cause.is_a?(Temporalio::Error::CanceledError)
36+
37+
puts 'Workflow was successfully cancelled'
38+
end

activity_heartbeating/worker.rb

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# frozen_string_literal: true
2+
3+
require_relative 'my_activities'
4+
require_relative 'my_workflow'
5+
require 'logger'
6+
require 'temporalio/client'
7+
require 'temporalio/worker'
8+
9+
# Create a Temporal client
10+
client = Temporalio::Client.connect(
11+
'localhost:7233',
12+
'default',
13+
logger: Logger.new($stdout, level: Logger::INFO)
14+
)
15+
16+
# Create worker with the activities and workflow
17+
worker = Temporalio::Worker.new(
18+
client:,
19+
task_queue: 'activity-heartbeating-sample',
20+
activities: [ActivityHeartbeating::MyActivities::FakeProgress],
21+
workflows: [ActivityHeartbeating::MyWorkflow]
22+
)
23+
24+
# Run the worker until SIGINT
25+
puts 'Starting worker (ctrl+c to exit)'
26+
worker.run(shutdown_signals: ['SIGINT'])
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# frozen_string_literal: true
2+
3+
require 'test'
4+
require 'activity_heartbeating/my_workflow'
5+
require 'securerandom'
6+
require 'temporalio/testing'
7+
require 'temporalio/worker'
8+
9+
module ActivityHeartbeating
10+
class MyWorkflowTest < Test
11+
def test_workflow
12+
# Run test server until completion of the block
13+
Temporalio::Testing::WorkflowEnvironment.start_local do |env|
14+
# Run worker until completion of the block
15+
worker = Temporalio::Worker.new(
16+
client: env.client,
17+
task_queue: "tq-#{SecureRandom.uuid}",
18+
activities: [MyActivities::FakeProgress],
19+
workflows: [MyWorkflow]
20+
)
21+
worker.run do
22+
# Start workflow
23+
wf = env.client.start_workflow(MyWorkflow, id: "wf-#{SecureRandom.uuid}", task_queue: worker.task_queue)
24+
25+
# Wait for activity to be scheduled
26+
sleep 0.3 until wf.describe.raw_description.pending_activities.any?
27+
28+
# Cancel workflow
29+
wf.cancel
30+
31+
# Check that it was cancelled
32+
assert_raises(Temporalio::Error::WorkflowFailedError, 'Workflow execution canceled') do
33+
wf.result
34+
end
35+
end
36+
end
37+
end
38+
end
39+
end

test/activity_simple/my_workflow_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
require 'temporalio/worker'
88

99
module ActivitySimple
10-
class ActivitySimpleTest < Test
10+
class MyWorkflowTest < Test
1111
# Demonstrates mocking out activities
1212
class MockSelectFromDatabase < Temporalio::Activity::Definition
1313
activity_name :SelectFromDatabase

0 commit comments

Comments
 (0)