This repository was archived by the owner on Jul 28, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathscribe.js
More file actions
208 lines (177 loc) · 4.95 KB
/
scribe.js
File metadata and controls
208 lines (177 loc) · 4.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
/*!
* Node-scribe (scribe in npmjs)
* Scribe client for node.js
* Copyright(c) 2011 Applifier Ltd
* MIT Licensed
*/
/**
* Module dependencies.
*/
var util = require('util');
var events = require('events');
var thrift = require('thrift');
var scribe = require('./gen-nodejs/scribe');
var scribe_types = require('./gen-nodejs/scribe_types');
/**
 * Constructor for the scribe client.
 *
 * Only stores configuration and initial state; no connection is created here
 * (this.connection and this.client stay null until open() assigns them).
 *
 * @param {String} host Hostname of scribe server
 * @param {Number} port The port where scribe server is running (coerced with Number())
 * @param {Object} [opt] Options:
 *   autoReconnect {Boolean} - any truthy value enables automatic reconnecting;
 *   autoReconnectTimeout {Number|Number[]} - a single reconnect delay or a list
 *   of delays. A falsy value (including 0) is treated as "not set".
 * @throws {Error} when autoReconnectTimeout is set but is neither a Number nor
 *   an Array containing only numbers
 * @type Scribe scribe client
 */
var Scribe = exports.Scribe = function(host, port, opt) {
  var self = this;

  // Accept primitives and (for backward compatibility) boxed Number objects.
  // Unlike a `.constructor` lookup this cannot throw on null/undefined.
  function isNumber(value) {
    return typeof value === 'number' || value instanceof Number;
  }

  // Scribe inherits from EventEmitter (see util.inherits below), so run the
  // parent constructor to initialise the emitter state on this instance.
  events.EventEmitter.call(this);

  this.host = host;
  this.port = Number(port);
  this.autoReconnect = opt && opt.autoReconnect ? true : false;
  this.autoReconnectTimeout = [];
  this.reconnecting = false;
  this.retries = 0;
  this.opened = false;

  if (opt && opt.autoReconnectTimeout) {
    var configured = opt.autoReconnectTimeout;
    if (isNumber(configured)) {
      this.autoReconnectTimeout = [configured];
    } else if (Array.isArray(configured)) {
      for (var i = 0; i < configured.length; i++) {
        if (!isNumber(configured[i])) {
          throw new Error("autoReconnectTimeout should be Number or Array of numbers");
        }
        this.autoReconnectTimeout.push(configured[i]);
      }
    } else {
      throw new Error("autoReconnectTimeout should be Number or Array of numbers");
    }
  }

  this.client = null;
  this.connection = null;
  this.queue = [];

  // Read-only views onto the current connection. Both report false while no
  // connection object exists. Defined as enumerable/configurable accessors,
  // which is what the legacy __defineGetter__ produced.
  Object.defineProperty(this, "connected", {
    enumerable: true,
    configurable: true,
    get: function() {
      if (self.connection != null) {
        return self.connection.connected;
      }
      return false;
    }
  });
  Object.defineProperty(this, "writable", {
    enumerable: true,
    configurable: true,
    get: function() {
      var current = self.connection;
      // Guard the nested stream as well so a connection object without an
      // inner `connection` reports "not writable" instead of throwing.
      if (current != null && current.connection != null) {
        return current.connection.writable;
      }
      return false;
    }
  });
};
util.inherits(Scribe, events.EventEmitter);
/**
 * Open connection to scribe server.
 *
 * Marks the client as opened, creates a fresh thrift connection and client
 * for the configured host and port, and wires up the connection events.
 *
 * @param {Function} callback Optional. Invoked at most once: as
 *   callback(err, scribe) on the first 'error' event, or as
 *   callback(null, scribe) when the connection is opened.
 */
Scribe.prototype.open = function(callback) {
  var scribeClient = this;

  // Report the outcome to the caller a single time, whichever event is first.
  function notifyOnce(err) {
    if (!callback) {
      return;
    }
    callback(err, scribeClient);
    callback = null;
  }

  this.opened = true;
  var connection = thrift.createConnection(this.host, this.port);
  this.connection = connection;
  this.client = thrift.createClient(scribe, connection);

  // Registered before the persistent 'error' listener below so the caller's
  // callback runs before processError() for the first error.
  connection.once('error', function(err) {
    notifyOnce(err);
  });

  connection.once('connect', function() {
    scribeClient.retries = 0;
    scribeClient.emit('connect');
    // Entries queued while disconnected are sent as soon as we are connected.
    if (scribeClient.queue.length > 0) {
      scribeClient.flush();
    }
    notifyOnce(null);
  });

  // Every connection error (not just the first) goes through processError().
  connection.on('error', function(err) {
    scribeClient.processError(err);
  });
};
/**
 * Close connection.
 *
 * Clears the `opened` flag (flush() only attempts a reconnect while it is
 * set) and ends the current connection if there is one. Calling close()
 * before open() is a no-op apart from clearing the flag.
 */
