Skip to content

Commit

Permalink
count
Browse files Browse the repository at this point in the history
  • Loading branch information
mattpodwysocki committed Nov 19, 2015
1 parent 13e8b54 commit 52f153f
Show file tree
Hide file tree
Showing 2 changed files with 475 additions and 0 deletions.
50 changes: 50 additions & 0 deletions src/modular/observable/count.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
'use strict';

var ObservableBase = require('./observablebase');
var AbstractObserver = require('../observer/abstractobserver');
var bindCallback = require('../internal/bindcallback');
var tryCatch = require('../internal/trycatchutils').tryCatch;
var inherits = require('util').inherits;

function CountObserver(o, fn, s) {
this._o = o;
this._fn = fn;
this._s = s;
this._i = 0;
this._c = 0;
AbstractObserver.call(this);
}

inherits(CountObserver, AbstractObserver);

CountObserver.prototype.next = function (x) {
if (this._fn) {
var result = tryCatch(this._fn)(x, this._i++, this._s);
if (result === global.Rx.errorObj) { return this._o.onError(result.e); }
Boolean(result) && (this._c++);
} else {
this._c++;
}
};
CountObserver.prototype.error = function (e) { this._o.onError(e); };
CountObserver.prototype.completed = function () {
this._o.onNext(this._c);
this._o.onCompleted();
};

function CountObservable(source, fn) {
this.source = source;
this._fn = fn;
ObservableBase.call(this);
}

inherits(CountObservable, ObservableBase);

CountObservable.prototype.subscribeCore = function (o) {
return this.source.subscribe(new CountObserver(o, this._fn, this.source));
};

module.exports = function count (source, predicate, thisArg) {
var fn = bindCallback(predicate, thisArg, 3);
return new CountObservable(source, fn);
};
Loading

0 comments on commit 52f153f

Please sign in to comment.