@ -1,643 +0,0 @@
"use strict"
var pathmodule = require('path');
var app = require('express')();
var http = require('http').Server(app);
var https = require('https');
const io = require("socket.io")(http, {
// pingTimeout: 30000,
// allowUpgrades: false,
// serveClient: false,
// pingInterval: 10000,
// //transports: [ 'websocket', 'polling' ],
// transports: [ 'polling', 'websocket' ],
cors: {
origin: "http://localhost:8080",
methods: ["GET", "POST"],
allowedHeaders: ["my-custom-header"],
credentials: true
cookie: {
name: "test",
httpOnly: false,
path: "/custom"
const Redis = require("ioredis");
const redistest = new Redis({
host: 'redisserver',
port: 6379,
const pubtest = new Redis({
host: 'redisserver',
port: 6379,
//import { createAdapter } from 'socket.io-redis';
const createAdapter = require('socket.io-redis');
//const RedisClient = require("redis");
const Redis = require("ioredis");
//const pubClient = RedisClient.createClient({
const pubClient = new Redis({
host: 'redisserver',
port: 6379,
//const pubClient = new RedisClient({ host: 'localhost', port: 6379 });
const subClient = pubClient.duplicate();
io.adapter(createAdapter({ pubClient, subClient }));
pubClient.on("connect", function() {
console.log("You are now connected");
const MongoClient = require('mongodb').MongoClient;
var async = require("async");
const { check, validationResult } = require('express-validator');
const urlExistSync = require("url-exist-sync");
var express = require('express');
const axios = require('axios');
axios.defaults.timeout = 30000
const helmet = require('helmet');
const cors = require('cors')
const whitelist = [
const corsOptions = {
credentials: true,
methods: ['GET', 'PUT', 'POST', 'DELETE', 'OPTIONS'],
optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204
allowedHeaders: [
origin: function(origin, callback) {
if (whitelist.indexOf(origin) !== -1) {
callback(null, true)
} else {
callback(null, true)
//callback(new Error('Not allowed by CORS'))
// ***************************************************
// checktoken
// ***************************************************
async function checkToken(token) {
const agent = new https.Agent({
rejectUnauthorized: false,
const instance = axios.create({
baseURL: 'https://api.swarmlab.io',
withCredentials: true,
rejectUnauthorized: false,
crossdomain: true,
httpsAgent: agent,
headers: {
'Accept': 'application/json',
'Content-Type': 'multipart/form-data',
'Authorization': 'Bearer '+token
try {
var pipelines = {
var params = {
pipeline: pipelines
var options = {
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` },
instance.defaults.timeout = 30000;
const res = await instance.post('/istokenvalidsso',params,options);
if(res.status == 200){
//console.log("check " +JSON.stringify(res.data))
return res.data
console.log("noerror: " + res)
return res.status
catch (err) {
console.error("error: "+err);
var error = new Object();
error.action = '401'
return error
function convertDateToUTC(date) {
return new Date(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate(), date.getUTCHours(), date.getUTCMinutes(), date.getUTCSeconds(),date.getUTCMilliseconds());
// ***************************************************
// get pipelines
// ***************************************************
async function getpipelines(token,pipelinename) {
const agent = new https.Agent({
rejectUnauthorized: false,
const instance = axios.create({
baseURL: 'https://api.swarmlab.io',
withCredentials: true,
rejectUnauthorized: false,
crossdomain: true,
httpsAgent: agent,
headers: {
'Accept': 'application/json',
'Content-Type': 'multipart/form-data',
'Authorization': 'Bearer '+token
var params = {
playbook: value
var options = {
params: params,
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` },
const playbook = await api.GET('playbookCode',options);
return playbook
try {
var pipelines = {
//var params = {
// pipeline: pipelines
// }
var params = {
var options = {
params: params,
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` },
instance.defaults.timeout = 30000;
//const res = await instance.get('/getplaygrounds',params,options);
const res = await instance.get('/getplaygrounds',options);
if(res.status == 200){
return res.data
console.log("noerror: " + res)
return await res.status
catch (err) {
console.error("error: "+err);
var error = new Object();
error.action = '401'
return await error
function sendlog(reslog,pathfileval){
var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval);
//for (var key in usertmp.data){
var user = usertmp.data[0].user25user;
// if(usertmp.data){
console.log('-----------------------' + JSON.stringify(usertmp));
io.in(user).emit("logdata", reslog);
// }
function onlogfile(path){
console.log('File', path, 'has been added');
var pathfileval = pathmodule.basename(path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval))
if (indexfind1 === -1 ){
(async() => {
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval))
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata = await getpipelines(token,pathfile)
//resdata.data.pathlogfile = 'test'
var resob = {}
resob.pathlogfile = pathfileval
var resobarray = []
for (let i in resdata.data) {
var resob1 = {}
resob1.data = resdata.data[i].res25swarmlabname
resob1.user25user = resdata.data[i].res25user
resob1.res25creator = resdata.data[i].res25creator
resob1.res25fileforce = resdata.data[i].res25fileforce
resob.data = resobarray
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval)
// ***************************************************
// rest get
// ***************************************************
app.get('/run', [
//check('access_token').isLength({ min: 40 }),
cors(corsOptions), (req, res, next) => {
(async() => {
var mongourl = "mongodb://playgrounduser:efvvnuioervefSDFSGYGHRDFVsdfergvssppiiedifhwincvinviw_dbfjbsifbsdkjfswuunscfudfgbbfvibqefwrvnine@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1"
const OPTS = {
useNewUrlParser: true,
useUnifiedTopology: true
MongoClient.connect(mongourl, OPTS, function(err, client){
} else {
const db = client.db('fluent');
//db.collection('log', onCollection);
console.log(JSON.stringify('mongo connected'))
var stream = db.collection('logs').find({}, {
tailable: true,
awaitdata: true
/* other options */
stream.on('data', function (doc) {
//socket.write(JSON.stringify({'action': 'log','param': doc.log}));
var RES = new Object();
RES.code = req.query["filter"]
RES.token = req.query["filter"]
var isvalid = await checkToken(RES.token);
if(isvalid.action == 'ok'){
console.log("Authserver ok " + RES.token);
console.log("Authserver no " + RES.token);
RES.error = false
RES.error_msg = "ok"
// ***************************************************
// rest post
// ***************************************************
app.post('/run', [
//check('access_token').isLength({ min: 40 }),
cors(corsOptions), (req, res, next) => {
(async() => {
//console.log("mongo "+JSON.stringify(req.body));
//console.log("LOG "+JSON.stringify(req.body[0].message));
//console.log("PATH "+JSON.stringify(req.body[0].tailed_path));
for (var i = 0; i < req.body.length; i++){
//var getpath = await onlogfile(req.body[i].tailed_path)
var path = req.body[i].tailed_path
console.log('File', path, 'has been added');
var pathfileval = pathmodule.basename(path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval))
if (indexfind1 === -1 ){
(async() => {
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval))
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata = await getpipelines(token,pathfile)
//resdata.data.pathlogfile = 'test'
var resob = {}
resob.pathlogfile = pathfileval
var resobarray = []
for (let i in resdata.data) {
var resob1 = {}
resob1.data = resdata.data[i].res25swarmlabname
resob1.user25user = resdata.data[i].res25user
resob1.res25creator = resdata.data[i].res25creator
resob1.res25fileforce = resdata.data[i].res25fileforce
resob.data = resobarray
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname);
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
//indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists")
indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval)
//console.log('info', JSON.stringify(resdata));
//console.log('info------------- ', JSON.stringify(global.pipelines));
var obj = req.body[i];
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data.res25swarmlabname);
//indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists")
var now = new Date();
var reslog = new Object();
reslog.log = obj
reslog.date = convertDateToUTC(now)
var pathfileval = pathmodule.basename(reslog.log.tailed_path);
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
//indexfind === -1 ? sendlog(reslog,pathfileval) : console.log("object already exists")
indexfind === -1 ? console.log("object not found") : sendlog(reslog,pathfileval)
console.log("IOT "+JSON.stringify(reslog.log.tailed_path));
console.log("IOTindexfind "+JSON.stringify(indexfind));
console.log("IOTuser "+JSON.stringify(global.pipelines));
// io.in("iot").emit("message", reslog);
// io.emit("logdata", reslog);
//io.in("iot").emit("message", RES);
console.error('socket POST from client');
var RES = new Object();
RES.error = false
RES.error_msg = "ok"
RES.msg = req.body[0].messsage
// ***************************************************
// socket
// ***************************************************
function getSHA256ofJSON(input){
return require("crypto").createHash("sha256").update(JSON.stringify(input)).digest("hex");
function onCollection(err, collection) {
let options = { tailable: true,
awaitdata: true,
numberOfRetries: -1,
tailableRetryInterval: 500
var cursor = collection.find({},options).stream();
var itemsProcessed = 0;
var reslog = new Object();
var reslog1 = new Object();
var now = new Date();
cursor.on('data', function (data) {
(async() => {
var issendob = {};
issendob.message = data.message
issendob.tailed_path = data.tailed_path
var issend = getSHA256ofJSON(issendob)
console.log('++++++++' + JSON.stringify(data));
console.log('++++++++' + JSON.stringify(issend));
var pathfileval = pathmodule.basename(data.tailed_path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
var indexupdate = "yes"
var resob = {}
await pubClient.get(pathfileval, function(err, object) {
console.log('----------------' + err + '<<<<<<<<<<<<<<<<<<<<<<' + object);
indexupdate = "no"
console.log('redis '+JSON.stringify(object));
console.log('update '+JSON.stringify(indexupdate));
if (indexupdate == "yes" ){
(async() => {
io.in('anagnostopoulos@uniwa.gr').emit("logdata", reslog1);
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata = await getpipelines(token,pathfile)
resob.pathlogfile = pathfileval
var resobarray = []
var resob1 = {}
var i = 0
resob1.data = resdata.data[i].res25swarmlabname
resob1.user25user = resdata.data[i].res25user
resob1.res25creator = resdata.data[i].res25creator
resob1.res25fileforce = resdata.data[i].res25fileforce
resob1.tailed_path = pathfileval
var resob1string = JSON.stringify(resob1);
pubClient.set(pathfileval, resob1string, function(err, res) {
reslog.data = resob1
reslog.log = data
reslog.date = convertDateToUTC(now)
var user = resob1.res25creator
console.log('datauser ' + JSON.stringify(user));
console.log('>>>>>>>>>>>>>>>>>>>>>>>>>>>>> '+JSON.stringify(reslog));
await pubClient.get(issend, function(err, object) {
if(err == null){
pubClient.set(issend, itemsProcessed, function(err, res) {
io.in(user).emit("logdata", reslog);
})() //await inside yes
(async() => {
await pubClient.get(pathfileval, function(err, object) {
var objecttmp = JSON.parse(object);
var resob1 = {}
resob1.data = objecttmp.res25swarmlabname
resob1.user25user = objecttmp.res25user
resob1.res25creator = objecttmp.res25creator
resob1.res25fileforce = objecttmp.res25fileforce
resob1.tailed_path = objecttmp.tailed_path
reslog.data = resob1
reslog.log = data
reslog.date = convertDateToUTC(now)
console.log('<<<<<<<<<<<---------------------<<<<<<<<<<<<<<<---------------------------<<<<<<<<<<<< '+JSON.stringify(object));
console.log('<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< '+JSON.stringify(reslog));
console.log('<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<obj>>>>>>>>>>>>>>>>>> '+JSON.stringify(resob1));
//var user = objecttmp.user25user
var user = resob1.res25creator
//io.in(user).emit("logdata", reslog);
//var user = 'anagnostopoulos@uniwa.gr'
pubClient.get(issend).then(function (result) {
console.log("---result--- "+result); // Prints "bar"
io.in(user).emit("logdata", reslog);
pubClient.get(issend, function(err, object) {
if(err == null){
pubClient.set(issend, itemsProcessed, function(err, res) {
//io.in(user).emit("logdata", reslog);
})() //await inside no
}); //redis get
})() //async
setInterval(function () {
console.log('itemsProcessed', itemsProcessed);
// this method is also exposed by the Server instance
}, 8000);
var mongourl = "mongodb://playgrounduser:efvvnuioervefSDFSGYGHRDFVsdfergvssppiiedifhwincvinviw_dbfjbsifbsdkjfswuunscfudfgbbfvibqefwrvnine@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1"
const OPTS = {
useNewUrlParser: true,
useUnifiedTopology: true
var mongooptions = {
autoReconnect: true,
keepAlive: 1,
connectTimeoutMS: 30000,
socketTimeoutMS: 0
MongoClient.connect(mongourl, OPTS, function(err, client){
} else {
const db = client.db('fluent');
db.collection('logs', onCollection);
io.on('connection', s => {
console.error('socket connection');
//s.set('transports', ['websocket']);
//s.set('pingTimeout', 30000);
//s.set('allowUpgrades', false);
//s.set('serveClient', false);
//s.set('pingInterval', 10000);
// ------------------------------
// --- set
// ------------------------------
var usersession = new Object();
usersession.SOCKET = {};
usersession.SOCKET.error = {};
console.error('socket ...');
s.auth = false;
// ------------------------------
// --- authenticate
// ------------------------------
s.on('authenticate', function(data){
const token = data
console.log('invalid 1 ' + token);
(async() => {
var isvalid = await checkToken(token);
if(isvalid.action == 'ok'){
console.log("Authserver ok ", s.id + ' - ' + token);
// pubClient.set(session, resob1string, function(err, res) {
// });
usersession.SOCKET.user = isvalid.user
usersession.SOCKET.scope = isvalid.scope // space delimeter
usersession.SOCKET.token = isvalid.token
s.auth = true;
console.log("Authserver no ", s.id + ' - ' + token);
s.auth = false;
if (!s.auth) {
console.log("Disconnecting timeout socket ", s.id);
var room = usersession.SOCKET.user
//s.on("subscribe", function (room) {
console.log("joining rooom", s.rooms);
console.log(room + ' created ')
// });
}, 30000);
var id = s.id
s.on('log', obj => {
console.error('from client '+ s.id + ' obj ' + obj);
http.listen(3000, () => console.error('listening on http://localhost:3000/'));
console.error('socket.io example');


@ -1,91 +0,0 @@
var path = require('path');
var app = require('express')();
var http = require('http').Server(app);
var io = require('socket.io')(http);
const cors = require("cors");
//const socketAuth = require('socketio-auth');
//const whitelist = ["http://localhost:8080"];
const whitelist = ["*"];
const corsOptions = {
credentials: true,
methods: ["GET", "PUT", "POST", "DELETE", "OPTIONS"],
optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204
allowedHeaders: [
origin: function (origin, callback) {
if (whitelist.indexOf(origin) !== -1) {
callback(null, true);
} else {
callback(null, true);
//callback(new Error('Not allowed by CORS'))
//check('access_token').isLength({ min: 40 }),
(req, res, next) => {
var RES = new Object();
RES.code = req.query["code"];
console.error("socket GET from client " + RES.code);
RES.error = false;
RES.error_msg = "ok";
io.emit("iotdata", RES);
io.in("iot").emit("message", RES);
//check('access_token').isLength({ min: 40 }),
(req, res) => {
var data = req.query["input"];
var RES = new Object();
console.error(`Client called GET from axios`);
io.on("log", (data) => {
console.log(JSON.stringify("d " + data));
console.error(JSON.stringify("c " + data));
io.on("connection", (s) => {
console.error(`\nSomeone connected to port 3000`);
var id = s.id;
s.on("log", (data) => {
http.listen(3000, () => console.error("listening on"));
console.error("Run demo project");
console.log("Hello World!");


@ -1,575 +0,0 @@
"use strict"
var pathmodule = require('path');
var app = require('express')();
var http = require('http').Server(app);
var https = require('https');
var io = require('socket.io')(http);
//import { createAdapter } from 'socket.io-redis';
const createAdapter = require('socket.io-redis');
//import { RedisClient } from 'redis';
const RedisClient = require("redis");
const pubClient = RedisClient.createClient({
host: 'redisserver',
port: 6379,
//const pubClient = new RedisClient({ host: 'localhost', port: 6379 });
const subClient = pubClient.duplicate();
io.adapter(createAdapter({ pubClient, subClient }));
pubClient.on("connect", function() {
console.log("You are now connected");
const MongoClient = require('mongodb').MongoClient;
//var chokidar = require("chokidar");
//var logpath = "/var/lab/playground-serverlogs";
//var watcher = chokidar.watch(logpath, {
// ignored: /[\/\\]\./,
// awaitWriteFinish: true,
// persistent: true
// });
var async = require("async");
const { check, validationResult } = require('express-validator');
const urlExistSync = require("url-exist-sync");
var express = require('express');
const axios = require('axios');
axios.defaults.timeout = 30000
const helmet = require('helmet');
const cors = require('cors')
const whitelist = [
const corsOptions = {
credentials: true,
methods: ['GET', 'PUT', 'POST', 'DELETE', 'OPTIONS'],
optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204
allowedHeaders: [
origin: function(origin, callback) {
if (whitelist.indexOf(origin) !== -1) {
callback(null, true)
} else {
callback(null, true)
//callback(new Error('Not allowed by CORS'))
// ***************************************************
// checktoken
// ***************************************************
async function checkToken(token) {
const agent = new https.Agent({
rejectUnauthorized: false,
const instance = axios.create({
baseURL: 'https://api.swarmlab.io',
withCredentials: true,
rejectUnauthorized: false,
crossdomain: true,
httpsAgent: agent,
headers: {
'Accept': 'application/json',
'Content-Type': 'multipart/form-data',
'Authorization': 'Bearer '+token
try {
var pipelines = {
var params = {
pipeline: pipelines
var options = {
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` },
instance.defaults.timeout = 30000;
const res = await instance.post('/istokenvalidsso',params,options);
if(res.status == 200){
//console.log("check " +JSON.stringify(res.data))
return res.data
console.log("noerror: " + res)
return res.status
catch (err) {
console.error("error: "+err);
var error = new Object();
error.action = '401'
return error
function convertDateToUTC(date) {
return new Date(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate(), date.getUTCHours(), date.getUTCMinutes(), date.getUTCSeconds(),date.getUTCMilliseconds());
// ***************************************************
// get pipelines
// ***************************************************
async function getpipelines(token,pipelinename) {
const agent = new https.Agent({
rejectUnauthorized: false,
const instance = axios.create({
baseURL: 'https://api.swarmlab.io',
withCredentials: true,
rejectUnauthorized: false,
crossdomain: true,
httpsAgent: agent,
headers: {
'Accept': 'application/json',
'Content-Type': 'multipart/form-data',
'Authorization': 'Bearer '+token
var params = {
playbook: value
var options = {
params: params,
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` },
const playbook = await api.GET('playbookCode',options);
return playbook
try {
var pipelines = {
//var params = {
// pipeline: pipelines
// }
var params = {
var options = {
params: params,
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` },
instance.defaults.timeout = 30000;
//const res = await instance.get('/getplaygrounds',params,options);
const res = await instance.get('/getplaygrounds',options);
if(res.status == 200){
return res.data
console.log("noerror: " + res)
return res.status
catch (err) {
console.error("error: "+err);
var error = new Object();
error.action = '401'
return error
function sendlog(reslog,pathfileval){
var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval);
//for (var key in usertmp.data){
var user = usertmp.data[0].user25user;
// if(usertmp.data){
console.log('-----------------------' + JSON.stringify(usertmp));
io.in(user).emit("logdata", reslog);
// }
function onlogfile(path){
console.log('File', path, 'has been added');
var pathfileval = pathmodule.basename(path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval))
if (indexfind1 === -1 ){
(async() => {
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval))
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata = await getpipelines(token,pathfile)
//resdata.data.pathlogfile = 'test'
var resob = {}
resob.pathlogfile = pathfileval
var resobarray = []
for (let i in resdata.data) {
var resob1 = {}
resob1.data = resdata.data[i].res25swarmlabname
resob1.user25user = resdata.data[i].res25user
resob1.res25creator = resdata.data[i].res25creator
resob1.res25fileforce = resdata.data[i].res25fileforce
resob.data = resobarray
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval)
// ***************************************************
// rest get
// ***************************************************
app.get('/run', [
//check('access_token').isLength({ min: 40 }),
cors(corsOptions), (req, res, next) => {
(async() => {
var mongourl = "mongodb://playgrounduser:efvvnuioervefSDFSGYGHRDFVsdfergvssppiiedifhwincvinviw_dbfjbsifbsdkjfswuunscfudfgbbfvibqefwrvnine@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1"
const OPTS = {
useNewUrlParser: true,
useUnifiedTopology: true
MongoClient.connect(mongourl, OPTS, function(err, client){
} else {
const db = client.db('fluent');
//db.collection('log', onCollection);
console.log(JSON.stringify('mongo connected'))
var stream = db.collection('logs').find({}, {
tailable: true,
awaitdata: true
/* other options */
stream.on('data', function (doc) {
//socket.write(JSON.stringify({'action': 'log','param': doc.log}));
var RES = new Object();
RES.code = req.query["filter"]
RES.token = req.query["filter"]
var isvalid = await checkToken(RES.token);
if(isvalid.action == 'ok'){
console.log("Authserver ok " + RES.token);
console.log("Authserver no " + RES.token);
RES.error = false
RES.error_msg = "ok"
// ***************************************************
// rest post
// ***************************************************
app.post('/run', [
//check('access_token').isLength({ min: 40 }),
cors(corsOptions), (req, res, next) => {
(async() => {
//console.log("mongo "+JSON.stringify(req.body));
//console.log("LOG "+JSON.stringify(req.body[0].message));
//console.log("PATH "+JSON.stringify(req.body[0].tailed_path));
for (var i = 0; i < req.body.length; i++){
//var getpath = await onlogfile(req.body[i].tailed_path)
var path = req.body[i].tailed_path
console.log('File', path, 'has been added');
var pathfileval = pathmodule.basename(path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval))
if (indexfind1 === -1 ){
(async() => {
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval))
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata = await getpipelines(token,pathfile)
//resdata.data.pathlogfile = 'test'
var resob = {}
resob.pathlogfile = pathfileval
var resobarray = []
for (let i in resdata.data) {
var resob1 = {}
resob1.data = resdata.data[i].res25swarmlabname
resob1.user25user = resdata.data[i].res25user
resob1.res25creator = resdata.data[i].res25creator
resob1.res25fileforce = resdata.data[i].res25fileforce
resob.data = resobarray
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname);
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
//indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists")
indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval)
//console.log('info', JSON.stringify(resdata));
//console.log('info------------- ', JSON.stringify(global.pipelines));
var obj = req.body[i];
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data.res25swarmlabname);
//indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists")
var now = new Date();
var reslog = new Object();
reslog.log = obj
reslog.date = convertDateToUTC(now)
var pathfileval = pathmodule.basename(reslog.log.tailed_path);
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
//indexfind === -1 ? sendlog(reslog,pathfileval) : console.log("object already exists")
indexfind === -1 ? console.log("object not found") : sendlog(reslog,pathfileval)
console.log("IOT "+JSON.stringify(reslog.log.tailed_path));
console.log("IOTindexfind "+JSON.stringify(indexfind));
console.log("IOTuser "+JSON.stringify(global.pipelines));
// io.in("iot").emit("message", reslog);
// io.emit("logdata", reslog);
//io.in("iot").emit("message", RES);
console.error('socket POST from client');
var RES = new Object();
RES.error = false
RES.error_msg = "ok"
RES.msg = req.body[0].messsage
// ***************************************************
// socket
// ***************************************************
io.origins('*:*') // for latest version
function onCollection(err, collection) {
let options = { tailable: true,
awaitdata: true,
numberOfRetries: -1,
tailableRetryInterval: 500
var cursor = collection.find({},options).stream();
var itemsProcessed = 0;
var reslog = new Object();
var now = new Date();
cursor.on('data', function (data) {
var pathfileval = pathmodule.basename(data.tailed_path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
var indexupdate = true
//var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
pubClient.hgetall(pathfileval, function(err, object) {
indexupdate = false
console.log('redis '+JSON.stringify(object));
var resob = {}
if (indexupdate ){
(async() => {
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval))
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata = await getpipelines(token,pathfile)
console.log('file2222222222222222222222222222222222222------------------------------ ' + JSON.stringify(resdata.data))
resob.pathlogfile = pathfileval
var resobarray = []
//for (let i in resdata.data) {
var resob1 = {}
var i = 0
resob1.data = resdata.data[i].res25swarmlabname
resob1.user25user = resdata.data[i].res25user
resob1.res25creator = resdata.data[i].res25creator
resob1.res25fileforce = resdata.data[i].res25fileforce
resob1.tailed_path = pathfileval
var resob1string = JSON.stringify(resob1);
pubClient.hgetall(pathfileval, function(err, object) {
console.log('redis>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> '+JSON.stringify(object));
resob.data = resobarray
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname);
//var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
//indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists")
//indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval)
reslog.log = data
reslog.date = convertDateToUTC(now)
console.log('data ' + JSON.stringify(reslog));
//var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval);
//var user = usertmp.data[0].user25user;
var user = "anagnostopoulos@uniwa.gr"
console.log('datauser ' + JSON.stringify(user));
io.in(user).emit("logdata", reslog);
reslog.log = data
reslog.date = convertDateToUTC(now)
console.log('dataelse ' + JSON.stringify(reslog));
//var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval);
//var user = usertmp.data[0].user25user;
///console.log('datauser ' + JSON.stringify(user));
var user = "anagnostopoulos@uniwa.gr"
io.in(user).emit("logdata", reslog);
setInterval(function () {
console.log('itemsProcessed', itemsProcessed);
}, 1000);
var mongourl = "mongodb://playgrounduser:efvvnuioervefSDFSGYGHRDFVsdfergvssppiiedifhwincvinviw_dbfjbsifbsdkjfswuunscfudfgbbfvibqefwrvnine@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1"
const OPTS = {
useNewUrlParser: true,
useUnifiedTopology: true
//var mongolab_uri = "mongodb://<dbUser>:<dbPassword>@<host1>:<port1>,<host2>:<port2>/<dbName>?replicaSet=<replicaSetName>";
var mongooptions = {
autoReconnect: true,
keepAlive: 1,
connectTimeoutMS: 30000,
socketTimeoutMS: 0
MongoClient.connect(mongourl, OPTS, function(err, client){
} else {
const db = client.db('fluent');
db.collection('logs', onCollection);
io.on('connection', s => {
console.error('socket connection');
// ------------------------------
// --- set
// ------------------------------
var usersession = new Object();
usersession.SOCKET = {};
usersession.SOCKET.error = {};
console.error('socket ...');
s.auth = false;
// ------------------------------
// --- authenticate
// ------------------------------
s.on('authenticate', function(data){
const token = data
console.log('invalid 1 ' + token);
(async() => {
var isvalid = await checkToken(token);
if(isvalid.action == 'ok'){
console.log("Authserver ok ", s.id + ' - ' + token);
usersession.SOCKET.user = isvalid.user
usersession.SOCKET.scope = isvalid.scope // space delimeter
usersession.SOCKET.token = isvalid.token
s.auth = true;
console.log("Authserver no ", s.id + ' - ' + token);
s.auth = false;
if (!s.auth) {
console.log("Disconnecting timeout socket ", s.id);
var room = usersession.SOCKET.user
//s.on("subscribe", function (room) {
// console.log("joining room", room);
// });
}, 30000);
var id = s.id
s.on('log', obj => {
console.error('from client '+ s.id + ' obj ' + obj);
http.listen(3000, () => console.error('listening on http://localhost:3000/'));
console.error('socket.io example');


@ -1,578 +0,0 @@
"use strict"
var pathmodule = require('path');
var app = require('express')();
var http = require('http').Server(app);
var https = require('https');
var io = require('socket.io')(http);
//import { createAdapter } from 'socket.io-redis';
const createAdapter = require('socket.io-redis');
//import { RedisClient } from 'redis';
const RedisClient = require("redis");
const pubClient = RedisClient.createClient({
host: 'redisserver',
port: 6379,
//const pubClient = new RedisClient({ host: 'localhost', port: 6379 });
const subClient = pubClient.duplicate();
io.adapter(createAdapter({ pubClient, subClient }));
pubClient.on("connect", function() {
console.log("You are now connected");
const MongoClient = require('mongodb').MongoClient;
//var chokidar = require("chokidar");
//var logpath = "/var/lab/playground-serverlogs";
//var watcher = chokidar.watch(logpath, {
// ignored: /[\/\\]\./,
// awaitWriteFinish: true,
// persistent: true
// });
var async = require("async");
const { check, validationResult } = require('express-validator');
const urlExistSync = require("url-exist-sync");
var express = require('express');
const axios = require('axios');
axios.defaults.timeout = 30000
const helmet = require('helmet');
const cors = require('cors')
const whitelist = [
const corsOptions = {
credentials: true,
methods: ['GET', 'PUT', 'POST', 'DELETE', 'OPTIONS'],
optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204
allowedHeaders: [
origin: function(origin, callback) {
if (whitelist.indexOf(origin) !== -1) {
callback(null, true)
} else {
callback(null, true)
//callback(new Error('Not allowed by CORS'))
// ***************************************************
// checktoken
// ***************************************************
async function checkToken(token) {
const agent = new https.Agent({
rejectUnauthorized: false,
const instance = axios.create({
baseURL: 'https://api.swarmlab.io',
withCredentials: true,
rejectUnauthorized: false,
crossdomain: true,
httpsAgent: agent,
headers: {
'Accept': 'application/json',
'Content-Type': 'multipart/form-data',
'Authorization': 'Bearer '+token
try {
var pipelines = {
var params = {
pipeline: pipelines
var options = {
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` },
instance.defaults.timeout = 30000;
const res = await instance.post('/istokenvalidsso',params,options);
if(res.status == 200){
//console.log("check " +JSON.stringify(res.data))
return res.data
console.log("noerror: " + res)
return res.status
catch (err) {
console.error("error: "+err);
var error = new Object();
error.action = '401'
return error
function convertDateToUTC(date) {
return new Date(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate(), date.getUTCHours(), date.getUTCMinutes(), date.getUTCSeconds(),date.getUTCMilliseconds());
// ***************************************************
// get pipelines
// ***************************************************
async function getpipelines(token,pipelinename) {
const agent = new https.Agent({
rejectUnauthorized: false,
const instance = axios.create({
baseURL: 'https://api.swarmlab.io',
withCredentials: true,
rejectUnauthorized: false,
crossdomain: true,
httpsAgent: agent,
headers: {
'Accept': 'application/json',
'Content-Type': 'multipart/form-data',
'Authorization': 'Bearer '+token
var params = {
playbook: value
var options = {
params: params,
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` },
const playbook = await api.GET('playbookCode',options);
return playbook
try {
var pipelines = {
//var params = {
// pipeline: pipelines
// }
var params = {
var options = {
params: params,
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` },
instance.defaults.timeout = 30000;
//const res = await instance.get('/getplaygrounds',params,options);
const res = await instance.get('/getplaygrounds',options);
if(res.status == 200){
return res.data
console.log("noerror: " + res)
return res.status
catch (err) {
console.error("error: "+err);
var error = new Object();
error.action = '401'
return error
function sendlog(reslog,pathfileval){
var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval);
//for (var key in usertmp.data){
var user = usertmp.data[0].user25user;
// if(usertmp.data){
console.log('-----------------------' + JSON.stringify(usertmp));
io.in(user).emit("logdata", reslog);
// }
function onlogfile(path){
console.log('File', path, 'has been added');
var pathfileval = pathmodule.basename(path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval))
if (indexfind1 === -1 ){
(async() => {
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval))
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata = await getpipelines(token,pathfile)
//resdata.data.pathlogfile = 'test'
var resob = {}
resob.pathlogfile = pathfileval
var resobarray = []
for (let i in resdata.data) {
var resob1 = {}
resob1.data = resdata.data[i].res25swarmlabname
resob1.user25user = resdata.data[i].res25user
resob1.res25creator = resdata.data[i].res25creator
resob1.res25fileforce = resdata.data[i].res25fileforce
resob.data = resobarray
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval)
// ***************************************************
// rest get
// ***************************************************
app.get('/run', [
//check('access_token').isLength({ min: 40 }),
cors(corsOptions), (req, res, next) => {
(async() => {
var mongourl = "mongodb://playgrounduser:efvvnuioervefSDFSGYGHRDFVsdfergvssppiiedifhwincvinviw_dbfjbsifbsdkjfswuunscfudfgbbfvibqefwrvnine@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1"
const OPTS = {
useNewUrlParser: true,
useUnifiedTopology: true
MongoClient.connect(mongourl, OPTS, function(err, client){
} else {
const db = client.db('fluent');
//db.collection('log', onCollection);
console.log(JSON.stringify('mongo connected'))
var stream = db.collection('logs').find({}, {
tailable: true,
awaitdata: true
/* other options */
stream.on('data', function (doc) {
//socket.write(JSON.stringify({'action': 'log','param': doc.log}));
var RES = new Object();
RES.code = req.query["filter"]
RES.token = req.query["filter"]
var isvalid = await checkToken(RES.token);
if(isvalid.action == 'ok'){
console.log("Authserver ok " + RES.token);
console.log("Authserver no " + RES.token);
RES.error = false
RES.error_msg = "ok"
// ***************************************************
// rest post
// ***************************************************
app.post('/run', [
//check('access_token').isLength({ min: 40 }),
cors(corsOptions), (req, res, next) => {
(async() => {
//console.log("mongo "+JSON.stringify(req.body));
//console.log("LOG "+JSON.stringify(req.body[0].message));
//console.log("PATH "+JSON.stringify(req.body[0].tailed_path));
for (var i = 0; i < req.body.length; i++){
//var getpath = await onlogfile(req.body[i].tailed_path)
var path = req.body[i].tailed_path
console.log('File', path, 'has been added');
var pathfileval = pathmodule.basename(path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval))
if (indexfind1 === -1 ){
(async() => {
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval))
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata = await getpipelines(token,pathfile)
//resdata.data.pathlogfile = 'test'
var resob = {}
resob.pathlogfile = pathfileval
var resobarray = []
for (let i in resdata.data) {
var resob1 = {}
resob1.data = resdata.data[i].res25swarmlabname
resob1.user25user = resdata.data[i].res25user
resob1.res25creator = resdata.data[i].res25creator
resob1.res25fileforce = resdata.data[i].res25fileforce
resob.data = resobarray
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname);
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
//indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists")
indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval)
//console.log('info', JSON.stringify(resdata));
//console.log('info------------- ', JSON.stringify(global.pipelines));
var obj = req.body[i];
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data.res25swarmlabname);
//indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists")
var now = new Date();
var reslog = new Object();
reslog.log = obj
reslog.date = convertDateToUTC(now)
var pathfileval = pathmodule.basename(reslog.log.tailed_path);
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
//indexfind === -1 ? sendlog(reslog,pathfileval) : console.log("object already exists")
indexfind === -1 ? console.log("object not found") : sendlog(reslog,pathfileval)
console.log("IOT "+JSON.stringify(reslog.log.tailed_path));
console.log("IOTindexfind "+JSON.stringify(indexfind));
console.log("IOTuser "+JSON.stringify(global.pipelines));
// io.in("iot").emit("message", reslog);
// io.emit("logdata", reslog);
//io.in("iot").emit("message", RES);
console.error('socket POST from client');
var RES = new Object();
RES.error = false
RES.error_msg = "ok"
RES.msg = req.body[0].messsage
// ***************************************************
// socket
// ***************************************************
io.origins('*:*') // for latest version
function onCollection(err, collection) {
let options = { tailable: true,
awaitdata: true,
numberOfRetries: -1,
tailableRetryInterval: 500
var cursor = collection.find({},options).stream();
var itemsProcessed = 0;
var reslog = new Object();
var now = new Date();
cursor.on('data', function (data) {
var pathfileval = pathmodule.basename(data.tailed_path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
var indexupdate = true
var resob = {}
//var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
pubClient.hgetall(pathfileval, function(err, object) {
indexupdate = false
console.log('redis '+JSON.stringify(object));
if (indexupdate ){
(async() => {
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata = await getpipelines(token,pathfile)
console.log('------------------------------ ' + JSON.stringify(resdata.data))
resob.pathlogfile = pathfileval
var resobarray = []
//for (let i in resdata.data) {
var resob1 = {}
var i = 0
resob1.data = resdata.data[i].res25swarmlabname
resob1.user25user = resdata.data[i].res25user
resob1.res25creator = resdata.data[i].res25creator
resob1.res25fileforce = resdata.data[i].res25fileforce
resob1.tailed_path = pathfileval
var resob1string = JSON.stringify(resob1);
console.log('+++++++++++++++++' + resob1string + '<<<<<<<<<<<<<<<<<<<<<<' + pathfileval);
pubClient.hmset(pathfileval, resob1string, function(err, res) {
await pubClient.hgetall(pathfileval, function(err, object) {
console.log('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> '+JSON.stringify(object));
resob.data = resobarray
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname);
//var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
//indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists")
//indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval)
reslog.log = data
reslog.date = convertDateToUTC(now)
//console.log('data ' + JSON.stringify(reslog));
//var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval);
//var user = usertmp.data[0].user25user;
var user = "anagnostopoulos@uniwa.gr"
console.log('datauser ' + JSON.stringify(user));
io.in(user).emit("logdata", reslog);
reslog.log = data
reslog.date = convertDateToUTC(now)
//console.log('dataelse ' + JSON.stringify(reslog));
//var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval);
//var user = usertmp.data[0].user25user;
///console.log('datauser ' + JSON.stringify(user));
var user = "anagnostopoulos@uniwa.gr"
io.in(user).emit("logdata", reslog);
setInterval(function () {
console.log('itemsProcessed', itemsProcessed);
}, 1000);
var mongourl = "mongodb://playgrounduser:efvvnuioervefSDFSGYGHRDFVsdfergvssppiiedifhwincvinviw_dbfjbsifbsdkjfswuunscfudfgbbfvibqefwrvnine@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1"
const OPTS = {
useNewUrlParser: true,
useUnifiedTopology: true
//var mongolab_uri = "mongodb://<dbUser>:<dbPassword>@<host1>:<port1>,<host2>:<port2>/<dbName>?replicaSet=<replicaSetName>";
var mongooptions = {
autoReconnect: true,
keepAlive: 1,
connectTimeoutMS: 30000,
socketTimeoutMS: 0
MongoClient.connect(mongourl, OPTS, function(err, client){
} else {
const db = client.db('fluent');
db.collection('logs', onCollection);
io.on('connection', s => {
console.error('socket connection');
// ------------------------------
// --- set
// ------------------------------
var usersession = new Object();
usersession.SOCKET = {};
usersession.SOCKET.error = {};
console.error('socket ...');
s.auth = false;
// ------------------------------
// --- authenticate
// ------------------------------
s.on('authenticate', function(data){
const token = data
console.log('invalid 1 ' + token);
(async() => {
var isvalid = await checkToken(token);
if(isvalid.action == 'ok'){
console.log("Authserver ok ", s.id + ' - ' + token);
usersession.SOCKET.user = isvalid.user
usersession.SOCKET.scope = isvalid.scope // space delimeter
usersession.SOCKET.token = isvalid.token
s.auth = true;
console.log("Authserver no ", s.id + ' - ' + token);
s.auth = false;
if (!s.auth) {
console.log("Disconnecting timeout socket ", s.id);
var room = usersession.SOCKET.user
//s.on("subscribe", function (room) {
// console.log("joining room", room);
// });
}, 30000);
var id = s.id
s.on('log', obj => {
console.error('from client '+ s.id + ' obj ' + obj);
http.listen(3000, () => console.error('listening on http://localhost:3000/'));
console.error('socket.io example');


@ -1,48 +0,0 @@
"use strict"
var pathmodule = require('path');
var app = require('express')();
var http = require('http').Server(app);
var https = require('https');
var io = require('socket.io')(http);
//import { createAdapter } from 'socket.io-redis';
const createAdapter = require('socket.io-redis');
//import { RedisClient } from 'redis';
const RedisClient = require("redis");
const pubClient = RedisClient.createClient({
host: 'redisserver',
port: 6379,
//const pubClient = new RedisClient({ host: 'localhost', port: 6379 });
const subClient = pubClient.duplicate();
io.adapter(createAdapter({ pubClient, subClient }));
pubClient.on("connect", function() {
console.log("You are now connected");
setInterval(function () {
var resob1 = {}
resob1.data = '1'
resob1.user25user = 'user'
var resob1string = JSON.stringify(resob1);
console.log('-------------------- '+JSON.stringify(resob1string));
var resob1string = 'test';
pubClient.hmset('ekjgpiegwerpowfmfsdfsdgsk', resob1string, function(err, res) {
console.log('>>>>>>>>>eroor>>>>>>>>>>>>>>>>>>> '+JSON.stringify(err));
console.log('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> '+JSON.stringify(res));
pubClient.hgetall('ekjgpiegwerpowfmfsdfsdgsk', function(err, object) {
console.log('<<<<<<<<<<eroor<<<<<<<<<<<<<<<<< '+JSON.stringify(err));
console.log('<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< '+JSON.stringify(object));
}, 3000);
http.listen(3000, () => console.error('listening on http://localhost:3000/'));
console.error('socket.io example');


@ -1,563 +0,0 @@
"use strict"
var pathmodule = require('path');
var app = require('express')();
var http = require('http').Server(app);
var https = require('https');
var io = require('socket.io')(http);
//import { createAdapter } from 'socket.io-redis';
const createAdapter = require('socket.io-redis');
//import { RedisClient } from 'redis';
const RedisClient = require("redis");
const pubClient = RedisClient.createClient({
host: 'redisserver',
port: 6379,
//const pubClient = new RedisClient({ host: 'localhost', port: 6379 });
const subClient = pubClient.duplicate();
io.adapter(createAdapter({ pubClient, subClient }));
pubClient.on("connect", function() {
console.log("You are now connected");
const MongoClient = require('mongodb').MongoClient;
var async = require("async");
const { check, validationResult } = require('express-validator');
const urlExistSync = require("url-exist-sync");
var express = require('express');
const axios = require('axios');
axios.defaults.timeout = 30000
const helmet = require('helmet');
const cors = require('cors')
const whitelist = [
const corsOptions = {
credentials: true,
methods: ['GET', 'PUT', 'POST', 'DELETE', 'OPTIONS'],
optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204
allowedHeaders: [
origin: function(origin, callback) {
if (whitelist.indexOf(origin) !== -1) {
callback(null, true)
} else {
callback(null, true)
//callback(new Error('Not allowed by CORS'))
// ***************************************************
// checktoken
// ***************************************************
async function checkToken(token) {
const agent = new https.Agent({
rejectUnauthorized: false,
const instance = axios.create({
baseURL: 'https://api.swarmlab.io',
withCredentials: true,
rejectUnauthorized: false,
crossdomain: true,
httpsAgent: agent,
headers: {
'Accept': 'application/json',
'Content-Type': 'multipart/form-data',
'Authorization': 'Bearer '+token
try {
var pipelines = {
var params = {
pipeline: pipelines
var options = {
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` },
instance.defaults.timeout = 30000;
const res = await instance.post('/istokenvalidsso',params,options);
if(res.status == 200){
//console.log("check " +JSON.stringify(res.data))
return res.data
console.log("noerror: " + res)
return res.status
catch (err) {
console.error("error: "+err);
var error = new Object();
error.action = '401'
return error
function convertDateToUTC(date) {
return new Date(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate(), date.getUTCHours(), date.getUTCMinutes(), date.getUTCSeconds(),date.getUTCMilliseconds());
// ***************************************************
// get pipelines
// ***************************************************
async function getpipelines(token,pipelinename) {
const agent = new https.Agent({
rejectUnauthorized: false,
const instance = axios.create({
baseURL: 'https://api.swarmlab.io',
withCredentials: true,
rejectUnauthorized: false,
crossdomain: true,
httpsAgent: agent,
headers: {
'Accept': 'application/json',
'Content-Type': 'multipart/form-data',
'Authorization': 'Bearer '+token
var params = {
playbook: value
var options = {
params: params,
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` },
const playbook = await api.GET('playbookCode',options);
return playbook
try {
var pipelines = {
//var params = {
// pipeline: pipelines
// }
var params = {
var options = {
params: params,
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` },
instance.defaults.timeout = 30000;
//const res = await instance.get('/getplaygrounds',params,options);
const res = await instance.get('/getplaygrounds',options);
if(res.status == 200){
return res.data
console.log("noerror: " + res)
return res.status
catch (err) {
console.error("error: "+err);
var error = new Object();
error.action = '401'
return error
function sendlog(reslog,pathfileval){
var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval);
//for (var key in usertmp.data){
var user = usertmp.data[0].user25user;
// if(usertmp.data){
console.log('-----------------------' + JSON.stringify(usertmp));
io.in(user).emit("logdata", reslog);
// }
function onlogfile(path){
console.log('File', path, 'has been added');
var pathfileval = pathmodule.basename(path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval))
if (indexfind1 === -1 ){
(async() => {
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval))
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata = await getpipelines(token,pathfile)
//resdata.data.pathlogfile = 'test'
var resob = {}
resob.pathlogfile = pathfileval
var resobarray = []
for (let i in resdata.data) {
var resob1 = {}
resob1.data = resdata.data[i].res25swarmlabname
resob1.user25user = resdata.data[i].res25user
resob1.res25creator = resdata.data[i].res25creator
resob1.res25fileforce = resdata.data[i].res25fileforce
resob.data = resobarray
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval)
// ***************************************************
// rest get
// ***************************************************
app.get('/run', [
//check('access_token').isLength({ min: 40 }),
cors(corsOptions), (req, res, next) => {
(async() => {
var mongourl = "mongodb://playgrounduser:efvvnuioervefSDFSGYGHRDFVsdfergvssppiiedifhwincvinviw_dbfjbsifbsdkjfswuunscfudfgbbfvibqefwrvnine@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1"
const OPTS = {
useNewUrlParser: true,
useUnifiedTopology: true
MongoClient.connect(mongourl, OPTS, function(err, client){
} else {
const db = client.db('fluent');
//db.collection('log', onCollection);
console.log(JSON.stringify('mongo connected'))
var stream = db.collection('logs').find({}, {
tailable: true,
awaitdata: true
/* other options */
stream.on('data', function (doc) {
//socket.write(JSON.stringify({'action': 'log','param': doc.log}));
var RES = new Object();
RES.code = req.query["filter"]
RES.token = req.query["filter"]
var isvalid = await checkToken(RES.token);
if(isvalid.action == 'ok'){
console.log("Authserver ok " + RES.token);
console.log("Authserver no " + RES.token);
RES.error = false
RES.error_msg = "ok"
// ***************************************************
// rest post
// ***************************************************
app.post('/run', [
//check('access_token').isLength({ min: 40 }),
cors(corsOptions), (req, res, next) => {
(async() => {
//console.log("mongo "+JSON.stringify(req.body));
//console.log("LOG "+JSON.stringify(req.body[0].message));
//console.log("PATH "+JSON.stringify(req.body[0].tailed_path));
for (var i = 0; i < req.body.length; i++){
//var getpath = await onlogfile(req.body[i].tailed_path)
var path = req.body[i].tailed_path
console.log('File', path, 'has been added');
var pathfileval = pathmodule.basename(path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval))
if (indexfind1 === -1 ){
(async() => {
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval))
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata = await getpipelines(token,pathfile)
//resdata.data.pathlogfile = 'test'
var resob = {}
resob.pathlogfile = pathfileval
var resobarray = []
for (let i in resdata.data) {
var resob1 = {}
resob1.data = resdata.data[i].res25swarmlabname
resob1.user25user = resdata.data[i].res25user
resob1.res25creator = resdata.data[i].res25creator
resob1.res25fileforce = resdata.data[i].res25fileforce
resob.data = resobarray
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname);
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
//indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists")
indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval)
//console.log('info', JSON.stringify(resdata));
//console.log('info------------- ', JSON.stringify(global.pipelines));
var obj = req.body[i];
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data.res25swarmlabname);
//indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists")
var now = new Date();
var reslog = new Object();
reslog.log = obj
reslog.date = convertDateToUTC(now)
var pathfileval = pathmodule.basename(reslog.log.tailed_path);
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
//indexfind === -1 ? sendlog(reslog,pathfileval) : console.log("object already exists")
indexfind === -1 ? console.log("object not found") : sendlog(reslog,pathfileval)
console.log("IOT "+JSON.stringify(reslog.log.tailed_path));
console.log("IOTindexfind "+JSON.stringify(indexfind));
console.log("IOTuser "+JSON.stringify(global.pipelines));
// io.in("iot").emit("message", reslog);
// io.emit("logdata", reslog);
//io.in("iot").emit("message", RES);
console.error('socket POST from client');
var RES = new Object();
RES.error = false
RES.error_msg = "ok"
RES.msg = req.body[0].messsage
// ***************************************************
// socket
// ***************************************************
io.origins('*:*') // for latest version
function onCollection(err, collection) {
let options = { tailable: true,
awaitdata: true,
numberOfRetries: -1,
tailableRetryInterval: 500
var cursor = collection.find({},options).stream();
var itemsProcessed = 0;
var reslog = new Object();
var now = new Date();
cursor.on('data', function (data) {
console.log('+++++++++ondata<<<<<<<<<<<<<<<<<<<<<<' + data);
io.in('anagnostopoulos@uniwa.gr').emit("logdata", data);
var pathfileval = pathmodule.basename(data.tailed_path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
var indexupdate = true
var resob = {}
//var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
pubClient.get(pathfileval, function(err, object) {
indexupdate = false
console.log('redis '+JSON.stringify(object));
if (indexupdate ){
(async() => {
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata = await getpipelines(token,pathfile)
resob.pathlogfile = pathfileval
var resobarray = []
var resob1 = {}
var i = 0
resob1.data = resdata.data[i].res25swarmlabname
resob1.user25user = resdata.data[i].res25user
resob1.res25creator = resdata.data[i].res25creator
resob1.res25fileforce = resdata.data[i].res25fileforce
resob1.tailed_path = pathfileval
var resob1string = JSON.stringify(resob1);
console.log('+++++++++++++++++' + resob1string + '<<<<<<<<<<<<<<<<<<<<<<' + pathfileval);
pubClient.set(pathfileval, resob1string, function(err, res) {
reslog.data = resob1
reslog.log = data
reslog.date = convertDateToUTC(now)
var user = resob1.res25creator
console.log('datauser ' + JSON.stringify(user));
//io.in(user).emit("logdata", reslog);
var user = 'anagnostopoulos@uniwa.gr'
io.in(user).emit("logdata", reslog);
pubClient.get(pathfileval, function(err, object) {
var resob1 = {}
var i = 0
resob1.data = object.res25swarmlabname
resob1.user25user = object.res25user
resob1.res25creator = object.res25creator
resob1.res25fileforce = object.res25fileforce
resob1.tailed_path = object.tailed_path
reslog.data = resob1
reslog.log = data
reslog.date = convertDateToUTC(now)
console.log('>>>>>>>>>>2222222222222>>>>>>>>>>>>>>>>>>>>>>>>>>>>> '+JSON.stringify(object));
var user = object.user25user
io.in(user).emit("logdata", reslog);
setInterval(function () {
console.log('itemsProcessed', itemsProcessed);
}, 1000);
var mongourl = "mongodb://playgrounduser:efvvnuioervefSDFSGYGHRDFVsdfergvssppiiedifhwincvinviw_dbfjbsifbsdkjfswuunscfudfgbbfvibqefwrvnine@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1"
const OPTS = {
useNewUrlParser: true,
useUnifiedTopology: true
var mongooptions = {
autoReconnect: true,
keepAlive: 1,
connectTimeoutMS: 30000,
socketTimeoutMS: 0
MongoClient.connect(mongourl, OPTS, function(err, client){
} else {
const db = client.db('fluent');
db.collection('logs', onCollection);
io.on('connection', s => {
console.error('socket connection');
// ------------------------------
// --- set
// ------------------------------
var usersession = new Object();
usersession.SOCKET = {};
usersession.SOCKET.error = {};
console.error('socket ...');
s.auth = false;
// ------------------------------
// --- authenticate
// ------------------------------
s.on('authenticate', function(data){
const token = data
console.log('invalid 1 ' + token);
(async() => {
var isvalid = await checkToken(token);
if(isvalid.action == 'ok'){
console.log("Authserver ok ", s.id + ' - ' + token);
usersession.SOCKET.user = isvalid.user
usersession.SOCKET.scope = isvalid.scope // space delimeter
usersession.SOCKET.token = isvalid.token
s.auth = true;
console.log("Authserver no ", s.id + ' - ' + token);
s.auth = false;
if (!s.auth) {
console.log("Disconnecting timeout socket ", s.id);
var room = usersession.SOCKET.user
//s.on("subscribe", function (room) {
// console.log("joining room", room);
// });
}, 30000);
var id = s.id
s.on('log', obj => {
console.error('from client '+ s.id + ' obj ' + obj);
http.listen(3000, () => console.error('listening on http://localhost:3000/'));
console.error('socket.io example');


@ -1,697 +0,0 @@
"use strict"
var pathmodule = require('path');
var app = require('express')();
var http = require('http').Server(app);
var https = require('https');
const io = require("socket.io")(http, {
// pingTimeout: 30000,
// allowUpgrades: false,
// serveClient: false,
// pingInterval: 10000,
// //transports: [ 'websocket', 'polling' ],
// transports: [ 'polling', 'websocket' ],
cors: {
origin: "http://localhost:8080",
methods: ["GET", "POST"],
allowedHeaders: ["my-custom-header"],
credentials: true
cookie: {
name: "test",
httpOnly: false,
path: "/custom"
const Redis = require("ioredis");
const redistest = new Redis({
host: 'redisserver',
port: 6379,
const pubtest = new Redis({
host: 'redisserver',
port: 6379,
//import { createAdapter } from 'socket.io-redis';
const createAdapter = require('socket.io-redis');
//const RedisClient = require("redis");
const Redis = require("ioredis");
//const pubClient = RedisClient.createClient({
const pubClient = new Redis({
host: 'redisserver',
port: 6379,
//const pubClient = new RedisClient({ host: 'localhost', port: 6379 });
const subClient = pubClient.duplicate();
io.adapter(createAdapter({ pubClient, subClient }));
pubClient.on("connect", function() {
console.log("You are now connected");
const MongoClient = require('mongodb').MongoClient;
var async = require("async");
const { check, validationResult } = require('express-validator');
const urlExistSync = require("url-exist-sync");
var express = require('express');
const axios = require('axios');
axios.defaults.timeout = 30000
const helmet = require('helmet');
const cors = require('cors')
const whitelist = [
const corsOptions = {
credentials: true,
methods: ['GET', 'PUT', 'POST', 'DELETE', 'OPTIONS'],
optionsSuccessStatus: 200, // some legacy browsers (IE11, various SmartTVs) choke on 204
allowedHeaders: [
origin: function(origin, callback) {
if (whitelist.indexOf(origin) !== -1) {
callback(null, true)
} else {
callback(null, true)
//callback(new Error('Not allowed by CORS'))
// ***************************************************
// checktoken
// ***************************************************
async function checkToken(token) {
const agent = new https.Agent({
rejectUnauthorized: false,
const instance = axios.create({
baseURL: 'https://api.swarmlab.io',
withCredentials: true,
rejectUnauthorized: false,
crossdomain: true,
httpsAgent: agent,
headers: {
'Accept': 'application/json',
'Content-Type': 'multipart/form-data',
'Authorization': 'Bearer '+token
try {
var pipelines = {
var params = {
pipeline: pipelines
var options = {
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` },
instance.defaults.timeout = 30000;
const res = await instance.post('/istokenvalidsso',params,options);
if(res.status == 200){
//console.log("check " +JSON.stringify(res.data))
return res.data
console.log("noerror: " + res)
return res.status
catch (err) {
console.error("error: "+err);
var error = new Object();
error.action = '401'
return error
function convertDateToUTC(date) {
return new Date(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate(), date.getUTCHours(), date.getUTCMinutes(), date.getUTCSeconds(),date.getUTCMilliseconds());
// ***************************************************
// get pipelines
// ***************************************************
async function getpipelines(token,pipelinename) {
const agent = new https.Agent({
rejectUnauthorized: false,
const instance = axios.create({
baseURL: 'https://api.swarmlab.io',
withCredentials: true,
rejectUnauthorized: false,
crossdomain: true,
httpsAgent: agent,
headers: {
'Accept': 'application/json',
'Content-Type': 'multipart/form-data',
'Authorization': 'Bearer '+token
var params = {
playbook: value
var options = {
params: params,
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` },
const playbook = await api.GET('playbookCode',options);
return playbook
try {
var pipelines = {
//var params = {
// pipeline: pipelines
// }
var params = {
var options = {
params: params,
headers: { 'content-type': 'application/x-www-form-urlencoded',Authorization: `Bearer ${token}` },
instance.defaults.timeout = 30000;
//const res = await instance.get('/getplaygrounds',params,options);
const res = await instance.get('/getplaygrounds',options);
if(res.status == 200){
return res.data
console.log("noerror: " + res)
return await res.status
catch (err) {
console.error("error: "+err);
var error = new Object();
error.action = '401'
return await error
function sendlog(reslog,pathfileval){
var usertmp = global.pipelines.find(x => x.pathlogfile==pathfileval);
//for (var key in usertmp.data){
var user = usertmp.data[0].user25user;
// if(usertmp.data){
console.log('-----------------------' + JSON.stringify(usertmp));
io.in(user).emit("logdata", reslog);
// }
function onlogfile(path){
console.log('File', path, 'has been added');
var pathfileval = pathmodule.basename(path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval))
if (indexfind1 === -1 ){
(async() => {
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval))
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata = await getpipelines(token,pathfile)
//resdata.data.pathlogfile = 'test'
var resob = {}
resob.pathlogfile = pathfileval
var resobarray = []
for (let i in resdata.data) {
var resob1 = {}
resob1.data = resdata.data[i].res25swarmlabname
resob1.user25user = resdata.data[i].res25user
resob1.res25creator = resdata.data[i].res25creator
resob1.res25fileforce = resdata.data[i].res25fileforce
resob.data = resobarray
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval)
// ***************************************************
// rest get
// ***************************************************
app.get('/run', [
//check('access_token').isLength({ min: 40 }),
cors(corsOptions), (req, res, next) => {
(async() => {
var mongourl = "mongodb://playgrounduser:efvvnuioervefSDFSGYGHRDFVsdfergvssppiiedifhwincvinviw_dbfjbsifbsdkjfswuunscfudfgbbfvibqefwrvnine@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1"
const OPTS = {
useNewUrlParser: true,
useUnifiedTopology: true
MongoClient.connect(mongourl, OPTS, function(err, client){
} else {
const db = client.db('fluent');
//db.collection('log', onCollection);
console.log(JSON.stringify('mongo connected'))
var stream = db.collection('logs').find({}, {
tailable: true,
awaitdata: true
/* other options */
stream.on('data', function (doc) {
//socket.write(JSON.stringify({'action': 'log','param': doc.log}));
var RES = new Object();
RES.code = req.query["filter"]
RES.token = req.query["filter"]
var isvalid = await checkToken(RES.token);
if(isvalid.action == 'ok'){
console.log("Authserver ok " + RES.token);
console.log("Authserver no " + RES.token);
RES.error = false
RES.error_msg = "ok"
// ***************************************************
// rest post
// ***************************************************
app.post('/run', [
//check('access_token').isLength({ min: 40 }),
cors(corsOptions), (req, res, next) => {
(async() => {
//console.log("mongo "+JSON.stringify(req.body));
//console.log("LOG "+JSON.stringify(req.body[0].message));
//console.log("PATH "+JSON.stringify(req.body[0].tailed_path));
for (var i = 0; i < req.body.length; i++){
//var getpath = await onlogfile(req.body[i].tailed_path)
var path = req.body[i].tailed_path
console.log('File', path, 'has been added');
var pathfileval = pathmodule.basename(path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
var indexfind1 = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
console.log('file11111111111111111111111111111111 ' + JSON.stringify(pathfileval))
if (indexfind1 === -1 ){
(async() => {
console.log('file2222222222222222222222222222222222222 ' + JSON.stringify(pathfileval))
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata = await getpipelines(token,pathfile)
//resdata.data.pathlogfile = 'test'
var resob = {}
resob.pathlogfile = pathfileval
var resobarray = []
for (let i in resdata.data) {
var resob1 = {}
resob1.data = resdata.data[i].res25swarmlabname
resob1.user25user = resdata.data[i].res25user
resob1.res25creator = resdata.data[i].res25creator
resob1.res25fileforce = resdata.data[i].res25fileforce
resob.data = resobarray
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data[0].res25swarmlabname);
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
//indexfind === -1 ? global.pipelines.push({resob}) : console.log("object already exists")
indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists "+pathfileval)
//console.log('info', JSON.stringify(resdata));
//console.log('info------------- ', JSON.stringify(global.pipelines));
var obj = req.body[i];
//var indexfind = global.pipelines.findIndex(x => x.res25swarmlabname==resdata.data.res25swarmlabname);
//indexfind === -1 ? global.pipelines.push(resob) : console.log("object already exists")
var now = new Date();
var reslog = new Object();
reslog.log = obj
reslog.date = convertDateToUTC(now)
var pathfileval = pathmodule.basename(reslog.log.tailed_path);
var indexfind = global.pipelines.findIndex(x => x.pathlogfile==pathfileval);
//indexfind === -1 ? sendlog(reslog,pathfileval) : console.log("object already exists")
indexfind === -1 ? console.log("object not found") : sendlog(reslog,pathfileval)
console.log("IOT "+JSON.stringify(reslog.log.tailed_path));
console.log("IOTindexfind "+JSON.stringify(indexfind));
console.log("IOTuser "+JSON.stringify(global.pipelines));
// io.in("iot").emit("message", reslog);
// io.emit("logdata", reslog);
//io.in("iot").emit("message", RES);
console.error('socket POST from client');
var RES = new Object();
RES.error = false
RES.error_msg = "ok"
RES.msg = req.body[0].messsage
// ***************************************************
// socket
// ***************************************************
function getSHA256ofJSON(input){
return require("crypto").createHash("sha256").update(JSON.stringify(input)).digest("hex");
function onCollection(err, collection) {
let options = { tailable: true,
awaitdata: true,
numberOfRetries: -1,
tailableRetryInterval: 500
var cursor = collection.find({},options).stream();
var itemsProcessed = 0;
var reslog = new Object();
var reslog1 = new Object();
var now = new Date();
cursor.on('data', function (data) {
var issendob = {};
issendob.message = data.message
issendob.tailed_path = data.tailed_path
var issend = getSHA256ofJSON(issendob)
console.log('++++++++' + JSON.stringify(data));
console.log('++++++++' + JSON.stringify(issend));
var pathfileval = pathmodule.basename(data.tailed_path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
var indexupdate = "yes"
var resob = {};
pubClient.get(pathfileval, function(err, object) {
var objecttmp = JSON.parse(object);
reslog.log = data
var user1 = objecttmp.user25user.replace(/[\n\t\r]/g,"")
console.log(' ---get '+ JSON.stringify(reslog))
pubClient.get(issend, function(err, object) {
console.log(' ---set '+ JSON.stringify(reslog))
pubClient.set(issend, itemsProcessed, function(err, res) {
io.in(user1).emit("logdata", reslog);
console.log(user1 + ' ---isset '+ JSON.stringify(reslog))
io.in(user1).emit("logdata", reslog);
//io.in(user1).emit("logdata", reslog);
//console.log(' --- '+ JSON.stringify(reslog))
//console.log(' --->> '+ JSON.stringify(user1))
(async() => {
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata1 = await getpipelines(token,pathfile)
resob.pathlogfile = pathfileval
var resob11 = {}
var i1 = 0
resob11.data = resdata1.data[i1].res25swarmlabname
resob11.user25user = resdata1.data[i1].res25user.replace(/[\n\t\r]/g,"")
resob11.res25creator = resdata1.data[i1].res25creator
resob11.res25fileforce = resdata1.data[i1].res25fileforce
resob11.tailed_path = pathfileval
var resob1string1 = JSON.stringify(resob11);
await pubClient.set(pathfileval, resob1string1, function(err, res) {
var user1 = resob11.user25user
reslog.log = 'no'
io.in(user1).emit("logdata", reslog);
console.log(' ---no--- '+ JSON.stringify(reslog))
})() //await inside yes
pubClient.get(pathfileval, function(err, object) {
reslog.log = data
var user1 = object.user25user
io.in(user1).emit("logdata", reslog);
console.log(' --- '+ JSON.stringify(reslog))
}); //redis get
pubClient.get(pathfileval, function(err, object) {
console.log('----------------' + err + '<<<<<<<<<<<<<<<<<<<<<<' + object);
indexupdate = "no"
console.log('redis '+JSON.stringify(object));
console.log('update '+JSON.stringify(indexupdate));
if (indexupdate == "yes" ){
(async() => {
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata = await getpipelines(token,pathfile)
resob.pathlogfile = pathfileval
var resobarray = []
var resob1 = {}
var i = 0
resob1.data = resdata.data[i].res25swarmlabname
resob1.user25user = resdata.data[i].res25user
resob1.res25creator = resdata.data[i].res25creator
resob1.res25fileforce = resdata.data[i].res25fileforce
resob1.tailed_path = pathfileval
var resob1string = JSON.stringify(resob1);
pubClient.set(pathfileval, resob1string, function(err, res) {
reslog.data = resob1
reslog.log = data
reslog.date = convertDateToUTC(now)
var user = resob1.res25creator
console.log('datauser ' + JSON.stringify(user));
console.log('>>>>>>>>>>>>>>>>>>>>>>>>>>>>> '+JSON.stringify(reslog));
await pubClient.get(issend, function(err, object) {
if(err == null){
pubClient.set(issend, itemsProcessed, function(err, res) {
io.in(user).emit("logdata", reslog);
})() //await inside yes
(async() => {
await pubClient.get(pathfileval, function(err, object) {
var objecttmp = JSON.parse(object);
var resob1 = {}
resob1.data = objecttmp.res25swarmlabname
resob1.user25user = objecttmp.res25user
resob1.res25creator = objecttmp.res25creator
resob1.res25fileforce = objecttmp.res25fileforce
resob1.tailed_path = objecttmp.tailed_path
reslog.data = resob1
reslog.log = data
reslog.date = convertDateToUTC(now)
console.log('<<<<<<<<<<<---------------------<<<<<<<<<<<<<<<---------------------------<<<<<<<<<<<< '+JSON.stringify(object));
console.log('<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< '+JSON.stringify(reslog));
console.log('<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<obj>>>>>>>>>>>>>>>>>> '+JSON.stringify(resob1));
//var user = objecttmp.user25user
var user = resob1.res25creator
//io.in(user).emit("logdata", reslog);
//var user = 'anagnostopoulos@uniwa.gr'
pubClient.get(issend).then(function (result) {
console.log("---result--- "+result); // Prints "bar"
io.in(user).emit("logdata", reslog);
pubClient.get(issend, function(err, object) {
if(err == null){
pubClient.set(issend, itemsProcessed, function(err, res) {
//io.in(user).emit("logdata", reslog);
})() //await inside no
}); //redis get
setInterval(function () {
console.log('itemsProcessed', itemsProcessed);
// this method is also exposed by the Server instance
}, 8000);
var mongourl = "mongodb://playgrounduser:efvvnuioervefSDFSGYGHRDFVsdfergvssppiiedifhwincvinviw_dbfjbsifbsdkjfswuunscfudfgbbfvibqefwrvnine@ondemand_playground_mongo1:27017,ondemand_playground_mongo2:27017,ondemand_playground_mongo3:27017,ondemand_playground_mongo4:27017,ondemand_playground_mongo5:27017,ondemand_playground_mongo6:27017,ondemand_playground_mongo7:27017/fluent?replicaSet=rs1"
const OPTS = {
useNewUrlParser: true,
useUnifiedTopology: true
var mongooptions = {
autoReconnect: true,
keepAlive: 1,
connectTimeoutMS: 30000,
socketTimeoutMS: 0
MongoClient.connect(mongourl, OPTS, function(err, client){
} else {
const db = client.db('fluent');
db.collection('logs', onCollection);
io.on('connection', s => {
console.error('socket connection');
//s.set('transports', ['websocket']);
//s.set('pingTimeout', 30000);
//s.set('allowUpgrades', false);
//s.set('serveClient', false);
//s.set('pingInterval', 10000);
// ------------------------------
// --- set
// ------------------------------
var usersession = new Object();
usersession.SOCKET = {};
usersession.SOCKET.error = {};
console.error('socket ...');
s.auth = false;
// ------------------------------
// --- authenticate
// ------------------------------
s.on('authenticate', function(data){
const token = data
console.log('invalid 1 ' + token);
(async() => {
var isvalid = await checkToken(token);
if(isvalid.action == 'ok'){
console.log("Authserver ok ", s.id + ' - ' + token);
// pubClient.set(session, resob1string, function(err, res) {
// });
usersession.SOCKET.user = isvalid.user
usersession.SOCKET.scope = isvalid.scope // space delimeter
usersession.SOCKET.token = isvalid.token
s.auth = true;
console.log("Authserver no ", s.id + ' - ' + token);
s.auth = false;
if (!s.auth) {
console.log("Disconnecting timeout socket ", s.id);
var room = usersession.SOCKET.user
//s.on("subscribe", function (room) {
console.log("joining rooom", s.rooms);
console.log(room + ' created ')
// });
}, 30000);
var id = s.id
s.on('log', obj => {
console.error('from client '+ s.id + ' obj ' + obj);
http.listen(3000, () => console.error('listening on http://localhost:3000/'));
console.error('socket.io example');