From b178cdea9b336c39c1cbbd0a2b2f82e6d3e3b5b6 Mon Sep 17 00:00:00 2001 From: lefteris Date: Sun, 28 Mar 2021 01:13:01 +0200 Subject: [PATCH] stream mongo --- swarmlab-app/src/run/app.js | 198 ++++++++++++++++++++---------------- 1 file changed, 112 insertions(+), 86 deletions(-) diff --git a/swarmlab-app/src/run/app.js b/swarmlab-app/src/run/app.js index faec8d46..ecc1fdb5 100644 --- a/swarmlab-app/src/run/app.js +++ b/swarmlab-app/src/run/app.js @@ -95,17 +95,8 @@ const corsOptions = { // --- LEFOS MONGO LOGGING // ------------------------------ -// Lefos - mongo connect -// mongoose -// .connect("mongodb://mongo:27017/fluentdb", { -// useNewUrlParser: true, -// useUnifiedTopology: true, -// }) -// .then(() => console.log("MongoDB connected Volume test")) -// .catch((err) => console.log(err)); - // Lefos - mongo test read -app.get("/run", (req, res) => { +app.get("/test", (req, res) => { console.log("reading from db...."); var url = "mongodb://mongo:27017/"; @@ -124,22 +115,57 @@ app.get("/run", (req, res) => { obj.forEach((value) => { if (value.log.includes("app_name")) { jsonfinal.push(JSON.parse(value.log)); - //console.log("skata"); } }); - - // logs = JSON.parse(JSON.stringify(logs)); - // logs = logs.replace(/\\/g, ""); - //console.log("DATA FROM MONGO DB: " + result); res.send(jsonfinal); db.close(); }); }); }); +console.log("reading from db...."); + +var url = "mongodb://mongo:27017/"; +MongoClient.connect(url, function (err, db) { + if (err) throw err; + var dbo = db.db("fluentdb"); + dbo.collection("test", onCollectionNew); +}); + +function onCollectionNew(err, collection) { + let options = { + tailable: true, + awaitdata: true, + numberOfRetries: -1, + tailableRetryInterval: 500, + }; + var cursor = collection.find({}, options).stream(); + var itemsProcessed = 0; + + cursor.on("data", function (data) { + var obj = JSON.parse(JSON.stringify(data)); + // obj.log = JSON.parse(obj.log); + // var jsonfinal = []; + + // obj.forEach((value) => { + // if (value.log.includes("app_name")) { + // jsonfinal.push(JSON.parse(value.log)); + // } + // }); + + // Pernaw karfota to room pros to paron + io.in("cs141082@uniwa.gr").emit("logsend", obj); + }); +} + io.on("connection", (s) => { console.error("socket connection"); + // -------- Lefos section + //dbo.collection("test", onCollectionNew); + + // -------- + //s.set('transports', ['websocket']); //s.set('pingTimeout', 30000); //s.set('allowUpgrades', false); @@ -159,7 +185,7 @@ io.on("connection", (s) => { // ------------------------------ s.on("authenticate", function (data) { const token = data; - console.log("TEST LOG INSIDE ATHENTICATE SOCKET: invalid 1 " + token); + console.log("TEST LOG INSIDE ATHENTICATE SOCKET: " + token); (async () => { var isvalid = await checkToken(token); if (isvalid.action == "ok") { @@ -873,76 +899,76 @@ async function iosend(data, issend, io, pubClient, user1) { } } -// function onCollection(err, collection) { -// let options = { -// tailable: true, -// awaitdata: true, -// numberOfRetries: -1, -// tailableRetryInterval: 500, -// }; -// var cursor = collection.find({}, options).stream(); -// var itemsProcessed = 0; - -// var reslog = new Object(); -// var now = new Date(); -// cursor.on("data", function (data) { -// var issendob = new Object(); -// issendob.id = data._id; -// issendob.message = data.message; -// issendob.tailed_path = data.tailed_path; - -// var issend = getSHA256ofJSON(issendob); - -// console.log("++++++++" + JSON.stringify(data)); -// console.log("++++++++ob" + JSON.stringify(issendob)); -// console.log("++++++++sha" + JSON.stringify(issend)); - -// var pathfileval = pathmodule.basename(data.tailed_path); -// var arrfile = pathfileval.toString().split("-"); -// var pathfile = arrfile[0]; - -// var indexupdate = "yes"; -// var resob = {}; -// pubClient.get(pathfileval, function (err, object) { -// var objecttmp = JSON.parse(object); -// if (object) { -// var user1 = objecttmp.user25user.replace(/[\n\t\r]/g, ""); -// iosend(data, issend, io, pubClient, user1); -// } else { -// (async () => { -// var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto -// var resdata1 = await getpipelines(token, pathfile); -// resob.pathlogfile = pathfileval; -// var resob11 = {}; -// var i1 = 0; -// resob11.data = resdata1.data[i1].res25swarmlabname; -// resob11.user25user = resdata1.data[i1].res25user.replace( -// /[\n\t\r]/g, -// "" -// ); -// resob11.res25creator = resdata1.data[i1].res25creator; -// resob11.res25fileforce = resdata1.data[i1].res25fileforce; -// resob11.tailed_path = pathfileval; -// var resob1string1 = JSON.stringify(resob11); -// await pubClient.set( -// pathfileval, -// resob1string1, -// function (err, res) {} -// ); -// var user1 = resob11.user25user; -// iosend(data, issend, io, pubClient, user1); -// console.log(" ---no--- " + JSON.stringify(data)); -// })(); //await inside yes -// } -// }); -// }); - -// setInterval(function () { -// console.log("itemsProcessed", itemsProcessed); -// // this method is also exposed by the Server instance -// //console.log(adapter.rooms); -// }, 8000); -// } +function onCollection(err, collection) { + let options = { + tailable: true, + awaitdata: true, + numberOfRetries: -1, + tailableRetryInterval: 500, + }; + var cursor = collection.find({}, options).stream(); + var itemsProcessed = 0; + + var reslog = new Object(); + var now = new Date(); + cursor.on("data", function (data) { + var issendob = new Object(); + issendob.id = data._id; + issendob.message = data.message; + issendob.tailed_path = data.tailed_path; + + var issend = getSHA256ofJSON(issendob); + + console.log("++++++++" + JSON.stringify(data)); + console.log("++++++++ob" + JSON.stringify(issendob)); + console.log("++++++++sha" + JSON.stringify(issend)); + + var pathfileval = pathmodule.basename(data.tailed_path); + var arrfile = pathfileval.toString().split("-"); + var pathfile = arrfile[0]; + + var indexupdate = "yes"; + var resob = {}; + pubClient.get(pathfileval, function (err, object) { + var objecttmp = JSON.parse(object); + if (object) { + var user1 = objecttmp.user25user.replace(/[\n\t\r]/g, ""); + iosend(data, issend, io, pubClient, user1); + } else { + (async () => { + var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto + var resdata1 = await getpipelines(token, pathfile); + resob.pathlogfile = pathfileval; + var resob11 = {}; + var i1 = 0; + resob11.data = resdata1.data[i1].res25swarmlabname; + resob11.user25user = resdata1.data[i1].res25user.replace( + /[\n\t\r]/g, + "" + ); + resob11.res25creator = resdata1.data[i1].res25creator; + resob11.res25fileforce = resdata1.data[i1].res25fileforce; + resob11.tailed_path = pathfileval; + var resob1string1 = JSON.stringify(resob11); + await pubClient.set( + pathfileval, + resob1string1, + function (err, res) {} + ); + var user1 = resob11.user25user; + iosend(data, issend, io, pubClient, user1); + console.log(" ---no--- " + JSON.stringify(data)); + })(); //await inside yes + } + }); + }); + + setInterval(function () { + console.log("itemsProcessed", itemsProcessed); + // this method is also exposed by the Server instance + //console.log(adapter.rooms); + }, 8000); +} /// ARXIKO MONGO CONNECTION /// // var mongourl =