From 4c5ef71b24e63223d521391ccd8f719ffc4fa3f8 Mon Sep 17 00:00:00 2001 From: zeus Date: Wed, 25 Nov 2020 19:45:27 +0200 Subject: [PATCH] redis --- swarmlab-app/src/run/app.backup3.js | 48 +++ swarmlab-app/src/run/app.js | 558 +++++++++++++++++++++++++++- 2 files changed, 592 insertions(+), 14 deletions(-) create mode 100755 swarmlab-app/src/run/app.backup3.js diff --git a/swarmlab-app/src/run/app.backup3.js b/swarmlab-app/src/run/app.backup3.js new file mode 100755 index 0000000..54ab921 --- /dev/null +++ b/swarmlab-app/src/run/app.backup3.js @@ -0,0 +1,48 @@ +"use strict" + +var pathmodule = require('path'); +var app = require('express')(); +var http = require('http').Server(app); +var https = require('https'); +var io = require('socket.io')(http); + +//import { createAdapter } from 'socket.io-redis'; +const createAdapter = require('socket.io-redis'); +//import { RedisClient } from 'redis'; +const RedisClient = require("redis"); +const pubClient = RedisClient.createClient({ + host: 'redisserver', + port: 6379, + }); + +//const pubClient = new RedisClient({ host: 'localhost', port: 6379 }); +const subClient = pubClient.duplicate(); + +io.adapter(createAdapter({ pubClient, subClient })); + +pubClient.on("connect", function() { + console.log("You are now connected"); +}); + + setInterval(function () { +var resob1 = {} +resob1.data = '1' +resob1.user25user = 'user' + +var resob1string = JSON.stringify(resob1); + console.log('-------------------- '+JSON.stringify(resob1string)); + +var resob1string = 'test'; +pubClient.hmset('ekjgpiegwerpowfmfsdfsdgsk', resob1string, function(err, res) { + console.log('>>>>>>>>>eroor>>>>>>>>>>>>>>>>>>> '+JSON.stringify(err)); + console.log('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> '+JSON.stringify(res)); +}); + +pubClient.hgetall('ekjgpiegwerpowfmfsdfsdgsk', function(err, object) { + console.log('<<<<<<<<< console.error('listening on http://localhost:3000/')); +console.error('socket.io example'); diff --git a/swarmlab-app/src/run/app.js b/swarmlab-app/src/run/app.js index 54ab921..a88961b 100755 --- a/swarmlab-app/src/run/app.js +++ b/swarmlab-app/src/run/app.js @@ -24,25 +24,555 @@ pubClient.on("connect", function() { console.log("You are now connected"); }); - setInterval(function () { -var resob1 = {} -resob1.data = '1' -resob1.user25user = 'user' +const MongoClient = require('mongodb').MongoClient; + +//var chokidar = require("chokidar"); +//var logpath = "/var/lab/playground-serverlogs"; +//var watcher = chokidar.watch(logpath, { +// ignored: /[\/\\]\./, +// awaitWriteFinish: true, +// persistent: true +// }); + + + var async = require("async"); +const { check, validationResult } = require('express-validator'); +const urlExistSync = require("url-exist-sync"); + +var express = require('express'); +app.use(express.json()); + +const axios = require('axios'); +axios.defaults.timeout = 30000 + + const helmet = require('helmet'); +app.use(helmet()); + +const cors = require('cors') +const whitelist = [ + 'http://localhost:8080', + 'http://localhost:3080', + 'http://localhost:3081', + 'http://localhost:3082' + ] +const corsOptions = { + credentials: true, + methods: ['GET', 'PUT', 'POST', 'DELETE', 'OPTIONS'], + optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204 + allowedHeaders: [ + 'Content-Type', + 'Authorization', + 'X-Requested-With', + 'device-remember-token', + 'Access-Control-Allow-Origin', + 'Access-Control-Allow-Headers', + 'Origin', + 'Accept' + ], + origin: function(origin, callback) { + if (whitelist.indexOf(origin) !== -1) { + callback(null, true) + } else { + callback(null, true) + //callback(new Error('Not allowed by CORS')) + } + } +} + + +// *************************************************** +// checktoken +// *************************************************** -var resob1string = JSON.stringify(resob1); - console.log('-------------------- '+JSON.stringify(resob1string)); +async function checkToken(token) { + const agent = new https.Agent({ + rejectUnauthorized: false, + }); + const instance = axios.create({ + baseURL: 'https://api.swarmlab.io', + withCredentials: true, + rejectUnauthorized: false, + crossdomain: true, + httpsAgent: agent, + headers: { + 'Accept': 'application/json', + 'Content-Type': 'multipart/form-data', + 'Authorization': 'Bearer '+token + } + }) + try { + var pipelines = { + "source":'ssologin' + } + var params = { + pipeline: pipelines + } + + var options = { + headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, + }; + + instance.defaults.timeout = 30000; + const res = await instance.post('/istokenvalidsso',params,options); + if(res.status == 200){ + //console.log("check " +JSON.stringify(res.data)) + return res.data + }else{ + console.log("noerror: " + res) + return res.status + + } + } + catch (err) { + console.error("error: "+err); + var error = new Object(); + error.action = '401' + return error + } +} + + +function convertDateToUTC(date) { +return new Date(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate(), date.getUTCHours(), date.getUTCMinutes(), date.getUTCSeconds(),date.getUTCMilliseconds()); +} + +// *************************************************** +// get pipelines +// *************************************************** + +async function getpipelines(token,pipelinename) { + const agent = new https.Agent({ + rejectUnauthorized: false, + }); + const instance = axios.create({ + baseURL: 'https://api.swarmlab.io', + withCredentials: true, + rejectUnauthorized: false, + crossdomain: true, + httpsAgent: agent, + headers: { + 'Accept': 'application/json', + 'Content-Type': 'multipart/form-data', + 'Authorization': 'Bearer '+token + } + }) + /* + var params = { + playbook: value + } + var options = { + params: params, + headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, + }; -var resob1string = 'test'; -pubClient.hmset('ekjgpiegwerpowfmfsdfsdgsk', resob1string, function(err, res) { - console.log('>>>>>>>>>eroor>>>>>>>>>>>>>>>>>>> '+JSON.stringify(err)); - console.log('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> '+JSON.stringify(res)); + const playbook = await api.GET('playbookCode',options); + return playbook +*/ + try { + + var pipelines = { + "querytokenFilter":'uWr4FKRqrmpCRkJ9WLuI0DNuDWOGTkfcSzyZkJirZvJwwFDffLWrraqzzSPLeuQqL3TF9', + "filter":pipelinename + } + //var params = { + // pipeline: pipelines + // } + var params = { + querytokenFilter:'uWr4FKRqrmpCRkJ9WLuI0DNuDWOGTkfcSzyZkJirZvJwwFDffLWrraqzzSPLeuQqL3TF9', + filter:pipelinename + } + + var options = { + params: params, + headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, + }; + + //https://api.swarmlab.io/gettutorlabrooms?sort=pipelinename%7Casc&page=1&per_page=5&filter=&type=scripts&tutor=yes + instance.defaults.timeout = 30000; + //const res = await instance.get('/getplaygrounds',params,options); + const res = await instance.get('/getplaygrounds',options); + if(res.status == 200){ + return res.data + }else{ + console.log("noerror: " + res) + return res.status + + } + } + catch (err) { + console.error("error: "+err); + var error = new Object(); + error.action = '401' + return error + } +} + + +global.online='ob'; +global.pipelines=[]; + + function sendlog(reslog,pathfileval){ + var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval); + //for (var key in usertmp.data){ + var user = usertmp.data[0].user25user; + // if(usertmp.data){ + console.log('-----------------------' + JSON.stringify(usertmp)); + io.in(user).emit("logdata", reslog); + // } + //} + } + function onlogfile(path){ + + console.log('File', path, 'has been added'); + var pathfileval = pathmodule.basename(path); + var arrfile = pathfileval.toString().split("-"); + var pathfile = arrfile[0]; + var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); + console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval)) + if (indexfind1 === -1 ){ + (async() => { + console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval)) + var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto + var resdata = await getpipelines(token,pathfile) + //resdata.data.pathlogfile = 'test' + var resob = {} + resob.pathlogfile = pathfileval + var resobarray = [] + for (let i in resdata.data) { + var resob1 = {} + resob1.data = resdata.data[i].res25swarmlabname + resob1.user25user = resdata.data[i].res25user + resob1.res25creator = resdata.data[i].res25creator + resob1.res25fileforce = resdata.data[i].res25fileforce + resobarray.push(resob1) + } + resob.data = resobarray + var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); + indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval) + })() + } + +} + +// *************************************************** +// rest get +// *************************************************** + +app.get('/run', [ + //check('access_token').isLength({ min: 40 }), + //check('llo').isBase64() + ], +cors(corsOptions), (req, res, next) => { + + (async() => { + +var mongourl = "mongodb://playgrounduser:efvvnuioervefSDFSGYGHRDFVsdfergvssppiiedifhwincvinviw_dbfjbsifbsdkjfswuunscfudfgbbfvibqefwrvnine@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1" +const OPTS = { + useNewUrlParser: true, + useUnifiedTopology: true +}; +MongoClient.connect(mongourl, OPTS, function(err, client){ + if(err){ + console.log(err); + } else { + const db = client.db('fluent'); + //db.collection('log', onCollection); + console.log(JSON.stringify('mongo connected')) + var stream = db.collection('logs').find({}, { + tailable: true, + awaitdata: true + /* other options */ +}).stream(); + +stream.on('data', function (doc) { + console.log(JSON.stringify(doc)) + //socket.write(JSON.stringify({'action': 'log','param': doc.log})); }); + } +}); + var RES = new Object(); + RES.code = req.query["filter"] + RES.token = req.query["filter"] + var isvalid = await checkToken(RES.token); + if(isvalid.action == 'ok'){ + console.log("Authserver ok " + RES.token); + }else{ + console.log("Authserver no " + RES.token); + } + RES.error = false + RES.error_msg = "ok" + res.json(RES) + })() -pubClient.hgetall('ekjgpiegwerpowfmfsdfsdgsk', function(err, object) { - console.log('<<<<<<<<< { + + + (async() => { + + //console.log(JSON.stringify(req.headers)); + //console.log(JSON.stringify(req.body)); + //console.log("mongo "+JSON.stringify(req.body)); + //console.log("LOG "+JSON.stringify(req.body[0].message)); + //console.log("PATH "+JSON.stringify(req.body[0].tailed_path)); + for (var i = 0; i < req.body.length; i++){ + //var getpath = await onlogfile(req.body[i].tailed_path) + + var path = req.body[i].tailed_path + + console.log('File', path, 'has been added'); + var pathfileval = pathmodule.basename(path); + var arrfile = pathfileval.toString().split("-"); + var pathfile = arrfile[0]; + var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); + console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval)) + if (indexfind1 === -1 ){ + (async() => { + console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval)) + var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto + var resdata = await getpipelines(token,pathfile) + //resdata.data.pathlogfile = 'test' + var resob = {} + resob.pathlogfile = pathfileval + var resobarray = [] + for (let i in resdata.data) { + var resob1 = {} + resob1.data = resdata.data[i].res25swarmlabname + resob1.user25user = resdata.data[i].res25user + resob1.res25creator = resdata.data[i].res25creator + resob1.res25fileforce = resdata.data[i].res25fileforce + resobarray.push(resob1) + } + resob.data = resobarray + //var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname); + var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); + + //indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists") + indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval) + + //console.log('info', JSON.stringify(resdata)); + //console.log('info------------- ', JSON.stringify(global.pipelines)); + })() + } + // + var obj = req.body[i]; + + //var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data.res25swarmlabname); + //indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists") + var now = new Date(); + + var reslog = new Object(); + reslog.log = obj + + reslog.date = convertDateToUTC(now) + console.log(reslog); + var pathfileval = pathmodule.basename(reslog.log.tailed_path); + var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); + //indexfind === -1 ? sendlog(reslog,pathfileval) : console.log("object already exists") + indexfind === -1 ? console.log("object not found") : sendlog(reslog,pathfileval) + console.log("IOT "+JSON.stringify(reslog.log.tailed_path)); + console.log("IOTindexfind "+JSON.stringify(indexfind)); + console.log("IOTuser "+JSON.stringify(global.pipelines)); + // io.in("iot").emit("message", reslog); + // io.emit("logdata", reslog); + } + })() + + //io.in("iot").emit("message", RES); + + console.error('socket POST from client'); + var RES = new Object(); + RES.error = false + RES.error_msg = "ok" + RES.msg = req.body[0].messsage + + res.json(RES) +}); + +// *************************************************** +// socket +// *************************************************** + +io.origins('*:*') // for latest version + + +function onCollection(err, collection) { + let options = { tailable: true, + awaitdata: true, + numberOfRetries: -1, + tailableRetryInterval: 500 + }; + var cursor = collection.find({},options).stream(); + var itemsProcessed = 0; + + var reslog = new Object(); + var now = new Date(); + cursor.on('data', function (data) { + + var pathfileval = pathmodule.basename(data.tailed_path); + var arrfile = pathfileval.toString().split("-"); + var pathfile = arrfile[0]; + + var indexupdate = true + var resob = {} + //var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); + pubClient.get(pathfileval, function(err, object) { + if(object){ + indexupdate = false + }else{ + console.log('redis '+JSON.stringify(object)); + } + }); + if (indexupdate ){ + (async() => { + var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto + var resdata = await getpipelines(token,pathfile) + console.log('------------------------------ ' + JSON.stringify(resdata.data)) + resob.pathlogfile = pathfileval + var resobarray = [] + //for (let i in resdata.data) { + var resob1 = {} + var i = 0 + resob1.data = resdata.data[i].res25swarmlabname + resob1.user25user = resdata.data[i].res25user + resob1.res25creator = resdata.data[i].res25creator + resob1.res25fileforce = resdata.data[i].res25fileforce + resob1.tailed_path = pathfileval + resobarray.push(resob1) + var resob1string = JSON.stringify(resob1); + console.log('+++++++++++++++++' + resob1string + '<<<<<<<<<<<<<<<<<<<<<<' + pathfileval); + //pubClient.hmset(pathfileval,resob1string) + pubClient.set(pathfileval, resob1string, function(err, res) { + + }); + await pubClient.get(pathfileval, function(err, object) { + console.log('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> '+JSON.stringify(object)); + }); + //} + resob.data = resobarray + //var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname); + //var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); + + //indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists") + //indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval) + + reslog.log = data + reslog.date = convertDateToUTC(now) + //console.log('data ' + JSON.stringify(reslog)); + //var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval); + //var user = usertmp.data[0].user25user; + var user = "anagnostopoulos@uniwa.gr" + console.log('datauser ' + JSON.stringify(user)); + io.in(user).emit("logdata", reslog); + itemsProcessed++; + + })() + }else{ + + reslog.log = data + reslog.date = convertDateToUTC(now) + //console.log('dataelse ' + JSON.stringify(reslog)); + //var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval); + //var user = usertmp.data[0].user25user; + ///console.log('datauser ' + JSON.stringify(user)); + var user = "anagnostopoulos@uniwa.gr" + io.in(user).emit("logdata", reslog); + itemsProcessed++; + } + }); + + + setInterval(function () { + console.log('itemsProcessed', itemsProcessed); + }, 1000); +} + + + var mongourl = "mongodb://playgrounduser:efvvnuioervefSDFSGYGHRDFVsdfergvssppiiedifhwincvinviw_dbfjbsifbsdkjfswuunscfudfgbbfvibqefwrvnine@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1" + const OPTS = { + useNewUrlParser: true, + useUnifiedTopology: true + }; + //var mongolab_uri = "mongodb://:@:,:/?replicaSet="; + var mongooptions = { + autoReconnect: true, + keepAlive: 1, + connectTimeoutMS: 30000, + socketTimeoutMS: 0 + } + MongoClient.connect(mongourl, OPTS, function(err, client){ + if(err){ + console.log(err); + } else { + const db = client.db('fluent'); + db.collection('logs', onCollection); + } + }); + +io.on('connection', s => { + console.error('socket connection'); + + // ------------------------------ + // --- set + // ------------------------------ + var usersession = new Object(); + usersession.SOCKET = {}; + usersession.SOCKET.error = {}; + console.error('socket ...'); + s.auth = false; + + // ------------------------------ + // --- authenticate + // ------------------------------ + s.on('authenticate', function(data){ + const token = data + console.log('invalid 1 ' + token); + (async() => { + var isvalid = await checkToken(token); + if(isvalid.action == 'ok'){ + console.log("Authserver ok ", s.id + ' - ' + token); + usersession.SOCKET.user = isvalid.user + usersession.SOCKET.scope = isvalid.scope // space delimeter + usersession.SOCKET.token = isvalid.token + s.auth = true; + }else{ + console.log("Authserver no ", s.id + ' - ' + token); + s.auth = false; + } + })() + }); + + setTimeout(function(){ + if (!s.auth) { + console.log("Disconnecting timeout socket ", s.id); + s.disconnect('unauthorized'); + }else{ + var room = usersession.SOCKET.user + //s.on("subscribe", function (room) { + // console.log("joining room", room); + s.join(room); + // }); + } + }, 30000); + + + + + var id = s.id + s.on('log', obj => { + console.error('from client '+ s.id + ' obj ' + obj); + }); + +}); http.listen(3000, () => console.error('listening on http://localhost:3000/')); console.error('socket.io example');