Skip to content
This repository was archived by the owner on Jul 3, 2023. It is now read-only.

Commit 9d61fc5

Browse files
author
João Barbosa
committed
Add callbacks to Message methods
1 parent 36ec646 commit 9d61fc5

3 files changed

Lines changed: 37 additions & 22 deletions

File tree

Readme.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,17 +151,17 @@ Close the writer's connection(s) and fire the optional [fn] when completed.
151151

152152
A single message.
153153

154-
### Message#finish()
154+
### Message#finish([fn])
155155

156-
Mark message as complete.
156+
Mark message as complete..
157157

158-
### Message#requeue([delay])
158+
### Message#requeue([delay], [fn])
159159

160160
Re-queue the message immediately, or with the
161161
given `delay` in milliseconds, or a string such
162162
as "5s", "10m" etc.
163163

164-
### Message#touch()
164+
### Message#touch([fn])
165165

166166
Reset the message's timeout, increasing the length
167167
of time before NSQD considers it timed out.

lib/connection.js

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -341,13 +341,17 @@ Connection.prototype.ready = function(n){
341341
* @api private
342342
*/
343343

344-
Connection.prototype.finish = function(id){
344+
Connection.prototype.finish = function(id, fn){
345345
assertValidMessageId(id);
346-
var fn = this.onerror;
347346
var self = this;
348-
if (!this._ready) return fn(new Error('cannot finish, connection not ready'));
349-
this.command('FIN', [id], function(){
347+
if (!this._ready) {
348+
var err = new Error('cannot finish, connection not ready');
349+
if (fn) fn(err);
350+
return this.onerror(err);
351+
}
352+
this.command('FIN', [id], function(err){
350353
self.emit('finish', id);
354+
if (fn) fn(err);
351355
--self.inFlight;
352356
});
353357
};
@@ -361,13 +365,17 @@ Connection.prototype.finish = function(id){
361365
* @api private
362366
*/
363367

364-
Connection.prototype.requeue = function(id, timeout){
368+
Connection.prototype.requeue = function(id, timeout, fn){
365369
assertValidMessageId(id);
366-
var fn = this.onerror;
367370
var self = this;
368-
if (!this._ready) return fn(new Error('cannot requeue, connection not ready'));
369-
this.command('REQ', [id, timeout || 0], function(){
371+
if (!this._ready) {
372+
var err = new Error('cannot requeue, connection not ready');
373+
if (fn) fn(err);
374+
return this.onerror(err);
375+
}
376+
this.command('REQ', [id, timeout || 0], function(err){
370377
self.emit('requeue', id);
378+
if (fn) fn(err);
371379
--self.inFlight;
372380
});
373381
};
@@ -379,13 +387,17 @@ Connection.prototype.requeue = function(id, timeout){
379387
* @api private
380388
*/
381389

382-
Connection.prototype.touch = function(id){
390+
Connection.prototype.touch = function(id, fn){
383391
assertValidMessageId(id);
384-
var fn = this.onerror;
385392
var self = this;
386-
if (!this._ready) return fn(new Error('cannot touch, connection not ready'));
387-
this.command('TOUCH', [id], function(){
393+
if (!this._ready) {
394+
var err = new Error('cannot touch, connection not ready');
395+
if (fn) fn(err);
396+
return this.onerror(err);
397+
}
398+
this.command('TOUCH', [id], function(err){
388399
self.emit('touch', id);
400+
if (fn) fn(err);
389401
});
390402
};
391403

lib/message.js

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,13 @@ function Message(body, conn) {
4141
/**
4242
* Mark the message as finished.
4343
*
44+
* @param {Function} [fn]
4445
* @api public
4546
*/
4647

47-
Message.prototype.finish = function(){
48+
Message.prototype.finish = function(fn){
4849
this.responded = true;
49-
this.conn.finish(this.id);
50+
this.conn.finish(this.id, fn);
5051
this.trace('message:finish', { msg: this });
5152
};
5253

@@ -77,25 +78,27 @@ Message.prototype.timedout = function(){
7778
* Re-queue the message with optional `delay`.
7879
*
7980
* @param {Number|String} [delay]
81+
* @param {Function} [fn]
8082
* @api public
8183
*/
8284

83-
Message.prototype.requeue = function(delay){
85+
Message.prototype.requeue = function(delay, fn){
8486
if ('string' == typeof delay) delay = ms(delay);
8587
this.responded = true;
86-
this.conn.requeue(this.id, delay);
88+
this.conn.requeue(this.id, delay, fn);
8789
this.trace('message:requeue', { msg: this });
8890
};
8991

9092
/**
9193
* Touch the message.
9294
*
95+
* @param {Function} [fn]
9396
* @api public
9497
*/
9598

96-
Message.prototype.touch = function(){
99+
Message.prototype.touch = function(fn){
97100
this.lastTouch = Date.now();
98-
this.conn.touch(this.id);
101+
this.conn.touch(this.id, fn);
99102
this.trace('message:touch', { msg: this });
100103
};
101104

0 commit comments

Comments
 (0)