You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
539 lines
13 KiB
539 lines
13 KiB
5 years ago
|
'use strict';
|
||
|
|
||
|
var core = require('../core');
|
||
|
var crypto = require('crypto');
|
||
|
var stream = require('stream');
|
||
|
var util = require('util');
|
||
|
var Buffer = require('safe-buffer').Buffer;
|
||
|
|
||
|
var ERROR_NAMESPACE_NOT_FOUND = 26;
|
||
|
|
||
|
module.exports = GridFSBucketWriteStream;
|
||
|
|
||
|
/**
 * A writable stream that enables you to write buffers to GridFS.
 *
 * Do not instantiate this class directly. Use `openUploadStream()` instead.
 *
 * NOTE(review): the write-concern values actually sent with inserts are read
 * from `options.writeConcern.{w,wtimeout,j}` by `getWriteOptions()`; confirm
 * that the caller maps the `w`/`wtimeout`/`j` options documented below.
 *
 * @class
 * @extends external:Writable
 * @param {GridFSBucket} bucket Handle for this stream's corresponding bucket
 * @param {string} filename The value of the 'filename' key in the files doc
 * @param {object} [options] Optional settings.
 * @param {string|number|object} [options.id] Custom file id for the GridFS file. Every value except `null`/`undefined` is used as-is (including `0`); otherwise a new ObjectId is generated.
 * @param {number} [options.chunkSizeBytes] The chunk size to use, in bytes
 * @param {number} [options.w] The write concern
 * @param {number} [options.wtimeout] The write concern timeout
 * @param {number} [options.j] The journal write concern
 * @param {boolean} [options.disableMD5=false] If true, disables adding an md5 field to file data
 * @fires GridFSBucketWriteStream#error
 * @fires GridFSBucketWriteStream#finish
 */
function GridFSBucketWriteStream(bucket, filename, options) {
  options = options || {};
  this.bucket = bucket;
  this.chunks = bucket.s._chunksCollection;
  this.filename = filename;
  this.files = bucket.s._filesCollection;
  this.options = options;
  // Signals the write is all done
  this.done = false;

  // A truthiness test would throw away legitimate falsy ids such as 0, so
  // only fall back to a generated ObjectId when no id was supplied at all.
  var hasCustomId = options.id !== undefined && options.id !== null;
  this.id = hasCustomId ? options.id : core.BSON.ObjectId();
  this.chunkSizeBytes = this.options.chunkSizeBytes;
  // Staging buffer for the chunk currently being filled; `pos` is the fill level.
  this.bufToStore = Buffer.alloc(this.chunkSizeBytes);
  // Total number of bytes written so far
  this.length = 0;
  // `false` when MD5 is disabled, otherwise a running hash object
  this.md5 = !options.disableMD5 && crypto.createHash('md5');
  // Sequence number of the next chunk to be stored
  this.n = 0;
  this.pos = 0;
  this.state = {
    streamEnd: false,
    outstandingRequests: 0,
    errored: false,
    aborted: false,
    promiseLibrary: this.bucket.s.promiseLibrary
  };

  // The first stream opened on a bucket triggers the index check; writes on
  // every stream of that bucket wait for the resulting 'index' event.
  if (!this.bucket.s.calledOpenUploadStream) {
    this.bucket.s.calledOpenUploadStream = true;

    var _this = this;
    checkIndexes(this, function() {
      _this.bucket.s.checkedIndexes = true;
      _this.bucket.emit('index');
    });
  }
}

