Compare commits

...

2 Commits

  1. 259
      swarmlab-app/src/run/app.js

259
swarmlab-app/src/run/app.js

@ -197,6 +197,7 @@ app.get("/length", cors(corsOptions), (req, res) => {
); );
}); });
// έλεγχος αν η δομή είναι κατάλληλη για μετατροπή σε json
function IsJsonString(str) { function IsJsonString(str) {
try { try {
JSON.parse(str); JSON.parse(str);
@ -206,6 +207,7 @@ function IsJsonString(str) {
return true; return true;
} }
// endpoint για την αναγνώριση των υπηρεσιών στο δίκτυο
app.get("/services", cors(corsOptions), (req, res) => { app.get("/services", cors(corsOptions), (req, res) => {
console.error("getting length of logs"); console.error("getting length of logs");
@ -317,6 +319,7 @@ app.get("/test", cors(corsOptions), (req, res) => {
); );
}); });
// endpoint που επιστρέφει όλα τα logs.
app.get("/raw", cors(corsOptions), (req, res) => { app.get("/raw", cors(corsOptions), (req, res) => {
console.log("reading from db...."); console.log("reading from db....");
@ -350,6 +353,7 @@ app.get("/raw", cors(corsOptions), (req, res) => {
); );
}); });
// βασικό endpoint που επιστρέφει κατάλληλα τα logs στην ευρετηρίαση του web-client
app.get("/test2", cors(corsOptions), (req, res) => { app.get("/test2", cors(corsOptions), (req, res) => {
var RES = new Object(); var RES = new Object();
const page = req.query["page"]; const page = req.query["page"];
@ -582,16 +586,7 @@ app.get("/test2", cors(corsOptions), (req, res) => {
console.log("reading from db...."); console.log("reading from db....");
// Lefos-- variable poy krata to trexon room tou xrhsth kathe fora // συνάρτηση που χρησιμοποιείται ως callback για την δημιουργία stream με τη βάση και συγκεκριμένο user
var curRoom;
// var url = "mongodb://mongo:27017/";
// MongoClient.connect(url, function (err, db) {
// if (err) throw err;
// var dbo = db.db("fluentdb");
// dbo.collection("test", onCollectionNew);
// });
async function onCollectionNew(err, collection) { async function onCollectionNew(err, collection) {
/* /*
Prepei na elegxw kathe fora an to socket id tou user einai energo Prepei na elegxw kathe fora an to socket id tou user einai energo
@ -635,22 +630,10 @@ async function onCollectionNew(err, collection) {
}); });
} }
// sockets και διαχείριση συνδέσεων
io.on("connection", (s) => { io.on("connection", (s) => {
console.error("socket connection"); console.error("socket connection");
// -------- Lefos section
//dbo.collection("test", onCollectionNew);
// --------
//s.set('transports', ['websocket']);
//s.set('pingTimeout', 30000);
//s.set('allowUpgrades', false);
//s.set('serveClient', false);
//s.set('pingInterval', 10000);
// ------------------------------
// --- set
// ------------------------------
var usersession = new Object(); var usersession = new Object();
usersession.SOCKET = {}; usersession.SOCKET = {};
usersession.SOCKET.error = {}; usersession.SOCKET.error = {};
@ -931,48 +914,49 @@ function sendlog(reslog, pathfileval) {
//} //}
} }
function onlogfile(path) { // function onlogfile(path) {
console.log("File", path, "has been added"); // console.log("File", path, "has been added");
var pathfileval = pathmodule.basename(path); // var pathfileval = pathmodule.basename(path);
var arrfile = pathfileval.toString().split("-"); // var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0]; // var pathfile = arrfile[0];
var indexfind1 = global.pipelines.findIndex( // var indexfind1 = global.pipelines.findIndex(
(x) => x.pathlogfile == pathfileval // (x) => x.pathlogfile == pathfileval
); // );
console.log( // console.log(
"file11111111111111111111111111111111 " + JSON.stringify(pathfileval) // "file11111111111111111111111111111111 " + JSON.stringify(pathfileval)
); // );
if (indexfind1 === -1) { // if (indexfind1 === -1) {
(async () => { // (async () => {
console.log( // console.log(
"file2222222222222222222222222222222222222 " + // "file2222222222222222222222222222222222222 " +
JSON.stringify(pathfileval) // JSON.stringify(pathfileval)
); // );
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto // var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata = await getpipelines(token, pathfile); // var resdata = await getpipelines(token, pathfile);
//resdata.data.pathlogfile = 'test' // //resdata.data.pathlogfile = 'test'
var resob = {}; // var resob = {};
resob.pathlogfile = pathfileval; // resob.pathlogfile = pathfileval;
var resobarray = []; // var resobarray = [];
for (let i in resdata.data) { // for (let i in resdata.data) {
var resob1 = {}; // var resob1 = {};
resob1.data = resdata.data[i].res25swarmlabname; // resob1.data = resdata.data[i].res25swarmlabname;
resob1.user25user = resdata.data[i].res25user; // resob1.user25user = resdata.data[i].res25user;
resob1.res25creator = resdata.data[i].res25creator; // resob1.res25creator = resdata.data[i].res25creator;
resob1.res25fileforce = resdata.data[i].res25fileforce; // resob1.res25fileforce = resdata.data[i].res25fileforce;
resobarray.push(resob1); // resobarray.push(resob1);
} // }
resob.data = resobarray; // resob.data = resobarray;
var indexfind = global.pipelines.findIndex( // var indexfind = global.pipelines.findIndex(
(x) => x.pathlogfile == pathfileval // (x) => x.pathlogfile == pathfileval
); // );
indexfind === -1 // indexfind === -1
? global.pipelines.push(resob) // ? global.pipelines.push(resob)
: console.log("object already exists " + pathfileval); // : console.log("object already exists " + pathfileval);
})(); // })();
} // }
} // }
// Έλεγχος αν υπάρχει ήδη ενεργό stream με τη βάση δεδομένων για κάποιον
async function checkstream(data) { async function checkstream(data) {
var res = await getkey(data.id); var res = await getkey(data.id);
if (res == "1") { if (res == "1") {
@ -997,6 +981,7 @@ function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms)); return new Promise((resolve) => setTimeout(resolve, ms));
} }
// δεν την χρησιμοποιώ κάπου
function getSHA256ofJSON(data, inputEncoding, encoding) { function getSHA256ofJSON(data, inputEncoding, encoding) {
if (!data) { if (!data) {
return ""; return "";
@ -1007,29 +992,28 @@ function getSHA256ofJSON(data, inputEncoding, encoding) {
return hash.update(JSON.stringify(data), inputEncoding).digest(encoding); return hash.update(JSON.stringify(data), inputEncoding).digest(encoding);
} }
// --- LEFOS - get user via token from REDIS // async function getUser(token) {
async function getUser(token) { // return new Promise((resolve) => {
return new Promise((resolve) => { // pubClient.get(token, function (err, reply) {
pubClient.get(token, function (err, reply) { // if (err) {
if (err) { // console.log("----------error------------");
console.log("----------error------------");
// resolve(null);
resolve(null); // } else {
} else { // if (reply) {
if (reply) { // console.log("---------found----------");
console.log("---------found----------"); // resolve(1);
resolve(1); // } else {
} else { // console.log("----------not found------------");
console.log("----------not found------------"); // resolve(2);
resolve(2); // //return 2
//return 2 // }
} // }
} // });
}); // });
}); // }
}
//var getkey = function getkey(key) { // Διαδικασία ελέγχου στοιχείων στο redis για την εξακρίβωση ύπαρξης stream με τη βάση
async function getkey(id) { async function getkey(id) {
return new Promise((resolve) => { return new Promise((resolve) => {
pubClient.get(id, function (err, reply) { pubClient.get(id, function (err, reply) {
@ -1050,6 +1034,7 @@ async function getkey(id) {
}); });
}); });
} }
// Lefos === Set the user to redis // Lefos === Set the user to redis
var setUser = function setus(id, user) { var setUser = function setus(id, user) {
return new Promise((resolve) => { return new Promise((resolve) => {
@ -1077,105 +1062,5 @@ var setkey = function setkv(key, value) {
}); });
}; };
async function iosend(data, issend, io, pubClient, user1) {
var new1 = {};
new1.tailed_path = data.tailed_path;
new1.message = data.message;
var now = new Date();
var reslog1 = {};
//reslog1.data = resob1
reslog1.log = new1;
reslog1.date = convertDateToUTC(now);
var user = user1;
const randomTimeInMs = Math.random() * 2000;
await sleep(randomTimeInMs);
var getres = await getkey(issend);
if (getres == "1") {
console.log(issend + " ---1 " + JSON.stringify(reslog1));
//io.in(user).emit("logdata", reslog1);
} else if (getres == "2") {
console.log(issend + " ---2 " + JSON.stringify(reslog1));
setkey(issend, "1");
//pubClient.set(issend, '1', function(err, res) {
//});
io.in(user).emit("logdata", reslog1);
//}
}
}
function onCollection(err, collection) {
let options = {
tailable: true,
awaitdata: true,
numberOfRetries: -1,
tailableRetryInterval: 500,
};
var cursor = collection.find({}, options).stream();
var itemsProcessed = 0;
var reslog = new Object();
var now = new Date();
cursor.on("data", function (data) {
var issendob = new Object();
issendob.id = data._id;
issendob.message = data.message;
issendob.tailed_path = data.tailed_path;
var issend = getSHA256ofJSON(issendob);
console.log("++++++++" + JSON.stringify(data));
console.log("++++++++ob" + JSON.stringify(issendob));
console.log("++++++++sha" + JSON.stringify(issend));
var pathfileval = pathmodule.basename(data.tailed_path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
var indexupdate = "yes";
var resob = {};
pubClient.get(pathfileval, function (err, object) {
var objecttmp = JSON.parse(object);
if (object) {
var user1 = objecttmp.user25user.replace(/[\n\t\r]/g, "");
iosend(data, issend, io, pubClient, user1);
} else {
(async () => {
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata1 = await getpipelines(token, pathfile);
resob.pathlogfile = pathfileval;
var resob11 = {};
var i1 = 0;
resob11.data = resdata1.data[i1].res25swarmlabname;
resob11.user25user = resdata1.data[i1].res25user.replace(
/[\n\t\r]/g,
""
);
resob11.res25creator = resdata1.data[i1].res25creator;
resob11.res25fileforce = resdata1.data[i1].res25fileforce;
resob11.tailed_path = pathfileval;
var resob1string1 = JSON.stringify(resob11);
await pubClient.set(
pathfileval,
resob1string1,
function (err, res) {}
);
var user1 = resob11.user25user;
iosend(data, issend, io, pubClient, user1);
console.log(" ---no--- " + JSON.stringify(data));
})(); //await inside yes
}
});
});
setInterval(function () {
console.log("itemsProcessed", itemsProcessed);
// this method is also exposed by the Server instance
//console.log(adapter.rooms);
}, 8000);
}
http.listen(3000, () => console.error("listening on http://localhost:3000/")); http.listen(3000, () => console.error("listening on http://localhost:3000/"));
console.error("socket.io example"); console.error("socket.io example");

Loading…
Cancel
Save