"use strict";

/**
 * Log relay server.
 *
 * - Accepts log entries over HTTP (POST /run) and forwards each one to the
 *   socket.io room named after the user that owns the originating log file.
 * - Lets an authenticated user query stored log documents (GET /get_log).
 * - socket.io clients send an "authenticate" event carrying a bearer token;
 *   once the token is accepted the socket joins the room named after the user.
 *
 * NOTE: this file was reconstructed from a whitespace-collapsed copy in which
 * `//` comments ran into the following code; line structure has been restored.
 */

const pathmodule = require("path");
const app = require("express")();
const http = require("http").Server(app);
const https = require("https");
const CONFIG = require(pathmodule.resolve(__dirname, "runconfig.js"));

const io = require("socket.io")(http, {
  // pingTimeout: 30000,
  // allowUpgrades: false,
  // serveClient: false,
  // pingInterval: 10000,
  // //transports: [ 'websocket', 'polling' ],
  // transports: [ 'polling', 'websocket' ],
  cors: {
    origin: "http://localhost:8080",
    methods: ["GET", "POST"],
    allowedHeaders: ["my-custom-header"],
    credentials: true,
  },
  cookie: {
    name: "test",
    httpOnly: false,
    path: "/custom",
  },
});

const createAdapter = require("socket.io-redis");
const Redis = require("ioredis");
const pubClient = new Redis({
  host: "redisserver",
  port: 6379,
});
//const pubClient = new RedisClient({ host: 'localhost', port: 6379 });
const subClient = pubClient.duplicate();
io.adapter(createAdapter({ pubClient, subClient }));
pubClient.on("connect", function () {
  console.log("You are now connected");
});

const MongoClient = require("mongodb").MongoClient;
const { DateTime } = require("luxon");
// `async` and `url-exist-sync` are not referenced below; the requires are kept
// because removing a module load could change start-up behavior.
const async = require("async");
const { check, validationResult } = require("express-validator");
const urlExistSync = require("url-exist-sync");
const express = require("express");
app.use(express.json());
const axios = require("axios");
axios.defaults.timeout = 30000;
const helmet = require("helmet");
app.use(helmet());
const cors = require("cors");

const whitelist = [
  "http://localhost:8080",
  "http://localhost:3080",
  "http://localhost:3081",
  "http://localhost:3082",
];

const corsOptions = {
  credentials: true,
  methods: ["GET", "PUT", "POST", "DELETE", "OPTIONS"],
  optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204
  allowedHeaders: [
    "Content-Type",
    "Authorization",
    "X-Requested-With",
    "device-remember-token",
    "Access-Control-Allow-Origin",
    "Access-Control-Allow-Headers",
    "Origin",
    "Accept",
  ],
  origin: function (origin, callback) {
    // NOTE(review): `whitelist` is NOT enforced. The original accepted the
    // origin in both branches (the rejecting line was commented out), so every
    // origin is allowed. To enforce it, reject when the origin is not listed:
    //   callback(new Error('Not allowed by CORS'))
    callback(null, true);
  },
};

// ***************************************************
// shared configuration / helpers
// ***************************************************

const API_BASE_URL = "https://api.swarmlab.io";
const API_TIMEOUT_MS = 30000;
const MONGO_DB_NAME = "swarmlabplaygroundstats";
const MONGO_OPTS = {
  useNewUrlParser: true,
  useUnifiedTopology: true,
};

// FIXME(security): service credential hard-coded in source (previously repeated
// in three places). It should be loaded from runconfig.js or the environment.
const SERVICE_TOKEN = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto

/** Builds the replica-set connection string from the credentials in CONFIG. */
function buildMongoUrl() {
  return (
    "mongodb://" +
    CONFIG.mongo.user +
    ":" +
    CONFIG.mongo.password +
    "@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/swarmlabplaygroundstats?replicaSet=rs1&authSource=swarmlabplaygroundstats"
  );
}

/**
 * Creates the axios client used for every call to the swarmlab API.
 *
 * @param {string} token - bearer token placed in the Authorization header.
 * @returns {object} axios instance with a 30 s timeout.
 */
