-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathget.lua
More file actions
122 lines (111 loc) · 3.22 KB
/
get.lua
File metadata and controls
122 lines (111 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
local _M = {}
local cuturl = require("cuturl")
local util = require("util")
local get_and_sub = function(redis, subject, details)
-- atomically (wrapped in multi+exec) read state for [details] keys in $subject
-- NOTE: This leaves redis in a SUBSCRIBE state - use redis:read_reply() to read
local initial_states = {}
redis:multi()
for i = 1, #details do
-- queue up GETs in multi
redis:get(details[i])
end
for i = 1, #details do
-- list subscriptions
redis:subscribe(details[i])
end
-- exec queued items
local queued_items = redis:exec()
for i = 1, #details do
-- pull out the responses to the GETs
initial_states[i] = queued_items[i]
end
return initial_states
end
_M.subscribe = function(redis)
-- subject - the namespace
-- details - the list of ids being subscribed to within the namespace
local subject, details = cuturl.sub(ngx.var.uri)
for i = 1, #details do
details[i] = subject .. "/" .. details[i]
end
local initial_states = get_and_sub(redis, subject, details)
for i = 1, #initial_states do
ngx.say(initial_states[i])
end
ngx.flush()
local err = nil
local res
while not err do
res, err = redis:read_reply()
if res then
ngx.log(ngx.ERR, "subject is " .. res[2])
ngx.log(ngx.ERR, "details is " .. res[3])
ngx.say(res[3])
ngx.flush()
else
if err == "timeout" then
ngx.log(ngx.ERR, "TIMEOUT")
break
else
ngx.log(ngx.ERR, "OTHER ERROR")
ngx.log(ngx.ERR, err)
break
end
end
end
end
_M.poll = function(redis)
-- subject - the namespace
-- details - the list of ids being subscribed to within the namespace
local subject, details = cuturl.sub(ngx.var.uri)
for i = 1, #details do
details[i] = subject .. "/" .. details[i]
end
local initial_states = get_and_sub(redis, subject, details)
local content_etag = util.calc_md5(initial_states)
local consumer_etag = ngx.var.http_if_none_match
-- do etags match? if yes then already have this, wait for the next thing and send just it
-- not match? send what we have with a tag
--
if content_etag ~= consumer_etag then
ngx.header["ETag"] = content_etag
-- we differ from consumer, send our state
for i = 1, #initial_states do
-- pull out the responses to the GETs
if initial_states[i] ~= ngx.null then
if i == #initial_states then
ngx.print(initial_states[i])
else
ngx.say(initial_states[i])
end
end
end
ngx.flush()
else
-- we match consumer, wait for state change
local err, res
while not err do
res, err = redis:read_reply()
if not err then
util.override_arraypair(details, initial_states, res[2], res[3])
local upd_etag = util.calc_md5(initial_states)
ngx.header["ETag"] = upd_etag
ngx.say(res[3])
break
else
if err == "timeout" then
-- nginx will set the status to 304 here due to matching ETag
ngx.header["ETag"] = consumer_etag
ngx.log(ngx.ERR, "TIMEOUT")
break
else
ngx.log(ngx.ERR, "OTHER ERROR")
ngx.log(ngx.ERR, err)
break
end
end
end
end
end
return _M