You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
169 lines
3.6 KiB
169 lines
3.6 KiB
= readmongo_service
|
|
|
|
|
|
== App
|
|
|
|
src: readmongo/swarmlab-app/src/run/
|
|
|
|
|
|
|
|
|
|
|
|
|
|
=== stream2mongo
|
|
|
|
[source,javascript]
|
|
----
|
|
|
|
/**
 * Streams documents from a collection to a socket.io room through a
 * tailable cursor.
 *
 * Meant to be used as the callback of `dbo.collection(...)` with `this`
 * bound to `{ user, id }`: `user` is the room that receives the "logsend"
 * events and `id` is the key passed to `getkey` for every document.
 *
 * @param {Error|null} err - Error reported while opening the collection.
 * @param {object} collection - Collection to tail.
 * @returns {Promise<void>}
 */
async function onCollectionNew(err, collection) {
  if (err) {
    // Without this guard a failed open would dereference an undefined collection.
    console.error("Unable to open collection:", err);
    return;
  }

  const options = {
    tailable: true,
    awaitdata: true,
    numberOfRetries: -1,
    tailableRetryInterval: 500,
  };
  const cursor = collection.find({}, options).stream();
  const room = this.user;
  const sid = this.id;
  console.log("Inside callback: " + room + " Id: " + sid);
  setUser(sid, room);

  cursor.on("data", async function (data) {
    // Hold the stream while the key for this socket is looked up.
    cursor.pause();
    const res = Number(await getkey(sid));

    // Resume for every outcome: when getkey reports an error (null) the
    // stream must not stay paused forever.
    cursor.resume();

    if (res === 1) {
      // Round-trip through JSON so only plain data is sent to the room.
      const obj = JSON.parse(JSON.stringify(data));
      io.in(room).emit("logsend", obj);
    } else if (res === 2) {
      console.log("Cursor is closing...");
      cursor.close();
    }
  });
}
|
|
|
|
----
|
|
|
|
The call `collection.find({}, options).stream()` opens a tailable cursor:
|
|
|
|
A Tailable Cursor *remains open* after the client exhausts the results in the initial cursor.
|
|
|
|
Tailable cursors are conceptually equivalent to the *tail* Unix command with the *-f* option (i.e. with "follow" mode).
|
|
|
|
After clients insert additional documents into a capped collection, the tailable cursor continues to retrieve them.
|
|
|
|
|
|
=== socket (open,checkstream,event)
|
|
|
|
[source,javascript]
|
|
----
|
|
|
|
// Redis client that getkey() queries.
const pubClient = new Redis({ host: REDIS, port: REDIS_PORT });
|
|
|
|
// ------------------------------
|
|
// read from redis
|
|
// ------------------------------
|
|
/**
 * Looks up a key in Redis through `pubClient`.
 *
 * @param {string} id - Key to look up (callers in this file pass a socket id).
 * @returns {Promise<1|2|null>} 1 when the key holds a truthy value, 2 when it
 *   does not, and null when Redis reports an error. The promise never rejects
 *   because of a Redis error; the error is logged instead.
 */
function getkey(id) {
  return new Promise((resolve) => {
    pubClient.get(id, (err, reply) => {
      if (err) {
        // Keep the null contract for callers, but do not hide the failure.
        console.error("Redis lookup failed for key " + id + ":", err);
        resolve(null);
        return;
      }
      if (reply) {
        resolve(1);
        return;
      }
      console.log("----------not fount------------");
      resolve(2);
    });
  });
}
|
|
|
|
// ------------------------------
|
|
// check if stream exists
|
|
// ------------------------------
|
|
/**
 * Starts a MongoDB stream for a socket unless `getkey` reports that its key
 * already exists.
 *
 * When `getkey(data.id)` resolves to 1 nothing is started. Otherwise a MongoDB
 * connection is opened and `onCollectionNew` is invoked for `COLLECTION` with
 * `this` bound to `data`.
 *
 * @param {{user: *, id: string}} data - Room name (`user`) and socket id (`id`).
 * @returns {Promise<void>}
 */
async function checkstream(data) {
  const res = Number(await getkey(data.id));
  if (res === 1) {
    console.log("Stream is on!");
    return;
  }

  console.log("Creating Stream....");
  MongoClient.connect(
    URL,
    { useNewUrlParser: true, useUnifiedTopology: true },
    (err, db) => {
      if (err) {
        // Log instead of throwing: an exception raised from inside this
        // callback cannot be handled by whoever called checkstream.
        console.error("MongoDB connection failed:", err);
        return;
      }
      const dbo = db.db(DATABASE);
      dbo.collection(COLLECTION, onCollectionNew.bind(data));
    }
  );
}
|
|
|
|
// ------------------------------
|
|
// --- open socket -------------
|
|
// ------------------------------
|
|
// Registers the per-socket handlers: "authenticate" validates a token through
// checkToken() and records the result on the socket, "onevent" asks
// checkstream() to start streaming for the requested room.
io.on("connection", (s) => {
  console.error("socket connection");
  const usersession = { SOCKET: { error: {} } };
  console.error("socket ...");
  s.auth = false;

  // --- authenticate: `data` is the token sent by the client.
  s.on("authenticate", async (data) => {
    const token = data;
    try {
      const isvalid = await checkToken(token);
      if (isvalid.action === "ok") {
        usersession.SOCKET.user = isvalid.user;
        usersession.SOCKET.scope = isvalid.scope; // space-delimited
        usersession.SOCKET.token = isvalid.token;
        usersession.SOCKET.id = s.id;
        s.auth = true;
      } else {
        s.auth = false;
      }
    } catch (err) {
      // A failing token check must leave the socket unauthenticated instead
      // of surfacing as an unhandled rejection.
      console.error("Token check failed:", err);
      s.auth = false;
    }
  });

  // --- event: `data` is used as the room name handed to checkstream().
  // NOTE(review): s.auth is not consulted here — confirm whether
  // unauthenticated sockets are meant to be able to start a stream.
  s.on("onevent", (data) => {
    const binddata = {
      user: data,
      id: s.id,
    };
    checkstream(binddata).catch((err) => {
      console.error("checkstream failed:", err);
    });
  });
});
|
|
----
|
|
|
|
|
|
=== restart
|
|
|
|
A restart is *not* required for changes to take effect.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|