function createApiClient(token) {
  // NOTE(security): certificate verification is disabled for these requests
  // (`rejectUnauthorized: false`). Kept as in the original; confirm whether it
  // is still required.
  const agent = new https.Agent({
    rejectUnauthorized: false,
  });
  const instance = axios.create({
    baseURL: API_BASE_URL,
    withCredentials: true,
    rejectUnauthorized: false,
    crossdomain: true,
    httpsAgent: agent,
    headers: {
      Accept: "application/json",
      "Content-Type": "multipart/form-data",
      Authorization: "Bearer " + token,
    },
  });
  instance.defaults.timeout = API_TIMEOUT_MS;
  return instance;
}

/** Per-request headers shared by all API calls. */
function apiRequestHeaders(token) {
  return {
    "content-type": "application/x-www-form-urlencoded",
    Authorization: `Bearer ${token}`,
  };
}

/** True when a checkToken() result is an object whose `action` is "ok". */
function isAuthorized(result) {
  return result !== null && typeof result === "object" && result.action === "ok";
}

/** Normalizes an API `data` payload (array or keyed object) into an array. */
function rowsOf(payload) {
  if (payload === null || typeof payload !== "object" || !payload.data) {
    return [];
  }
  return Object.values(payload.data);
}

/** Closes a Mongo client, logging (not throwing) on failure. */
function closeMongoClient(client) {
  return client.close().catch(function (err) {
    console.error("mongo close error: " + err);
  });
}

// ***************************************************
// socket
// ***************************************************

io.on("connection", (s) => {
  console.error("socket connection");
  //s.set('transports', ['websocket']);
  //s.set('pingTimeout', 30000);
  //s.set('allowUpgrades', false);
  //s.set('serveClient', false);
  //s.set('pingInterval', 10000);

  // ------------------------------
  // --- set
  // ------------------------------
  const usersession = {};
  usersession.SOCKET = {};
  usersession.SOCKET.error = {};
  console.error("socket ...");
  s.auth = false;
  let joinedRoom = null;

  // ------------------------------
  // --- authenticate
  // ------------------------------
  s.on("authenticate", function (data) {
    const token = data;
    console.log("TEST LOG INSIDE ATHENTICATE SOCKET: invalid 1 " + token);
    (async () => {
      const isvalid = await checkToken(token);
      if (isAuthorized(isvalid)) {
        console.log("Authserver ok ", s.id + " - " + token);
        usersession.SOCKET.user = isvalid.user;
        usersession.SOCKET.scope = isvalid.scope; // space delimeter
        usersession.SOCKET.token = isvalid.token;
        s.auth = true;

        // Join the user's room as soon as the token is accepted. The original
        // joined only from a fixed 30 s timer, so an authenticated client
        // missed up to 30 s of logs and one authenticating later never joined.
        const room = usersession.SOCKET.user;
        if (joinedRoom !== null && joinedRoom !== room) {
          s.leave(joinedRoom);
        }
        s.join(room);
        joinedRoom = room;
        console.log("joining rooom", s.rooms);
        console.log(room + " created ");
      } else {
        console.log("Authserver no ", s.id + " - " + token);
        s.auth = false;
      }
    })().catch(function (err) {
      console.error("authenticate error: " + err);
      s.auth = false;
    });
  });

  // After 30 s report sockets that never authenticated (disconnect is disabled).
  const authTimer = setTimeout(function () {
    if (!s.auth) {
      console.log("Disconnecting timeout socket ", s.id);
      //s.disconnect('unauthorized');
    }
  }, 30000);

  s.on("disconnect", function () {
    clearTimeout(authTimer);
  });

  s.on("log", (obj) => {
    console.error("from client " + s.id + " obj " + obj);
  });
});

// ***************************************************
// checktoken
// ***************************************************
/**
 * Asks the API whether `token` is a valid SSO token.
 *
 * @param {string} token - bearer token to validate.
 * @returns {Promise<object|number>} the response body on HTTP 200, the numeric
 *   status for any other successful status, or `{ action: "401" }` when the
 *   request fails. Never rejects.
 */
