-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpgpm-database-jobs--0.15.3.sql
More file actions
805 lines (721 loc) · 22.7 KB
/
pgpm-database-jobs--0.15.3.sql
File metadata and controls
805 lines (721 loc) · 22.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
\echo Use "CREATE EXTENSION pgpm-database-jobs" to load this file. \quit
CREATE SCHEMA IF NOT EXISTS app_jobs;
GRANT USAGE ON SCHEMA app_jobs TO administrator;
ALTER DEFAULT PRIVILEGES IN SCHEMA app_jobs
GRANT EXECUTE ON FUNCTIONS TO administrator;
CREATE FUNCTION app_jobs.tg_update_timestamps() RETURNS trigger AS $EOFCODE$
BEGIN
IF TG_OP = 'INSERT' THEN
NEW.created_at = NOW();
NEW.updated_at = NOW();
ELSIF TG_OP = 'UPDATE' THEN
NEW.created_at = OLD.created_at;
NEW.updated_at = greatest (now(), OLD.updated_at + interval '1 millisecond');
END IF;
RETURN NEW;
END;
$EOFCODE$ LANGUAGE plpgsql;
CREATE FUNCTION app_jobs.tg_add_job_with_row_id() RETURNS trigger AS $EOFCODE$
BEGIN
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
PERFORM
app_jobs.add_job (jwt_private.current_database_id(), tg_argv[0], json_build_object('id', NEW.id));
RETURN NEW;
END IF;
IF (TG_OP = 'DELETE') THEN
PERFORM
app_jobs.add_job (jwt_private.current_database_id(), tg_argv[0], json_build_object('id', OLD.id));
RETURN OLD;
END IF;
END;
$EOFCODE$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
COMMENT ON FUNCTION app_jobs.tg_add_job_with_row_id IS 'Useful shortcut to create a job on insert or update. Pass the task name as the trigger argument, and the record id will automatically be available on the JSON payload.';
CREATE FUNCTION app_jobs.tg_add_job_with_row() RETURNS trigger AS $EOFCODE$
BEGIN
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
PERFORM
app_jobs.add_job (jwt_private.current_database_id(), TG_ARGV[0], to_json(NEW));
RETURN NEW;
END IF;
IF (TG_OP = 'DELETE') THEN
PERFORM
app_jobs.add_job (jwt_private.current_database_id(), TG_ARGV[0], to_json(OLD));
RETURN OLD;
END IF;
END;
$EOFCODE$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
COMMENT ON FUNCTION app_jobs.tg_add_job_with_row IS 'Useful shortcut to create a job on insert or update. Pass the task name as the trigger argument, and the record data will automatically be available on the JSON payload.';
CREATE FUNCTION app_jobs.json_build_object_apply(arguments text[]) RETURNS pg_catalog.json AS $EOFCODE$
DECLARE
arg text;
_sql text;
_res json;
args text[];
BEGIN
_sql = 'SELECT json_build_object(';
FOR arg IN
SELECT
unnest(arguments)
LOOP
args = array_append(args, format('''%s''', arg));
END LOOP;
_sql = _sql || format('%s);', array_to_string(args, ','));
EXECUTE _sql INTO _res;
RETURN _res;
END;
$EOFCODE$ LANGUAGE plpgsql;
CREATE FUNCTION app_jobs.trigger_job_with_fields() RETURNS trigger AS $EOFCODE$
DECLARE
arg text;
fn text;
i int;
args text[];
BEGIN
FOR i IN
SELECT
*
FROM
generate_series(1, TG_NARGS) g (i)
LOOP
IF (i = 1) THEN
fn = TG_ARGV[i - 1];
ELSE
args = array_append(args, TG_ARGV[i - 1]);
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
EXECUTE format('SELECT ($1).%s::text', TG_ARGV[i - 1])
USING NEW INTO arg;
END IF;
IF (TG_OP = 'DELETE') THEN
EXECUTE format('SELECT ($1).%s::text', TG_ARGV[i - 1])
USING OLD INTO arg;
END IF;
args = array_append(args, arg);
END IF;
END LOOP;
PERFORM
app_jobs.add_job (jwt_private.current_database_id(), fn, app_jobs.json_build_object_apply (args));
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
RETURN NEW;
END IF;
IF (TG_OP = 'DELETE') THEN
RETURN OLD;
END IF;
END;
$EOFCODE$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
CREATE TABLE app_jobs.scheduled_jobs (
id bigserial PRIMARY KEY,
database_id uuid NOT NULL,
queue_name text DEFAULT (public.gen_random_uuid())::text,
task_identifier text NOT NULL,
payload pg_catalog.json DEFAULT '{}'::json NOT NULL,
priority int DEFAULT 0 NOT NULL,
max_attempts int DEFAULT 25 NOT NULL,
key text,
locked_at timestamptz,
locked_by text,
schedule_info pg_catalog.json NOT NULL,
last_scheduled timestamptz,
last_scheduled_id bigint,
CHECK (length(key) < 513),
CHECK (length(task_identifier) < 127),
CHECK (max_attempts > 0),
CHECK (length(queue_name) < 127),
CHECK (length(locked_by) > 3),
UNIQUE (key)
);
COMMENT ON TABLE app_jobs.scheduled_jobs IS 'Recurring/cron-style job definitions with database scoping: each row spawns jobs on a schedule for a specific database';
COMMENT ON COLUMN app_jobs.scheduled_jobs.id IS 'Auto-incrementing scheduled job identifier';
COMMENT ON COLUMN app_jobs.scheduled_jobs.database_id IS 'Database this scheduled job belongs to, for multi-tenant isolation';
COMMENT ON COLUMN app_jobs.scheduled_jobs.queue_name IS 'Name of the queue spawned jobs are placed into';
COMMENT ON COLUMN app_jobs.scheduled_jobs.task_identifier IS 'Task type identifier for spawned jobs';
COMMENT ON COLUMN app_jobs.scheduled_jobs.payload IS 'JSON payload passed to each spawned job';
COMMENT ON COLUMN app_jobs.scheduled_jobs.priority IS 'Priority assigned to spawned jobs (lower = higher priority)';
COMMENT ON COLUMN app_jobs.scheduled_jobs.max_attempts IS 'Max retry attempts for spawned jobs';
COMMENT ON COLUMN app_jobs.scheduled_jobs.key IS 'Optional unique deduplication key';
COMMENT ON COLUMN app_jobs.scheduled_jobs.locked_at IS 'Timestamp when the scheduler locked this record for processing';
COMMENT ON COLUMN app_jobs.scheduled_jobs.locked_by IS 'Identifier of the scheduler worker holding the lock';
COMMENT ON COLUMN app_jobs.scheduled_jobs.schedule_info IS 'JSON schedule configuration (e.g. cron expression, interval)';
COMMENT ON COLUMN app_jobs.scheduled_jobs.last_scheduled IS 'Timestamp when a job was last spawned from this schedule';
COMMENT ON COLUMN app_jobs.scheduled_jobs.last_scheduled_id IS 'ID of the last job spawned from this schedule';
CREATE FUNCTION app_jobs.do_notify() RETURNS trigger AS $EOFCODE$
BEGIN
PERFORM
pg_notify(TG_ARGV[0], '');
RETURN NEW;
END;
$EOFCODE$ LANGUAGE plpgsql;
CREATE TRIGGER _900_notify_scheduled_job
AFTER INSERT
ON app_jobs.scheduled_jobs
FOR EACH ROW
EXECUTE PROCEDURE app_jobs.do_notify('scheduled_jobs:insert');
CREATE INDEX scheduled_jobs_priority_id_idx ON app_jobs.scheduled_jobs (priority, id);
CREATE INDEX scheduled_jobs_locked_by_idx ON app_jobs.scheduled_jobs (locked_by);
GRANT SELECT, INSERT, UPDATE, DELETE ON app_jobs.scheduled_jobs TO administrator;
CREATE TABLE app_jobs.jobs (
id bigserial PRIMARY KEY,
database_id uuid NOT NULL,
queue_name text DEFAULT (public.gen_random_uuid())::text,
task_identifier text NOT NULL,
payload pg_catalog.json DEFAULT '{}'::json NOT NULL,
priority int DEFAULT 0 NOT NULL,
run_at timestamptz DEFAULT now() NOT NULL,
attempts int DEFAULT 0 NOT NULL,
max_attempts int DEFAULT 25 NOT NULL,
key text,
last_error text,
locked_at timestamptz,
locked_by text,
CHECK (length(key) < 513),
CHECK (length(task_identifier) < 127),
CHECK (max_attempts > 0),
CHECK (length(queue_name) < 127),
CHECK (length(locked_by) > 3),
UNIQUE (key)
);
COMMENT ON TABLE app_jobs.jobs IS 'Background job queue with database scoping: each row is a pending or in-progress task for a specific database';
COMMENT ON COLUMN app_jobs.jobs.id IS 'Auto-incrementing job identifier';
COMMENT ON COLUMN app_jobs.jobs.database_id IS 'Database this job belongs to, for multi-tenant job isolation';
COMMENT ON COLUMN app_jobs.jobs.queue_name IS 'Name of the queue this job belongs to; used for worker routing and concurrency control';
COMMENT ON COLUMN app_jobs.jobs.task_identifier IS 'Identifier for the task type (maps to a worker handler function)';
COMMENT ON COLUMN app_jobs.jobs.payload IS 'JSON payload of arguments passed to the task handler';
COMMENT ON COLUMN app_jobs.jobs.priority IS 'Execution priority; lower numbers run first (default 0)';
COMMENT ON COLUMN app_jobs.jobs.run_at IS 'Earliest time this job should be executed; used for delayed/scheduled execution';
COMMENT ON COLUMN app_jobs.jobs.attempts IS 'Number of times this job has been attempted so far';
COMMENT ON COLUMN app_jobs.jobs.max_attempts IS 'Maximum retry attempts before the job is considered permanently failed';
COMMENT ON COLUMN app_jobs.jobs.key IS 'Optional unique deduplication key; prevents duplicate jobs with the same key';
COMMENT ON COLUMN app_jobs.jobs.last_error IS 'Error message from the most recent failed attempt';
COMMENT ON COLUMN app_jobs.jobs.locked_at IS 'Timestamp when a worker locked this job for processing';
COMMENT ON COLUMN app_jobs.jobs.locked_by IS 'Identifier of the worker that currently holds the lock';
ALTER TABLE app_jobs.jobs
ADD COLUMN created_at timestamptz;
ALTER TABLE app_jobs.jobs
ALTER COLUMN created_at SET DEFAULT now();
ALTER TABLE app_jobs.jobs
ADD COLUMN updated_at timestamptz;
ALTER TABLE app_jobs.jobs
ALTER COLUMN updated_at SET DEFAULT now();
CREATE TRIGGER _100_update_jobs_modtime_tg
BEFORE INSERT OR UPDATE
ON app_jobs.jobs
FOR EACH ROW
EXECUTE PROCEDURE app_jobs.tg_update_timestamps();
CREATE FUNCTION app_jobs.tg_increase_job_queue_count() RETURNS trigger AS $EOFCODE$
BEGIN
INSERT INTO app_jobs.job_queues (queue_name, job_count)
VALUES (NEW.queue_name, 1)
ON CONFLICT (queue_name)
DO UPDATE SET
job_count = job_queues.job_count + 1;
RETURN NEW;
END;
$EOFCODE$ LANGUAGE plpgsql VOLATILE;
CREATE TRIGGER _500_increase_job_queue_count_on_insert
AFTER INSERT
ON app_jobs.jobs
FOR EACH ROW
WHEN (new.queue_name IS NOT NULL)
EXECUTE PROCEDURE app_jobs.tg_increase_job_queue_count();
CREATE TRIGGER _500_increase_job_queue_count_on_update
AFTER UPDATE OF queue_name
ON app_jobs.jobs
FOR EACH ROW
WHEN (new.queue_name IS DISTINCT FROM old.queue_name
AND new.queue_name IS NOT NULL)
EXECUTE PROCEDURE app_jobs.tg_increase_job_queue_count();
CREATE TRIGGER _900_notify_worker
AFTER INSERT
ON app_jobs.jobs
FOR EACH ROW
EXECUTE PROCEDURE app_jobs.do_notify('jobs:insert');
CREATE FUNCTION app_jobs.tg_decrease_job_queue_count() RETURNS trigger AS $EOFCODE$
DECLARE
v_new_job_count int;
BEGIN
UPDATE
app_jobs.job_queues
SET
job_count = job_queues.job_count - 1
WHERE
queue_name = OLD.queue_name
RETURNING
job_count INTO v_new_job_count;
IF v_new_job_count <= 0 THEN
DELETE FROM app_jobs.job_queues
WHERE queue_name = OLD.queue_name
AND job_count <= 0;
END IF;
RETURN OLD;
END;
$EOFCODE$ LANGUAGE plpgsql VOLATILE;
CREATE TRIGGER decrease_job_queue_count_on_delete
AFTER DELETE
ON app_jobs.jobs
FOR EACH ROW
WHEN (old.queue_name IS NOT NULL)
EXECUTE PROCEDURE app_jobs.tg_decrease_job_queue_count();
CREATE TRIGGER decrease_job_queue_count_on_update
AFTER UPDATE OF queue_name
ON app_jobs.jobs
FOR EACH ROW
WHEN (new.queue_name IS DISTINCT FROM old.queue_name
AND old.queue_name IS NOT NULL)
EXECUTE PROCEDURE app_jobs.tg_decrease_job_queue_count();
CREATE INDEX priority_run_at_id_idx ON app_jobs.jobs (priority, run_at, id);
CREATE INDEX jobs_locked_by_idx ON app_jobs.jobs (locked_by);
GRANT SELECT, INSERT, UPDATE, DELETE ON app_jobs.jobs TO administrator;
CREATE TABLE app_jobs.job_queues (
queue_name text NOT NULL PRIMARY KEY,
job_count int DEFAULT 0 NOT NULL,
locked_at timestamptz,
locked_by text
);
COMMENT ON TABLE app_jobs.job_queues IS 'Queue metadata: tracks job counts and locking state for each named queue';
COMMENT ON COLUMN app_jobs.job_queues.queue_name IS 'Unique name identifying this queue';
COMMENT ON COLUMN app_jobs.job_queues.job_count IS 'Number of pending jobs in this queue';
COMMENT ON COLUMN app_jobs.job_queues.locked_at IS 'Timestamp when this queue was locked for batch processing';
COMMENT ON COLUMN app_jobs.job_queues.locked_by IS 'Identifier of the worker that currently holds the queue lock';
CREATE INDEX job_queues_locked_by_idx ON app_jobs.job_queues (locked_by);
GRANT SELECT, INSERT, UPDATE, DELETE ON app_jobs.job_queues TO administrator;
CREATE FUNCTION app_jobs.run_scheduled_job(id bigint, job_expiry interval DEFAULT '1 hours') RETURNS app_jobs.jobs AS $EOFCODE$
DECLARE
j app_jobs.jobs;
last_id bigint;
lkd_by text;
BEGIN
-- check last scheduled
SELECT
last_scheduled_id
FROM
app_jobs.scheduled_jobs s
WHERE
s.id = run_scheduled_job.id INTO last_id;
-- if it's been scheduled check if it's been run
IF (last_id IS NOT NULL) THEN
SELECT
locked_by
FROM
app_jobs.jobs js
WHERE
js.id = last_id
AND (js.locked_at IS NULL -- never been run
OR js.locked_at >= (NOW() - job_expiry)
-- still running within a safe interval
) INTO lkd_by;
IF (FOUND) THEN
RAISE EXCEPTION 'ALREADY_SCHEDULED';
END IF;
END IF;
-- insert new job
INSERT INTO app_jobs.jobs (
database_id,
queue_name,
task_identifier,
payload,
priority,
max_attempts,
key
) SELECT
database_id,
queue_name,
task_identifier,
payload,
priority,
max_attempts,
key
FROM
app_jobs.scheduled_jobs s
WHERE
s.id = run_scheduled_job.id
RETURNING
* INTO j;
-- update the scheduled job
UPDATE
app_jobs.scheduled_jobs s
SET
last_scheduled = NOW(),
last_scheduled_id = j.id
WHERE
s.id = run_scheduled_job.id;
RETURN j;
END;
$EOFCODE$ LANGUAGE plpgsql VOLATILE;
CREATE FUNCTION app_jobs.reschedule_jobs(job_ids bigint[], run_at timestamptz DEFAULT NULL, priority int DEFAULT NULL, attempts int DEFAULT NULL, max_attempts int DEFAULT NULL) RETURNS SETOF app_jobs.jobs LANGUAGE sql AS $EOFCODE$
UPDATE
app_jobs.jobs
SET
run_at = coalesce(reschedule_jobs.run_at, jobs.run_at),
priority = coalesce(reschedule_jobs.priority, jobs.priority),
attempts = coalesce(reschedule_jobs.attempts, jobs.attempts),
max_attempts = coalesce(reschedule_jobs.max_attempts, jobs.max_attempts)
WHERE
id = ANY (job_ids)
AND (locked_by IS NULL
OR locked_at < NOW() - interval '4 hours')
RETURNING
*;
$EOFCODE$;
CREATE FUNCTION app_jobs.release_scheduled_jobs(worker_id text, ids bigint[] DEFAULT NULL) RETURNS void AS $EOFCODE$
DECLARE
BEGIN
-- clear the scheduled job
UPDATE
app_jobs.scheduled_jobs s
SET
locked_at = NULL,
locked_by = NULL
WHERE
locked_by = worker_id
AND (ids IS NULL
OR s.id = ANY (ids));
END;
$EOFCODE$ LANGUAGE plpgsql VOLATILE;
CREATE FUNCTION app_jobs.release_jobs(worker_id text) RETURNS void AS $EOFCODE$
DECLARE
BEGIN
-- clear the job
UPDATE
app_jobs.jobs
SET
locked_at = NULL,
locked_by = NULL,
attempts = GREATEST (attempts - 1, 0)
WHERE
locked_by = worker_id;
-- clear the queue
UPDATE
app_jobs.job_queues
SET
locked_at = NULL,
locked_by = NULL
WHERE
locked_by = worker_id;
END;
$EOFCODE$ LANGUAGE plpgsql VOLATILE;
CREATE FUNCTION app_jobs.permanently_fail_jobs(job_ids bigint[], error_message text DEFAULT NULL) RETURNS SETOF app_jobs.jobs LANGUAGE sql AS $EOFCODE$
UPDATE
app_jobs.jobs
SET
last_error = coalesce(error_message, 'Manually marked as failed'),
attempts = max_attempts
WHERE
id = ANY (job_ids)
AND (locked_by IS NULL
OR locked_at < NOW() - interval '4 hours')
RETURNING
*;
$EOFCODE$;
CREATE FUNCTION app_jobs.get_scheduled_job(worker_id text, task_identifiers text[] DEFAULT NULL) RETURNS app_jobs.scheduled_jobs LANGUAGE plpgsql AS $EOFCODE$
DECLARE
v_job_id bigint;
v_row app_jobs.scheduled_jobs;
BEGIN
--
IF worker_id IS NULL THEN
RAISE exception 'INVALID_WORKER_ID';
END IF;
--
SELECT
scheduled_jobs.id INTO v_job_id
FROM
app_jobs.scheduled_jobs
WHERE (scheduled_jobs.locked_at IS NULL)
AND (task_identifiers IS NULL
OR task_identifier = ANY (task_identifiers))
ORDER BY
priority ASC,
id ASC
LIMIT 1
FOR UPDATE
SKIP LOCKED;
--
IF v_job_id IS NULL THEN
RETURN NULL;
END IF;
--
UPDATE
app_jobs.scheduled_jobs
SET
locked_by = worker_id,
locked_at = NOW()
WHERE
id = v_job_id
RETURNING
* INTO v_row;
--
RETURN v_row;
END;
$EOFCODE$;
CREATE FUNCTION app_jobs.get_job(worker_id text, task_identifiers text[] DEFAULT NULL, job_expiry interval DEFAULT '4 hours') RETURNS app_jobs.jobs LANGUAGE plpgsql AS $EOFCODE$
DECLARE
v_job_id bigint;
v_queue_name text;
v_row app_jobs.jobs;
v_now timestamptz = now();
BEGIN
IF worker_id IS NULL THEN
RAISE exception 'INVALID_WORKER_ID';
END IF;
--
SELECT
jobs.queue_name,
jobs.id INTO v_queue_name,
v_job_id
FROM
app_jobs.jobs
WHERE (jobs.locked_at IS NULL
OR jobs.locked_at < (v_now - job_expiry))
AND (jobs.queue_name IS NULL
OR EXISTS (
SELECT
1
FROM
app_jobs.job_queues
WHERE
job_queues.queue_name = jobs.queue_name
AND (job_queues.locked_at IS NULL
OR job_queues.locked_at < (v_now - job_expiry))
FOR UPDATE
SKIP LOCKED))
AND run_at <= v_now
AND attempts < max_attempts
AND (task_identifiers IS NULL
OR task_identifier = ANY (task_identifiers))
ORDER BY
priority ASC,
run_at ASC,
id ASC
LIMIT 1
FOR UPDATE
SKIP LOCKED;
--
IF v_job_id IS NULL THEN
RETURN NULL;
END IF;
--
IF v_queue_name IS NOT NULL THEN
UPDATE
app_jobs.job_queues
SET
locked_by = worker_id,
locked_at = v_now
WHERE
job_queues.queue_name = v_queue_name;
END IF;
--
UPDATE
app_jobs.jobs
SET
attempts = attempts + 1,
locked_by = worker_id,
locked_at = v_now
WHERE
id = v_job_id
RETURNING
* INTO v_row;
--
RETURN v_row;
END;
$EOFCODE$;
CREATE FUNCTION app_jobs.fail_job(worker_id text, job_id bigint, error_message text) RETURNS app_jobs.jobs LANGUAGE plpgsql STRICT AS $EOFCODE$
DECLARE
v_row app_jobs.jobs;
BEGIN
UPDATE
app_jobs.jobs
SET
last_error = error_message,
run_at = greatest (now(), run_at) + (exp(least (attempts, 10))::text || ' seconds')::interval,
locked_by = NULL,
locked_at = NULL
WHERE
id = job_id
AND locked_by = worker_id
RETURNING
* INTO v_row;
IF v_row.queue_name IS NOT NULL THEN
UPDATE
app_jobs.job_queues
SET
locked_by = NULL,
locked_at = NULL
WHERE
queue_name = v_row.queue_name
AND locked_by = worker_id;
END IF;
RETURN v_row;
END;
$EOFCODE$;
CREATE FUNCTION app_jobs.complete_jobs(job_ids bigint[]) RETURNS SETOF app_jobs.jobs LANGUAGE sql AS $EOFCODE$
DELETE FROM app_jobs.jobs
WHERE id = ANY (job_ids)
AND (locked_by IS NULL
OR locked_at < NOW() - interval '4 hours')
RETURNING
*;
$EOFCODE$;
CREATE FUNCTION app_jobs.complete_job(worker_id text, job_id bigint) RETURNS app_jobs.jobs LANGUAGE plpgsql AS $EOFCODE$
DECLARE
v_row app_jobs.jobs;
BEGIN
DELETE FROM app_jobs.jobs
WHERE id = job_id
RETURNING
* INTO v_row;
IF v_row.queue_name IS NOT NULL THEN
UPDATE
app_jobs.job_queues
SET
locked_by = NULL,
locked_at = NULL
WHERE
queue_name = v_row.queue_name
AND locked_by = worker_id;
END IF;
RETURN v_row;
END;
$EOFCODE$;
CREATE FUNCTION app_jobs.add_scheduled_job(db_id uuid, identifier text, payload pg_catalog.json DEFAULT '{}'::json, schedule_info pg_catalog.json DEFAULT '{}'::json, job_key text DEFAULT NULL, queue_name text DEFAULT NULL, max_attempts int DEFAULT 25, priority int DEFAULT 0) RETURNS app_jobs.scheduled_jobs AS $EOFCODE$
DECLARE
v_job app_jobs.scheduled_jobs;
BEGIN
IF job_key IS NOT NULL THEN
-- Upsert job
INSERT INTO app_jobs.scheduled_jobs (
database_id,
task_identifier,
payload,
queue_name,
schedule_info,
max_attempts,
key,
priority
) VALUES (
db_id,
identifier,
coalesce(payload, '{}'::json),
queue_name,
schedule_info,
coalesce(max_attempts, 25),
job_key,
coalesce(priority, 0)
)
ON CONFLICT (key)
DO UPDATE SET
task_identifier = EXCLUDED.task_identifier,
payload = EXCLUDED.payload,
queue_name = EXCLUDED.queue_name,
max_attempts = EXCLUDED.max_attempts,
schedule_info = EXCLUDED.schedule_info,
priority = EXCLUDED.priority
WHERE
scheduled_jobs.locked_at IS NULL
RETURNING
* INTO v_job;
-- If upsert succeeded (insert or update), return early
IF NOT (v_job IS NULL) THEN
RETURN v_job;
END IF;
-- Upsert failed -> there must be an existing scheduled job that is locked. Remove
-- and allow a new one to be inserted
DELETE FROM
app_jobs.scheduled_jobs
WHERE
KEY = job_key;
END IF;
INSERT INTO app_jobs.scheduled_jobs (
database_id,
task_identifier,
payload,
queue_name,
schedule_info,
max_attempts,
priority
) VALUES (
db_id,
identifier,
payload,
queue_name,
schedule_info,
max_attempts,
priority
) RETURNING * INTO v_job;
RETURN v_job;
END;
$EOFCODE$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
CREATE FUNCTION app_jobs.add_job(db_id uuid, identifier text, payload pg_catalog.json DEFAULT '{}'::json, job_key text DEFAULT NULL, queue_name text DEFAULT NULL, run_at timestamptz DEFAULT now(), max_attempts int DEFAULT 25, priority int DEFAULT 0) RETURNS app_jobs.jobs AS $EOFCODE$
DECLARE
v_job app_jobs.jobs;
BEGIN
IF job_key IS NOT NULL THEN
-- Upsert job
INSERT INTO app_jobs.jobs (
database_id,
task_identifier,
payload,
queue_name,
run_at,
max_attempts,
key,
priority
) VALUES (
db_id,
identifier,
coalesce(payload,
'{}'::json),
queue_name,
coalesce(run_at, now()),
coalesce(max_attempts, 25),
job_key,
coalesce(priority, 0)
)
ON CONFLICT (key)
DO UPDATE SET
task_identifier = EXCLUDED.task_identifier,
payload = EXCLUDED.payload,
queue_name = EXCLUDED.queue_name,
max_attempts = EXCLUDED.max_attempts,
run_at = EXCLUDED.run_at,
priority = EXCLUDED.priority,
-- always reset error/retry state
attempts = 0, last_error = NULL
WHERE
jobs.locked_at IS NULL
RETURNING
* INTO v_job;
-- If upsert succeeded (insert or update), return early
IF NOT (v_job IS NULL) THEN
RETURN v_job;
END IF;
-- Upsert failed -> there must be an existing job that is locked. Remove
-- existing key to allow a new one to be inserted, and prevent any
-- subsequent retries by bumping attempts to the max allowed.
UPDATE
app_jobs.jobs
SET
KEY = NULL,
attempts = jobs.max_attempts
WHERE
KEY = job_key;
END IF;
INSERT INTO app_jobs.jobs (
database_id,
task_identifier,
payload,
queue_name,
run_at,
max_attempts,
priority
) VALUES (
db_id,
identifier,
payload,
queue_name,
run_at,
max_attempts,
priority
)
RETURNING * INTO v_job;
RETURN v_job;
END;
$EOFCODE$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER;