forked from seomoz/qless-core
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtrack.lua
More file actions
79 lines (75 loc) · 2.44 KB
/
track.lua
File metadata and controls
79 lines (75 loc) · 2.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
-- Track(0)
-- Track(0, ('track' | 'untrack'), jid, now)
-- ------------------------------------------
-- If no arguments are provided, it returns details of all currently-tracked jobs.
-- If the first argument is 'track', then it will start tracking the job associated
-- with that id, and 'untrack' stops tracking it. In this context, tracking is
-- nothing more than saving the job to a list of jobs that are considered special.
-- __Returns__ JSON:
--
-- {
-- 'jobs': [
-- {
-- 'jid': ...,
-- # All the other details you'd get from 'get'
-- }, {
-- ...
-- }
-- ], 'expired': [
-- # These are all the jids that are completed and whose data expired
-- 'deadbeef',
-- ...,
-- ...,
-- ]
-- }
--
if #KEYS ~= 0 then
error('Track(): No keys expected. Got ' .. #KEYS)
end
if ARGV[1] ~= nil then
local jid = assert(ARGV[2] , 'Track(): Arg "jid" missing')
local now = assert(tonumber(ARGV[3]), 'Track(): Arg "now" missing or not a number: ' .. (ARGV[3] or 'nil'))
if string.lower(ARGV[1]) == 'track' then
redis.call('publish', 'track', jid)
return redis.call('zadd', 'ql:tracked', now, jid)
elseif string.lower(ARGV[1]) == 'untrack' then
redis.call('publish', 'untrack', jid)
return redis.call('zrem', 'ql:tracked', jid)
else
error('Track(): Unknown action "' .. ARGV[1] .. '"')
end
else
local response = {
jobs = {},
expired = {}
}
local jids = redis.call('zrange', 'ql:tracked', 0, -1)
for index, jid in ipairs(jids) do
local job = redis.call(
'hmget', 'ql:j:' .. jid, 'jid', 'klass', 'state', 'queue', 'worker', 'priority',
'expires', 'retries', 'remaining', 'data', 'tags', 'history', 'failure')
if job[1] then
table.insert(response.jobs, {
jid = job[1],
klass = job[2],
state = job[3],
queue = job[4],
worker = job[5] or '',
tracked = true,
priority = tonumber(job[6]),
expires = tonumber(job[7]) or 0,
retries = tonumber(job[8]),
remaining = tonumber(job[9]),
data = cjson.decode(job[10]),
tags = cjson.decode(job[11]),
history = cjson.decode(job[12]),
failure = cjson.decode(job[13] or '{}'),
dependents = redis.call('smembers', 'ql:j:' .. jid .. '-dependents'),
dependencies = redis.call('smembers', 'ql:j:' .. jid .. '-dependencies')
})
else
table.insert(response.expired, jid)
end
end
return cjson.encode(response)
end