async function checkToken(token) {
  const instance = createApiClient(token);
  try {
    const params = {
      pipeline: {
        source: "ssologin",
      },
    };
    const options = {
      headers: apiRequestHeaders(token),
    };
    const res = await instance.post("/istokenvalidsso", params, options);
    if (res.status == 200) {
      //console.log("check " +JSON.stringify(res.data))
      return res.data;
    }
    console.log("noerror: " + res);
    return res.status;
  } catch (err) {
    console.error("error: " + err);
    return { action: "401" };
  }
}

/**
 * Returns a Date whose LOCAL fields equal the UTC fields of `date`.
 * The result is therefore a different instant unless the local zone is UTC;
 * this is the behavior callers already rely on for the `date` they emit.
 *
 * @param {Date} date
 * @returns {Date}
 */
function convertDateToUTC(date) {
  return new Date(
    date.getUTCFullYear(),
    date.getUTCMonth(),
    date.getUTCDate(),
    date.getUTCHours(),
    date.getUTCMinutes(),
    date.getUTCSeconds(),
    date.getUTCMilliseconds()
  );
}

// ***************************************************
// get pipelines
// ***************************************************
/**
 * Fetches playgrounds matching `pipelinename` from GET /getplaygrounds.
 *
 * @param {string} token - bearer token for the request.
 * @param {string} pipelinename - value sent as the `filter` query parameter.
 * @returns {Promise<object|number>} response body on HTTP 200, numeric status
 *   otherwise, or `{ action: "401" }` when the request fails. Never rejects.
 */
async function getpipelines(token, pipelinename) {
  const instance = createApiClient(token);
  try {
    const options = {
      params: {
        querytokenFilter: CONFIG.api.token,
        filter: pipelinename,
      },
      headers: apiRequestHeaders(token),
    };
    //https://api.swarmlab.io/gettutorlabrooms?sort=pipelinename%7Casc&page=1&per_page=5&filter=&type=scripts&tutor=yes
    const res = await instance.get("/getplaygrounds", options);
    if (res.status == 200) {
      return res.data;
    }
    console.log("noerror: " + res);
    return res.status;
  } catch (err) {
    console.error("error: " + err);
    return { action: "401" };
  }
}

// ***************************************************
// get user pipelines
// ***************************************************
/**
 * Fetches the playgrounds of `user` from GET /getuserplaygrounds.
 *
 * @param {string} token - bearer token for the request.
 * @param {string} user - value sent as the `filter` query parameter.
 * @param {string} swarmlabname - value sent as the `swarmlabname` parameter.
 * @returns {Promise<object|number>} response body on HTTP 200, numeric status
 *   otherwise, or `{ action: "401", error }` when the request fails.
 *   Never rejects.
 */
async function getuserpipelines(token, user, swarmlabname) {
  const instance = createApiClient(token);
  try {
    const options = {
      params: {
        querytokenFilter: CONFIG.api.token,
        filter: user,
        swarmlabname: swarmlabname,
      },
      headers: apiRequestHeaders(token),
    };
    const res = await instance.get("/getuserplaygrounds", options);
    if (res.status == 200) {
      return res.data;
    }
    console.log("noerror: " + res);
    return res.status;
  } catch (err) {
    console.error("error: " + err);
    return { action: "401", error: err };
  }
}

global.online = "ob";
// In-memory cache: one entry per log file name -> owners resolved via the API.
global.pipelines = [];

/** Index of the cache entry for `logFileName` in global.pipelines, or -1. */
function findPipelineIndex(logFileName) {
  return global.pipelines.findIndex((x) => x.pathlogfile == logFileName);
}

/**
 * Emits `reslog` as a "logdata" event to the room of the first owner cached
 * for the log file `pathfileval`.
 *
 * Does nothing (apart from logging) when the file has no cache entry or the
 * entry has no owner; the original threw a TypeError in both cases.
 *
 * @param {object} reslog - payload to emit.
 * @param {string} pathfileval - base name of the log file.
 */
function sendlog(reslog, pathfileval) {
  const usertmp = global.pipelines.find((x) => x.pathlogfile == pathfileval);
  if (!usertmp || !Array.isArray(usertmp.data) || usertmp.data.length === 0) {
    console.error("no owner known for log file " + pathfileval);
    return;
  }
  const user = usertmp.data[0].user25user;
  console.log("-----------------------" + JSON.stringify(usertmp));
  io.in(user).emit("logdata", reslog);
}

