You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
105 lines
3.1 KiB
105 lines
3.1 KiB
4 years ago
|
'use strict';
|
||
|
|
||
|
const applyRetryableWrites = require('../utils').applyRetryableWrites;
|
||
|
const applyWriteConcern = require('../utils').applyWriteConcern;
|
||
|
const MongoError = require('../core').MongoError;
|
||
|
const OperationBase = require('./operation').OperationBase;
|
||
|
|
||
|
class BulkWriteOperation extends OperationBase {
|
||
|
constructor(collection, operations, options) {
|
||
|
super(options);
|
||
|
|
||
|
this.collection = collection;
|
||
|
this.operations = operations;
|
||
|
}
|
||
|
|
||
|
execute(callback) {
|
||
|
const coll = this.collection;
|
||
|
const operations = this.operations;
|
||
|
let options = this.options;
|
||
|
|
||
|
// Add ignoreUndfined
|
||
|
if (coll.s.options.ignoreUndefined) {
|
||
|
options = Object.assign({}, options);
|
||
|
options.ignoreUndefined = coll.s.options.ignoreUndefined;
|
||
|
}
|
||
|
|
||
|
// Create the bulk operation
|
||
|
const bulk =
|
||
|
options.ordered === true || options.ordered == null
|
||
|
? coll.initializeOrderedBulkOp(options)
|
||
|
: coll.initializeUnorderedBulkOp(options);
|
||
|
|
||
|
// Do we have a collation
|
||
|
let collation = false;
|
||
|
|
||
|
// for each op go through and add to the bulk
|
||
|
try {
|
||
|
for (let i = 0; i < operations.length; i++) {
|
||
|
// Get the operation type
|
||
|
const key = Object.keys(operations[i])[0];
|
||
|
// Check if we have a collation
|
||
|
if (operations[i][key].collation) {
|
||
|
collation = true;
|
||
|
}
|
||
|
|
||
|
// Pass to the raw bulk
|
||
|
bulk.raw(operations[i]);
|
||
|
}
|
||
|
} catch (err) {
|
||
|
return callback(err, null);
|
||
|
}
|
||
|
|
||
|
// Final options for retryable writes and write concern
|
||
|
let finalOptions = Object.assign({}, options);
|
||
|
finalOptions = applyRetryableWrites(finalOptions, coll.s.db);
|
||
|
finalOptions = applyWriteConcern(finalOptions, { db: coll.s.db, collection: coll }, options);
|
||
|
|
||
|
const writeCon = finalOptions.writeConcern ? finalOptions.writeConcern : {};
|
||
|
const capabilities = coll.s.topology.capabilities();
|
||
|
|
||
|
// Did the user pass in a collation, check if our write server supports it
|
||
|
if (collation && capabilities && !capabilities.commandsTakeCollation) {
|
||
|
return callback(new MongoError('server/primary/mongos does not support collation'));
|
||
|
}
|
||
|
|
||
|
// Execute the bulk
|
||
|
bulk.execute(writeCon, finalOptions, (err, r) => {
|
||
|
// We have connection level error
|
||
|
if (!r && err) {
|
||
|
return callback(err, null);
|
||
|
}
|
||
|
|
||
|
r.insertedCount = r.nInserted;
|
||
|
r.matchedCount = r.nMatched;
|
||
|
r.modifiedCount = r.nModified || 0;
|
||
|
r.deletedCount = r.nRemoved;
|
||
|
r.upsertedCount = r.getUpsertedIds().length;
|
||
|
r.upsertedIds = {};
|
||
|
r.insertedIds = {};
|
||
|
|
||
|
// Update the n
|
||
|
r.n = r.insertedCount;
|
||
|
|
||
|
// Inserted documents
|
||
|
const inserted = r.getInsertedIds();
|
||
|
// Map inserted ids
|
||
|
for (let i = 0; i < inserted.length; i++) {
|
||
|
r.insertedIds[inserted[i].index] = inserted[i]._id;
|
||
|
}
|
||
|
|
||
|
// Upserted documents
|
||
|
const upserted = r.getUpsertedIds();
|
||
|
// Map upserted ids
|
||
|
for (let i = 0; i < upserted.length; i++) {
|
||
|
r.upsertedIds[upserted[i].index] = upserted[i]._id;
|
||
|
}
|
||
|
|
||
|
// Return the results
|
||
|
callback(null, r);
|
||
|
});
|
||
|
}
|
||
|
}
|
||
|
|
||
|
module.exports = BulkWriteOperation;
|