forked from meteor/meteor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwritefence.js
101 lines (89 loc) · 3.01 KB
/
writefence.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
var path = Npm.require('path');
var Future = Npm.require(path.join('fibers', 'future'));
// WriteFence: groups a set of writes and notifies registered callbacks
// once every write in the group has been fully committed and propagated
// (i.e. all observers have been notified of the write and have
// acknowledged it).
//
// A freshly constructed fence is un-armed, has not fired, is not retired,
// tracks zero writes and has no callbacks registered.
DDPServer._WriteFence = function () {
  // True once arm() has been called.
  this.armed = false;
  // True once the completion callbacks have been triggered.
  this.fired = false;
  // True once retire() has been called; later beginWrite() calls become
  // no-ops.
  this.retired = false;
  // Number of writes begun via beginWrite() and not yet committed.
  this.outstanding_writes = 0;
  // Functions registered through onAllCommitted(), in registration order.
  this.completion_callbacks = [];
};
// Holds the write fence that is current for the running code, if any.
// Whenever a current fence is set, code that writes to a database is
// expected to register each write with it by calling beginWrite().
DDPServer._CurrentWriteFence = new Meteor.EnvironmentVariable();
_.extend(DDPServer._WriteFence.prototype, {
  // Start tracking a write, and return an object to represent it. The
  // object has a single method, committed(). This method should be
  // called when the write is fully committed and propagated. You can
  // continue to add writes to the WriteFence up until it is triggered
  // (calls its callbacks because all writes have committed.)
  //
  // On a retired fence this returns a handle whose committed() does
  // nothing. Throws if the fence has fired but has not been retired.
  // The returned committed() throws if it is called more than once.
  beginWrite: function () {
    var self = this;
    if (self.retired)
      return { committed: function () {} };
    if (self.fired)
      throw new Error("fence has already activated -- too late to add writes");
    self.outstanding_writes++;
    // Per-write flag so that a double commit cannot decrement the counter
    // twice.
    var committed = false;
    return {
      committed: function () {
        if (committed)
          throw new Error("committed called twice on the same write");
        committed = true;
        self.outstanding_writes--;
        self._maybeFire();
      }
    };
  },

  // Arm the fence. Once the fence is armed, and there are no more
  // uncommitted writes, it will activate.
  //
  // Throws if this fence is the value currently held by
  // DDPServer._CurrentWriteFence, or (via _maybeFire) if the fence has
  // already fired.
  arm: function () {
    var self = this;
    if (self === DDPServer._CurrentWriteFence.get())
      throw new Error("Can't arm the current fence");
    self.armed = true;
    self._maybeFire();
  },

  // Register a function to be called when the fence fires. The function
  // is invoked with the fence as its only argument. Throws if the fence
  // has already fired.
  onAllCommitted: function (func) {
    var self = this;
    if (self.fired)
      throw new Error("fence has already activated -- too late to " +
                      "add a callback");
    self.completion_callbacks.push(func);
  },

  // Convenience function. Arms the fence, then blocks (via Future.wait)
  // until it fires.
  armAndWait: function () {
    var self = this;
    var future = new Future;
    // Register before arming: arm() may fire the fence synchronously, and
    // onAllCommitted() rejects callbacks once the fence has fired.
    self.onAllCommitted(function () {
      future['return']();
    });
    self.arm();
    future.wait();
  },

  // Fire the fence if it is armed and no writes are outstanding.
  //
  // Every registered callback is invoked even if an earlier one throws;
  // otherwise a single failing callback would prevent later ones (such as
  // the one armAndWait() uses to resolve its future) from ever running.
  // The first exception raised by a callback is rethrown once all
  // callbacks have been invoked, so the caller still observes the failure.
  //
  // Throws if the fence has already fired.
  _maybeFire: function () {
    var self = this;
    if (self.fired)
      throw new Error("write fence already activated?");
    if (!self.armed || self.outstanding_writes)
      return;

    self.fired = true;

    // Detach the list before invoking anything so the fence never keeps
    // references to callbacks after it has fired, even on error.
    var callbacks = self.completion_callbacks;
    self.completion_callbacks = [];

    var failed = false;
    var firstError;
    _.each(callbacks, function (f) {
      try {
        f(self);
      } catch (e) {
        // Remember only the first failure; keep notifying the rest.
        if (!failed) {
          failed = true;
          firstError = e;
        }
      }
    });
    if (failed)
      throw firstError;
  },

  // Deactivate this fence so that adding more writes has no effect.
  // The fence must have already fired.
  retire: function () {
    var self = this;
    if (! self.fired)
      throw new Error("Can't retire a fence that hasn't fired.");
    self.retired = true;
  }
});