/**
 * Ensures global.pipelines has an entry for the log file at `path`, resolving
 * its owners through getpipelines() when it is not cached yet.
 *
 * The playground name is taken as the part of the file's base name before the
 * first "-".
 *
 * @param {string} path - path of the log file.
 * @returns {Promise<void>} resolves once the entry exists or the lookup
 *   finished; never rejects.
 */
async function onlogfile(path) {
  try {
    console.log("File", path, "has been added");
    const pathfileval = pathmodule.basename(path);
    const pathfile = pathfileval.toString().split("-")[0];
    console.log(
      "file11111111111111111111111111111111 " + JSON.stringify(pathfileval)
    );
    if (findPipelineIndex(pathfileval) !== -1) {
      return;
    }
    console.log(
      "file2222222222222222222222222222222222222 " + JSON.stringify(pathfileval)
    );
    const resdata = await getpipelines(SERVICE_TOKEN, pathfile);
    const resob = {
      pathlogfile: pathfileval,
      data: rowsOf(resdata)
        .filter(Boolean)
        .map((row) => ({
          data: row.res25swarmlabname,
          user25user: row.res25user,
          res25creator: row.res25creator,
          res25fileforce: row.res25fileforce,
        })),
    };
    // Re-check: another request may have registered the file while we awaited.
    if (findPipelineIndex(pathfileval) === -1) {
      global.pipelines.push(resob);
    } else {
      console.log("object already exists " + pathfileval);
    }
  } catch (err) {
    console.error("onlogfile error: " + err);
  }
}

// ***************************************************
// rest get
// ***************************************************

/**
 * Builds the Mongo filter on `time` from optional ISO-8601 bounds.
 *
 * @param {string|undefined} start - inclusive lower bound.
 * @param {string|undefined} end - exclusive upper bound.
 * @returns {object|null} `{}` when no bound is given, `{ time: {...} }`
 *   otherwise, or `null` when a supplied bound is not a valid ISO date.
 */
function buildTimeFilter(start, end) {
  const time = {};
  if (typeof start !== "undefined") {
    const from = typeof start === "string" ? DateTime.fromISO(start) : null;
    if (from === null || !from.isValid) {
      return null;
    }
    // The query needs a native Date; a Luxon DateTime is a plain object to the
    // driver and would not be compared as a date.
    time.$gte = from.toJSDate();
  }
  if (typeof end !== "undefined") {
    const to = typeof end === "string" ? DateTime.fromISO(end) : null;
    if (to === null || !to.isValid) {
      return null;
    }
    time.$lt = to.toJSDate();
  }
  return Object.keys(time).length > 0 ? { time: time } : {};
}

/**
 * GET /get_log?token=&start=&end=&swarmlabname=
 *
 * Responds (always with JSON) with the stored log documents whose file name
 * prefix (text before the first "-") occurs in the name of one of the caller's
 * playgrounds. `error_msg` is "ok", "token err", "no date" or an error text.
 */
