"use strict" var pathmodule = require('path'); var app = require('express')(); var http = require('http').Server(app); var https = require('https'); var io = require('socket.io')(http); //import { createAdapter } from 'socket.io-redis'; const createAdapter = require('socket.io-redis'); //import { RedisClient } from 'redis'; const RedisClient = require("redis"); const pubClient = RedisClient.createClient({ host: 'redisserver', port: 6379, }); //const pubClient = new RedisClient({ host: 'localhost', port: 6379 }); const subClient = pubClient.duplicate(); io.adapter(createAdapter({ pubClient, subClient })); pubClient.on("connect", function() { console.log("You are now connected"); }); const MongoClient = require('mongodb').MongoClient; //var chokidar = require("chokidar"); //var logpath = "/var/lab/playground-serverlogs"; //var watcher = chokidar.watch(logpath, { // ignored: /[\/\\]\./, // awaitWriteFinish: true, // persistent: true // }); var async = require("async"); const { check, validationResult } = require('express-validator'); const urlExistSync = require("url-exist-sync"); var express = require('express'); app.use(express.json()); const axios = require('axios'); axios.defaults.timeout = 30000 const helmet = require('helmet'); app.use(helmet()); const cors = require('cors') const whitelist = [ 'http://localhost:8080', 'http://localhost:3080', 'http://localhost:3081', 'http://localhost:3082' ] const corsOptions = { credentials: true, methods: ['GET', 'PUT', 'POST', 'DELETE', 'OPTIONS'], optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204 allowedHeaders: [ 'Content-Type', 'Authorization', 'X-Requested-With', 'device-remember-token', 'Access-Control-Allow-Origin', 'Access-Control-Allow-Headers', 'Origin', 'Accept' ], origin: function(origin, callback) { if (whitelist.indexOf(origin) !== -1) { callback(null, true) } else { callback(null, true) //callback(new Error('Not allowed by CORS')) } } } // *************************************************** // checktoken // *************************************************** async function checkToken(token) { const agent = new https.Agent({ rejectUnauthorized: false, }); const instance = axios.create({ baseURL: 'https://api.swarmlab.io', withCredentials: true, rejectUnauthorized: false, crossdomain: true, httpsAgent: agent, headers: { 'Accept': 'application/json', 'Content-Type': 'multipart/form-data', 'Authorization': 'Bearer '+token } }) try { var pipelines = { "source":'ssologin' } var params = { pipeline: pipelines } var options = { headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, }; instance.defaults.timeout = 30000; const res = await instance.post('/istokenvalidsso',params,options); if(res.status == 200){ //console.log("check " +JSON.stringify(res.data)) return res.data }else{ console.log("noerror: " + res) return res.status } } catch (err) { console.error("error: "+err); var error = new Object(); error.action = '401' return error } } function convertDateToUTC(date) { return new Date(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate(), date.getUTCHours(), date.getUTCMinutes(), date.getUTCSeconds(),date.getUTCMilliseconds()); } // *************************************************** // get pipelines // *************************************************** async function getpipelines(token,pipelinename) { const agent = new https.Agent({ rejectUnauthorized: false, }); const instance = axios.create({ baseURL: 'https://api.swarmlab.io', withCredentials: true, rejectUnauthorized: false, crossdomain: true, httpsAgent: agent, headers: { 'Accept': 'application/json', 'Content-Type': 'multipart/form-data', 'Authorization': 'Bearer '+token } }) /* var params = { playbook: value } var options = { params: params, headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, }; const playbook = await api.GET('playbookCode',options); return playbook */ try { var pipelines = { "querytokenFilter":'uWr4FKRqrmpCRkJ9WLuI0DNuDWOGTkfcSzyZkJirZvJwwFDffLWrraqzzSPLeuQqL3TF9', "filter":pipelinename } //var params = { // pipeline: pipelines // } var params = { querytokenFilter:'uWr4FKRqrmpCRkJ9WLuI0DNuDWOGTkfcSzyZkJirZvJwwFDffLWrraqzzSPLeuQqL3TF9', filter:pipelinename } var options = { params: params, headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, }; //https://api.swarmlab.io/gettutorlabrooms?sort=pipelinename%7Casc&page=1&per_page=5&filter=&type=scripts&tutor=yes instance.defaults.timeout = 30000; //const res = await instance.get('/getplaygrounds',params,options); const res = await instance.get('/getplaygrounds',options); if(res.status == 200){ return res.data }else{ console.log("noerror: " + res) return res.status } } catch (err) { console.error("error: "+err); var error = new Object(); error.action = '401' return error } } global.online='ob'; global.pipelines=[]; function sendlog(reslog,pathfileval){ var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval); //for (var key in usertmp.data){ var user = usertmp.data[0].user25user; // if(usertmp.data){ console.log('-----------------------' + JSON.stringify(usertmp)); io.in(user).emit("logdata", reslog); // } //} } function onlogfile(path){ console.log('File', path, 'has been added'); var pathfileval = pathmodule.basename(path); var arrfile = pathfileval.toString().split("-"); var pathfile = arrfile[0]; var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval)) if (indexfind1 === -1 ){ (async() => { console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval)) var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto var resdata = await getpipelines(token,pathfile) //resdata.data.pathlogfile = 'test' var resob = {} resob.pathlogfile = pathfileval var resobarray = [] for (let i in resdata.data) { var resob1 = {} resob1.data = resdata.data[i].res25swarmlabname resob1.user25user = resdata.data[i].res25user resob1.res25creator = resdata.data[i].res25creator resob1.res25fileforce = resdata.data[i].res25fileforce resobarray.push(resob1) } resob.data = resobarray var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval) })() } } // *************************************************** // rest get // *************************************************** app.get('/run', [ //check('access_token').isLength({ min: 40 }), //check('llo').isBase64() ], cors(corsOptions), (req, res, next) => { (async() => { var mongourl = "mongodb://playgrounduser:efvvnuioervefSDFSGYGHRDFVsdfergvssppiiedifhwincvinviw_dbfjbsifbsdkjfswuunscfudfgbbfvibqefwrvnine@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1" const OPTS = { useNewUrlParser: true, useUnifiedTopology: true }; MongoClient.connect(mongourl, OPTS, function(err, client){ if(err){ console.log(err); } else { const db = client.db('fluent'); //db.collection('log', onCollection); console.log(JSON.stringify('mongo connected')) var stream = db.collection('logs').find({}, { tailable: true, awaitdata: true /* other options */ }).stream(); stream.on('data', function (doc) { console.log(JSON.stringify(doc)) //socket.write(JSON.stringify({'action': 'log','param': doc.log})); }); } }); var RES = new Object(); RES.code = req.query["filter"] RES.token = req.query["filter"] var isvalid = await checkToken(RES.token); if(isvalid.action == 'ok'){ console.log("Authserver ok " + RES.token); }else{ console.log("Authserver no " + RES.token); } RES.error = false RES.error_msg = "ok" res.json(RES) })() }); // *************************************************** // rest post // *************************************************** app.post('/run', [ //check('access_token').isLength({ min: 40 }), //check('llo').isBase64() ], cors(corsOptions), (req, res, next) => { (async() => { //console.log(JSON.stringify(req.headers)); //console.log(JSON.stringify(req.body)); //console.log("mongo "+JSON.stringify(req.body)); //console.log("LOG "+JSON.stringify(req.body[0].message)); //console.log("PATH "+JSON.stringify(req.body[0].tailed_path)); for (var i = 0; i < req.body.length; i++){ //var getpath = await onlogfile(req.body[i].tailed_path) var path = req.body[i].tailed_path console.log('File', path, 'has been added'); var pathfileval = pathmodule.basename(path); var arrfile = pathfileval.toString().split("-"); var pathfile = arrfile[0]; var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval)) if (indexfind1 === -1 ){ (async() => { console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval)) var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto var resdata = await getpipelines(token,pathfile) //resdata.data.pathlogfile = 'test' var resob = {} resob.pathlogfile = pathfileval var resobarray = [] for (let i in resdata.data) { var resob1 = {} resob1.data = resdata.data[i].res25swarmlabname resob1.user25user = resdata.data[i].res25user resob1.res25creator = resdata.data[i].res25creator resob1.res25fileforce = resdata.data[i].res25fileforce resobarray.push(resob1) } resob.data = resobarray //var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname); var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); //indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists") indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval) //console.log('info', JSON.stringify(resdata)); //console.log('info------------- ', JSON.stringify(global.pipelines)); })() } // var obj = req.body[i]; //var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data.res25swarmlabname); //indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists") var now = new Date(); var reslog = new Object(); reslog.log = obj reslog.date = convertDateToUTC(now) console.log(reslog); var pathfileval = pathmodule.basename(reslog.log.tailed_path); var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); //indexfind === -1 ? sendlog(reslog,pathfileval) : console.log("object already exists") indexfind === -1 ? console.log("object not found") : sendlog(reslog,pathfileval) console.log("IOT "+JSON.stringify(reslog.log.tailed_path)); console.log("IOTindexfind "+JSON.stringify(indexfind)); console.log("IOTuser "+JSON.stringify(global.pipelines)); // io.in("iot").emit("message", reslog); // io.emit("logdata", reslog); } })() //io.in("iot").emit("message", RES); console.error('socket POST from client'); var RES = new Object(); RES.error = false RES.error_msg = "ok" RES.msg = req.body[0].messsage res.json(RES) }); // *************************************************** // socket // *************************************************** io.origins('*:*') // for latest version function onCollection(err, collection) { let options = { tailable: true, awaitdata: true, numberOfRetries: -1, tailableRetryInterval: 500 }; var cursor = collection.find({},options).stream(); var itemsProcessed = 0; var reslog = new Object(); var now = new Date(); cursor.on('data', function (data) { var pathfileval = pathmodule.basename(data.tailed_path); var arrfile = pathfileval.toString().split("-"); var pathfile = arrfile[0]; var indexupdate = true //var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); pubClient.hgetall(pathfileval, function(err, object) { if(object){ indexupdate = false }else{ console.log('redis '+JSON.stringify(object)); } }); var resob = {} if (indexupdate ){ (async() => { console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval)) var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto var resdata = await getpipelines(token,pathfile) console.log('file2222222222222222222222222222222222222------------------------------ ' + JSON.stringify(resdata.data)) resob.pathlogfile = pathfileval var resobarray = [] //for (let i in resdata.data) { var resob1 = {} var i = 0 resob1.data = resdata.data[i].res25swarmlabname resob1.user25user = resdata.data[i].res25user resob1.res25creator = resdata.data[i].res25creator resob1.res25fileforce = resdata.data[i].res25fileforce resob1.tailed_path = pathfileval //resobarray.push(resob1) var resob1string = JSON.stringify(resob1); pubClient.hmset(pathfileval,resob1string) pubClient.hgetall(pathfileval, function(err, object) { console.log('redis>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> '+JSON.stringify(object)); }); //} resob.data = resobarray //var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname); //var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); //indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists") //indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval) reslog.log = data reslog.date = convertDateToUTC(now) console.log('data ' + JSON.stringify(reslog)); //var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval); //var user = usertmp.data[0].user25user; var user = "anagnostopoulos@uniwa.gr" console.log('datauser ' + JSON.stringify(user)); io.in(user).emit("logdata", reslog); itemsProcessed++; })() }else{ reslog.log = data reslog.date = convertDateToUTC(now) console.log('dataelse ' + JSON.stringify(reslog)); //var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval); //var user = usertmp.data[0].user25user; ///console.log('datauser ' + JSON.stringify(user)); var user = "anagnostopoulos@uniwa.gr" io.in(user).emit("logdata", reslog); itemsProcessed++; } }); setInterval(function () { console.log('itemsProcessed', itemsProcessed); }, 1000); } var mongourl = "mongodb://playgrounduser:efvvnuioervefSDFSGYGHRDFVsdfergvssppiiedifhwincvinviw_dbfjbsifbsdkjfswuunscfudfgbbfvibqefwrvnine@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1" const OPTS = { useNewUrlParser: true, useUnifiedTopology: true }; //var mongolab_uri = "mongodb://:@:,:/?replicaSet="; var mongooptions = { autoReconnect: true, keepAlive: 1, connectTimeoutMS: 30000, socketTimeoutMS: 0 } MongoClient.connect(mongourl, OPTS, function(err, client){ if(err){ console.log(err); } else { const db = client.db('fluent'); db.collection('logs', onCollection); } }); io.on('connection', s => { console.error('socket connection'); // ------------------------------ // --- set // ------------------------------ var usersession = new Object(); usersession.SOCKET = {}; usersession.SOCKET.error = {}; console.error('socket ...'); s.auth = false; // ------------------------------ // --- authenticate // ------------------------------ s.on('authenticate', function(data){ const token = data console.log('invalid 1 ' + token); (async() => { var isvalid = await checkToken(token); if(isvalid.action == 'ok'){ console.log("Authserver ok ", s.id + ' - ' + token); usersession.SOCKET.user = isvalid.user usersession.SOCKET.scope = isvalid.scope // space delimeter usersession.SOCKET.token = isvalid.token s.auth = true; }else{ console.log("Authserver no ", s.id + ' - ' + token); s.auth = false; } })() }); setTimeout(function(){ if (!s.auth) { console.log("Disconnecting timeout socket ", s.id); s.disconnect('unauthorized'); }else{ var room = usersession.SOCKET.user //s.on("subscribe", function (room) { // console.log("joining room", room); s.join(room); // }); } }, 30000); var id = s.id s.on('log', obj => { console.error('from client '+ s.id + ' obj ' + obj); }); }); http.listen(3000, () => console.error('listening on http://localhost:3000/')); console.error('socket.io example');