Skip to content

Commit

Permalink
Make DocFetcher more async.
Browse files Browse the repository at this point in the history
This should use fewer fibers. nim, can you benchmark?
  • Loading branch information
glasser committed Dec 3, 2013
1 parent 81c23f4 commit 74b4bd2
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 84 deletions.
65 changes: 32 additions & 33 deletions packages/mongo-livedata/doc_fetcher.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
var Fiber = Npm.require('fibers');
var Future = Npm.require('fibers/future');

DocFetcher = function (mongoConnection) {
var self = this;
self._mongoConnection = mongoConnection;
// Map from cache key -> [Future]
self._futuresForCacheKey = {};
// Map from cache key -> [callback]
self._callbacksForCacheKey = {};
};

_.extend(DocFetcher.prototype, {
// Fetches document "id" from collectionName, returning it or null if not
// found. Throws other errors. Can yield.
// found.
//
// If you make multiple calls to fetch() with the same cacheKey (a string),
// DocFetcher may assume that they all return the same document. (It does
// not check to see if collectionName/id match.)
fetch: function (collectionName, id, cacheKey) {
fetch: function (collectionName, id, cacheKey, callback) {
var self = this;

check(collectionName, String);
Expand All @@ -23,38 +24,36 @@ _.extend(DocFetcher.prototype, {

// If there's already an in-progress fetch for this cache key, yield until
// it's done and return whatever it returns.
if (_.has(self._futuresForCacheKey, cacheKey)) {
var f = new Future;
self._futuresForCacheKey[cacheKey].push(f);
return f.wait();
if (_.has(self._callbacksForCacheKey, cacheKey)) {
self._callbacksForCacheKey[cacheKey].push(callback);
return;
}

var futures = self._futuresForCacheKey[cacheKey] = [];
var callbacks = self._callbacksForCacheKey[cacheKey] = [callback];

try {
var doc = self._mongoConnection.findOne(
collectionName, {_id: id}) || null;
// Return doc to all fibers that are blocking on us. Note that this array
// can continue to grow during calls to Future.return.
while (!_.isEmpty(futures)) {
// Clone the document so that the various calls to fetch don't return
// objects that are intertwingled with each other. Clone before popping
// the future, so that if clone throws, the error gets thrown to the
// next future instead of that fiber hanging.
var clonedDoc = EJSON.clone(doc);
futures.pop().return(clonedDoc);
Fiber(function () {
try {
var doc = self._mongoConnection.findOne(
collectionName, {_id: id}) || null;
// Return doc to all relevant callbacks. Note that this array can
// continue to grow during callback excecution.
while (!_.isEmpty(callbacks)) {
// Clone the document so that the various calls to fetch don't return
// objects that are intertwingled with each other. Clone before
// popping the future, so that if clone throws, the error gets passed
// to the next callback.
var clonedDoc = EJSON.clone(doc);
callbacks.pop()(null, clonedDoc);
}
} catch (e) {
while (!_.isEmpty(callbacks)) {
callbacks.pop()(e);
}
} finally {
// XXX consider keeping the doc around for a period of time before
// removing from the cache
delete self._callbacksForCacheKey[cacheKey];
}
} catch (e) {
while (!_.isEmpty(futures)) {
futures.pop().throw(e);
}
throw e;
} finally {
// XXX consider keeping the doc around for a period of time before
// removing from the cache
delete self._futuresForCacheKey[cacheKey];
}

return doc;
}).run();
}
});
62 changes: 31 additions & 31 deletions packages/mongo-livedata/doc_fetcher_tests.js
Original file line number Diff line number Diff line change
@@ -1,38 +1,38 @@
var Fiber = Npm.require('fibers');
var Future = Npm.require('fibers/future');

Tinytest.add("mongo-livedata - doc fetcher", function (test) {
var collName = "docfetcher-" + Random.id();
var collection = new Meteor.Collection(collName);
var id1 = collection.insert({x: 1});
var id2 = collection.insert({y: 2});
testAsyncMulti("mongo-livedata - doc fetcher", [
function (test, expect) {
var self = this;
var collName = "docfetcher-" + Random.id();
var collection = new Meteor.Collection(collName);
var id1 = collection.insert({x: 1});
var id2 = collection.insert({y: 2});

var fetcher = new MongoTest.DocFetcher(
MongoInternals.defaultRemoteCollectionDriver().mongo);
var fetcher = new MongoTest.DocFetcher(
MongoInternals.defaultRemoteCollectionDriver().mongo);

// Test basic operation.
test.equal(fetcher.fetch(collName, id1, Random.id()),
{_id: id1, x: 1});
test.equal(fetcher.fetch(collName, "nonexistent!", Random.id()), null);
// Test basic operation.
fetcher.fetch(collName, id1, Random.id(), expect(null, {_id: id1, x: 1}));
fetcher.fetch(collName, "nonexistent!", Random.id(), expect(null, null));

var future = new Future;
var fetched = false;
var cacheKey = Random.id();
Fiber(function () {
var d = fetcher.fetch(collName, id2, cacheKey);
fetched = true;
future.return(d);
}).run();
// The fetcher yields:
test.isFalse(fetched);
var fetched = false;
var cacheKey = Random.id();
var expected = {_id: id2, y: 2};
fetcher.fetch(collName, id2, cacheKey, expect(function (e, d) {
fetched = true;
test.isFalse(e);
test.equal(d, expected);
}));
// The fetcher yields.
test.isFalse(fetched);

// Now ask for another document with the same cache key. Because a fetch for
// that cache key is in flight, we will get the other fetch's document, not
// this random document.
var doc2a = fetcher.fetch(collName, Random.id(), cacheKey);
// Finally, wait for the original fetch to return:
var doc2b = future.wait();
var expected = {_id: id2, y: 2};
test.equal(doc2a, expected);
test.equal(doc2b, expected);
});
// Now ask for another document with the same cache key. Because a fetch for
// that cache key is in flight, we will get the other fetch's document, not
// this random document.
fetcher.fetch(collName, Random.id(), cacheKey, expect(function (e, d) {
test.isFalse(e);
test.equal(d, expected);
}));
}
]);
43 changes: 23 additions & 20 deletions packages/mongo-livedata/oplog.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,28 +81,31 @@ MongoConnection.prototype._observeChangesWithOplog = function (
if (phase !== PHASE.FETCHING)
throw new Error("Surprising phase in fetchModifiedDocuments: " + phase);

var futures = [];
currentlyFetching = needToFetch;
needToFetch = new IdMap;
currentlyFetching.each(function (cacheKey, id) {
// Run each until they yield. This implies that needToFetch will not be
// updated during this loop.
Fiber(function () {
var f = new Future;
futures.push(f);
var doc = self._docFetcher.fetch(cursorDescription.collectionName, id,
cacheKey);
if (!stopped)
handleDoc(id, doc);
f.return();
}).run();
});
Future.wait(futures);
// Throw if any throw.
// XXX this means the observe will now be stalled
_.each(futures, function (f) {
f.get();
});
var waiting = 0;
var error = null;
var fut = new Future;
Fiber(function () {
currentlyFetching.each(function (cacheKey, id) {
// currentlyFetching will not be updated during this loop.
waiting++;
self._docFetcher.fetch(cursorDescription.collectionName, id, cacheKey, function (err, doc) {
if (err) {
if (!error)
error = err;
} else if (!stopped) {
handleDoc(id, doc);
}
waiting--;
if (waiting == 0)
fut.return();
});
});
}).run();
fut.wait();
if (error)
throw error;
currentlyFetching = new IdMap;
}
beSteady();
Expand Down

0 comments on commit 74b4bd2

Please sign in to comment.