You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1092 lines
33 KiB
1092 lines
33 KiB
"use strict";
|
|
|
|
var pathmodule = require("path");
|
|
var app = require("express")();
|
|
var http = require("http").Server(app);
|
|
var https = require("https");
|
|
const mongoose = require("mongoose");
|
|
var CONFIG = require(pathmodule.resolve(__dirname, "runconfig.js"));
|
|
// Options for the Socket.IO server attached to the shared HTTP server.
// Earlier tuning experiments (pingTimeout, allowUpgrades, serveClient,
// pingInterval, explicit transports) are intentionally left disabled.
const socketServerOptions = {
  // pingTimeout: 30000,
  // allowUpgrades: false,
  // serveClient: false,
  // pingInterval: 10000,
  // //transports: [ 'websocket', 'polling' ],
  // transports: [ 'polling', 'websocket' ],
  cors: {
    origin: "http://localhost:8080",
    methods: ["GET", "POST"],
    allowedHeaders: ["my-custom-header"],
    credentials: true,
  },
  cookie: {
    name: "test",
    httpOnly: false,
    path: "/custom",
  },
};

const io = require("socket.io")(http, socketServerOptions);

const createAdapter = require("socket.io-redis");

const Redis = require("ioredis");

// Connection settings for the Redis instance backing the Socket.IO adapter.
const redisConnection = {
  host: "redisserver",
  port: 6379,
};

const pubClient = new Redis(redisConnection);

//const pubClient = new RedisClient({ host: 'localhost', port: 6379 });
// The adapter needs a second connection; it is cloned from the first one.
const subClient = pubClient.duplicate();

io.adapter(createAdapter({ pubClient, subClient }));

// Trace the moment the publishing connection is established.
pubClient.on("connect", () => {
  console.log("You are now connected");
});
|
|
|
|
const MongoClient = require("mongodb").MongoClient;
|
|
const { DateTime } = require("luxon");
|
|
|
|
var async = require("async");
|
|
const { check, validationResult } = require("express-validator");
|
|
const urlExistSync = require("url-exist-sync");
|
|
|
|
var express = require("express");
|
|
app.use(express.json());
|
|
|
|
const axios = require("axios");
|
|
axios.defaults.timeout = 30000;
|
|
|
|
const helmet = require("helmet");
|
|
app.use(helmet());
|
|
|
|
const cors = require("cors");
|
|
// Origins that were meant to be allowed by the CORS policy.
// NOTE(review): the list is currently NOT enforced - see `origin` below.
const whitelist = [
  "http://localhost:3000",
  "http://localhost:8080",
  "http://localhost:3080",
  "http://localhost:3081",
  "http://localhost:3082",
];

// Options handed to the `cors` middleware on each route.
const corsOptions = {
  credentials: true,
  methods: ["GET", "PUT", "POST", "DELETE", "OPTIONS"],
  optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204
  allowedHeaders: [
    "Content-Type",
    "Authorization",
    "X-Requested-With",
    "device-remember-token",
    "Access-Control-Allow-Origin",
    "Access-Control-Allow-Headers",
    "Origin",
    "Accept",
  ],
  /**
   * Origin check used by the `cors` middleware.
   *
   * Every origin is accepted, whether or not it appears in `whitelist`;
   * the rejecting branch was disabled on purpose:
   *   //callback(new Error('Not allowed by CORS'))
   *
   * @param {string|undefined} origin - value of the request's Origin header
   * @param {function(Error|null, boolean)} callback - receives the decision
   */
  origin(origin, callback) {
    callback(null, true);
  },
};
|
|
|
|
// ------------------------------
|
|
// --- LEFOS MONGO LOGGING
|
|
// ------------------------------
|
|
|
|
// Lefos - mongo test read
|
|
/**
 * GET /test
 *
 * Reads every document from the `test` collection of `fluentdb` and responds
 * with the parsed `log` payloads of those documents whose `log` string
 * mentions "app_name".
 *
 * Failures (connection, query, malformed log payload) are reported to the
 * client or skipped instead of being thrown from inside the driver callbacks,
 * where an exception would take the whole process down.
 */
app.get("/test", cors(corsOptions), (req, res) => {
  console.log("reading from db....");

  const url = "mongodb://mongo:27017/";

  MongoClient.connect(url, function (err, db) {
    if (err) {
      console.error("mongo connect failed: " + err);
      res.status(500).json({ error: "database unavailable" });
      return;
    }

    db.db("fluentdb")
      .collection("test")
      .find({})
      .toArray(function (err, result) {
        // The result set is fully materialised here, so the connection can be
        // released on both the success and the failure path.
        db.close();

        if (err) {
          console.error("mongo query failed: " + err);
          res.status(500).json({ error: "query failed" });
          return;
        }

        // The `log` field holds JSON serialised as a string, so it needs a
        // second parse before it can be returned as an object.
        const jsonfinal = [];
        result.forEach((value) => {
          if (typeof value.log !== "string" || !value.log.includes("app_name")) {
            return;
          }
          try {
            jsonfinal.push(JSON.parse(value.log));
          } catch (parseErr) {
            // One malformed entry must not hide all the valid ones.
            console.error("skipping unparsable log entry: " + parseErr);
          }
        });

        res.send(jsonfinal);
      });
  });
});
|
|
|
|
// Trace emitted once, while the module is being loaded.
console.log("reading from db....");

