Skip to content

Commit

Permalink
Adding delaySubscription
Browse files Browse the repository at this point in the history
  • Loading branch information
mattpodwysocki committed Nov 17, 2015
1 parent 53b6fd3 commit 9cd9c35
Show file tree
Hide file tree
Showing 2 changed files with 327 additions and 0 deletions.
42 changes: 42 additions & 0 deletions src/modular/observable/delaysubscription.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
'use strict';

var ObservableBase = require('./observablebase');
var SerialDisposable = require('../serialdisposable');
var isScheduler = require('../scheduler').isScheduler;
var inherits = require('util').inherits;

global.Rx || (global.Rx = {});
if (!global.Rx.defaultScheduler) {
require('../scheduler/defaultscheduler');
}

function DelaySubscription(source, dt, s) {
this.source = source;
this._dt = dt;
this._s = s;
ObservableBase.call(this);
}

inherits(DelaySubscription, ObservableBase);

function scheduleMethod(s, state) {
var source = state[0], o = state[1], d = state[2];
d.setDisposable(source.subscribe(o));
}

DelaySubscription.prototype.subscribeCore = function (o) {
var d = new SerialDisposable();
d.setDisposable(this._s.scheduleFuture([this.source, o, d], this._dt, scheduleMethod));
return d;
};

/**
* Time shifts the observable sequence by delaying the subscription with the specified relative time duration, using the specified scheduler to run timers.
* @param {Number} dueTime Relative or absolute time shift of the subscription.
* @param {Scheduler} [scheduler] Scheduler to run the subscription delay timer on. If not specified, the timeout scheduler is used.
* @returns {Observable} Time-shifted sequence.
*/
module.exports = function delaySubscription (source, dueTime, scheduler) {
isScheduler(scheduler) || (scheduler = global.Rx.defaultScheduler);
return new DelaySubscription(source, dueTime, scheduler);
};
285 changes: 285 additions & 0 deletions src/modular/test/delaysubscription.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
'use strict';

'use strict';

var test = require('tape');
var Observable = require('../observable');
var TestScheduler = require('../testing/testscheduler');
var reactiveAssert = require('../testing/reactiveassert');
var ReactiveTest = require('../testing/reactivetest');
var onNext = ReactiveTest.onNext,
onError = ReactiveTest.onError,
onCompleted = ReactiveTest.onCompleted,
subscribe = ReactiveTest.subscribe;

Observable.addToPrototype({
delaySubscription: require('../observable/delaysubscription')
});

test('Observable#delaySubscription relative', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createColdObservable(
onNext(50, 42),
onNext(60, 43),
onCompleted(70)
);

var results = scheduler.startScheduler(function() {
return xs.delaySubscription(30, scheduler);
});

reactiveAssert(t, results.messages, [
onNext(280, 42),
onNext(290, 43),
onCompleted(300)
]);

reactiveAssert(t, xs.subscriptions, [
subscribe(230, 300)
]);

t.end();
});

test('Observable#delaySubscription relative hot', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(150, 1),
onNext(210, 2),
onNext(220, 3),
onNext(230, 4),
onNext(240, 5),
onCompleted(250)
);

var results = scheduler.startScheduler(function() {
return xs.delaySubscription(20, scheduler);
});

reactiveAssert(t, results.messages, [
onNext(230, 4),
onNext(240, 5),
onCompleted(250)
]);

reactiveAssert(t, xs.subscriptions, [
subscribe(220, 250)
]);

t.end();
});

test('Observable#delaySubscription relative hot misses all', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(150, 1),
onNext(210, 2),
onNext(220, 3),
onNext(230, 4),
onNext(240, 5),
onCompleted(250)
);

var results = scheduler.startScheduler(function() {
return xs.delaySubscription(200, scheduler);
});

reactiveAssert(t, results.messages, [
]);

reactiveAssert(t, xs.subscriptions, [
subscribe(400, 1000)
]);

t.end();
});

test('Observable#delaySubscription relative hot cancel', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(150, 1),
onNext(210, 2),
onNext(220, 3),
onNext(230, 4),
onNext(240, 5),
onCompleted(250)
);

var results = scheduler.startScheduler(function() {
return xs.delaySubscription(20, scheduler);
}, { disposed: 210 });

reactiveAssert(t, results.messages, [
]);

reactiveAssert(t, xs.subscriptions, [
]);

t.end();
});

test('Observable#delaySubscription relative error', function (t) {
var error = new Error();

var scheduler = new TestScheduler();

var xs = scheduler.createColdObservable(
onNext(50, 42),
onNext(60, 43),
onError(70, error)
);

var results = scheduler.startScheduler(function() {
return xs.delaySubscription(30, scheduler);
});

reactiveAssert(t, results.messages, [
onNext(280, 42),
onNext(290, 43),
onError(300, error)
]);

reactiveAssert(t, xs.subscriptions, [
subscribe(230, 300)
]);

t.end();
});

test('Observable#delaySubscription absolute', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createColdObservable(
onNext(50, 42),
onNext(60, 43),
onCompleted(70)
);

var results = scheduler.startScheduler(function() {
return xs.delaySubscription(new Date(230), scheduler);
});

reactiveAssert(t, results.messages, [
onNext(280, 42),
onNext(290, 43),
onCompleted(300)
]);

reactiveAssert(t, xs.subscriptions, [
subscribe(230, 300)
]);

t.end();
});

test('Observable#delaySubscription absolute hot', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(150, 1),
onNext(210, 2),
onNext(220, 3),
onNext(230, 4),
onNext(240, 5),
onCompleted(250)
);

var results = scheduler.startScheduler(function() {
return xs.delaySubscription(new Date(220), scheduler);
});

reactiveAssert(t, results.messages, [
onNext(230, 4),
onNext(240, 5),
onCompleted(250)
]);

reactiveAssert(t, xs.subscriptions, [
subscribe(220, 250)
]);

t.end();
});

test('Observable#delaySubscription relative hot misses all', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(150, 1),
onNext(210, 2),
onNext(220, 3),
onNext(230, 4),
onNext(240, 5),
onCompleted(250)
);

var results = scheduler.startScheduler(function() {
return xs.delaySubscription(new Date(400), scheduler);
});

reactiveAssert(t, results.messages, [
]);

reactiveAssert(t, xs.subscriptions, [
subscribe(400, 1000)
]);

t.end();
});

test('Observable#delaySubscription relative hot cancel', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(150, 1),
onNext(210, 2),
onNext(220, 3),
onNext(230, 4),
onNext(240, 5),
onCompleted(250)
);

var results = scheduler.startScheduler(function() {
return xs.delaySubscription(new Date(220), scheduler);
}, { disposed: 210 });

reactiveAssert(t, results.messages, [
]);

reactiveAssert(t, xs.subscriptions, [
]);

t.end();
});

test('Observable#delaySubscription absolute error', function (t) {
var error = new Error();

var scheduler = new TestScheduler();

var xs = scheduler.createColdObservable(
onNext(50, 42),
onNext(60, 43),
onError(70, error)
);

var results = scheduler.startScheduler(function() {
return xs.delaySubscription(new Date(230), scheduler);
});

reactiveAssert(t, results.messages, [
onNext(280, 42),
onNext(290, 43),
onError(300, error)
]);

reactiveAssert(t, xs.subscriptions, [
subscribe(230, 300)
]);

t.end();
});

0 comments on commit 9cd9c35

Please sign in to comment.