Skip to content

Commit

Permalink
getCandidates is now async
Browse files Browse the repository at this point in the history
  • Loading branch information
louischatriot committed Jan 19, 2016
1 parent 6502443 commit 645d875
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 184 deletions.
111 changes: 59 additions & 52 deletions lib/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,76 +103,83 @@ Cursor.prototype.project = function (candidates) {
*
* @param {Function} callback - Signature: err, results
*/
Cursor.prototype._exec = function(callback) {
var candidates = this.db.getCandidates(this.query)
, res = [], added = 0, skipped = 0, self = this
Cursor.prototype._exec = function(_callback) {
var res = [], added = 0, skipped = 0, self = this
, error = null
, i, keys, key
;

try {
for (i = 0; i < candidates.length; i += 1) {
if (model.match(candidates[i], this.query)) {
// If a sort is defined, wait for the results to be sorted before applying limit and skip
if (!this._sort) {
if (this._skip && this._skip > skipped) {
skipped += 1;
function callback (error, res) {
if (self.execFn) {
return self.execFn(error, res, _callback);
} else {
return _callback(error, res);
}
}

this.db.getCandidates(this.query, function (err, candidates) {
if (err) { return callback(err); }

try {
for (i = 0; i < candidates.length; i += 1) {
if (model.match(candidates[i], self.query)) {
// If a sort is defined, wait for the results to be sorted before applying limit and skip
if (!self._sort) {
if (self._skip && self._skip > skipped) {
skipped += 1;
} else {
res.push(candidates[i]);
added += 1;
if (self._limit && self._limit <= added) { break; }
}
} else {
res.push(candidates[i]);
added += 1;
if (this._limit && this._limit <= added) { break; }
}
} else {
res.push(candidates[i]);
}
}
} catch (err) {
return callback(err);
}
} catch (err) {
return callback(err);
}

// Apply all sorts
if (this._sort) {
keys = Object.keys(this._sort);
// Apply all sorts
if (self._sort) {
keys = Object.keys(self._sort);

// Sorting
var criteria = [];
for (i = 0; i < keys.length; i++) {
key = keys[i];
criteria.push({ key: key, direction: self._sort[key] });
}
res.sort(function(a, b) {
var criterion, compare, i;
for (i = 0; i < criteria.length; i++) {
criterion = criteria[i];
compare = criterion.direction * model.compareThings(model.getDotValue(a, criterion.key), model.getDotValue(b, criterion.key), self.db.compareStrings);
if (compare !== 0) {
return compare;
}
// Sorting
var criteria = [];
for (i = 0; i < keys.length; i++) {
key = keys[i];
criteria.push({ key: key, direction: self._sort[key] });
}
return 0;
});
res.sort(function(a, b) {
var criterion, compare, i;
for (i = 0; i < criteria.length; i++) {
criterion = criteria[i];
compare = criterion.direction * model.compareThings(model.getDotValue(a, criterion.key), model.getDotValue(b, criterion.key), self.db.compareStrings);
if (compare !== 0) {
return compare;
}
}
return 0;
});

// Applying limit and skip
var limit = this._limit || res.length
, skip = this._skip || 0;
// Applying limit and skip
var limit = self._limit || res.length
, skip = self._skip || 0;

res = res.slice(skip, skip + limit);
}
res = res.slice(skip, skip + limit);
}

// Apply projection
try {
res = this.project(res);
} catch (e) {
error = e;
res = undefined;
}
// Apply projection
try {
res = self.project(res);
} catch (e) {
error = e;
res = undefined;
}

if (this.execFn) {
return this.execFn(error, res, callback);
} else {
return callback(error, res);
}
});
};

Cursor.prototype.exec = function () {
Expand Down
188 changes: 102 additions & 86 deletions lib/datastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ Datastore.prototype.resetIndexes = function (newData) {
* @param {String} options.fieldName
* @param {Boolean} options.unique
* @param {Boolean} options.sparse
* @param {Number} options.expireAfterSeconds - Optional, if set this index becomes a TTL index
* @param {Function} cb Optional callback, signature: err
*/
Datastore.prototype.ensureIndex = function (options, cb) {
Expand Down Expand Up @@ -243,50 +244,63 @@ Datastore.prototype.updateIndexes = function (oldDoc, newDoc) {
* One way to make it better would be to enable the use of multiple indexes if the first usable index
* returns too much data. I may do it in the future.
*
* TODO: needs to be moved to the Cursor module
* Returned candidates will be scanned to find and remove all expired documents
*
* @param {Query} query
* @param {Function} callback Signature err, docs
*/
Datastore.prototype.getCandidates = function (query) {
Datastore.prototype.getCandidates = function (query, callback) {
var indexNames = Object.keys(this.indexes)
, self = this
, usableQueryKeys;

// For a basic match
usableQueryKeys = [];
Object.keys(query).forEach(function (k) {
if (typeof query[k] === 'string' || typeof query[k] === 'number' || typeof query[k] === 'boolean' || util.isDate(query[k]) || query[k] === null) {
usableQueryKeys.push(k);

async.waterfall([
// STEP 1: get candidates list by checking indexes from most to least frequent usecase
function (cb) {
// For a basic match
usableQueryKeys = [];
Object.keys(query).forEach(function (k) {
if (typeof query[k] === 'string' || typeof query[k] === 'number' || typeof query[k] === 'boolean' || util.isDate(query[k]) || query[k] === null) {
usableQueryKeys.push(k);
}
});
usableQueryKeys = _.intersection(usableQueryKeys, indexNames);
if (usableQueryKeys.length > 0) {
return cb(null, self.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]]));
}
});
usableQueryKeys = _.intersection(usableQueryKeys, indexNames);
if (usableQueryKeys.length > 0) {
return this.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]]);
}

// For a $in match
usableQueryKeys = [];
Object.keys(query).forEach(function (k) {
if (query[k] && query[k].hasOwnProperty('$in')) {
usableQueryKeys.push(k);
// For a $in match
usableQueryKeys = [];
Object.keys(query).forEach(function (k) {
if (query[k] && query[k].hasOwnProperty('$in')) {
usableQueryKeys.push(k);
}
});
usableQueryKeys = _.intersection(usableQueryKeys, indexNames);
if (usableQueryKeys.length > 0) {
return cb(null, self.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]].$in));
}
});
usableQueryKeys = _.intersection(usableQueryKeys, indexNames);
if (usableQueryKeys.length > 0) {
return this.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]].$in);
}

// For a comparison match
usableQueryKeys = [];
Object.keys(query).forEach(function (k) {
if (query[k] && (query[k].hasOwnProperty('$lt') || query[k].hasOwnProperty('$lte') || query[k].hasOwnProperty('$gt') || query[k].hasOwnProperty('$gte'))) {
usableQueryKeys.push(k);
// For a comparison match
usableQueryKeys = [];
Object.keys(query).forEach(function (k) {
if (query[k] && (query[k].hasOwnProperty('$lt') || query[k].hasOwnProperty('$lte') || query[k].hasOwnProperty('$gt') || query[k].hasOwnProperty('$gte'))) {
usableQueryKeys.push(k);
}
});
usableQueryKeys = _.intersection(usableQueryKeys, indexNames);
if (usableQueryKeys.length > 0) {
return cb(null, self.indexes[usableQueryKeys[0]].getBetweenBounds(query[usableQueryKeys[0]]));
}
});
usableQueryKeys = _.intersection(usableQueryKeys, indexNames);
if (usableQueryKeys.length > 0) {
return this.indexes[usableQueryKeys[0]].getBetweenBounds(query[usableQueryKeys[0]]);
}

// By default, return all the DB data
return this.getAllData();
// By default, return all the DB data
return cb(null, self.getAllData());
}
// STEP 2: remove all expired documents
, function (docs) {
return callback(null, docs);
}]);
};


Expand Down Expand Up @@ -548,48 +562,49 @@ Datastore.prototype._update = function (query, updateQuery, options, cb) {
});
}
, function () { // Perform the update
var modifiedDoc
, candidates = self.getCandidates(query)
, modifications = []
;
var modifiedDoc , modifications = [];

// Preparing update (if an error is thrown here neither the datafile nor
// the in-memory indexes are affected)
try {
for (i = 0; i < candidates.length; i += 1) {
if (model.match(candidates[i], query) && (multi || numReplaced === 0)) {
numReplaced += 1;
modifiedDoc = model.modify(candidates[i], updateQuery);
if (self.timestampData) { modifiedDoc.updatedAt = new Date(); }
modifications.push({ oldDoc: candidates[i], newDoc: modifiedDoc });
self.getCandidates(query, function (err, candidates) {
if (err) { return callback(err); }

// Preparing update (if an error is thrown here neither the datafile nor
// the in-memory indexes are affected)
try {
for (i = 0; i < candidates.length; i += 1) {
if (model.match(candidates[i], query) && (multi || numReplaced === 0)) {
numReplaced += 1;
modifiedDoc = model.modify(candidates[i], updateQuery);
if (self.timestampData) { modifiedDoc.updatedAt = new Date(); }
modifications.push({ oldDoc: candidates[i], newDoc: modifiedDoc });
}
}
} catch (err) {
return callback(err);
}
} catch (err) {
return callback(err);
}

// Change the docs in memory
try {
self.updateIndexes(modifications);
} catch (err) {
return callback(err);
}

// Update the datafile
var updatedDocs = _.pluck(modifications, 'newDoc');
self.persistence.persistNewState(updatedDocs, function (err) {
if (err) { return callback(err); }
if (!options.returnUpdatedDocs) {
return callback(null, numReplaced);
} else {
var updatedDocsDC = [];
updatedDocs.forEach(function (doc) { updatedDocsDC.push(model.deepCopy(doc)); });
return callback(null, numReplaced, updatedDocsDC);
// Change the docs in memory
try {
self.updateIndexes(modifications);
} catch (err) {
return callback(err);
}

// Update the datafile
var updatedDocs = _.pluck(modifications, 'newDoc');
self.persistence.persistNewState(updatedDocs, function (err) {
if (err) { return callback(err); }
if (!options.returnUpdatedDocs) {
return callback(null, numReplaced);
} else {
var updatedDocsDC = [];
updatedDocs.forEach(function (doc) { updatedDocsDC.push(model.deepCopy(doc)); });
return callback(null, numReplaced, updatedDocsDC);
}
});
});
}
]);
}]);
};

