Skip to content

Commit

Permalink
Implement _pollQuery. Don't call it.
Browse files Browse the repository at this point in the history
  • Loading branch information
glasser committed Dec 11, 2013
1 parent 89ec9dd commit b611db5
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 11 deletions.
3 changes: 3 additions & 0 deletions packages/mongo-livedata/doc_fetcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ _.extend(DocFetcher.prototype, {
// If you make multiple calls to fetch() with the same cacheKey (a string),
// DocFetcher may assume that they all return the same document. (It does
// not check to see if collectionName/id match.)
//
// You may assume that callback is never called synchronously (and in fact
// OplogObserveDriver does so).
fetch: function (collectionName, id, cacheKey, callback) {
var self = this;

Expand Down
111 changes: 100 additions & 11 deletions packages/mongo-livedata/oplog_observe_driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ var Fiber = Npm.require('fibers');
var Future = Npm.require('fibers/future');

var PHASE = {
INITIALIZING: 1,
QUERYING: 1,
FETCHING: 2,
STEADY: 3
};
Expand All @@ -29,7 +29,7 @@ OplogObserveDriver = function (options) {
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "oplog-observers", 1);

self._phase = PHASE.INITIALIZING;
self._phase = PHASE.QUERYING;

self._published = new LocalCollection._IdMap;
var selector = self._cursorDescription.selector;
Expand Down Expand Up @@ -61,8 +61,8 @@ OplogObserveDriver = function (options) {
});
} else {
// All other operators should be handled depending on phase
if (self._phase === PHASE.INITIALIZING)
self._handleOplogEntryInitializing(op);
if (self._phase === PHASE.QUERYING)
self._handleOplogEntryQuerying(op);
else
self._handleOplogEntrySteadyOrFetching(op);
}
Expand Down Expand Up @@ -125,11 +125,18 @@ _.extend(OplogObserveDriver.prototype, {
self._published.remove(id);
self._multiplexer.removed(id);
},
_handleDoc: function (id, newDoc) {
_handleDoc: function (id, newDoc, mustMatchNow) {
var self = this;
newDoc = _.clone(newDoc);

var matchesNow = newDoc && self._selectorFn(newDoc);
if (mustMatchNow && !matchesNow) {
throw Error("expected " + EJSON.stringify(newDoc) + " to match "
+ EJSON.stringify(self._cursorDescription));
}

var matchedBefore = self._published.has(id);

if (matchesNow && !matchedBefore) {
self._add(newDoc);
} else if (matchedBefore && !matchesNow) {
Expand Down Expand Up @@ -168,17 +175,25 @@ _.extend(OplogObserveDriver.prototype, {
if (err) {
if (!anyError)
anyError = err;
} else if (!self._stopped) {
} else if (!self._stopped && self._phase === PHASE.FETCHING) {
// We re-check the phase in case we've had an explicit _pollQuery
// call which pulls us out of FETCHING and back into QUERYING.
self._handleDoc(id, doc);
}
waiting--;
if (waiting == 0)
// Because fetch() never calls its callback synchronously, this is
// safe (ie, we won't call fut.return() before the forEach is done).
if (waiting === 0)
fut.return();
});
});
fut.wait();
// XXX do this even if we've switched to PHASE.QUERYING?
if (anyError)
throw anyError;
// Exit now if we've had a _pollQuery call.
if (self._phase === PHASE.QUERYING)
return;
self._currentlyFetching = null;
}
self._beSteady();
Expand All @@ -194,7 +209,7 @@ _.extend(OplogObserveDriver.prototype, {
});
});
},
_handleOplogEntryInitializing: function (op) {
_handleOplogEntryQuerying: function (op) {
var self = this;
self._needToFetch.set(idForOp(op), op.ts.toString());
},
Expand Down Expand Up @@ -251,7 +266,7 @@ _.extend(OplogObserveDriver.prototype, {
if (self._stopped)
throw new Error("oplog stopped surprisingly early");

var initialCursor = new Cursor(self._mongoHandle, self._cursorDescription);
var initialCursor = self._cursorForQuery();
initialCursor.forEach(function (initialDoc) {
self._add(initialDoc);
});
Expand All @@ -261,13 +276,52 @@ _.extend(OplogObserveDriver.prototype, {
// stop() to be called.)
self._multiplexer.ready();

self._doneQuerying();
},

// In various circumstances, we may just want to stop processing the oplog and
// re-run the initial query, just as if we were a PollingObserveDriver.
_pollQuery: function () {
var self = this;

// XXX maybe this should just return?
if (self._stopped)
throw new Error("can't re-poll a stopped query");

// XXX maybe this should either just return, or queue up another poll
// somehow?
if (self._phase === PHASE.QUERYING)
throw new Error("can't re-poll re-entrantly");

if (self._phase === PHASE.FETCHING) {
// Yay, we get to forget about all the things we thought we had to fetch.
self._needToFetch = new LocalCollection._IdMap;
self._currentlyFetching = null;
}
self._phase = PHASE.QUERYING;

// subtle note: _published does not contain _id fields, but newResults does
var newResults = new LocalCollection._IdMap;
var cursor = self._cursorForQuery();
cursor.forEach(function (doc) {
newResults.set(doc._id, doc);
});

self._publishNewResults(newResults);

self._doneQuerying();
},

_doneQuerying: function () {
var self = this;

if (self._stopped)
return;
self._mongoHandle._oplogHandle.waitUntilCaughtUp();

if (self._stopped)
return;
if (self._phase !== PHASE.INITIALIZING)
if (self._phase !== PHASE.QUERYING)
throw Error("Phase unexpectedly " + self._phase);

if (self._needToFetch.empty()) {
Expand All @@ -276,6 +330,42 @@ _.extend(OplogObserveDriver.prototype, {
self._fetchModifiedDocuments();
}
},

_cursorForQuery: function () {
var self = this;
// XXX this is WRONG we need to use the shared projection!!
return new Cursor(self._mongoHandle, self._cursorDescription);
},


// Replace self._published with newResults (both are IdMaps), invoking observe
// callbacks on the multiplexer.
//
// XXX This is very similar to LocalCollection._diffQueryUnorderedChanges. We
// should really: (a) Unify IdMap and OrderedDict into Unordered/OrderedDict (b)
// Rewrite diff.js to use these classes instead of arrays and objects.
_publishNewResults: function (newResults) {
var self = this;

// First remove anything that's gone. Be careful not to modify
// self._published while iterating over it.
var idsToRemove = [];
self._published.each(function (doc, id) {
if (!newResults.has(id))
idsToRemove.push(id);
});
_.each(idsToRemove, function (id) {
self._remove(id);
});

// Now do adds and changes.
newResults.each(function (doc, id) {
// "true" here means to throw if we think this doc doesn't match the
// selector.
self._handleDoc(id, doc, true);
});
},

// This stop function is invoked from the onStop of the ObserveMultiplexer, so
// it shouldn't actually be possible to call it until the multiplexer is
// ready.
Expand Down Expand Up @@ -358,5 +448,4 @@ OplogObserveDriver.cursorSupported = function (cursorDescription) {
});
};


MongoTest.OplogObserveDriver = OplogObserveDriver;

0 comments on commit b611db5

Please sign in to comment.