// Lefos: holds the user's current room; it is reassigned every time an
// authenticated socket joins its room in the connection handler.
let curRoom;
|
|
|
|
// var url = "mongodb://mongo:27017/";
|
|
// MongoClient.connect(url, function (err, db) {
|
|
// if (err) throw err;
|
|
// var dbo = db.db("fluentdb");
|
|
// dbo.collection("test", onCollectionNew);
|
|
// });
|
|
|
|
/**
 * Streams a MongoDB collection to one Socket.IO room.
 *
 * Intended to be used as a `db.collection(name, callback)` callback with
 * `this` bound to `{ user, id }`: `user` is the room that receives the
 * documents and `id` is the socket id that owns the stream.
 *
 * The socket id is stored in Redis (via `setUser`) and looked up again for
 * every document: while the key exists the document is emitted as "logsend";
 * once it is gone (the socket disconnected) the cursor is closed, so the
 * callback does not stay alive and the client does not get duplicate data.
 *
 * @param {Error|null} err - error reported while obtaining the collection
 * @param {object} collection - MongoDB collection to stream from
 * @returns {Promise<void>}
 */
async function onCollectionNew(err, collection) {
  if (err || !collection) {
    console.error("onCollectionNew: collection unavailable: " + err);
    return;
  }

  const options = {
    tailable: true,
    awaitdata: true,
    numberOfRetries: -1,
    tailableRetryInterval: 500,
  };
  const room = this.user;
  const sid = this.id;
  console.log("Inside callback: " + room + " Id: " + sid);

  // LEFOS --- STORE USER IN REDIS
  // Waiting for the write guarantees the key exists before the first lookup.
  const stored = await setUser(sid, room);
  if (stored === null) {
    console.error("onCollectionNew: could not register " + sid + " in redis");
  }

  const cursor = collection.find({}, options).stream();

  cursor.on("data", async function (data) {
    // Hold back further documents while the Redis lookup is in flight.
    cursor.pause();
    const state = await getkey(sid);
    // Always resume: getkey resolves null on a Redis error, and leaving the
    // cursor paused in that case would stall the stream forever.
    cursor.resume();

    if (state == "1") {
      // Round-trip through JSON so only plain serialisable data is emitted.
      io.in(room).emit("logsend", JSON.parse(JSON.stringify(data)));
    } else if (state == "2") {
      console.log("Cursor is closing...");
      cursor.close();
    } else {
      console.error("onCollectionNew: redis lookup failed for " + sid + ", document skipped");
    }
  });
}
|
|
|
|
/**
 * Per-socket setup.
 *
 * Lifecycle visible in this handler:
 *  - "authenticate": the payload is treated as a bearer token and validated
 *    with checkToken(); on success the user, scope and token are stored in
 *    the per-socket `usersession` and `s.auth` becomes true.
 *  - "onevent": asks checkstream() to start a log stream for the room named
 *    by the payload.
 *  - 30 s after connecting, an authenticated socket joins the room named
 *    after its user; an unauthenticated one is only logged.
 *  - "disconnect": the socket id is removed from Redis and the pending join
 *    timer is cancelled.
 */
io.on("connection", (s) => {
  console.error("socket connection");

  // -------- Lefos section
  //dbo.collection("test", onCollectionNew);
  // --------

  // ------------------------------
  // --- set
  // ------------------------------
  const usersession = { SOCKET: { error: {} } };
  console.error("socket ...");
  s.auth = false;

  // ------------------------------
  // --- authenticate
  // ------------------------------
  s.on("authenticate", function (data) {
    const token = data;
    // NOTE(review): this writes the bearer token to the log in clear text.
    console.log("TEST LOG INSIDE ATHENTICATE SOCKET: " + token);
    (async () => {
      const isvalid = await checkToken(token);
      // checkToken may hand back an empty body or a bare status code, so make
      // sure there is an object before reading `action` from it.
      if (isvalid && isvalid.action == "ok") {
        console.log("Authserver ok ", s.id + " - " + token);
        usersession.SOCKET.user = isvalid.user;
        usersession.SOCKET.scope = isvalid.scope; // space delimeter
        usersession.SOCKET.token = isvalid.token;
        s.auth = true;
      } else {
        console.log("Authserver no ", s.id + " - " + token);
        s.auth = false;
      }
    })();
  });

  // NOTE(review): `data` is supplied by the client and used as the room name
  // without checking `s.auth` - confirm this is intended.
  s.on("onevent", function (data) {
    const binddata = {
      user: data,
      id: s.id,
    };
    checkstream(binddata);
  });

  // Room membership is decided 30 seconds after the connection was opened.
  const joinTimer = setTimeout(function () {
    if (!s.auth) {
      console.log("Disconnecting timeout socket ", s.id);
      //s.disconnect('unauthorized');
    } else {
      const room = usersession.SOCKET.user;
      s.join(room);
      console.log("joining rooom", s.rooms);
      console.log(room + " created ");
      curRoom = room;
    }
  }, 30000);

  s.on("disconnect", function () {
    // Without this the timer would still fire for a socket that is already
    // gone, joining a dead socket to a room and overwriting curRoom.
    clearTimeout(joinTimer);
    console.log("Deleting " + s.id + "From redis");
    pubClient.del(s.id).catch(function (err) {
      console.error("could not delete " + s.id + " from redis: " + err);
    });
  });

  s.on("log", (obj) => {
    console.error("from client " + s.id + " obj " + obj);
  });
});
|
|
|
|
// ***************************************************
|
|
// checktoken
|
|
// ***************************************************
|
|
|
|
/**
 * Asks the swarmlab API whether a bearer token is valid.
 *
 * POSTs `{ pipeline: { source: "ssologin" } }` to `/istokenvalidsso`.
 *
 * NOTE(review): the HTTPS agent is created with `rejectUnauthorized: false`,
 * i.e. the server certificate is not verified.
 *
 * @param {string} token - bearer token to validate
 * @returns {Promise<*>} the response body when the status is 200, the bare
 *   status code for any other status that does not throw, or
 *   `{ action: "401" }` when the request fails
 */
