Skip to content

Commit c2a9c38

Browse files
committed
Improve timeseries effiency when handling large result sets
1 parent b2cc874 commit c2a9c38

5 files changed

Lines changed: 163 additions & 109 deletions

File tree

src/cbang/api/Timeseries.cpp

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -89,34 +89,12 @@ void Timeseries::query(uint64_t since, unsigned maxResults, const cb_t &cb) {
8989
}
9090

9191

92-
void Timeseries::add(uint64_t time, const JSON::ValuePtr &value) {
93-
// Load last data point if necessary
94-
if (last.isNull()) {
95-
auto cb = [=] (
96-
const SmartPointer<Exception> &err, const JSON::ValuePtr &data) {
97-
if (err.isNull() && data.isSet() && data->isList() && data->size())
98-
last = data->get(0)->get("value");
99-
add(time, value);
100-
};
101-
102-
last = JSON::Null::instancePtr();
103-
return query(0, 1, cb);
104-
}
105-
106-
// Don't record if result is unchanged
107-
if (*last != *value) {
108-
last = value;
92+
void Timeseries::broadcast(uint64_t time, const JSON::ValuePtr &value) {
93+
if (subscribers.empty()) return;
10994

110-
auto key = getKey(time);
111-
LOG_DEBUG(5, "key: " << key << " value: " << *value);
112-
db.set(key, value->toString());
113-
114-
if (!subscribers.empty()) {
115-
auto entry = makeEntry(time, value);
116-
for (auto p: subscribers)
117-
if (p.second.isSet()) p.second->next(entry);
118-
}
119-
}
95+
auto entry = makeEntry(time, value);
96+
for (auto p: subscribers)
97+
if (p.second.isSet()) p.second->next(entry);
12098
}
12199

122100

@@ -152,8 +130,3 @@ void Timeseries::unsubscribe(uint64_t id) {
152130
THROWX("Timeseries does not have subscriber with id " << id,
153131
HTTP::Status::HTTP_NOT_FOUND);
154132
}
155-
156-
157-
string Timeseries::getKey(uint64_t ts) const {
158-
return Time(handler.getTimePeriod(ts)).toString(TIME_FMT);
159-
}

src/cbang/api/Timeseries.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,13 @@ namespace cb {
6060
Timeseries(TimeseriesHandler &handler, const std::string &key);
6161

6262
void query(uint64_t since, unsigned maxResults, const cb_t &cb);
63-
void add(uint64_t time, const JSON::ValuePtr &value);
63+
void broadcast(uint64_t time, const JSON::ValuePtr &value);
6464

6565
SmartPointer<Subscriber> subscribe(
6666
uint64_t id, uint64_t since, unsigned maxResults, const cb_t &cb);
6767
void unsubscribe(uint64_t id);
6868

6969
protected:
70-
std::string getKey(uint64_t ts) const;
7170
void query(uint64_t ts);
7271
void query();
7372
};

src/cbang/api/handler/TimeseriesHandler.cpp

Lines changed: 142 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,37 @@
2929
joseph@cauldrondevelopment.com
3030
3131
\******************************************************************************/
32+
/*
33+
This class handles the storing and recalling of timeseries data. Queries
34+
are made to the SQL DB and those results are stored as a timeseries in
35+
LevelDB.
36+
37+
Timeseries data may either be a single value or a list of values keyed by
38+
one or more variables. The key or keys are specified via the ``key``
39+
configuration option. The key value(s) are removed from the timeseries
40+
data to save space. If the resulting data is a dictionary with a single
41+
value then only the value is stored.
42+
43+
Clients may subscribe to the timeseries via Websocket. Subscribers will
44+
first receive a list of all the recent timeseries data limited by time
45+
and total number of results. Subsequently, new results will be broadcast
46+
to all subscribers for as long as they stay connected.
47+
48+
The last result is tracked in LevelDB. If a new result is the same as the
49+
last result it is not added to the timeseries. In this way, gaps in the time
50+
series are assumed to be repeats of the previous value.
51+
52+
The "last" result is stored at key "\0". If the timeseries is a list of
53+
values then "last" will be dictionary of key, value pairs, otherwise just the
54+
one value. The "last" value is reloaded when the server is restarted before
55+
making further queries.
56+
57+
The implementation is complicated because care is taken to not dominate the
58+
main thread when processing large lists of data. A low priority event is
59+
used, repetitive operations are spread across multiple event callbacks and
60+
LevelDB writes are batched.
61+
*/
62+
3263

3364
#include "TimeseriesHandler.h"
3465

@@ -44,6 +75,10 @@ using namespace std;
4475
using namespace cb;
4576
using namespace cb::API;
4677

78+
#undef CBANG_LOG_PREFIX
79+
#define CBANG_LOG_PREFIX "TS:" << name << ":"
80+
#define TIME_FMT "%Y%m%d%H%M%S"
81+
4782

4883
TimeseriesHandler::TimeseriesHandler(
4984
API &api, const string &name, const JSON::ValuePtr &config) :
@@ -76,9 +111,9 @@ TimeseriesHandler::TimeseriesHandler(
76111
key = config->createList(); // Empty key
77112
}
78113

79-
event->setPriority(4);
114+
event->setPriority(7);
80115

81-
load();
116+
schedule();
82117
}
83118

