Skip to content

Commit 541f934

Browse files
authored
Worker specific task queues sample. (#34)
Closes #19
1 parent 4621d03 commit 541f934

8 files changed

Lines changed: 311 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ Prerequisites:
2727
through workflows/activities.
2828
* [message_passing_simple](message_passing_simple) - Simple workflow that accepts signals, queries, and updates.
2929
* [sorbet_generic](sorbet_generic) - Proof of concept of how to do _advanced_ Sorbet typing with the SDK.
30+
* [worker_specific_task_queues](worker_specific_task_queues) - Use a unique Task Queue for each Worker to run a sequence of Activities on the same Worker.
3031

3132
## Development
3233

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# frozen_string_literal: true
2+
3+
require 'test'
4+
require 'worker_specific_task_queues/file_processing_workflow'
5+
require 'worker_specific_task_queues/normal_activities'
6+
require 'securerandom'
7+
require 'temporalio/testing'
8+
require 'temporalio/worker'
9+
10+
module WorkerSpecificTaskQueues
11+
class FileProcessingWorkflowTest < Test
12+
class DownloadFileActivity < Temporalio::Activity::Definition
13+
def execute(url)
14+
url
15+
end
16+
end
17+
18+
class WorkOnFileActivity < Temporalio::Activity::Definition
19+
@failed = false
20+
21+
class << self
22+
attr_accessor :failed
23+
end
24+
25+
def execute(_file_path)
26+
# This should fail the first time it's called
27+
unless self.class.failed
28+
self.class.failed = true
29+
raise 'Fake failure'
30+
end
31+
32+
nil
33+
end
34+
end
35+
36+
class CleanupFileActivity < Temporalio::Activity::Definition
37+
def execute(_file_path)
38+
nil
39+
end
40+
end
41+
42+
def test_workflow
43+
# Run test server until completion of the block
44+
Temporalio::Testing::WorkflowEnvironment.start_local do |env|
45+
unique_task_queue = "tq-#{SecureRandom.uuid}"
46+
unique_worker = Temporalio::Worker.new(
47+
client: env.client,
48+
task_queue: unique_task_queue,
49+
activities: [WorkerSpecificActivities::DownloadFileActivity, WorkerSpecificActivities::WorkOnFileActivity,
50+
WorkerSpecificActivities::CleanupFileActivity]
51+
)
52+
53+
# Run worker until completion of the block
54+
worker = Temporalio::Worker.new(
55+
client: env.client,
56+
task_queue: "tq-#{SecureRandom.uuid}",
57+
activities: [WorkerSpecificTaskQueues::NormalActivities::GetUniqueTaskQueueActivity.new(unique_task_queue)],
58+
workflows: [WorkerSpecificTaskQueues::FileProcessingWorkflow]
59+
)
60+
unique_worker.run do
61+
worker.run do
62+
# Run workflow
63+
env.client.execute_workflow(WorkerSpecificTaskQueues::FileProcessingWorkflow, 2,
64+
id: "wf-#{SecureRandom.uuid}", task_queue: worker.task_queue)
65+
end
66+
end
67+
end
68+
end
69+
end
70+
end
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Worker-Specific Task Queues
2+
3+
Use a unique Task Queue for each Worker in order to have certain Activities run on a specific Worker.
4+
5+
This is useful in scenarios where multiple Activities need to run in the same process or on the same host, for example to share memory or disk. This sample has a file processing Workflow, where one Activity downloads the file to disk and other Activities process it and clean it up.
6+
7+
This strategy is:
8+
9+
- Each Worker process runs two workers:
10+
- One worker listens on the shared `worker-specific-task-queues-sample` Task Queue.
11+
- Another worker listens on a uniquely generated Task Queue.
12+
- Create a `GetUniqueTaskQueue` Activity that returns one of the uniquely generated Task Queues (that only one Worker is listening on—i.e. the **Worker-specific Task Queue**). It doesn't matter where this Activity is run, so it can be executed on the shared Task Queue. In this sample, the unique Task Queue is simply a UUID, but you can inject smart logic here to uniquely identify the Worker.
13+
- The Workflow and the first Activity are run on the shared `worker-specific-task-queues-sample` Task Queue. The rest of the Activities that do the file processing are run on the Worker-specific Task Queue.
14+
15+
Activities have been artificially slowed with `sleep(3)` to simulate slow activities.
16+
17+
### Running this sample
18+
19+
1. Make sure Temporal Server is running locally (see [temporalio/docker-compose](https://github.com/temporalio/docker-compose))
20+
2. Run the following to start the worker:
21+
```
22+
bundle exec ruby worker.rb
23+
```
24+
25+
3. In another terminal, run the workflow:
26+
```
27+
bundle exec ruby run_workflow.rb
28+
```
29+
30+
You should see output in the worker terminal showing the file being downloaded, processed, and cleaned up.
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# frozen_string_literal: true
2+
3+
require_relative 'normal_activities'
4+
require_relative 'worker_specific_activities'
5+
require 'temporalio/workflow'
6+
7+
module WorkerSpecificTaskQueues
8+
class FileProcessingWorkflow < Temporalio::Workflow::Definition
9+
def execute(max_attempts)
10+
attempt = 0
11+
12+
loop do
13+
attempt += 1
14+
begin
15+
process_file
16+
return
17+
rescue StandardError => e
18+
# If it's at max attempts, re-raise to fail the workflow
19+
if attempt >= max_attempts
20+
Temporalio::Workflow.logger.error(
21+
"File processing failed and reached #{attempt} attempts, failing workflow: #{e.message}"
22+
)
23+
raise
24+
end
25+
# Otherwise, just warn and continue
26+
Temporalio::Workflow.logger.warn(
27+
"File processing failed on attempt #{attempt}, trying again: #{e.message}"
28+
)
29+
end
30+
end
31+
end
32+
33+
private
34+
35+
def process_file
36+
# Get a unique task queue from any worker
37+
unique_worker_task_queue = Temporalio::Workflow.execute_activity(
38+
NormalActivities::GetUniqueTaskQueueActivity,
39+
start_to_close_timeout: 60
40+
)
41+
42+
# Download the file on the specific worker
43+
download_path = Temporalio::Workflow.execute_activity(
44+
WorkerSpecificActivities::DownloadFileActivity,
45+
'https://temporal.io',
46+
task_queue: unique_worker_task_queue,
47+
schedule_to_close_timeout: 300,
48+
heartbeat_timeout: 60
49+
)
50+
51+
# Process the file on the same worker
52+
Temporalio::Workflow.execute_activity(
53+
WorkerSpecificActivities::WorkOnFileActivity,
54+
download_path,
55+
task_queue: unique_worker_task_queue,
56+
schedule_to_close_timeout: 300,
57+
heartbeat_timeout: 60
58+
)
59+
60+
# Clean up the file on the same worker
61+
Temporalio::Workflow.execute_activity(
62+
WorkerSpecificActivities::CleanupFileActivity,
63+
download_path,
64+
task_queue: unique_worker_task_queue,
65+
schedule_to_close_timeout: 300,
66+
heartbeat_timeout: 60
67+
)
68+
end
69+
end
70+
end
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/activity'
4+
5+
module WorkerSpecificTaskQueues
6+
module NormalActivities
7+
class GetUniqueTaskQueueActivity < Temporalio::Activity::Definition
8+
def initialize(unique_task_queue)
9+
@unique_task_queue = unique_task_queue
10+
end
11+
12+
def execute
13+
# Return the known worker-specific task queue
14+
# that was provided during initialization
15+
@unique_task_queue
16+
end
17+
end
18+
end
19+
end
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# frozen_string_literal: true
2+
3+
require_relative 'file_processing_workflow'
4+
require 'logger'
5+
require 'temporalio/client'
6+
7+
# Create client with logger
8+
client = Temporalio::Client.connect(
9+
'localhost:7233',
10+
'default',
11+
logger: Logger.new($stdout, level: Logger::INFO)
12+
)
13+
14+
# Run workflow
15+
client.execute_workflow(
16+
WorkerSpecificTaskQueues::FileProcessingWorkflow,
17+
3, # max_attempts
18+
id: 'file-processing-workflow',
19+
task_queue: 'worker-specific-task-queues-sample'
20+
)
21+
22+
puts 'Workflow completed successfully'
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# frozen_string_literal: true
2+
3+
require_relative 'file_processing_workflow'
4+
require_relative 'normal_activities'
5+
require_relative 'worker_specific_activities'
6+
require 'logger'
7+
require 'securerandom'
8+
require 'temporalio/client'
9+
require 'temporalio/worker'
10+
11+
# Create client with logger
12+
client = Temporalio::Client.connect(
13+
'localhost:7233',
14+
'default',
15+
logger: Logger.new($stdout, level: Logger::INFO)
16+
)
17+
18+
# Create a unique task queue for this worker
19+
unique_task_queue = SecureRandom.uuid
20+
21+
# Create worker for shared task queue
22+
shared_worker = Temporalio::Worker.new(
23+
client: client,
24+
task_queue: 'worker-specific-task-queues-sample',
25+
workflows: [WorkerSpecificTaskQueues::FileProcessingWorkflow],
26+
activities: [
27+
# Pass the unique task queue to the activity so it can return it when called
28+
WorkerSpecificTaskQueues::NormalActivities::GetUniqueTaskQueueActivity.new(unique_task_queue)
29+
]
30+
)
31+
32+
# Create worker for unique task queue
33+
unique_worker = Temporalio::Worker.new(
34+
client: client,
35+
task_queue: unique_task_queue,
36+
activities: [
37+
WorkerSpecificTaskQueues::WorkerSpecificActivities::DownloadFileActivity.new,
38+
WorkerSpecificTaskQueues::WorkerSpecificActivities::WorkOnFileActivity.new,
39+
WorkerSpecificTaskQueues::WorkerSpecificActivities::CleanupFileActivity.new
40+
]
41+
)
42+
43+
puts "Running worker with unique task queue: #{unique_task_queue}"
44+
45+
# Run both workers using run_all for concurrent execution
46+
# The workers need to be passed as separate arguments, not as an array
47+
Temporalio::Worker.run_all(shared_worker, unique_worker, shutdown_signals: ['SIGINT'])
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# frozen_string_literal: true
2+
3+
require 'digest'
4+
require 'logger'
5+
require 'net/http'
6+
require 'tempfile'
7+
require 'temporalio/activity'
8+
require 'uri'
9+
10+
module WorkerSpecificTaskQueues
11+
module WorkerSpecificActivities
12+
class DownloadFileActivity < Temporalio::Activity::Definition
13+
def execute(url)
14+
# Simulate slow activity
15+
sleep(3)
16+
17+
file = Tempfile.create
18+
19+
Temporalio::Activity::Context.current.logger.info("Downloading #{url} to #{file.path}")
20+
file.write(Net::HTTP.get(URI(url)))
21+
file.close
22+
23+
file.path
24+
end
25+
end
26+
27+
class WorkOnFileActivity < Temporalio::Activity::Definition
28+
def execute(file_path)
29+
# Simulate slow activity
30+
sleep(3)
31+
32+
# Calculate checksum to simulate work
33+
checksum = Digest::SHA256.file(file_path).hexdigest
34+
Temporalio::Activity::Context.current.logger.info("Did some work on #{file_path}, checksum: #{checksum}")
35+
36+
nil
37+
end
38+
end
39+
40+
class CleanupFileActivity < Temporalio::Activity::Definition
41+
def execute(file_path)
42+
# Simulate slow activity
43+
sleep(3)
44+
45+
Temporalio::Activity::Context.current.logger.info("Removing #{file_path}")
46+
File.unlink(file_path)
47+
48+
nil
49+
end
50+
end
51+
end
52+
end

0 commit comments

Comments
 (0)