Scribe.prototype.close = function() {
  this.opened = false;
  // connection is null until open() has been called.
  if (this.connection != null) {
    this.connection.end();
  }
};
/**
 * Send all queued log entries to the scribe server.
 *
 * - When the connection is not writable nothing is sent; if the client has
 *   been opened and autoReconnect is enabled a reconnect is requested.
 * - When the queue is empty this is a no-op.
 * - Otherwise the whole queue is handed to client.Log() as one batch. If the
 *   call reports an error or result code 1 (0 = OK, 1 = Try again) the batch
 *   is put back at the FRONT of the queue, so entries are retried in their
 *   original order, and another flush is scheduled in 3 seconds.
 */
Scribe.prototype.flush = function() {
  var self = this;

  // Don't try to flush if the connection is not writable
  if (!this.writable) {
    // If connection was opened previously try to get it back on
    if (this.opened && this.autoReconnect) {
      this.retryConnection();
    }
    return;
  }

  // Don't flush if queue is empty
  if (this.queue.length === 0) {
    return;
  }

  // Take ownership of the current entries; anything sent while the request
  // is in flight lands in the fresh array.
  var batch = this.queue;
  this.queue = [];

  this.client.Log(batch, function(err, resultCode) {
    if (resultCode === 1 || err) {
      // The failed batch is older than whatever was queued meanwhile.
      self.queue = batch.concat(self.queue);
      // Auto flush in 3 seconds
      setTimeout(function() {
        self.flush();
      }, 3000);
    }
  });
};
/**
 * Handle an error reported by the connection.
 *
 * The error is first re-emitted as an 'error' event on this client. After
 * that, a reconnect is requested when autoReconnect is enabled and the
 * connection is not writable.
 *
 * @param {Error} err The error to forward
 */
Scribe.prototype.processError = function(err) {
  this.emit('error', err);

  // If autoreconnect has been enabled, try connecting
  var shouldReconnect = this.autoReconnect && !this.writable;
  if (!shouldReconnect) {
    return;
  }
  this.retryConnection();
};
/**
 * Schedule a reconnect attempt.
 *
 * Does nothing while a reconnect is already pending. The delay is taken from
 * autoReconnectTimeout, indexed by the number of retries so far (the last
 * entry is reused once the list is exhausted); with an empty list the delay
 * is 3000 ms. When the timer fires, 'reconnecting' is emitted and open() is
 * called; the pending flag is cleared when open() reports back.
 *
 * If close() was called while the timer was waiting, the attempt is dropped
 * so that an explicitly closed client is not reopened behind the caller's back.
 */
Scribe.prototype.retryConnection = function() {
  if (this.reconnecting) {
    return;
  }
  var self = this;
  this.reconnecting = true;

  var timeout = 3000;
  var delays = this.autoReconnectTimeout;
  if (delays.length > 0) {
    // Clamp to the last configured delay once retries run past the list.
    timeout = delays[Math.min(this.retries, delays.length - 1)];
  }

  setTimeout(function() {
    // `opened` is cleared by close(); honour it instead of reconnecting.
    if (!self.opened) {
      self.reconnecting = false;
      return;
    }
    self.emit('reconnecting');
    self.open(function() {
      self.reconnecting = false;
    });
  }, timeout);

  this.retries++;
};
/**
 * Send log entry to scribe server.
 *
 * Each message is wrapped in a LogEntry for the given category and appended
 * to the queue, then flush() is called.
 *
 * @param {String} category Log category
 * @param {String|Array} messages A single log message, or an array of
 *   messages (any array, including Array subclasses) to queue one entry each
 * @throws {TypeError} when messages is null or undefined
 */
Scribe.prototype.send = function(category, messages) {
  if (messages == null) {
    // Previously surfaced as an incidental TypeError from reading
    // `.constructor`; keep the type but say what is wrong.
    throw new TypeError("messages must be a log message or an Array of log messages");
  }

  // Array.isArray also recognises Array subclasses and arrays created in
  // another context, which a `.constructor === Array` comparison would wrap
  // as one single message.
  if (!Array.isArray(messages)) {
    messages = [messages];
  }

  for (var i = 0, len = messages.length; i < len; i++) {
    this.queue.push(new scribe_types.LogEntry({
      category : category,
      message : messages[i]
    }));
  }

  this.flush();
};