Skip to content

Commit

Permalink
feat(ws-retries): add support for ws re-connection in ApiStore
Browse files Browse the repository at this point in the history
  • Loading branch information
Raphael Benitte committed Apr 11, 2016
1 parent 2c9ade6 commit e559b18
Show file tree
Hide file tree
Showing 2 changed files with 305 additions and 35 deletions.
176 changes: 141 additions & 35 deletions src/browser/stores/ApiStore.js
Original file line number Diff line number Diff line change
@@ -1,60 +1,166 @@
import Reflux from 'reflux';
import ApiActions from './../actions/ApiActions';
import ConfigStore from './ConfigStore';
import Reflux from 'reflux';
import ConfigStore from './ConfigStore';
import ApiActions from '../actions/ApiActions';
import ConfigActions from '../actions/ConfigActions';
import ConnectionStatusActions from '../actions/ConnectionStatusActions';
import NotificationsActions from '../actions/NotificationsActions';
import ConnectionStatus from '../components/ConnectionStatus.jsx';
import {
NOTIFICATION_STATUS_SUCCESS,
NOTIFICATION_STATUS_WARNING,
NOTIFICATION_STATUS_ERROR
} from './NotificationsStore';

const NOTIFICATION_ID = 'connection.status';

const CONNECTION_RETRY_DELAY_SECONDS = 15;
const CONNECTION_MAX_RETRIES = 10;
let retryCount = 0;

let reconnections = 0;

const buffer = [];
let ws = null;
let retryTimer;
let history = [];
let buffer = [];


const clearRetryTimer = () => {
if (retryTimer) {
clearTimeout(retryTimer);
retryTimer = null;
}
};


const connectWS = (config, store) => {
ConnectionStatusActions.connecting();
NotificationsActions.update(NOTIFICATION_ID, { status: NOTIFICATION_STATUS_WARNING });

let proto = 'ws';
if (config.useWssConnection === true) {
proto = 'wss';
}

let port = window.document.location.port;
if (config.wsPort !== undefined) {
port = config.wsPort;
}

let wsUrl = `${proto}://${window.document.location.hostname}`;
if (port && port !== '') {
wsUrl = `${wsUrl}:${port}`;
}

ws = new WebSocket(wsUrl);

ws.onopen = event => {
clearRetryTimer();

retryCount = 0;

ConnectionStatusActions.connected();
NotificationsActions.update(NOTIFICATION_ID, { status: NOTIFICATION_STATUS_SUCCESS });
NotificationsActions.close(NOTIFICATION_ID, 2000);

if (reconnections > 0) {
ConfigActions.loadConfig();
history.forEach(request => { ws.send(JSON.stringify(request)); });
} else {
buffer.forEach(request => { ws.send(JSON.stringify(request)); });
buffer = [];
}

reconnections++;
};

ws.onmessage = event => {
if (event.data !== '') {
store.trigger(JSON.parse(event.data));
}
};

ws.onclose = event => {
ws = null;

clearRetryTimer();

if (retryCount === 0) {
NotificationsActions.notify({
id: NOTIFICATION_ID,
component: ConnectionStatus,
status: NOTIFICATION_STATUS_WARNING,
ttl: -1
});
} else if (retryCount === CONNECTION_MAX_RETRIES) {
ConnectionStatusActions.failed(retryCount);
NotificationsActions.update(NOTIFICATION_ID, { status: NOTIFICATION_STATUS_ERROR });
return;
}

ConnectionStatusActions.delaying(retryCount, CONNECTION_RETRY_DELAY_SECONDS);
NotificationsActions.update(NOTIFICATION_ID, { status: NOTIFICATION_STATUS_WARNING });

retryTimer = setTimeout(() => {
connectWS(config, store);
}, CONNECTION_RETRY_DELAY_SECONDS * 1000);

retryCount++;
};
};


