= readmongo_service == App src: readmongo/swarmlab-app/src/run/ === stream2mongo [source,javascript] ---- async function onCollectionNew(err, collection) { let options = { tailable: true, awaitdata: true, numberOfRetries: -1, tailableRetryInterval: 500, }; var cursor = collection.find({}, options).stream(); var itemsProcessed = 0; var room = this.user; var sid = this.id; console.log("Inside callback: " + room + " Id: " + sid); var rep = setUser(sid, room); cursor.on("data", async function (data) { cursor.pause(); var res = await getkey(sid); if (res == "1") { cursor.resume(); var obj = JSON.parse(JSON.stringify(data)); io.in(room).emit("logsend", obj); } else if (res == "2") { cursor.resume(); console.log("Cursor is closing..."); cursor.close(); } }); } ---- var cursor = collection.find({}, options).stream(); A Tailable Cursor *remains open* after the client exhausts the results in the initial cursor. Tailable cursors are conceptually equivalent to the *tail* Unix command with the *-f* option (i.e. with "follow" mode). After clients insert new additional documents into a capped collection, the tailable cursor will continue to retrieve documents. === socket (open,event) [source,javascript] ---- const pubClient = new Redis({ host: REDIS, port: REDIS_PORT, }); // ------------------------------ // read from redis // ------------------------------ async function getkey(id) { return new Promise((resolve) => { pubClient.get(id, function (err, reply) { if (err) { resolve(null); } else { if (reply) { //console.log("---------fount----------"); resolve(1); } else { console.log("----------not fount------------"); resolve(2); } } }); }); } // ------------------------------ // check if stream exists // ------------------------------ async function checkstream(data) { var res = await getkey(data.id); if (res == "1") { console.log("Stream is on!"); } else { console.log("Creating Stream...."); var url = URL; MongoClient.connect( url, { useNewUrlParser: true, useUnifiedTopology: true }, function (err, db) { if (err) throw err; var dbo = db.db(DATABASE); dbo.collection(COLLECTION, onCollectionNew.bind(data)); } ); } } // ------------------------------ // --- open socket ------------- // ------------------------------ io.on("connection", (s) => { console.error("socket connection"); var usersession = new Object(); usersession.SOCKET = {}; usersession.SOCKET.error = {}; console.error("socket ..."); s.auth = false; // ------------------------------ // --- authenticate // ------------------------------ s.on("authenticate", function (data) { const token = data; (async () => { var isvalid = await checkToken(token); if (isvalid.action == "ok") { usersession.SOCKET.user = isvalid.user; usersession.SOCKET.scope = isvalid.scope; // space delimeter usersession.SOCKET.token = isvalid.token; usersession.SOCKET.id = s.id; s.auth = true; } else { s.auth = false; } })(); }); // ------------------------------ // --- event ---------------- // ------------------------------ s.on("onevent", function (data) { var binddata = { user: data, id: s.id, }; checkstream(binddata); }); } ---- === restart To make changes become effective a restart is *not* required