async function checkToken(token) {
  const httpsAgent = new https.Agent({ rejectUnauthorized: false });
  const client = axios.create({
    baseURL: "https://api.swarmlab.io",
    withCredentials: true,
    rejectUnauthorized: false,
    crossdomain: true,
    httpsAgent: httpsAgent,
    headers: {
      Accept: "application/json",
      "Content-Type": "multipart/form-data",
      Authorization: "Bearer " + token,
    },
  });

  try {
    const body = {
      pipeline: { source: "ssologin" },
    };
    // Per-request headers; these are passed alongside the instance defaults.
    const requestConfig = {
      headers: {
        "content-type": "application/x-www-form-urlencoded",
        Authorization: `Bearer ${token}`,
      },
    };

    client.defaults.timeout = 30000;
    const response = await client.post("/istokenvalidsso", body, requestConfig);

    if (response.status != 200) {
      console.log("noerror: " + response);
      return response.status;
    }
    return response.data;
  } catch (err) {
    console.error("error: " + err);
    return { action: "401" };
  }
}
|
|
|
|
/**
 * Builds a new Date whose local-time fields equal the UTC fields of `date`.
 *
 * The result therefore does not denote the same instant as the input (unless
 * the local zone is UTC); it is a date that reads like the UTC wall clock
 * when inspected with the local getters.
 *
 * @param {Date} date - source date
 * @returns {Date} date carrying the UTC components as local components
 */
function convertDateToUTC(date) {
  const utcFields = [
    date.getUTCFullYear(),
    date.getUTCMonth(),
    date.getUTCDate(),
    date.getUTCHours(),
    date.getUTCMinutes(),
    date.getUTCSeconds(),
    date.getUTCMilliseconds(),
  ];
  return new Date(...utcFields);
}
|
|
|
|
// ***************************************************
|
|
// get pipelines
|
|
// ***************************************************
|
|
|
|
/**
 * Queries the swarmlab API for playgrounds matching a pipeline name.
 *
 * Performs `GET /getplaygrounds` with the query parameters
 * `querytokenFilter` (taken from `CONFIG.api.token`) and `filter`.
 *
 * NOTE(review): the HTTPS agent is created with `rejectUnauthorized: false`,
 * i.e. the server certificate is not verified.
 *
 * @param {string} token - bearer token sent in the Authorization header
 * @param {string} pipelinename - value of the `filter` query parameter
 * @returns {Promise<*>} the response body when the status is 200, the bare
 *   status code for any other status that does not throw, or
 *   `{ action: "401" }` when the request fails
 */
async function getpipelines(token, pipelinename) {
  const httpsAgent = new https.Agent({ rejectUnauthorized: false });
  const client = axios.create({
    baseURL: "https://api.swarmlab.io",
    withCredentials: true,
    rejectUnauthorized: false,
    crossdomain: true,
    httpsAgent: httpsAgent,
    headers: {
      Accept: "application/json",
      "Content-Type": "multipart/form-data",
      Authorization: "Bearer " + token,
    },
  });

  try {
    const requestConfig = {
      params: {
        querytokenFilter: CONFIG.api.token,
        filter: pipelinename,
      },
      headers: {
        "content-type": "application/x-www-form-urlencoded",
        Authorization: `Bearer ${token}`,
      },
    };

    //https://api.swarmlab.io/gettutorlabrooms?sort=pipelinename%7Casc&page=1&per_page=5&filter=&type=scripts&tutor=yes
    client.defaults.timeout = 30000;
    const response = await client.get("/getplaygrounds", requestConfig);

    if (response.status != 200) {
      console.log("noerror: " + response);
      return response.status;
    }
    return response.data;
  } catch (err) {
    console.error("error: " + err);
    return { action: "401" };
  }
}
|
|
|
|
// ***************************************************
|
|
// get user pipelines
|
|
// ***************************************************
|
|
|
|
/**
 * Queries the swarmlab API for the playgrounds of one user.
 *
 * Performs `GET /getuserplaygrounds` with the query parameters
 * `querytokenFilter` (taken from `CONFIG.api.token`), `filter` (the user)
 * and `swarmlabname`.
 *
 * NOTE(review): the HTTPS agent is created with `rejectUnauthorized: false`,
 * i.e. the server certificate is not verified.
 *
 * @param {string} token - bearer token sent in the Authorization header
 * @param {string} user - value of the `filter` query parameter
 * @param {string} swarmlabname - value of the `swarmlabname` query parameter
 * @returns {Promise<*>} the response body when the status is 200, the bare
 *   status code for any other status that does not throw, or
 *   `{ action: "401", error }` when the request fails
 */
async function getuserpipelines(token, user, swarmlabname) {
  const httpsAgent = new https.Agent({ rejectUnauthorized: false });
  const client = axios.create({
    baseURL: "https://api.swarmlab.io",
    withCredentials: true,
    rejectUnauthorized: false,
    crossdomain: true,
    httpsAgent: httpsAgent,
    headers: {
      Accept: "application/json",
      "Content-Type": "multipart/form-data",
      Authorization: "Bearer " + token,
    },
  });

  try {
    const requestConfig = {
      params: {
        querytokenFilter: CONFIG.api.token,
        filter: user,
        swarmlabname: swarmlabname,
      },
      headers: {
        "content-type": "application/x-www-form-urlencoded",
        Authorization: `Bearer ${token}`,
      },
    };

    client.defaults.timeout = 30000;
    const response = await client.get("/getuserplaygrounds", requestConfig);

    if (response.status != 200) {
      console.log("noerror: " + response);
      return response.status;
    }
    return response.data;
  } catch (err) {
    console.error("error: " + err);
    return { action: "401", error: err };
  }
}
|
|
|
|
// Process-wide state shared through the `global` object:
//  - online:    marker string
//  - pipelines: cache of pipeline lookups, one entry per log file name
Object.assign(global, {
  online: "ob",
  pipelines: [],
});
|
|
|
|
/**
 * Emits a log record to the room of the user who owns a log file.
 *
 * The owner is the `user25user` of the first data row cached in
 * `global.pipelines` for `pathfileval`. When the file is unknown or its
 * cache entry carries no rows, nothing is emitted (previously this case
 * threw a TypeError).
 *
 * @param {*} reslog - payload sent with the "logdata" event
 * @param {string} pathfileval - log file name used as the cache key
 */