84119

@@ -98,7 +133,7 @@ double TimeseriesHandler::getNext() const {
98133
auto now = Timer::now();
99134
double next = getTimePeriod(ceil(now)) + period - now;
100135

101-
LOG_DEBUG(5, "Querying timeseries in " << next << " seconds "
136+
LOG_DEBUG(4, "Querying in " << next << " seconds "
102137
<< " current=" << Time(getTimePeriod(ceil(now))).toString()
103138
<< " period=" << period
104139
<< " now=" << Time(now).toString());
@@ -114,95 +149,76 @@ SmartPointer<Timeseries> TimeseriesHandler::get(
114149
if (it != series.end()) return it->second;
115150
if (!create) return 0;
116151

117-
LOG_DEBUG(4, "Adding `" << name << "` timeseries with key `" << key << "`");
152+
LOG_DEBUG(5, "Adding key `" << key << "`");
118153
return series[key] = new Timeseries(*this, key);
119154
}
120155

121156

122-
123-
void TimeseriesHandler::action(const CtxPtr &ctx) {
124-
// Resolve variables
125-
auto resolver = ctx->getResolver();
126-
auto action = resolver->selectString("args.action", "query");
127-
auto since = resolver->selectTime("args.since", 0);
128-
auto maxCount = resolver->selectU64("args.max_count", 0);
129-
auto key = resolveKey(*resolver->select("args"));
130-
131-
// Get Timeseries
132-
auto ts = get(key);
133-
if (ts.isNull() && action == "unsubscribe") return;
134-
135-
// Execute action
136-
auto cb = [this, ctx] (
137-
const SmartPointer<Exception> &err, const JSON::ValuePtr &data) {
138-
if (err.isNull()) ctx->reply(data);
139-
else ctx->reply(*err);
140-
};
141-
142-
if (action == "query") return ts->query(since, maxCount, cb);
143-
144-
auto ws = ctx->getWebsocket();
145-
if (ws.isSet()) {
146-
if (action == "subscribe") return ws->subscribe(*ts, since, maxCount, cb);
147-
if (action == "unsubscribe") return ws->unsubscribe(*ts);
148-
}
149-
150-
THROW("Unrecognized timeseries action '" << action << "'");
157+
void TimeseriesHandler::schedule() {
158+
if (last.isNull() || results.isSet()) event->add(0);
159+
else event->add(getNext());
151160
}
152161

153162

154-
void TimeseriesHandler::load() {schedule();}
155-
156-
157-
void TimeseriesHandler::query(uint64_t time) {
158-
auto cb = [=] (HTTP::Status status, const JSON::ValuePtr &results) {
159-
if (status == HTTP::Status::HTTP_OK) try {
160-
if (ret != "list") get("")->add(time, results);
161-
else {
162-
LOG_DEBUG(3, "Storing timeseries results " << name);
163-
this->results = results;
164-
resultsTime = time;
165-
it = results->begin();
166-
}
167-
} CATCH_ERROR;
163+
void TimeseriesHandler::process() {
164+
if (last.isNull()) {
165+
// Try to load the last value
166+
LOG_DEBUG(3, "Querying last value");
168167

169-
schedule();
170-
};
168+
auto cb = [this] (bool success, const string &value) {
169+
LOG_DEBUG(3, "Last value success=" << (success ? "true" : "false"));
171170

172-
LOG_DEBUG(3, "Querying timeseries " << name);
173-
LOG_DEBUG(5, "Timeseries query: " << sql);
174-
175-
query(sql, cb);
176-
}
171+
last = new JSON::Dict;
172+
schedule();
177173

174+
try {
175+
if (success) last = JSON::Reader::parse(value);
176+
} catch (const Exception &e) {
177+
LOG_ERROR("Failed to parse last timeseries result: " << e);
178+
}
179+
};
178180

179-
void TimeseriesHandler::schedule() {
180-
if (results.isSet()) event->add(0);
181-
else if (!event->isPending()) event->add(getNext());
182-
}
181+
db.get("\0"s, cb);
183182

183+
} else if (results.isSet()) {
184+
// Process a batch of results
185+
LOG_DEBUG(4, "Processing result batch");
184186

185-
void TimeseriesHandler::process() {
186-
if (results.isSet()) {
187187
try {
188188
for (unsigned i = 0; i < 1000; i++) {
189189
if (it == results->end()) {
190190
LOG_DEBUG(3, "Done " << name);
191+
if (batch.isSet()) {
192+
auto cb = [=] () {
193+
batch->set("\0"s, last->toString());
194+
batch->commit();
195+
};
196+
197+
batch.release();
198+
db.getPool()->submit(0, cb);
199+
}
191200
results.release();
192201
break;
193202
}
194203

195204
auto result = *it++;
196205
string key = resolveKey(*result);
197206

198-
// Don't bother storing key data
207+
// Don't store key data
199208
for (auto it: *this->key)
200209
result->erase(it->asString());
201210

202211
// If there's only one key store just its value
203212
if (result->size() == 1) result = *result->begin();
204213

205-
get(key)->add(resultsTime, result);
214+
// Don't record if result is unchanged
215+
auto it = last->find(key);
216+
if (it != last->end() && **it == *result) continue;
217+
last->insert(key, result);
218+
219+
if (batch.isNull()) batch = new LevelDB::Batch(db.batch());
220+
batch->set(key + "\0"s + resultsTimeKey, result->toString());
221+
get(key)->broadcast(resultsTime, result);
206222
}
207223
} catch (const Exception &e) {
208224
LOG_ERROR(e);
@@ -212,13 +228,75 @@ void TimeseriesHandler::process() {
212228

213229
schedule();
214230

215-
} else query(getTimePeriod(Time::now()));
231+
} else query(getTimePeriod(Time::now())); // Load next timeseries result
232+
}
233+
234+
235+
void TimeseriesHandler::query(uint64_t time) {
236+
auto cb = [=] (HTTP::Status status, const JSON::ValuePtr &results) {
237+
if (status == HTTP::Status::HTTP_OK) try {
238+
resultsTimeKey = Time(getTimePeriod(time)).toString(TIME_FMT);
239+
240+
if (ret != "list") {
241+
if (*last != *results) {
242+
last = results;
243+
db.set("\0"s, last->toString());
244+
db.set("\0"s + resultsTimeKey, results->toString());
245+
get("")->broadcast(time, results);
246+
}
247+
248+
} else {
249+
LOG_DEBUG(3, "Storing results");
250+
this->results = results;
251+
resultsTime = time;
252+
it = results->begin();
253+
}
254+
} CATCH_ERROR;
255+
256+
schedule();
257+
};
258+
259+
LOG_DEBUG(3, "Querying");
260+
LOG_DEBUG(5, "query: " << sql);
261+
262+
query(sql, cb);
263+
}
264+
265+
266+
void TimeseriesHandler::action(const CtxPtr &ctx) {
267+
// Resolve variables
268+
auto resolver = ctx->getResolver();
269+
auto action = resolver->selectString("args.action", "query");
270+
auto since = resolver->selectTime("args.since", 0);
271+
auto maxCount = resolver->selectU64("args.max_count", 0);
272+
273+
// Get Timeseries
274+
auto key = ret == "list" ? resolveKey(*resolver->select("args")) : "";
275+
auto ts = get(key, action != "unsubscribe");
276+
if (ts.isNull()) return;
277+
278+
// Execute action
279+
auto cb = [this, ctx] (
280+
const SmartPointer<Exception> &err, const JSON::ValuePtr &data) {
281+
if (err.isNull()) ctx->reply(data);
282+
else ctx->reply(*err);
283+
};
284+
285+
if (action == "query") return ts->query(since, maxCount, cb);
286+
287+
auto ws = ctx->getWebsocket();
288+
if (ws.isSet()) {
289+
if (action == "subscribe") return ws->subscribe(*ts, since, maxCount, cb);
290+
if (action == "unsubscribe") return ws->unsubscribe(*ts);
291+
}
292+
293+
THROW("Unrecognized timeseries action '" << action << "'");
216294
}
217295

218296

219297
SmartPointer<MariaDB::EventDB> TimeseriesHandler::getDBConnection() const {
220298
auto db = QueryDef::getDBConnection();
221-
db->setPriority(5); // Lower priority to avoid blocking regular API requests
299+
db->setPriority(8); // Lower priority to avoid blocking regular API requests
222300
return db;
223301
}
224302

src/cbang/api/handler/TimeseriesHandler.h

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,27 +52,30 @@ namespace cb {
5252

5353
std::map<std::string, SmartPointer<Timeseries>> series;
5454

55-
JSON::ValuePtr results;
55+
JSON::ValuePtr last;
56+
JSON::ValuePtr results;
5657
JSON::Value::iterator it;
57-
uint64_t resultsTime = 0;
58+
uint64_t resultsTime = 0;
59+
std::string resultsTimeKey;
60+
SmartPointer<LevelDB::Batch> batch;
5861

5962
TimeseriesHandler(
6063
API &api, const std::string &name, const JSON::ValuePtr &config);
6164

6265
const std::string &getName() const {return name;}
66+
67+
protected:
6368
std::string resolveKey(const JSON::Value &dict) const;
6469
uint64_t getTimePeriod(uint64_t ts) const {return (ts / period) * period;}
6570
double getNext() const;
66-
67-
protected:
6871
SmartPointer<Timeseries> get(const std::string &key, bool create = true);
69-
void action(const CtxPtr &ctx);
70-
void load();
72+
7173
void schedule();
72-
using QueryDef::query;
73-
void query(uint64_t time);
7474
void process();
7575

76+
using QueryDef::query;
77+
void query(uint64_t time);
78+
void action(const CtxPtr &ctx);
7679

7780
// From QueryDef
7881
SmartPointer<MariaDB::EventDB> getDBConnection() const override;

0 commit comments

Comments
 (0)