const ApiStore = Reflux.createStore({
init() {
this.listenTo(ConfigStore, this.initWs);
},

initWs(config) {
let proto = 'ws';
if (config.useWssConnection === true) {
proto = 'wss';
// only connect ws if it's not already connected, when connection is lost and we succeed in re-establishing it
// we reload configuration, so without this check we'll end in an infinite loop.
if (ws === null) {
connectWS(config, this);
}

let port = window.document.location.port;
if (config.wsPort !== undefined) {
port = config.wsPort;
}
this.listenTo(ApiActions.get, this.fetch);
},

let wsUrl = `${proto}://${window.document.location.hostname}`;
if (port && port !== '') {
wsUrl = `${wsUrl}:${port}`;
}
fetch(id, params = {}) {
const request = { id, params };

ws = new WebSocket(wsUrl);
ws.onmessage = event => {
if (event.data !== '') {
ApiStore.trigger(JSON.parse(event.data));
}
};
// keep track to use when re-connecting
history.push(request);

ws.onopen = () => {
buffer.forEach(request => {
ws.send(JSON.stringify(request));
});
};
this.listenTo(ApiActions.get, this.get);
},

get(id, params) {
// if websockets not ready, add request to buffer
if (ws === null || ws.readyState !== WebSocket.OPEN) {
buffer.push({
id: id,
params: params || {}
});

buffer.push(request);
return;
}

ws.send(JSON.stringify({
id: id,
params: params || {}
}));
},

getHistory() {
return history;
},

getBuffer() {
return buffer;
},

reset() {
clearRetryTimer();

history = [];
buffer = [];

if (ws !== null) {
ws.close();
ws = null;
}
}
});

Expand Down
164 changes: 164 additions & 0 deletions test/frontend/stores/ApiStore.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/* global describe it */
import _ from 'lodash';
import expect from 'expect';
import sinon from 'sinon';
import { expectTriggers } from '../../helpers/storeHelper';
import { getFakeTimerCount } from '../../helpers/timersHelper';


let clock;
let triggerSpy;
let ApiStore;
let wsStub;
let wsStubInstance;


describe('Mozaïk | ApiStore', () => {
beforeEach(() => {
clock = sinon.useFakeTimers();

ApiStore = require('../../../src/browser/stores/ApiStore').default;

triggerSpy = sinon.spy();
ApiStore.trigger = triggerSpy;

global.window = {
document: {
location: {
port: '',
hostname: 'test.com'
}
}
};

wsStub = sinon.stub();
wsStubInstance = {
close() {},
send: sinon.spy()
};
wsStub.returns(wsStubInstance);
global.WebSocket = wsStub;

ApiStore.reset();
});

afterEach(() => {
clock.restore();
delete global.window;
delete global.WebSocket;
});

describe('initWs()', () => {
it('should create a new ws connection', () => {
ApiStore.initWs({});

expect(wsStub.calledOnce).toEqual(true);
expect(wsStub.getCall(0).args[0]).toEqual('ws://test.com');
});

it(`should create a new wss connection if 'useWssConnection' is true`, () => {
ApiStore.initWs({ useWssConnection: true });

expect(wsStub.calledOnce).toEqual(true);
expect(wsStub.getCall(0).args[0]).toEqual('wss://test.com');
});

it(`should create a new ws on custom port if 'wsPort' defined`, () => {
ApiStore.initWs({ wsPort: 2000 });

expect(wsStub.calledOnce).toEqual(true);
expect(wsStub.getCall(0).args[0]).toEqual('ws://test.com:2000');
});

it (`should not create a new ws if there's already one created`, () => {
ApiStore.initWs({});
ApiStore.initWs({});

expect(wsStub.calledOnce).toEqual(true);
});
});

describe('on ws message', () => {
it('should trigger received data', () => {
ApiStore.initWs({});

const data = { foo: 'bar' };

wsStubInstance.onmessage({ data: JSON.stringify(data) });

expectTriggers(triggerSpy, [data]);
});

it('should not trigger if data is an empty string', () => {
ApiStore.initWs({});

wsStubInstance.onmessage({ data: '' });

expect(triggerSpy.called).toEqual(false);
});
});

describe('fetch', () => {
it('should send request', () => {
ApiStore.initWs({});
ApiStore.fetch('foo');

expect(wsStubInstance.send.calledOnce).toEqual(true);
expect(wsStubInstance.send.getCall(0).args[0]).toEqual(JSON.stringify({
id: 'foo',
params: {}
}));
});

it('should add request to history', () => {
ApiStore.initWs({});
ApiStore.fetch('foo');

expect(ApiStore.getHistory()).toEqual([{
id: 'foo',
params: {}
}]);
});

it('should add request to buffer if ws is null', () => {
ApiStore.fetch('foo');

expect(ApiStore.getBuffer()).toEqual([{
id: 'foo',
params: {}
}]);
});

it('should add request to buffer if ws is not ready', () => {
ApiStore.initWs({});
wsStubInstance.readyState = 'not_ready';
ApiStore.fetch('foo');

expect(ApiStore.getBuffer()).toEqual([{
id: 'foo',
params: {}
}]);
});

it('should not add request to buffer if ws not null and ready', () => {
ApiStore.initWs({});
ApiStore.fetch('foo');

expect(ApiStore.getBuffer()).toEqual([]);
});
});

describe('on ws close', () => {
it('should try to reconnect 10 times each 15 seconds', () => {
ApiStore.initWs({});

for (let i = 0; i < 12; i++) {
clock.tick(15000);
wsStubInstance.onclose();
}

// 12th call ignored
expect(wsStub.callCount).toEqual(11);
});
});
});

0 comments on commit e559b18

Please sign in to comment.