|
|
@ -197,6 +197,7 @@ app.get("/length", cors(corsOptions), (req, res) => { |
|
|
|
); |
|
|
|
}); |
|
|
|
|
|
|
|
// έλεγχος αν η δομή είναι κατάλληλη για μετατροπή σε json
|
|
|
|
function IsJsonString(str) { |
|
|
|
try { |
|
|
|
JSON.parse(str); |
|
|
@ -206,6 +207,7 @@ function IsJsonString(str) { |
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
// endpoint για την αναγνώριση των υπηρεσιών στο δίκτυο
|
|
|
|
app.get("/services", cors(corsOptions), (req, res) => { |
|
|
|
console.error("getting length of logs"); |
|
|
|
|
|
|
@ -317,6 +319,7 @@ app.get("/test", cors(corsOptions), (req, res) => { |
|
|
|
); |
|
|
|
}); |
|
|
|
|
|
|
|
// endpoint που επιστρέφει όλα τα logs.
|
|
|
|
app.get("/raw", cors(corsOptions), (req, res) => { |
|
|
|
console.log("reading from db...."); |
|
|
|
|
|
|
@ -350,6 +353,7 @@ app.get("/raw", cors(corsOptions), (req, res) => { |
|
|
|
); |
|
|
|
}); |
|
|
|
|
|
|
|
// βασικό endpoint που επιστρέφει κατάλληλα τα logs στην ευρετηρίαση του web-client
|
|
|
|
app.get("/test2", cors(corsOptions), (req, res) => { |
|
|
|
var RES = new Object(); |
|
|
|
const page = req.query["page"]; |
|
|
@ -582,16 +586,7 @@ app.get("/test2", cors(corsOptions), (req, res) => { |
|
|
|
|
|
|
|
console.log("reading from db...."); |
|
|
|
|
|
|
|
// Lefos-- variable poy krata to trexon room tou xrhsth kathe fora
|
|
|
|
var curRoom; |
|
|
|
|
|
|
|
// var url = "mongodb://mongo:27017/";
|
|
|
|
// MongoClient.connect(url, function (err, db) {
|
|
|
|
// if (err) throw err;
|
|
|
|
// var dbo = db.db("fluentdb");
|
|
|
|
// dbo.collection("test", onCollectionNew);
|
|
|
|
// });
|
|
|
|
|
|
|
|
// συνάρτηση που χρησιμοποιείται ως callback για την δημιουργία stream με τη βάση και συγκεκριμένο user
|
|
|
|
async function onCollectionNew(err, collection) { |
|
|
|
/* |
|
|
|
Prepei na elegxw kathe fora an to socket id tou user einai energo |
|
|
@ -635,22 +630,10 @@ async function onCollectionNew(err, collection) { |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
// sockets και διαχείριση συνδέσεων
|
|
|
|
io.on("connection", (s) => { |
|
|
|
console.error("socket connection"); |
|
|
|
|
|
|
|
// -------- Lefos section
|
|
|
|
//dbo.collection("test", onCollectionNew);
|
|
|
|
|
|
|
|
// --------
|
|
|
|
|
|
|
|
//s.set('transports', ['websocket']);
|
|
|
|
//s.set('pingTimeout', 30000);
|
|
|
|
//s.set('allowUpgrades', false);
|
|
|
|
//s.set('serveClient', false);
|
|
|
|
//s.set('pingInterval', 10000);
|
|
|
|
// ------------------------------
|
|
|
|
// --- set
|
|
|
|
// ------------------------------
|
|
|
|
var usersession = new Object(); |
|
|
|
usersession.SOCKET = {}; |
|
|
|
usersession.SOCKET.error = {}; |
|
|
@ -931,48 +914,49 @@ function sendlog(reslog, pathfileval) { |
|
|
|
//}
|
|
|
|
} |
|
|
|
|
|
|
|
function onlogfile(path) { |
|
|
|
console.log("File", path, "has been added"); |
|
|
|
var pathfileval = pathmodule.basename(path); |
|
|
|
var arrfile = pathfileval.toString().split("-"); |
|
|
|
var pathfile = arrfile[0]; |
|
|
|
var indexfind1 = global.pipelines.findIndex( |
|
|
|
(x) => x.pathlogfile == pathfileval |
|
|
|
); |
|
|
|
console.log( |
|
|
|
"file11111111111111111111111111111111 " + JSON.stringify(pathfileval) |
|
|
|
); |
|
|
|
if (indexfind1 === -1) { |
|
|
|
(async () => { |
|
|
|
console.log( |
|
|
|
"file2222222222222222222222222222222222222 " + |
|
|
|
JSON.stringify(pathfileval) |
|
|
|
); |
|
|
|
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
|
|
|
|
var resdata = await getpipelines(token, pathfile); |
|
|
|
//resdata.data.pathlogfile = 'test'
|
|
|
|
var resob = {}; |
|
|
|
resob.pathlogfile = pathfileval; |
|
|
|
var resobarray = []; |
|
|
|
for (let i in resdata.data) { |
|
|
|
var resob1 = {}; |
|
|
|
resob1.data = resdata.data[i].res25swarmlabname; |
|
|
|
resob1.user25user = resdata.data[i].res25user; |
|
|
|
resob1.res25creator = resdata.data[i].res25creator; |
|
|
|
resob1.res25fileforce = resdata.data[i].res25fileforce; |
|
|
|
resobarray.push(resob1); |
|
|
|
} |
|
|
|
resob.data = resobarray; |
|
|
|
var indexfind = global.pipelines.findIndex( |
|
|
|
(x) => x.pathlogfile == pathfileval |
|
|
|
); |
|
|
|
indexfind === -1 |
|
|
|
? global.pipelines.push(resob) |
|
|
|
: console.log("object already exists " + pathfileval); |
|
|
|
})(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// function onlogfile(path) {
|
|
|
|
// console.log("File", path, "has been added");
|
|
|
|
// var pathfileval = pathmodule.basename(path);
|
|
|
|
// var arrfile = pathfileval.toString().split("-");
|
|
|
|
// var pathfile = arrfile[0];
|
|
|
|
// var indexfind1 = global.pipelines.findIndex(
|
|
|
|
// (x) => x.pathlogfile == pathfileval
|
|
|
|
// );
|
|
|
|
// console.log(
|
|
|
|
// "file11111111111111111111111111111111 " + JSON.stringify(pathfileval)
|
|
|
|
// );
|
|
|
|
// if (indexfind1 === -1) {
|
|
|
|
// (async () => {
|
|
|
|
// console.log(
|
|
|
|
// "file2222222222222222222222222222222222222 " +
|
|
|
|
// JSON.stringify(pathfileval)
|
|
|
|
// );
|
|
|
|
// var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
|
|
|
|
// var resdata = await getpipelines(token, pathfile);
|
|
|
|
// //resdata.data.pathlogfile = 'test'
|
|
|
|
// var resob = {};
|
|
|
|
// resob.pathlogfile = pathfileval;
|
|
|
|
// var resobarray = [];
|
|
|
|
// for (let i in resdata.data) {
|
|
|
|
// var resob1 = {};
|
|
|
|
// resob1.data = resdata.data[i].res25swarmlabname;
|
|
|
|
// resob1.user25user = resdata.data[i].res25user;
|
|
|
|
// resob1.res25creator = resdata.data[i].res25creator;
|
|
|
|
// resob1.res25fileforce = resdata.data[i].res25fileforce;
|
|
|
|
// resobarray.push(resob1);
|
|
|
|
// }
|
|
|
|
// resob.data = resobarray;
|
|
|
|
// var indexfind = global.pipelines.findIndex(
|
|
|
|
// (x) => x.pathlogfile == pathfileval
|
|
|
|
// );
|
|
|
|
// indexfind === -1
|
|
|
|
// ? global.pipelines.push(resob)
|
|
|
|
// : console.log("object already exists " + pathfileval);
|
|
|
|
// })();
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
|
|
|
|
// Έλεγχος αν υπάρχει ήδη ενεργό stream με τη βάση δεδομένων για κάποιον
|
|
|
|
async function checkstream(data) { |
|
|
|
var res = await getkey(data.id); |
|
|
|
if (res == "1") { |
|
|
@ -997,6 +981,7 @@ function sleep(ms) { |
|
|
|
return new Promise((resolve) => setTimeout(resolve, ms)); |
|
|
|
} |
|
|
|
|
|
|
|
// δεν την χρησιμοποιώ κάπου
|
|
|
|
function getSHA256ofJSON(data, inputEncoding, encoding) { |
|
|
|
if (!data) { |
|
|
|
return ""; |
|
|
@ -1007,29 +992,28 @@ function getSHA256ofJSON(data, inputEncoding, encoding) { |
|
|
|
return hash.update(JSON.stringify(data), inputEncoding).digest(encoding); |
|
|
|
} |
|
|
|
|
|
|
|
// --- LEFOS - get user via token from REDIS
|
|
|
|
async function getUser(token) { |
|
|
|
return new Promise((resolve) => { |
|
|
|
pubClient.get(token, function (err, reply) { |
|
|
|
if (err) { |
|
|
|
console.log("----------error------------"); |
|
|
|
|
|
|
|
resolve(null); |
|
|
|
} else { |
|
|
|
if (reply) { |
|
|
|
console.log("---------found----------"); |
|
|
|
resolve(1); |
|
|
|
} else { |
|
|
|
console.log("----------not found------------"); |
|
|
|
resolve(2); |
|
|
|
//return 2
|
|
|
|
} |
|
|
|
} |
|
|
|
}); |
|
|
|
}); |
|
|
|
} |
|
|
|
// async function getUser(token) {
|
|
|
|
// return new Promise((resolve) => {
|
|
|
|
// pubClient.get(token, function (err, reply) {
|
|
|
|
// if (err) {
|
|
|
|
// console.log("----------error------------");
|
|
|
|
|
|
|
|
// resolve(null);
|
|
|
|
// } else {
|
|
|
|
// if (reply) {
|
|
|
|
// console.log("---------found----------");
|
|
|
|
// resolve(1);
|
|
|
|
// } else {
|
|
|
|
// console.log("----------not found------------");
|
|
|
|
// resolve(2);
|
|
|
|
// //return 2
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
// });
|
|
|
|
// });
|
|
|
|
// }
|
|
|
|
|
|
|
|
//var getkey = function getkey(key) {
|
|
|
|
// Διαδικασία ελέγχου στοιχείων στο redis για την εξακρίβωση ύπαρξης stream με τη βάση
|
|
|
|
async function getkey(id) { |
|
|
|
return new Promise((resolve) => { |
|
|
|
pubClient.get(id, function (err, reply) { |
|
|
@ -1050,6 +1034,7 @@ async function getkey(id) { |
|
|
|
}); |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
// Lefos === Set the user to redis
|
|
|
|
var setUser = function setus(id, user) { |
|
|
|
return new Promise((resolve) => { |
|
|
@ -1077,105 +1062,5 @@ var setkey = function setkv(key, value) { |
|
|
|
}); |
|
|
|
}; |
|
|
|
|
|
|
|
async function iosend(data, issend, io, pubClient, user1) { |
|
|
|
var new1 = {}; |
|
|
|
new1.tailed_path = data.tailed_path; |
|
|
|
new1.message = data.message; |
|
|
|
|
|
|
|
var now = new Date(); |
|
|
|
var reslog1 = {}; |
|
|
|
//reslog1.data = resob1
|
|
|
|
reslog1.log = new1; |
|
|
|
reslog1.date = convertDateToUTC(now); |
|
|
|
var user = user1; |
|
|
|
|
|
|
|
const randomTimeInMs = Math.random() * 2000; |
|
|
|
await sleep(randomTimeInMs); |
|
|
|
var getres = await getkey(issend); |
|
|
|
|
|
|
|
if (getres == "1") { |
|
|
|
console.log(issend + " ---1 " + JSON.stringify(reslog1)); |
|
|
|
//io.in(user).emit("logdata", reslog1);
|
|
|
|
} else if (getres == "2") { |
|
|
|
console.log(issend + " ---2 " + JSON.stringify(reslog1)); |
|
|
|
setkey(issend, "1"); |
|
|
|
//pubClient.set(issend, '1', function(err, res) {
|
|
|
|
//});
|
|
|
|
io.in(user).emit("logdata", reslog1); |
|
|
|
//}
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
function onCollection(err, collection) { |
|
|
|
let options = { |
|
|
|
tailable: true, |
|
|
|
awaitdata: true, |
|
|
|
numberOfRetries: -1, |
|
|
|
tailableRetryInterval: 500, |
|
|
|
}; |
|
|
|
var cursor = collection.find({}, options).stream(); |
|
|
|
var itemsProcessed = 0; |
|
|
|
|
|
|
|
var reslog = new Object(); |
|
|
|
var now = new Date(); |
|
|
|
cursor.on("data", function (data) { |
|
|
|
var issendob = new Object(); |
|
|
|
issendob.id = data._id; |
|
|
|
issendob.message = data.message; |
|
|
|
issendob.tailed_path = data.tailed_path; |
|
|
|
|
|
|
|
var issend = getSHA256ofJSON(issendob); |
|
|
|
|
|
|
|
console.log("++++++++" + JSON.stringify(data)); |
|
|
|
console.log("++++++++ob" + JSON.stringify(issendob)); |
|
|
|
console.log("++++++++sha" + JSON.stringify(issend)); |
|
|
|
|
|
|
|
var pathfileval = pathmodule.basename(data.tailed_path); |
|
|
|
var arrfile = pathfileval.toString().split("-"); |
|
|
|
var pathfile = arrfile[0]; |
|
|
|
|
|
|
|
var indexupdate = "yes"; |
|
|
|
var resob = {}; |
|
|
|
pubClient.get(pathfileval, function (err, object) { |
|
|
|
var objecttmp = JSON.parse(object); |
|
|
|
if (object) { |
|
|
|
var user1 = objecttmp.user25user.replace(/[\n\t\r]/g, ""); |
|
|
|
iosend(data, issend, io, pubClient, user1); |
|
|
|
} else { |
|
|
|
(async () => { |
|
|
|
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
|
|
|
|
var resdata1 = await getpipelines(token, pathfile); |
|
|
|
resob.pathlogfile = pathfileval; |
|
|
|
var resob11 = {}; |
|
|
|
var i1 = 0; |
|
|
|
resob11.data = resdata1.data[i1].res25swarmlabname; |
|
|
|
resob11.user25user = resdata1.data[i1].res25user.replace( |
|
|
|
/[\n\t\r]/g, |
|
|
|
"" |
|
|
|
); |
|
|
|
resob11.res25creator = resdata1.data[i1].res25creator; |
|
|
|
resob11.res25fileforce = resdata1.data[i1].res25fileforce; |
|
|
|
resob11.tailed_path = pathfileval; |
|
|
|
var resob1string1 = JSON.stringify(resob11); |
|
|
|
await pubClient.set( |
|
|
|
pathfileval, |
|
|
|
resob1string1, |
|
|
|
function (err, res) {} |
|
|
|
); |
|
|
|
var user1 = resob11.user25user; |
|
|
|
iosend(data, issend, io, pubClient, user1); |
|
|
|
console.log(" ---no--- " + JSON.stringify(data)); |
|
|
|
})(); //await inside yes
|
|
|
|
} |
|
|
|
}); |
|
|
|
}); |
|
|
|
|
|
|
|
setInterval(function () { |
|
|
|
console.log("itemsProcessed", itemsProcessed); |
|
|
|
// this method is also exposed by the Server instance
|
|
|
|
//console.log(adapter.rooms);
|
|
|
|
}, 8000); |
|
|
|
} |
|
|
|
|
|
|
|
http.listen(3000, () => console.error("listening on http://localhost:3000/")); |
|
|
|
console.error("socket.io example"); |
|
|
|