Swarmlab docs

Application development in a distributed system

Development of Distributed Systems from Design to Application


You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

170 lines
3.7 KiB

= readmongo_service
== App
src: readmongo/swarmlab-app/src/run/
=== stream2mongo
[source,javascript]
----
/**
 * Streams the documents of a collection to a socket.io room through a
 * tailable cursor.
 *
 * Meant to be used as a collection callback with `this` bound to an object
 * of the form `{ user, id }`: `user` is used as the socket.io room name and
 * `id` is the key passed to `setUser` and `getkey`.
 *
 * @param {Error|null} err - Error reported while opening the collection.
 * @param {object} collection - Collection to read from.
 * @returns {Promise<void>}
 */
async function onCollectionNew(err, collection) {
  if (err) {
    // Without a collection there is nothing to tail; report and give up.
    console.error("Unable to open collection:", err);
    return;
  }

  const options = {
    tailable: true,
    awaitdata: true,
    numberOfRetries: -1,
    tailableRetryInterval: 500,
  };
  const cursor = collection.find({}, options).stream();
  const room = this.user;
  const sid = this.id;
  console.log(`Inside callback: ${room} Id: ${sid}`);
  setUser(sid, room);

  cursor.on("data", async (data) => {
    // Hold back further documents until the key lookup has finished.
    cursor.pause();
    // getkey answers 1 (key present), 2 (key absent) or null (lookup failed).
    const state = Number(await getkey(sid));

    if (state === 1) {
      cursor.resume();
      // The JSON round trip turns the document into a plain serialisable object.
      const obj = JSON.parse(JSON.stringify(data));
      io.in(room).emit("logsend", obj);
    } else if (state === 2) {
      cursor.resume();
      console.log("Cursor is closing...");
      cursor.close();
    } else {
      // Lookup failed: skip this document but keep the stream flowing,
      // otherwise the cursor would stay paused forever.
      console.error(`Key lookup failed for ${sid}; document skipped`);
      cursor.resume();
    }
  });
}
----
The statement `var cursor = collection.find({}, options).stream();` opens a tailable cursor on the collection and exposes it as a stream.
A Tailable Cursor *remains open* after the client exhausts the results in the initial cursor.
Tailable cursors are conceptually equivalent to the *tail* Unix command with the *-f* option (i.e. with "follow" mode).
After clients insert additional documents into a capped collection, the tailable cursor continues to retrieve them.
=== socket (open,checkstream,event)
[source,javascript]
----
// Redis client, connected to the host and port given by REDIS and REDIS_PORT.
const pubClient = new Redis({ host: REDIS, port: REDIS_PORT });
// ------------------------------
// read from redis
// ------------------------------
/**
 * Looks up `id` in Redis and reports whether a value is stored under it.
 *
 * The returned promise never rejects: a Redis error is logged and reported
 * as `null` so callers can tell it apart from a missing key.
 *
 * @param {string} id - Redis key to look up.
 * @returns {Promise<1|2|null>} `1` when Redis answers with a truthy reply,
 *   `2` when the reply is empty, `null` when the lookup failed.
 */
async function getkey(id) {
  return new Promise((resolve) => {
    pubClient.get(id, (err, reply) => {
      if (err) {
        console.error(`Redis lookup failed for key ${id}:`, err);
        resolve(null);
        return;
      }
      if (reply) {
        resolve(1);
        return;
      }
      console.log("----------not fount------------");
      resolve(2);
    });
  });
}
// ------------------------------
// check if stream exists
// ------------------------------
/**
 * Starts a MongoDB stream for `data` unless `getkey(data.id)` reports that
 * one is already registered.
 *
 * When a stream has to be created, a new MongoDB connection is opened and
 * `onCollectionNew` is invoked with `this` bound to `data`.
 *
 * @param {{id: string, user: *}} data - Binding object handed to `onCollectionNew`.
 * @returns {Promise<void>} Resolves once the lookup is done; the connection
 *   itself is established asynchronously through the connect callback.
 */
async function checkstream(data) {
  const res = await getkey(data.id);
  if (res == "1") {
    console.log("Stream is on!");
    return;
  }

  console.log("Creating Stream....");
  MongoClient.connect(
    URL,
    { useNewUrlParser: true, useUnifiedTopology: true },
    (err, db) => {
      if (err) {
        // Throwing from this callback would surface as an uncaught
        // exception; report the failure and leave the stream uncreated.
        console.error("MongoDB connection failed:", err);
        return;
      }
      const dbo = db.db(DATABASE);
      dbo.collection(COLLECTION, onCollectionNew.bind(data));
    }
  );
}
// ------------------------------
// --- open socket -------------
// ------------------------------
// Registers the per-socket handlers: "authenticate" validates a token and
// records the session, "onevent" requests a stream for the socket.
io.on("connection", (s) => {
  console.error("socket connection");
  // Session details for this socket, filled in by a successful "authenticate".
  const usersession = { SOCKET: { error: {} } };
  console.error("socket ...");
  s.auth = false;

  // ------------------------------
  // --- authenticate
  // ------------------------------
  s.on("authenticate", async (data) => {
    const token = data;
    try {
      const isvalid = await checkToken(token);
      if (isvalid.action === "ok") {
        usersession.SOCKET.user = isvalid.user;
        usersession.SOCKET.scope = isvalid.scope; // space delimiter
        usersession.SOCKET.token = isvalid.token;
        usersession.SOCKET.id = s.id;
        s.auth = true;
      } else {
        s.auth = false;
      }
    } catch (err) {
      // A failed validation must not become an unhandled rejection.
      console.error("Token validation failed:", err);
      s.auth = false;
    }
  });

  // ------------------------------
  // --- event ----------------
  // ------------------------------
  s.on("onevent", (data) => {
    // NOTE(review): s.auth is not consulted here, so a socket that never
    // authenticated can still request a stream - confirm this is intended.
    const binddata = {
      user: data,
      id: s.id,
    };
    checkstream(binddata).catch((err) => {
      console.error("checkstream failed:", err);
    });
  });
});
----
=== restart
A restart is *not* required for changes to take effect.
NOTE: You may have to wait a couple of minutes for the system to fully provision resources. You may also have to refresh the web interface a couple of times!