forked from seomoz/qless-core
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpop.lua
More file actions
297 lines (258 loc) · 11.2 KB
/
pop.lua
File metadata and controls
297 lines (258 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
-- Pop: hand work from a queue over to a worker.
--
-- Given a queue name, the script first looks for expired locks, then
-- brings in recurring and scheduled items that have become valid, and
-- finally returns the work items that can be handed over.
--
-- Keys:
--    1) queue name
-- Args:
--    1) worker name
--    2) the number of items to return
--    3) the current time

-- Exactly one key is accepted; too few and too many are reported differently
local nkeys = #KEYS
if nkeys < 1 then
    error('Pop(): Expected 1 KEYS argument')
elseif nkeys > 1 then
    error('Pop(): Got ' .. nkeys .. ', expected 1 KEYS argument')
end

-- `count` and `now` must parse as numbers; the offending raw value (or
-- 'nil' when absent) is echoed back in the error message
local queue  = assert(KEYS[1], 'Pop(): Key "queue" missing')
local worker = assert(ARGV[1], 'Pop(): Arg "worker" missing')
local count  = assert(tonumber(ARGV[2]), 'Pop(): Arg "count" missing or not a number: ' .. (ARGV[2] or 'nil'))
local now    = assert(tonumber(ARGV[3]), 'Pop(): Arg "now" missing or not a number: ' .. (ARGV[3] or 'nil'))

-- Prefix shared by every per-queue structure touched below
local key = 'ql:q:' .. queue
-- Work out when locks taken by this call expire. The queue-specific
-- `<queue>-heartbeat` entry of `ql:config` wins over the general
-- `heartbeat` entry; 60 is used when neither is set.
local heartbeats = redis.call('hmget', 'ql:config', 'heartbeat', queue .. '-heartbeat')
local expires = now + tonumber(heartbeats[2] or heartbeats[1] or 60)

-- Stats are keyed by day: the bin is `now` rounded down to a multiple
-- of 86400 (24 * 60 * 60)
local bin = now - (now % 86400)

-- The jids that this call is going to return
local keys = {}

-- Make sure we add this worker to the list of seen workers
redis.call('zadd', 'ql:workers', now, worker)

-- A paused queue hands out nothing
if redis.call('sismember', 'ql:paused_queues', queue) == 1 then
    return {}
