diff --git a/packages/mongo-livedata/doc_fetcher.js b/packages/mongo-livedata/doc_fetcher.js index a33eabc7553..cdcbfb9cd23 100644 --- a/packages/mongo-livedata/doc_fetcher.js +++ b/packages/mongo-livedata/doc_fetcher.js @@ -1,20 +1,21 @@ +var Fiber = Npm.require('fibers'); var Future = Npm.require('fibers/future'); DocFetcher = function (mongoConnection) { var self = this; self._mongoConnection = mongoConnection; - // Map from cache key -> [Future] - self._futuresForCacheKey = {}; + // Map from cache key -> [callback] + self._callbacksForCacheKey = {}; }; _.extend(DocFetcher.prototype, { // Fetches document "id" from collectionName, returning it or null if not - // found. Throws other errors. Can yield. + // found. // // If you make multiple calls to fetch() with the same cacheKey (a string), // DocFetcher may assume that they all return the same document. (It does // not check to see if collectionName/id match.) - fetch: function (collectionName, id, cacheKey) { + fetch: function (collectionName, id, cacheKey, callback) { var self = this; check(collectionName, String); @@ -23,38 +24,36 @@ _.extend(DocFetcher.prototype, { // If there's already an in-progress fetch for this cache key, yield until // it's done and return whatever it returns. - if (_.has(self._futuresForCacheKey, cacheKey)) { - var f = new Future; - self._futuresForCacheKey[cacheKey].push(f); - return f.wait(); + if (_.has(self._callbacksForCacheKey, cacheKey)) { + self._callbacksForCacheKey[cacheKey].push(callback); + return; } - var futures = self._futuresForCacheKey[cacheKey] = []; + var callbacks = self._callbacksForCacheKey[cacheKey] = [callback]; - try { - var doc = self._mongoConnection.findOne( - collectionName, {_id: id}) || null; - // Return doc to all fibers that are blocking on us. Note that this array - // can continue to grow during calls to Future.return. - while (!_.isEmpty(futures)) { - // Clone the document so that the various calls to fetch don't return - // objects that are intertwingled with each other. Clone before popping - // the future, so that if clone throws, the error gets thrown to the - // next future instead of that fiber hanging. - var clonedDoc = EJSON.clone(doc); - futures.pop().return(clonedDoc); + Fiber(function () { + try { + var doc = self._mongoConnection.findOne( + collectionName, {_id: id}) || null; + // Return doc to all relevant callbacks. Note that this array can + // continue to grow during callback excecution. + while (!_.isEmpty(callbacks)) { + // Clone the document so that the various calls to fetch don't return + // objects that are intertwingled with each other. Clone before + // popping the future, so that if clone throws, the error gets passed + // to the next callback. + var clonedDoc = EJSON.clone(doc); + callbacks.pop()(null, clonedDoc); + } + } catch (e) { + while (!_.isEmpty(callbacks)) { + callbacks.pop()(e); + } + } finally { + // XXX consider keeping the doc around for a period of time before + // removing from the cache + delete self._callbacksForCacheKey[cacheKey]; } - } catch (e) { - while (!_.isEmpty(futures)) { - futures.pop().throw(e); - } - throw e; - } finally { - // XXX consider keeping the doc around for a period of time before - // removing from the cache - delete self._futuresForCacheKey[cacheKey]; - } - - return doc; + }).run(); } }); diff --git a/packages/mongo-livedata/doc_fetcher_tests.js b/packages/mongo-livedata/doc_fetcher_tests.js index cf4e05a8d0c..c2affe7b17c 100644 --- a/packages/mongo-livedata/doc_fetcher_tests.js +++ b/packages/mongo-livedata/doc_fetcher_tests.js @@ -1,38 +1,38 @@ var Fiber = Npm.require('fibers'); var Future = Npm.require('fibers/future'); -Tinytest.add("mongo-livedata - doc fetcher", function (test) { - var collName = "docfetcher-" + Random.id(); - var collection = new Meteor.Collection(collName); - var id1 = collection.insert({x: 1}); - var id2 = collection.insert({y: 2}); +testAsyncMulti("mongo-livedata - doc fetcher", [ + function (test, expect) { + var self = this; + var collName = "docfetcher-" + Random.id(); + var collection = new Meteor.Collection(collName); + var id1 = collection.insert({x: 1}); + var id2 = collection.insert({y: 2}); - var fetcher = new MongoTest.DocFetcher( - MongoInternals.defaultRemoteCollectionDriver().mongo); + var fetcher = new MongoTest.DocFetcher( + MongoInternals.defaultRemoteCollectionDriver().mongo); - // Test basic operation. - test.equal(fetcher.fetch(collName, id1, Random.id()), - {_id: id1, x: 1}); - test.equal(fetcher.fetch(collName, "nonexistent!", Random.id()), null); + // Test basic operation. + fetcher.fetch(collName, id1, Random.id(), expect(null, {_id: id1, x: 1})); + fetcher.fetch(collName, "nonexistent!", Random.id(), expect(null, null)); - var future = new Future; - var fetched = false; - var cacheKey = Random.id(); - Fiber(function () { - var d = fetcher.fetch(collName, id2, cacheKey); - fetched = true; - future.return(d); - }).run(); - // The fetcher yields: - test.isFalse(fetched); + var fetched = false; + var cacheKey = Random.id(); + var expected = {_id: id2, y: 2}; + fetcher.fetch(collName, id2, cacheKey, expect(function (e, d) { + fetched = true; + test.isFalse(e); + test.equal(d, expected); + })); + // The fetcher yields. + test.isFalse(fetched); - // Now ask for another document with the same cache key. Because a fetch for - // that cache key is in flight, we will get the other fetch's document, not - // this random document. - var doc2a = fetcher.fetch(collName, Random.id(), cacheKey); - // Finally, wait for the original fetch to return: - var doc2b = future.wait(); - var expected = {_id: id2, y: 2}; - test.equal(doc2a, expected); - test.equal(doc2b, expected); -}); + // Now ask for another document with the same cache key. Because a fetch for + // that cache key is in flight, we will get the other fetch's document, not + // this random document. + fetcher.fetch(collName, Random.id(), cacheKey, expect(function (e, d) { + test.isFalse(e); + test.equal(d, expected); + })); + } +]); diff --git a/packages/mongo-livedata/oplog.js b/packages/mongo-livedata/oplog.js index b6c00717e7e..37e200b6dab 100644 --- a/packages/mongo-livedata/oplog.js +++ b/packages/mongo-livedata/oplog.js @@ -81,28 +81,31 @@ MongoConnection.prototype._observeChangesWithOplog = function ( if (phase !== PHASE.FETCHING) throw new Error("Surprising phase in fetchModifiedDocuments: " + phase); - var futures = []; currentlyFetching = needToFetch; needToFetch = new IdMap; - currentlyFetching.each(function (cacheKey, id) { - // Run each until they yield. This implies that needToFetch will not be - // updated during this loop. - Fiber(function () { - var f = new Future; - futures.push(f); - var doc = self._docFetcher.fetch(cursorDescription.collectionName, id, - cacheKey); - if (!stopped) - handleDoc(id, doc); - f.return(); - }).run(); - }); - Future.wait(futures); - // Throw if any throw. - // XXX this means the observe will now be stalled - _.each(futures, function (f) { - f.get(); - }); + var waiting = 0; + var error = null; + var fut = new Future; + Fiber(function () { + currentlyFetching.each(function (cacheKey, id) { + // currentlyFetching will not be updated during this loop. + waiting++; + self._docFetcher.fetch(cursorDescription.collectionName, id, cacheKey, function (err, doc) { + if (err) { + if (!error) + error = err; + } else if (!stopped) { + handleDoc(id, doc); + } + waiting--; + if (waiting == 0) + fut.return(); + }); + }); + }).run(); + fut.wait(); + if (error) + throw error; currentlyFetching = new IdMap; } beSteady();