diff --git a/packages/mongo-livedata/doc_fetcher.js b/packages/mongo-livedata/doc_fetcher.js index 86f7e82cf70..fbc73bd3f53 100644 --- a/packages/mongo-livedata/doc_fetcher.js +++ b/packages/mongo-livedata/doc_fetcher.js @@ -15,6 +15,9 @@ _.extend(DocFetcher.prototype, { // If you make multiple calls to fetch() with the same cacheKey (a string), // DocFetcher may assume that they all return the same document. (It does // not check to see if collectionName/id match.) + // + // You may assume that callback is never called synchronously (and in fact + // OplogObserveDriver does so). fetch: function (collectionName, id, cacheKey, callback) { var self = this; diff --git a/packages/mongo-livedata/oplog_observe_driver.js b/packages/mongo-livedata/oplog_observe_driver.js index d42eae9451c..e72895bdc70 100644 --- a/packages/mongo-livedata/oplog_observe_driver.js +++ b/packages/mongo-livedata/oplog_observe_driver.js @@ -2,7 +2,7 @@ var Fiber = Npm.require('fibers'); var Future = Npm.require('fibers/future'); var PHASE = { - INITIALIZING: 1, + QUERYING: 1, FETCHING: 2, STEADY: 3 }; @@ -29,7 +29,7 @@ OplogObserveDriver = function (options) { Package.facts && Package.facts.Facts.incrementServerFact( "mongo-livedata", "oplog-observers", 1); - self._phase = PHASE.INITIALIZING; + self._phase = PHASE.QUERYING; self._published = new LocalCollection._IdMap; var selector = self._cursorDescription.selector; @@ -61,8 +61,8 @@ OplogObserveDriver = function (options) { }); } else { // All other operators should be handled depending on phase - if (self._phase === PHASE.INITIALIZING) - self._handleOplogEntryInitializing(op); + if (self._phase === PHASE.QUERYING) + self._handleOplogEntryQuerying(op); else self._handleOplogEntrySteadyOrFetching(op); } @@ -125,11 +125,18 @@ _.extend(OplogObserveDriver.prototype, { self._published.remove(id); self._multiplexer.removed(id); }, - _handleDoc: function (id, newDoc) { + _handleDoc: function (id, newDoc, mustMatchNow) { var self = this; newDoc = _.clone(newDoc); + var matchesNow = newDoc && self._selectorFn(newDoc); + if (mustMatchNow && !matchesNow) { + throw Error("expected " + EJSON.stringify(newDoc) + " to match " + + EJSON.stringify(self._cursorDescription)); + } + var matchedBefore = self._published.has(id); + if (matchesNow && !matchedBefore) { self._add(newDoc); } else if (matchedBefore && !matchesNow) { @@ -168,17 +175,25 @@ _.extend(OplogObserveDriver.prototype, { if (err) { if (!anyError) anyError = err; - } else if (!self._stopped) { + } else if (!self._stopped && self._phase === PHASE.FETCHING) { + // We re-check the phase in case we've had an explicit _pollQuery + // call which pulls us out of FETCHING and back into QUERYING. self._handleDoc(id, doc); } waiting--; - if (waiting == 0) + // Because fetch() never calls its callback synchronously, this is + // safe (ie, we won't call fut.return() before the forEach is done). + if (waiting === 0) fut.return(); }); }); fut.wait(); + // XXX do this even if we've switched to PHASE.QUERYING? if (anyError) throw anyError; + // Exit now if we've had a _pollQuery call. + if (self._phase === PHASE.QUERYING) + return; self._currentlyFetching = null; } self._beSteady(); @@ -194,7 +209,7 @@ _.extend(OplogObserveDriver.prototype, { }); }); }, - _handleOplogEntryInitializing: function (op) { + _handleOplogEntryQuerying: function (op) { var self = this; self._needToFetch.set(idForOp(op), op.ts.toString()); }, @@ -251,7 +266,7 @@ _.extend(OplogObserveDriver.prototype, { if (self._stopped) throw new Error("oplog stopped surprisingly early"); - var initialCursor = new Cursor(self._mongoHandle, self._cursorDescription); + var initialCursor = self._cursorForQuery(); initialCursor.forEach(function (initialDoc) { self._add(initialDoc); }); @@ -261,13 +276,52 @@ _.extend(OplogObserveDriver.prototype, { // stop() to be called.) self._multiplexer.ready(); + self._doneQuerying(); + }, + + // In various circumstances, we may just want to stop processing the oplog and + // re-run the initial query, just as if we were a PollingObserveDriver. + _pollQuery: function () { + var self = this; + + // XXX maybe this should just return? + if (self._stopped) + throw new Error("can't re-poll a stopped query"); + + // XXX maybe this should either just return, or queue up another poll + // somehow? + if (self._phase === PHASE.QUERYING) + throw new Error("can't re-poll re-entrantly"); + + if (self._phase === PHASE.FETCHING) { + // Yay, we get to forget about all the things we thought we had to fetch. + self._needToFetch = new LocalCollection._IdMap; + self._currentlyFetching = null; + } + self._phase = PHASE.QUERYING; + + // subtle note: _published does not contain _id fields, but newResults does + var newResults = new LocalCollection._IdMap; + var cursor = self._cursorForQuery(); + cursor.forEach(function (doc) { + newResults.set(doc._id, doc); + }); + + self._publishNewResults(newResults); + + self._doneQuerying(); + }, + + _doneQuerying: function () { + var self = this; + if (self._stopped) return; self._mongoHandle._oplogHandle.waitUntilCaughtUp(); if (self._stopped) return; - if (self._phase !== PHASE.INITIALIZING) + if (self._phase !== PHASE.QUERYING) throw Error("Phase unexpectedly " + self._phase); if (self._needToFetch.empty()) { @@ -276,6 +330,42 @@ _.extend(OplogObserveDriver.prototype, { self._fetchModifiedDocuments(); } }, + + _cursorForQuery: function () { + var self = this; + // XXX this is WRONG we need to use the shared projection!! + return new Cursor(self._mongoHandle, self._cursorDescription); + }, + + + // Replace self._published with newResults (both are IdMaps), invoking observe + // callbacks on the multiplexer. + // + // XXX This is very similar to LocalCollection._diffQueryUnorderedChanges. We + // should really: (a) Unify IdMap and OrderedDict into Unordered/OrderedDict (b) + // Rewrite diff.js to use these classes instead of arrays and objects. + _publishNewResults: function (newResults) { + var self = this; + + // First remove anything that's gone. Be careful not to modify + // self._published while iterating over it. + var idsToRemove = []; + self._published.each(function (doc, id) { + if (!newResults.has(id)) + idsToRemove.push(id); + }); + _.each(idsToRemove, function (id) { + self._remove(id); + }); + + // Now do adds and changes. + newResults.each(function (doc, id) { + // "true" here means to throw if we think this doc doesn't match the + // selector. + self._handleDoc(id, doc, true); + }); + }, + // This stop function is invoked from the onStop of the ObserveMultiplexer, so // it shouldn't actually be possible to call it until the multiplexer is // ready. @@ -358,5 +448,4 @@ OplogObserveDriver.cursorSupported = function (cursorDescription) { }); }; - MongoTest.OplogObserveDriver = OplogObserveDriver;