'use strict';

const Denque = require('denque');
const EventEmitter = require('events');
const isResumableError = require('./error').isResumableError;
const MongoError = require('./core').MongoError;
const Cursor = require('./cursor');
const relayEvents = require('./core/utils').relayEvents;
const maxWireVersion = require('./core/utils').maxWireVersion;
const maybePromise = require('./utils').maybePromise;
const AggregateOperation = require('./operations/aggregate');

// Key under which a ChangeStream stores callbacks waiting for a resumed cursor
const kResumeQueue = Symbol('resumeQueue');

// Options copied into the `$changeStream` pipeline stage
const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];

// Options copied onto the ChangeStreamCursor (superset of the stage options)
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
  CHANGE_STREAM_OPTIONS
);

// Which kind of parent (Collection, Db, MongoClient) the change stream watches
const CHANGE_DOMAIN_TYPES = {
  COLLECTION: Symbol('Collection'),
  DATABASE: Symbol('Database'),
  CLUSTER: Symbol('Cluster')
};

/**
 * @typedef ResumeToken
 * @description Represents the logical starting point for a new or resuming {@link ChangeStream} on the server.
 * @see https://docs.mongodb.com/master/changeStreams/#change-stream-resume-token
 */

/**
 * @typedef OperationTime
 * @description Represents a specific point in time on a server. Can be retrieved by using {@link Db#command}
 * @see https://docs.mongodb.com/manual/reference/method/db.runCommand/#response
 */

/**
 * @typedef ChangeStreamOptions
 * @description Options that can be passed to a ChangeStream. Note that startAfter, resumeAfter, and startAtOperationTime are all mutually exclusive, and the server will error if more than one is specified.
 * @property {string} [fullDocument='default'] Allowed values: 'default', 'updateLookup'. When set to 'updateLookup', the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
 * @property {number} [maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query.
 * @property {ResumeToken} [resumeAfter] Allows you to start a changeStream after a specified event. See {@link https://docs.mongodb.com/master/changeStreams/#resumeafter-for-change-streams|ChangeStream documentation}.
 * @property {ResumeToken} [startAfter] Similar to resumeAfter, but will allow you to start after an invalidated event. See {@link https://docs.mongodb.com/master/changeStreams/#startafter-for-change-streams|ChangeStream documentation}.
 * @property {OperationTime} [startAtOperationTime] Will start the changeStream after the specified operationTime.
 * @property {number} [batchSize=1000] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
 * @property {object} [collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
 * @property {ReadPreference} [readPreference] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
 */

/**
 * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
 * @class ChangeStream
 * @since 3.0.0
 * @param {(MongoClient|Db|Collection)} parent The parent object that created this change stream
 * @param {Array} pipeline An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
 * @param {ChangeStreamOptions} [options] Optional settings
 * @fires ChangeStream#close
 * @fires ChangeStream#change
 * @fires ChangeStream#end
 * @fires ChangeStream#error
 * @fires ChangeStream#resumeTokenChanged
 * @return {ChangeStream} a ChangeStream instance.
 */
class ChangeStream extends EventEmitter {
  constructor(parent, pipeline, options) {
    super();

    // Required lazily inside the constructor rather than at module load
    const Collection = require('./collection');
    const Db = require('./db');
    const MongoClient = require('./mongo_client');

    this.pipeline = pipeline || [];
    this.options = options || {};

    this.parent = parent;
    this.namespace = parent.s.namespace;
    if (parent instanceof Collection) {
      this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
      this.topology = parent.s.db.serverConfig;
    } else if (parent instanceof Db) {
      this.type = CHANGE_DOMAIN_TYPES.DATABASE;
      this.topology = parent.serverConfig;
    } else if (parent instanceof MongoClient) {
      this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
      this.topology = parent.topology;
    } else {
      throw new TypeError(
        'parent provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient'
      );
    }

    this.promiseLibrary = parent.s.promiseLibrary;
    if (!this.options.readPreference && parent.s.readPreference) {
      this.options.readPreference = parent.s.readPreference;
    }

    this[kResumeQueue] = new Denque();

    // Create contained Change Stream cursor. Use the normalized `this.options`
    // (the same object as `options` whenever one was supplied) so that omitting
    // the optional `options` argument does not dereference `undefined`.
    this.cursor = createChangeStreamCursor(this, this.options);

    this.closed = false;

    // Listen for any `change` listeners being added to ChangeStream
    this.on('newListener', eventName => {
      if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
        this.cursor.on('data', change => processNewChange(this, change));
      }
    });