Datastore.prototype.update = function () {
this.executor.push({ this: this, fn: this._update, arguments: arguments });
};
Expand All @@ -607,32 +622,33 @@ Datastore.prototype.update = function () {
*/
Datastore.prototype._remove = function (query, options, cb) {
var callback
, self = this
, numRemoved = 0
, multi
, removedDocs = []
, candidates = this.getCandidates(query)
, self = this, numRemoved = 0, removedDocs = [], multi
;

if (typeof options === 'function') { cb = options; options = {}; }
callback = cb || function () {};
multi = options.multi !== undefined ? options.multi : false;

try {
candidates.forEach(function (d) {
if (model.match(d, query) && (multi || numRemoved === 0)) {
numRemoved += 1;
removedDocs.push({ $$deleted: true, _id: d._id });
self.removeFromIndexes(d);
}
});
} catch (err) { return callback(err); }

self.persistence.persistNewState(removedDocs, function (err) {
this.getCandidates(query, function (err, candidates) {
if (err) { return callback(err); }
return callback(null, numRemoved);

try {
candidates.forEach(function (d) {
if (model.match(d, query) && (multi || numRemoved === 0)) {
numRemoved += 1;
removedDocs.push({ $$deleted: true, _id: d._id });
self.removeFromIndexes(d);
}
});
} catch (err) { return callback(err); }

self.persistence.persistNewState(removedDocs, function (err) {
if (err) { return callback(err); }
return callback(null, numRemoved);
});
});
};

Datastore.prototype.remove = function () {
this.executor.push({ this: this, fn: this._remove, arguments: arguments });
};
Expand Down
Loading

0 comments on commit 645d875

Please sign in to comment.