-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmainquery.js
More file actions
100 lines (93 loc) · 2.37 KB
/
mainquery.js
File metadata and controls
100 lines (93 loc) · 2.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
var _ = require('highland');
var QueryExecutor = require('./query-executor');
// var query = {
// select:['remote','host','method','code'],
// eval: {
// val1: {
// rolling: {
// evaluate: 'average',
// over: {
// count: 10
// },
// on: 'code' // measure
// }
// },
// val2: {
// rolling: {
// evaluate: 'average',
// over: {
// count: 10
// },
// on: 'insertions'
// }
// },
// },
// project: {
// // $highlight: {$condition: {val1: {$eq: eval['val2']}}}
// $highlight: {$condition: 'val1 ==val2'}
// },
// to: 'streamB'
// };
// Outbound client: used to connect to the socket that supplies raw messages.
var WebSocketClient = require('websocket').client;
var WebSocket1 = new WebSocketClient();

// Inbound server: consumers connect on port 9000 to receive processed results.
var WebSocketServer = require('ws').Server;
var wss = new WebSocketServer({port: 9000});

// The most recently connected consumer socket. Only this one socket receives
// results; an earlier consumer is replaced when a new one connects.
var serverWs;
wss.on('connection', function(ws) {
  serverWs = ws;

  // Forget the socket once it closes so the `if (serverWs)` check used when
  // sending results does not keep pointing at a dead connection. The identity
  // check avoids clearing a newer consumer that has already taken over.
  ws.on('close', function() {
    if (serverWs === ws) {
      serverWs = undefined;
    }
  });
});

// Set to true once the outbound client connection has been established.
var isClientConnected = false;
// Highland stream built from the outbound connection's 'message' events.
var stream;
// The outbound connection object delivered by the client's 'connect' event.
var con;
// Processing pipeline obtained from QueryExecutor for the current query.
var pipeline;
/**
 * Wires the outbound connection into the current query pipeline.
 *
 * Reads the module-level `con` and `pipeline`, stores the newly created
 * source in the module-level `stream`, and forwards every pipeline result to
 * the consumer socket held in `serverWs` (if any).
 *
 * Flow: 'message' events on `con` -> JSON.parse of `utf8Data` -> `pipeline`
 * -> JSON.stringify -> `serverWs.send`.
 */
function bootstrapStream() {
  // readyState value of an open socket in the WebSocket API.
  var WS_OPEN = 1;

  console.log("connected in mainQuery");
  stream = _('message', con);

  stream
    .pipe(_.pipeline(
      _.map(function(msg) {
        // `msg` stays local: the original assigned it to an undeclared
        // `streamData`, which leaked an implicit global.
        console.log('data received ');
        return JSON.parse(msg.utf8Data);
      })
    ))
    .pipe(pipeline)
    .pipe(_.pipeline(
      _.map(function(result) {
        // Results are dropped when no consumer is attached or its socket is
        // no longer open, rather than attempting a send that cannot succeed.
        if (serverWs && serverWs.readyState === WS_OPEN) {
          serverWs.send(JSON.stringify(result));
        }
      })
    ))
    // An explicit no-op completion callback, so reaching end-of-stream never
    // depends on how highland treats a missing callback.
    .done(function() {});
}
module.exports=function (queryy) {
var executor = new QueryExecutor(queryy);
pipeline = executor.getPipeline();
if (isClientConnected) {
// isClientConnected=false;
console.log('before stream destroyed ');
// for (var variable in stream) {
// console.log('>>> '+variable);
// }
stream.end(function() {
console.log('after stream destroyed');
bootstrapStream();
});
console.log('after stream destroyed');
}
else{
WebSocket1.on('connect', function(connection) {
isClientConnected = true;
console.log(" Connected..Waiting for some message inQuery");
var streamData = {};
connection.on('close', function() {
console.log('connection closed ***');
});
con=connection;
bootstrapStream();
// connection.setMaxListeners(connection.getMaxListeners() + 1);
});
WebSocket1.connect('ws://localhost:5050');
}
}