    // Listen for all `change` listeners being removed from ChangeStream
    this.on('removeListener', eventName => {
      if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
        this.cursor.removeAllListeners('data');
      }
    });
  }

  /**
   * @property {ResumeToken} resumeToken
   * The cached resume token that will be used to resume
   * after the most recently returned change.
   */
  get resumeToken() {
    return this.cursor.resumeToken;
  }

  /**
   * Check if there is any document still available in the Change Stream
   * @function ChangeStream.prototype.hasNext
   * @param {ChangeStream~resultCallback} [callback] The result callback.
   * @throws {MongoError}
   * @returns {Promise|void} returns Promise if no callback passed
   */
  hasNext(callback) {
    return maybePromise(this.parent, callback, cb => {
      getCursor(this, (err, cursor) => {
        if (err) return cb(err); // failed to resume, raise an error
        cursor.hasNext(cb);
      });
    });
  }

  /**
   * Get the next available document from the Change Stream, returns null if no more documents are available.
   * @function ChangeStream.prototype.next
   * @param {ChangeStream~resultCallback} [callback] The result callback.
   * @throws {MongoError}
   * @returns {Promise|void} returns Promise if no callback passed
   */
  next(callback) {
    return maybePromise(this.parent, callback, cb => {
      getCursor(this, (err, cursor) => {
        if (err) return cb(err); // failed to resume, raise an error
        cursor.next((error, change) => {
          if (error) {
            // queue a retry of this call, to be run once processError settles the resume attempt
            this[kResumeQueue].push(() => this.next(cb));
            processError(this, error, cb);
            return;
          }
          processNewChange(this, change, cb);
        });
      });
    });
  }

  /**
   * Is the change stream closed
   * @method ChangeStream.prototype.isClosed
   * @return {boolean} truthy if the stream was explicitly closed or its current cursor reports closed; falsy otherwise
   */
  isClosed() {
    return this.closed || (this.cursor && this.cursor.isClosed());
  }

  /**
   * Close the Change Stream
   * @method ChangeStream.prototype.close
   * @param {ChangeStream~resultCallback} [callback] The result callback.
   * @return {Promise} returns Promise if no callback passed
   */
  close(callback) {
    return maybePromise(this.parent, callback, cb => {
      if (this.closed) return cb();

      // flag the change stream as explicitly closed
      this.closed = true;

      if (!this.cursor) return cb();

      // Tidy up the existing cursor
      const cursor = this.cursor;

      return cursor.close(err => {
        ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
        this.cursor = undefined;

        return cb(err);
      });
    });
  }

  /**
   * This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.
   * @method
   * @param {Writable} destination The destination for writing data
   * @param {object} [options] {@link https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options|Pipe options}
   * @return {null}
   */
  pipe(destination, options) {
    // remember destinations so a replacement cursor can be re-piped after a resume
    if (!this.pipeDestinations) {
      this.pipeDestinations = [];
    }
    this.pipeDestinations.push(destination);
    return this.cursor.pipe(destination, options);
  }

  /**
   * This method will remove the hooks set up for a previous pipe() call.
   * @param {Writable} [destination] The destination for writing data
   * @return {null}
   */
  unpipe(destination) {
    if (this.pipeDestinations && this.pipeDestinations.indexOf(destination) > -1) {
      this.pipeDestinations.splice(this.pipeDestinations.indexOf(destination), 1);
    }
    return this.cursor.unpipe(destination);
  }

  /**
   * Return a modified Readable stream including a possible transform method.
   * @method
   * @param {object} [options] Optional settings.
   * @param {function} [options.transform] A transformation method applied to each document emitted by the stream.
   * @return {Cursor}
   */
  stream(options) {
    // remembered so a replacement cursor can be streamed with the same options
    this.streamOptions = options;
    return this.cursor.stream(options);
  }

  /**
   * This method will cause a stream in flowing mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
   * @return {null}
   */
  pause() {
    return this.cursor.pause();
  }

  /**
   * This method will cause the readable stream to resume emitting data events.
   * @return {null}
   */
  resume() {
    return this.cursor.resume();
  }
}