function sendlog(reslog, pathfileval) {
  const entry = global.pipelines.find((x) => x.pathlogfile == pathfileval);
  const firstRow =
    entry && Array.isArray(entry.data) && entry.data.length > 0
      ? entry.data[0]
      : undefined;

  if (!firstRow || !firstRow.user25user) {
    console.log("sendlog: no user registered for " + pathfileval);
    return;
  }

  console.log("-----------------------" + JSON.stringify(entry));
  io.in(firstRow.user25user).emit("logdata", reslog);
}
|
|
|
|
/**
 * Registers a newly seen log file in the `global.pipelines` cache.
 *
 * The part of the file name before the first "-" is used as the pipeline
 * filter for getpipelines(); each returned row is reduced to
 * `{ data, user25user, res25creator, res25fileforce }` and stored under the
 * file name. The lookup runs in the background; the function itself returns
 * immediately.
 *
 * A lookup that yields no rows (API error, unknown pipeline) is NOT cached,
 * so it is retried the next time the file is reported and consumers never
 * see an entry without an owner.
 *
 * @param {string} path - path of the log file that appeared
 */
function onlogfile(path) {
  console.log("File", path, "has been added");
  const pathfileval = pathmodule.basename(path);
  const pathfile = pathfileval.toString().split("-")[0];
  const isSameFile = (x) => x.pathlogfile == pathfileval;

  console.log(
    "file11111111111111111111111111111111 " + JSON.stringify(pathfileval)
  );
  if (global.pipelines.some(isSameFile)) {
    return;
  }

  (async () => {
    console.log(
      "file2222222222222222222222222222222222222 " +
        JSON.stringify(pathfileval)
    );
    // NOTE(review): credential hard-coded in source - consider moving it to
    // the runtime configuration.
    const token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
    const resdata = await getpipelines(token, pathfile);

    // getpipelines may return an error object or a bare status code.
    const payload = resdata ? resdata.data : undefined;
    const rows = payload && typeof payload === "object" ? Object.values(payload) : [];
    if (rows.length === 0) {
      console.error("onlogfile: no pipeline data for " + pathfileval);
      return;
    }

    const resob = {
      pathlogfile: pathfileval,
      data: rows.map((row) => ({
        data: row.res25swarmlabname,
        user25user: row.res25user,
        res25creator: row.res25creator,
        res25fileforce: row.res25fileforce,
      })),
    };

    // Check again: another lookup for the same file may have finished while
    // this one was waiting for the API.
    if (global.pipelines.some(isSameFile)) {
      console.log("object already exists " + pathfileval);
    } else {
      global.pipelines.push(resob);
    }
  })().catch((err) => {
    console.error("onlogfile: lookup failed for " + pathfileval + ": " + err);
  });
}
|
|
|
|
/**
 * Starts a log stream for a socket unless one is already registered.
 *
 * A stream counts as running when getkey(data.id) reports the socket id as
 * present in Redis. Otherwise a MongoDB connection is opened and
 * `onCollectionNew` is invoked for the `test` collection of `fluentdb`, with
 * `data` bound as its `this`.
 *
 * A failed connection is logged instead of thrown: an exception raised
 * inside the driver callback would terminate the process.
 *
 * @param {{user: *, id: string}} data - room name and socket id
 * @returns {Promise<void>}
 */
