diff --git a/swarmlab-app/src/run/app.js b/swarmlab-app/src/run/app.js index 9726a012..1d08064f 100644 --- a/swarmlab-app/src/run/app.js +++ b/swarmlab-app/src/run/app.js @@ -197,6 +197,7 @@ app.get("/length", cors(corsOptions), (req, res) => { ); }); +// έλεγχος αν η δομή είναι κατάλληλη για μετατροπή σε json function IsJsonString(str) { try { JSON.parse(str); @@ -206,6 +207,7 @@ function IsJsonString(str) { return true; } +// endpoint για την αναγνώριση των υπηρεσιών στο δίκτυο app.get("/services", cors(corsOptions), (req, res) => { console.error("getting length of logs"); @@ -317,6 +319,7 @@ app.get("/test", cors(corsOptions), (req, res) => { ); }); +// endpoint που επιστρέφει όλα τα logs. app.get("/raw", cors(corsOptions), (req, res) => { console.log("reading from db...."); @@ -350,6 +353,7 @@ app.get("/raw", cors(corsOptions), (req, res) => { ); }); +// βασικό endpoint που επιστρέφει κατάλληλα τα logs στην ευρετηρίαση του web-client app.get("/test2", cors(corsOptions), (req, res) => { var RES = new Object(); const page = req.query["page"]; @@ -582,16 +586,7 @@ app.get("/test2", cors(corsOptions), (req, res) => { console.log("reading from db...."); -// Lefos-- variable poy krata to trexon room tou xrhsth kathe fora -var curRoom; - -// var url = "mongodb://mongo:27017/"; -// MongoClient.connect(url, function (err, db) { -// if (err) throw err; -// var dbo = db.db("fluentdb"); -// dbo.collection("test", onCollectionNew); -// }); - +// συνάρτηση που χρησιμοποιείται ως callback για την δημιουργία stream με τη βάση και συγκεκριμένο user async function onCollectionNew(err, collection) { /* Prepei na elegxw kathe fora an to socket id tou user einai energo @@ -635,22 +630,10 @@ async function onCollectionNew(err, collection) { }); } +// sockets και διαχείριση συνδέσεων io.on("connection", (s) => { console.error("socket connection"); - // -------- Lefos section - //dbo.collection("test", onCollectionNew); - - // -------- - - //s.set('transports', ['websocket']); - //s.set('pingTimeout', 30000); - //s.set('allowUpgrades', false); - //s.set('serveClient', false); - //s.set('pingInterval', 10000); - // ------------------------------ - // --- set - // ------------------------------ var usersession = new Object(); usersession.SOCKET = {}; usersession.SOCKET.error = {}; @@ -931,48 +914,49 @@ function sendlog(reslog, pathfileval) { //} } -function onlogfile(path) { - console.log("File", path, "has been added"); - var pathfileval = pathmodule.basename(path); - var arrfile = pathfileval.toString().split("-"); - var pathfile = arrfile[0]; - var indexfind1 = global.pipelines.findIndex( - (x) => x.pathlogfile == pathfileval - ); - console.log( - "file11111111111111111111111111111111 " + JSON.stringify(pathfileval) - ); - if (indexfind1 === -1) { - (async () => { - console.log( - "file2222222222222222222222222222222222222 " + - JSON.stringify(pathfileval) - ); - var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto - var resdata = await getpipelines(token, pathfile); - //resdata.data.pathlogfile = 'test' - var resob = {}; - resob.pathlogfile = pathfileval; - var resobarray = []; - for (let i in resdata.data) { - var resob1 = {}; - resob1.data = resdata.data[i].res25swarmlabname; - resob1.user25user = resdata.data[i].res25user; - resob1.res25creator = resdata.data[i].res25creator; - resob1.res25fileforce = resdata.data[i].res25fileforce; - resobarray.push(resob1); - } - resob.data = resobarray; - var indexfind = global.pipelines.findIndex( - (x) => x.pathlogfile == pathfileval - ); - indexfind === -1 - ? global.pipelines.push(resob) - : console.log("object already exists " + pathfileval); - })(); - } -} - +// function onlogfile(path) { +// console.log("File", path, "has been added"); +// var pathfileval = pathmodule.basename(path); +// var arrfile = pathfileval.toString().split("-"); +// var pathfile = arrfile[0]; +// var indexfind1 = global.pipelines.findIndex( +// (x) => x.pathlogfile == pathfileval +// ); +// console.log( +// "file11111111111111111111111111111111 " + JSON.stringify(pathfileval) +// ); +// if (indexfind1 === -1) { +// (async () => { +// console.log( +// "file2222222222222222222222222222222222222 " + +// JSON.stringify(pathfileval) +// ); +// var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto +// var resdata = await getpipelines(token, pathfile); +// //resdata.data.pathlogfile = 'test' +// var resob = {}; +// resob.pathlogfile = pathfileval; +// var resobarray = []; +// for (let i in resdata.data) { +// var resob1 = {}; +// resob1.data = resdata.data[i].res25swarmlabname; +// resob1.user25user = resdata.data[i].res25user; +// resob1.res25creator = resdata.data[i].res25creator; +// resob1.res25fileforce = resdata.data[i].res25fileforce; +// resobarray.push(resob1); +// } +// resob.data = resobarray; +// var indexfind = global.pipelines.findIndex( +// (x) => x.pathlogfile == pathfileval +// ); +// indexfind === -1 +// ? global.pipelines.push(resob) +// : console.log("object already exists " + pathfileval); +// })(); +// } +// } + +// Έλεγχος αν υπάρχει ήδη ενεργό stream με τη βάση δεδομένων για κάποιον async function checkstream(data) { var res = await getkey(data.id); if (res == "1") { @@ -997,6 +981,7 @@ function sleep(ms) { return new Promise((resolve) => setTimeout(resolve, ms)); } +// δεν την χρησιμοποιώ κάπου function getSHA256ofJSON(data, inputEncoding, encoding) { if (!data) { return ""; @@ -1007,29 +992,28 @@ function getSHA256ofJSON(data, inputEncoding, encoding) { return hash.update(JSON.stringify(data), inputEncoding).digest(encoding); } -// --- LEFOS - get user via token from REDIS -async function getUser(token) { - return new Promise((resolve) => { - pubClient.get(token, function (err, reply) { - if (err) { - console.log("----------error------------"); - - resolve(null); - } else { - if (reply) { - console.log("---------found----------"); - resolve(1); - } else { - console.log("----------not found------------"); - resolve(2); - //return 2 - } - } - }); - }); -} - -//var getkey = function getkey(key) { +// async function getUser(token) { +// return new Promise((resolve) => { +// pubClient.get(token, function (err, reply) { +// if (err) { +// console.log("----------error------------"); + +// resolve(null); +// } else { +// if (reply) { +// console.log("---------found----------"); +// resolve(1); +// } else { +// console.log("----------not found------------"); +// resolve(2); +// //return 2 +// } +// } +// }); +// }); +// } + +// Διαδικασία ελέγχου στοιχείων στο redis για την εξακρίβωση ύπαρξης stream με τη βάση async function getkey(id) { return new Promise((resolve) => { pubClient.get(id, function (err, reply) { @@ -1050,6 +1034,7 @@ async function getkey(id) { }); }); } + // Lefos === Set the user to redis var setUser = function setus(id, user) { return new Promise((resolve) => { @@ -1077,105 +1062,5 @@ var setkey = function setkv(key, value) { }); }; -async function iosend(data, issend, io, pubClient, user1) { - var new1 = {}; - new1.tailed_path = data.tailed_path; - new1.message = data.message; - - var now = new Date(); - var reslog1 = {}; - //reslog1.data = resob1 - reslog1.log = new1; - reslog1.date = convertDateToUTC(now); - var user = user1; - - const randomTimeInMs = Math.random() * 2000; - await sleep(randomTimeInMs); - var getres = await getkey(issend); - - if (getres == "1") { - console.log(issend + " ---1 " + JSON.stringify(reslog1)); - //io.in(user).emit("logdata", reslog1); - } else if (getres == "2") { - console.log(issend + " ---2 " + JSON.stringify(reslog1)); - setkey(issend, "1"); - //pubClient.set(issend, '1', function(err, res) { - //}); - io.in(user).emit("logdata", reslog1); - //} - } -} - -function onCollection(err, collection) { - let options = { - tailable: true, - awaitdata: true, - numberOfRetries: -1, - tailableRetryInterval: 500, - }; - var cursor = collection.find({}, options).stream(); - var itemsProcessed = 0; - - var reslog = new Object(); - var now = new Date(); - cursor.on("data", function (data) { - var issendob = new Object(); - issendob.id = data._id; - issendob.message = data.message; - issendob.tailed_path = data.tailed_path; - - var issend = getSHA256ofJSON(issendob); - - console.log("++++++++" + JSON.stringify(data)); - console.log("++++++++ob" + JSON.stringify(issendob)); - console.log("++++++++sha" + JSON.stringify(issend)); - - var pathfileval = pathmodule.basename(data.tailed_path); - var arrfile = pathfileval.toString().split("-"); - var pathfile = arrfile[0]; - - var indexupdate = "yes"; - var resob = {}; - pubClient.get(pathfileval, function (err, object) { - var objecttmp = JSON.parse(object); - if (object) { - var user1 = objecttmp.user25user.replace(/[\n\t\r]/g, ""); - iosend(data, issend, io, pubClient, user1); - } else { - (async () => { - var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto - var resdata1 = await getpipelines(token, pathfile); - resob.pathlogfile = pathfileval; - var resob11 = {}; - var i1 = 0; - resob11.data = resdata1.data[i1].res25swarmlabname; - resob11.user25user = resdata1.data[i1].res25user.replace( - /[\n\t\r]/g, - "" - ); - resob11.res25creator = resdata1.data[i1].res25creator; - resob11.res25fileforce = resdata1.data[i1].res25fileforce; - resob11.tailed_path = pathfileval; - var resob1string1 = JSON.stringify(resob11); - await pubClient.set( - pathfileval, - resob1string1, - function (err, res) {} - ); - var user1 = resob11.user25user; - iosend(data, issend, io, pubClient, user1); - console.log(" ---no--- " + JSON.stringify(data)); - })(); //await inside yes - } - }); - }); - - setInterval(function () { - console.log("itemsProcessed", itemsProcessed); - // this method is also exposed by the Server instance - //console.log(adapter.rooms); - }, 8000); -} - http.listen(3000, () => console.error("listening on http://localhost:3000/")); console.error("socket.io example");