/**
 * Cursor subclass used by ChangeStream. Tracks the resume token and the
 * operation time needed to rebuild an equivalent cursor after a resumable error.
 * @ignore
 */
class ChangeStreamCursor extends Cursor {
  constructor(topology, operation, options) {
    super(topology, operation, options);

    options = options || {};
    this._resumeToken = null;
    this.startAtOperationTime = options.startAtOperationTime;

    // startAfter takes precedence over resumeAfter as the initial token
    if (options.startAfter) {
      this.resumeToken = options.startAfter;
    } else if (options.resumeAfter) {
      this.resumeToken = options.resumeAfter;
    }
  }

  // Storing a token always announces it via `resumeTokenChanged`
  set resumeToken(token) {
    this._resumeToken = token;
    this.emit('resumeTokenChanged', token);
  }

  get resumeToken() {
    return this._resumeToken;
  }

  /**
   * Options with which a replacement cursor should be created: the truthy
   * CURSOR_OPTIONS of this cursor, with the start position replaced by the
   * cached resume token or, failing that, the start operation time.
   */
  get resumeOptions() {
    const result = {};
    for (const optionName of CURSOR_OPTIONS) {
      if (this.options[optionName]) result[optionName] = this.options[optionName];
    }

    if (this.resumeToken || this.startAtOperationTime) {
      // drop the original start position; exactly one is re-added below
      ['resumeAfter', 'startAfter', 'startAtOperationTime'].forEach(key => delete result[key]);

      if (this.resumeToken) {
        // keep using startAfter until a change has actually been received
        const resumeKey =
          this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter';
        result[resumeKey] = this.resumeToken;
      } else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
        result.startAtOperationTime = this.startAtOperationTime;
      }
    }

    return result;
  }

  /**
   * Cache the token to resume from after a change has been delivered.
   * When nothing is buffered and a postBatchResumeToken is known, that token
   * is used instead of the change's own token.
   * @param {ResumeToken} resumeToken the `_id` of the change just delivered
   */
  cacheResumeToken(resumeToken) {
    if (this.bufferedCount() === 0 && this.cursorState.postBatchResumeToken) {
      this.resumeToken = this.cursorState.postBatchResumeToken;
    } else {
      this.resumeToken = resumeToken;
    }
    this.hasReceived = true;
  }

  /**
   * Record the postBatchResumeToken of a server response, if present, and
   * adopt it as the resume token immediately when the batch is empty.
   * @param {string} batchName `firstBatch` or `nextBatch`
   * @param {object} response the server response containing a `cursor` document
   */
  _processBatch(batchName, response) {
    const cursor = response.cursor;
    if (cursor.postBatchResumeToken) {
      this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;

      if (cursor[batchName].length === 0) {
        this.resumeToken = cursor.postBatchResumeToken;
      }
    }
  }

  _initializeCursor(callback) {
    super._initializeCursor((err, result) => {
      if (err) {
        callback(err);
        return;
      }

      const response = result.documents[0];

      // NOTE(review): `this.resumeAfter` and `this.startAfter` are never assigned
      // in this class; confirm whether `this.options.*` was intended here.
      if (
        this.startAtOperationTime == null &&
        this.resumeAfter == null &&
        this.startAfter == null &&
        maxWireVersion(this.server) >= 7
      ) {
        this.startAtOperationTime = response.operationTime;
      }

      this._processBatch('firstBatch', response);

      this.emit('init', result);
      this.emit('response');
      callback(err, result);
    });
  }

  _getMore(callback) {
    super._getMore((err, response) => {
      if (err) {
        callback(err);
        return;
      }

      this._processBatch('nextBatch', response);

      this.emit('more', response);
      this.emit('response');
      callback(err, response);
    });
  }
}

/**
 * @event ChangeStreamCursor#response
 * internal event DO NOT USE
 * @ignore
 */

/**
 * Create a new change stream cursor based on self's configuration
 *
 * @param {ChangeStream} self the change stream that will own the cursor
 * @param {object} options change stream / cursor options for the new cursor
 * @return {ChangeStreamCursor} the new cursor, with events and pipes wired to `self`
 */
