|
| 1 | +CREATE OR REPLACE FUNCTION public.end_sync_task(s_target bigint, s_function character varying, s_worker character varying, s_status task_status) |
| 2 | + RETURNS void |
| 3 | + LANGUAGE plpgsql |
| 4 | +AS $function$ |
| 5 | +DECLARE t_id INTEGER; |
| 6 | +DECLARE t_worker varchar; |
| 7 | +DECLARE t_status task_status; |
| 8 | +DECLARE t_failure_count SMALLINT; |
| 9 | +DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE; |
| 10 | +BEGIN |
| 11 | + SELECT id, worker, status, failure_count, last_task_end |
| 12 | + INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_end |
| 13 | + FROM sync_info WHERE sync_target = s_target AND sync_function = s_function; |
| 14 | + ASSERT s_status > 'active'; |
| 15 | + ASSERT t_worker = s_worker, 'Wrong worker'; |
| 16 | + ASSERT s_status >= t_status, 'do not go back in status'; |
| 17 | + IF s_status = 'complete' THEN |
| 18 | + t_last_task_end := now(); |
| 19 | + t_failure_count := 0; |
| 20 | + ELSE |
| 21 | + IF t_status != s_status THEN |
| 22 | + t_failure_count := t_failure_count + 1; |
| 23 | + END IF; |
| 24 | + END IF; |
| 25 | + |
| 26 | + UPDATE sync_info |
| 27 | + SET status = s_status, |
| 28 | + task_times_out_at=null, |
| 29 | + last_task_end=t_last_task_end, |
| 30 | + failure_count=t_failure_count |
| 31 | + WHERE id=t_id; |
| 32 | +END; |
| 33 | +$function$ |
| 34 | +; |
| 35 | + |
| 36 | +CREATE OR REPLACE FUNCTION public.propose_sync_task(s_target bigint, s_function character varying, s_worker character varying, timeout interval, task_interval interval) |
| 37 | + RETURNS interval |
| 38 | + LANGUAGE plpgsql |
| 39 | +AS $function$ |
| 40 | +DECLARE s_id INTEGER; |
| 41 | +DECLARE start_time TIMESTAMP WITH TIME ZONE; |
| 42 | +DECLARE t_status task_status; |
| 43 | +DECLARE t_failure_count SMALLINT; |
| 44 | +DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE; |
| 45 | +DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE; |
| 46 | +DECLARE t_times_out_at TIMESTAMP WITH TIME ZONE; |
| 47 | +DECLARE result INTERVAL = NULL; |
| 48 | +BEGIN |
| 49 | + ASSERT timeout * 2 < task_interval; |
| 50 | + ASSERT timeout >= '1s'::interval; |
| 51 | + ASSERT task_interval >= '5s'::interval; |
| 52 | + start_time := now(); |
| 53 | + INSERT INTO sync_info (sync_target, sync_function, status, worker, last_task_start, task_times_out_at) |
| 54 | + VALUES (s_target, s_function, 'active', s_worker, start_time, start_time+timeout) |
| 55 | + ON CONFLICT DO NOTHING |
| 56 | + RETURNING id INTO s_id; |
| 57 | + -- zut il renvoie null... |
| 58 | + IF s_id IS NOT NULL THEN |
| 59 | + -- totally new_row, I'm on the task |
| 60 | + RETURN NULL; |
| 61 | + END IF; |
| 62 | + -- now we know it pre-existed. Maybe already active. |
| 63 | + SELECT id INTO STRICT s_id FROM sync_info WHERE sync_target = s_target AND sync_function = s_function; |
| 64 | + PERFORM pg_advisory_lock(s_id); |
| 65 | + SELECT status, failure_count, last_task_start, last_task_end, task_times_out_at |
| 66 | + INTO t_status, t_failure_count, t_last_task_start, t_last_task_end, t_times_out_at |
| 67 | + FROM sync_info |
| 68 | + WHERE id = s_id; |
| 69 | + |
| 70 | + IF t_status = 'active' AND t_last_task_start >= coalesce(t_last_task_end, t_last_task_start) AND start_time > t_times_out_at THEN |
| 71 | + t_status := 'timeout'; |
| 72 | + t_failure_count := t_failure_count + 1; |
| 73 | + END IF; |
| 74 | + -- basic backoff |
| 75 | + task_interval := task_interval * (1+t_failure_count); |
| 76 | + IF coalesce(t_last_task_end, t_last_task_start) + task_interval < now() THEN |
| 77 | + -- we are ready to take on the task |
| 78 | + UPDATE sync_info |
| 79 | + SET worker=s_worker, status='active', task_times_out_at = now() + timeout, last_task_start = now(), failure_count=t_failure_count |
| 80 | + WHERE id=s_id; |
| 81 | + ELSE |
| 82 | + -- the task has been tried recently enough |
| 83 | + IF t_status = 'timeout' THEN |
| 84 | + UPDATE sync_info |
| 85 | + SET status=t_status, failure_count=t_failure_count |
| 86 | + WHERE id=s_id; |
| 87 | + END IF; |
| 88 | + result := coalesce(t_last_task_end, t_last_task_start) + task_interval - now(); |
| 89 | + END IF; |
| 90 | + |
| 91 | + PERFORM pg_advisory_unlock(s_id); |
| 92 | + RETURN result; |
| 93 | +END; |
| 94 | +$function$ |
| 95 | +; |
0 commit comments