async function checkstream(data) {
  const state = await getkey(data.id);
  if (state == "1") {
    console.log("Stream is on!");
    return;
  }

  console.log("Creating Stream....");

  const url = "mongodb://mongo:27017/";
  // NOTE(review): the connection opened here is not closed in this function.
  MongoClient.connect(url, function (err, db) {
    if (err) {
      console.error("checkstream: mongo connect failed: " + err);
      return;
    }
    db.db("fluentdb").collection("test", onCollectionNew.bind(data));
  });
}
|
|
// ***************************************************
|
|
// rest get
|
|
// ***************************************************
|
|
|
|
// app.get(
|
|
// "/get_log",
|
|
// [check("token").isLength({ min: 40 })],
|
|
// cors(corsOptions),
|
|
// (req, res, next) => {
|
|
// (async () => {
|
|
// var RES = new Object();
|
|
// RES.token = req.query["token"];
|
|
// RES.start = req.query["start"];
|
|
// RES.end = req.query["end"];
|
|
// RES.swarmlabname = req.query["swarmlabname"];
|
|
// RES.ok = "ok";
|
|
// /*
|
|
// *
|
|
// * validate
|
|
// *
|
|
// */
|
|
|
|
// var isvalid = await checkToken(RES.token);
|
|
// if (isvalid.action == "ok") {
|
|
// console.log("Authserver ok " + RES.token);
|
|
// RES.error = "ok";
|
|
// } else {
|
|
// console.log("Authserver no " + RES.token);
|
|
// RES.error = "no";
|
|
// }
|
|
// if (RES.error == "ok") {
|
|
// var resdata = await getuserpipelines(
|
|
// RES.token,
|
|
// isvalid.user,
|
|
// RES.swarmlabname
|
|
// );
|
|
// var mongourl =
|
|
// "mongodb://" +
|
|
// CONFIG.mongo.user +
|
|
// ":" +
|
|
// CONFIG.mongo.password +
|
|
// "@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/swarmlabplaygroundstats?replicaSet=rs1&authSource=swarmlabplaygroundstats";
|
|
// const OPTS = {
|
|
// useNewUrlParser: true,
|
|
// useUnifiedTopology: true,
|
|
// };
|
|
// MongoClient.connect(mongourl, OPTS, function (err, client) {
|
|
// if (err) {
|
|
// console.log(err);
|
|
// } else {
|
|
// const db = client.db("swarmlabplaygroundstats");
|
|
// //usersession.SOCKET.user = isvalid.user
|
|
// console.log(JSON.stringify("mongo ----------------connected"));
|
|
// console.log("-----test------- " + JSON.stringify(RES));
|
|
// if (
|
|
// typeof RES.start !== "undefined" &&
|
|
// typeof RES.end !== "undefined"
|
|
// ) {
|
|
// if (DateTime.fromISO(RES.start).isValid) {
|
|
// var datestart = DateTime.fromISO(RES.start);
|
|
// var dateend = DateTime.fromISO(RES.end);
|
|
// var search_term = {
|
|
// $and: [
|
|
// {
|
|
// time: {
|
|
// $gte: datestart,
|
|
// },
|
|
// },
|
|
// {
|
|
// time: {
|
|
// $lt: dateend,
|
|
// },
|
|
// },
|
|
// ],
|
|
// };
|
|
// } else {
|
|
// RES.ok = "no";
|
|
// }
|
|
// } else if (typeof RES.end !== "undefined") {
|
|
// var dateend = DateTime.fromISO(RES.end);
|
|
// if (DateTime.fromISO(RES.end).isValid) {
|
|
// var search_term = {
|
|
// $and: [
|
|
// {
|
|
// time: {
|
|
// $lt: dateend,
|
|
// },
|
|
// },
|
|
// ],
|
|
// };
|
|
// } else {
|
|
// RES.ok = "no";
|
|
// }
|
|
// } else if (typeof RES.start !== "undefined") {
|
|
// var datestart = DateTime.fromISO(RES.start);
|
|
// if (DateTime.fromISO(RES.start).isValid) {
|
|
// var search_term = {
|
|
// $and: [
|
|
// {
|
|
// time: {
|
|
// $gte: datestart,
|
|
// },
|
|
// },
|
|
// ],
|
|
// };
|
|
// } else {
|
|
// RES.ok = "no";
|
|
// }
|
|
// }
|
|
// if (RES.ok == "ok") {
|
|
// //var search_term = '{"$gte": new Date("2020-12-01T00:00:00.000Z") , "$lt": new Date("2020-12-11T16:17:36.470Z") }'
|
|
// //var search_term = {"time" : {$lte : datenow}}
|
|
// var resdataarray = [];
|
|
// var resraw = {};
|
|
// var reslab = "";
|
|
// var datestart1 = DateTime.fromISO(RES.start);
|
|
// console.log("-----now1------- " + JSON.stringify(search_term));
|
|
// console.log("-----now2------- " + JSON.stringify(datestart1));
|
|
// console.log("-----now3------- " + JSON.stringify(datestart));
|
|
|
|
// db.collection("logs")
|
|
// .find(search_term)
|
|
// .toArray()
|
|
// //db.collection('logs').find({"time" : {$gt : datestart}}).toArray()
|
|
// .then((item) => {
|
|
// console.log("item " + JSON.stringify(item));
|
|
// for (let i in item) {
|
|
// reslab = item[i].tailed_path;
|
|
// var segment_array = reslab.split("/");
|
|
// var last_segment = segment_array.pop();
|
|
// var fieldstmp = last_segment.split("-");
|
|
// var nameofswarmlab = fieldstmp[0];
|
|
|
|
// var regexlog = new RegExp(nameofswarmlab);
|
|
// for (let ii in resdata.data) {
|
|
// if (regexlog.test(resdata.data[ii].res25swarmlabname)) {
|
|
// resdataarray.push(item[i]);
|
|
// RES.found = item[i];
|
|
// }
|
|
// }
|
|
// }
|
|
|
|
// RES.error_msg = "ok";
|
|
// RES.data = resdataarray;
|
|
// //RES.dataserver = resdataarray
|
|
// //RES.dataservertmp = resdata
|
|
// res.json(RES);
|
|
// })
|
|
// .catch((err) => {
|
|
// console.error(err);
|
|
// RES.error_msg = err;
|
|
// res.json(RES);
|
|
// });
|
|
// } else {
|
|
// // RES.ok
|
|
// RES.error_msg = "no date";
|
|
// res.json(RES);
|
|
// }
|
|
// } // error mongo connect
|
|
// }); // mongo connect
|
|
// } else {
|
|
// // token error
|
|
// RES.data = "no";
|
|
// RES.error_msg = "token err";
|
|
// res.json(RES);
|
|
// }
|
|
// })();
|
|
// }
|
|
// );
|
|
|
|
// app.get(
|
|
// "/run",
|
|
// [
|
|
// //check('access_token').isLength({ min: 40 }),
|
|
// //check('llo').isBase64()
|
|
// ],
|
|
// cors(corsOptions),
|
|
// (req, res, next) => {
|
|
// (async () => {
|
|
// var mongourl =
|
|
// "mongodb://" +
|
|
// CONFIG.mongo.user +
|
|
// ":" +
|
|
// CONFIG.mongo.password +
|
|
// "@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/swarmlabplaygroundstats?replicaSet=rs1&authSource=swarmlabplaygroundstats";
|
|
// const OPTS = {
|
|
// useNewUrlParser: true,
|
|
// useUnifiedTopology: true,
|
|
// };
|
|
// MongoClient.connect(mongourl, OPTS, function (err, client) {
|
|
// if (err) {
|
|
// console.log(err);
|
|
// } else {
|
|
// const db = client.db("swarmlabplaygroundstats");
|
|
// //db.collection('log', onCollection);
|
|
// console.log(JSON.stringify("mongo connected"));
|
|
// var stream = db
|
|
// .collection("logs")
|
|
// .find(
|
|
// {},
|
|
// {
|
|
// tailable: true,
|
|
// awaitdata: true,
|
|
// /* other options */
|
|
// }
|
|
// )
|
|
// .stream();
|
|
|
|
// stream.on("data", function (doc) {
|
|
// console.log(JSON.stringify(doc));
|
|
// //socket.write(JSON.stringify({'action': 'log','param': doc.log}));
|
|
// });
|
|
// }
|
|
// });
|
|
// var RES = new Object();
|
|
// RES.code = req.query["filter"];
|
|
// RES.token = req.query["filter"];
|
|
// var isvalid = await checkToken(RES.token);
|
|
// if (isvalid.action == "ok") {
|
|
// console.log("Authserver ok " + RES.token);
|
|
// } else {
|
|
// console.log("Authserver no " + RES.token);
|
|
// }
|
|
// RES.error = false;
|
|
// RES.error_msg = "ok";
|
|
// res.json(RES);
|
|
// })();
|
|
// }
|
|
// );
|
|
|
|
// // ***************************************************
|
|
// // rest post
|
|
// // ***************************************************
|
|
|
|
// app.post(
|
|
// "/run",
|
|
// [
|
|
// //check('access_token').isLength({ min: 40 }),
|
|
// //check('llo').isBase64()
|
|
// ],
|
|
// cors(corsOptions),
|
|
// (req, res, next) => {
|
|
// (async () => {
|
|
// //console.log(JSON.stringify(req.headers));
|
|
// //console.log(JSON.stringify(req.body));
|
|
// //console.log("mongo "+JSON.stringify(req.body));
|
|
// //console.log("LOG "+JSON.stringify(req.body[0].message));
|
|
// //console.log("PATH "+JSON.stringify(req.body[0].tailed_path));
|
|
// for (var i = 0; i < req.body.length; i++) {
|
|
// //var getpath = await onlogfile(req.body[i].tailed_path)
|
|
|
|
// var path = req.body[i].tailed_path;
|
|
|
|
// console.log("File", path, "has been added");
|
|
// var pathfileval = pathmodule.basename(path);
|
|
// var arrfile = pathfileval.toString().split("-");
|
|
// var pathfile = arrfile[0];
|
|
// var indexfind1 = global.pipelines.findIndex(
|
|
// (x) => x.pathlogfile == pathfileval
|
|
// );
|
|
// console.log(
|
|
// "file11111111111111111111111111111111 " + JSON.stringify(pathfileval)
|
|
// );
|
|
// if (indexfind1 === -1) {
|
|
// (async () => {
|
|
// console.log(
|
|
// "file2222222222222222222222222222222222222 " +
|
|
// JSON.stringify(pathfileval)
|
|
// );
|
|
// var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
|
|
// var resdata = await getpipelines(token, pathfile);
|
|
// //resdata.data.pathlogfile = 'test'
|
|
// var resob = {};
|
|
// resob.pathlogfile = pathfileval;
|
|
// var resobarray = [];
|
|
// for (let i in resdata.data) {
|
|
// var resob1 = {};
|
|
// resob1.data = resdata.data[i].res25swarmlabname;
|
|
// resob1.user25user = resdata.data[i].res25user;
|
|
// resob1.res25creator = resdata.data[i].res25creator;
|
|
// resob1.res25fileforce = resdata.data[i].res25fileforce;
|
|
// resobarray.push(resob1);
|
|
// }
|
|
// resob.data = resobarray;
|
|
// //var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname);
|
|
// var indexfind = global.pipelines.findIndex(
|
|
// (x) => x.pathlogfile == pathfileval
|
|
// );
|
|
|
|
// //indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists")
|
|
// indexfind === -1
|
|
// ? global.pipelines.push(resob)
|
|
// : console.log("object already exists " + pathfileval);
|
|
|
|
// //console.log('info', JSON.stringify(resdata));
|
|
// //console.log('info------------- ', JSON.stringify(global.pipelines));
|
|
// })();
|
|
// }
|
|
// //
|
|
// var obj = req.body[i];
|
|
|
|
// //var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data.res25swarmlabname);
|
|
// //indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists")
|
|
// var now = new Date();
|
|
|
|
// var reslog = new Object();
|
|
// reslog.log = obj;
|
|
|
|
// reslog.date = convertDateToUTC(now);
|
|
// console.log(reslog);
|
|
// var pathfileval = pathmodule.basename(reslog.log.tailed_path);
|
|
// var indexfind = global.pipelines.findIndex(
|
|
// (x) => x.pathlogfile == pathfileval
|
|
// );
|
|
// //indexfind === -1 ? sendlog(reslog,pathfileval) : console.log("object already exists")
|
|
// indexfind === -1
|
|
// ? console.log("object not found")
|
|
// : sendlog(reslog, pathfileval);
|
|
// console.log("IOT " + JSON.stringify(reslog.log.tailed_path));
|
|
// console.log("IOTindexfind " + JSON.stringify(indexfind));
|
|
// console.log("IOTuser " + JSON.stringify(global.pipelines));
|
|
// // io.in("iot").emit("message", reslog);
|
|
// // io.emit("logdata", reslog);
|
|
// }
|
|
// })();
|
|
|
|
// //io.in("iot").emit("message", RES);
|
|
|
|
// console.error("socket POST from client");
|
|
// var RES = new Object();
|
|
// RES.error = false;
|
|
// RES.error_msg = "ok";
|
|
// RES.msg = req.body[0].messsage;
|
|
|
|
// res.json(RES);
|
|
// }
|
|
// );
|
|
|
|
// ***************************************************
|
|
// socket
|
|
// ***************************************************
|
|
|
|
//function getSHA256ofJSON(input){
|
|
// return require("crypto").createHash("sha256").update(JSON.stringify(input)).digest("hex");
|
|
//}
|
|
|
|
/**
 * Promise-based delay.
 *
 * @param {number} ms - delay handed to setTimeout, in milliseconds
 * @returns {Promise<void>} promise fulfilled when the timer fires
 */