app.get(
  "/get_log",
  [check("token").isLength({ min: 40 })],
  cors(corsOptions),
  async (req, res, next) => {
    const RES = {};
    RES.token = req.query["token"];
    RES.start = req.query["start"];
    RES.end = req.query["end"];
    RES.swarmlabname = req.query["swarmlabname"];
    RES.ok = "ok";

    let client = null;
    try {
      /*
       * validate
       */
      // The validator above was declared but its result was never read; a
      // token that fails it is now rejected without calling the auth server.
      const isvalid = validationResult(req).isEmpty()
        ? await checkToken(RES.token)
        : null;
      if (isAuthorized(isvalid)) {
        console.log("Authserver ok " + RES.token);
        RES.error = "ok";
      } else {
        console.log("Authserver no " + RES.token);
        RES.error = "no";
        RES.data = "no";
        RES.error_msg = "token err";
        res.json(RES);
        return;
      }

      const search_term = buildTimeFilter(RES.start, RES.end);
      if (search_term === null) {
        RES.ok = "no";
        RES.error_msg = "no date";
        res.json(RES);
        return;
      }

      const resdata = await getuserpipelines(
        RES.token,
        isvalid.user,
        RES.swarmlabname
      );
      const labNames = rowsOf(resdata)
        .filter(Boolean)
        .map((row) => String(row.res25swarmlabname));

      client = await MongoClient.connect(buildMongoUrl(), MONGO_OPTS);
      const db = client.db(MONGO_DB_NAME);
      console.log(JSON.stringify("mongo ----------------connected"));
      console.log("-----test------- " + JSON.stringify(RES));
      console.log("-----now1------- " + JSON.stringify(search_term));

      const item = await db.collection("logs").find(search_term).toArray();
      console.log("item " + JSON.stringify(item));

      const resdataarray = [];
      for (const doc of item) {
        if (!doc || typeof doc.tailed_path !== "string") {
          continue; // cannot attribute a document without a file path
        }
        const last_segment = doc.tailed_path.split("/").pop();
        const nameofswarmlab = last_segment.split("-")[0];
        // Plain substring test: the original compiled the file prefix into a
        // RegExp unescaped, so metacharacters could mis-match or throw. Each
        // document is now returned once even when several labs match.
        if (labNames.some((name) => name.includes(nameofswarmlab))) {
          resdataarray.push(doc);
          RES.found = doc;
        }
      }
      RES.error_msg = "ok";
      RES.data = resdataarray;
      res.json(RES);
    } catch (err) {
      // Covers connect and query failures; the original left the request
      // hanging when the connection could not be established.
      console.error(err);
      if (!res.headersSent) {
        RES.error_msg = String(err && err.message ? err.message : err);
        res.json(RES);
      }
    } finally {
      if (client !== null) {
        await closeMongoClient(client); // one connection per request: release it
      }
    }
  }
);

/**
 * Opens a tailable cursor over the `logs` collection and prints every
 * document. The connection is closed when the stream errors or ends.
 *
 * NOTE(review): called on every GET /run, so each request opens its own
 * connection and cursor — confirm this debug behavior is still wanted.
 */
function startDebugLogStream() {
  MongoClient.connect(buildMongoUrl(), MONGO_OPTS, function (err, client) {
    if (err) {
      console.log(err);
      return;
    }
    const db = client.db(MONGO_DB_NAME);
    //db.collection('log', onCollection);
    console.log(JSON.stringify("mongo connected"));
    const stream = db
      .collection("logs")
      .find(
        {},
        {
          tailable: true,
          awaitdata: true,
          /* other options */
        }
      )
      .stream();

    let released = false;
    const release = function () {
      if (!released) {
        released = true;
        closeMongoClient(client);
      }
    };

    stream.on("data", function (doc) {
      console.log(JSON.stringify(doc));
      //socket.write(JSON.stringify({'action': 'log','param': doc.log}));
    });
    // Without an "error" listener a stream error is thrown as an uncaught
    // exception.
    stream.on("error", function (streamErr) {
      console.error("log stream error: " + streamErr);
      release();
    });
    stream.on("end", release);
  });
}

/**
 * GET /run?filter=<token>
 *
 * Starts the debug log stream, checks the token passed in `filter` (the result
 * is only logged) and always answers `{ error: false, error_msg: "ok" }`.
 */
app.get(
  "/run",
  [
    //check('access_token').isLength({ min: 40 }),
    //check('llo').isBase64()
  ],
  cors(corsOptions),
  async (req, res, next) => {
    try {
      startDebugLogStream();

      const RES = {};
      RES.code = req.query["filter"];
      RES.token = req.query["filter"];
      const isvalid = await checkToken(RES.token);
      if (isAuthorized(isvalid)) {
        console.log("Authserver ok " + RES.token);
      } else {
        console.log("Authserver no " + RES.token);
      }
      RES.error = false;
      RES.error_msg = "ok";
      res.json(RES);
    } catch (err) {
      next(err);
    }
  }
);

// ***************************************************
// rest post
// ***************************************************

/**
 * Forwards each posted log entry to the room of the user owning its file.
 *
 * The owner lookup is awaited before emitting, so the first lines of a newly
 * seen file are delivered; the original started the lookup without waiting and
 * dropped those lines as "object not found".
 *
 * @param {Array<object>} entries - entries carrying `tailed_path`.
 * @returns {Promise<void>}
 */