function createChangeStreamCursor(self, options) {
  const changeStreamStageOptions = { fullDocument: options.fullDocument || 'default' };
  applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
  if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
    changeStreamStageOptions.allChangesForCluster = true;
  }

  const pipeline = [{ $changeStream: changeStreamStageOptions }].concat(self.pipeline);
  const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);

  const changeStreamCursor = new ChangeStreamCursor(
    self.topology,
    new AggregateOperation(self.parent, pipeline, options),
    cursorOptions
  );

  relayEvents(changeStreamCursor, self, ['resumeTokenChanged', 'end', 'close']);

  /**
   * Fired for each new matching change in the specified namespace. Attaching a `change`
   * event listener to a Change Stream will switch the stream into flowing mode. Data will
   * then be passed as soon as it is available.
   *
   * @event ChangeStream#change
   * @type {object}
   */
  if (self.listenerCount('change') > 0) {
    changeStreamCursor.on('data', function(change) {
      processNewChange(self, change);
    });
  }

  /**
   * Change stream close event
   *
   * @event ChangeStream#close
   * @type {null}
   */

  /**
   * Change stream end event
   *
   * @event ChangeStream#end
   * @type {null}
   */

  /**
   * Emitted each time the change stream stores a new resume token.
   *
   * @event ChangeStream#resumeTokenChanged
   * @type {ResumeToken}
   */

  /**
   * Fired when the stream encounters an error.
   *
   * @event ChangeStream#error
   * @type {Error}
   */
  changeStreamCursor.on('error', function(error) {
    processError(self, error);
  });

  if (self.pipeDestinations) {
    const cursorStream = changeStreamCursor.stream(self.streamOptions);
    // iterate the destinations themselves; `for...in` would yield index strings
    for (const pipeDestination of self.pipeDestinations) {
      cursorStream.pipe(pipeDestination);
    }
  }

  return changeStreamCursor;
}

/**
 * Copy every truthy option named in `optionNames` from `source` onto `target`.
 *
 * @param {object} target object to receive the options (mutated)
 * @param {object} source object to read the options from
 * @param {string[]} optionNames names of the options to copy
 * @return {object} `target`
 */
function applyKnownOptions(target, source, optionNames) {
  optionNames.forEach(name => {
    if (source[name]) {
      target[name] = source[name];
    }
  });

  return target;
}

// This method performs a basic server selection loop, satisfying the requirements of
// ChangeStream resumability until the new SDAM layer can be used.
const SELECTION_TIMEOUT = 30000;

/**
 * Poll every 500ms until the topology reports itself connected for the given
 * read preference, or until the timeout has elapsed.
 *
 * @param {object} topology the topology to poll via `isConnected`
 * @param {object} options polling state and settings (mutated: `start` is recorded on first poll)
 * @param {number} [options.timeout=30000] how long to wait, in milliseconds
 * @param {ReadPreference} [options.readPreference] passed to `topology.isConnected`
 * @param {function} callback called with no arguments on success, or with a MongoError on timeout
 */
function waitForTopologyConnected(topology, options, callback) {
  setTimeout(() => {
    if (options && options.start == null) options.start = process.hrtime();
    const start = options.start || process.hrtime();
    const timeout = options.timeout || SELECTION_TIMEOUT;
    const readPreference = options.readPreference;

    if (topology.isConnected({ readPreference })) return callback();
    const hrElapsed = process.hrtime(start);
    const elapsed = (hrElapsed[0] * 1e9 + hrElapsed[1]) / 1e6; // milliseconds
    if (elapsed > timeout) return callback(new MongoError('Timed out waiting for connection'));
    waitForTopologyConnected(topology, options, callback);
  }, 500); // this is an arbitrary wait time to allow SDAM to transition
}

/**
 * Handle a change received from the cursor: validate it, cache its resume
 * token, and deliver it via `callback` (iterator mode) or a `change` event
 * (emitter mode).
 *
 * @param {ChangeStream} changeStream the parent ChangeStream
 * @param {(object|null)} change the change document, or null when the cursor yielded no document
 * @param {function} [callback] iterator-mode callback; when absent, events are emitted instead
 */