function sleep(ms) {
  return new Promise(function (wake) {
    setTimeout(wake, ms);
  });
}
|
|
|
|
/**
 * Fingerprints a value by hashing its JSON serialisation.
 *
 * NOTE(review): despite the name, the digest is computed with MD5, not
 * SHA-256.
 *
 * @param {*} data - value to fingerprint; any falsy value yields ""
 * @param {string} [inputEncoding="utf-8"] - encoding of the serialised text
 * @param {string} [encoding="hex"] - encoding of the returned digest
 * @returns {string} the digest, or "" when `data` is falsy
 */
function getSHA256ofJSON(data, inputEncoding, encoding) {
  if (!data) {
    return "";
  }
  const sourceEncoding = inputEncoding || "utf-8";
  const digestEncoding = encoding || "hex";

  const hasher = require("crypto").createHash("md5");
  hasher.update(JSON.stringify(data), sourceEncoding);
  return hasher.digest(digestEncoding);
}
|
|
|
|
// --- LEFOS - get user via token from REDIS
|
|
/**
 * Checks whether a key (a token) is present in Redis.
 *
 * @param {string} token - Redis key to look up
 * @returns {Promise<1|2|null>} 1 when the key holds a truthy value, 2 when it
 *   does not, null when Redis reports an error; the promise never rejects
 */
