|
|
@ -229,14 +229,8 @@ global.pipelines=[]; |
|
|
|
resobarray.push(resob1) |
|
|
|
} |
|
|
|
resob.data = resobarray |
|
|
|
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname);
|
|
|
|
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); |
|
|
|
|
|
|
|
//indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists")
|
|
|
|
indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval) |
|
|
|
|
|
|
|
//console.log('info', JSON.stringify(resdata));
|
|
|
|
//console.log('info------------- ', JSON.stringify(global.pipelines));
|
|
|
|
})() |
|
|
|
} |
|
|
|
|
|
|
@ -400,21 +394,58 @@ function onCollection(err, collection) { |
|
|
|
tailableRetryInterval: 500 |
|
|
|
}; |
|
|
|
var cursor = collection.find({},options).stream(); |
|
|
|
//var cursor = collection.find({}, options),
|
|
|
|
// cursorStream = cursor.stream(),
|
|
|
|
var itemsProcessed = 0; |
|
|
|
|
|
|
|
var reslog = new Object(); |
|
|
|
var now = new Date(); |
|
|
|
//cursorStream.on('data', function (data) {
|
|
|
|
cursor.on('data', function (data) { |
|
|
|
|
|
|
|
var pathfileval = pathmodule.basename(data.tailed_path); |
|
|
|
var arrfile = pathfileval.toString().split("-"); |
|
|
|
var pathfile = arrfile[0]; |
|
|
|
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); |
|
|
|
var resob = {} |
|
|
|
if (indexfind1 === -1 ){ |
|
|
|
(async() => { |
|
|
|
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval)) |
|
|
|
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
|
|
|
|
var resdata = await getpipelines(token,pathfile) |
|
|
|
resob.pathlogfile = pathfileval |
|
|
|
var resobarray = [] |
|
|
|
for (let i in resdata.data) { |
|
|
|
var resob1 = {} |
|
|
|
resob1.data = resdata.data[i].res25swarmlabname |
|
|
|
resob1.user25user = resdata.data[i].res25user |
|
|
|
resob1.res25creator = resdata.data[i].res25creator |
|
|
|
resob1.res25fileforce = resdata.data[i].res25fileforce |
|
|
|
resobarray.push(resob1) |
|
|
|
} |
|
|
|
resob.data = resobarray |
|
|
|
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname);
|
|
|
|
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); |
|
|
|
|
|
|
|
//indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists")
|
|
|
|
indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval) |
|
|
|
|
|
|
|
reslog.log = data |
|
|
|
reslog.date = convertDateToUTC(now) |
|
|
|
console.log('data ' + JSON.stringify(reslog)); |
|
|
|
var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval); |
|
|
|
var user = usertmp.data[0].user25user; |
|
|
|
io.in(user).emit("logdata", reslog); |
|
|
|
itemsProcessed++; |
|
|
|
|
|
|
|
})() |
|
|
|
}else{ |
|
|
|
|
|
|
|
reslog.log = data |
|
|
|
reslog.date = convertDateToUTC(now) |
|
|
|
console.log('data ' + JSON.stringify(reslog)); |
|
|
|
var user = "anagnostopoulos@uniwa.gr" |
|
|
|
//io.in(user).emit("logdata", data.value);
|
|
|
|
var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval); |
|
|
|
var user = usertmp.data[0].user25user; |
|
|
|
io.in(user).emit("logdata", reslog); |
|
|
|
itemsProcessed++; |
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
@ -442,20 +473,8 @@ function onCollection(err, collection) { |
|
|
|
} else { |
|
|
|
const db = client.db('fluent'); |
|
|
|
db.collection('logs', onCollection); |
|
|
|
/* |
|
|
|
var stream = db.collection('logs').find({}, { |
|
|
|
tailable: true, |
|
|
|
awaitdata: true |
|
|
|
//other options
|
|
|
|
}).stream(); |
|
|
|
|
|
|
|
stream.on('data', function (doc) { |
|
|
|
console.log('doc ' + JSON.stringify(doc)) |
|
|
|
}); |
|
|
|
*/ |
|
|
|
|
|
|
|
} |
|
|
|
}); |
|
|
|
}); |
|
|
|
|
|
|
|
io.on('connection', s => { |
|
|
|
console.error('socket connection'); |
|
|
|