lefteris
4 years ago
3 changed files with 951 additions and 951 deletions
@ -1,91 +0,0 @@ |
|||||
var path = require('path'); |
|
||||
var app = require('express')(); |
|
||||
var http = require('http').Server(app); |
|
||||
var io = require('socket.io')(http); |
|
||||
const cors = require("cors"); |
|
||||
|
|
||||
//const socketAuth = require('socketio-auth');
|
|
||||
|
|
||||
|
|
||||
|
|
||||
//const whitelist = ["http://localhost:8080"];
|
|
||||
const whitelist = ["*"]; |
|
||||
const corsOptions = { |
|
||||
credentials: true, |
|
||||
methods: ["GET", "PUT", "POST", "DELETE", "OPTIONS"], |
|
||||
optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204
|
|
||||
allowedHeaders: [ |
|
||||
"Content-Type", |
|
||||
"Authorization", |
|
||||
"X-Requested-With", |
|
||||
"device-remember-token", |
|
||||
"Access-Control-Allow-Origin", |
|
||||
"Access-Control-Allow-Headers", |
|
||||
"Origin", |
|
||||
"Accept", |
|
||||
], |
|
||||
origin: function (origin, callback) { |
|
||||
if (whitelist.indexOf(origin) !== -1) { |
|
||||
callback(null, true); |
|
||||
} else { |
|
||||
callback(null, true); |
|
||||
//callback(new Error('Not allowed by CORS'))
|
|
||||
} |
|
||||
}, |
|
||||
}; |
|
||||
|
|
||||
app.get( |
|
||||
"/run", |
|
||||
[ |
|
||||
//check('access_token').isLength({ min: 40 }),
|
|
||||
//check('llo').isBase64()
|
|
||||
], |
|
||||
cors(corsOptions), |
|
||||
(req, res, next) => { |
|
||||
var RES = new Object(); |
|
||||
RES.code = req.query["code"]; |
|
||||
console.error("socket GET from client " + RES.code); |
|
||||
|
|
||||
RES.error = false; |
|
||||
RES.error_msg = "ok"; |
|
||||
|
|
||||
io.emit("iotdata", RES); |
|
||||
io.in("iot").emit("message", RES); |
|
||||
res.json(RES); |
|
||||
} |
|
||||
); |
|
||||
|
|
||||
|
|
||||
app.get( |
|
||||
"/test", |
|
||||
[ |
|
||||
//check('access_token').isLength({ min: 40 }),
|
|
||||
//check('llo').isBase64()
|
|
||||
], |
|
||||
cors(corsOptions), |
|
||||
(req, res) => { |
|
||||
var data = req.query["input"]; |
|
||||
var RES = new Object(); |
|
||||
console.error(`Client called GET from axios`); |
|
||||
res.json(data); |
|
||||
} |
|
||||
); |
|
||||
|
|
||||
io.on("log", (data) => { |
|
||||
console.log(JSON.stringify("d " + data)); |
|
||||
console.error(JSON.stringify("c " + data)); |
|
||||
}); |
|
||||
io.on("connection", (s) => { |
|
||||
console.error(`\nSomeone connected to port 3000`); |
|
||||
var id = s.id; |
|
||||
|
|
||||
s.on("log", (data) => { |
|
||||
console.log(JSON.stringify(data)); |
|
||||
console.error(JSON.stringify(data)); |
|
||||
}); |
|
||||
|
|
||||
}); |
|
||||
|
|
||||
http.listen(3000, () => console.error("listening on http://0.0.0.0:3000/")); |
|
||||
console.error("Run demo project"); |
|
||||
console.log("Hello World!"); |
|
@ -1,889 +1,91 @@ |
|||||
"use strict" |
var path = require('path'); |
||||
|
|
||||
var pathmodule = require('path'); |
|
||||
var app = require('express')(); |
var app = require('express')(); |
||||
var http = require('http').Server(app); |
var http = require('http').Server(app); |
||||
var https = require('https'); |
var io = require('socket.io')(http); |
||||
var CONFIG = require( pathmodule.resolve( __dirname, "runconfig.js" ) ); |
const cors = require("cors"); |
||||
const io = require("socket.io")(http, { |
|
||||
// pingTimeout: 30000,
|
|
||||
// allowUpgrades: false,
|
|
||||
// serveClient: false,
|
|
||||
// pingInterval: 10000,
|
|
||||
// //transports: [ 'websocket', 'polling' ],
|
|
||||
// transports: [ 'polling', 'websocket' ],
|
|
||||
cors: { |
|
||||
origin: "http://localhost:8080", |
|
||||
methods: ["GET", "POST"], |
|
||||
allowedHeaders: ["my-custom-header"], |
|
||||
credentials: true |
|
||||
}, |
|
||||
cookie: { |
|
||||
name: "test", |
|
||||
httpOnly: false, |
|
||||
path: "/custom" |
|
||||
} |
|
||||
}); |
|
||||
|
|
||||
/* |
|
||||
const Redis = require("ioredis"); |
|
||||
const redistest = new Redis({ |
|
||||
host: 'redisserver', |
|
||||
port: 6379, |
|
||||
}); |
|
||||
const pubtest = new Redis({ |
|
||||
host: 'redisserver', |
|
||||
port: 6379, |
|
||||
}); |
|
||||
*/ |
|
||||
|
|
||||
|
|
||||
//import { createAdapter } from 'socket.io-redis';
|
|
||||
const createAdapter = require('socket.io-redis'); |
|
||||
//const RedisClient = require("redis");
|
|
||||
const Redis = require("ioredis"); |
|
||||
//const pubClient = RedisClient.createClient({
|
|
||||
|
|
||||
const pubClient = new Redis({ |
|
||||
host: 'redisserver', |
|
||||
port: 6379, |
|
||||
}); |
|
||||
|
|
||||
//const pubClient = new RedisClient({ host: 'localhost', port: 6379 });
|
|
||||
const subClient = pubClient.duplicate(); |
|
||||
|
|
||||
io.adapter(createAdapter({ pubClient, subClient })); |
|
||||
|
|
||||
|
|
||||
pubClient.on("connect", function() { |
|
||||
console.log("You are now connected"); |
|
||||
}); |
|
||||
|
|
||||
const MongoClient = require('mongodb').MongoClient; |
|
||||
const { DateTime } = require("luxon"); |
|
||||
|
|
||||
|
//const socketAuth = require('socketio-auth');
|
||||
|
|
||||
var async = require("async"); |
|
||||
const { check, validationResult } = require('express-validator'); |
|
||||
const urlExistSync = require("url-exist-sync"); |
|
||||
|
|
||||
var express = require('express'); |
|
||||
app.use(express.json()); |
|
||||
|
|
||||
const axios = require('axios'); |
//const whitelist = ["http://localhost:8080"];
|
||||
axios.defaults.timeout = 30000 |
const whitelist = ["*"]; |
||||
|
|
||||
const helmet = require('helmet'); |
|
||||
app.use(helmet()); |
|
||||
|
|
||||
const cors = require('cors') |
|
||||
const whitelist = [ |
|
||||
'http://localhost:8080', |
|
||||
'http://localhost:3080', |
|
||||
'http://localhost:3081', |
|
||||
'http://localhost:3082' |
|
||||
] |
|
||||
const corsOptions = { |
const corsOptions = { |
||||
credentials: true, |
credentials: true, |
||||
methods: ['GET', 'PUT', 'POST', 'DELETE', 'OPTIONS'], |
methods: ["GET", "PUT", "POST", "DELETE", "OPTIONS"], |
||||
optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204
|
optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204
|
||||
allowedHeaders: [ |
allowedHeaders: [ |
||||
'Content-Type', |
"Content-Type", |
||||
'Authorization', |
"Authorization", |
||||
'X-Requested-With', |
"X-Requested-With", |
||||
'device-remember-token', |
"device-remember-token", |
||||
'Access-Control-Allow-Origin', |
"Access-Control-Allow-Origin", |
||||
'Access-Control-Allow-Headers', |
"Access-Control-Allow-Headers", |
||||
'Origin', |
"Origin", |
||||
'Accept' |
"Accept", |
||||
], |
], |
||||
origin: function(origin, callback) { |
origin: function (origin, callback) { |
||||
if (whitelist.indexOf(origin) !== -1) { |
if (whitelist.indexOf(origin) !== -1) { |
||||
callback(null, true) |
callback(null, true); |
||||
} else { |
} else { |
||||
callback(null, true) |
callback(null, true); |
||||
//callback(new Error('Not allowed by CORS'))
|
//callback(new Error('Not allowed by CORS'))
|
||||
} |
} |
||||
} |
}, |
||||
} |
}; |
||||
|
|
||||
|
|
||||
|
|
||||
// ***************************************************
|
|
||||
// checktoken
|
|
||||
// ***************************************************
|
|
||||
|
|
||||
async function checkToken(token) { |
|
||||
const agent = new https.Agent({ |
|
||||
rejectUnauthorized: false, |
|
||||
}); |
|
||||
const instance = axios.create({ |
|
||||
baseURL: 'https://api.swarmlab.io', |
|
||||
withCredentials: true, |
|
||||
rejectUnauthorized: false, |
|
||||
crossdomain: true, |
|
||||
httpsAgent: agent, |
|
||||
headers: { |
|
||||
'Accept': 'application/json', |
|
||||
'Content-Type': 'multipart/form-data', |
|
||||
'Authorization': 'Bearer '+token |
|
||||
} |
|
||||
}) |
|
||||
try { |
|
||||
var pipelines = { |
|
||||
"source":'ssologin' |
|
||||
} |
|
||||
var params = { |
|
||||
pipeline: pipelines |
|
||||
} |
|
||||
|
|
||||
var options = { |
|
||||
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, |
|
||||
}; |
|
||||
|
|
||||
instance.defaults.timeout = 30000; |
|
||||
const res = await instance.post('/istokenvalidsso',params,options); |
|
||||
if(res.status == 200){ |
|
||||
//console.log("check " +JSON.stringify(res.data))
|
|
||||
return res.data |
|
||||
}else{ |
|
||||
console.log("noerror: " + res) |
|
||||
return res.status |
|
||||
|
|
||||
} |
|
||||
} |
|
||||
catch (err) { |
|
||||
console.error("error: "+err); |
|
||||
var error = new Object(); |
|
||||
error.action = '401' |
|
||||
return error |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
|
|
||||
function convertDateToUTC(date) { |
|
||||
return new Date(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate(), date.getUTCHours(), date.getUTCMinutes(), date.getUTCSeconds(),date.getUTCMilliseconds()); |
|
||||
} |
|
||||
|
|
||||
// ***************************************************
|
|
||||
// get pipelines
|
|
||||
// ***************************************************
|
|
||||
|
|
||||
async function getpipelines(token,pipelinename) { |
|
||||
const agent = new https.Agent({ |
|
||||
rejectUnauthorized: false, |
|
||||
}); |
|
||||
const instance = axios.create({ |
|
||||
baseURL: 'https://api.swarmlab.io', |
|
||||
withCredentials: true, |
|
||||
rejectUnauthorized: false, |
|
||||
crossdomain: true, |
|
||||
httpsAgent: agent, |
|
||||
headers: { |
|
||||
'Accept': 'application/json', |
|
||||
'Content-Type': 'multipart/form-data', |
|
||||
'Authorization': 'Bearer '+token |
|
||||
} |
|
||||
}) |
|
||||
/* |
|
||||
var params = { |
|
||||
playbook: value |
|
||||
} |
|
||||
var options = { |
|
||||
params: params, |
|
||||
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, |
|
||||
}; |
|
||||
|
|
||||
const playbook = await api.GET('playbookCode',options); |
|
||||
return playbook |
|
||||
*/ |
|
||||
try { |
|
||||
|
|
||||
var pipelines = { |
|
||||
"querytokenFilter":CONFIG.api.token, |
|
||||
"filter":pipelinename |
|
||||
} |
|
||||
//var params = {
|
|
||||
// pipeline: pipelines
|
|
||||
// }
|
|
||||
var params = { |
|
||||
querytokenFilter:CONFIG.api.token, |
|
||||
filter:pipelinename |
|
||||
} |
|
||||
|
|
||||
var options = { |
|
||||
params: params, |
|
||||
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, |
|
||||
}; |
|
||||
|
|
||||
//https://api.swarmlab.io/gettutorlabrooms?sort=pipelinename%7Casc&page=1&per_page=5&filter=&type=scripts&tutor=yes
|
|
||||
instance.defaults.timeout = 30000; |
|
||||
//const res = await instance.get('/getplaygrounds',params,options);
|
|
||||
const res = await instance.get('/getplaygrounds',options); |
|
||||
if(res.status == 200){ |
|
||||
return res.data |
|
||||
}else{ |
|
||||
console.log("noerror: " + res) |
|
||||
return await res.status |
|
||||
|
|
||||
} |
|
||||
} |
|
||||
catch (err) { |
|
||||
console.error("error: "+err); |
|
||||
var error = new Object(); |
|
||||
error.action = '401' |
|
||||
return await error |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
// ***************************************************
|
|
||||
// get user pipelines
|
|
||||
// ***************************************************
|
|
||||
|
|
||||
async function getuserpipelines(token,user,swarmlabname) { |
|
||||
var pipelinename = user |
|
||||
const agent = new https.Agent({ |
|
||||
rejectUnauthorized: false, |
|
||||
}); |
|
||||
const instance = axios.create({ |
|
||||
baseURL: 'https://api.swarmlab.io', |
|
||||
withCredentials: true, |
|
||||
rejectUnauthorized: false, |
|
||||
crossdomain: true, |
|
||||
httpsAgent: agent, |
|
||||
headers: { |
|
||||
'Accept': 'application/json', |
|
||||
'Content-Type': 'multipart/form-data', |
|
||||
'Authorization': 'Bearer '+token |
|
||||
} |
|
||||
}) |
|
||||
try { |
|
||||
|
|
||||
var pipelines = { |
|
||||
"querytokenFilter":CONFIG.api.token, |
|
||||
"filter":pipelinename, |
|
||||
swarmlabname:swarmlabname |
|
||||
} |
|
||||
//var params = {
|
|
||||
// pipeline: pipelines
|
|
||||
// }
|
|
||||
var params = { |
|
||||
querytokenFilter:CONFIG.api.token, |
|
||||
filter:pipelinename, |
|
||||
swarmlabname:swarmlabname |
|
||||
} |
|
||||
|
|
||||
var options = { |
|
||||
params: params, |
|
||||
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, |
|
||||
}; |
|
||||
|
|
||||
instance.defaults.timeout = 30000; |
|
||||
const res = await instance.get('/getuserplaygrounds',options); |
|
||||
if(res.status == 200){ |
|
||||
return res.data |
|
||||
}else{ |
|
||||
console.log("noerror: " + res) |
|
||||
return await res.status |
|
||||
|
|
||||
} |
|
||||
} |
|
||||
catch (err) { |
|
||||
console.error("error: "+err); |
|
||||
var error = new Object(); |
|
||||
error.action = '401' |
|
||||
error.error = err |
|
||||
return await error |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
|
|
||||
global.online='ob'; |
|
||||
global.pipelines=[]; |
|
||||
|
|
||||
function sendlog(reslog,pathfileval){ |
|
||||
var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval); |
|
||||
//for (var key in usertmp.data){
|
|
||||
var user = usertmp.data[0].user25user; |
|
||||
// if(usertmp.data){
|
|
||||
console.log('-----------------------' + JSON.stringify(usertmp)); |
|
||||
io.in(user).emit("logdata", reslog); |
|
||||
// }
|
|
||||
//}
|
|
||||
} |
|
||||
function onlogfile(path){ |
|
||||
|
|
||||
console.log('File', path, 'has been added'); |
|
||||
var pathfileval = pathmodule.basename(path); |
|
||||
var arrfile = pathfileval.toString().split("-"); |
|
||||
var pathfile = arrfile[0]; |
|
||||
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); |
|
||||
console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval)) |
|
||||
if (indexfind1 === -1 ){ |
|
||||
(async() => { |
|
||||
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval)) |
|
||||
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
|
|
||||
var resdata = await getpipelines(token,pathfile) |
|
||||
//resdata.data.pathlogfile = 'test'
|
|
||||
var resob = {} |
|
||||
resob.pathlogfile = pathfileval |
|
||||
var resobarray = [] |
|
||||
for (let i in resdata.data) { |
|
||||
var resob1 = {} |
|
||||
resob1.data = resdata.data[i].res25swarmlabname |
|
||||
resob1.user25user = resdata.data[i].res25user |
|
||||
resob1.res25creator = resdata.data[i].res25creator |
|
||||
resob1.res25fileforce = resdata.data[i].res25fileforce |
|
||||
resobarray.push(resob1) |
|
||||
} |
|
||||
resob.data = resobarray |
|
||||
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); |
|
||||
indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval) |
|
||||
})() |
|
||||
} |
|
||||
|
|
||||
} |
|
||||
|
|
||||
// ***************************************************
|
|
||||
// rest get
|
|
||||
// ***************************************************
|
|
||||
|
|
||||
app.get('/get_log', [ |
|
||||
check('token').isLength({ min: 40 }) |
|
||||
], |
|
||||
cors(corsOptions), (req, res, next) => { |
|
||||
|
|
||||
(async() => { |
|
||||
var RES = new Object(); |
|
||||
RES.token = req.query["token"] |
|
||||
RES.start = req.query["start"] |
|
||||
RES.end = req.query["end"] |
|
||||
RES.swarmlabname = req.query["swarmlabname"] |
|
||||
RES.ok = 'ok' |
|
||||
/* |
|
||||
* |
|
||||
* validate |
|
||||
* |
|
||||
*/ |
|
||||
|
|
||||
|
|
||||
|
|
||||
var isvalid = await checkToken(RES.token); |
|
||||
if(isvalid.action == 'ok'){ |
|
||||
console.log("Authserver ok " + RES.token); |
|
||||
RES.error = 'ok' |
|
||||
}else{ |
|
||||
console.log("Authserver no " + RES.token); |
|
||||
RES.error = 'no' |
|
||||
} |
|
||||
if(RES.error == 'ok'){ |
|
||||
|
|
||||
var resdata = await getuserpipelines(RES.token,isvalid.user,RES.swarmlabname) |
|
||||
var mongourl = "mongodb://"+CONFIG.mongo.user+":"+CONFIG.mongo.password+"@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats" |
|
||||
const OPTS = { |
|
||||
useNewUrlParser: true, |
|
||||
useUnifiedTopology: true |
|
||||
}; |
|
||||
MongoClient.connect(mongourl, OPTS, function(err, client){ |
|
||||
if(err){ |
|
||||
console.log(err); |
|
||||
} else { |
|
||||
const db = client.db('fluent'); |
|
||||
//usersession.SOCKET.user = isvalid.user
|
|
||||
console.log(JSON.stringify('mongo ----------------connected')) |
|
||||
console.log('-----test------- '+JSON.stringify(RES)) |
|
||||
if ((typeof RES.start !== "undefined") && (typeof RES.end !== "undefined")) { |
|
||||
if(DateTime.fromISO(RES.start).isValid){ |
|
||||
var datestart = DateTime.fromISO(RES.start) |
|
||||
var dateend = DateTime.fromISO(RES.end) |
|
||||
var search_term = { |
|
||||
"$and": [ |
|
||||
{ |
|
||||
"time": { |
|
||||
$gte: datestart |
|
||||
} |
|
||||
}, |
|
||||
{ |
|
||||
"time": { |
|
||||
$lt: dateend |
|
||||
} |
|
||||
}, |
|
||||
] |
|
||||
} |
|
||||
}else{ |
|
||||
RES.ok = 'no' |
|
||||
} |
|
||||
}else if(typeof RES.end !== "undefined"){ |
|
||||
var dateend = DateTime.fromISO(RES.end) |
|
||||
if(DateTime.fromISO(RES.end).isValid){ |
|
||||
var search_term = { |
|
||||
"$and": [ |
|
||||
{ |
|
||||
"time": { |
|
||||
$lt: dateend |
|
||||
} |
|
||||
} |
|
||||
] |
|
||||
} |
|
||||
}else{ |
|
||||
RES.ok = 'no' |
|
||||
} |
|
||||
}else if(typeof RES.start !== "undefined"){ |
|
||||
var datestart = DateTime.fromISO(RES.start) |
|
||||
if(DateTime.fromISO(RES.start).isValid){ |
|
||||
var search_term = { |
|
||||
"$and": [ |
|
||||
{ |
|
||||
"time": { |
|
||||
$gte: datestart |
|
||||
} |
|
||||
} |
|
||||
] |
|
||||
} |
|
||||
}else{ |
|
||||
RES.ok = 'no' |
|
||||
} |
|
||||
} |
|
||||
if(RES.ok == 'ok'){ |
|
||||
//var search_term = '{"$gte": new Date("2020-12-01T00:00:00.000Z") , "$lt": new Date("2020-12-11T16:17:36.470Z") }'
|
|
||||
//var search_term = {"time" : {$lte : datenow}}
|
|
||||
var resdataarray = [] |
|
||||
var resraw = {} |
|
||||
var reslab = '' |
|
||||
var datestart1 = DateTime.fromISO(RES.start) |
|
||||
console.log('-----now1------- '+JSON.stringify(search_term)) |
|
||||
console.log('-----now2------- '+JSON.stringify(datestart1)) |
|
||||
console.log('-----now3------- '+JSON.stringify(datestart)) |
|
||||
|
|
||||
db.collection('logs').find(search_term).toArray() |
|
||||
//db.collection('logs').find({"time" : {$gt : datestart}}).toArray()
|
|
||||
.then(item => { |
|
||||
console.log('item '+JSON.stringify(item)) |
|
||||
for (let i in item) { |
|
||||
reslab = item[i].tailed_path |
|
||||
var segment_array = reslab.split( '/' ); |
|
||||
var last_segment = segment_array.pop(); |
|
||||
var fieldstmp = last_segment.split('-'); |
|
||||
var nameofswarmlab = fieldstmp[0]; |
|
||||
|
|
||||
var regexlog = new RegExp(nameofswarmlab); |
|
||||
for (let ii in resdata.data) { |
|
||||
if( regexlog.test(resdata.data[ii].res25swarmlabname) ){ |
|
||||
resdataarray.push(item[i]) |
|
||||
RES.found = item[i] |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
RES.error_msg = "ok" |
|
||||
RES.data = resdataarray |
|
||||
//RES.dataserver = resdataarray
|
|
||||
//RES.dataservertmp = resdata
|
|
||||
res.json(RES) |
|
||||
}) |
|
||||
.catch(err => { |
|
||||
console.error(err) |
|
||||
RES.error_msg = err |
|
||||
res.json(RES) |
|
||||
}) |
|
||||
} else{ // RES.ok
|
|
||||
RES.error_msg = 'no date' |
|
||||
res.json(RES) |
|
||||
} |
|
||||
} // error mongo connect
|
|
||||
}); // mongo connect
|
|
||||
}else{ // token error
|
|
||||
RES.data = 'no' |
|
||||
RES.error_msg = "token err" |
|
||||
res.json(RES) |
|
||||
} |
|
||||
})() |
|
||||
|
|
||||
}); |
|
||||
|
|
||||
app.get('/run', [ |
app.get( |
||||
|
"/run", |
||||
|
[ |
||||
//check('access_token').isLength({ min: 40 }),
|
//check('access_token').isLength({ min: 40 }),
|
||||
//check('llo').isBase64()
|
//check('llo').isBase64()
|
||||
], |
], |
||||
cors(corsOptions), (req, res, next) => { |
cors(corsOptions), |
||||
|
(req, res, next) => { |
||||
(async() => { |
var RES = new Object(); |
||||
|
RES.code = req.query["code"]; |
||||
var mongourl = "mongodb://"+CONFIG.mongo.user+":"+CONFIG.mongo.password+"@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats" |
console.error("socket GET from client " + RES.code); |
||||
const OPTS = { |
|
||||
useNewUrlParser: true, |
RES.error = false; |
||||
useUnifiedTopology: true |
RES.error_msg = "ok"; |
||||
}; |
|
||||
MongoClient.connect(mongourl, OPTS, function(err, client){ |
io.emit("iotdata", RES); |
||||
if(err){ |
io.in("iot").emit("message", RES); |
||||
console.log(err); |
res.json(RES); |
||||
} else { |
} |
||||
const db = client.db('fluent'); |
); |
||||
//db.collection('log', onCollection);
|
|
||||
console.log(JSON.stringify('mongo connected')) |
|
||||
var stream = db.collection('logs').find({}, { |
|
||||
tailable: true, |
|
||||
awaitdata: true |
|
||||
/* other options */ |
|
||||
}).stream(); |
|
||||
|
|
||||
stream.on('data', function (doc) { |
|
||||
console.log(JSON.stringify(doc)) |
|
||||
//socket.write(JSON.stringify({'action': 'log','param': doc.log}));
|
|
||||
}); |
|
||||
} |
|
||||
}); |
|
||||
var RES = new Object(); |
|
||||
RES.code = req.query["filter"] |
|
||||
RES.token = req.query["filter"] |
|
||||
var isvalid = await checkToken(RES.token); |
|
||||
if(isvalid.action == 'ok'){ |
|
||||
console.log("Authserver ok " + RES.token); |
|
||||
}else{ |
|
||||
console.log("Authserver no " + RES.token); |
|
||||
} |
|
||||
RES.error = false |
|
||||
RES.error_msg = "ok" |
|
||||
res.json(RES) |
|
||||
})() |
|
||||
|
|
||||
}); |
|
||||
|
|
||||
// ***************************************************
|
|
||||
// rest post
|
|
||||
// ***************************************************
|
|
||||
|
|
||||
app.post('/run', [ |
app.get( |
||||
|
"/test", |
||||
|
[ |
||||
//check('access_token').isLength({ min: 40 }),
|
//check('access_token').isLength({ min: 40 }),
|
||||
//check('llo').isBase64()
|
//check('llo').isBase64()
|
||||
], |
], |
||||
cors(corsOptions), (req, res, next) => { |
cors(corsOptions), |
||||
|
(req, res) => { |
||||
|
var data = req.query["input"]; |
||||
(async() => { |
var RES = new Object(); |
||||
|
console.error(`Client called GET from axios`); |
||||
//console.log(JSON.stringify(req.headers));
|
res.json(data); |
||||
//console.log(JSON.stringify(req.body));
|
|
||||
//console.log("mongo "+JSON.stringify(req.body));
|
|
||||
//console.log("LOG "+JSON.stringify(req.body[0].message));
|
|
||||
//console.log("PATH "+JSON.stringify(req.body[0].tailed_path));
|
|
||||
for (var i = 0; i < req.body.length; i++){ |
|
||||
//var getpath = await onlogfile(req.body[i].tailed_path)
|
|
||||
|
|
||||
var path = req.body[i].tailed_path |
|
||||
|
|
||||
console.log('File', path, 'has been added'); |
|
||||
var pathfileval = pathmodule.basename(path); |
|
||||
var arrfile = pathfileval.toString().split("-"); |
|
||||
var pathfile = arrfile[0]; |
|
||||
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); |
|
||||
console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval)) |
|
||||
if (indexfind1 === -1 ){ |
|
||||
(async() => { |
|
||||
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval)) |
|
||||
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
|
|
||||
var resdata = await getpipelines(token,pathfile) |
|
||||
//resdata.data.pathlogfile = 'test'
|
|
||||
var resob = {} |
|
||||
resob.pathlogfile = pathfileval |
|
||||
var resobarray = [] |
|
||||
for (let i in resdata.data) { |
|
||||
var resob1 = {} |
|
||||
resob1.data = resdata.data[i].res25swarmlabname |
|
||||
resob1.user25user = resdata.data[i].res25user |
|
||||
resob1.res25creator = resdata.data[i].res25creator |
|
||||
resob1.res25fileforce = resdata.data[i].res25fileforce |
|
||||
resobarray.push(resob1) |
|
||||
} |
|
||||
resob.data = resobarray |
|
||||
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname);
|
|
||||
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); |
|
||||
|
|
||||
//indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists")
|
|
||||
indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval) |
|
||||
|
|
||||
//console.log('info', JSON.stringify(resdata));
|
|
||||
//console.log('info------------- ', JSON.stringify(global.pipelines));
|
|
||||
})() |
|
||||
} |
|
||||
//
|
|
||||
var obj = req.body[i]; |
|
||||
|
|
||||
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data.res25swarmlabname);
|
|
||||
//indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists")
|
|
||||
var now = new Date(); |
|
||||
|
|
||||
var reslog = new Object(); |
|
||||
reslog.log = obj |
|
||||
|
|
||||
reslog.date = convertDateToUTC(now) |
|
||||
console.log(reslog); |
|
||||
var pathfileval = pathmodule.basename(reslog.log.tailed_path); |
|
||||
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); |
|
||||
//indexfind === -1 ? sendlog(reslog,pathfileval) : console.log("object already exists")
|
|
||||
indexfind === -1 ? console.log("object not found") : sendlog(reslog,pathfileval) |
|
||||
console.log("IOT "+JSON.stringify(reslog.log.tailed_path)); |
|
||||
console.log("IOTindexfind "+JSON.stringify(indexfind)); |
|
||||
console.log("IOTuser "+JSON.stringify(global.pipelines)); |
|
||||
// io.in("iot").emit("message", reslog);
|
|
||||
// io.emit("logdata", reslog);
|
|
||||
} |
|
||||
})() |
|
||||
|
|
||||
//io.in("iot").emit("message", RES);
|
|
||||
|
|
||||
console.error('socket POST from client'); |
|
||||
var RES = new Object(); |
|
||||
RES.error = false |
|
||||
RES.error_msg = "ok" |
|
||||
RES.msg = req.body[0].messsage |
|
||||
|
|
||||
res.json(RES) |
|
||||
}); |
|
||||
|
|
||||
// ***************************************************
|
|
||||
// socket
|
|
||||
// ***************************************************
|
|
||||
|
|
||||
//function getSHA256ofJSON(input){
|
|
||||
// return require("crypto").createHash("sha256").update(JSON.stringify(input)).digest("hex");
|
|
||||
//}
|
|
||||
|
|
||||
function sleep(ms) { |
|
||||
return new Promise(resolve => setTimeout(resolve, ms)); |
|
||||
} |
|
||||
|
|
||||
function getSHA256ofJSON(data, inputEncoding, encoding){ |
|
||||
if (!data) { |
|
||||
return ''; |
|
||||
} |
} |
||||
inputEncoding = inputEncoding || 'utf-8'; |
); |
||||
encoding = encoding || 'hex'; |
|
||||
const hash = require("crypto").createHash('md5'); |
|
||||
return hash.update(JSON.stringify(data), inputEncoding).digest(encoding); |
|
||||
} |
|
||||
|
|
||||
//var getkey = function getkey(key) {
|
|
||||
async function getkey(key) { |
|
||||
return new Promise((resolve) => { |
|
||||
|
|
||||
|
|
||||
pubClient.get(key, function (err, reply) { |
|
||||
if(err){ |
|
||||
|
|
||||
console.log('----------error------------') |
|
||||
|
|
||||
resolve(null) |
|
||||
} |
|
||||
else { |
|
||||
if(reply){ |
|
||||
console.log('---------fount----------') |
|
||||
resolve(1) |
|
||||
}else{ |
|
||||
console.log('----------not fount------------') |
|
||||
resolve(2) |
|
||||
//return 2
|
|
||||
} |
|
||||
} |
|
||||
}) |
|
||||
}) |
|
||||
} |
|
||||
|
|
||||
var setkey = function setkv(key,value){ |
|
||||
return new Promise((resolve)=>{ |
|
||||
//pubClient.set(key,value, 'EX', expire, function(err,reply){
|
|
||||
pubClient.set(key,value, function(err,reply){ |
|
||||
if(err){ |
|
||||
resolve(null) |
|
||||
} |
|
||||
else{ |
|
||||
resolve(reply) |
|
||||
} |
|
||||
}) |
|
||||
}) |
|
||||
} |
|
||||
|
|
||||
async function iosend(data,issend,io,pubClient,user1){ |
|
||||
var new1 = {} |
|
||||
new1.tailed_path = data.tailed_path |
|
||||
new1.message = data.message |
|
||||
|
|
||||
var now = new Date(); |
|
||||
var reslog1 = {} |
|
||||
//reslog1.data = resob1
|
|
||||
reslog1.log = new1 |
|
||||
reslog1.date = convertDateToUTC(now) |
|
||||
var user = user1 |
|
||||
|
|
||||
const randomTimeInMs = Math.random() * (2000); |
|
||||
await sleep(randomTimeInMs); |
|
||||
var getres = await getkey(issend); |
|
||||
|
|
||||
if(getres == '1'){ |
|
||||
console.log(issend + ' ---1 '+ JSON.stringify(reslog1)) |
|
||||
//io.in(user).emit("logdata", reslog1);
|
|
||||
}else if(getres == '2'){ |
|
||||
console.log(issend + ' ---2 '+ JSON.stringify(reslog1)) |
|
||||
setkey(issend,'1') |
|
||||
//pubClient.set(issend, '1', function(err, res) {
|
|
||||
//});
|
|
||||
io.in(user).emit("logdata", reslog1); |
|
||||
//}
|
|
||||
|
|
||||
} |
|
||||
|
|
||||
} |
|
||||
|
|
||||
function onCollection(err, collection) { |
|
||||
let options = { tailable: true, |
|
||||
awaitdata: true, |
|
||||
numberOfRetries: -1, |
|
||||
tailableRetryInterval: 500 |
|
||||
}; |
|
||||
var cursor = collection.find({},options).stream(); |
|
||||
var itemsProcessed = 0; |
|
||||
|
|
||||
var reslog = new Object(); |
|
||||
var now = new Date(); |
|
||||
cursor.on('data', function (data) { |
|
||||
var issendob = new Object(); |
|
||||
issendob.id = data._id |
|
||||
issendob.message = data.message |
|
||||
issendob.tailed_path = data.tailed_path |
|
||||
|
|
||||
var issend = getSHA256ofJSON(issendob) |
|
||||
|
|
||||
console.log('++++++++' + JSON.stringify(data)); |
|
||||
console.log('++++++++ob' + JSON.stringify(issendob)); |
|
||||
console.log('++++++++sha' + JSON.stringify(issend)); |
|
||||
|
|
||||
var pathfileval = pathmodule.basename(data.tailed_path); |
|
||||
var arrfile = pathfileval.toString().split("-"); |
|
||||
var pathfile = arrfile[0]; |
|
||||
|
|
||||
var indexupdate = "yes" |
|
||||
var resob = {}; |
|
||||
pubClient.get(pathfileval, function(err, object) { |
|
||||
var objecttmp = JSON.parse(object); |
|
||||
if(object){ |
|
||||
var user1 = objecttmp.user25user.replace(/[\n\t\r]/g,"") |
|
||||
iosend(data,issend,io,pubClient,user1) |
|
||||
}else{ |
|
||||
(async() => { |
|
||||
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
|
|
||||
var resdata1 = await getpipelines(token,pathfile) |
|
||||
resob.pathlogfile = pathfileval |
|
||||
var resob11 = {} |
|
||||
var i1 = 0 |
|
||||
resob11.data = resdata1.data[i1].res25swarmlabname |
|
||||
resob11.user25user = resdata1.data[i1].res25user.replace(/[\n\t\r]/g,"") |
|
||||
resob11.res25creator = resdata1.data[i1].res25creator |
|
||||
resob11.res25fileforce = resdata1.data[i1].res25fileforce |
|
||||
resob11.tailed_path = pathfileval |
|
||||
var resob1string1 = JSON.stringify(resob11); |
|
||||
await pubClient.set(pathfileval, resob1string1, function(err, res) { |
|
||||
}); |
|
||||
var user1 = resob11.user25user |
|
||||
iosend(data,issend,io,pubClient,user1) |
|
||||
console.log(' ---no--- '+ JSON.stringify(data)) |
|
||||
})() //await inside yes
|
|
||||
} |
|
||||
}); |
|
||||
|
|
||||
}); |
|
||||
|
|
||||
|
|
||||
setInterval(function () { |
|
||||
console.log('itemsProcessed', itemsProcessed); |
|
||||
// this method is also exposed by the Server instance
|
|
||||
//console.log(adapter.rooms);
|
|
||||
}, 8000); |
|
||||
} |
|
||||
|
|
||||
|
|
||||
var mongourl = "mongodb://"+CONFIG.mongo.user+":"+CONFIG.mongo.password+"@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats" |
|
||||
const OPTS = { |
|
||||
useNewUrlParser: true, |
|
||||
useUnifiedTopology: true |
|
||||
}; |
|
||||
var mongooptions = { |
|
||||
autoReconnect: true, |
|
||||
keepAlive: 1, |
|
||||
connectTimeoutMS: 30000, |
|
||||
socketTimeoutMS: 0 |
|
||||
} |
|
||||
MongoClient.connect(mongourl, OPTS, function(err, client){ |
|
||||
if(err){ |
|
||||
console.log(err); |
|
||||
} else { |
|
||||
const db = client.db('fluent'); |
|
||||
db.collection('logs', onCollection); |
|
||||
} |
|
||||
}); |
|
||||
|
|
||||
io.on('connection', s => { |
|
||||
console.error('socket connection'); |
|
||||
|
|
||||
//s.set('transports', ['websocket']);
|
|
||||
//s.set('pingTimeout', 30000);
|
|
||||
//s.set('allowUpgrades', false);
|
|
||||
//s.set('serveClient', false);
|
|
||||
//s.set('pingInterval', 10000);
|
|
||||
// ------------------------------
|
|
||||
// --- set
|
|
||||
// ------------------------------
|
|
||||
var usersession = new Object(); |
|
||||
usersession.SOCKET = {}; |
|
||||
usersession.SOCKET.error = {}; |
|
||||
console.error('socket ...'); |
|
||||
s.auth = false; |
|
||||
|
|
||||
// ------------------------------
|
|
||||
// --- authenticate
|
|
||||
// ------------------------------
|
|
||||
s.on('authenticate', function(data){ |
|
||||
const token = data |
|
||||
console.log('invalid 1 ' + token); |
|
||||
(async() => { |
|
||||
var isvalid = await checkToken(token); |
|
||||
if(isvalid.action == 'ok'){ |
|
||||
console.log("Authserver ok ", s.id + ' - ' + token); |
|
||||
// pubClient.set(session, resob1string, function(err, res) {
|
|
||||
// });
|
|
||||
usersession.SOCKET.user = isvalid.user |
|
||||
usersession.SOCKET.scope = isvalid.scope // space delimeter
|
|
||||
usersession.SOCKET.token = isvalid.token |
|
||||
s.auth = true; |
|
||||
}else{ |
|
||||
console.log("Authserver no ", s.id + ' - ' + token); |
|
||||
s.auth = false; |
|
||||
} |
|
||||
})() |
|
||||
}); |
|
||||
|
|
||||
setTimeout(function(){ |
|
||||
if (!s.auth) { |
|
||||
console.log("Disconnecting timeout socket ", s.id); |
|
||||
//s.disconnect('unauthorized');
|
|
||||
}else{ |
|
||||
var room = usersession.SOCKET.user |
|
||||
//s.on("subscribe", function (room) {
|
|
||||
s.join(room); |
|
||||
console.log("joining rooom", s.rooms); |
|
||||
console.log(room + ' created ') |
|
||||
// });
|
|
||||
} |
|
||||
}, 30000); |
|
||||
|
|
||||
|
|
||||
|
io.on("log", (data) => { |
||||
|
console.log(JSON.stringify("d " + data)); |
||||
|
console.error(JSON.stringify("c " + data)); |
||||
|
}); |
||||
|
io.on("connection", (s) => { |
||||
|
console.error(`\nSomeone connected to port 3000`); |
||||
|
var id = s.id; |
||||
|
|
||||
|
s.on("log", (data) => { |
||||
|
console.log(JSON.stringify(data)); |
||||
|
console.error(JSON.stringify(data)); |
||||
|
}); |
||||
|
|
||||
var id = s.id |
}); |
||||
s.on('log', obj => { |
|
||||
console.error('from client '+ s.id + ' obj ' + obj); |
|
||||
}); |
|
||||
|
|
||||
}); |
|
||||
|
|
||||
http.listen(3000, () => console.error('listening on http://localhost:3000/')); |
http.listen(3000, () => console.error("listening on http://0.0.0.0:3000/")); |
||||
console.error('socket.io example'); |
console.error("Run demo project"); |
||||
|
console.log("Hello World!"); |
||||
|
@ -0,0 +1,889 @@ |
|||||
|
"use strict" |
||||
|
|
||||
|
var pathmodule = require('path'); |
||||
|
var app = require('express')(); |
||||
|
var http = require('http').Server(app); |
||||
|
var https = require('https'); |
||||
|
var CONFIG = require( pathmodule.resolve( __dirname, "runconfig.js" ) ); |
||||
|
const io = require("socket.io")(http, { |
||||
|
// pingTimeout: 30000,
|
||||
|
// allowUpgrades: false,
|
||||
|
// serveClient: false,
|
||||
|
// pingInterval: 10000,
|
||||
|
// //transports: [ 'websocket', 'polling' ],
|
||||
|
// transports: [ 'polling', 'websocket' ],
|
||||
|
cors: { |
||||
|
origin: "http://localhost:8080", |
||||
|
methods: ["GET", "POST"], |
||||
|
allowedHeaders: ["my-custom-header"], |
||||
|
credentials: true |
||||
|
}, |
||||
|
cookie: { |
||||
|
name: "test", |
||||
|
httpOnly: false, |
||||
|
path: "/custom" |
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
/* |
||||
|
const Redis = require("ioredis"); |
||||
|
const redistest = new Redis({ |
||||
|
host: 'redisserver', |
||||
|
port: 6379, |
||||
|
}); |
||||
|
const pubtest = new Redis({ |
||||
|
host: 'redisserver', |
||||
|
port: 6379, |
||||
|
}); |
||||
|
*/ |
||||
|
|
||||
|
|
||||
|
//import { createAdapter } from 'socket.io-redis';
|
||||
|
const createAdapter = require('socket.io-redis'); |
||||
|
//const RedisClient = require("redis");
|
||||
|
const Redis = require("ioredis"); |
||||
|
//const pubClient = RedisClient.createClient({
|
||||
|
|
||||
|
const pubClient = new Redis({ |
||||
|
host: 'redisserver', |
||||
|
port: 6379, |
||||
|
}); |
||||
|
|
||||
|
//const pubClient = new RedisClient({ host: 'localhost', port: 6379 });
|
||||
|
const subClient = pubClient.duplicate(); |
||||
|
|
||||
|
io.adapter(createAdapter({ pubClient, subClient })); |
||||
|
|
||||
|
|
||||
|
pubClient.on("connect", function() { |
||||
|
console.log("You are now connected"); |
||||
|
}); |
||||
|
|
||||
|
const MongoClient = require('mongodb').MongoClient; |
||||
|
const { DateTime } = require("luxon"); |
||||
|
|
||||
|
|
||||
|
var async = require("async"); |
||||
|
const { check, validationResult } = require('express-validator'); |
||||
|
const urlExistSync = require("url-exist-sync"); |
||||
|
|
||||
|
var express = require('express'); |
||||
|
app.use(express.json()); |
||||
|
|
||||
|
const axios = require('axios'); |
||||
|
axios.defaults.timeout = 30000 |
||||
|
|
||||
|
const helmet = require('helmet'); |
||||
|
app.use(helmet()); |
||||
|
|
||||
|
const cors = require('cors') |
||||
|
const whitelist = [ |
||||
|
'http://localhost:8080', |
||||
|
'http://localhost:3080', |
||||
|
'http://localhost:3081', |
||||
|
'http://localhost:3082' |
||||
|
] |
||||
|
const corsOptions = { |
||||
|
credentials: true, |
||||
|
methods: ['GET', 'PUT', 'POST', 'DELETE', 'OPTIONS'], |
||||
|
optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204
|
||||
|
allowedHeaders: [ |
||||
|
'Content-Type', |
||||
|
'Authorization', |
||||
|
'X-Requested-With', |
||||
|
'device-remember-token', |
||||
|
'Access-Control-Allow-Origin', |
||||
|
'Access-Control-Allow-Headers', |
||||
|
'Origin', |
||||
|
'Accept' |
||||
|
], |
||||
|
origin: function(origin, callback) { |
||||
|
if (whitelist.indexOf(origin) !== -1) { |
||||
|
callback(null, true) |
||||
|
} else { |
||||
|
callback(null, true) |
||||
|
//callback(new Error('Not allowed by CORS'))
|
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
|
||||
|
|
||||
|
// ***************************************************
|
||||
|
// checktoken
|
||||
|
// ***************************************************
|
||||
|
|
||||
|
async function checkToken(token) { |
||||
|
const agent = new https.Agent({ |
||||
|
rejectUnauthorized: false, |
||||
|
}); |
||||
|
const instance = axios.create({ |
||||
|
baseURL: 'https://api.swarmlab.io', |
||||
|
withCredentials: true, |
||||
|
rejectUnauthorized: false, |
||||
|
crossdomain: true, |
||||
|
httpsAgent: agent, |
||||
|
headers: { |
||||
|
'Accept': 'application/json', |
||||
|
'Content-Type': 'multipart/form-data', |
||||
|
'Authorization': 'Bearer '+token |
||||
|
} |
||||
|
}) |
||||
|
try { |
||||
|
var pipelines = { |
||||
|
"source":'ssologin' |
||||
|
} |
||||
|
var params = { |
||||
|
pipeline: pipelines |
||||
|
} |
||||
|
|
||||
|
var options = { |
||||
|
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, |
||||
|
}; |
||||
|
|
||||
|
instance.defaults.timeout = 30000; |
||||
|
const res = await instance.post('/istokenvalidsso',params,options); |
||||
|
if(res.status == 200){ |
||||
|
//console.log("check " +JSON.stringify(res.data))
|
||||
|
return res.data |
||||
|
}else{ |
||||
|
console.log("noerror: " + res) |
||||
|
return res.status |
||||
|
|
||||
|
} |
||||
|
} |
||||
|
catch (err) { |
||||
|
console.error("error: "+err); |
||||
|
var error = new Object(); |
||||
|
error.action = '401' |
||||
|
return error |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
|
||||
|
function convertDateToUTC(date) { |
||||
|
return new Date(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate(), date.getUTCHours(), date.getUTCMinutes(), date.getUTCSeconds(),date.getUTCMilliseconds()); |
||||
|
} |
||||
|
|
||||
|
// ***************************************************
|
||||
|
// get pipelines
|
||||
|
// ***************************************************
|
||||
|
|
||||
|
async function getpipelines(token,pipelinename) { |
||||
|
const agent = new https.Agent({ |
||||
|
rejectUnauthorized: false, |
||||
|
}); |
||||
|
const instance = axios.create({ |
||||
|
baseURL: 'https://api.swarmlab.io', |
||||
|
withCredentials: true, |
||||
|
rejectUnauthorized: false, |
||||
|
crossdomain: true, |
||||
|
httpsAgent: agent, |
||||
|
headers: { |
||||
|
'Accept': 'application/json', |
||||
|
'Content-Type': 'multipart/form-data', |
||||
|
'Authorization': 'Bearer '+token |
||||
|
} |
||||
|
}) |
||||
|
/* |
||||
|
var params = { |
||||
|
playbook: value |
||||
|
} |
||||
|
var options = { |
||||
|
params: params, |
||||
|
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, |
||||
|
}; |
||||
|
|
||||
|
const playbook = await api.GET('playbookCode',options); |
||||
|
return playbook |
||||
|
*/ |
||||
|
try { |
||||
|
|
||||
|
var pipelines = { |
||||
|
"querytokenFilter":CONFIG.api.token, |
||||
|
"filter":pipelinename |
||||
|
} |
||||
|
//var params = {
|
||||
|
// pipeline: pipelines
|
||||
|
// }
|
||||
|
var params = { |
||||
|
querytokenFilter:CONFIG.api.token, |
||||
|
filter:pipelinename |
||||
|
} |
||||
|
|
||||
|
var options = { |
||||
|
params: params, |
||||
|
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, |
||||
|
}; |
||||
|
|
||||
|
//https://api.swarmlab.io/gettutorlabrooms?sort=pipelinename%7Casc&page=1&per_page=5&filter=&type=scripts&tutor=yes
|
||||
|
instance.defaults.timeout = 30000; |
||||
|
//const res = await instance.get('/getplaygrounds',params,options);
|
||||
|
const res = await instance.get('/getplaygrounds',options); |
||||
|
if(res.status == 200){ |
||||
|
return res.data |
||||
|
}else{ |
||||
|
console.log("noerror: " + res) |
||||
|
return await res.status |
||||
|
|
||||
|
} |
||||
|
} |
||||
|
catch (err) { |
||||
|
console.error("error: "+err); |
||||
|
var error = new Object(); |
||||
|
error.action = '401' |
||||
|
return await error |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// ***************************************************
|
||||
|
// get user pipelines
|
||||
|
// ***************************************************
|
||||
|
|
||||
|
async function getuserpipelines(token,user,swarmlabname) { |
||||
|
var pipelinename = user |
||||
|
const agent = new https.Agent({ |
||||
|
rejectUnauthorized: false, |
||||
|
}); |
||||
|
const instance = axios.create({ |
||||
|
baseURL: 'https://api.swarmlab.io', |
||||
|
withCredentials: true, |
||||
|
rejectUnauthorized: false, |
||||
|
crossdomain: true, |
||||
|
httpsAgent: agent, |
||||
|
headers: { |
||||
|
'Accept': 'application/json', |
||||
|
'Content-Type': 'multipart/form-data', |
||||
|
'Authorization': 'Bearer '+token |
||||
|
} |
||||
|
}) |
||||
|
try { |
||||
|
|
||||
|
var pipelines = { |
||||
|
"querytokenFilter":CONFIG.api.token, |
||||
|
"filter":pipelinename, |
||||
|
swarmlabname:swarmlabname |
||||
|
} |
||||
|
//var params = {
|
||||
|
// pipeline: pipelines
|
||||
|
// }
|
||||
|
var params = { |
||||
|
querytokenFilter:CONFIG.api.token, |
||||
|
filter:pipelinename, |
||||
|
swarmlabname:swarmlabname |
||||
|
} |
||||
|
|
||||
|
var options = { |
||||
|
params: params, |
||||
|
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` }, |
||||
|
}; |
||||
|
|
||||
|
instance.defaults.timeout = 30000; |
||||
|
const res = await instance.get('/getuserplaygrounds',options); |
||||
|
if(res.status == 200){ |
||||
|
return res.data |
||||
|
}else{ |
||||
|
console.log("noerror: " + res) |
||||
|
return await res.status |
||||
|
|
||||
|
} |
||||
|
} |
||||
|
catch (err) { |
||||
|
console.error("error: "+err); |
||||
|
var error = new Object(); |
||||
|
error.action = '401' |
||||
|
error.error = err |
||||
|
return await error |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
|
||||
|
global.online='ob'; |
||||
|
global.pipelines=[]; |
||||
|
|
||||
|
function sendlog(reslog,pathfileval){ |
||||
|
var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval); |
||||
|
//for (var key in usertmp.data){
|
||||
|
var user = usertmp.data[0].user25user; |
||||
|
// if(usertmp.data){
|
||||
|
console.log('-----------------------' + JSON.stringify(usertmp)); |
||||
|
io.in(user).emit("logdata", reslog); |
||||
|
// }
|
||||
|
//}
|
||||
|
} |
||||
|
function onlogfile(path){ |
||||
|
|
||||
|
console.log('File', path, 'has been added'); |
||||
|
var pathfileval = pathmodule.basename(path); |
||||
|
var arrfile = pathfileval.toString().split("-"); |
||||
|
var pathfile = arrfile[0]; |
||||
|
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); |
||||
|
console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval)) |
||||
|
if (indexfind1 === -1 ){ |
||||
|
(async() => { |
||||
|
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval)) |
||||
|
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
|
||||
|
var resdata = await getpipelines(token,pathfile) |
||||
|
//resdata.data.pathlogfile = 'test'
|
||||
|
var resob = {} |
||||
|
resob.pathlogfile = pathfileval |
||||
|
var resobarray = [] |
||||
|
for (let i in resdata.data) { |
||||
|
var resob1 = {} |
||||
|
resob1.data = resdata.data[i].res25swarmlabname |
||||
|
resob1.user25user = resdata.data[i].res25user |
||||
|
resob1.res25creator = resdata.data[i].res25creator |
||||
|
resob1.res25fileforce = resdata.data[i].res25fileforce |
||||
|
resobarray.push(resob1) |
||||
|
} |
||||
|
resob.data = resobarray |
||||
|
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); |
||||
|
indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval) |
||||
|
})() |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
|
||||
|
// ***************************************************
|
||||
|
// rest get
|
||||
|
// ***************************************************
|
||||
|
|
||||
|
app.get('/get_log', [ |
||||
|
check('token').isLength({ min: 40 }) |
||||
|
], |
||||
|
cors(corsOptions), (req, res, next) => { |
||||
|
|
||||
|
(async() => { |
||||
|
var RES = new Object(); |
||||
|
RES.token = req.query["token"] |
||||
|
RES.start = req.query["start"] |
||||
|
RES.end = req.query["end"] |
||||
|
RES.swarmlabname = req.query["swarmlabname"] |
||||
|
RES.ok = 'ok' |
||||
|
/* |
||||
|
* |
||||
|
* validate |
||||
|
* |
||||
|
*/ |
||||
|
|
||||
|
|
||||
|
|
||||
|
var isvalid = await checkToken(RES.token); |
||||
|
if(isvalid.action == 'ok'){ |
||||
|
console.log("Authserver ok " + RES.token); |
||||
|
RES.error = 'ok' |
||||
|
}else{ |
||||
|
console.log("Authserver no " + RES.token); |
||||
|
RES.error = 'no' |
||||
|
} |
||||
|
if(RES.error == 'ok'){ |
||||
|
|
||||
|
var resdata = await getuserpipelines(RES.token,isvalid.user,RES.swarmlabname) |
||||
|
var mongourl = "mongodb://"+CONFIG.mongo.user+":"+CONFIG.mongo.password+"@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats" |
||||
|
const OPTS = { |
||||
|
useNewUrlParser: true, |
||||
|
useUnifiedTopology: true |
||||
|
}; |
||||
|
MongoClient.connect(mongourl, OPTS, function(err, client){ |
||||
|
if(err){ |
||||
|
console.log(err); |
||||
|
} else { |
||||
|
const db = client.db('fluent'); |
||||
|
//usersession.SOCKET.user = isvalid.user
|
||||
|
console.log(JSON.stringify('mongo ----------------connected')) |
||||
|
console.log('-----test------- '+JSON.stringify(RES)) |
||||
|
if ((typeof RES.start !== "undefined") && (typeof RES.end !== "undefined")) { |
||||
|
if(DateTime.fromISO(RES.start).isValid){ |
||||
|
var datestart = DateTime.fromISO(RES.start) |
||||
|
var dateend = DateTime.fromISO(RES.end) |
||||
|
var search_term = { |
||||
|
"$and": [ |
||||
|
{ |
||||
|
"time": { |
||||
|
$gte: datestart |
||||
|
} |
||||
|
}, |
||||
|
{ |
||||
|
"time": { |
||||
|
$lt: dateend |
||||
|
} |
||||
|
}, |
||||
|
] |
||||
|
} |
||||
|
}else{ |
||||
|
RES.ok = 'no' |
||||
|
} |
||||
|
}else if(typeof RES.end !== "undefined"){ |
||||
|
var dateend = DateTime.fromISO(RES.end) |
||||
|
if(DateTime.fromISO(RES.end).isValid){ |
||||
|
var search_term = { |
||||
|
"$and": [ |
||||
|
{ |
||||
|
"time": { |
||||
|
$lt: dateend |
||||
|
} |
||||
|
} |
||||
|
] |
||||
|
} |
||||
|
}else{ |
||||
|
RES.ok = 'no' |
||||
|
} |
||||
|
}else if(typeof RES.start !== "undefined"){ |
||||
|
var datestart = DateTime.fromISO(RES.start) |
||||
|
if(DateTime.fromISO(RES.start).isValid){ |
||||
|
var search_term = { |
||||
|
"$and": [ |
||||
|
{ |
||||
|
"time": { |
||||
|
$gte: datestart |
||||
|
} |
||||
|
} |
||||
|
] |
||||
|
} |
||||
|
}else{ |
||||
|
RES.ok = 'no' |
||||
|
} |
||||
|
} |
||||
|
if(RES.ok == 'ok'){ |
||||
|
//var search_term = '{"$gte": new Date("2020-12-01T00:00:00.000Z") , "$lt": new Date("2020-12-11T16:17:36.470Z") }'
|
||||
|
//var search_term = {"time" : {$lte : datenow}}
|
||||
|
var resdataarray = [] |
||||
|
var resraw = {} |
||||
|
var reslab = '' |
||||
|
var datestart1 = DateTime.fromISO(RES.start) |
||||
|
console.log('-----now1------- '+JSON.stringify(search_term)) |
||||
|
console.log('-----now2------- '+JSON.stringify(datestart1)) |
||||
|
console.log('-----now3------- '+JSON.stringify(datestart)) |
||||
|
|
||||
|
db.collection('logs').find(search_term).toArray() |
||||
|
//db.collection('logs').find({"time" : {$gt : datestart}}).toArray()
|
||||
|
.then(item => { |
||||
|
console.log('item '+JSON.stringify(item)) |
||||
|
for (let i in item) { |
||||
|
reslab = item[i].tailed_path |
||||
|
var segment_array = reslab.split( '/' ); |
||||
|
var last_segment = segment_array.pop(); |
||||
|
var fieldstmp = last_segment.split('-'); |
||||
|
var nameofswarmlab = fieldstmp[0]; |
||||
|
|
||||
|
var regexlog = new RegExp(nameofswarmlab); |
||||
|
for (let ii in resdata.data) { |
||||
|
if( regexlog.test(resdata.data[ii].res25swarmlabname) ){ |
||||
|
resdataarray.push(item[i]) |
||||
|
RES.found = item[i] |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
RES.error_msg = "ok" |
||||
|
RES.data = resdataarray |
||||
|
//RES.dataserver = resdataarray
|
||||
|
//RES.dataservertmp = resdata
|
||||
|
res.json(RES) |
||||
|
}) |
||||
|
.catch(err => { |
||||
|
console.error(err) |
||||
|
RES.error_msg = err |
||||
|
res.json(RES) |
||||
|
}) |
||||
|
} else{ // RES.ok
|
||||
|
RES.error_msg = 'no date' |
||||
|
res.json(RES) |
||||
|
} |
||||
|
} // error mongo connect
|
||||
|
}); // mongo connect
|
||||
|
}else{ // token error
|
||||
|
RES.data = 'no' |
||||
|
RES.error_msg = "token err" |
||||
|
res.json(RES) |
||||
|
} |
||||
|
})() |
||||
|
|
||||
|
}); |
||||
|
|
||||
|
app.get('/run', [ |
||||
|
//check('access_token').isLength({ min: 40 }),
|
||||
|
//check('llo').isBase64()
|
||||
|
], |
||||
|
cors(corsOptions), (req, res, next) => { |
||||
|
|
||||
|
(async() => { |
||||
|
|
||||
|
var mongourl = "mongodb://"+CONFIG.mongo.user+":"+CONFIG.mongo.password+"@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats" |
||||
|
const OPTS = { |
||||
|
useNewUrlParser: true, |
||||
|
useUnifiedTopology: true |
||||
|
}; |
||||
|
MongoClient.connect(mongourl, OPTS, function(err, client){ |
||||
|
if(err){ |
||||
|
console.log(err); |
||||
|
} else { |
||||
|
const db = client.db('fluent'); |
||||
|
//db.collection('log', onCollection);
|
||||
|
console.log(JSON.stringify('mongo connected')) |
||||
|
var stream = db.collection('logs').find({}, { |
||||
|
tailable: true, |
||||
|
awaitdata: true |
||||
|
/* other options */ |
||||
|
}).stream(); |
||||
|
|
||||
|
stream.on('data', function (doc) { |
||||
|
console.log(JSON.stringify(doc)) |
||||
|
//socket.write(JSON.stringify({'action': 'log','param': doc.log}));
|
||||
|
}); |
||||
|
} |
||||
|
}); |
||||
|
var RES = new Object(); |
||||
|
RES.code = req.query["filter"] |
||||
|
RES.token = req.query["filter"] |
||||
|
var isvalid = await checkToken(RES.token); |
||||
|
if(isvalid.action == 'ok'){ |
||||
|
console.log("Authserver ok " + RES.token); |
||||
|
}else{ |
||||
|
console.log("Authserver no " + RES.token); |
||||
|
} |
||||
|
RES.error = false |
||||
|
RES.error_msg = "ok" |
||||
|
res.json(RES) |
||||
|
})() |
||||
|
|
||||
|
}); |
||||
|
|
||||
|
// ***************************************************
|
||||
|
// rest post
|
||||
|
// ***************************************************
|
||||
|
|
||||
|
app.post('/run', [ |
||||
|
//check('access_token').isLength({ min: 40 }),
|
||||
|
//check('llo').isBase64()
|
||||
|
], |
||||
|
cors(corsOptions), (req, res, next) => { |
||||
|
|
||||
|
|
||||
|
(async() => { |
||||
|
|
||||
|
//console.log(JSON.stringify(req.headers));
|
||||
|
//console.log(JSON.stringify(req.body));
|
||||
|
//console.log("mongo "+JSON.stringify(req.body));
|
||||
|
//console.log("LOG "+JSON.stringify(req.body[0].message));
|
||||
|
//console.log("PATH "+JSON.stringify(req.body[0].tailed_path));
|
||||
|
for (var i = 0; i < req.body.length; i++){ |
||||
|
//var getpath = await onlogfile(req.body[i].tailed_path)
|
||||
|
|
||||
|
var path = req.body[i].tailed_path |
||||
|
|
||||
|
console.log('File', path, 'has been added'); |
||||
|
var pathfileval = pathmodule.basename(path); |
||||
|
var arrfile = pathfileval.toString().split("-"); |
||||
|
var pathfile = arrfile[0]; |
||||
|
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); |
||||
|
console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval)) |
||||
|
if (indexfind1 === -1 ){ |
||||
|
(async() => { |
||||
|
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval)) |
||||
|
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
|
||||
|
var resdata = await getpipelines(token,pathfile) |
||||
|
//resdata.data.pathlogfile = 'test'
|
||||
|
var resob = {} |
||||
|
resob.pathlogfile = pathfileval |
||||
|
var resobarray = [] |
||||
|
for (let i in resdata.data) { |
||||
|
var resob1 = {} |
||||
|
resob1.data = resdata.data[i].res25swarmlabname |
||||
|
resob1.user25user = resdata.data[i].res25user |
||||
|
resob1.res25creator = resdata.data[i].res25creator |
||||
|
resob1.res25fileforce = resdata.data[i].res25fileforce |
||||
|
resobarray.push(resob1) |
||||
|
} |
||||
|
resob.data = resobarray |
||||
|
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname);
|
||||
|
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); |
||||
|
|
||||
|
//indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists")
|
||||
|
indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval) |
||||
|
|
||||
|
//console.log('info', JSON.stringify(resdata));
|
||||
|
//console.log('info------------- ', JSON.stringify(global.pipelines));
|
||||
|
})() |
||||
|
} |
||||
|
//
|
||||
|
var obj = req.body[i]; |
||||
|
|
||||
|
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data.res25swarmlabname);
|
||||
|
//indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists")
|
||||
|
var now = new Date(); |
||||
|
|
||||
|
var reslog = new Object(); |
||||
|
reslog.log = obj |
||||
|
|
||||
|
reslog.date = convertDateToUTC(now) |
||||
|
console.log(reslog); |
||||
|
var pathfileval = pathmodule.basename(reslog.log.tailed_path); |
||||
|
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval); |
||||
|
//indexfind === -1 ? sendlog(reslog,pathfileval) : console.log("object already exists")
|
||||
|
indexfind === -1 ? console.log("object not found") : sendlog(reslog,pathfileval) |
||||
|
console.log("IOT "+JSON.stringify(reslog.log.tailed_path)); |
||||
|
console.log("IOTindexfind "+JSON.stringify(indexfind)); |
||||
|
console.log("IOTuser "+JSON.stringify(global.pipelines)); |
||||
|
// io.in("iot").emit("message", reslog);
|
||||
|
// io.emit("logdata", reslog);
|
||||
|
} |
||||
|
})() |
||||
|
|
||||
|
//io.in("iot").emit("message", RES);
|
||||
|
|
||||
|
console.error('socket POST from client'); |
||||
|
var RES = new Object(); |
||||
|
RES.error = false |
||||
|
RES.error_msg = "ok" |
||||
|
RES.msg = req.body[0].messsage |
||||
|
|
||||
|
res.json(RES) |
||||
|
}); |
||||
|
|
||||
|
// ***************************************************
|
||||
|
// socket
|
||||
|
// ***************************************************
|
||||
|
|
||||
|
//function getSHA256ofJSON(input){
|
||||
|
// return require("crypto").createHash("sha256").update(JSON.stringify(input)).digest("hex");
|
||||
|
//}
|
||||
|
|
||||
|
function sleep(ms) { |
||||
|
return new Promise(resolve => setTimeout(resolve, ms)); |
||||
|
} |
||||
|
|
||||
|
function getSHA256ofJSON(data, inputEncoding, encoding){ |
||||
|
if (!data) { |
||||
|
return ''; |
||||
|
} |
||||
|
inputEncoding = inputEncoding || 'utf-8'; |
||||
|
encoding = encoding || 'hex'; |
||||
|
const hash = require("crypto").createHash('md5'); |
||||
|
return hash.update(JSON.stringify(data), inputEncoding).digest(encoding); |
||||
|
} |
||||
|
|
||||
|
//var getkey = function getkey(key) {
|
||||
|
async function getkey(key) { |
||||
|
return new Promise((resolve) => { |
||||
|
|
||||
|
|
||||
|
pubClient.get(key, function (err, reply) { |
||||
|
if(err){ |
||||
|
|
||||
|
console.log('----------error------------') |
||||
|
|
||||
|
resolve(null) |
||||
|
} |
||||
|
else { |
||||
|
if(reply){ |
||||
|
console.log('---------fount----------') |
||||
|
resolve(1) |
||||
|
}else{ |
||||
|
console.log('----------not fount------------') |
||||
|
resolve(2) |
||||
|
//return 2
|
||||
|
} |
||||
|
} |
||||
|
}) |
||||
|
}) |
||||
|
} |
||||
|
|
||||
|
var setkey = function setkv(key,value){ |
||||
|
return new Promise((resolve)=>{ |
||||
|
//pubClient.set(key,value, 'EX', expire, function(err,reply){
|
||||
|
pubClient.set(key,value, function(err,reply){ |
||||
|
if(err){ |
||||
|
resolve(null) |
||||
|
} |
||||
|
else{ |
||||
|
resolve(reply) |
||||
|
} |
||||
|
}) |
||||
|
}) |
||||
|
} |
||||
|
|
||||
|
async function iosend(data,issend,io,pubClient,user1){ |
||||
|
var new1 = {} |
||||
|
new1.tailed_path = data.tailed_path |
||||
|
new1.message = data.message |
||||
|
|
||||
|
var now = new Date(); |
||||
|
var reslog1 = {} |
||||
|
//reslog1.data = resob1
|
||||
|
reslog1.log = new1 |
||||
|
reslog1.date = convertDateToUTC(now) |
||||
|
var user = user1 |
||||
|
|
||||
|
const randomTimeInMs = Math.random() * (2000); |
||||
|
await sleep(randomTimeInMs); |
||||
|
var getres = await getkey(issend); |
||||
|
|
||||
|
if(getres == '1'){ |
||||
|
console.log(issend + ' ---1 '+ JSON.stringify(reslog1)) |
||||
|
//io.in(user).emit("logdata", reslog1);
|
||||
|
}else if(getres == '2'){ |
||||
|
console.log(issend + ' ---2 '+ JSON.stringify(reslog1)) |
||||
|
setkey(issend,'1') |
||||
|
//pubClient.set(issend, '1', function(err, res) {
|
||||
|
//});
|
||||
|
io.in(user).emit("logdata", reslog1); |
||||
|
//}
|
||||
|
|
||||
|
} |
||||
|
|
||||
|
} |
||||
|
|
||||
|
function onCollection(err, collection) { |
||||
|
let options = { tailable: true, |
||||
|
awaitdata: true, |
||||
|
numberOfRetries: -1, |
||||
|
tailableRetryInterval: 500 |
||||
|
}; |
||||
|
var cursor = collection.find({},options).stream(); |
||||
|
var itemsProcessed = 0; |
||||
|
|
||||
|
var reslog = new Object(); |
||||
|
var now = new Date(); |
||||
|
cursor.on('data', function (data) { |
||||
|
var issendob = new Object(); |
||||
|
issendob.id = data._id |
||||
|
issendob.message = data.message |
||||
|
issendob.tailed_path = data.tailed_path |
||||
|
|
||||
|
var issend = getSHA256ofJSON(issendob) |
||||
|
|
||||
|
console.log('++++++++' + JSON.stringify(data)); |
||||
|
console.log('++++++++ob' + JSON.stringify(issendob)); |
||||
|
console.log('++++++++sha' + JSON.stringify(issend)); |
||||
|
|
||||
|
var pathfileval = pathmodule.basename(data.tailed_path); |
||||
|
var arrfile = pathfileval.toString().split("-"); |
||||
|
var pathfile = arrfile[0]; |
||||
|
|
||||
|
var indexupdate = "yes" |
||||
|
var resob = {}; |
||||
|
pubClient.get(pathfileval, function(err, object) { |
||||
|
var objecttmp = JSON.parse(object); |
||||
|
if(object){ |
||||
|
var user1 = objecttmp.user25user.replace(/[\n\t\r]/g,"") |
||||
|
iosend(data,issend,io,pubClient,user1) |
||||
|
}else{ |
||||
|
(async() => { |
||||
|
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
|
||||
|
var resdata1 = await getpipelines(token,pathfile) |
||||
|
resob.pathlogfile = pathfileval |
||||
|
var resob11 = {} |
||||
|
var i1 = 0 |
||||
|
resob11.data = resdata1.data[i1].res25swarmlabname |
||||
|
resob11.user25user = resdata1.data[i1].res25user.replace(/[\n\t\r]/g,"") |
||||
|
resob11.res25creator = resdata1.data[i1].res25creator |
||||
|
resob11.res25fileforce = resdata1.data[i1].res25fileforce |
||||
|
resob11.tailed_path = pathfileval |
||||
|
var resob1string1 = JSON.stringify(resob11); |
||||
|
await pubClient.set(pathfileval, resob1string1, function(err, res) { |
||||
|
}); |
||||
|
var user1 = resob11.user25user |
||||
|
iosend(data,issend,io,pubClient,user1) |
||||
|
console.log(' ---no--- '+ JSON.stringify(data)) |
||||
|
})() //await inside yes
|
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
}); |
||||
|
|
||||
|
|
||||
|
setInterval(function () { |
||||
|
console.log('itemsProcessed', itemsProcessed); |
||||
|
// this method is also exposed by the Server instance
|
||||
|
//console.log(adapter.rooms);
|
||||
|
}, 8000); |
||||
|
} |
||||
|
|
||||
|
|
||||
|
var mongourl = "mongodb://"+CONFIG.mongo.user+":"+CONFIG.mongo.password+"@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats" |
||||
|
const OPTS = { |
||||
|
useNewUrlParser: true, |
||||
|
useUnifiedTopology: true |
||||
|
}; |
||||
|
var mongooptions = { |
||||
|
autoReconnect: true, |
||||
|
keepAlive: 1, |
||||
|
connectTimeoutMS: 30000, |
||||
|
socketTimeoutMS: 0 |
||||
|
} |
||||
|
MongoClient.connect(mongourl, OPTS, function(err, client){ |
||||
|
if(err){ |
||||
|
console.log(err); |
||||
|
} else { |
||||
|
const db = client.db('fluent'); |
||||
|
db.collection('logs', onCollection); |
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
io.on('connection', s => { |
||||
|
console.error('socket connection'); |
||||
|
|
||||
|
//s.set('transports', ['websocket']);
|
||||
|
//s.set('pingTimeout', 30000);
|
||||
|
//s.set('allowUpgrades', false);
|
||||
|
//s.set('serveClient', false);
|
||||
|
//s.set('pingInterval', 10000);
|
||||
|
// ------------------------------
|
||||
|
// --- set
|
||||
|
// ------------------------------
|
||||
|
var usersession = new Object(); |
||||
|
usersession.SOCKET = {}; |
||||
|
usersession.SOCKET.error = {}; |
||||
|
console.error('socket ...'); |
||||
|
s.auth = false; |
||||
|
|
||||
|
// ------------------------------
|
||||
|
// --- authenticate
|
||||
|
// ------------------------------
|
||||
|
s.on('authenticate', function(data){ |
||||
|
const token = data |
||||
|
console.log('invalid 1 ' + token); |
||||
|
(async() => { |
||||
|
var isvalid = await checkToken(token); |
||||
|
if(isvalid.action == 'ok'){ |
||||
|
console.log("Authserver ok ", s.id + ' - ' + token); |
||||
|
// pubClient.set(session, resob1string, function(err, res) {
|
||||
|
// });
|
||||
|
usersession.SOCKET.user = isvalid.user |
||||
|
usersession.SOCKET.scope = isvalid.scope // space delimeter
|
||||
|
usersession.SOCKET.token = isvalid.token |
||||
|
s.auth = true; |
||||
|
}else{ |
||||
|
console.log("Authserver no ", s.id + ' - ' + token); |
||||
|
s.auth = false; |
||||
|
} |
||||
|
})() |
||||
|
}); |
||||
|
|
||||
|
setTimeout(function(){ |
||||
|
if (!s.auth) { |
||||
|
console.log("Disconnecting timeout socket ", s.id); |
||||
|
//s.disconnect('unauthorized');
|
||||
|
}else{ |
||||
|
var room = usersession.SOCKET.user |
||||
|
//s.on("subscribe", function (room) {
|
||||
|
s.join(room); |
||||
|
console.log("joining rooom", s.rooms); |
||||
|
console.log(room + ' created ') |
||||
|
// });
|
||||
|
} |
||||
|
}, 30000); |
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
var id = s.id |
||||
|
s.on('log', obj => { |
||||
|
console.error('from client '+ s.id + ' obj ' + obj); |
||||
|
}); |
||||
|
|
||||
|
}); |
||||
|
|
||||
|
http.listen(3000, () => console.error('listening on http://localhost:3000/')); |
||||
|
console.error('socket.io example'); |
Loading…
Reference in new issue