-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathadd_job.sql
More file actions
106 lines (96 loc) · 2.41 KB
/
add_job.sql
File metadata and controls
106 lines (96 loc) · 2.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
-- Deploy schemas/app_jobs/procedures/add_job to pg
-- requires: schemas/app_jobs/schema
-- requires: schemas/app_jobs/tables/jobs/table
-- requires: schemas/app_jobs/tables/job_queues/table
BEGIN;
-- Enqueue a job in app_jobs.jobs and return the stored row.
--
-- Parameters:
--   db_id         stored in jobs.database_id
--   identifier    stored in jobs.task_identifier
--   payload       JSON payload; NULL is treated as '{}'. The value of
--                 jwt_public.current_user_id() is merged in under "actor_id"
--                 (for an object payload this replaces any caller-supplied
--                 "actor_id", per jsonb || semantics)
--   job_key       optional key; when given, an existing unlocked job with the
--                 same key is updated in place instead of adding a second row
--   queue_name    stored in jobs.queue_name (may be NULL)
--   run_at        stored in jobs.run_at; NULL falls back to now()
--   max_attempts  stored in jobs.max_attempts; NULL falls back to 25
--   priority      stored in jobs.priority; NULL falls back to 0
--
-- Returns: the inserted (or updated) app_jobs.jobs row.
--
-- NOTE(review): this function is SECURITY DEFINER but does not pin
-- search_path. Every object referenced here is schema-qualified or built in,
-- but consider adding an explicit `SET search_path` once it is confirmed that
-- triggers on app_jobs.jobs do not rely on the caller's search_path.
CREATE FUNCTION app_jobs.add_job (
    db_id uuid,
    identifier text,
    payload json DEFAULT '{}'::json,
    job_key text DEFAULT NULL,
    queue_name text DEFAULT NULL,
    run_at timestamptz DEFAULT now(),
    max_attempts integer DEFAULT 25,
    priority integer DEFAULT 0
)
RETURNS app_jobs.jobs
AS $$
DECLARE
    v_job app_jobs.jobs;
BEGIN
    -- Bake actor_id into payload. After this assignment payload is never
    -- NULL, so the INSERTs below can use it directly.
    payload := (
        coalesce(payload, '{}'::json)::jsonb
        || jsonb_build_object('actor_id', jwt_public.current_user_id())
    )::json;

    IF job_key IS NOT NULL THEN
        -- Upsert job: insert, or overwrite the existing job with this key as
        -- long as that job is not currently locked.
        -- NOTE(review): database_id is not refreshed on conflict, so a key
        -- reused with a different db_id keeps the original database_id.
        -- Confirm this is intended.
        INSERT INTO app_jobs.jobs (
            database_id,
            task_identifier,
            payload,
            queue_name,
            run_at,
            max_attempts,
            key,
            priority
        ) VALUES (
            db_id,
            identifier,
            payload,
            queue_name,
            coalesce(run_at, now()),
            coalesce(max_attempts, 25),
            job_key,
            coalesce(priority, 0)
        )
        ON CONFLICT (key)
        DO UPDATE SET
            task_identifier = EXCLUDED.task_identifier,
            payload = EXCLUDED.payload,
            queue_name = EXCLUDED.queue_name,
            max_attempts = EXCLUDED.max_attempts,
            run_at = EXCLUDED.run_at,
            priority = EXCLUDED.priority,
            -- always reset error/retry state
            attempts = 0,
            last_error = NULL
        WHERE
            jobs.locked_at IS NULL
        RETURNING * INTO v_job;

        -- If upsert succeeded (insert or update), return early.
        -- `NOT (row IS NULL)` is deliberate: for a composite value it is true
        -- as soon as any field is non-null, whereas `row IS NOT NULL` would
        -- require every field to be non-null.
        IF NOT (v_job IS NULL) THEN
            RETURN v_job;
        END IF;

        -- Upsert failed -> there must be an existing job that is locked.
        -- Remove the existing key to allow a new one to be inserted, and
        -- prevent any subsequent retries by bumping attempts to the max
        -- allowed.
        UPDATE app_jobs.jobs
        SET
            key = NULL,
            attempts = jobs.max_attempts
        WHERE
            key = job_key;
    END IF;

    -- Plain insert. Reached when no job_key was supplied, or when the keyed
    -- job was locked and has just had its key released above. job_key is
    -- written here so the replacement job stays addressable by its key
    -- (it is simply NULL in the un-keyed case), and the same NULL fallbacks
    -- as the upsert branch are applied so both paths store identical values.
    INSERT INTO app_jobs.jobs (
        database_id,
        task_identifier,
        payload,
        queue_name,
        run_at,
        max_attempts,
        key,
        priority
    ) VALUES (
        db_id,
        identifier,
        payload,
        queue_name,
        coalesce(run_at, now()),
        coalesce(max_attempts, 25),
        job_key,
        coalesce(priority, 0)
    )
    RETURNING * INTO v_job;

    RETURN v_job;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
COMMIT;