You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
882 lines
31 KiB
882 lines
31 KiB
'use strict'
|
|
|
|
var pathmodule = require('path');
|
|
var app = require('express')();
|
|
var http = require('http').Server(app);
|
|
var https = require('https');
|
|
var CONFIG = require(pathmodule.resolve(__dirname, 'runconfig.js'));
|
|
const io = require('socket.io')(http, {
|
|
// pingTimeout: 30000,
|
|
// allowUpgrades: false,
|
|
// serveClient: false,
|
|
// pingInterval: 10000,
|
|
// //transports: [ 'websocket', 'polling' ],
|
|
// transports: [ 'polling', 'websocket' ],
|
|
cors: {
|
|
origin: 'http://localhost:8080',
|
|
methods: ['GET', 'POST'],
|
|
allowedHeaders: ['my-custom-header'],
|
|
credentials: true
|
|
},
|
|
cookie: {
|
|
name: 'test',
|
|
httpOnly: false,
|
|
path: '/custom'
|
|
}
|
|
});
|
|
|
|
/*
|
|
const Redis = require("ioredis");
|
|
const redistest = new Redis({
|
|
host: 'redisserver',
|
|
port: 6379,
|
|
});
|
|
const pubtest = new Redis({
|
|
host: 'redisserver',
|
|
port: 6379,
|
|
});
|
|
*/
|
|
|
|
|
|
//import { createAdapter } from 'socket.io-redis';
|
|
const createAdapter = require('socket.io-redis');
|
|
//const RedisClient = require("redis");
|
|
const Redis = require('ioredis');
|
|
//const pubClient = RedisClient.createClient({
|
|
|
|
const pubClient = new Redis({
|
|
host: 'redisserver',
|
|
port: 6379,
|
|
});
|
|
|
|
//const pubClient = new RedisClient({ host: 'localhost', port: 6379 });
|
|
const subClient = pubClient.duplicate();
|
|
|
|
io.adapter(createAdapter({pubClient, subClient}));
|
|
|
|
|
|
pubClient.on('connect', function () {
|
|
console.log('You are now connected');
|
|
});
|
|
|
|
const MongoClient = require('mongodb').MongoClient;
|
|
const {DateTime} = require('luxon');
|
|
|
|
|
|
var async = require('async');
|
|
const {check, validationResult} = require('express-validator');
|
|
const urlExistSync = require('url-exist-sync');
|
|
|
|
var express = require('express');
|
|
app.use(express.json());
|
|
|
|
const axios = require('axios');
|
|
axios.defaults.timeout = 30000
|
|
|
|
const helmet = require('helmet');
|
|
app.use(helmet());
|
|
|
|
const cors = require('cors')
|
|
const whitelist = [
|
|
'http://localhost:8080',
|
|
'http://localhost:3080',
|
|
'http://localhost:3081',
|
|
'http://localhost:3082'
|
|
]
|
|
const corsOptions = {
|
|
credentials: true,
|
|
methods: ['GET', 'PUT', 'POST', 'DELETE', 'OPTIONS'],
|
|
optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204
|
|
allowedHeaders: [
|
|
'Content-Type',
|
|
'Authorization',
|
|
'X-Requested-With',
|
|
'device-remember-token',
|
|
'Access-Control-Allow-Origin',
|
|
'Access-Control-Allow-Headers',
|
|
'Origin',
|
|
'Accept'
|
|
],
|
|
origin: function (origin, callback) {
|
|
if (whitelist.indexOf(origin) !== -1) {
|
|
callback(null, true)
|
|
} else {
|
|
callback(null, true)
|
|
//callback(new Error('Not allowed by CORS'))
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// ***************************************************
|
|
// checktoken
|
|
// ***************************************************
|
|
|
|
async function checkToken(token) {
|
|
const agent = new https.Agent({
|
|
rejectUnauthorized: false,
|
|
});
|
|
const instance = axios.create({
|
|
baseURL: 'https://api.swarmlab.io',
|
|
withCredentials: true,
|
|
rejectUnauthorized: false,
|
|
crossdomain: true,
|
|
httpsAgent: agent,
|
|
headers: {
|
|
'Accept': 'application/json',
|
|
'Content-Type': 'multipart/form-data',
|
|
'Authorization': 'Bearer ' + token
|
|
}
|
|
})
|
|
try {
|
|
var pipelines = {
|
|
'source': 'ssologin'
|
|
}
|
|
var params = {
|
|
pipeline: pipelines
|
|
}
|
|
|
|
var options = {
|
|
headers: {'content-type': 'application/x-www-form-urlencoded', Authorization: `Bearer ${token}`},
|
|
};
|
|
|
|
instance.defaults.timeout = 30000;
|
|
const res = await instance.post('/istokenvalidsso', params, options);
|
|
if (res.status == 200) {
|
|
//console.log("check " +JSON.stringify(res.data))
|
|
return res.data
|
|
} else {
|
|
console.log('noerror: ' + res)
|
|
return res.status
|
|
|
|
}
|
|
} catch (err) {
|
|
console.error('error: ' + err);
|
|
var error = new Object();
|
|
error.action = '401'
|
|
return error
|
|
}
|
|
}
|
|
|
|
|
|
function convertDateToUTC(date) {
|
|
return new Date(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate(), date.getUTCHours(), date.getUTCMinutes(), date.getUTCSeconds(), date.getUTCMilliseconds());
|
|
}
|
|
|
|
// ***************************************************
|
|
// get pipelines
|
|
// ***************************************************
|
|
|
|
async function getpipelines(token, pipelinename) {
|
|
const agent = new https.Agent({
|
|
rejectUnauthorized: false,
|
|
});
|
|
const instance = axios.create({
|
|
baseURL: 'https://api.swarmlab.io',
|
|
withCredentials: true,
|
|
rejectUnauthorized: false,
|
|
crossdomain: true,
|
|
httpsAgent: agent,
|
|
headers: {
|
|
'Accept': 'application/json',
|
|
'Content-Type': 'multipart/form-data',
|
|
'Authorization': 'Bearer ' + token
|
|
}
|
|
})
|
|
/*
|
|
var params = {
|
|
playbook: value
|
|
}
|
|
var options = {
|
|
params: params,
|
|
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` },
|
|
};
|
|
|
|
const playbook = await api.GET('playbookCode',options);
|
|
return playbook
|
|
*/
|
|
try {
|
|
|
|
var pipelines = {
|
|
'querytokenFilter': CONFIG.api.token,
|
|
'filter': pipelinename
|
|
}
|
|
//var params = {
|
|
// pipeline: pipelines
|
|
// }
|
|
var params = {
|
|
querytokenFilter: CONFIG.api.token,
|
|
filter: pipelinename
|
|
}
|
|
|
|
var options = {
|
|
params: params,
|
|
headers: {'content-type': 'application/x-www-form-urlencoded', Authorization: `Bearer ${token}`},
|
|
};
|
|
|
|
//https://api.swarmlab.io/gettutorlabrooms?sort=pipelinename%7Casc&page=1&per_page=5&filter=&type=scripts&tutor=yes
|
|
instance.defaults.timeout = 30000;
|
|
//const res = await instance.get('/getplaygrounds',params,options);
|
|
const res = await instance.get('/getplaygrounds', options);
|
|
if (res.status == 200) {
|
|
return res.data
|
|
} else {
|
|
console.log('noerror: ' + res)
|
|
return await res.status
|
|
|
|
}
|
|
} catch (err) {
|
|
console.error('error: ' + err);
|
|
var error = new Object();
|
|
error.action = '401'
|
|
return await error
|
|
}
|
|
}
|
|
|
|
// ***************************************************
|
|
// get user pipelines
|
|
// ***************************************************
|
|
|
|
async function getuserpipelines(token, user, swarmlabname) {
|
|
var pipelinename = user
|
|
const agent = new https.Agent({
|
|
rejectUnauthorized: false,
|
|
});
|
|
const instance = axios.create({
|
|
baseURL: 'https://api.swarmlab.io',
|
|
withCredentials: true,
|
|
rejectUnauthorized: false,
|
|
crossdomain: true,
|
|
httpsAgent: agent,
|
|
headers: {
|
|
'Accept': 'application/json',
|
|
'Content-Type': 'multipart/form-data',
|
|
'Authorization': 'Bearer ' + token
|
|
}
|
|
})
|
|
try {
|
|
|
|
var pipelines = {
|
|
'querytokenFilter': CONFIG.api.token,
|
|
'filter': pipelinename,
|
|
swarmlabname: swarmlabname
|
|
}
|
|
//var params = {
|
|
// pipeline: pipelines
|
|
// }
|
|
var params = {
|
|
querytokenFilter: CONFIG.api.token,
|
|
filter: pipelinename,
|
|
swarmlabname: swarmlabname
|
|
}
|
|
|
|
var options = {
|
|
params: params,
|
|
headers: {'content-type': 'application/x-www-form-urlencoded', Authorization: `Bearer ${token}`},
|
|
};
|
|
|
|
instance.defaults.timeout = 30000;
|
|
const res = await instance.get('/getuserplaygrounds', options);
|
|
if (res.status == 200) {
|
|
return res.data
|
|
} else {
|
|
console.log('noerror: ' + res)
|
|
return await res.status
|
|
|
|
}
|
|
} catch (err) {
|
|
console.error('error: ' + err);
|
|
var error = new Object();
|
|
error.action = '401'
|
|
error.error = err
|
|
return await error
|
|
}
|
|
}
|
|
|
|
|
|
global.online = 'ob';
|
|
global.pipelines = [];
|
|
|
|
function sendlog(reslog, pathfileval) {
|
|
var usertmp = global.pipelines.find(x => x.pathlogfile == pathfileval);
|
|
//for (var key in usertmp.data){
|
|
var user = usertmp.data[0].user25user;
|
|
// if(usertmp.data){
|
|
console.log('-----------------------' + JSON.stringify(usertmp));
|
|
io.in(user).emit('logdata', reslog);
|
|
// }
|
|
//}
|
|
}
|
|
|
|
function onlogfile(path) {
|
|
|
|
console.log('File', path, 'has been added');
|
|
var pathfileval = pathmodule.basename(path);
|
|
var arrfile = pathfileval.toString().split('-');
|
|
var pathfile = arrfile[0];
|
|
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile == pathfileval);
|
|
console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval))
|
|
if (indexfind1 === -1) {
|
|
(async () => {
|
|
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval))
|
|
var token = 'd2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407'; // desto
|
|
var resdata = await getpipelines(token, pathfile)
|
|
//resdata.data.pathlogfile = 'test'
|
|
var resob = {}
|
|
resob.pathlogfile = pathfileval
|
|
var resobarray = []
|
|
for (let i in resdata.data) {
|
|
var resob1 = {}
|
|
resob1.data = resdata.data[i].res25swarmlabname
|
|
resob1.user25user = resdata.data[i].res25user
|
|
resob1.res25creator = resdata.data[i].res25creator
|
|
resob1.res25fileforce = resdata.data[i].res25fileforce
|
|
resobarray.push(resob1)
|
|
}
|
|
resob.data = resobarray
|
|
var indexfind = global.pipelines.findIndex(x => x.pathlogfile == pathfileval);
|
|
indexfind === -1 ? global.pipelines.push(resob) : console.log('object already exists ' + pathfileval)
|
|
})()
|
|
}
|
|
|
|
}
|
|
|
|
// ***************************************************
|
|
// rest get
|
|
// ***************************************************
|
|
|
|
app.get('/get_log', [
|
|
check('token').isLength({min: 40})
|
|
],
|
|
cors(corsOptions), (req, res, next) => {
|
|
|
|
(async () => {
|
|
var RES = new Object();
|
|
RES.token = req.query['token']
|
|
RES.start = req.query['start']
|
|
RES.end = req.query['end']
|
|
RES.swarmlabname = req.query['swarmlabname']
|
|
RES.ok = 'ok'
|
|
/*
|
|
*
|
|
* validate
|
|
*
|
|
*/
|
|
|
|
|
|
var isvalid = await checkToken(RES.token);
|
|
if (isvalid.action == 'ok') {
|
|
console.log('Authserver ok ' + RES.token);
|
|
RES.error = 'ok'
|
|
} else {
|
|
console.log('Authserver no ' + RES.token);
|
|
RES.error = 'no'
|
|
}
|
|
if (RES.error == 'ok') {
|
|
|
|
var resdata = await getuserpipelines(RES.token, isvalid.user, RES.swarmlabname)
|
|
var mongourl = 'mongodb://' + CONFIG.mongo.user + ':' + CONFIG.mongo.password + '@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats'
|
|
const OPTS = {
|
|
useNewUrlParser: true,
|
|
useUnifiedTopology: true
|
|
};
|
|
MongoClient.connect(mongourl, OPTS, function (err, client) {
|
|
if (err) {
|
|
console.log(err);
|
|
} else {
|
|
const db = client.db('fluent');
|
|
//usersession.SOCKET.user = isvalid.user
|
|
console.log(JSON.stringify('mongo ----------------connected'))
|
|
console.log('-----test------- ' + JSON.stringify(RES))
|
|
if ((typeof RES.start !== 'undefined') && (typeof RES.end !== 'undefined')) {
|
|
if (DateTime.fromISO(RES.start).isValid) {
|
|
var datestart = DateTime.fromISO(RES.start)
|
|
var dateend = DateTime.fromISO(RES.end)
|
|
var search_term = {
|
|
'$and': [
|
|
{
|
|
'time': {
|
|
$gte: datestart
|
|
}
|
|
},
|
|
{
|
|
'time': {
|
|
$lt: dateend
|
|
}
|
|
},
|
|
]
|
|
}
|
|
} else {
|
|
RES.ok = 'no'
|
|
}
|
|
} else if (typeof RES.end !== 'undefined') {
|
|
var dateend = DateTime.fromISO(RES.end)
|
|
if (DateTime.fromISO(RES.end).isValid) {
|
|
var search_term = {
|
|
'$and': [
|
|
{
|
|
'time': {
|
|
$lt: dateend
|
|
}
|
|
}
|
|
]
|
|
}
|
|
} else {
|
|
RES.ok = 'no'
|
|
}
|
|
} else if (typeof RES.start !== 'undefined') {
|
|
var datestart = DateTime.fromISO(RES.start)
|
|
if (DateTime.fromISO(RES.start).isValid) {
|
|
var search_term = {
|
|
'$and': [
|
|
{
|
|
'time': {
|
|
$gte: datestart
|
|
}
|
|
}
|
|
]
|
|
}
|
|
} else {
|
|
RES.ok = 'no'
|
|
}
|
|
}
|
|
if (RES.ok == 'ok') {
|
|
//var search_term = '{"$gte": new Date("2020-12-01T00:00:00.000Z") , "$lt": new Date("2020-12-11T16:17:36.470Z") }'
|
|
//var search_term = {"time" : {$lte : datenow}}
|
|
var resdataarray = []
|
|
var resraw = {}
|
|
var reslab = ''
|
|
var datestart1 = DateTime.fromISO(RES.start)
|
|
console.log('-----now1------- ' + JSON.stringify(search_term))
|
|
console.log('-----now2------- ' + JSON.stringify(datestart1))
|
|
console.log('-----now3------- ' + JSON.stringify(datestart))
|
|
|
|
db.collection('logs').find(search_term).toArray()
|
|
//db.collection('logs').find({"time" : {$gt : datestart}}).toArray()
|
|
.then(item => {
|
|
console.log('item ' + JSON.stringify(item))
|
|
for (let i in item) {
|
|
reslab = item[i].tailed_path
|
|
var segment_array = reslab.split('/');
|
|
var last_segment = segment_array.pop();
|
|
var fieldstmp = last_segment.split('-');
|
|
var nameofswarmlab = fieldstmp[0];
|
|
|
|
var regexlog = new RegExp(nameofswarmlab);
|
|
for (let ii in resdata.data) {
|
|
if (regexlog.test(resdata.data[ii].res25swarmlabname)) {
|
|
resdataarray.push(item[i])
|
|
RES.found = item[i]
|
|
}
|
|
}
|
|
}
|
|
|
|
RES.error_msg = 'ok'
|
|
RES.data = resdataarray
|
|
//RES.dataserver = resdataarray
|
|
//RES.dataservertmp = resdata
|
|
res.json(RES)
|
|
})
|
|
.catch(err => {
|
|
console.error(err)
|
|
RES.error_msg = err
|
|
res.json(RES)
|
|
})
|
|
} else { // RES.ok
|
|
RES.error_msg = 'no date'
|
|
res.json(RES)
|
|
}
|
|
} // error mongo connect
|
|
}); // mongo connect
|
|
} else { // token error
|
|
RES.data = 'no'
|
|
RES.error_msg = 'token err'
|
|
res.json(RES)
|
|
}
|
|
})()
|
|
|
|
});
|
|
|
|
app.get('/run', [
|
|
//check('access_token').isLength({ min: 40 }),
|
|
//check('llo').isBase64()
|
|
],
|
|
cors(corsOptions), (req, res, next) => {
|
|
|
|
(async () => {
|
|
|
|
var mongourl = 'mongodb://' + CONFIG.mongo.user + ':' + CONFIG.mongo.password + '@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats'
|
|
const OPTS = {
|
|
useNewUrlParser: true,
|
|
useUnifiedTopology: true
|
|
};
|
|
MongoClient.connect(mongourl, OPTS, function (err, client) {
|
|
if (err) {
|
|
console.log(err);
|
|
} else {
|
|
const db = client.db('fluent');
|
|
//db.collection('log', onCollection);
|
|
console.log(JSON.stringify('mongo connected'))
|
|
var stream = db.collection('logs').find({}, {
|
|
tailable: true,
|
|
awaitdata: true
|
|
/* other options */
|
|
}).stream();
|
|
|
|
stream.on('data', function (doc) {
|
|
console.log(JSON.stringify(doc))
|
|
//socket.iWrite(JSON.stringify({'action': 'log','param': doc.log}));
|
|
});
|
|
}
|
|
});
|
|
var RES = new Object();
|
|
RES.code = req.query['filter']
|
|
RES.token = req.query['filter']
|
|
var isvalid = await checkToken(RES.token);
|
|
if (isvalid.action == 'ok') {
|
|
console.log('Authserver ok ' + RES.token);
|
|
} else {
|
|
console.log('Authserver no ' + RES.token);
|
|
}
|
|
RES.error = false
|
|
RES.error_msg = 'ok'
|
|
res.json(RES)
|
|
})()
|
|
|
|
});
|
|
|
|
// ***************************************************
|
|
// rest post
|
|
// ***************************************************
|
|
|
|
app.post('/run', [
|
|
//check('access_token').isLength({ min: 40 }),
|
|
//check('llo').isBase64()
|
|
],
|
|
cors(corsOptions), (req, res, next) => {
|
|
|
|
|
|
(async () => {
|
|
|
|
//console.log(JSON.stringify(req.headers));
|
|
//console.log(JSON.stringify(req.body));
|
|
//console.log("mongo "+JSON.stringify(req.body));
|
|
//console.log("LOG "+JSON.stringify(req.body[0].message));
|
|
//console.log("PATH "+JSON.stringify(req.body[0].tailed_path));
|
|
for (var i = 0; i < req.body.length; i++) {
|
|
//var getpath = await onlogfile(req.body[i].tailed_path)
|
|
|
|
var path = req.body[i].tailed_path
|
|
|
|
console.log('File', path, 'has been added');
|
|
var pathfileval = pathmodule.basename(path);
|
|
var arrfile = pathfileval.toString().split('-');
|
|
var pathfile = arrfile[0];
|
|
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile == pathfileval);
|
|
console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval))
|
|
if (indexfind1 === -1) {
|
|
(async () => {
|
|
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval))
|
|
var token = 'd2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407'; // desto
|
|
var resdata = await getpipelines(token, pathfile)
|
|
//resdata.data.pathlogfile = 'test'
|
|
var resob = {}
|
|
resob.pathlogfile = pathfileval
|
|
var resobarray = []
|
|
for (let i in resdata.data) {
|
|
var resob1 = {}
|
|
resob1.data = resdata.data[i].res25swarmlabname
|
|
resob1.user25user = resdata.data[i].res25user
|
|
resob1.res25creator = resdata.data[i].res25creator
|
|
resob1.res25fileforce = resdata.data[i].res25fileforce
|
|
resobarray.push(resob1)
|
|
}
|
|
resob.data = resobarray
|
|
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname);
|
|
var indexfind = global.pipelines.findIndex(x => x.pathlogfile == pathfileval);
|
|
|
|
//indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists")
|
|
indexfind === -1 ? global.pipelines.push(resob) : console.log('object already exists ' + pathfileval)
|
|
|
|
//console.log('info', JSON.stringify(resdata));
|
|
//console.log('info------------- ', JSON.stringify(global.pipelines));
|
|
})()
|
|
}
|
|
//
|
|
var obj = req.body[i];
|
|
|
|
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data.res25swarmlabname);
|
|
//indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists")
|
|
var now = new Date();
|
|
|
|
var reslog = new Object();
|
|
reslog.log = obj
|
|
|
|
reslog.date = convertDateToUTC(now)
|
|
console.log(reslog);
|
|
var pathfileval = pathmodule.basename(reslog.log.tailed_path);
|
|
var indexfind = global.pipelines.findIndex(x => x.pathlogfile == pathfileval);
|
|
//indexfind === -1 ? sendlog(reslog,pathfileval) : console.log("object already exists")
|
|
indexfind === -1 ? console.log('object not found') : sendlog(reslog, pathfileval)
|
|
console.log('IOT ' + JSON.stringify(reslog.log.tailed_path));
|
|
console.log('IOTindexfind ' + JSON.stringify(indexfind));
|
|
console.log('IOTuser ' + JSON.stringify(global.pipelines));
|
|
// io.in("iot").emit("message", reslog);
|
|
// io.emit("logdata", reslog);
|
|
}
|
|
})()
|
|
|
|
//io.in("iot").emit("message", RES);
|
|
|
|
console.error('socket POST from client');
|
|
var RES = new Object();
|
|
RES.error = false
|
|
RES.error_msg = 'ok'
|
|
RES.msg = req.body[0].messsage
|
|
|
|
res.json(RES)
|
|
});
|
|
|
|
// ***************************************************
|
|
// socket
|
|
// ***************************************************
|
|
|
|
//function getSHA256ofJSON(input){
|
|
// return require("crypto").createHash("sha256").update(JSON.stringify(input)).digest("hex");
|
|
//}
|
|
|
|
function sleep(ms) {
|
|
return new Promise(resolve => setTimeout(resolve, ms));
|
|
}
|
|
|
|
function getSHA256ofJSON(data, inputEncoding, encoding) {
|
|
if (!data) {
|
|
return '';
|
|
}
|
|
inputEncoding = inputEncoding || 'utf-8';
|
|
encoding = encoding || 'hex';
|
|
const hash = require('crypto').createHash('md5');
|
|
return hash.update(JSON.stringify(data), inputEncoding).digest(encoding);
|
|
}
|
|
|
|
//var getkey = function getkey(key) {
|
|
async function getkey(key) {
|
|
return new Promise((resolve) => {
|
|
|
|
|
|
pubClient.get(key, function (err, reply) {
|
|
if (err) {
|
|
|
|
console.log('----------error------------')
|
|
|
|
resolve(null)
|
|
} else {
|
|
if (reply) {
|
|
console.log('---------fount----------')
|
|
resolve(1)
|
|
} else {
|
|
console.log('----------not fount------------')
|
|
resolve(2)
|
|
//return 2
|
|
}
|
|
}
|
|
})
|
|
})
|
|
}
|
|
|
|
var setkey = function setkv(key, value) {
|
|
return new Promise((resolve) => {
|
|
//pubClient.set(key,value, 'EX', expire, function(err,reply){
|
|
pubClient.set(key, value, function (err, reply) {
|
|
if (err) {
|
|
resolve(null)
|
|
} else {
|
|
resolve(reply)
|
|
}
|
|
})
|
|
})
|
|
}
|
|
|
|
async function iosend(data, issend, io, pubClient, user1) {
|
|
var new1 = {}
|
|
new1.tailed_path = data.tailed_path
|
|
new1.message = data.message
|
|
|
|
var now = new Date();
|
|
var reslog1 = {}
|
|
//reslog1.data = resob1
|
|
reslog1.log = new1
|
|
reslog1.date = convertDateToUTC(now)
|
|
var user = user1
|
|
|
|
const randomTimeInMs = Math.random() * (2000);
|
|
await sleep(randomTimeInMs);
|
|
var getres = await getkey(issend);
|
|
|
|
if (getres == '1') {
|
|
console.log(issend + ' ---1 ' + JSON.stringify(reslog1))
|
|
//io.in(user).emit("logdata", reslog1);
|
|
} else if (getres == '2') {
|
|
console.log(issend + ' ---2 ' + JSON.stringify(reslog1))
|
|
setkey(issend, '1')
|
|
//pubClient.set(issend, '1', function(err, res) {
|
|
//});
|
|
io.in(user).emit('logdata', reslog1);
|
|
//}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
function onCollection(err, collection) {
|
|
let options = {
|
|
tailable: true,
|
|
awaitdata: true,
|
|
numberOfRetries: -1,
|
|
tailableRetryInterval: 500
|
|
};
|
|
var cursor = collection.find({}, options).stream();
|
|
var itemsProcessed = 0;
|
|
|
|
var reslog = new Object();
|
|
var now = new Date();
|
|
cursor.on('data', function (data) {
|
|
var issendob = new Object();
|
|
issendob.id = data._id
|
|
issendob.message = data.message
|
|
issendob.tailed_path = data.tailed_path
|
|
|
|
var issend = getSHA256ofJSON(issendob)
|
|
|
|
console.log('++++++++' + JSON.stringify(data));
|
|
console.log('++++++++ob' + JSON.stringify(issendob));
|
|
console.log('++++++++sha' + JSON.stringify(issend));
|
|
|
|
var pathfileval = pathmodule.basename(data.tailed_path);
|
|
var arrfile = pathfileval.toString().split('-');
|
|
var pathfile = arrfile[0];
|
|
|
|
var indexupdate = 'yes'
|
|
var resob = {};
|
|
pubClient.get(pathfileval, function (err, object) {
|
|
var objecttmp = JSON.parse(object);
|
|
if (object) {
|
|
var user1 = objecttmp.user25user.replace(/[\n\t\r]/g, '')
|
|
iosend(data, issend, io, pubClient, user1)
|
|
} else {
|
|
(async () => {
|
|
var token = 'd2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407'; // desto
|
|
var resdata1 = await getpipelines(token, pathfile)
|
|
resob.pathlogfile = pathfileval
|
|
var resob11 = {}
|
|
var i1 = 0
|
|
resob11.data = resdata1.data[i1].res25swarmlabname
|
|
resob11.user25user = resdata1.data[i1].res25user.replace(/[\n\t\r]/g, '')
|
|
resob11.res25creator = resdata1.data[i1].res25creator
|
|
resob11.res25fileforce = resdata1.data[i1].res25fileforce
|
|
resob11.tailed_path = pathfileval
|
|
var resob1string1 = JSON.stringify(resob11);
|
|
await pubClient.set(pathfileval, resob1string1, function (err, res) {
|
|
});
|
|
var user1 = resob11.user25user
|
|
iosend(data, issend, io, pubClient, user1)
|
|
console.log(' ---no--- ' + JSON.stringify(data))
|
|
})() //await inside yes
|
|
}
|
|
});
|
|
|
|
});
|
|
|
|
|
|
setInterval(function () {
|
|
console.log('itemsProcessed', itemsProcessed);
|
|
// this method is also exposed by the Server instance
|
|
//console.log(adapter.rooms);
|
|
}, 8000);
|
|
}
|
|
|
|
|
|
var mongourl = 'mongodb://' + CONFIG.mongo.user + ':' + CONFIG.mongo.password + '@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1&authSource=swarmlabplaygroundstats'
|
|
const OPTS = {
|
|
useNewUrlParser: true,
|
|
useUnifiedTopology: true
|
|
};
|
|
var mongooptions = {
|
|
autoReconnect: true,
|
|
keepAlive: 1,
|
|
connectTimeoutMS: 30000,
|
|
socketTimeoutMS: 0
|
|
}
|
|
MongoClient.connect(mongourl, OPTS, function (err, client) {
|
|
if (err) {
|
|
console.log(err);
|
|
} else {
|
|
const db = client.db('fluent');
|
|
db.collection('logs', onCollection);
|
|
}
|
|
});
|
|
|
|
io.on('connection', s => {
|
|
console.error('socket connection');
|
|
|
|
//s.set('transports', ['websocket']);
|
|
//s.set('pingTimeout', 30000);
|
|
//s.set('allowUpgrades', false);
|
|
//s.set('serveClient', false);
|
|
//s.set('pingInterval', 10000);
|
|
// ------------------------------
|
|
// --- set
|
|
// ------------------------------
|
|
var usersession = new Object();
|
|
usersession.SOCKET = {};
|
|
usersession.SOCKET.error = {};
|
|
console.error('socket ...');
|
|
s.auth = false;
|
|
|
|
// ------------------------------
|
|
// --- authenticate
|
|
// ------------------------------
|
|
s.on('authenticate', function (data) {
|
|
const token = data
|
|
console.log('invalid 1 ' + token);
|
|
(async () => {
|
|
var isvalid = await checkToken(token);
|
|
if (isvalid.action == 'ok') {
|
|
console.log('Authserver ok ', s.id + ' - ' + token);
|
|
// pubClient.set(session, resob1string, function(err, res) {
|
|
// });
|
|
usersession.SOCKET.user = isvalid.user
|
|
usersession.SOCKET.scope = isvalid.scope // space delimeter
|
|
usersession.SOCKET.token = isvalid.token
|
|
s.auth = true;
|
|
} else {
|
|
console.log('Authserver no ', s.id + ' - ' + token);
|
|
s.auth = false;
|
|
}
|
|
})()
|
|
});
|
|
|
|
setTimeout(function () {
|
|
if (!s.auth) {
|
|
console.log('Disconnecting timeout socket ', s.id);
|
|
//s.disconnect('unauthorized');
|
|
} else {
|
|
var room = usersession.SOCKET.user
|
|
//s.on("subscribe", function (room) {
|
|
s.join(room);
|
|
console.log('joining rooom', s.rooms);
|
|
console.log(room + ' created ')
|
|
// });
|
|
}
|
|
}, 30000);
|
|
|
|
|
|
var id = s.id
|
|
s.on('log', obj => {
|
|
console.error('from client ' + s.id + ' obj ' + obj);
|
|
});
|
|
|
|
});
|
|
|
|
http.listen(3000, () => console.error('listening on http://localhost:3000/'));
|
|
console.error('socket.io example');
|