Browse Source

stream mongo

master
lefteris 4 years ago
parent
commit
b178cdea9b
  1. 186
      swarmlab-app/src/run/app.js

186
swarmlab-app/src/run/app.js

@ -95,17 +95,8 @@ const corsOptions = {
// --- LEFOS MONGO LOGGING
// ------------------------------
// Lefos - mongo connect
// mongoose
// .connect("mongodb://mongo:27017/fluentdb", {
// useNewUrlParser: true,
// useUnifiedTopology: true,
// })
// .then(() => console.log("MongoDB connected Volume test"))
// .catch((err) => console.log(err));
// Lefos - mongo test read
app.get("/run", (req, res) => {
app.get("/test", (req, res) => {
console.log("reading from db....");
var url = "mongodb://mongo:27017/";
@ -124,22 +115,57 @@ app.get("/run", (req, res) => {
obj.forEach((value) => {
if (value.log.includes("app_name")) {
jsonfinal.push(JSON.parse(value.log));
//console.log("skata");
}
});
// logs = JSON.parse(JSON.stringify(logs));
// logs = logs.replace(/\\/g, "");
//console.log("DATA FROM MONGO DB: " + result);
res.send(jsonfinal);
db.close();
});
});
});
console.log("reading from db....");
var url = "mongodb://mongo:27017/";
MongoClient.connect(url, function (err, db) {
if (err) throw err;
var dbo = db.db("fluentdb");
dbo.collection("test", onCollectionNew);
});
function onCollectionNew(err, collection) {
let options = {
tailable: true,
awaitdata: true,
numberOfRetries: -1,
tailableRetryInterval: 500,
};
var cursor = collection.find({}, options).stream();
var itemsProcessed = 0;
cursor.on("data", function (data) {
var obj = JSON.parse(JSON.stringify(data));
// obj.log = JSON.parse(obj.log);
// var jsonfinal = [];
// obj.forEach((value) => {
// if (value.log.includes("app_name")) {
// jsonfinal.push(JSON.parse(value.log));
// }
// });
// Pernaw karfota to room pros to paron
io.in("cs141082@uniwa.gr").emit("logsend", obj);
});
}
io.on("connection", (s) => {
console.error("socket connection");
// -------- Lefos section
//dbo.collection("test", onCollectionNew);
// --------
//s.set('transports', ['websocket']);
//s.set('pingTimeout', 30000);
//s.set('allowUpgrades', false);
@ -159,7 +185,7 @@ io.on("connection", (s) => {
// ------------------------------
s.on("authenticate", function (data) {
const token = data;
console.log("TEST LOG INSIDE ATHENTICATE SOCKET: invalid 1 " + token);
console.log("TEST LOG INSIDE ATHENTICATE SOCKET: " + token);
(async () => {
var isvalid = await checkToken(token);
if (isvalid.action == "ok") {
@ -873,76 +899,76 @@ async function iosend(data, issend, io, pubClient, user1) {
}
}
// function onCollection(err, collection) {
// let options = {
// tailable: true,
// awaitdata: true,
// numberOfRetries: -1,
// tailableRetryInterval: 500,
// };
// var cursor = collection.find({}, options).stream();
// var itemsProcessed = 0;
function onCollection(err, collection) {
let options = {
tailable: true,
awaitdata: true,
numberOfRetries: -1,
tailableRetryInterval: 500,
};
var cursor = collection.find({}, options).stream();
var itemsProcessed = 0;
// var reslog = new Object();
// var now = new Date();
// cursor.on("data", function (data) {
// var issendob = new Object();
// issendob.id = data._id;
// issendob.message = data.message;
// issendob.tailed_path = data.tailed_path;
var reslog = new Object();
var now = new Date();
cursor.on("data", function (data) {
var issendob = new Object();
issendob.id = data._id;
issendob.message = data.message;
issendob.tailed_path = data.tailed_path;
// var issend = getSHA256ofJSON(issendob);
var issend = getSHA256ofJSON(issendob);
// console.log("++++++++" + JSON.stringify(data));
// console.log("++++++++ob" + JSON.stringify(issendob));
// console.log("++++++++sha" + JSON.stringify(issend));
console.log("++++++++" + JSON.stringify(data));
console.log("++++++++ob" + JSON.stringify(issendob));
console.log("++++++++sha" + JSON.stringify(issend));
// var pathfileval = pathmodule.basename(data.tailed_path);
// var arrfile = pathfileval.toString().split("-");
// var pathfile = arrfile[0];
var pathfileval = pathmodule.basename(data.tailed_path);
var arrfile = pathfileval.toString().split("-");
var pathfile = arrfile[0];
// var indexupdate = "yes";
// var resob = {};
// pubClient.get(pathfileval, function (err, object) {
// var objecttmp = JSON.parse(object);
// if (object) {
// var user1 = objecttmp.user25user.replace(/[\n\t\r]/g, "");
// iosend(data, issend, io, pubClient, user1);
// } else {
// (async () => {
// var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
// var resdata1 = await getpipelines(token, pathfile);
// resob.pathlogfile = pathfileval;
// var resob11 = {};
// var i1 = 0;
// resob11.data = resdata1.data[i1].res25swarmlabname;
// resob11.user25user = resdata1.data[i1].res25user.replace(
// /[\n\t\r]/g,
// ""
// );
// resob11.res25creator = resdata1.data[i1].res25creator;
// resob11.res25fileforce = resdata1.data[i1].res25fileforce;
// resob11.tailed_path = pathfileval;
// var resob1string1 = JSON.stringify(resob11);
// await pubClient.set(
// pathfileval,
// resob1string1,
// function (err, res) {}
// );
// var user1 = resob11.user25user;
// iosend(data, issend, io, pubClient, user1);
// console.log(" ---no--- " + JSON.stringify(data));
// })(); //await inside yes
// }
// });
// });
var indexupdate = "yes";
var resob = {};
pubClient.get(pathfileval, function (err, object) {
var objecttmp = JSON.parse(object);
if (object) {
var user1 = objecttmp.user25user.replace(/[\n\t\r]/g, "");
iosend(data, issend, io, pubClient, user1);
} else {
(async () => {
var token = "d2539e5a7ae1f9f1b0eb2b8f22ca467a86d28407"; // desto
var resdata1 = await getpipelines(token, pathfile);
resob.pathlogfile = pathfileval;
var resob11 = {};
var i1 = 0;
resob11.data = resdata1.data[i1].res25swarmlabname;
resob11.user25user = resdata1.data[i1].res25user.replace(
/[\n\t\r]/g,
""
);
resob11.res25creator = resdata1.data[i1].res25creator;
resob11.res25fileforce = resdata1.data[i1].res25fileforce;
resob11.tailed_path = pathfileval;
var resob1string1 = JSON.stringify(resob11);
await pubClient.set(
pathfileval,
resob1string1,
function (err, res) {}
);
var user1 = resob11.user25user;
iosend(data, issend, io, pubClient, user1);
console.log(" ---no--- " + JSON.stringify(data));
})(); //await inside yes
}
});
});
// setInterval(function () {
// console.log("itemsProcessed", itemsProcessed);
// // this method is also exposed by the Server instance
// //console.log(adapter.rooms);
// }, 8000);
// }
setInterval(function () {
console.log("itemsProcessed", itemsProcessed);
// this method is also exposed by the Server instance
//console.log(adapter.rooms);
}, 8000);
}
/// ARXIKO MONGO CONNECTION ///
// var mongourl =

Loading…
Cancel
Save