end
-- Reclaim jobs whose lock on this queue has expired (score in
-- `<key>-locks` between 0 and now), looking at no more than `count` of
-- them. A reclaimed job loses one retry: with retries left it goes
-- into `keys` to be handed out again, otherwise it is marked failed.
local expired = redis.call('zrangebyscore', key .. '-locks', 0, now, 'LIMIT', 0, count)
for _, jid in ipairs(expired) do
    local job_key = 'ql:j:' .. jid

    -- Take the job away from the worker that was running it
    -- NOTE(review): if 'worker' is absent from the job hash the
    -- concatenation below raises; confirm locked jobs always carry it
    local previous = redis.call('hget', job_key, 'worker')
    redis.call('zrem', 'ql:w:' .. previous .. ':jobs', jid)

    local remaining = redis.call('hincrby', job_key, 'remaining', -1)
    if remaining >= 0 then
        -- Still has retries: hand it out again
        keys[#keys + 1] = jid
        if redis.call('zscore', 'ql:tracked', jid) ~= false then
            redis.call('publish', 'stalled', jid)
        end
    else
        -- Retries exhausted: pull the job out of every structure of
        -- this queue before marking it failed
        redis.call('zrem', key .. '-work', jid)
        redis.call('zrem', key .. '-locks', jid)
        redis.call('zrem', key .. '-scheduled', jid)

        local group = 'failed-retries-' .. queue

        -- Stamp the most recent history entry with the failure time
        -- NOTE(review): an empty history makes `latest` nil and the
        -- assignment below raises; confirm history is never empty here
        local log = cjson.decode(redis.call('hget', job_key, 'history') or '[]')
        local latest = log[#log]
        latest['failed'] = now

        redis.call('hmset', job_key,
            'state', 'failed',
            'worker', '',
            'expires', '',
            'history', cjson.encode(log),
            'failure', cjson.encode({
                ['group']   = group,
                ['message'] = 'Job exhausted retries in queue "' .. queue .. '"',
                ['when']    = now,
                ['worker']  = latest['worker']
            }))

        -- Register the failure group, and this job under that group
        redis.call('sadd', 'ql:failures', group)
        redis.call('lpush', 'ql:f:' .. group, jid)

        if redis.call('zscore', 'ql:tracked', jid) ~= false then
            redis.call('publish', 'failed', jid)
        end
    end
end

-- At this point `keys` holds only reclaimed jobs, so its length is the
-- number of retries to add to this queue's stats for the current bin
redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. queue, 'retries', #keys)
-- If demand is not yet met, turn recurring jobs that are due (score in
-- `<key>-recur` between 0 and now) into concrete jobs on the work queue
if #keys < count then
    -- `keys` is not modified in this section, so the shortfall is fixed
    local needed = count - #keys
    -- How many jobs have been spawned so far, across all recurring jobs
    local moved = 0
    local recur_key = key .. '-recur'

    local due = redis.call('zrangebyscore', recur_key, 0, now, 'LIMIT', 0, needed)
    for _, jid in ipairs(due) do
        local recur_job = 'ql:r:' .. jid
        local template = redis.call('hmget', recur_job,
            'klass', 'data', 'priority', 'tags', 'retries', 'interval')
        local klass, data, priority = template[1], template[2], template[3]
        local tags, retries, interval = template[4], template[5], template[6]
        local tag_list = cjson.decode(tags)

        -- The score is when the next run was due. It is kept so that
        -- the spawned job's history records that time rather than now.
        local score = math.floor(tonumber(redis.call('zscore', recur_key, jid)))

        -- Spawn one job per elapsed interval until the recurring job
        -- has caught up with `now` or the shortfall is covered
        while score <= now and moved < needed do
            moved = moved + 1

            -- Running number of jobs spawned from this recurring job;
            -- it becomes the suffix of the spawned job's jid
            local serial = redis.call('hincrby', recur_job, 'count', 1)
            local child = jid .. '-' .. serial

            -- Index the spawned job under each of its tags
            for _, tag in ipairs(tag_list) do
                redis.call('zadd', 'ql:t:' .. tag, now, child)
                redis.call('zincrby', 'ql:tags', 1, tag)
            end

            -- Save the spawned job's data
            redis.call('hmset', 'ql:j:' .. child,
                'jid'      , child,
                'klass'    , klass,
                'data'     , data,
                'priority' , priority,
                'tags'     , tags,
                'state'    , 'waiting',
                'worker'   , '',
                'expires'  , 0,
                'queue'    , queue,
                'retries'  , retries,
                'remaining', retries,
                'history'  , cjson.encode({{
                    -- Recorded as put when it was due, not at the
                    -- current time
                    q   = queue,
                    put = math.floor(score)
                }}))

            -- Spawned jobs go straight onto the work queue; the due
            -- time is folded into the score as a small offset below
            -- the priority
            redis.call('zadd', key .. '-work', priority - (score / 10000000000), child)

            -- Advance the recurring job to its next due time
            redis.call('zincrby', recur_key, interval, jid)
            score = score + interval
        end
    end
end
-- If demand is still not met, promote scheduled jobs whose time has
-- come (score in `<key>-scheduled` between 0 and now) into the work
-- queue, then take the best-ranked jobs off the work queue.
if #keys < count then
    local due = redis.call('zrangebyscore', key .. '-scheduled', 0, now, 'LIMIT', 0, count - #keys)

    if #due > 0 then
        -- Promoted jobs are scored `priority - time / 10000000000`, the
        -- same form the recurring-job section of this script uses for
        -- the work queue. Scoring them with the bare priority would
        -- rank every promoted job above all equal-priority entries
        -- carrying that offset, and would order promoted jobs among
        -- themselves by jid instead of by time.
        local tiebreak = now / 10000000000

        -- Flat score/member list for a single ZADD call
        local zadd = {}
        for _, jid in ipairs(due) do
            -- A missing or non-numeric stored priority is treated as 0
            -- so the score/member pairs can never get out of step
            local priority = tonumber(redis.call('hget', 'ql:j:' .. jid, 'priority') or 0) or 0
            zadd[#zadd + 1] = priority - tiebreak
            zadd[#zadd + 1] = jid
        end

        -- Add them to the work list first, then drop them from the
        -- scheduled list
        redis.call('zadd', key .. '-work', unpack(zadd))
        redis.call('zrem', key .. '-scheduled', unpack(due))
    end

    -- Highest score first, up to the number of jobs still wanted. The
    -- range bound is evaluated once, before `keys` starts growing.
    for _, jid in ipairs(redis.call('zrevrange', key .. '-work', 0, (count - #keys) - 1)) do
        keys[#keys + 1] = jid
    end
end
-- `keys` now lists every jid being handed out. For each one: record
-- the pop in its history, update the wait-time stats, give the worker
-- a lock on it, and append its JSON description to the response.
local response = {}
local state
local history

-- Wait-time stats hash for this queue and day bin
local wait_key = 'ql:s:wait:' .. bin .. ':' .. queue

for _, jid in ipairs(keys) do
    local job_key = 'ql:j:' .. jid

    -- Fetch state and history; the latest history entry is the one
    -- that gets stamped with this pop
    state, history = unpack(redis.call('hmget', job_key, 'state', 'history'))
    history = cjson.decode(history or '{}')
    local latest = history[#history]
    latest['worker'] = worker
    latest['popped'] = math.floor(now)

    ----------------------------------------------------------
    -- Wait-time statistics
    ----------------------------------------------------------
    -- How long the job waited between being put and being popped
    local waiting = math.floor(now) - latest['put']

    -- Running total / mean / vk, updated incrementally with this sample
    local stats = redis.call('hmget', wait_key, 'total', 'mean', 'vk')
    local total, mean, vk = stats[1], stats[2], stats[3]
    if not total or total == 0 then
        -- First sample in this bin
        total, mean, vk = 1, waiting, 0
    else
        total = total + 1
        local previous_mean = mean
        mean = mean + (waiting - mean) / total
        vk = vk + (waiting - mean) * (waiting - previous_mean)
    end

    -- Histogram field, by resolution:
    --    `s<n>` seconds, `m<n>` minutes, `h<n>` hours, `d<n>` days
    waiting = math.floor(waiting)
    local slot
    if waiting < 60 then
        slot = 's' .. waiting
    elseif waiting < 3600 then
        slot = 'm' .. math.floor(waiting / 60)
    elseif waiting < 86400 then
        slot = 'h' .. math.floor(waiting / 3600)
    else
        slot = 'd' .. math.floor(waiting / 86400)
    end
    redis.call('hincrby', wait_key, slot, 1)
    redis.call('hmset', wait_key, 'total', total, 'mean', mean, 'vk', vk)
    ----------------------------------------------------------

    -- Add the job to the set of jobs handled by this worker
    redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, jid)

    -- Mark the job as running under this worker and take its lock
    redis.call('hmset', job_key,
        'worker', worker,
        'expires', expires,
        'state', 'running',
        'history', cjson.encode(history))
    redis.call('zadd', key .. '-locks', expires, jid)

    -- Read the job back in full to build its description
    local job = redis.call('hmget', job_key,
        'jid', 'klass', 'state', 'queue', 'worker', 'priority',
        'expires', 'retries', 'remaining', 'data', 'tags', 'history', 'failure')

    local tracked = redis.call('zscore', 'ql:tracked', jid) ~= false
    if tracked then
        redis.call('publish', 'popped', jid)
    end

    response[#response + 1] = cjson.encode({
        jid          = job[1],
        klass        = job[2],
        state        = job[3],
        queue        = job[4],
        worker       = job[5] or '',
        tracked      = tracked,
        priority     = tonumber(job[6]),
        expires      = tonumber(job[7]) or 0,
        retries      = tonumber(job[8]),
        remaining    = tonumber(job[9]),
        data         = cjson.decode(job[10]),
        tags         = cjson.decode(job[11]),
        history      = cjson.decode(job[12]),
        failure      = cjson.decode(job[13] or '{}'),
        dependents   = redis.call('smembers', job_key .. '-dependents'),
        -- A job that has been popped off of a queue can not have
        -- dependencies: being poppable means they were all satisfied
        dependencies = {}
    })
end

-- Everything handed out leaves the work queue in one call
if #keys > 0 then
    redis.call('zrem', key .. '-work', unpack(keys))
end

return response