util.inherits(GridFSBucketWriteStream, stream.Writable);
|
||
|
|
||
|
/**
|
||
|
* An error occurred
|
||
|
*
|
||
|
* @event GridFSBucketWriteStream#error
|
||
|
* @type {Error}
|
||
|
*/
|
||
|
|
||
|
/**
|
||
|
* `end()` was called and the write stream successfully wrote the file
|
||
|
* metadata and all the chunks to MongoDB.
|
||
|
*
|
||
|
* @event GridFSBucketWriteStream#finish
|
||
|
* @type {object}
|
||
|
*/
|
||
|
|
||
|
/**
 * Write a buffer to the stream.
 *
 * The actual write is handed to `waitForIndexes`, so it runs immediately when
 * the bucket's indexes have been checked and is otherwise deferred until the
 * bucket emits 'index'.
 *
 * @method
 * @param {Buffer} chunk Buffer to write
 * @param {String} encoding Optional encoding for the buffer
 * @param {GridFSBucket~errorCallback} callback Function to call when the chunk was added to the buffer, or if the entire chunk was persisted to MongoDB if this chunk caused a flush.
 * @return {Boolean} False if this write required flushing a chunk to MongoDB. True otherwise.
 */
GridFSBucketWriteStream.prototype.write = function(chunk, encoding, callback) {
  var stream = this;

  // Runs once the index check is complete; its result becomes write()'s
  // return value when no waiting was necessary.
  function performWrite() {
    return doWrite(stream, chunk, encoding, callback);
  }

  return waitForIndexes(this, performWrite);
};
|
||
|
|
||
|
/**
 * Places this write stream into an aborted state (all future writes fail)
 * and deletes all chunks that have already been written.
 *
 * Fails with an error (through the callback, or a rejected promise) when the
 * stream has already been ended or has already been aborted.
 *
 * @method
 * @param {GridFSBucket~errorCallback} callback called when chunks are successfully removed or error occurred
 * @return {Promise} if no callback specified; resolves once the chunks were deleted and rejects with the deletion error otherwise
 */
GridFSBucketWriteStream.prototype.abort = function(callback) {
  var hasCallback = typeof callback === 'function';
  var PromiseLibrary = this.state.promiseLibrary;
  var preconditionError = null;

  if (this.state.streamEnd) {
    preconditionError = new Error('Cannot abort a stream that has already completed');
  } else if (this.state.aborted) {
    preconditionError = new Error('Cannot call abort() on a stream twice');
  }

  if (preconditionError) {
    if (hasCallback) {
      return callback(preconditionError);
    }
    return PromiseLibrary.reject(preconditionError);
  }

  this.state.aborted = true;

  var chunks = this.chunks;
  var filter = { files_id: this.id };

  if (hasCallback) {
    chunks.deleteMany(filter, function(deleteError) {
      callback(deleteError);
    });
    return;
  }

  // Without a callback the documented contract is a promise. Previously this
  // path returned undefined and silently dropped any deleteMany error.
  return new PromiseLibrary(function(resolve, reject) {
    chunks.deleteMany(filter, function(deleteError) {
      if (deleteError) {
        return reject(deleteError);
      }
      resolve();
    });
  });
};
|
||
|
|
||
|
/**
 * Tells the stream that no more data will be coming in. The stream will
 * persist the remaining data to MongoDB, write the files document, and
 * then emit a 'finish' event.
 *
 * May be called as `end(callback)`, `end(chunk, callback)` or
 * `end(chunk, encoding, callback)`. Nothing happens beyond reporting the
 * abort error to the callback when the stream has been aborted.
 *
 * @method
 * @param {Buffer} chunk Buffer to write
 * @param {String} encoding Optional encoding for the buffer
 * @param {GridFSBucket~errorCallback} callback Function to call when all files and chunks have been persisted to MongoDB
 */
GridFSBucketWriteStream.prototype.end = function(chunk, encoding, callback) {
  var stream = this;

  // Normalise the optional leading arguments.
  if (typeof chunk === 'function') {
    callback = chunk;
    chunk = null;
    encoding = null;
  } else if (typeof encoding === 'function') {
    callback = encoding;
    encoding = null;
  }

  if (checkAborted(this, callback)) {
    return;
  }
  this.state.streamEnd = true;

  if (callback) {
    // 'finish' carries the stored files document as its payload.
    this.once('finish', function(filesDoc) {
      callback(null, filesDoc);
    });
  }

  // Persists whatever is still sitting in the staging buffer.
  function flushRemnant() {
    writeRemnant(stream);
  }

  if (chunk) {
    this.write(chunk, encoding, flushRemnant);
  } else {
    waitForIndexes(this, flushRemnant);
  }
};
|
||
|
|
||
|
/**
 * Records the first error seen on a stream and reports it exactly once:
 * through `callback` when one is supplied, otherwise as an 'error' event.
 * Later errors on an already errored stream are ignored.
 *
 * @ignore
 */