async function getUser(token) {
  return new Promise((resolve) => {
    pubClient.get(token, (err, reply) => {
      if (err) {
        console.log("----------error------------");
        resolve(null);
        return;
      }
      if (!reply) {
        console.log("----------not found------------");
        resolve(2);
        return;
      }
      console.log("---------found----------");
      resolve(1);
    });
  });
}
|
|
|
|
//var getkey = function getkey(key) {
|
|
/**
 * Checks whether a key (a socket id or message fingerprint) is present in
 * Redis.
 *
 * @param {string} id - Redis key to look up
 * @returns {Promise<1|2|null>} 1 when the key holds a truthy value, 2 when it
 *   does not, null when Redis reports an error; the promise never rejects
 */
async function getkey(id) {
  return new Promise((resolve) => {
    pubClient.get(id, (err, reply) => {
      if (err) {
        console.log("----------error------------");
        resolve(null);
        return;
      }
      if (!reply) {
        console.log("----------not fount------------");
        resolve(2);
        return;
      }
      resolve(1);
    });
  });
}
|
|
// Lefos === Set the user to redis
|
|
/**
 * Stores a user under a socket id in Redis.
 *
 * @param {string} id - Redis key (socket id)
 * @param {*} user - value to store
 * @returns {Promise<*>} the Redis reply, or null when Redis reports an
 *   error; the promise never rejects
 */
const setUser = function setus(id, user) {
  return new Promise((resolve) => {
    //pubClient.set(key,value, 'EX', expire, function(err,reply){
    pubClient.set(id, user, (err, reply) => {
      resolve(err ? null : reply);
    });
  });
};
|
|
// ===
|
|
/**
 * Stores a value under a key in Redis.
 *
 * @param {string} key - Redis key
 * @param {*} value - value to store
 * @returns {Promise<*>} the Redis reply, or null when Redis reports an
 *   error; the promise never rejects
 */
const setkey = function setkv(key, value) {
  return new Promise((resolve) => {
    //pubClient.set(key,value, 'EX', expire, function(err,reply){
    pubClient.set(key, value, (err, reply) => {
      resolve(err ? null : reply);
    });
  });
};
|
|
|
|
/**
 * Emits one log document to a user's room unless its fingerprint has already
 * been recorded in Redis.
 *
 * After a random pause of up to two seconds the fingerprint `issend` is
 * looked up with getkey(): if present the document is only logged; if absent
 * the fingerprint is stored with setkey() and the document is emitted as
 * "logdata".
 *
 * NOTE(review): the lookup and the store are two separate Redis commands, so
 * two callers can both see "absent" - confirm whether that is acceptable.
 *
 * @param {{tailed_path: *, message: *}} data - source document
 * @param {string} issend - fingerprint used as the Redis key
 * @param {object} io - Socket.IO server used for emitting
 * @param {object} pubClient - Redis client (not used by this function)
 * @param {string} user1 - room that receives the event
 * @returns {Promise<void>}
 */