async function relayLogEntries(entries) {
  for (const obj of entries) {
    if (!obj || typeof obj.tailed_path !== "string") {
      console.error("skipping log entry without tailed_path");
      continue;
    }
    await onlogfile(obj.tailed_path);

    const reslog = {};
    reslog.log = obj;
    reslog.date = convertDateToUTC(new Date());
    console.log(reslog);

    const pathfileval = pathmodule.basename(reslog.log.tailed_path);
    const indexfind = findPipelineIndex(pathfileval);
    if (indexfind === -1) {
      console.log("object not found");
    } else {
      sendlog(reslog, pathfileval);
    }
    console.log("IOT " + JSON.stringify(reslog.log.tailed_path));
    console.log("IOTindexfind " + JSON.stringify(indexfind));
    console.log("IOTuser " + JSON.stringify(global.pipelines));
    // io.in("iot").emit("message", reslog);
    // io.emit("logdata", reslog);
  }
}

/**
 * POST /run — body: JSON array of log entries.
 *
 * Answers immediately with `{ error: false, error_msg: "ok", msg }` where
 * `msg` is the `message` of the first entry; relaying continues in the
 * background.
 */
app.post(
  "/run",
  [
    //check('access_token').isLength({ min: 40 }),
    //check('llo').isBase64()
  ],
  cors(corsOptions),
  (req, res, next) => {
    // A non-array or empty body used to throw on `req.body[0]`.
    const entries = Array.isArray(req.body) ? req.body : [];

    relayLogEntries(entries).catch(function (err) {
      console.error("relay error: " + err);
    });

    //io.in("iot").emit("message", RES);
    console.error("socket POST from client");
    const RES = {};
    RES.error = false;
    RES.error_msg = "ok";
    // The original read `.messsage` (typo), which was always undefined.
    RES.msg = entries.length > 0 && entries[0] ? entries[0].message : undefined;
    res.json(RES);
  }
);

// ***************************************************
// socket
// ***************************************************

/** Resolves after `ms` milliseconds. */
function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Hex digest of `JSON.stringify(data)`.
 *
 * NOTE(review): despite the name this computes an MD5 digest (an earlier,
 * commented-out version used sha256). Left unchanged because the digest is
 * used as a Redis key and changing it would alter stored keys.
 *
 * @param {*} data - value to hash; falsy values yield "".
 * @param {string} [inputEncoding="utf-8"]
 * @param {string} [encoding="hex"]
 * @returns {string}
 */
function getSHA256ofJSON(data, inputEncoding, encoding) {
  if (!data) {
    return "";
  }
  inputEncoding = inputEncoding || "utf-8";
  encoding = encoding || "hex";
  const hash = require("crypto").createHash("md5");
  return hash.update(JSON.stringify(data), inputEncoding).digest(encoding);
}

/**
 * Looks `key` up in Redis.
 *
 * @param {string} key
 * @returns {Promise<1|2|null>} 1 when a value exists, 2 when it does not,
 *   null when Redis reports an error.
 */
async function getkey(key) {
  try {
    const reply = await pubClient.get(key);
    if (reply) {
      console.log("---------fount----------");
      return 1;
    }
    console.log("----------not fount------------");
    return 2;
  } catch (err) {
    console.log("----------error------------");
    return null;
  }
}

/**
 * Stores `value` under `key` in Redis.
 *
 * @param {string} key
 * @param {string} value
 * @returns {Promise<string|null>} the Redis reply, or null on error.
 */
var setkey = async function setkv(key, value) {
  try {
    return await pubClient.set(key, value);
  } catch (err) {
    return null;
  }
};

/**
 * Emits one log document to room `user1` unless its dedupe key `issend` has
 * already been claimed.
 *
 * The key is claimed with a single `SET ... NX`, which is atomic. The original
 * did GET, then SET, after a random sleep, so two callers could both see the
 * key missing and both emit.
 *
 * @param {object} data - document carrying `tailed_path` and `message`.
 * @param {string} issend - dedupe key for this document.
 * @param {object} io - socket.io server used to emit.
 * @param {object} pubClient - Redis client used to claim the key.
 * @param {string} user1 - room name.
 * @returns {Promise<void>}
 */