function __handleError(_this, error, callback) {
  if (!_this.state.errored) {
    _this.state.errored = true;

    if (callback) {
      return callback(error);
    }

    _this.emit('error', error);
  }
}
|
||
|
|
||
|
/**
 * Builds the document stored in the chunks collection for one chunk:
 * a fresh ObjectId as `_id`, the owning file's id, the chunk sequence
 * number and the chunk payload.
 *
 * @ignore
 */
function createChunkDoc(filesId, n, data) {
  var chunkDoc = {};
  chunkDoc._id = core.BSON.ObjectId();
  chunkDoc.files_id = filesId;
  chunkDoc.n = n;
  chunkDoc.data = data;
  return chunkDoc;
}
|
||
|
|
||
|
/**
 * Makes sure the chunks collection has a unique `{ files_id: 1, n: 1 }`
 * index, creating it when the collection does not exist yet or when no
 * existing index has exactly that key. `callback` receives an error as its
 * only argument on failure and no arguments on success.
 *
 * @ignore
 */
function checkChunksIndex(_this, callback) {
  // Shared continuation for both createIndex calls.
  function afterCreate(createError) {
    if (createError) {
      return callback(createError);
    }

    callback();
  }

  _this.chunks.listIndexes().toArray(function(listError, indexes) {
    var spec;

    if (listError) {
      if (listError.code !== ERROR_NAMESPACE_NOT_FOUND) {
        return callback(listError);
      }

      // Collection doesn't exist so create index
      spec = { files_id: 1, n: 1 };
      _this.chunks.createIndex(spec, { background: false, unique: true }, afterCreate);
      return;
    }

    // An index qualifies when its key is exactly { files_id: 1, n: 1 }.
    var matching = indexes.filter(function(existing) {
      return (
        !!existing.key &&
        Object.keys(existing.key).length === 2 &&
        existing.key.files_id === 1 &&
        existing.key.n === 1
      );
    });

    if (matching.length > 0) {
      callback();
      return;
    }

    spec = { files_id: 1, n: 1 };
    var indexOptions = getWriteOptions(_this);
    indexOptions.background = false;
    indexOptions.unique = true;

    _this.chunks.createIndex(spec, indexOptions, afterCreate);
  });
}
|
||
|
|
||
|
/**
 * Finalises the upload when possible: once `end()` has been called, no chunk
 * inserts are outstanding and no error occurred, the files document is
 * inserted and 'finish' is emitted with it.
 *
 * Returns true when the stream is already done or finalisation was started
 * by this call, false otherwise (including when the stream was aborted).
 *
 * @ignore
 */
function checkDone(_this, callback) {
  if (_this.done) return true;

  var state = _this.state;
  var readyToFinalize = state.streamEnd && state.outstandingRequests === 0 && !state.errored;
  if (!readyToFinalize) {
    return false;
  }

  // Set done so we don't trigger duplicate createFilesDoc
  _this.done = true;

  // Create a new files doc
  var digest = _this.md5 && _this.md5.digest('hex');
  var filesDoc = createFilesDoc(
    _this.id,
    _this.length,
    _this.chunkSizeBytes,
    digest,
    _this.filename,
    _this.options.contentType,
    _this.options.aliases,
    _this.options.metadata
  );

  if (checkAborted(_this, callback)) {
    return false;
  }

  _this.files.insertOne(filesDoc, getWriteOptions(_this), function(insertError) {
    if (insertError) {
      return __handleError(_this, insertError, callback);
    }
    _this.emit('finish', filesDoc);
  });

  return true;
}
|
||
|
|
||
|
/**
 * Makes sure the files collection has a `{ filename: 1, uploadDate: 1 }`
 * index and then continues with the chunks collection index check.
 *
 * The whole check is skipped (callback invoked without arguments) when the
 * files collection already contains at least one document. Errors are passed
 * to `callback` as its only argument.
 *
 * @ignore
 */