async function iosend(data, issend, io, pubClient, user1) {
  const reslog1 = {
    log: {
      tailed_path: data.tailed_path,
      message: data.message,
    },
    date: convertDateToUTC(new Date()),
  };

  const randomTimeInMs = Math.random() * 2000;
  await sleep(randomTimeInMs);
  const getres = await getkey(issend);

  if (getres == "1") {
    console.log(issend + " ---1 " + JSON.stringify(reslog1));
    return;
  }
  if (getres == "2") {
    console.log(issend + " ---2 " + JSON.stringify(reslog1));
    setkey(issend, "1");
    io.in(user1).emit("logdata", reslog1);
  }
}
|
|
|
|
/**
 * Streams a MongoDB collection and forwards every document to the room of
 * the user who owns the originating log file.
 *
 * For each document:
 *  1. a fingerprint of `{ id, message, tailed_path }` is computed;
 *  2. the owner of the log file (base name of `tailed_path`) is read from
 *     Redis, or - on a cache miss - fetched with getpipelines() using the
 *     part of the file name before the first "-" and then cached;
 *  3. the document is handed to iosend() for that owner.
 *
 * Errors (collection unavailable, Redis failure, unparsable cache entry,
 * lookup without result) are logged and the affected document is skipped;
 * previously they surfaced as uncaught exceptions or unhandled rejections.
 *
 * @param {Error|null} err - error reported while obtaining the collection
 * @param {object} collection - MongoDB collection to stream from
 */
function onCollection(err, collection) {
  if (err || !collection) {
    console.error("onCollection: collection unavailable: " + err);
    return;
  }

  const options = {
    tailable: true,
    awaitdata: true,
    numberOfRetries: -1,
    tailableRetryInterval: 500,
  };
  const cursor = collection.find({}, options).stream();
  let itemsProcessed = 0;

  cursor.on("data", function (data) {
    itemsProcessed += 1;

    const issend = getSHA256ofJSON({
      id: data._id,
      message: data.message,
      tailed_path: data.tailed_path,
    });

    console.log("++++++++" + JSON.stringify(data));
    console.log(
      "++++++++ob" +
        JSON.stringify({
          id: data._id,
          message: data.message,
          tailed_path: data.tailed_path,
        })
    );
    console.log("++++++++sha" + JSON.stringify(issend));

    const pathfileval = pathmodule.basename(data.tailed_path);
    const pathfile = pathfileval.toString().split("-")[0];

    // Forward the document; a failure here must not become an unhandled rejection.
    const deliver = (user) =>
      Promise.resolve(iosend(data, issend, io, pubClient, user)).catch((sendErr) => {
        console.error("onCollection: delivery failed for " + pathfileval + ": " + sendErr);
      });

    pubClient.get(pathfileval, function (err, object) {
      if (err) {
        console.error("onCollection: redis lookup failed for " + pathfileval + ": " + err);
        return;
      }

      let cached = null;
      if (object) {
        try {
          cached = JSON.parse(object);
        } catch (parseErr) {
          // Treat a corrupt cache entry as a miss and rebuild it below.
          console.error("onCollection: bad cache entry for " + pathfileval + ": " + parseErr);
        }
      }
      if (cached && typeof cached.user25user === "string") {
        deliver(cached.user25user.replace(/[\n\t\r]/g, ""));
        return;
      }

      (async () => {
        // NOTE(review): credential hard-coded in source - consider moving it
        // to the runtime configuration.
        const token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
        const resdata1 = await getpipelines(token, pathfile);

        // getpipelines may return an error object, a status code or no rows.
        const first = resdata1 && resdata1.data ? resdata1.data[0] : undefined;
        if (!first || typeof first.res25user !== "string") {
          console.error("onCollection: no pipeline owner found for " + pathfileval);
          return;
        }

        const resob11 = {
          data: first.res25swarmlabname,
          user25user: first.res25user.replace(/[\n\t\r]/g, ""),
          res25creator: first.res25creator,
          res25fileforce: first.res25fileforce,
          tailed_path: pathfileval,
        };
        // setkey resolves null on a Redis error instead of rejecting, so a
        // failed cache write does not block delivery.
        await setkey(pathfileval, JSON.stringify(resob11));

        await deliver(resob11.user25user);
        console.log(" ---no--- " + JSON.stringify(data));
      })().catch((lookupErr) => {
        console.error("onCollection: lookup failed for " + pathfileval + ": " + lookupErr);
      });
    });
  });

  // Periodic progress trace.
  setInterval(function () {
    console.log("itemsProcessed", itemsProcessed);
  }, 8000);
}
|
|
|
|
/// ORIGINAL MONGO CONNECTION (currently disabled) ///
|
|
// var mongourl =
|
|
// "mongodb://" +
|
|
// CONFIG.mongo.user +
|
|
// ":" +
|
|
// CONFIG.mongo.password +
|
|
// "@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/swarmlabplaygroundstats?replicaSet=rs1&authSource=swarmlabplaygroundstats";
|
|
// const OPTS = {
|
|
// useNewUrlParser: true,
|
|
// useUnifiedTopology: true,
|
|
// };
|
|
// var mongooptions = {
|
|
// autoReconnect: true,
|
|
// keepAlive: 1,
|
|
// connectTimeoutMS: 30000,
|
|
// socketTimeoutMS: 0,
|
|
// };
|
|
|
|
// MongoClient.connect(mongourl, OPTS, function (err, client) {
|
|
// if (err) {
|
|
// console.log(err);
|
|
// } else {
|
|
// const db = client.db("swarmlabplaygroundstats");
|
|
// db.collection("logs", onCollection);
|
|
// }
|
|
// });
|
|
|
|
// Start the shared HTTP server (Express routes and Socket.IO) on port 3000.
http.listen(3000, function onListening() {
  console.error("listening on http://localhost:3000/");
});

console.error("socket.io example");
|
|
|