Browse Source

Redis, stream on/off

master
lefteris 3 years ago
parent
commit
2ec53a3c70
  1. 107
      swarmlab-app/src/run/app.js

107
swarmlab-app/src/run/app.js

@ -62,6 +62,7 @@ app.use(helmet());
const cors = require("cors");
const whitelist = [
"http://localhost:3000",
"http://localhost:8080",
"http://localhost:3080",
"http://localhost:3081",
@ -96,7 +97,7 @@ const corsOptions = {
// ------------------------------
// Lefos - mongo test read
app.get("/test", (req, res) => {
app.get("/test", cors(corsOptions), (req, res) => {
console.log("reading from db....");
var url = "mongodb://mongo:27017/";
@ -109,7 +110,7 @@ app.get("/test", (req, res) => {
.find({})
.toArray(function (err, result) {
if (err) throw err;
// LOGO OTI EXW NESTED JSON PREPEI NA TO KANW PARSE DUO FORES
// EPIDI EXW NESTED JSON PREPEI NA TO KANW PARSE DUO FORES
var obj = JSON.parse(JSON.stringify(result));
var jsonfinal = [];
obj.forEach((value) => {
@ -136,6 +137,11 @@ var curRoom;
// });
function onCollectionNew(err, collection) {
/*
Prepei na elegxw kathe fora an to socket id tou user einai energo
wste na mhn diathreitai zwntanh h callback kai lamvanw dublicate
data ston client
*/
let options = {
tailable: true,
awaitdata: true,
@ -144,12 +150,21 @@ function onCollectionNew(err, collection) {
};
var cursor = collection.find({}, options).stream();
var itemsProcessed = 0;
var room = this;
console.log("Inside callback: " + room);
var room = this.user;
var sid = this.id;
console.log("Inside callback: " + room + " Id: " + sid);
// LEFOS --- STORE USER IN REDIS
var rep = setUser(room, room);
cursor.on("data", function (data) {
var obj = JSON.parse(JSON.stringify(data));
// var getres = getkey(sid);
// if (getres == "1") {
// console.log("sending on event log");
// Pernaw karfota to room pros to paron
// } else if (getres == "2") {
// cursor.close();
// }
io.in(room).emit("logsend", obj);
});
}
@ -191,6 +206,9 @@ io.on("connection", (s) => {
usersession.SOCKET.user = isvalid.user;
usersession.SOCKET.scope = isvalid.scope; // space delimeter
usersession.SOCKET.token = isvalid.token;
//console.log("Reply: " + rep);
// -----
s.auth = true;
} else {
console.log("Authserver no ", s.id + " - " + token);
@ -201,22 +219,15 @@ io.on("connection", (s) => {
s.on("onevent", function (data) {
console.log("I GOT THE DATA: ", data);
var user = data;
console.log("reading from db....");
var binddata = {
user: data,
id: s.id,
};
checkstream(binddata);
});
// var url = "mongodb://mongo:27017/";
// MongoClient.connect(url, function (err, db) {
// if (err) throw err;
// var dbo = db.db("fluentdb");
// dbo.collection("test", onCollectionNew);
// });
var url = "mongodb://mongo:27017/";
MongoClient.connect(url, function (err, db) {
if (err) throw err;
var dbo = db.db("fluentdb");
dbo.collection("test", onCollectionNew.bind(user));
});
s.on("disconnect", function () {
console.log("Socket: " + s.id + " Disconnected");
});
setTimeout(function () {
@ -493,6 +504,21 @@ function onlogfile(path) {
}
}
async function checkstream(data) {
var res = await getkey(data.user);
if (res == "1") {
console.log("Stream is on!");
} else {
console.log("Creating Stream....");
var url = "mongodb://mongo:27017/";
MongoClient.connect(url, function (err, db) {
if (err) throw err;
var dbo = db.db("fluentdb");
dbo.collection("test", onCollectionNew.bind(data));
});
}
}
// ***************************************************
// rest get
// ***************************************************
@ -853,10 +879,32 @@ function getSHA256ofJSON(data, inputEncoding, encoding) {
return hash.update(JSON.stringify(data), inputEncoding).digest(encoding);
}
// --- LEFOS - get user via token from REDIS
async function getUser(token) {
return new Promise((resolve) => {
pubClient.get(token, function (err, reply) {
if (err) {
console.log("----------error------------");
resolve(null);
} else {
if (reply) {
console.log("---------found----------");
resolve(1);
} else {
console.log("----------not found------------");
resolve(2);
//return 2
}
}
});
});
}
//var getkey = function getkey(key) {
async function getkey(key) {
async function getkey(id) {
return new Promise((resolve) => {
pubClient.get(key, function (err, reply) {
pubClient.get(id, function (err, reply) {
if (err) {
console.log("----------error------------");
@ -874,7 +922,20 @@ async function getkey(key) {
});
});
}
// Lefos === Set the user to redis
var setUser = function setus(id, user) {
return new Promise((resolve) => {
//pubClient.set(key,value, 'EX', expire, function(err,reply){
pubClient.set(id, user, function (err, reply) {
if (err) {
resolve(null);
} else {
resolve(reply);
}
});
});
};
// ===
var setkey = function setkv(key, value) {
return new Promise((resolve) => {
//pubClient.set(key,value, 'EX', expire, function(err,reply){

Loading…
Cancel
Save