diff --git a/swarmlab-app/src/run/app.js b/swarmlab-app/src/run/app.js index 08a0ab8..fe5df33 100755 --- a/swarmlab-app/src/run/app.js +++ b/swarmlab-app/src/run/app.js @@ -1,29 +1,28 @@ -"use strict"; +"use strict" -var pathmodule = require("path"); -var app = require("express")(); -var http = require("http").Server(app); -var https = require("https"); -var CONFIG = require(pathmodule.resolve(__dirname, "runconfig.js")); +var pathmodule = require('path'); +var app = require('express')(); +var http = require('http').Server(app); +var https = require('https'); +var CONFIG = require( pathmodule.resolve( __dirname, "runconfig.js" ) ); const io = require("socket.io")(http, { - // pingTimeout: 30000, - // allowUpgrades: false, - // serveClient: false, - // pingInterval: 10000, - // //transports: [ 'websocket', 'polling' ], - - // transports: [ 'polling', 'websocket' ], +// pingTimeout: 30000, +// allowUpgrades: false, +// serveClient: false, +// pingInterval: 10000, +// //transports: [ 'websocket', 'polling' ], +// transports: [ 'polling', 'websocket' ], cors: { origin: "http://localhost:8080", methods: ["GET", "POST"], allowedHeaders: ["my-custom-header"], - credentials: true, + credentials: true }, cookie: { name: "test", httpOnly: false, - path: "/custom", - }, + path: "/custom" + } }); /* @@ -38,72 +37,77 @@ const pubtest = new Redis({ }); */ + //import { createAdapter } from 'socket.io-redis'; -const createAdapter = require("socket.io-redis"); +const createAdapter = require('socket.io-redis'); //const RedisClient = require("redis"); const Redis = require("ioredis"); //const pubClient = RedisClient.createClient({ const pubClient = new Redis({ - host: "redisserver", - port: 6379, -}); + host: 'redisserver', + port: 6379, + }); //const pubClient = new RedisClient({ host: 'localhost', port: 6379 }); const subClient = pubClient.duplicate(); io.adapter(createAdapter({ pubClient, subClient })); -pubClient.on("connect", function () { + +pubClient.on("connect", function() { console.log("You are now connected"); }); -const MongoClient = require("mongodb").MongoClient; +const MongoClient = require('mongodb').MongoClient; const { DateTime } = require("luxon"); -var async = require("async"); -const { check, validationResult } = require("express-validator"); + + var async = require("async"); +const { check, validationResult } = require('express-validator'); const urlExistSync = require("url-exist-sync"); -var express = require("express"); +var express = require('express'); app.use(express.json()); -const axios = require("axios"); -axios.defaults.timeout = 30000; +const axios = require('axios'); +axios.defaults.timeout = 30000 -const helmet = require("helmet"); + const helmet = require('helmet'); app.use(helmet()); -const cors = require("cors"); +const cors = require('cors') const whitelist = [ - "http://localhost:8080", - "http://localhost:3080", - "http://localhost:3081", - "http://localhost:3082", -]; + 'http://localhost:8080', + 'http://localhost:3080', + 'http://localhost:3081', + 'http://localhost:3082' + ] const corsOptions = { credentials: true, - methods: ["GET", "PUT", "POST", "DELETE", "OPTIONS"], + methods: ['GET', 'PUT', 'POST', 'DELETE', 'OPTIONS'], optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204 allowedHeaders: [ - "Content-Type", - "Authorization", - "X-Requested-With", - "device-remember-token", - "Access-Control-Allow-Origin", - "Access-Control-Allow-Headers", - "Origin", - "Accept", + 'Content-Type', + 'Authorization', + 'X-Requested-With', + 'device-remember-token', + 'Access-Control-Allow-Origin', + 'Access-Control-Allow-Headers', + 'Origin', + 'Accept' ], - origin: function (origin, callback) { + origin: function(origin, callback) { if (whitelist.indexOf(origin) !== -1) { - callback(null, true); + callback(null, true) } else { - callback(null, true); + callback(null, true) //callback(new Error('Not allowed by CORS')) } - }, -}; + } +} + + // *************************************************** // checktoken @@ -113,82 +117,74 @@ async function checkToken(token) { const agent = new https.Agent({ rejectUnauthorized: false, }); - const instance = axios.create({ - baseURL: "https://api.swarmlab.io", + const instance = axios.create({ + baseURL: 'https://api.swarmlab.io', withCredentials: true, rejectUnauthorized: false, crossdomain: true, httpsAgent: agent, headers: { - Accept: "application/json", - "Content-Type": "multipart/form-data", - Authorization: "Bearer " + token, - }, - }); - try { - var pipelines = { - source: "ssologin", - }; - var params = { - pipeline: pipelines, - }; + 'Accept': 'application/json', + 'Content-Type': 'multipart/form-data', + 'Authorization': 'Bearer '+token + } + }) + try { + var pipelines = { + "source":'ssologin' + } + var params = { + pipeline: pipelines + } - var options = { - headers: { - "content-type": "application/x-www-form-urlencoded", - Authorization: `Bearer ${token}`, - }, - }; + var options = { + headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, + }; - instance.defaults.timeout = 30000; - const res = await instance.post("/istokenvalidsso", params, options); - if (res.status == 200) { - //console.log("check " +JSON.stringify(res.data)) - return res.data; - } else { - console.log("noerror: " + res); - return res.status; + instance.defaults.timeout = 30000; + const res = await instance.post('/istokenvalidsso',params,options); + if(res.status == 200){ + //console.log("check " +JSON.stringify(res.data)) + return res.data + }else{ + console.log("noerror: " + res) + return res.status + + } + } + catch (err) { + console.error("error: "+err); + var error = new Object(); + error.action = '401' + return error } - } catch (err) { - console.error("error: " + err); - var error = new Object(); - error.action = "401"; - return error; - } } + function convertDateToUTC(date) { - return new Date( - date.getUTCFullYear(), - date.getUTCMonth(), - date.getUTCDate(), - date.getUTCHours(), - date.getUTCMinutes(), - date.getUTCSeconds(), - date.getUTCMilliseconds() - ); +return new Date(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate(), date.getUTCHours(), date.getUTCMinutes(), date.getUTCSeconds(),date.getUTCMilliseconds()); } // *************************************************** // get pipelines // *************************************************** -async function getpipelines(token, pipelinename) { +async function getpipelines(token,pipelinename) { const agent = new https.Agent({ rejectUnauthorized: false, }); - const instance = axios.create({ - baseURL: "https://api.swarmlab.io", + const instance = axios.create({ + baseURL: 'https://api.swarmlab.io', withCredentials: true, rejectUnauthorized: false, crossdomain: true, httpsAgent: agent, headers: { - Accept: "application/json", - "Content-Type": "multipart/form-data", - Authorization: "Bearer " + token, - }, - }); + 'Accept': 'application/json', + 'Content-Type': 'multipart/form-data', + 'Authorization': 'Bearer '+token + } + }) /* var params = { playbook: value @@ -201,498 +197,449 @@ async function getpipelines(token, pipelinename) { const playbook = await api.GET('playbookCode',options); return playbook */ - try { - var pipelines = { - querytokenFilter: CONFIG.api.token, - filter: pipelinename, - }; - //var params = { - // pipeline: pipelines - // } - var params = { - querytokenFilter: CONFIG.api.token, - filter: pipelinename, - }; + try { - var options = { - params: params, - headers: { - "content-type": "application/x-www-form-urlencoded", - Authorization: `Bearer ${token}`, - }, - }; + var pipelines = { + "querytokenFilter":CONFIG.api.token, + "filter":pipelinename + } + //var params = { + // pipeline: pipelines + // } + var params = { + querytokenFilter:CONFIG.api.token, + filter:pipelinename + } - //https://api.swarmlab.io/gettutorlabrooms?sort=pipelinename%7Casc&page=1&per_page=5&filter=&type=scripts&tutor=yes - instance.defaults.timeout = 30000; - //const res = await instance.get('/getplaygrounds',params,options); - const res = await instance.get("/getplaygrounds", options); - if (res.status == 200) { - return res.data; - } else { - console.log("noerror: " + res); - return await res.status; + var options = { + params: params, + headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, + }; + + //https://api.swarmlab.io/gettutorlabrooms?sort=pipelinename%7Casc&page=1&per_page=5&filter=&type=scripts&tutor=yes + instance.defaults.timeout = 30000; + //const res = await instance.get('/getplaygrounds',params,options); + const res = await instance.get('/getplaygrounds',options); + if(res.status == 200){ + return res.data + }else{ + console.log("noerror: " + res) + return await res.status + + } + } + catch (err) { + console.error("error: "+err); + var error = new Object(); + error.action = '401' + return await error } - } catch (err) { - console.error("error: " + err); - var error = new Object(); - error.action = "401"; - return await error; - } } // *************************************************** // get user pipelines // *************************************************** -async function getuserpipelines(token, user, swarmlabname) { - var pipelinename = user; +async function getuserpipelines(token,user,swarmlabname) { + var pipelinename = user const agent = new https.Agent({ rejectUnauthorized: false, }); - const instance = axios.create({ - baseURL: "https://api.swarmlab.io", + const instance = axios.create({ + baseURL: 'https://api.swarmlab.io', withCredentials: true, rejectUnauthorized: false, crossdomain: true, httpsAgent: agent, headers: { - Accept: "application/json", - "Content-Type": "multipart/form-data", - Authorization: "Bearer " + token, - }, - }); - try { - var pipelines = { - querytokenFilter: CONFIG.api.token, - filter: pipelinename, - swarmlabname: swarmlabname, - }; - //var params = { - // pipeline: pipelines - // } - var params = { - querytokenFilter: CONFIG.api.token, - filter: pipelinename, - swarmlabname: swarmlabname, - }; + 'Accept': 'application/json', + 'Content-Type': 'multipart/form-data', + 'Authorization': 'Bearer '+token + } + }) + try { - var options = { - params: params, - headers: { - "content-type": "application/x-www-form-urlencoded", - Authorization: `Bearer ${token}`, - }, - }; + var pipelines = { + "querytokenFilter":CONFIG.api.token, + "filter":pipelinename, + swarmlabname:swarmlabname + } + //var params = { + // pipeline: pipelines + // } + var params = { + querytokenFilter:CONFIG.api.token, + filter:pipelinename, + swarmlabname:swarmlabname + } - instance.defaults.timeout = 30000; - const res = await instance.get("/getuserplaygrounds", options); - if (res.status == 200) { - return res.data; - } else { - console.log("noerror: " + res); - return await res.status; + var options = { + params: params, + headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, + }; + + instance.defaults.timeout = 30000; + const res = await instance.get('/getuserplaygrounds',options); + if(res.status == 200){ + return res.data + }else{ + console.log("noerror: " + res) + return await res.status + + } + } + catch (err) { + console.error("error: "+err); + var error = new Object(); + error.action = '401' + error.error = err + return await error } - } catch (err) { - console.error("error: " + err); - var error = new Object(); - error.action = "401"; - error.error = err; - return await error; - } } -global.online = "ob"; -global.pipelines = []; - -function sendlog(reslog, pathfileval) { - var usertmp = global.pipelines.find((x) => x.pathlogfile == pathfileval); - //for (var key in usertmp.data){ - var user = usertmp.data[0].user25user; - // if(usertmp.data){ - console.log("-----------------------" + JSON.stringify(usertmp)); - io.in(user).emit("logdata", reslog); - // } - //} -} -function onlogfile(path) { - console.log("File", path, "has been added"); - var pathfileval = pathmodule.basename(path); - var arrfile = pathfileval.toString().split("-"); - var pathfile = arrfile[0]; - var indexfind1 = global.pipelines.findIndex( - (x) => x.pathlogfile == pathfileval - ); - console.log( - "file11111111111111111111111111111111 " + JSON.stringify(pathfileval) - ); - if (indexfind1 === -1) { - (async () => { - console.log( - "file2222222222222222222222222222222222222 " + - JSON.stringify(pathfileval) - ); - var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto - var resdata = await getpipelines(token, pathfile); - //resdata.data.pathlogfile = 'test' - var resob = {}; - resob.pathlogfile = pathfileval; - var resobarray = []; - for (let i in resdata.data) { - var resob1 = {}; - resob1.data = resdata.data[i].res25swarmlabname; - resob1.user25user = resdata.data[i].res25user; - resob1.res25creator = resdata.data[i].res25creator; - resob1.res25fileforce = resdata.data[i].res25fileforce; - resobarray.push(resob1); + +global.online='ob'; +global.pipelines=[]; + + function sendlog(reslog,pathfileval){ + var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval); + //for (var key in usertmp.data){ + var user = usertmp.data[0].user25user; + // if(usertmp.data){ + console.log('-----------------------' + JSON.stringify(usertmp)); + io.in(user).emit("logdata", reslog); + // } + //} } - resob.data = resobarray; - var indexfind = global.pipelines.findIndex( - (x) => x.pathlogfile == pathfileval - ); - indexfind === -1 - ? global.pipelines.push(resob) - : console.log("object already exists " + pathfileval); - })(); - } + function onlogfile(path){ + + console.log('File', path, 'has been added'); + var pathfileval = pathmodule.basename(path); + var arrfile = pathfileval.toString().split("-"); + var pathfile = arrfile[0]; + var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); + console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval)) + if (indexfind1 === -1 ){ + (async() => { + console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval)) + var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto + var resdata = await getpipelines(token,pathfile) + //resdata.data.pathlogfile = 'test' + var resob = {} + resob.pathlogfile = pathfileval + var resobarray = [] + for (let i in resdata.data) { + var resob1 = {} + resob1.data = resdata.data[i].res25swarmlabname + resob1.user25user = resdata.data[i].res25user + resob1.res25creator = resdata.data[i].res25creator + resob1.res25fileforce = resdata.data[i].res25fileforce + resobarray.push(resob1) + } + resob.data = resobarray + var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); + indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval) + })() + } + } // *************************************************** // rest get // *************************************************** -app.get( - "/get_log", - [check("token").isLength({ min: 40 })], - cors(corsOptions), - (req, res, next) => { - (async () => { +app.get('/get_log', [ + check('token').isLength({ min: 40 }) + ], +cors(corsOptions), (req, res, next) => { + + (async() => { var RES = new Object(); - RES.token = req.query["token"]; - RES.start = req.query["start"]; - RES.end = req.query["end"]; - RES.swarmlabname = req.query["swarmlabname"]; - RES.ok = "ok"; - /* - * - * validate - * - */ + RES.token = req.query["token"] + RES.start = req.query["start"] + RES.end = req.query["end"] + RES.swarmlabname = req.query["swarmlabname"] + RES.ok = 'ok' +/* + * + * validate + * + */ + + var isvalid = await checkToken(RES.token); - if (isvalid.action == "ok") { - console.log("Authserver ok " + RES.token); - RES.error = "ok"; - } else { - console.log("Authserver no " + RES.token); - RES.error = "no"; + if(isvalid.action == 'ok'){ + console.log("Authserver ok " + RES.token); + RES.error = 'ok' + }else{ + console.log("Authserver no " + RES.token); + RES.error = 'no' } - if (RES.error == "ok") { - var resdata = await getuserpipelines( - RES.token, - isvalid.user, - RES.swarmlabname - ); - var mongourl = - "mongodb://" + - CONFIG.mongo.user + - ":" + - CONFIG.mongo.password + - "@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats"; - const OPTS = { - useNewUrlParser: true, - useUnifiedTopology: true, - }; - MongoClient.connect(mongourl, OPTS, function (err, client) { - if (err) { - console.log(err); - } else { - const db = client.db("fluent"); - //usersession.SOCKET.user = isvalid.user - console.log(JSON.stringify("mongo ----------------connected")); - console.log("-----test------- " + JSON.stringify(RES)); - if ( - typeof RES.start !== "undefined" && - typeof RES.end !== "undefined" - ) { - if (DateTime.fromISO(RES.start).isValid) { - var datestart = DateTime.fromISO(RES.start); - var dateend = DateTime.fromISO(RES.end); - var search_term = { - $and: [ - { - time: { - $gte: datestart, - }, - }, - { - time: { - $lt: dateend, - }, - }, - ], - }; - } else { - RES.ok = "no"; - } - } else if (typeof RES.end !== "undefined") { - var dateend = DateTime.fromISO(RES.end); - if (DateTime.fromISO(RES.end).isValid) { - var search_term = { - $and: [ - { - time: { - $lt: dateend, - }, - }, - ], - }; - } else { - RES.ok = "no"; - } - } else if (typeof RES.start !== "undefined") { - var datestart = DateTime.fromISO(RES.start); - if (DateTime.fromISO(RES.start).isValid) { - var search_term = { - $and: [ - { - time: { - $gte: datestart, - }, - }, - ], - }; - } else { - RES.ok = "no"; - } - } - if (RES.ok == "ok") { - //var search_term = '{"$gte": new Date("2020-12-01T00:00:00.000Z") , "$lt": new Date("2020-12-11T16:17:36.470Z") }' - //var search_term = {"time" : {$lte : datenow}} - var resdataarray = []; - var resraw = {}; - var reslab = ""; - var datestart1 = DateTime.fromISO(RES.start); - console.log("-----now1------- " + JSON.stringify(search_term)); - console.log("-----now2------- " + JSON.stringify(datestart1)); - console.log("-----now3------- " + JSON.stringify(datestart)); - - db.collection("logs") - .find(search_term) - .toArray() - //db.collection('logs').find({"time" : {$gt : datestart}}).toArray() - .then((item) => { - console.log("item " + JSON.stringify(item)); - for (let i in item) { - reslab = item[i].tailed_path; - var segment_array = reslab.split("/"); - var last_segment = segment_array.pop(); - var fieldstmp = last_segment.split("-"); - var nameofswarmlab = fieldstmp[0]; - - var regexlog = new RegExp(nameofswarmlab); - for (let ii in resdata.data) { - if (regexlog.test(resdata.data[ii].res25swarmlabname)) { - resdataarray.push(item[i]); - RES.found = item[i]; + if(RES.error == 'ok'){ + + var resdata = await getuserpipelines(RES.token,isvalid.user,RES.swarmlabname) + var mongourl = "mongodb://"+CONFIG.mongo.user+":"+CONFIG.mongo.password+"@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats" + const OPTS = { + useNewUrlParser: true, + useUnifiedTopology: true + }; + MongoClient.connect(mongourl, OPTS, function(err, client){ + if(err){ + console.log(err); + } else { + const db = client.db('fluent'); + //usersession.SOCKET.user = isvalid.user + console.log(JSON.stringify('mongo ----------------connected')) + console.log('-----test------- '+JSON.stringify(RES)) + if ((typeof RES.start !== "undefined") && (typeof RES.end !== "undefined")) { + if(DateTime.fromISO(RES.start).isValid){ + var datestart = DateTime.fromISO(RES.start) + var dateend = DateTime.fromISO(RES.end) + var search_term = { + "$and": [ + { + "time": { + $gte: datestart + } + }, + { + "time": { + $lt: dateend + } + }, + ] + } + }else{ + RES.ok = 'no' + } + }else if(typeof RES.end !== "undefined"){ + var dateend = DateTime.fromISO(RES.end) + if(DateTime.fromISO(RES.end).isValid){ + var search_term = { + "$and": [ + { + "time": { + $lt: dateend + } + } + ] } - } + }else{ + RES.ok = 'no' + } + }else if(typeof RES.start !== "undefined"){ + var datestart = DateTime.fromISO(RES.start) + if(DateTime.fromISO(RES.start).isValid){ + var search_term = { + "$and": [ + { + "time": { + $gte: datestart + } + } + ] + } + }else{ + RES.ok = 'no' + } } + if(RES.ok == 'ok'){ + //var search_term = '{"$gte": new Date("2020-12-01T00:00:00.000Z") , "$lt": new Date("2020-12-11T16:17:36.470Z") }' + //var search_term = {"time" : {$lte : datenow}} + var resdataarray = [] + var resraw = {} + var reslab = '' + var datestart1 = DateTime.fromISO(RES.start) + console.log('-----now1------- '+JSON.stringify(search_term)) + console.log('-----now2------- '+JSON.stringify(datestart1)) + console.log('-----now3------- '+JSON.stringify(datestart)) + + db.collection('logs').find(search_term).toArray() + //db.collection('logs').find({"time" : {$gt : datestart}}).toArray() + .then(item => { + console.log('item '+JSON.stringify(item)) + for (let i in item) { + reslab = item[i].tailed_path + var segment_array = reslab.split( '/' ); + var last_segment = segment_array.pop(); + var fieldstmp = last_segment.split('-'); + var nameofswarmlab = fieldstmp[0]; + + var regexlog = new RegExp(nameofswarmlab); + for (let ii in resdata.data) { + if( regexlog.test(resdata.data[ii].res25swarmlabname) ){ + resdataarray.push(item[i]) + RES.found = item[i] + } + } + } + + RES.error_msg = "ok" + RES.data = resdataarray + //RES.dataserver = resdataarray + //RES.dataservertmp = resdata + res.json(RES) + }) + .catch(err => { + console.error(err) + RES.error_msg = err + res.json(RES) + }) + } else{ // RES.ok + RES.error_msg = 'no date' + res.json(RES) + } + } // error mongo connect + }); // mongo connect + }else{ // token error + RES.data = 'no' + RES.error_msg = "token err" + res.json(RES) + } + })() - RES.error_msg = "ok"; - RES.data = resdataarray; - //RES.dataserver = resdataarray - //RES.dataservertmp = resdata - res.json(RES); - }) - .catch((err) => { - console.error(err); - RES.error_msg = err; - res.json(RES); - }); - } else { - // RES.ok - RES.error_msg = "no date"; - res.json(RES); - } - } // error mongo connect - }); // mongo connect - } else { - // token error - RES.data = "no"; - RES.error_msg = "token err"; - res.json(RES); - } - })(); - } -); +}); -app.get( - "/run", - [ +app.get('/run', [ //check('access_token').isLength({ min: 40 }), //check('llo').isBase64() ], - cors(corsOptions), - (req, res, next) => { - (async () => { - var mongourl = - "mongodb://" + - CONFIG.mongo.user + - ":" + - CONFIG.mongo.password + - "@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats"; - const OPTS = { - useNewUrlParser: true, - useUnifiedTopology: true, - }; - MongoClient.connect(mongourl, OPTS, function (err, client) { - if (err) { - console.log(err); - } else { - const db = client.db("fluent"); - //db.collection('log', onCollection); - console.log(JSON.stringify("mongo connected")); - var stream = db - .collection("logs") - .find( - {}, - { - tailable: true, - awaitdata: true, - /* other options */ - } - ) - .stream(); - - stream.on("data", function (doc) { - console.log(JSON.stringify(doc)); - //socket.write(JSON.stringify({'action': 'log','param': doc.log})); - }); - } - }); +cors(corsOptions), (req, res, next) => { + + (async() => { + +var mongourl = "mongodb://"+CONFIG.mongo.user+":"+CONFIG.mongo.password+"@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats" +const OPTS = { + useNewUrlParser: true, + useUnifiedTopology: true +}; +MongoClient.connect(mongourl, OPTS, function(err, client){ + if(err){ + console.log(err); + } else { + const db = client.db('fluent'); + //db.collection('log', onCollection); + console.log(JSON.stringify('mongo connected')) + var stream = db.collection('logs').find({}, { + tailable: true, + awaitdata: true + /* other options */ +}).stream(); + +stream.on('data', function (doc) { + console.log(JSON.stringify(doc)) + //socket.write(JSON.stringify({'action': 'log','param': doc.log})); +}); + } +}); var RES = new Object(); - RES.code = req.query["filter"]; - RES.token = req.query["filter"]; + RES.code = req.query["filter"] + RES.token = req.query["filter"] var isvalid = await checkToken(RES.token); - if (isvalid.action == "ok") { - console.log("Authserver ok " + RES.token); - } else { - console.log("Authserver no " + RES.token); + if(isvalid.action == 'ok'){ + console.log("Authserver ok " + RES.token); + }else{ + console.log("Authserver no " + RES.token); } - RES.error = false; - RES.error_msg = "ok"; - res.json(RES); - })(); - } -); + RES.error = false + RES.error_msg = "ok" + res.json(RES) + })() + +}); // *************************************************** // rest post // *************************************************** -app.post( - "/run", - [ +app.post('/run', [ //check('access_token').isLength({ min: 40 }), //check('llo').isBase64() ], - cors(corsOptions), - (req, res, next) => { - (async () => { +cors(corsOptions), (req, res, next) => { + + + (async() => { + //console.log(JSON.stringify(req.headers)); //console.log(JSON.stringify(req.body)); //console.log("mongo "+JSON.stringify(req.body)); //console.log("LOG "+JSON.stringify(req.body[0].message)); //console.log("PATH "+JSON.stringify(req.body[0].tailed_path)); - for (var i = 0; i < req.body.length; i++) { + for (var i = 0; i < req.body.length; i++){ //var getpath = await onlogfile(req.body[i].tailed_path) + + var path = req.body[i].tailed_path + + console.log('File', path, 'has been added'); + var pathfileval = pathmodule.basename(path); + var arrfile = pathfileval.toString().split("-"); + var pathfile = arrfile[0]; + var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); + console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval)) + if (indexfind1 === -1 ){ + (async() => { + console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval)) + var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto + var resdata = await getpipelines(token,pathfile) + //resdata.data.pathlogfile = 'test' + var resob = {} + resob.pathlogfile = pathfileval + var resobarray = [] + for (let i in resdata.data) { + var resob1 = {} + resob1.data = resdata.data[i].res25swarmlabname + resob1.user25user = resdata.data[i].res25user + resob1.res25creator = resdata.data[i].res25creator + resob1.res25fileforce = resdata.data[i].res25fileforce + resobarray.push(resob1) + } + resob.data = resobarray + //var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname); + var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); + + //indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists") + indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval) - var path = req.body[i].tailed_path; - - console.log("File", path, "has been added"); - var pathfileval = pathmodule.basename(path); - var arrfile = pathfileval.toString().split("-"); - var pathfile = arrfile[0]; - var indexfind1 = global.pipelines.findIndex( - (x) => x.pathlogfile == pathfileval - ); - console.log( - "file11111111111111111111111111111111 " + JSON.stringify(pathfileval) - ); - if (indexfind1 === -1) { - (async () => { - console.log( - "file2222222222222222222222222222222222222 " + - JSON.stringify(pathfileval) - ); - var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto - var resdata = await getpipelines(token, pathfile); - //resdata.data.pathlogfile = 'test' - var resob = {}; - resob.pathlogfile = pathfileval; - var resobarray = []; - for (let i in resdata.data) { - var resob1 = {}; - resob1.data = resdata.data[i].res25swarmlabname; - resob1.user25user = resdata.data[i].res25user; - resob1.res25creator = resdata.data[i].res25creator; - resob1.res25fileforce = resdata.data[i].res25fileforce; - resobarray.push(resob1); + //console.log('info', JSON.stringify(resdata)); + //console.log('info------------- ', JSON.stringify(global.pipelines)); + })() } - resob.data = resobarray; - //var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname); - var indexfind = global.pipelines.findIndex( - (x) => x.pathlogfile == pathfileval - ); - - //indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists") - indexfind === -1 - ? global.pipelines.push(resob) - : console.log("object already exists " + pathfileval); - - //console.log('info', JSON.stringify(resdata)); - //console.log('info------------- ', JSON.stringify(global.pipelines)); - })(); - } // var obj = req.body[i]; - //var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data.res25swarmlabname); - //indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists") + //var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data.res25swarmlabname); + //indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists") var now = new Date(); var reslog = new Object(); - reslog.log = obj; + reslog.log = obj - reslog.date = convertDateToUTC(now); + reslog.date = convertDateToUTC(now) console.log(reslog); var pathfileval = pathmodule.basename(reslog.log.tailed_path); - var indexfind = global.pipelines.findIndex( - (x) => x.pathlogfile == pathfileval - ); + var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); //indexfind === -1 ? sendlog(reslog,pathfileval) : console.log("object already exists") - indexfind === -1 - ? console.log("object not found") - : sendlog(reslog, pathfileval); - console.log("IOT " + JSON.stringify(reslog.log.tailed_path)); - console.log("IOTindexfind " + JSON.stringify(indexfind)); - console.log("IOTuser " + JSON.stringify(global.pipelines)); - // io.in("iot").emit("message", reslog); - // io.emit("logdata", reslog); + indexfind === -1 ? console.log("object not found") : sendlog(reslog,pathfileval) + console.log("IOT "+JSON.stringify(reslog.log.tailed_path)); + console.log("IOTindexfind "+JSON.stringify(indexfind)); + console.log("IOTuser "+JSON.stringify(global.pipelines)); + // io.in("iot").emit("message", reslog); + // io.emit("logdata", reslog); } - })(); + })() //io.in("iot").emit("message", RES); - console.error("socket POST from client"); - var RES = new Object(); - RES.error = false; - RES.error_msg = "ok"; - RES.msg = req.body[0].messsage; + console.error('socket POST from client'); + var RES = new Object(); + RES.error = false + RES.error_msg = "ok" + RES.msg = req.body[0].messsage - res.json(RES); - } -); + res.json(RES) +}); // *************************************************** // socket @@ -703,238 +650,240 @@ app.post( //} function sleep(ms) { - return new Promise((resolve) => setTimeout(resolve, ms)); + return new Promise(resolve => setTimeout(resolve, ms)); } -function getSHA256ofJSON(data, inputEncoding, encoding) { +function getSHA256ofJSON(data, inputEncoding, encoding){ if (!data) { - return ""; + return ''; } - inputEncoding = inputEncoding || "utf-8"; - encoding = encoding || "hex"; - const hash = require("crypto").createHash("md5"); + inputEncoding = inputEncoding || 'utf-8'; + encoding = encoding || 'hex'; + const hash = require("crypto").createHash('md5'); return hash.update(JSON.stringify(data), inputEncoding).digest(encoding); } //var getkey = function getkey(key) { async function getkey(key) { - return new Promise((resolve) => { - pubClient.get(key, function (err, reply) { - if (err) { - console.log("----------error------------"); - - resolve(null); - } else { - if (reply) { - console.log("---------fount----------"); - resolve(1); - } else { - console.log("----------not fount------------"); - resolve(2); - //return 2 - } - } - }); - }); + return new Promise((resolve) => { + + + pubClient.get(key, function (err, reply) { + if(err){ + + console.log('----------error------------') + + resolve(null) + } + else { + if(reply){ + console.log('---------fount----------') + resolve(1) + }else{ + console.log('----------not fount------------') + resolve(2) + //return 2 + } + } + }) + }) } -var setkey = function setkv(key, value) { - return new Promise((resolve) => { - //pubClient.set(key,value, 'EX', expire, function(err,reply){ - pubClient.set(key, value, function (err, reply) { - if (err) { - resolve(null); - } else { - resolve(reply); - } - }); - }); -}; +var setkey = function setkv(key,value){ + return new Promise((resolve)=>{ + //pubClient.set(key,value, 'EX', expire, function(err,reply){ + pubClient.set(key,value, function(err,reply){ + if(err){ + resolve(null) + } + else{ + resolve(reply) + } + }) + }) +} + +async function iosend(data,issend,io,pubClient,user1){ + var new1 = {} + new1.tailed_path = data.tailed_path + new1.message = data.message + + var now = new Date(); + var reslog1 = {} + //reslog1.data = resob1 + reslog1.log = new1 + reslog1.date = convertDateToUTC(now) + var user = user1 + + const randomTimeInMs = Math.random() * (2000); + await sleep(randomTimeInMs); + var getres = await getkey(issend); + + if(getres == '1'){ + console.log(issend + ' ---1 '+ JSON.stringify(reslog1)) + //io.in(user).emit("logdata", reslog1); + }else if(getres == '2'){ + console.log(issend + ' ---2 '+ JSON.stringify(reslog1)) + setkey(issend,'1') + //pubClient.set(issend, '1', function(err, res) { + //}); + io.in(user).emit("logdata", reslog1); + //} + + } -async function iosend(data, issend, io, pubClient, user1) { - var new1 = {}; - new1.tailed_path = data.tailed_path; - new1.message = data.message; - - var now = new Date(); - var reslog1 = {}; - //reslog1.data = resob1 - reslog1.log = new1; - reslog1.date = convertDateToUTC(now); - var user = user1; - - const randomTimeInMs = Math.random() * 2000; - await sleep(randomTimeInMs); - var getres = await getkey(issend); - - if (getres == "1") { - console.log(issend + " ---1 " + JSON.stringify(reslog1)); - //io.in(user).emit("logdata", reslog1); - } else if (getres == "2") { - console.log(issend + " ---2 " + JSON.stringify(reslog1)); - setkey(issend, "1"); - //pubClient.set(issend, '1', function(err, res) { - //}); - io.in(user).emit("logdata", reslog1); - //} - } } function onCollection(err, collection) { - let options = { - tailable: true, - awaitdata: true, - numberOfRetries: -1, - tailableRetryInterval: 500, - }; - var cursor = collection.find({}, options).stream(); - var itemsProcessed = 0; - - var reslog = new Object(); - var now = new Date(); - cursor.on("data", function (data) { - var issendob = new Object(); - issendob.id = data._id; - issendob.message = data.message; - issendob.tailed_path = data.tailed_path; - - var issend = getSHA256ofJSON(issendob); - - console.log("++++++++" + JSON.stringify(data)); - console.log("++++++++ob" + JSON.stringify(issendob)); - console.log("++++++++sha" + JSON.stringify(issend)); - - var pathfileval = pathmodule.basename(data.tailed_path); - var arrfile = pathfileval.toString().split("-"); - var pathfile = arrfile[0]; - - var indexupdate = "yes"; - var resob = {}; - pubClient.get(pathfileval, function (err, object) { - var objecttmp = JSON.parse(object); - if (object) { - var user1 = objecttmp.user25user.replace(/[\n\t\r]/g, ""); - iosend(data, issend, io, pubClient, user1); - } else { - (async () => { - var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto - var resdata1 = await getpipelines(token, pathfile); - resob.pathlogfile = pathfileval; - var resob11 = {}; - var i1 = 0; - resob11.data = resdata1.data[i1].res25swarmlabname; - resob11.user25user = resdata1.data[i1].res25user.replace( - /[\n\t\r]/g, - "" - ); - resob11.res25creator = resdata1.data[i1].res25creator; - resob11.res25fileforce = resdata1.data[i1].res25fileforce; - resob11.tailed_path = pathfileval; - var resob1string1 = JSON.stringify(resob11); - await pubClient.set( - pathfileval, - resob1string1, - function (err, res) {} - ); - var user1 = resob11.user25user; - iosend(data, issend, io, pubClient, user1); - console.log(" ---no--- " + JSON.stringify(data)); - })(); //await inside yes - } + let options = { tailable: true, + awaitdata: true, + numberOfRetries: -1, + tailableRetryInterval: 500 + }; + var cursor = collection.find({},options).stream(); + var itemsProcessed = 0; + + var reslog = new Object(); + var now = new Date(); + cursor.on('data', function (data) { + var issendob = new Object(); + issendob.id = data._id + issendob.message = data.message + issendob.tailed_path = data.tailed_path + + var issend = getSHA256ofJSON(issendob) + + console.log('++++++++' + JSON.stringify(data)); + console.log('++++++++ob' + JSON.stringify(issendob)); + console.log('++++++++sha' + JSON.stringify(issend)); + + var pathfileval = pathmodule.basename(data.tailed_path); + var arrfile = pathfileval.toString().split("-"); + var pathfile = arrfile[0]; + + var indexupdate = "yes" + var resob = {}; + pubClient.get(pathfileval, function(err, object) { + var objecttmp = JSON.parse(object); + if(object){ + var user1 = objecttmp.user25user.replace(/[\n\t\r]/g,"") + iosend(data,issend,io,pubClient,user1) + }else{ + (async() => { + var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto + var resdata1 = await getpipelines(token,pathfile) + resob.pathlogfile = pathfileval + var resob11 = {} + var i1 = 0 + resob11.data = resdata1.data[i1].res25swarmlabname + resob11.user25user = resdata1.data[i1].res25user.replace(/[\n\t\r]/g,"") + resob11.res25creator = resdata1.data[i1].res25creator + resob11.res25fileforce = resdata1.data[i1].res25fileforce + resob11.tailed_path = pathfileval + var resob1string1 = JSON.stringify(resob11); + await pubClient.set(pathfileval, resob1string1, function(err, res) { + }); + var user1 = resob11.user25user + iosend(data,issend,io,pubClient,user1) + console.log(' ---no--- '+ JSON.stringify(data)) + })() //await inside yes + } + }); + }); - }); - setInterval(function () { - console.log("itemsProcessed", itemsProcessed); - // this method is also exposed by the Server instance - //console.log(adapter.rooms); - }, 8000); -} -var mongourl = - "mongodb://" + - CONFIG.mongo.user + - ":" + - CONFIG.mongo.password + - "@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats"; -const OPTS = { - useNewUrlParser: true, - useUnifiedTopology: true, -}; -var mongooptions = { - autoReconnect: true, - keepAlive: 1, - connectTimeoutMS: 30000, - socketTimeoutMS: 0, -}; -MongoClient.connect(mongourl, OPTS, function (err, client) { - if (err) { - console.log(err); - } else { - const db = client.db("fluent"); - db.collection("logs", onCollection); - } -}); + setInterval(function () { + console.log('itemsProcessed', itemsProcessed); + // this method is also exposed by the Server instance + //console.log(adapter.rooms); + }, 8000); +} -io.on("connection", (s) => { - console.error("socket connection"); - - //s.set('transports', ['websocket']); - //s.set('pingTimeout', 30000); - //s.set('allowUpgrades', false); - //s.set('serveClient', false); - //s.set('pingInterval', 10000); - // ------------------------------ - // --- set - // ------------------------------ - var usersession = new Object(); - usersession.SOCKET = {}; - usersession.SOCKET.error = {}; - console.error("socket ..."); - s.auth = false; - - // ------------------------------ - // --- authenticate - // ------------------------------ - s.on("authenticate", function (data) { - const token = data; - console.log("invalid 1 " + token); - (async () => { - var isvalid = await checkToken(token); - if (isvalid.action == "ok") { - console.log("Authserver ok ", s.id + " - " + token); - // pubClient.set(session, resob1string, function(err, res) { - // }); - usersession.SOCKET.user = isvalid.user; - usersession.SOCKET.scope = isvalid.scope; // space delimeter - usersession.SOCKET.token = isvalid.token; - s.auth = true; - } else { - console.log("Authserver no ", s.id + " - " + token); - s.auth = false; - } - })(); - }); - setTimeout(function () { - if (!s.auth) { - console.log("Disconnecting timeout socket ", s.id); - //s.disconnect('unauthorized'); - } else { - var room = usersession.SOCKET.user; - //s.on("subscribe", function (room) { - s.join(room); - console.log("joining rooom", s.rooms); - console.log(room + " created "); - // }); + var mongourl = "mongodb://"+CONFIG.mongo.user+":"+CONFIG.mongo.password+"@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats" + const OPTS = { + useNewUrlParser: true, + useUnifiedTopology: true + }; + var mongooptions = { + autoReconnect: true, + keepAlive: 1, + connectTimeoutMS: 30000, + socketTimeoutMS: 0 } - }, 30000); + MongoClient.connect(mongourl, OPTS, function(err, client){ + if(err){ + console.log(err); + } else { + const db = client.db('fluent'); + db.collection('logs', onCollection); + } + }); - var id = s.id; - s.on("log", (obj) => { - console.error("from client " + s.id + " obj " + obj); - }); -}); +io.on('connection', s => { + console.error('socket connection'); + +//s.set('transports', ['websocket']); +//s.set('pingTimeout', 30000); +//s.set('allowUpgrades', false); +//s.set('serveClient', false); +//s.set('pingInterval', 10000); + // ------------------------------ + // --- set + // ------------------------------ + var usersession = new Object(); + usersession.SOCKET = {}; + usersession.SOCKET.error = {}; + console.error('socket ...'); + s.auth = false; -http.listen(3000, () => console.error("listening on http://localhost:3000/")); -console.error("socket.io example"); + // ------------------------------ + // --- authenticate + // ------------------------------ + s.on('authenticate', function(data){ + const token = data + console.log('invalid 1 ' + token); + (async() => { + var isvalid = await checkToken(token); + if(isvalid.action == 'ok'){ + console.log("Authserver ok ", s.id + ' - ' + token); + // pubClient.set(session, resob1string, function(err, res) { + // }); + usersession.SOCKET.user = isvalid.user + usersession.SOCKET.scope = isvalid.scope // space delimeter + usersession.SOCKET.token = isvalid.token + s.auth = true; + }else{ + console.log("Authserver no ", s.id + ' - ' + token); + s.auth = false; + } + })() + }); + + setTimeout(function(){ + if (!s.auth) { + console.log("Disconnecting timeout socket ", s.id); + //s.disconnect('unauthorized'); + }else{ + var room = usersession.SOCKET.user + //s.on("subscribe", function (room) { + s.join(room); + console.log("joining rooom", s.rooms); + console.log(room + ' created ') + // }); + } + }, 30000); + + + + + var id = s.id + s.on('log', obj => { + console.error('from client '+ s.id + ' obj ' + obj); + }); + +}); + +http.listen(3000, () => console.error('listening on http://localhost:3000/')); +console.error('socket.io example');