-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask.go
More file actions
122 lines (108 loc) · 3.65 KB
/
task.go
File metadata and controls
122 lines (108 loc) · 3.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package pgqueue
import (
	"encoding/json"
	"fmt"
	"time"
	"unicode/utf8"
)
// TaskStatus represents the lifecycle state of a task.
type TaskStatus uint8

// Defined task states. Each value is written out explicitly, and
// taskStatusNames is indexed by these same values.
const (
	TaskStatusPending   TaskStatus = 0
	TaskStatusRunning   TaskStatus = 1
	TaskStatusFailed    TaskStatus = 2
	TaskStatusCompleted TaskStatus = 3
	TaskStatusDisabled  TaskStatus = 4 // quarantine — requires operator intervention

	// maxTaskStatus is the highest defined status value.
	maxTaskStatus = TaskStatusDisabled
)

// taskStatusNames holds the text name of every defined status; the array
// index is the numeric status value.
var taskStatusNames = [...]string{
	TaskStatusPending:   "pending",
	TaskStatusRunning:   "running",
	TaskStatusFailed:    "failed",
	TaskStatusCompleted: "completed",
	TaskStatusDisabled:  "disabled",
}
// String returns the text name of the status as listed in taskStatusNames.
// A value with no entry in that table is rendered as "TaskStatus(N)", where
// N is the numeric value.
func (s TaskStatus) String() string {
	if int(s) >= len(taskStatusNames) {
		return fmt.Sprintf("TaskStatus(%d)", s)
	}
	return taskStatusNames[s]
}
// MarshalText returns the bytes of s.String(). Its signature matches
// encoding.TextMarshaler, and the returned error is always nil.
func (s TaskStatus) MarshalText() ([]byte, error) {
	text := s.String()
	return []byte(text), nil
}
// UnmarshalText sets *s to the status whose name in taskStatusNames equals b
// exactly (the comparison is case-sensitive). If no name matches, *s is left
// unchanged and an error quoting the rejected text is returned.
func (s *TaskStatus) UnmarshalText(b []byte) error {
	text := string(b)
	for idx := range taskStatusNames {
		if taskStatusNames[idx] != text {
			continue
		}
		*s = TaskStatus(idx)
		return nil
	}
	return fmt.Errorf("pgqueue: unknown task status %q", text)
}
// Task is the database record stored in pgqueue_tasks.
//
// The `db` struct tags are presumably the column names used by the
// persistence layer — confirm against the table schema. The per-field
// invariants mentioned below are the ones checked by Validate.
type Task struct {
// ID is the record identifier returned by GetID.
ID int64 `db:"id,omitempty"`
// Queue names the queue the task belongs to. Validate requires it to be
// non-empty and caps its length at 128.
Queue string `db:"queue"`
// Status is the task's lifecycle state; Validate rejects values above
// maxTaskStatus.
Status TaskStatus `db:"status"`
// Try must not be negative. Presumably the attempt counter — confirm
// against the code that increments it.
Try int16 `db:"try"`
// RunAt is presumably the time the task becomes eligible to run — confirm.
RunAt time.Time `db:"run_at,omitempty"`
// LastRanAt is presumably the time of the most recent attempt — confirm.
LastRanAt time.Time `db:"last_ran_at,omitempty"`
// CreatedAt is assigned through SetCreatedAt.
CreatedAt time.Time `db:"created_at,omitempty"`
// ClaimedAt, LeaseUntil and ClaimToken form the claim tuple: Validate
// requires all three to be set together, requires them when Status is
// TaskStatusRunning, and forbids them for every other status.
ClaimedAt *time.Time `db:"claimed_at"`
LeaseUntil *time.Time `db:"lease_until"`
// ClaimToken is capped at length 36 by Validate.
ClaimToken *string `db:"claim_token"`
// Payload is the task's JSON body; Validate rejects a non-empty payload
// that is not valid JSON. See DecodePayload and SetPayload.
Payload []byte `db:"payload"`
// Hash is optional and capped at length 128 by Validate. Its purpose is not
// visible in this file (possibly a de-duplication key — confirm).
Hash *string `db:"hash"`
}
// GetID returns the task's ID field. It implements pgkit.Record.
func (t *Task) GetID() int64 {
	return t.ID
}
// Validate implements pgkit.Record. It enforces DB constraints and record
// invariants, returning the first violation found or nil when the task is
// well-formed. Checks run in this order:
//
//   - Queue is non-empty and at most 128 characters.
//   - Status is not above maxTaskStatus.
//   - Try is not negative.
//   - Hash, when set, is at most 128 characters.
//   - ClaimToken, when set, is at most 36 characters.
//   - ClaimedAt, LeaseUntil and ClaimToken are all set or all nil; they are
//     required when Status is TaskStatusRunning and forbidden otherwise.
//   - Payload, when non-empty, is valid JSON.
//
// Length limits are counted in characters (runes), not bytes, so multi-byte
// UTF-8 values within the limit are not rejected.
// NOTE(review): this assumes the underlying columns are character-limited
// (e.g. varchar(n)) — confirm against the table definition.
func (t *Task) Validate() error {
	if t.Queue == "" {
		return fmt.Errorf("pgqueue: task queue must not be empty")
	}
	if utf8.RuneCountInString(t.Queue) > 128 {
		return fmt.Errorf("pgqueue: task queue exceeds 128 characters")
	}
	if t.Status > maxTaskStatus {
		return fmt.Errorf("pgqueue: invalid task status %d", t.Status)
	}
	if t.Try < 0 {
		return fmt.Errorf("pgqueue: task try must not be negative")
	}
	if t.Hash != nil && utf8.RuneCountInString(*t.Hash) > 128 {
		return fmt.Errorf("pgqueue: task hash exceeds 128 characters")
	}
	if t.ClaimToken != nil && utf8.RuneCountInString(*t.ClaimToken) > 36 {
		return fmt.Errorf("pgqueue: task claim_token exceeds 36 characters")
	}

	// Claim tuple consistency: all-or-none for ClaimedAt, LeaseUntil, ClaimToken.
	claimSet := t.ClaimedAt != nil || t.LeaseUntil != nil || t.ClaimToken != nil
	claimAll := t.ClaimedAt != nil && t.LeaseUntil != nil && t.ClaimToken != nil
	switch {
	case claimSet && !claimAll:
		return fmt.Errorf("pgqueue: claim fields (claimed_at, lease_until, claim_token) must be all set or all nil")
	case t.Status == TaskStatusRunning && !claimAll:
		return fmt.Errorf("pgqueue: running task must have claim fields set")
	case t.Status != TaskStatusRunning && claimSet:
		return fmt.Errorf("pgqueue: non-running task must not have claim fields set")
	}

	// An empty payload is allowed; anything else must parse as JSON.
	if len(t.Payload) > 0 && !json.Valid(t.Payload) {
		return fmt.Errorf("pgqueue: task payload is not valid JSON")
	}
	return nil
}
// SetCreatedAt stores v in the task's CreatedAt field. It implements
// pgkit.HasSetCreatedAt.
func (t *Task) SetCreatedAt(v time.Time) {
	t.CreatedAt = v
}
// DecodePayload decodes the task's JSON payload into dst using
// json.Unmarshal and returns that call's error. When the payload is empty
// (nil or zero length) it returns nil and dst is not touched.
func (t *Task) DecodePayload(dst any) error {
	if len(t.Payload) > 0 {
		return json.Unmarshal(t.Payload, dst)
	}
	return nil
}
// SetPayload encodes src with json.Marshal and stores the result as the
// task's payload. If encoding fails, the existing payload is left as it was
// and the json error is returned wrapped with a "pgqueue: marshal payload"
// prefix.
func (t *Task) SetPayload(src any) error {
	encoded, marshalErr := json.Marshal(src)
	if marshalErr != nil {
		return fmt.Errorf("pgqueue: marshal payload: %w", marshalErr)
	}

	t.Payload = encoded
	return nil
}