Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion distribution/all/groups.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const distribution = global.distribution;
let distribution = global.distribution;

function DistributedGroupsService(groupInfo) {
this.context = {};
Expand Down Expand Up @@ -35,6 +35,7 @@ DistributedGroupsService.prototype.get = function(gid, cb) {
* @param {Function} cb
*/
DistributedGroupsService.prototype.put = function(group, nodes, cb) {
distribution = global.distribution;
distribution.local.groups.put(group, nodes, (err, val) => {
const remote = {service: 'groups', method: 'put'};
distribution[this.context.gid].comm.send([group, nodes], remote, (e, v) => {
Expand Down
2 changes: 2 additions & 0 deletions distribution/local/local.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const mem = require('./mem');
// persistent
const store = require('./store');

const query = require('./query');

/* Routes Service */

Expand All @@ -56,6 +57,7 @@ routes.put(rpc, 'rpc', () => {});
routes.put(gossip, 'gossip', () => {});
routes.put(mem, 'mem', () => {});
routes.put(store, 'store', () => {});
routes.put(query, 'query', () => {});

module.exports = {
status: status,
Expand Down
27 changes: 22 additions & 5 deletions distribution/local/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const http = require('http');
const {routes} = require('./local');
const util = require('../util/util');
const node = global.nodeConfig;

const groupsTemplate = require('../all/groups');
const start = function(started) {
const server = http.createServer((req, res) => {
// Callback function to send back response
Expand Down Expand Up @@ -49,13 +49,30 @@ const start = function(started) {
});
});
});
const crawlerSetup = function (cb) {

// WE NEED TO CHANGE THIS TO MATCH FOR AWS INSTANCES. WE ALSO HAVE TO DEPLOY THIS NODE LAST OR HAVE THIS NODE SPAWN THE OTHERS
if (node.port === 8080) {
const crwalerGroup = {};
const n1 = {ip: '127.0.0.1', port: 7110};
const n2 = {ip: '127.0.0.1', port: 7111};
const n3 = {ip: '127.0.0.1', port: 7112};
crwalerGroup[global.distribution.util.id.getSID(n1)] = n1;
crwalerGroup[global.distribution.util.id.getSID(n2)] = n2;
crwalerGroup[global.distribution.util.id.getSID(n3)] = n3;
groupsTemplate({gid: 'crawler'}).put({gid: 'crawler'}, crwalerGroup, (e, v) => {
cb();
});
}
else {
cb()
}
}
server.listen(node.port, node.ip, () => {
global.nodeServer = server;
if (node.port === 8000) {
// console.log('node:', started.toString());
}
started(server);
crawlerSetup(()=> {
started(server);
})
});
};

Expand Down
34 changes: 34 additions & 0 deletions distribution/local/query.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
const queryWorkflow = require('../workflow/query');
function QueryService() {
// {serviceName: serviceObj}
this.hashed = {
};
}

function defaultCallback(error, value) {
if (error) {
console.log(error);
} else {
console.log(value);
}
}

QueryService.prototype.get = function(queryInput, cb=defaultCallback) {
// global.distribution["crawler"].mem.put(key, (e, v) => {
// global.distribution["crawler"].store.get(null, (e, v)=> {
// const filteredArray = array.filter(element => element.startsWith("index-"));
// queryConfig = {
// gid : 'crawler',
// keys : filtered_list,
// }
// queryService = queryWorkflow(queryConfig);
// global.distribution["crawler"].mr.exec(queryService, (e, v) => {
// cb(e, v);
// })
// });
// });
cb(null, "Further implementation required");
}

const query = new QueryService();
module.exports = query;