function processNewChange(changeStream, change, callback) {
  const cursor = changeStream.cursor;

  if (changeStream.closed) {
    if (callback) callback(new MongoError('ChangeStream is closed'));
    return;
  }

  // The cursor produced no document: there is no resume token to cache, so hand
  // the null straight back instead of dereferencing it below.
  if (change == null) {
    if (callback) callback(undefined, change);
    return;
  }

  if (!change._id) {
    const noResumeTokenError = new Error(
      'A change stream document has been received that lacks a resume token (_id).'
    );

    if (!callback) return changeStream.emit('error', noResumeTokenError);
    return callback(noResumeTokenError);
  }

  // cache the resume token
  cursor.cacheResumeToken(change._id);

  // wipe the startAtOperationTime if there was one so that there won't be a conflict
  // between resumeToken and startAtOperationTime if we need to reconnect the cursor
  changeStream.options.startAtOperationTime = undefined;

  // Return the change
  if (!callback) return changeStream.emit('change', change);
  return callback(undefined, change);
}

/**
 * Handle an error raised by the cursor. Resumable errors trigger an attempt to
 * replace the cursor once the topology is connected again; any other error is
 * reported through `callback` or an `error` event.
 *
 * @param {ChangeStream} changeStream the parent ChangeStream
 * @param {Error} error the error raised by the cursor
 * @param {function} [callback] iterator-mode callback; when absent, events are emitted instead
 */
function processError(changeStream, error, callback) {
  const topology = changeStream.topology;
  const cursor = changeStream.cursor;

  // If the change stream has been closed explicitly, do not process error.
  if (changeStream.closed) {
    if (callback) callback(new MongoError('ChangeStream is closed'));
    return;
  }

  // if the resume succeeds, continue with the new cursor
  function resumeWithCursor(newCursor) {
    changeStream.cursor = newCursor;
    processResumeQueue(changeStream);
  }

  // otherwise, raise an error and close the change stream
  function unresumableError(err) {
    if (!callback) {
      changeStream.emit('error', err);
      changeStream.emit('close');
    }
    processResumeQueue(changeStream, err);
    changeStream.closed = true;
  }

  if (cursor && isResumableError(error, maxWireVersion(cursor.server))) {
    changeStream.cursor = undefined;

    // stop listening to all events from old cursor
    ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));

    // close internal cursor, ignore errors
    cursor.close();

    waitForTopologyConnected(topology, { readPreference: cursor.options.readPreference }, err => {
      // if the topology can't reconnect, close the stream
      if (err) return unresumableError(err);

      // create a new cursor, preserving the old cursor's options
      const newCursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);

      // attempt to continue in emitter mode
      if (!callback) return resumeWithCursor(newCursor);

      // attempt to continue in iterator mode
      newCursor.hasNext(err => {
        // if there's an error immediately after resuming, close the stream
        if (err) return unresumableError(err);
        resumeWithCursor(newCursor);
      });
    });
    return;
  }

  if (!callback) return changeStream.emit('error', error);
  return callback(error);
}

/**
 * Safely provides a cursor across resume attempts
 *
 * @param {ChangeStream} changeStream the parent ChangeStream
 * @param {function} callback gets the cursor or error
 */
function getCursor(changeStream, callback) {
  if (changeStream.isClosed()) {
    callback(new MongoError('ChangeStream is closed.'));
    return;
  }

  // if a cursor exists and it is open, return it
  if (changeStream.cursor) {
    callback(undefined, changeStream.cursor);
    return;
  }

  // no cursor, queue callback until topology reconnects
  changeStream[kResumeQueue].push(callback);
}

/**
 * Drain the resume queue when a new cursor has become available, or when the
 * attempt to obtain one has failed
 *
 * @param {ChangeStream} changeStream the parent ChangeStream
 * @param {ChangeStreamCursor?} changeStream.cursor the new cursor
 * @param {Error} [err] error getting a new cursor
 */
function processResumeQueue(changeStream, err) {
  // NOTE(review): `pop()` serves the most recently queued request first, and the
  // early `return` below leaves any remaining requests queued; confirm both are intended.
  while (changeStream[kResumeQueue].length) {
    const request = changeStream[kResumeQueue].pop();
    if (changeStream.isClosed() && !err) {
      request(new MongoError('Change Stream is not open.'));
      return;
    }
    request(err, changeStream.cursor);
  }
}

/**
 * The callback format for results
 * @callback ChangeStream~resultCallback
 * @param {MongoError} error An error instance representing the error during the execution.
 * @param {(object|null)} result The result object if the command was executed successfully.
 */

module.exports = ChangeStream;