-
Notifications
You must be signed in to change notification settings - Fork 110
Expand file tree
/
Copy pathreplay_tester.rb
More file actions
77 lines (69 loc) · 2.78 KB
/
replay_tester.rb
File metadata and controls
77 lines (69 loc) · 2.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
require "json"
require "temporal/errors"
require "temporal/metadata/workflow_task"
require "temporal/middleware/chain"
require "temporal/workflow/executor"
require "temporal/workflow/stack_trace_tracker"
# Only require protos if not disabled
unless ENV['COINBASE_TEMPORAL_RUBY_DISABLE_PROTO_LOAD'] == '1'
require "gen/temporal/api/history/v1/message_pb"
end
module Temporal
module Testing
class ReplayError < StandardError
end
class ReplayTester
def initialize(config: Temporal.configuration)
@config = config
end
attr_reader :config
# Runs a replay test by using the specific Temporal::Workflow::History object. Instances of these objects
# can be obtained using various from_ methods in Temporal::Workflow::History::Serialization.
#
# If the replay test succeeds, the method will return silently. If the replay tests fails, an error will be raised.
def replay_history(workflow_class, history)
# This code roughly resembles the workflow TaskProcessor but with history being fed in rather
# than being pulled via a workflow task, no query support, no metrics, and other
# simplifications. Fake metadata needs to be provided.
start_workflow_event = history.find_event_by_id(1)
if start_workflow_event.nil? || start_workflow_event.type != "WORKFLOW_EXECUTION_STARTED"
raise ReplayError, "History does not start with workflow_execution_started event"
end
metadata = Temporal::Metadata::WorkflowTask.new(
namespace: config.namespace,
id: 1,
task_token: "",
attempt: 1,
workflow_run_id: "run_id",
workflow_id: "workflow_id",
# Protobuf deserialization will ensure this tree is present
workflow_name: start_workflow_event.attributes.workflow_type.name
)
executor = Workflow::Executor.new(
workflow_class,
history,
metadata,
config,
true,
Middleware::Chain.new([])
)
begin
executor.run
rescue StandardError
query = Struct.new(:query_type, :query_args).new(
Temporal::Workflow::StackTraceTracker::STACK_TRACE_QUERY_NAME,
nil
)
query_result = executor.process_queries(
{"stack_trace" => query}
)
replay_error = ReplayError.new("Workflow code failed to replay successfully against history")
# Override the stack trace to the point in the workflow code where the failure occured, not the
# point in the StateManager where non-determinism is detected
replay_error.set_backtrace("Fiber backtraces: #{query_result["stack_trace"].result}")
raise replay_error
end
end
end
end
end