Skip to content

Commit befdaaa

Browse files
authored
DSL sample (#52)
Closes #32
1 parent b4b66ce commit befdaaa

10 files changed

Lines changed: 363 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ Prerequisites:
2222
[Coinbase Ruby SDK](https://github.com/coinbase/temporal-ruby).
2323
* [context_propagation](context_propagation) - Use interceptors to propagate thread/fiber local data from clients
2424
through workflows/activities.
25+
* [dsl](dsl) - Demonstrates having a workflow interpret/invoke arbitrary steps defined in a DSL.
2526
* [encryption](encryption) - Demonstrates how to make a codec for end-to-end encryption.
2627
* [env_config](env_config) - Load client configuration from TOML files with programmatic overrides.
2728
* [message_passing_simple](message_passing_simple) - Simple workflow that accepts signals, queries, and updates.

dsl/README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# DSL
2+
3+
This sample demonstrates having a workflow interpret/invoke arbitrary steps defined in a DSL. It is similar to the DSL
4+
samples in [TypeScript](https://github.com/temporalio/samples-typescript/tree/main/dsl-interpreter), in
5+
[Go](https://github.com/temporalio/samples-go/tree/main/dsl), and in
6+
[Python](https://github.com/temporalio/samples-python/tree/main/dsl).
7+
8+
To run, first see [README.md](../README.md) for prerequisites. Then, in another terminal, start the Ruby worker
9+
from this directory:
10+
11+
bundle exec ruby worker.rb
12+
13+
Now, in another terminal, run the following from this directory to execute a workflow of steps defined in
14+
[workflow1.yaml](workflow1.yaml):
15+
16+
bundle exec ruby starter.rb workflow1.yaml
17+
18+
This will run the workflow and show the final variables that the workflow returns. Looking in the worker terminal, each
19+
step executed will be visible.
20+
21+
Similarly we can do the same for the more advanced [workflow2.yaml](workflow2.yaml) file:
22+
23+
bundle exec ruby starter.rb workflow2.yaml
24+
25+
This sample gives a guide of how one can write a workflow to interpret arbitrary steps from a user-provided DSL. Many
26+
DSL models are more advanced and are more specific to conform to business logic needs.

dsl/activities.rb

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/activity'
4+
require_relative 'models'
5+
6+
module Dsl
7+
module Activities
8+
class Activity1 < Temporalio::Activity::Definition
9+
activity_name :activity1
10+
11+
def execute(arg)
12+
Temporalio::Activity::Context.current.logger.info("Executing activity1 with arg: #{arg}")
13+
"[result from activity1: #{arg}]"
14+
end
15+
end
16+
17+
class Activity2 < Temporalio::Activity::Definition
18+
activity_name :activity2
19+
20+
def execute(arg)
21+
Temporalio::Activity::Context.current.logger.info("Executing activity2 with arg: #{arg}")
22+
"[result from activity2: #{arg}]"
23+
end
24+
end
25+
26+
class Activity3 < Temporalio::Activity::Definition
27+
activity_name :activity3
28+
29+
def execute(arg1, arg2)
30+
Temporalio::Activity::Context.current.logger.info("Executing activity3 with args: #{arg1} and #{arg2}")
31+
"[result from activity3: #{arg1} #{arg2}]"
32+
end
33+
end
34+
35+
class Activity4 < Temporalio::Activity::Definition
36+
activity_name :activity4
37+
38+
def execute(arg)
39+
Temporalio::Activity::Context.current.logger.info("Executing activity4 with arg #{arg}")
40+
"[result from activity4: #{arg}]"
41+
end
42+
end
43+
44+
class Activity5 < Temporalio::Activity::Definition
45+
activity_name :activity5
46+
47+
def execute(arg1, arg2)
48+
Temporalio::Activity::Context.current.logger.info("Executing activity5 with args: #{arg1} and #{arg2}")
49+
"[result from activity5: #{arg1} #{arg2}]"
50+
end
51+
end
52+
end
53+
end

dsl/dsl_workflow.rb

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/workflow'
4+
require_relative 'models'
5+
6+
module Dsl
7+
class DslWorkflow < Temporalio::Workflow::Definition
8+
def execute(input)
9+
# Run and return the final variable set
10+
Temporalio::Workflow.logger.info('Running DSL workflow')
11+
@variables = input.variables
12+
execute_statement(input.root)
13+
Temporalio::Workflow.logger.info('DSL workflow completed')
14+
@variables
15+
end
16+
17+
def execute_statement(stmt)
18+
case stmt
19+
when Models::Statement::Activity
20+
# Invoke activity loading arguments from variables and optionally storing result as a variable
21+
result = Temporalio::Workflow.execute_activity(
22+
stmt.name,
23+
*stmt.arguments.map { |a| @variables[a] },
24+
start_to_close_timeout: 60
25+
)
26+
@variables[stmt.result] = result if stmt.result
27+
when Models::Statement::Sequence
28+
# Execute each statement in order
29+
stmt.elements.each { |s| execute_statement(s) }
30+
when Models::Statement::Parallel
31+
# Execute all in parallel. Note, this will raise an exception when the first activity fails and will not cancel
32+
# the others. We could provide a linked Cancellation to each and cancel it on error if we wanted.
33+
Temporalio::Workflow::Future.all_of(
34+
*stmt.branches.map { |s| Temporalio::Workflow::Future.new { execute_statement(s) } }
35+
).wait
36+
end
37+
end
38+
end
39+
end

dsl/models.rb

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# frozen_string_literal: true
2+
3+
require 'json/add/struct'
4+
require 'yaml'
5+
6+
module Dsl
7+
module Models
8+
Input = Struct.new(:root, :variables) do
9+
def self.from_yaml(yaml_str)
10+
from_h(YAML.load(yaml_str))
11+
end
12+
13+
def self.from_h(hash)
14+
new(
15+
root: Statement.from_h(hash['root'] || raise('Missing root')),
16+
variables: hash['variables'] || {}
17+
)
18+
end
19+
end
20+
21+
module Statement
22+
def self.from_h(hash)
23+
raise 'Expected single activity, sequence, or parallel field' unless hash.one?
24+
25+
type, sub_hash = hash.first
26+
case type
27+
when 'activity' then Activity.from_h(sub_hash)
28+
when 'sequence' then Sequence.from_h(sub_hash)
29+
when 'parallel' then Parallel.from_h(sub_hash)
30+
else raise 'Expected single activity, sequence, or parallel field'
31+
end
32+
end
33+
34+
Activity = Struct.new(:name, :arguments, :result) do
35+
def self.from_h(hash)
36+
new(name: hash['name'], arguments: hash['arguments'], result: hash['result'])
37+
end
38+
end
39+
40+
Sequence = Struct.new(:elements) do
41+
def self.from_h(hash)
42+
new(elements: hash['elements'].map { |e| Statement.from_h(e) })
43+
end
44+
end
45+
46+
Parallel = Struct.new(:branches) do
47+
def self.from_h(hash)
48+
new(branches: hash['branches'].map { |e| Statement.from_h(e) })
49+
end
50+
end
51+
end
52+
end
53+
end

dsl/starter.rb

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/client'
4+
require_relative 'activities'
5+
require_relative 'models'
6+
require_relative 'dsl_workflow'
7+
8+
# Create a Temporal client
9+
logger = Logger.new($stdout, level: Logger::INFO)
10+
client = Temporalio::Client.connect('localhost:7233', 'default', logger:)
11+
12+
# Load YAML file
13+
yaml_str = File.read(ARGV.first || raise('Missing argument for YAML file'))
14+
input = Dsl::Models::Input.from_yaml(yaml_str)
15+
16+
# Run workflow
17+
result = client.execute_workflow(
18+
Dsl::DslWorkflow, input,
19+
id: 'dsl-sample-workflow-id', task_queue: 'dsl-sample'
20+
)
21+
logger.info("Final variables: #{result}")

dsl/worker.rb

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# frozen_string_literal: true
2+
3+
require 'logger'
4+
require 'temporalio/client'
5+
require 'temporalio/worker'
6+
require_relative 'activities'
7+
require_relative 'dsl_workflow'
8+
9+
# Create a Temporal client
10+
logger = Logger.new($stdout, level: Logger::INFO)
11+
client = Temporalio::Client.connect('localhost:7233', 'default', logger:)
12+
13+
# Create worker with the activities and workflow
14+
worker = Temporalio::Worker.new(
15+
client:,
16+
task_queue: 'dsl-sample',
17+
activities: [Dsl::Activities::Activity1, Dsl::Activities::Activity2, Dsl::Activities::Activity3,
18+
Dsl::Activities::Activity4, Dsl::Activities::Activity5],
19+
workflows: [Dsl::DslWorkflow]
20+
)
21+
22+
# Run the worker until SIGINT
23+
logger.info('Starting worker (ctrl+c to exit)')
24+
worker.run(shutdown_signals: ['SIGINT'])

dsl/workflow1.yaml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# This sample workflows execute 3 steps in sequence.
2+
# 1) Activity1, takes arg1 as input, and put result as result1.
3+
# 2) Activity2, takes result1 as input, and put result as result2.
4+
# 3) Activity3, takes args2 and result2 as input, and put result as result3.
5+
6+
variables:
7+
arg1: value1
8+
arg2: value2
9+
10+
root:
11+
sequence:
12+
elements:
13+
- activity:
14+
name: activity1
15+
arguments:
16+
- arg1
17+
result: result1
18+
- activity:
19+
name: activity2
20+
arguments:
21+
- result1
22+
result: result2
23+
- activity:
24+
name: activity3
25+
arguments:
26+
- arg2
27+
- result2
28+
result: result3

dsl/workflow2.yaml

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# This sample workflow executes 3 steps in sequence.
2+
# 1) activity1, takes arg1 as input, and put result as result1.
3+
# 2) it runs a parallel block which runs below sequence branches in parallel
4+
# 2.1) sequence 1
5+
# 2.1.1) activity2, takes result1 as input, and put result as result2
6+
# 2.1.2) activity3, takes arg2 and result2 as input, and put result as result3
7+
# 2.2) sequence 2
8+
# 2.2.1) activity4, takes result1 as input, and put result as result4
9+
# 2.2.2) activity5, takes arg3 and result4 as input, and put result as result5
10+
# 3) activity3, takes result3 and result5 as input, and put result as result6.
11+
12+
variables:
13+
arg1: value1
14+
arg2: value2
15+
arg3: value3
16+
17+
root:
18+
sequence:
19+
elements:
20+
- activity:
21+
name: activity1
22+
arguments:
23+
- arg1
24+
result: result1
25+
- parallel:
26+
branches:
27+
- sequence:
28+
elements:
29+
- activity:
30+
name: activity2
31+
arguments:
32+
- result1
33+
result: result2
34+
- activity:
35+
name: activity3
36+
arguments:
37+
- arg2
38+
- result2
39+
result: result3
40+
- sequence:
41+
elements:
42+
- activity:
43+
name: activity4
44+
arguments:
45+
- result1
46+
result: result4
47+
- activity:
48+
name: activity5
49+
arguments:
50+
- arg3
51+
- result4
52+
result: result5
53+
- activity:
54+
name: activity3
55+
arguments:
56+
- result3
57+
- result5
58+
result: result6

test/dsl/dsl_workflow_test.rb

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# frozen_string_literal: true
2+
3+
require 'test'
4+
require 'dsl/activities'
5+
require 'dsl/models'
6+
require 'dsl/dsl_workflow'
7+
require 'securerandom'
8+
require 'temporalio/client'
9+
require 'temporalio/testing'
10+
require 'temporalio/worker'
11+
12+
module Dsl
13+
class DslWorkflowTest < Test
14+
def test_workflow
15+
Temporalio::Testing::WorkflowEnvironment.start_local do |env|
16+
# Load the YAML from workflow2.yaml
17+
yaml_str = File.read(File.join(File.dirname(__FILE__), '../../dsl/workflow2.yaml'))
18+
# Run workflow in a worker
19+
worker = Temporalio::Worker.new(
20+
client: env.client,
21+
task_queue: "tq-#{SecureRandom.uuid}",
22+
activities: [Activities::Activity1, Activities::Activity2, Activities::Activity3,
23+
Activities::Activity4, Activities::Activity5],
24+
workflows: [DslWorkflow]
25+
)
26+
handle, result = worker.run do
27+
handle = env.client.start_workflow(
28+
DslWorkflow, Models::Input.from_yaml(yaml_str),
29+
id: "wf-#{SecureRandom.uuid}", task_queue: worker.task_queue
30+
)
31+
[handle, handle.result]
32+
end
33+
# Confirm expected variable results
34+
assert_equal(
35+
{
36+
'arg1' => 'value1',
37+
'arg2' => 'value2',
38+
'arg3' => 'value3',
39+
'result1' => '[result from activity1: value1]',
40+
'result2' => '[result from activity2: [result from activity1: value1]]',
41+
'result3' => '[result from activity3: value2 [result from activity2: [result from activity1: value1]]]',
42+
'result4' => '[result from activity4: [result from activity1: value1]]',
43+
'result5' => '[result from activity5: value3 [result from activity4: [result from activity1: value1]]]',
44+
'result6' => '[result from activity3: [result from activity3: value2 [result from activity2: ' \
45+
'[result from activity1: value1]]] [result from activity5: ' \
46+
'value3 [result from activity4: [result from activity1: value1]]]]'
47+
},
48+
result
49+
)
50+
# Collect all activity events and confirm they are in order expected
51+
activity_names = handle.fetch_history_events
52+
.map { |e| e.activity_task_scheduled_event_attributes&.activity_type&.name }
53+
.compact
54+
assert_equal 'activity1', activity_names[0]
55+
assert_equal %w[activity2 activity3 activity4 activity5], activity_names[1, 4].sort
56+
assert_equal 'activity3', activity_names[5]
57+
end
58+
end
59+
end
60+
end

0 commit comments

Comments
 (0)