-
Notifications
You must be signed in to change notification settings - Fork 152
Expand file tree
/
Copy pathresumable_step_tx.go
More file actions
124 lines (107 loc) · 4.55 KB
/
resumable_step_tx.go
File metadata and controls
124 lines (107 loc) · 4.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package river
import (
"context"
"encoding/json"
"errors"
"github.com/riverqueue/river/internal/execution"
"github.com/riverqueue/river/internal/jobexecutor"
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivertype"
)
// ResumableSetStepTx immediately persists the current resumable step as
// part of transaction tx. If tx is rolled back, the step update will be as
// well.
//
// Normally, a resumable job's step progress is recorded after it runs along
// with its result status. This is normally sufficient, but because it happens
// out-of-transaction, there's a chance that it doesn't happen in case of panic
// or other abrupt termination. This function is useful in cases where a
// resumable worker needs a guarantee of a checkpoint being recorded durably, at
// the cost of an extra database operation.
//
// Must be called from within a ResumableStep or ResumableStepCursor callback.
// The current step name to persist is read from context.
func ResumableSetStepTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx context.Context, tx TTx, job *Job[TArgs]) (*Job[TArgs], error) {
return resumableSetStepTx(ctx, tx, job, nil)
}
// ResumableSetStepCursorTx immediately persists the current resumable step and
// cursor as part of transaction tx. If tx is rolled back, the step and cursor
// update will be as well.
//
// Normally, a resumable job's step progress is recorded after it runs along
// with its result status. This is normally sufficient, but because it happens
// out-of-transaction, there's a chance that it doesn't happen in case of panic
// or other abrupt termination. This function is useful in cases where a
// resumable worker needs a guarantee of a checkpoint being recorded durably, at
// the cost of an extra database operation.
//
// Must be called from within a ResumableStepCursor callback. The current step
// name to persist is read from context.
func ResumableSetStepCursorTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs, TCursor any](ctx context.Context, tx TTx, job *Job[TArgs], cursor TCursor) (*Job[TArgs], error) {
cursorBytes, err := json.Marshal(cursor)
if err != nil {
return nil, err
}
return resumableSetStepTx(ctx, tx, job, json.RawMessage(cursorBytes))
}
func resumableSetStepTx[TTx any, TArgs JobArgs](ctx context.Context, tx TTx, job *Job[TArgs], cursor json.RawMessage) (*Job[TArgs], error) {
if job.State != rivertype.JobStateRunning {
return nil, errors.New("job must be running")
}
state, ok := resumableStateFromContext(ctx)
if !ok {
return nil, errors.New("not inside a resumable step; must be called from within ResumableStep or ResumableStepCursor")
}
if state.StepName == "" {
return nil, errors.New("not inside a resumable step; must be called from within ResumableStep or ResumableStepCursor")
}
step := state.StepName
client := ClientFromContext[TTx](ctx)
if client == nil {
return nil, errors.New("client not found in context, can only work within a River worker")
}
metadataUpdates := map[string]any{
rivercommon.MetadataKeyResumableStep: step,
}
state.CompletedStep = step
if cursor != nil {
if state.Cursors == nil {
state.Cursors = make(map[string]json.RawMessage)
}
state.Cursors[step] = cursor
}
if len(state.Cursors) > 0 {
metadataUpdates[rivercommon.MetadataKeyResumableCursor] = state.Cursors
}
workMetadataUpdates, hasWorkMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
if hasWorkMetadataUpdates {
workMetadataUpdates[rivercommon.MetadataKeyResumableStep] = step
if resumableCursorMetadata, ok := metadataUpdates[rivercommon.MetadataKeyResumableCursor]; ok {
workMetadataUpdates[rivercommon.MetadataKeyResumableCursor] = resumableCursorMetadata
}
}
metadataUpdatesBytes, err := json.Marshal(metadataUpdates)
if err != nil {
return nil, err
}
updatedJob, err := client.Driver().UnwrapExecutor(tx).JobUpdate(ctx, &riverdriver.JobUpdateParams{
ID: job.ID,
MetadataDoMerge: true,
Metadata: metadataUpdatesBytes,
Schema: client.config.Schema,
})
if err != nil {
if errors.Is(err, rivertype.ErrNotFound) {
if _, isInsideTestWorker := ctx.Value(execution.ContextKeyInsideTestWorker{}).(bool); isInsideTestWorker {
panic("to use ResumableSetStepTx or ResumableSetStepCursorTx in a rivertest.Worker, the job must be inserted into the database first")
}
}
return nil, err
}
result := &Job[TArgs]{JobRow: updatedJob}
if err := json.Unmarshal(result.EncodedArgs, &result.Args); err != nil {
return nil, err
}
return result, nil
}