Skip to content

Commit ca779f3

Browse files
committed
WORKFLOW-199: polling every X seconds for job completion
1 parent 8b370c0 commit ca779f3

3 files changed

Lines changed: 179 additions & 109 deletions

File tree

lib/api.js

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ var API = module.exports = function (opts) {
2626

2727
var log;
2828

29+
var waiting = {};
30+
2931
if (opts.log) {
3032
log = opts.log({
3133
component: 'workflow-api'
@@ -72,6 +74,12 @@ var API = module.exports = function (opts) {
7274
version: '0.1.0'
7375
};
7476

77+
var JOB_DONE_PATH = '/jobdone';
78+
var JOB_DONE_ROUTE = {
79+
path: JOB_DONE_PATH,
80+
version: '0.1.0'
81+
};
82+
7583
var JOBS_PATH = '/jobs';
7684
var JOB_PATH = JOBS_PATH + '/:uuid';
7785
var JOBS_ROUTE = {
@@ -279,7 +287,10 @@ var API = module.exports = function (opts) {
279287

280288
return factory.workflow(workflow, meta, function (err, result) {
281289
if (err) {
282-
return next(err.toRestError);
290+
if (typeof (err) === 'string')
291+
return next(new Error(err));
292+
else
293+
return next(err.toRestError || err);
283294
}
284295
res.header('Location', req.path() + '/' + result.uuid);
285296
// If Request-Id hasn't been set, we'll set it to workflow UUID:
@@ -497,8 +508,8 @@ var API = module.exports = function (opts) {
497508
params: {}
498509
};
499510
var meta = {};
500-
var members = ['exec_after', 'workflow', 'target', 'num_attempts',
501-
'uuid', 'locks'];
511+
var members = ['callback_urls', 'exec_after', 'workflow', 'target',
512+
'num_attempts', 'uuid', 'locks'];
502513

503514
var job_members = [];
504515
if (typeof (opts.api.job_extra_params) !== 'undefined') {
@@ -534,9 +545,23 @@ var API = module.exports = function (opts) {
534545
res.header('request-id', result.uuid);
535546
}
536547
res.header('Location', req.path() + '/' + result.uuid);
548+
537549
res.status(201);
538-
res.send(result);
539-
return next();
550+
if (req.params.wait) {
551+
log.info({req: req, job: result.uuid},
552+
'holding onto request for job %s', result.uuid);
553+
waiting[result.uuid] = {
554+
req: req,
555+
res: res,
556+
next: next
557+
};
558+
// flush headers (so they have the UUID as the location header)
559+
res.write('\n');
560+
return undefined;
561+
} else {
562+
res.send(result);
563+
return next();
564+
}
540565
});
541566
}
542567

@@ -677,6 +702,29 @@ var API = module.exports = function (opts) {
677702
}
678703
});
679704
}
705+
706+
function jobDone(req, res, next) {
707+
var job = req.params;
708+
// end the incoming request
709+
res.send(job.uuid ? 200 : 400);
710+
next();
711+
712+
// end any waiting requests
713+
if (job.uuid && waiting[job.uuid]) {
714+
log.info('ending waiting request for %s', job.uuid);
715+
var o = waiting[job.uuid];
716+
717+
// res.send and res.format won't work here because headers were
718+
// flushed by writing a newline to the socket. instead, we format
719+
// the data ourselves and ship it off by assuming the user wanted
720+
// JSON. this is really lame. XXX
721+
o.res.end(JSON.stringify(job));
722+
o.next();
723+
delete waiting[job.uuid];
724+
}
725+
726+
}
727+
680728
// --- Routes
681729
// Workflows:
682730
server.get(WORKFLOWS_ROUTE, listWorkflows);
@@ -706,6 +754,8 @@ var API = module.exports = function (opts) {
706754
server.get(JOB_INFO_ROUTE, getInfo);
707755
server.head(JOB_INFO_ROUTE, getInfo);
708756
server.post(JOB_INFO_ROUTE, postInfo);
757+
// Job done (callback):
758+
server.post(JOB_DONE_ROUTE, jobDone);
709759
// Ping:
710760
server.get(PING_ROUTE, function (req, res, next) {
711761
var data = {

lib/job-runner.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,8 @@ var WorkflowJobRunner = module.exports = function (opts) {
6262
// pointer to child process forked by runTask
6363
var child = null;
6464
// Properties of job object which a task should not be allowed to modify:
65-
// Properties of job object which a task should not be allowed to modify:
6665
var frozen_props = [
67-
'chain', 'chain_results', 'onerror', 'onerror_results',
66+
'callback_urls', 'chain', 'chain_results', 'onerror', 'onerror_results',
6867
'exec_after', 'timeout', 'elapsed', 'uuid', 'workflow_uuid',
6968
'name', 'execution', 'num_attempts', 'max_attempts', 'initial_delay',
7069
'max_delay', 'prev_attempt', 'oncancel', 'oncancel_results',
@@ -90,6 +89,10 @@ var WorkflowJobRunner = module.exports = function (opts) {
9089
job.chain_results = [];
9190
}
9291

92+
if (!job.callback_urls) {
93+
job.callback_urls = [];
94+
}
95+
9396
if (job.onerror && !job.onerror_results) {
9497
job.onerror_results = [];
9598
}

0 commit comments

Comments
 (0)