Skip to content

Commit

Permalink
Adding skipLastWithTime
Browse files Browse the repository at this point in the history
  • Loading branch information
mattpodwysocki committed Nov 18, 2015
1 parent 15f1d2c commit c183061
Show file tree
Hide file tree
Showing 2 changed files with 254 additions and 0 deletions.
55 changes: 55 additions & 0 deletions src/modular/observable/skiplastwithtime.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
'use strict';

var ObservableBase = require('./observablebase');
var AbstractObserver = require('../observer/abstractobserver');
var isScheduler = require('../scheduler').isScheduler;
var inherits = require('util').inherits;

global.Rx || (global.Rx = {});
if (!global.Rx.defaultScheduler) {
require('../scheduler/defaultscheduler');
}

function SkipLastWithTimeObserver(o, p) {
this._o = o;
this._s = p._s;
this._d = p._d;
this._q = [];
AbstractObserver.call(this);
}

inherits(SkipLastWithTimeObserver, AbstractObserver);

SkipLastWithTimeObserver.prototype.next = function (x) {
var now = this._s.now();
this._q.push({ interval: now, value: x });
while (this._q.length > 0 && now - this._q[0].interval >= this._d) {
this._o.onNext(this._q.shift().value);
}
};
SkipLastWithTimeObserver.prototype.error = function (e) { this._o.onError(e); };
SkipLastWithTimeObserver.prototype.completed = function () {
var now = this._s.now();
while (this._q.length > 0 && now - this._q[0].interval >= this._d) {
this._o.onNext(this._q.shift().value);
}
this._o.onCompleted();
};

function SkipLastWithTimeObservable(source, d, s) {
this.source = source;
this._d = d;
this._s = s;
ObservableBase.call(this);
}

inherits(SkipLastWithTimeObservable, ObservableBase);

SkipLastWithTimeObservable.prototype.subscribeCore = function (o) {
return this.source.subscribe(new SkipLastWithTimeObserver(o, this));
};

module.exports = function skipLastWithTime (source, duration, scheduler) {
isScheduler(scheduler) || (scheduler = global.Rx.defaultScheduler);
return new SkipLastWithTimeObservable(source, duration, scheduler);
};
199 changes: 199 additions & 0 deletions src/modular/test/skiplastwithtime.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
'use strict';

var test = require('tape');
var Observable = require('../observable');
var TestScheduler = require('../testing/testscheduler');
var reactiveAssert = require('../testing/reactiveassert');
var ReactiveTest = require('../testing/reactivetest');
var onNext = ReactiveTest.onNext,
onError = ReactiveTest.onError,
onCompleted = ReactiveTest.onCompleted,
subscribe = ReactiveTest.subscribe;

Observable.addToPrototype({
skipLastWithTime: require('../observable/skiplastwithtime')
});

test('Observable#skipLastWithTime zero 1', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(210, 1),
onNext(220, 2),
onCompleted(230)
);

var results = scheduler.startScheduler(function () {
return xs.skipLastWithTime(0, scheduler);
});

reactiveAssert(t, results.messages, [
onNext(210, 1),
onNext(220, 2),
onCompleted(230)
]);

reactiveAssert(t, xs.subscriptions, [
subscribe(200, 230)
]);

t.end();
});

test('Observable#skipLastWithTime zero 2', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(210, 1),
onNext(220, 2),
onNext(230, 3),
onCompleted(230)
);

var results = scheduler.startScheduler(function () {
return xs.skipLastWithTime(0, scheduler);
});

reactiveAssert(t, results.messages, [
onNext(210, 1),
onNext(220, 2),
onNext(230, 3),
onCompleted(230)
]);

reactiveAssert(t, xs.subscriptions, [
subscribe(200, 230)
]);

t.end();
});

test('Observable#skipLastWithTime some 1', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(210, 1),
onNext(220, 2),
onNext(230, 3),
onCompleted(230)
);

var results = scheduler.startScheduler(function () {
return xs.skipLastWithTime(15, scheduler);
});

reactiveAssert(t, results.messages, [
onNext(230, 1),
onCompleted(230)
]);

reactiveAssert(t, xs.subscriptions, [
subscribe(200, 230)
]);

t.end();
});

test('Observable#skipLastWithTime some 2', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(210, 1),
onNext(220, 2),
onNext(230, 3),
onNext(240, 4),
onNext(250, 5),
onNext(260, 6),
onNext(270, 7),
onNext(280, 8),
onNext(290, 9),
onCompleted(300)
);

var results = scheduler.startScheduler(function () {
return xs.skipLastWithTime(45, scheduler);
});

reactiveAssert(t, results.messages, [
onNext(260, 1),
onNext(270, 2),
onNext(280, 3),
onNext(290, 4),
onNext(300, 5),
onCompleted(300)
]);

reactiveAssert(t, xs.subscriptions, [
subscribe(200, 300)
]);

t.end();
});

test('Observable#skipLastWithTime some all', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(210, 1),
onNext(220, 2),
onCompleted(230)
);

var results = scheduler.startScheduler(function () {
return xs.skipLastWithTime(45, scheduler);
});

reactiveAssert(t, results.messages, [
onCompleted(230)
]);

reactiveAssert(t, xs.subscriptions, [
subscribe(200, 230)
]);

t.end();
});

test('Observable#skipLastWithTime error', function (t) {
var error = new Error();

var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onError(210, error)
);

var results = scheduler.startScheduler(function () {
return xs.skipLastWithTime(45, scheduler);
});

reactiveAssert(t, results.messages, [
onError(210, error)
]);

reactiveAssert(t, xs.subscriptions, [
subscribe(200, 210)
]);

t.end();
});

test('Observable#skipLastWithTime never', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(150, 1)
);

var results = scheduler.startScheduler(function () {
return xs.skipLastWithTime(50, scheduler);
});

reactiveAssert(t, results.messages, []);

reactiveAssert(t, xs.subscriptions, [
subscribe(200, 1000)
]);

t.end();
});

0 comments on commit c183061

Please sign in to comment.