lefteris
4 years ago
6 changed files with 498 additions and 1333 deletions
@ -0,0 +1,27 @@ |
|||||
|
module.exports = { |
||||
|
apps: [ |
||||
|
{ |
||||
|
name: "readmongo", |
||||
|
autorestart: true, |
||||
|
watch: true, |
||||
|
//"script" : "/home/node/swarmlab-app/run/app.js",
|
||||
|
cwd: "/usr/src/app/swarmlab-app/src", |
||||
|
script: "run/app.js", |
||||
|
run_as_user: "node", |
||||
|
args: "start", |
||||
|
//"node_args" : "--harmony",
|
||||
|
//"node_args" : "['--trace-deprecation']",
|
||||
|
pid_file: "/home/node/run/pid.pid", |
||||
|
log_type: "json", |
||||
|
log_file: "/home/node/logs/logfile", |
||||
|
error_file: "/home/node/logs/errorfile", |
||||
|
out_file: "/home/node/logs/outfile", |
||||
|
log_date_format: "YYYY-MM-DD HH:mm Z", |
||||
|
merge_logs: true, |
||||
|
exec_mode: "fork", |
||||
|
max_restarts: 10, |
||||
|
max_memory_restart: "500M", |
||||
|
restart_delay: 1000, |
||||
|
}, |
||||
|
], |
||||
|
}; |
@ -1,922 +0,0 @@ |
|||||
"use strict"; |
|
||||
|
|
||||
var pathmodule = require("path"); |
|
||||
var app = require("express")(); |
|
||||
var http = require("http").Server(app); |
|
||||
var https = require("https"); |
|
||||
var CONFIG = require(pathmodule.resolve(__dirname, "runconfig.js")); |
|
||||
const io = require("socket.io")(http, { |
|
||||
// pingTimeout: 30000,
|
|
||||
// allowUpgrades: false,
|
|
||||
// serveClient: false,
|
|
||||
// pingInterval: 10000,
|
|
||||
// //transports: [ 'websocket', 'polling' ],
|
|
||||
// transports: [ 'polling', 'websocket' ],
|
|
||||
cors: { |
|
||||
origin: "http://localhost:8080", |
|
||||
methods: ["GET", "POST"], |
|
||||
allowedHeaders: ["my-custom-header"], |
|
||||
credentials: true, |
|
||||
}, |
|
||||
cookie: { |
|
||||
name: "test", |
|
||||
httpOnly: false, |
|
||||
path: "/custom", |
|
||||
}, |
|
||||
}); |
|
||||
|
|
||||
const createAdapter = require("socket.io-redis"); |
|
||||
|
|
||||
const Redis = require("ioredis"); |
|
||||
|
|
||||
const pubClient = new Redis({ |
|
||||
host: "redisserver", |
|
||||
port: 6379, |
|
||||
}); |
|
||||
|
|
||||
//const pubClient = new RedisClient({ host: 'localhost', port: 6379 });
|
|
||||
const subClient = pubClient.duplicate(); |
|
||||
|
|
||||
io.adapter(createAdapter({ pubClient, subClient })); |
|
||||
|
|
||||
pubClient.on("connect", function () { |
|
||||
console.log("You are now connected re mas koro"); |
|
||||
}); |
|
||||
|
|
||||
const MongoClient = require("mongodb").MongoClient; |
|
||||
const { DateTime } = require("luxon"); |
|
||||
|
|
||||
var async = require("async"); |
|
||||
const { check, validationResult } = require("express-validator"); |
|
||||
const urlExistSync = require("url-exist-sync"); |
|
||||
|
|
||||
var express = require("express"); |
|
||||
app.use(express.json()); |
|
||||
|
|
||||
const axios = require("axios"); |
|
||||
axios.defaults.timeout = 30000; |
|
||||
|
|
||||
const helmet = require("helmet"); |
|
||||
app.use(helmet()); |
|
||||
|
|
||||
const cors = require("cors"); |
|
||||
const whitelist = [ |
|
||||
"http://localhost:8080", |
|
||||
"http://localhost:3080", |
|
||||
"http://localhost:3081", |
|
||||
"http://localhost:3082", |
|
||||
]; |
|
||||
const corsOptions = { |
|
||||
credentials: true, |
|
||||
methods: ["GET", "PUT", "POST", "DELETE", "OPTIONS"], |
|
||||
optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204
|
|
||||
allowedHeaders: [ |
|
||||
"Content-Type", |
|
||||
"Authorization", |
|
||||
"X-Requested-With", |
|
||||
"device-remember-token", |
|
||||
"Access-Control-Allow-Origin", |
|
||||
"Access-Control-Allow-Headers", |
|
||||
"Origin", |
|
||||
"Accept", |
|
||||
], |
|
||||
origin: function (origin, callback) { |
|
||||
if (whitelist.indexOf(origin) !== -1) { |
|
||||
callback(null, true); |
|
||||
} else { |
|
||||
callback(null, true); |
|
||||
//callback(new Error('Not allowed by CORS'))
|
|
||||
} |
|
||||
}, |
|
||||
}; |
|
||||
|
|
||||
io.on("connection", (s) => { |
|
||||
console.error("socket connection1"); |
|
||||
|
|
||||
// ------------------------------
|
|
||||
// --- set
|
|
||||
// ------------------------------
|
|
||||
var usersession = new Object(); |
|
||||
usersession.SOCKET = {}; |
|
||||
usersession.SOCKET.error = {}; |
|
||||
console.error("socket ..."); |
|
||||
s.auth = false; |
|
||||
|
|
||||
// ------------------------------
|
|
||||
// --- authenticate
|
|
||||
// ------------------------------
|
|
||||
s.on("authenticate", function (data) { |
|
||||
const token = data; |
|
||||
console.log("invalid 1 " + token); |
|
||||
(async () => { |
|
||||
var isvalid = await checkToken(token); |
|
||||
if (isvalid.action == "ok") { |
|
||||
console.log("Authserver ok ", s.id + " - " + token); |
|
||||
// pubClient.set(session, resob1string, function(err, res) {
|
|
||||
// });
|
|
||||
usersession.SOCKET.user = isvalid.user; |
|
||||
usersession.SOCKET.scope = isvalid.scope; // space delimeter
|
|
||||
usersession.SOCKET.token = isvalid.token; |
|
||||
s.auth = true; |
|
||||
} else { |
|
||||
console.log("Authserver no ", s.id + " - " + token); |
|
||||
s.auth = false; |
|
||||
} |
|
||||
})(); |
|
||||
}); |
|
||||
|
|
||||
setTimeout(function () { |
|
||||
if (!s.auth) { |
|
||||
console.log("Disconnecting timeout socket ", s.id); |
|
||||
//s.disconnect('unauthorized');
|
|
||||
} else { |
|
||||
var room = usersession.SOCKET.user; |
|
||||
//s.on("subscribe", function (room) {
|
|
||||
s.join(room); |
|
||||
console.log("joining rooom", s.rooms); |
|
||||
console.log(room + " created "); |
|
||||
// });
|
|
||||
} |
|
||||
}, 30000); |
|
||||
|
|
||||
var id = s.id; |
|
||||
s.on("log", (obj) => { |
|
||||
console.error("from client " + s.id + " obj " + obj); |
|
||||
}); |
|
||||
}); |
|
||||
|
|
||||
// ***************************************************
|
|
||||
// checktoken
|
|
||||
// ***************************************************
|
|
||||
|
|
||||
async function checkToken(token) { |
|
||||
const agent = new https.Agent({ |
|
||||
rejectUnauthorized: false, |
|
||||
}); |
|
||||
const instance = axios.create({ |
|
||||
baseURL: "https://api.swarmlab.io", |
|
||||
withCredentials: true, |
|
||||
rejectUnauthorized: false, |
|
||||
crossdomain: true, |
|
||||
httpsAgent: agent, |
|
||||
headers: { |
|
||||
Accept: "application/json", |
|
||||
"Content-Type": "multipart/form-data", |
|
||||
Authorization: "Bearer " + token, |
|
||||
}, |
|
||||
}); |
|
||||
try { |
|
||||
var pipelines = { |
|
||||
source: "ssologin", |
|
||||
}; |
|
||||
var params = { |
|
||||
pipeline: pipelines, |
|
||||
}; |
|
||||
|
|
||||
var options = { |
|
||||
headers: { |
|
||||
"content-type": "application/x-www-form-urlencoded", |
|
||||
Authorization: `Bearer ${token}`, |
|
||||
}, |
|
||||
}; |
|
||||
|
|
||||
instance.defaults.timeout = 30000; |
|
||||
const res = await instance.post("/istokenvalidsso", params, options); |
|
||||
if (res.status == 200) { |
|
||||
//console.log("check " +JSON.stringify(res.data))
|
|
||||
return res.data; |
|
||||
} else { |
|
||||
console.log("noerror: " + res); |
|
||||
return res.status; |
|
||||
} |
|
||||
} catch (err) { |
|
||||
console.error("error: " + err); |
|
||||
var error = new Object(); |
|
||||
error.action = "401"; |
|
||||
return error; |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
function convertDateToUTC(date) { |
|
||||
return new Date( |
|
||||
date.getUTCFullYear(), |
|
||||
date.getUTCMonth(), |
|
||||
date.getUTCDate(), |
|
||||
date.getUTCHours(), |
|
||||
date.getUTCMinutes(), |
|
||||
date.getUTCSeconds(), |
|
||||
date.getUTCMilliseconds() |
|
||||
); |
|
||||
} |
|
||||
|
|
||||
// ***************************************************
|
|
||||
// get pipelines
|
|
||||
// ***************************************************
|
|
||||
|
|
||||
async function getpipelines(token, pipelinename) { |
|
||||
const agent = new https.Agent({ |
|
||||
rejectUnauthorized: false, |
|
||||
}); |
|
||||
const instance = axios.create({ |
|
||||
baseURL: "https://api.swarmlab.io", |
|
||||
withCredentials: true, |
|
||||
rejectUnauthorized: false, |
|
||||
crossdomain: true, |
|
||||
httpsAgent: agent, |
|
||||
headers: { |
|
||||
Accept: "application/json", |
|
||||
"Content-Type": "multipart/form-data", |
|
||||
Authorization: "Bearer " + token, |
|
||||
}, |
|
||||
}); |
|
||||
/* |
|
||||
var params = { |
|
||||
playbook: value |
|
||||
} |
|
||||
var options = { |
|
||||
params: params, |
|
||||
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, |
|
||||
}; |
|
||||
|
|
||||
const playbook = await api.GET('playbookCode',options); |
|
||||
return playbook |
|
||||
*/ |
|
||||
try { |
|
||||
var pipelines = { |
|
||||
querytokenFilter: CONFIG.api.token, |
|
||||
filter: pipelinename, |
|
||||
}; |
|
||||
//var params = {
|
|
||||
// pipeline: pipelines
|
|
||||
// }
|
|
||||
var params = { |
|
||||
querytokenFilter: CONFIG.api.token, |
|
||||
filter: pipelinename, |
|
||||
}; |
|
||||
|
|
||||
var options = { |
|
||||
params: params, |
|
||||
headers: { |
|
||||
"content-type": "application/x-www-form-urlencoded", |
|
||||
Authorization: `Bearer ${token}`, |
|
||||
}, |
|
||||
}; |
|
||||
|
|
||||
//https://api.swarmlab.io/gettutorlabrooms?sort=pipelinename%7Casc&page=1&per_page=5&filter=&type=scripts&tutor=yes
|
|
||||
instance.defaults.timeout = 30000; |
|
||||
//const res = await instance.get('/getplaygrounds',params,options);
|
|
||||
const res = await instance.get("/getplaygrounds", options); |
|
||||
if (res.status == 200) { |
|
||||
return res.data; |
|
||||
} else { |
|
||||
console.log("noerror: " + res); |
|
||||
return await res.status; |
|
||||
} |
|
||||
} catch (err) { |
|
||||
console.error("error: " + err); |
|
||||
var error = new Object(); |
|
||||
error.action = "401"; |
|
||||
return await error; |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
// ***************************************************
|
|
||||
// get user pipelines
|
|
||||
// ***************************************************
|
|
||||
|
|
||||
async function getuserpipelines(token, user, swarmlabname) { |
|
||||
var pipelinename = user; |
|
||||
const agent = new https.Agent({ |
|
||||
rejectUnauthorized: false, |
|
||||
}); |
|
||||
const instance = axios.create({ |
|
||||
baseURL: "https://api.swarmlab.io", |
|
||||
withCredentials: true, |
|
||||
rejectUnauthorized: false, |
|
||||
crossdomain: true, |
|
||||
httpsAgent: agent, |
|
||||
headers: { |
|
||||
Accept: "application/json", |
|
||||
"Content-Type": "multipart/form-data", |
|
||||
Authorization: "Bearer " + token, |
|
||||
}, |
|
||||
}); |
|
||||
try { |
|
||||
var pipelines = { |
|
||||
querytokenFilter: CONFIG.api.token, |
|
||||
filter: pipelinename, |
|
||||
swarmlabname: swarmlabname, |
|
||||
}; |
|
||||
//var params = {
|
|
||||
// pipeline: pipelines
|
|
||||
// }
|
|
||||
var params = { |
|
||||
querytokenFilter: CONFIG.api.token, |
|
||||
filter: pipelinename, |
|
||||
swarmlabname: swarmlabname, |
|
||||
}; |
|
||||
|
|
||||
var options = { |
|
||||
params: params, |
|
||||
headers: { |
|
||||
"content-type": "application/x-www-form-urlencoded", |
|
||||
Authorization: `Bearer ${token}`, |
|
||||
}, |
|
||||
}; |
|
||||
|
|
||||
instance.defaults.timeout = 30000; |
|
||||
const res = await instance.get("/getuserplaygrounds", options); |
|
||||
if (res.status == 200) { |
|
||||
return res.data; |
|
||||
} else { |
|
||||
console.log("noerror: " + res); |
|
||||
return await res.status; |
|
||||
} |
|
||||
} catch (err) { |
|
||||
console.error("error: " + err); |
|
||||
var error = new Object(); |
|
||||
error.action = "401"; |
|
||||
error.error = err; |
|
||||
return await error; |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
global.online = "ob"; |
|
||||
global.pipelines = []; |
|
||||
|
|
||||
function sendlog(reslog, pathfileval) { |
|
||||
var usertmp = global.pipelines.find((x) => x.pathlogfile == pathfileval); |
|
||||
//for (var key in usertmp.data){
|
|
||||
var user = usertmp.data[0].user25user; |
|
||||
// if(usertmp.data){
|
|
||||
console.log("-----------------------" + JSON.stringify(usertmp)); |
|
||||
io.in(user).emit("logdata", reslog); |
|
||||
// }
|
|
||||
//}
|
|
||||
} |
|
||||
|
|
||||
function onlogfile(path) { |
|
||||
console.log("File", path, "has been added"); |
|
||||
var pathfileval = pathmodule.basename(path); |
|
||||
var arrfile = pathfileval.toString().split("-"); |
|
||||
var pathfile = arrfile[0]; |
|
||||
var indexfind1 = global.pipelines.findIndex( |
|
||||
(x) => x.pathlogfile == pathfileval |
|
||||
); |
|
||||
console.log( |
|
||||
"file11111111111111111111111111111111 " + JSON.stringify(pathfileval) |
|
||||
); |
|
||||
if (indexfind1 === -1) { |
|
||||
(async () => { |
|
||||
console.log( |
|
||||
"file2222222222222222222222222222222222222 " + |
|
||||
JSON.stringify(pathfileval) |
|
||||
); |
|
||||
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
|
|
||||
var resdata = await getpipelines(token, pathfile); |
|
||||
//resdata.data.pathlogfile = 'test'
|
|
||||
var resob = {}; |
|
||||
resob.pathlogfile = pathfileval; |
|
||||
var resobarray = []; |
|
||||
for (let i in resdata.data) { |
|
||||
var resob1 = {}; |
|
||||
resob1.data = resdata.data[i].res25swarmlabname; |
|
||||
resob1.user25user = resdata.data[i].res25user; |
|
||||
resob1.res25creator = resdata.data[i].res25creator; |
|
||||
resob1.res25fileforce = resdata.data[i].res25fileforce; |
|
||||
resobarray.push(resob1); |
|
||||
} |
|
||||
resob.data = resobarray; |
|
||||
var indexfind = global.pipelines.findIndex( |
|
||||
(x) => x.pathlogfile == pathfileval |
|
||||
); |
|
||||
indexfind === -1 |
|
||||
? global.pipelines.push(resob) |
|
||||
: console.log("object already exists " + pathfileval); |
|
||||
})(); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
// ***************************************************
|
|
||||
// rest get
|
|
||||
// ***************************************************
|
|
||||
|
|
||||
app.get( |
|
||||
"/get_log", |
|
||||
[check("token").isLength({ min: 40 })], |
|
||||
cors(corsOptions), |
|
||||
(req, res, next) => { |
|
||||
(async () => { |
|
||||
var RES = new Object(); |
|
||||
RES.token = req.query["token"]; |
|
||||
RES.start = req.query["start"]; |
|
||||
RES.end = req.query["end"]; |
|
||||
RES.swarmlabname = req.query["swarmlabname"]; |
|
||||
RES.ok = "ok"; |
|
||||
/* |
|
||||
* |
|
||||
* validate |
|
||||
* |
|
||||
*/ |
|
||||
|
|
||||
var isvalid = await checkToken(RES.token); |
|
||||
if (isvalid.action == "ok") { |
|
||||
console.log("Authserver ok " + RES.token); |
|
||||
RES.error = "ok"; |
|
||||
} else { |
|
||||
console.log("Authserver no " + RES.token); |
|
||||
RES.error = "no"; |
|
||||
} |
|
||||
if (RES.error == "ok") { |
|
||||
var resdata = await getuserpipelines( |
|
||||
RES.token, |
|
||||
isvalid.user, |
|
||||
RES.swarmlabname |
|
||||
); |
|
||||
var mongourl = |
|
||||
"mongodb://" + |
|
||||
CONFIG.mongo.user + |
|
||||
":" + |
|
||||
CONFIG.mongo.password + |
|
||||
"@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats"; |
|
||||
const OPTS = { |
|
||||
useNewUrlParser: true, |
|
||||
useUnifiedTopology: true, |
|
||||
}; |
|
||||
MongoClient.connect(mongourl, OPTS, function (err, client) { |
|
||||
if (err) { |
|
||||
console.log(err); |
|
||||
} else { |
|
||||
const db = client.db("fluent"); |
|
||||
//usersession.SOCKET.user = isvalid.user
|
|
||||
console.log(JSON.stringify("mongo ----------------connected")); |
|
||||
console.log("-----test------- " + JSON.stringify(RES)); |
|
||||
if ( |
|
||||
typeof RES.start !== "undefined" && |
|
||||
typeof RES.end !== "undefined" |
|
||||
) { |
|
||||
if (DateTime.fromISO(RES.start).isValid) { |
|
||||
var datestart = DateTime.fromISO(RES.start); |
|
||||
var dateend = DateTime.fromISO(RES.end); |
|
||||
var search_term = { |
|
||||
$and: [ |
|
||||
{ |
|
||||
time: { |
|
||||
$gte: datestart, |
|
||||
}, |
|
||||
}, |
|
||||
{ |
|
||||
time: { |
|
||||
$lt: dateend, |
|
||||
}, |
|
||||
}, |
|
||||
], |
|
||||
}; |
|
||||
} else { |
|
||||
RES.ok = "no"; |
|
||||
} |
|
||||
} else if (typeof RES.end !== "undefined") { |
|
||||
var dateend = DateTime.fromISO(RES.end); |
|
||||
if (DateTime.fromISO(RES.end).isValid) { |
|
||||
var search_term = { |
|
||||
$and: [ |
|
||||
{ |
|
||||
time: { |
|
||||
$lt: dateend, |
|
||||
}, |
|
||||
}, |
|
||||
], |
|
||||
}; |
|
||||
} else { |
|
||||
RES.ok = "no"; |
|
||||
} |
|
||||
} else if (typeof RES.start !== "undefined") { |
|
||||
var datestart = DateTime.fromISO(RES.start); |
|
||||
if (DateTime.fromISO(RES.start).isValid) { |
|
||||
var search_term = { |
|
||||
$and: [ |
|
||||
{ |
|
||||
time: { |
|
||||
$gte: datestart, |
|
||||
}, |
|
||||
}, |
|
||||
], |
|
||||
}; |
|
||||
} else { |
|
||||
RES.ok = "no"; |
|
||||
} |
|
||||
} |
|
||||
if (RES.ok == "ok") { |
|
||||
//var search_term = '{"$gte": new Date("2020-12-01T00:00:00.000Z") , "$lt": new Date("2020-12-11T16:17:36.470Z") }'
|
|
||||
//var search_term = {"time" : {$lte : datenow}}
|
|
||||
var resdataarray = []; |
|
||||
var resraw = {}; |
|
||||
var reslab = ""; |
|
||||
var datestart1 = DateTime.fromISO(RES.start); |
|
||||
console.log("-----now1------- " + JSON.stringify(search_term)); |
|
||||
console.log("-----now2------- " + JSON.stringify(datestart1)); |
|
||||
console.log("-----now3------- " + JSON.stringify(datestart)); |
|
||||
|
|
||||
db.collection("logs") |
|
||||
.find(search_term) |
|
||||
.toArray() |
|
||||
//db.collection('logs').find({"time" : {$gt : datestart}}).toArray()
|
|
||||
.then((item) => { |
|
||||
console.log("item " + JSON.stringify(item)); |
|
||||
for (let i in item) { |
|
||||
reslab = item[i].tailed_path; |
|
||||
var segment_array = reslab.split("/"); |
|
||||
var last_segment = segment_array.pop(); |
|
||||
var fieldstmp = last_segment.split("-"); |
|
||||
var nameofswarmlab = fieldstmp[0]; |
|
||||
|
|
||||
var regexlog = new RegExp(nameofswarmlab); |
|
||||
for (let ii in resdata.data) { |
|
||||
if (regexlog.test(resdata.data[ii].res25swarmlabname)) { |
|
||||
resdataarray.push(item[i]); |
|
||||
RES.found = item[i]; |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
RES.error_msg = "ok"; |
|
||||
RES.data = resdataarray; |
|
||||
//RES.dataserver = resdataarray
|
|
||||
//RES.dataservertmp = resdata
|
|
||||
res.json(RES); |
|
||||
}) |
|
||||
.catch((err) => { |
|
||||
console.error(err); |
|
||||
RES.error_msg = err; |
|
||||
res.json(RES); |
|
||||
}); |
|
||||
} else { |
|
||||
// RES.ok
|
|
||||
RES.error_msg = "no date"; |
|
||||
res.json(RES); |
|
||||
} |
|
||||
} // error mongo connect
|
|
||||
}); // mongo connect
|
|
||||
} else { |
|
||||
// token error
|
|
||||
RES.data = "no"; |
|
||||
RES.error_msg = "token err"; |
|
||||
res.json(RES); |
|
||||
} |
|
||||
})(); |
|
||||
} |
|
||||
); |
|
||||
|
|
||||
app.get( |
|
||||
"/run", |
|
||||
[ |
|
||||
//check('access_token').isLength({ min: 40 }),
|
|
||||
//check('llo').isBase64()
|
|
||||
], |
|
||||
cors(corsOptions), |
|
||||
(req, res, next) => { |
|
||||
(async () => { |
|
||||
var mongourl = |
|
||||
"mongodb://" + |
|
||||
CONFIG.mongo.user + |
|
||||
":" + |
|
||||
CONFIG.mongo.password + |
|
||||
"@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats"; |
|
||||
const OPTS = { |
|
||||
useNewUrlParser: true, |
|
||||
useUnifiedTopology: true, |
|
||||
}; |
|
||||
MongoClient.connect(mongourl, OPTS, function (err, client) { |
|
||||
if (err) { |
|
||||
console.log(err); |
|
||||
} else { |
|
||||
const db = client.db("fluent"); |
|
||||
//db.collection('log', onCollection);
|
|
||||
console.log(JSON.stringify("mongo connected")); |
|
||||
var stream = db |
|
||||
.collection("logs") |
|
||||
.find( |
|
||||
{}, |
|
||||
{ |
|
||||
tailable: true, |
|
||||
awaitdata: true, |
|
||||
/* other options */ |
|
||||
} |
|
||||
) |
|
||||
.stream(); |
|
||||
|
|
||||
stream.on("data", function (doc) { |
|
||||
console.log(JSON.stringify(doc)); |
|
||||
//socket.write(JSON.stringify({'action': 'log','param': doc.log}));
|
|
||||
}); |
|
||||
} |
|
||||
}); |
|
||||
var RES = new Object(); |
|
||||
RES.code = req.query["filter"]; |
|
||||
RES.token = req.query["filter"]; |
|
||||
var isvalid = await checkToken(RES.token); |
|
||||
if (isvalid.action == "ok") { |
|
||||
console.log("Authserver ok " + RES.token); |
|
||||
} else { |
|
||||
console.log("Authserver no " + RES.token); |
|
||||
} |
|
||||
RES.error = false; |
|
||||
RES.error_msg = "ok"; |
|
||||
res.json(RES); |
|
||||
})(); |
|
||||
} |
|
||||
); |
|
||||
|
|
||||
// ***************************************************
|
|
||||
// rest post
|
|
||||
// ***************************************************
|
|
||||
|
|
||||
app.post( |
|
||||
"/run", |
|
||||
[ |
|
||||
//check('access_token').isLength({ min: 40 }),
|
|
||||
//check('llo').isBase64()
|
|
||||
], |
|
||||
cors(corsOptions), |
|
||||
(req, res, next) => { |
|
||||
(async () => { |
|
||||
//console.log(JSON.stringify(req.headers));
|
|
||||
//console.log(JSON.stringify(req.body));
|
|
||||
//console.log("mongo "+JSON.stringify(req.body));
|
|
||||
//console.log("LOG "+JSON.stringify(req.body[0].message));
|
|
||||
//console.log("PATH "+JSON.stringify(req.body[0].tailed_path));
|
|
||||
for (var i = 0; i < req.body.length; i++) { |
|
||||
//var getpath = await onlogfile(req.body[i].tailed_path)
|
|
||||
|
|
||||
var path = req.body[i].tailed_path; |
|
||||
|
|
||||
console.log("File", path, "has been added"); |
|
||||
var pathfileval = pathmodule.basename(path); |
|
||||
var arrfile = pathfileval.toString().split("-"); |
|
||||
var pathfile = arrfile[0]; |
|
||||
var indexfind1 = global.pipelines.findIndex( |
|
||||
(x) => x.pathlogfile == pathfileval |
|
||||
); |
|
||||
console.log( |
|
||||
"file11111111111111111111111111111111 " + JSON.stringify(pathfileval) |
|
||||
); |
|
||||
if (indexfind1 === -1) { |
|
||||
(async () => { |
|
||||
console.log( |
|
||||
"file2222222222222222222222222222222222222 " + |
|
||||
JSON.stringify(pathfileval) |
|
||||
); |
|
||||
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
|
|
||||
var resdata = await getpipelines(token, pathfile); |
|
||||
//resdata.data.pathlogfile = 'test'
|
|
||||
var resob = {}; |
|
||||
resob.pathlogfile = pathfileval; |
|
||||
var resobarray = []; |
|
||||
for (let i in resdata.data) { |
|
||||
var resob1 = {}; |
|
||||
resob1.data = resdata.data[i].res25swarmlabname; |
|
||||
resob1.user25user = resdata.data[i].res25user; |
|
||||
resob1.res25creator = resdata.data[i].res25creator; |
|
||||
resob1.res25fileforce = resdata.data[i].res25fileforce; |
|
||||
resobarray.push(resob1); |
|
||||
} |
|
||||
resob.data = resobarray; |
|
||||
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname);
|
|
||||
var indexfind = global.pipelines.findIndex( |
|
||||
(x) => x.pathlogfile == pathfileval |
|
||||
); |
|
||||
|
|
||||
//indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists")
|
|
||||
indexfind === -1 |
|
||||
? global.pipelines.push(resob) |
|
||||
: console.log("object already exists " + pathfileval); |
|
||||
|
|
||||
//console.log('info', JSON.stringify(resdata));
|
|
||||
//console.log('info------------- ', JSON.stringify(global.pipelines));
|
|
||||
})(); |
|
||||
} |
|
||||
//
|
|
||||
var obj = req.body[i]; |
|
||||
|
|
||||
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data.res25swarmlabname);
|
|
||||
//indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists")
|
|
||||
var now = new Date(); |
|
||||
|
|
||||
var reslog = new Object(); |
|
||||
reslog.log = obj; |
|
||||
|
|
||||
reslog.date = convertDateToUTC(now); |
|
||||
console.log(reslog); |
|
||||
var pathfileval = pathmodule.basename(reslog.log.tailed_path); |
|
||||
var indexfind = global.pipelines.findIndex( |
|
||||
(x) => x.pathlogfile == pathfileval |
|
||||
); |
|
||||
//indexfind === -1 ? sendlog(reslog,pathfileval) : console.log("object already exists")
|
|
||||
indexfind === -1 |
|
||||
? console.log("object not found") |
|
||||
: sendlog(reslog, pathfileval); |
|
||||
console.log("IOT " + JSON.stringify(reslog.log.tailed_path)); |
|
||||
console.log("IOTindexfind " + JSON.stringify(indexfind)); |
|
||||
console.log("IOTuser " + JSON.stringify(global.pipelines)); |
|
||||
// io.in("iot").emit("message", reslog);
|
|
||||
// io.emit("logdata", reslog);
|
|
||||
} |
|
||||
})(); |
|
||||
|
|
||||
//io.in("iot").emit("message", RES);
|
|
||||
|
|
||||
console.error("socket POST from client"); |
|
||||
var RES = new Object(); |
|
||||
RES.error = false; |
|
||||
RES.error_msg = "ok"; |
|
||||
RES.msg = req.body[0].messsage; |
|
||||
|
|
||||
res.json(RES); |
|
||||
} |
|
||||
); |
|
||||
|
|
||||
// ***************************************************
|
|
||||
// socket
|
|
||||
// ***************************************************
|
|
||||
|
|
||||
//function getSHA256ofJSON(input){
|
|
||||
// return require("crypto").createHash("sha256").update(JSON.stringify(input)).digest("hex");
|
|
||||
//}
|
|
||||
|
|
||||
function sleep(ms) { |
|
||||
return new Promise((resolve) => setTimeout(resolve, ms)); |
|
||||
} |
|
||||
|
|
||||
function getSHA256ofJSON(data, inputEncoding, encoding) { |
|
||||
if (!data) { |
|
||||
return ""; |
|
||||
} |
|
||||
inputEncoding = inputEncoding || "utf-8"; |
|
||||
encoding = encoding || "hex"; |
|
||||
const hash = require("crypto").createHash("md5"); |
|
||||
return hash.update(JSON.stringify(data), inputEncoding).digest(encoding); |
|
||||
} |
|
||||
|
|
||||
//var getkey = function getkey(key) {
|
|
||||
async function getkey(key) { |
|
||||
return new Promise((resolve) => { |
|
||||
pubClient.get(key, function (err, reply) { |
|
||||
if (err) { |
|
||||
console.log("----------error------------"); |
|
||||
|
|
||||
resolve(null); |
|
||||
} else { |
|
||||
if (reply) { |
|
||||
console.log("---------fount----------"); |
|
||||
resolve(1); |
|
||||
} else { |
|
||||
console.log("----------not fount------------"); |
|
||||
resolve(2); |
|
||||
//return 2
|
|
||||
} |
|
||||
} |
|
||||
}); |
|
||||
}); |
|
||||
} |
|
||||
|
|
||||
var setkey = function setkv(key, value) { |
|
||||
return new Promise((resolve) => { |
|
||||
//pubClient.set(key,value, 'EX', expire, function(err,reply){
|
|
||||
pubClient.set(key, value, function (err, reply) { |
|
||||
if (err) { |
|
||||
resolve(null); |
|
||||
} else { |
|
||||
resolve(reply); |
|
||||
} |
|
||||
}); |
|
||||
}); |
|
||||
}; |
|
||||
|
|
||||
async function iosend(data, issend, io, pubClient, user1) { |
|
||||
var new1 = {}; |
|
||||
new1.tailed_path = data.tailed_path; |
|
||||
new1.message = data.message; |
|
||||
|
|
||||
var now = new Date(); |
|
||||
var reslog1 = {}; |
|
||||
//reslog1.data = resob1
|
|
||||
reslog1.log = new1; |
|
||||
reslog1.date = convertDateToUTC(now); |
|
||||
var user = user1; |
|
||||
|
|
||||
const randomTimeInMs = Math.random() * 2000; |
|
||||
await sleep(randomTimeInMs); |
|
||||
var getres = await getkey(issend); |
|
||||
|
|
||||
if (getres == "1") { |
|
||||
console.log(issend + " ---1 " + JSON.stringify(reslog1)); |
|
||||
//io.in(user).emit("logdata", reslog1);
|
|
||||
} else if (getres == "2") { |
|
||||
console.log(issend + " ---2 " + JSON.stringify(reslog1)); |
|
||||
setkey(issend, "1"); |
|
||||
//pubClient.set(issend, '1', function(err, res) {
|
|
||||
//});
|
|
||||
io.in(user).emit("logdata", reslog1); |
|
||||
//}
|
|
||||
} |
|
||||
} |
|
||||
|
|
||||
function onCollection(err, collection) { |
|
||||
let options = { |
|
||||
tailable: true, |
|
||||
awaitdata: true, |
|
||||
numberOfRetries: -1, |
|
||||
tailableRetryInterval: 500, |
|
||||
}; |
|
||||
var cursor = collection.find({}, options).stream(); |
|
||||
var itemsProcessed = 0; |
|
||||
|
|
||||
var reslog = new Object(); |
|
||||
var now = new Date(); |
|
||||
cursor.on("data", function (data) { |
|
||||
var issendob = new Object(); |
|
||||
issendob.id = data._id; |
|
||||
issendob.message = data.message; |
|
||||
issendob.tailed_path = data.tailed_path; |
|
||||
|
|
||||
var issend = getSHA256ofJSON(issendob); |
|
||||
|
|
||||
console.log("++++++++" + JSON.stringify(data)); |
|
||||
console.log("++++++++ob" + JSON.stringify(issendob)); |
|
||||
console.log("++++++++sha" + JSON.stringify(issend)); |
|
||||
|
|
||||
var pathfileval = pathmodule.basename(data.tailed_path); |
|
||||
var arrfile = pathfileval.toString().split("-"); |
|
||||
var pathfile = arrfile[0]; |
|
||||
|
|
||||
var indexupdate = "yes"; |
|
||||
var resob = {}; |
|
||||
pubClient.get(pathfileval, function (err, object) { |
|
||||
var objecttmp = JSON.parse(object); |
|
||||
if (object) { |
|
||||
var user1 = objecttmp.user25user.replace(/[\n\t\r]/g, ""); |
|
||||
iosend(data, issend, io, pubClient, user1); |
|
||||
} else { |
|
||||
(async () => { |
|
||||
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
|
|
||||
var resdata1 = await getpipelines(token, pathfile); |
|
||||
resob.pathlogfile = pathfileval; |
|
||||
var resob11 = {}; |
|
||||
var i1 = 0; |
|
||||
resob11.data = resdata1.data[i1].res25swarmlabname; |
|
||||
resob11.user25user = resdata1.data[i1].res25user.replace( |
|
||||
/[\n\t\r]/g, |
|
||||
"" |
|
||||
); |
|
||||
resob11.res25creator = resdata1.data[i1].res25creator; |
|
||||
resob11.res25fileforce = resdata1.data[i1].res25fileforce; |
|
||||
resob11.tailed_path = pathfileval; |
|
||||
var resob1string1 = JSON.stringify(resob11); |
|
||||
await pubClient.set( |
|
||||
pathfileval, |
|
||||
resob1string1, |
|
||||
function (err, res) {} |
|
||||
); |
|
||||
var user1 = resob11.user25user; |
|
||||
iosend(data, issend, io, pubClient, user1); |
|
||||
console.log(" ---no--- " + JSON.stringify(data)); |
|
||||
})(); //await inside yes
|
|
||||
} |
|
||||
}); |
|
||||
}); |
|
||||
|
|
||||
setInterval(function () { |
|
||||
console.log("itemsProcessed", itemsProcessed); |
|
||||
// this method is also exposed by the Server instance
|
|
||||
//console.log(adapter.rooms);
|
|
||||
}, 8000); |
|
||||
} |
|
||||
|
|
||||
/// edw exw error me timeout ///
|
|
||||
var mongourl = |
|
||||
"mongodb://" + |
|
||||
CONFIG.mongo.user + |
|
||||
":" + |
|
||||
CONFIG.mongo.password + |
|
||||
"@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/swarmlabplaygroundstats?replicaSet=rs1&authSource=swarmlabplaygroundstats"; |
|
||||
const OPTS = { |
|
||||
useNewUrlParser: true, |
|
||||
useUnifiedTopology: true, |
|
||||
}; |
|
||||
var mongooptions = { |
|
||||
autoReconnect: true, |
|
||||
keepAlive: 1, |
|
||||
connectTimeoutMS: 30000, |
|
||||
socketTimeoutMS: 0, |
|
||||
}; |
|
||||
MongoClient.connect(mongourl, OPTS, function (err, client) { |
|
||||
if (err) { |
|
||||
console.log(err); |
|
||||
} else { |
|
||||
const db = client.db("swarmlabplaygroundstats"); |
|
||||
db.collection("logs", onCollection); |
|
||||
} |
|
||||
}); |
|
||||
|
|
||||
http.listen(3000, () => console.error("listening on http://localhost:3000/")); |
|
||||
console.error("socket.io example"); |
|
@ -0,0 +1,15 @@ |
|||||
|
const mongoose = require("mongoose"); |
||||
|
const Schema = mongoose.Schema; |
||||
|
|
||||
|
const ItemSchema = new Schema({ |
||||
|
name: { |
||||
|
type: String, |
||||
|
required: true, |
||||
|
}, |
||||
|
date: { |
||||
|
type: Date, |
||||
|
default: Date.now, |
||||
|
}, |
||||
|
}); |
||||
|
|
||||
|
module.exports = Item = mongoose.model("item", ItemSchema); |
Loading…
Reference in new issue