From 2ec53a3c70aee03b593065b08a976d8bbc2fcbfc Mon Sep 17 00:00:00 2001 From: lefteris Date: Thu, 1 Apr 2021 22:59:27 +0300 Subject: [PATCH] Redis, stream on/off --- swarmlab-app/src/run/app.js | 107 ++++++++++++++++++++++++++++-------- 1 file changed, 84 insertions(+), 23 deletions(-) diff --git a/swarmlab-app/src/run/app.js b/swarmlab-app/src/run/app.js index a1e6bde9..4b7c6e26 100644 --- a/swarmlab-app/src/run/app.js +++ b/swarmlab-app/src/run/app.js @@ -62,6 +62,7 @@ app.use(helmet()); const cors = require("cors"); const whitelist = [ + "http://localhost:3000", "http://localhost:8080", "http://localhost:3080", "http://localhost:3081", @@ -96,7 +97,7 @@ const corsOptions = { // ------------------------------ // Lefos - mongo test read -app.get("/test", (req, res) => { +app.get("/test", cors(corsOptions), (req, res) => { console.log("reading from db...."); var url = "mongodb://mongo:27017/"; @@ -109,7 +110,7 @@ app.get("/test", (req, res) => { .find({}) .toArray(function (err, result) { if (err) throw err; - // LOGO OTI EXW NESTED JSON PREPEI NA TO KANW PARSE DUO FORES + // EPIDI EXW NESTED JSON PREPEI NA TO KANW PARSE DUO FORES var obj = JSON.parse(JSON.stringify(result)); var jsonfinal = []; obj.forEach((value) => { @@ -136,6 +137,11 @@ var curRoom; // }); function onCollectionNew(err, collection) { + /* + Prepei na elegxw kathe fora an to socket id tou user einai energo + wste na mhn diathreitai zwntanh h callback kai lamvanw dublicate + data ston client + */ let options = { tailable: true, awaitdata: true, @@ -144,12 +150,21 @@ function onCollectionNew(err, collection) { }; var cursor = collection.find({}, options).stream(); var itemsProcessed = 0; - var room = this; - console.log("Inside callback: " + room); + var room = this.user; + var sid = this.id; + console.log("Inside callback: " + room + " Id: " + sid); + // LEFOS --- STORE USER IN REDIS + var rep = setUser(room, room); + cursor.on("data", function (data) { var obj = JSON.parse(JSON.stringify(data)); + // var getres = getkey(sid); + // if (getres == "1") { + // console.log("sending on event log"); - // Pernaw karfota to room pros to paron + // } else if (getres == "2") { + // cursor.close(); + // } io.in(room).emit("logsend", obj); }); } @@ -191,6 +206,9 @@ io.on("connection", (s) => { usersession.SOCKET.user = isvalid.user; usersession.SOCKET.scope = isvalid.scope; // space delimeter usersession.SOCKET.token = isvalid.token; + + //console.log("Reply: " + rep); + // ----- s.auth = true; } else { console.log("Authserver no ", s.id + " - " + token); @@ -201,22 +219,15 @@ io.on("connection", (s) => { s.on("onevent", function (data) { console.log("I GOT THE DATA: ", data); - var user = data; - - console.log("reading from db...."); + var binddata = { + user: data, + id: s.id, + }; + checkstream(binddata); + }); - // var url = "mongodb://mongo:27017/"; - // MongoClient.connect(url, function (err, db) { - // if (err) throw err; - // var dbo = db.db("fluentdb"); - // dbo.collection("test", onCollectionNew); - // }); - var url = "mongodb://mongo:27017/"; - MongoClient.connect(url, function (err, db) { - if (err) throw err; - var dbo = db.db("fluentdb"); - dbo.collection("test", onCollectionNew.bind(user)); - }); + s.on("disconnect", function () { + console.log("Socket: " + s.id + " Disconnected"); }); setTimeout(function () { @@ -493,6 +504,21 @@ function onlogfile(path) { } } +async function checkstream(data) { + var res = await getkey(data.user); + if (res == "1") { + console.log("Stream is on!"); + } else { + console.log("Creating Stream...."); + + var url = "mongodb://mongo:27017/"; + MongoClient.connect(url, function (err, db) { + if (err) throw err; + var dbo = db.db("fluentdb"); + dbo.collection("test", onCollectionNew.bind(data)); + }); + } +} // *************************************************** // rest get // *************************************************** @@ -853,10 +879,32 @@ function getSHA256ofJSON(data, inputEncoding, encoding) { return hash.update(JSON.stringify(data), inputEncoding).digest(encoding); } +// --- LEFOS - get user via token from REDIS +async function getUser(token) { + return new Promise((resolve) => { + pubClient.get(token, function (err, reply) { + if (err) { + console.log("----------error------------"); + + resolve(null); + } else { + if (reply) { + console.log("---------found----------"); + resolve(1); + } else { + console.log("----------not found------------"); + resolve(2); + //return 2 + } + } + }); + }); +} + //var getkey = function getkey(key) { -async function getkey(key) { +async function getkey(id) { return new Promise((resolve) => { - pubClient.get(key, function (err, reply) { + pubClient.get(id, function (err, reply) { if (err) { console.log("----------error------------"); @@ -874,7 +922,20 @@ async function getkey(key) { }); }); } - +// Lefos === Set the user to redis +var setUser = function setus(id, user) { + return new Promise((resolve) => { + //pubClient.set(key,value, 'EX', expire, function(err,reply){ + pubClient.set(id, user, function (err, reply) { + if (err) { + resolve(null); + } else { + resolve(reply); + } + }); + }); +}; +// === var setkey = function setkv(key, value) { return new Promise((resolve) => { //pubClient.set(key,value, 'EX', expire, function(err,reply){