async function iosend(data, issend, io, pubClient, user1) {
  const reslog1 = {
    log: {
      tailed_path: data.tailed_path,
      message: data.message,
    },
    date: convertDateToUTC(new Date()),
  };

  let claimed;
  try {
    claimed = await pubClient.set(issend, "1", "NX");
  } catch (err) {
    // As before, nothing is emitted when Redis is unavailable.
    console.log("----------error------------");
    return;
  }

  if (claimed === "OK") {
    console.log(issend + " ---2 " + JSON.stringify(reslog1));
    io.in(user1).emit("logdata", reslog1);
  } else {
    console.log(issend + " ---1 " + JSON.stringify(reslog1));
  }
}

/**
 * Relays one tailed document: resolves the owner of its log file (Redis cache
 * first, then the API) and hands the document to iosend().
 *
 * @param {object} data - document with `_id`, `message`, `tailed_path`.
 * @returns {Promise<void>} rejects on malformed cache data or lookup errors.
 */
async function relayTailedDocument(data) {
  const issendob = {
    id: data._id,
    message: data.message,
    tailed_path: data.tailed_path,
  };
  const issend = getSHA256ofJSON(issendob);
  console.log("++++++++" + JSON.stringify(data));
  console.log("++++++++ob" + JSON.stringify(issendob));
  console.log("++++++++sha" + JSON.stringify(issend));

  const pathfileval = pathmodule.basename(data.tailed_path);
  const pathfile = pathfileval.toString().split("-")[0];

  const cached = await pubClient.get(pathfileval);
  if (cached) {
    const objecttmp = JSON.parse(cached);
    const cachedUser = objecttmp.user25user.replace(/[\n\t\r]/g, "");
    await iosend(data, issend, io, pubClient, cachedUser);
    return;
  }

  const resdata1 = await getpipelines(SERVICE_TOKEN, pathfile);
  const first = rowsOf(resdata1)[0];
  if (!first) {
    // The original dereferenced data[0] unconditionally and threw here.
    console.error("no pipeline found for " + pathfileval);
    return;
  }
  const resob11 = {};
  resob11.data = first.res25swarmlabname;
  resob11.user25user = first.res25user.replace(/[\n\t\r]/g, "");
  resob11.res25creator = first.res25creator;
  resob11.res25fileforce = first.res25fileforce;
  resob11.tailed_path = pathfileval;
  await pubClient.set(pathfileval, JSON.stringify(resob11));

  await iosend(data, issend, io, pubClient, resob11.user25user);
  console.log(" ---no--- " + JSON.stringify(data));
}

/**
 * Callback for `db.collection("logs", onCollection)`: tails the collection and
 * relays every document. Also logs the number of processed documents every 8 s.
 *
 * @param {Error|null} err - error passed by the driver, if any.
 * @param {object} collection - the collection to tail.
 */
function onCollection(err, collection) {
  if (err) {
    console.error(err);
    return;
  }
  const options = {
    tailable: true,
    awaitdata: true,
    numberOfRetries: -1,
    tailableRetryInterval: 500,
  };
  const cursor = collection.find({}, options).stream();
  let itemsProcessed = 0;

  cursor.on("data", function (data) {
    itemsProcessed += 1; // was never incremented, so the report always showed 0
    relayTailedDocument(data).catch(function (relayErr) {
      console.error("relay error: " + relayErr);
    });
  });
  cursor.on("error", function (cursorErr) {
    console.error("cursor error: " + cursorErr);
  });

  setInterval(function () {
    console.log("itemsProcessed", itemsProcessed);
    // this method is also exposed by the Server instance
    //console.log(adapter.rooms);
  }, 8000);
}

/// Disabled: this is where I get a timeout error ///
// MongoClient.connect(buildMongoUrl(), MONGO_OPTS, function (err, client) {
//   if (err) {
//     console.log(err);
//   } else {
//     const db = client.db("swarmlabplaygroundstats");
//     db.collection("logs", onCollection);
//   }
// });

http.listen(3000, () => console.error("listening on http://localhost:3000/"));
console.error("socket.io example");