function checkIndexes(_this, callback) {
  // Shared continuation for both createIndex calls on the files collection.
  function afterFilesIndex(createError) {
    if (createError) {
      return callback(createError);
    }

    checkChunksIndex(_this, callback);
  }

  function onIndexesListed(listError, indexes) {
    var spec;

    if (listError) {
      if (listError.code !== ERROR_NAMESPACE_NOT_FOUND) {
        return callback(listError);
      }

      // Collection doesn't exist so create index
      spec = { filename: 1, uploadDate: 1 };
      _this.files.createIndex(spec, { background: false }, afterFilesIndex);
      return;
    }

    // An index qualifies when its key is exactly { filename: 1, uploadDate: 1 }.
    var matching = indexes.filter(function(existing) {
      return (
        Object.keys(existing.key).length === 2 &&
        existing.key.filename === 1 &&
        existing.key.uploadDate === 1
      );
    });

    if (matching.length > 0) {
      checkChunksIndex(_this, callback);
      return;
    }

    spec = { filename: 1, uploadDate: 1 };
    var indexOptions = getWriteOptions(_this);
    indexOptions.background = false;

    _this.files.createIndex(spec, indexOptions, afterFilesIndex);
  }

  _this.files.findOne({}, { _id: 1 }, function(findError, doc) {
    if (findError) {
      return callback(findError);
    }
    if (doc) {
      return callback();
    }

    _this.files.listIndexes().toArray(onIndexesListed);
  });
}
|
||
|
|
||
|
/**
 * Builds the document stored in the files collection. `uploadDate` is the
 * time of the call; `md5`, `contentType`, `aliases` and `metadata` are only
 * included when they are truthy.
 *
 * @ignore
 */
function createFilesDoc(_id, length, chunkSize, md5, filename, contentType, aliases, metadata) {
  var filesDoc = {
    _id: _id,
    length: length,
    chunkSize: chunkSize,
    uploadDate: new Date(),
    filename: filename
  };

  // Optional fields, listed in the order they are appended to the document.
  var optionalFields = {
    md5: md5,
    contentType: contentType,
    aliases: aliases,
    metadata: metadata
  };

  Object.keys(optionalFields).forEach(function(field) {
    if (optionalFields[field]) {
      filesDoc[field] = optionalFields[field];
    }
  });

  return filesDoc;
}
|
||
|
|
||
|
/**
 * Appends `chunk` to the stream's staging buffer and inserts a chunk
 * document for every time the staging buffer fills up.
 *
 * Returns true when the data fit into the staging buffer without a flush
 * (`callback` is then invoked synchronously). Returns false when at least
 * one chunk had to be flushed (`callback` fires after all inserts started by
 * this call completed, together with a 'drain' event) or when the stream
 * has been aborted.
 *
 * @ignore
 */
