Skip to content

Commit 2489e17

Browse files
Updatable timer sample (#54)
* add updatable timer sample * pr feedback * add top level README mention
1 parent befdaaa commit 2489e17

8 files changed

Lines changed: 200 additions & 1 deletion

File tree

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@ Prerequisites:
3030
* [rails_app](rails_app) - Basic Rails API application using Temporal workflows and activities.
3131
* [saga](saga) - Using undo/compensation using a very simplistic Saga pattern.
3232
* [sorbet_generic](sorbet_generic) - Proof of concept of how to do _advanced_ Sorbet typing with the SDK.
33+
* [updatable_timer](updatable_timer) - Demonstrates a blocking sleep that can be updated.
3334
* [worker_specific_task_queues](worker_specific_task_queues) - Use a unique Task Queue for each Worker to run a sequence of Activities on the same Worker.
3435
* [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.
3536

3637
## Development
3738

3839
To check format and test this repository, run:
3940

40-
bundle exec rake
41+
bundle exec rake
42+
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# frozen_string_literal: true
2+
3+
require 'test'
4+
require 'updatable_timer/updatable_timer_workflow'
5+
require 'securerandom'
6+
require 'temporalio/client'
7+
require 'temporalio/testing'
8+
require 'temporalio/worker'
9+
10+
module UpdatableTimer
11+
class UpdatableTimerWorkflowTest < Test
12+
def test_workflow
13+
Temporalio::Testing::WorkflowEnvironment.start_time_skipping do |env|
14+
# Run workflow in a worker
15+
worker = Temporalio::Worker.new(
16+
client: env.client,
17+
task_queue: "tq-#{SecureRandom.uuid}",
18+
workflows: [UpdatableTimerWorkflow]
19+
)
20+
handle = worker.run do
21+
day_from_now = (Time.now(in: 'utc') + (24 * 60 * 60)).to_r
22+
hour_from_now = (Time.now(in: 'utc') + (60 * 60)).to_r
23+
handle = env.client.start_workflow(
24+
UpdatableTimerWorkflow, day_from_now,
25+
id: "wf-#{SecureRandom.uuid}", task_queue: worker.task_queue
26+
)
27+
assert_equal day_from_now, Rational(handle.query(UpdatableTimerWorkflow.wake_up_time))
28+
env.sleep(10)
29+
handle.signal(UpdatableTimerWorkflow.update_wake_up_time, hour_from_now)
30+
assert_equal hour_from_now, Rational(handle.query(UpdatableTimerWorkflow.wake_up_time))
31+
32+
handle.result
33+
handle
34+
end
35+
timer_events = handle.fetch_history_events.filter_map do |e|
36+
timer_id = (e.timer_started_event_attributes ||
37+
e.timer_canceled_event_attributes ||
38+
e.timer_fired_event_attributes)&.timer_id
39+
[e.event_type, timer_id] if timer_id
40+
end.compact
41+
assert_equal(
42+
[[:EVENT_TYPE_TIMER_STARTED, '1'], [:EVENT_TYPE_TIMER_CANCELED, '1'], [:EVENT_TYPE_TIMER_STARTED, '2'],
43+
[:EVENT_TYPE_TIMER_FIRED, '2']],
44+
timer_events
45+
)
46+
end
47+
end
48+
end
49+
end

updatable_timer/README.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Updatable Timer Sample
2+
3+
Demonstrates a helper class which relies on `Temporalio::Workflow.wait_condition` to implement a blocking sleep that can be updated at any moment.
4+
5+
To run, first see [README.md](../README.md) for prerequisites. Then, in another terminal, start the Ruby worker from this directory:
6+
7+
```bash
8+
bundle exec ruby worker.rb
9+
```
10+
11+
Then in another terminal, use the Ruby client to the workflow from this directory:
12+
13+
```bash
14+
bundle exec ruby starter.rb
15+
```
16+
17+
The Ruby code will invoke the workflow which will create a timer that will resolve in a day.
18+
19+
Finally in a third terminal, run the updater to change the timer to 10 seconds from now:
20+
21+
```bash
22+
bundle exec ruby wake_up_timer_updater.rb
23+
```
24+
25+
There is also a [test](../test/updatable_timer/updatable_timer_workflow_test.rb) that demonstrates querying the wake up time.

updatable_timer/starter.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/client'
4+
require_relative 'updatable_timer_workflow'
5+
6+
# Create a Temporal client
7+
logger = Logger.new($stdout, level: Logger::INFO)
8+
client = Temporalio::Client.connect('localhost:7233', 'default', logger:)
9+
10+
# Run workflow
11+
logger.info('Starting workflow')
12+
client.execute_workflow(
13+
UpdatableTimer::UpdatableTimerWorkflow, (Time.now(in: 'utc') + (24 * 60 * 60)).to_r,
14+
id: 'updatable-timer-sample-workflow-id', task_queue: 'updatable-timer-sample'
15+
)
16+
logger.info('Workflow complete')

updatable_timer/updatable_timer.rb

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/workflow'
4+
5+
module UpdatableTimer
6+
class UpdatableTimer
7+
def initialize(wake_up_time)
8+
@wake_up_time = wake_up_time
9+
end
10+
11+
attr_reader :wake_up_time
12+
13+
def wake_up_time=(wake_up_time)
14+
Temporalio::Workflow.logger.info("update_wake_up_time: #{wake_up_time}")
15+
@wake_up_time = wake_up_time
16+
@wake_up_time_updated = true
17+
end
18+
19+
def sleep
20+
Temporalio::Workflow.logger.info("sleep until: #{@wake_up_time}")
21+
loop do
22+
now = Temporalio::Workflow.now
23+
sleep_interval = @wake_up_time - now
24+
25+
break if sleep_interval.negative?
26+
27+
Temporalio::Workflow.logger.info("going to sleep for: #{sleep_interval}")
28+
29+
begin
30+
@wake_up_time_updated = false
31+
Temporalio::Workflow.timeout(sleep_interval) do
32+
Temporalio::Workflow.wait_condition { @wake_up_time_updated }
33+
end
34+
rescue Timeout::Error
35+
break
36+
end
37+
end
38+
Temporalio::Workflow.logger.info('sleep_until completed')
39+
end
40+
end
41+
end
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/workflow'
4+
require_relative 'updatable_timer'
5+
6+
module UpdatableTimer
7+
class UpdatableTimerWorkflow < Temporalio::Workflow::Definition
8+
workflow_init
9+
def initialize(wake_up_time)
10+
@timer = UpdatableTimer.new(Time.at(Rational(wake_up_time)))
11+
end
12+
13+
def execute(_wake_up_time)
14+
@timer.sleep
15+
end
16+
17+
workflow_query
18+
def wake_up_time
19+
Temporalio::Workflow.logger.info('get_wake_up_time')
20+
@timer.wake_up_time.to_r
21+
end
22+
23+
workflow_signal
24+
def update_wake_up_time(wake_up_time)
25+
wake_up_time = Time.at(Rational(wake_up_time))
26+
Temporalio::Workflow.logger.info("update_wake_up_time: #{wake_up_time}")
27+
@timer.wake_up_time = wake_up_time
28+
end
29+
end
30+
end
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/client'
4+
require_relative 'updatable_timer_workflow'
5+
6+
# Create a Temporal client
7+
logger = Logger.new($stdout, level: Logger::INFO)
8+
client = Temporalio::Client.connect('localhost:7233', 'default', logger:)
9+
handle = client.workflow_handle('updatable-timer-sample-workflow-id')
10+
11+
handle.signal(UpdatableTimer::UpdatableTimerWorkflow.update_wake_up_time, (Time.now(in: 'utc') + 10).to_r)
12+
logger.info('Updated wake up time to 10 seconds from now')

updatable_timer/worker.rb

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# frozen_string_literal: true
2+
3+
require_relative 'updatable_timer_workflow'
4+
require 'logger'
5+
require 'temporalio/client'
6+
require 'temporalio/worker'
7+
8+
# Create a Temporal client
9+
client = Temporalio::Client.connect(
10+
'localhost:7233',
11+
'default',
12+
logger: Logger.new($stdout, level: Logger::INFO)
13+
)
14+
15+
# Create worker with the activities and workflow
16+
worker = Temporalio::Worker.new(
17+
client:,
18+
task_queue: 'updatable-timer-sample',
19+
workflows: [UpdatableTimer::UpdatableTimerWorkflow]
20+
)
21+
22+
# Run the worker until SIGINT
23+
puts 'Starting worker (ctrl+c to exit)'
24+
worker.run(shutdown_signals: ['SIGINT'])

0 commit comments

Comments
 (0)