function doWrite(_this, chunk, encoding, callback) {
  if (checkAborted(_this, callback)) {
    return false;
  }

  var incoming = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
  var total = incoming.length;

  _this.length += total;

  // Input is small enough to fit in our buffer
  if (_this.pos + total < _this.chunkSizeBytes) {
    incoming.copy(_this.bufToStore, _this.pos);
    _this.pos += total;

    if (callback) {
      callback();
    }

    // Note that we reverse the typical semantics of write's return value
    // to be compatible with node's `.pipe()` function.
    // True means client can keep writing.
    return true;
  }

  // Otherwise, buffer is too big for current chunk, so we need to flush
  // to MongoDB.
  var consumed = 0; // bytes of `incoming` already copied
  var room = _this.chunkSizeBytes - _this.pos; // free bytes in the staging buffer
  var take = Math.min(room, total);
  var pendingInserts = 0; // inserts started by this call that have not completed
  // Deliberately function-scoped: 'drain' reports the last chunk document
  // created by this call, whichever insert happens to complete last.
  var doc;

  function onChunkInserted(insertError) {
    if (insertError) {
      return __handleError(_this, insertError);
    }
    --_this.state.outstandingRequests;
    --pendingInserts;

    if (pendingInserts === 0) {
      _this.emit('drain', doc);
      if (callback) {
        callback();
      }
      checkDone(_this);
    }
  }

  while (consumed < total) {
    incoming.copy(_this.bufToStore, _this.pos, consumed, consumed + take);
    _this.pos += take;
    room -= take;

    if (room === 0) {
      // Staging buffer is full: hash it and store a copy as the next chunk.
      if (_this.md5) {
        _this.md5.update(_this.bufToStore);
      }
      doc = createChunkDoc(_this.id, _this.n, Buffer.from(_this.bufToStore));
      ++_this.state.outstandingRequests;
      ++pendingInserts;

      if (checkAborted(_this, callback)) {
        return false;
      }

      _this.chunks.insertOne(doc, getWriteOptions(_this), onChunkInserted);

      room = _this.chunkSizeBytes;
      _this.pos = 0;
      ++_this.n;
    }

    consumed += take;
    take = Math.min(room, total - consumed);
  }

  // Note that we reverse the typical semantics of write's return value
  // to be compatible with node's `.pipe()` function.
  // False means the client should wait for the 'drain' event.
  return false;
}
|
||
|
|
||
|
/**
 * Extracts the write concern settings (`w`, `wtimeout`, `j`) from the
 * stream's `options.writeConcern` into a fresh options object. Returns an
 * empty object when no write concern is configured.
 *
 * @ignore
 */
function getWriteOptions(_this) {
  var concern = _this.options.writeConcern;

  if (!concern) {
    return {};
  }

  return {
    w: concern.w,
    wtimeout: concern.wtimeout,
    j: concern.j
  };
}
|
||
|
|
||
|
/**
 * Runs `callback` once the bucket's index check has completed.
 *
 * When the check is already done, `callback(false)` is invoked right away
 * and its result is returned. Otherwise `callback(true)` is scheduled for
 * the bucket's next 'index' event and true is returned.
 *
 * @ignore
 */
function waitForIndexes(_this, callback) {
  var bucket = _this.bucket;

  if (!bucket.s.checkedIndexes) {
    bucket.once('index', function() {
      callback(true);
    });

    return true;
  }

  return callback(false);
}
|
||
|
|
||
|
/**
 * Stores whatever is left in the staging buffer as the final (short) chunk
 * and then lets `checkDone` finalise the upload. With an empty staging
 * buffer it goes straight to `checkDone` and returns its result.
 *
 * @ignore
 */
function writeRemnant(_this, callback) {
  var used = _this.pos;

  // Buffer is empty, so don't bother to insert
  if (used === 0) {
    return checkDone(_this, callback);
  }

  ++_this.state.outstandingRequests;

  // Create a new buffer to make sure the buffer isn't bigger than it needs
  // to be.
  var remnant = Buffer.alloc(used);
  _this.bufToStore.copy(remnant, 0, 0, used);

  if (_this.md5) {
    _this.md5.update(remnant);
  }

  var doc = createChunkDoc(_this.id, _this.n, remnant);

  // If the stream was aborted, do not write remnant
  if (checkAborted(_this, callback)) {
    return false;
  }

  function onRemnantInserted(insertError) {
    if (insertError) {
      return __handleError(_this, insertError);
    }
    --_this.state.outstandingRequests;
    checkDone(_this);
  }

  _this.chunks.insertOne(doc, getWriteOptions(_this), onRemnantInserted);
}
|
||
|
|
||
|
/**
 * Reports whether the stream has been aborted. When it has, and `callback`
 * is a function, the callback is invoked with an explanatory error.
 *
 * @ignore
 */
function checkAborted(_this, callback) {
  if (!_this.state.aborted) {
    return false;
  }

  if (typeof callback === 'function') {
    callback(new Error('this stream has been aborted'));
  }

  return true;
}
|