Skip to content

Commit

Permalink
Factor socket.io into stream package on the server as well. Much clea…
Browse files Browse the repository at this point in the history
…ner now.
  • Loading branch information
Nick Martin committed Dec 13, 2011
1 parent 46effaa commit 918d15a
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 46 deletions.
33 changes: 1 addition & 32 deletions app/server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ var connect = require('connect');
var gzip = require('connect-gzip');
var argv = require('optimist').argv;
var mime = require('mime');
var socketio = require('socket.io');
var handlebars = require('handlebars');
var useragent = require('useragent');

Expand Down Expand Up @@ -67,26 +66,13 @@ var run = function (bundle_dir) {
res.end();
});

// socket.io setup
var io = socketio.listen(app);
io.configure(function() {
// Don't serve static files from socket.io. We serve them separately
// to get gzip and other fun things.
io.set('browser client', false);

io.set('log level', 1);
// XXX disable websockets! they break chrome both debugging
// and node-http-proxy (used in outer app)
io.set('transports', _.without(io.transports(), 'websocket'));
});

// read bundle config file
var info_raw =
fs.readFileSync(path.join(bundle_dir, 'app.json'), 'utf8');
var info = JSON.parse(info_raw);

// start up app
__skybreak_bootstrap__ = {require: require, startup_hooks: []};
__skybreak_bootstrap__ = {require: require, startup_hooks: [], app: app};
Fiber(function () {
// (put in a fiber to let Sky.db operations happen during loading)

Expand All @@ -111,23 +97,6 @@ var run = function (bundle_dir) {
require('vm').runInThisContext(code, filename, true);
});

// connect socket.io to skybreak server libraries
io.sockets.on('connection', function (socket) {
__skybreak_bootstrap__.register_socket(socket);

socket.on('subscribe', function (data) {
__skybreak_bootstrap__.register_subscription(socket, data);
});
socket.on('unsubscribe', function (data) {
__skybreak_bootstrap__.unregister_subscription(socket, data);
});

socket.on('handle', function (data) {
__skybreak_bootstrap__.run_handler(socket, data,
io.sockets.sockets);
});
});

// run the user startup hooks.
_.each(__skybreak_bootstrap__.startup_hooks, function (x) { x(); });

Expand Down
43 changes: 29 additions & 14 deletions packages/livedata/livedata_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -220,31 +220,20 @@ Sky.Collection = function (name) {
return ret;
};

__skybreak_bootstrap__.register_socket = function (socket) {
socket.sky = {};
socket.sky.subs = [];
socket.sky.cache = {};

// 5/sec updates tops, once every 10sec min.
socket.sky.throttled_poll = _.throttle(function () {
Sky._poll_subscriptions(socket)
}, 50); // XXX only 50ms! for great speed. might want higher in prod.
socket.sky.timer = setInterval(socket.sky.throttled_poll, 10000);
};

__skybreak_bootstrap__.register_subscription = function (socket, data) {
var register_subscription = function (socket, data) {
socket.sky.subs.push(data);
Sky._poll_subscriptions(socket);
};

__skybreak_bootstrap__.unregister_subscription = function (socket, data) {
var unregister_subscription = function (socket, data) {
socket.sky.subs = _.filter(socket.sky.subs, function (x) {
return x._id !== data._id;
});
Sky._poll_subscriptions(socket);
};

__skybreak_bootstrap__.run_handler = function (socket, data, other_sockets) {
var run_handler = function (socket, data, other_sockets) {
// XXX note that running this in a fiber means that two serial
// requests from the client can try to execute in parallel.. we're
// going to have to think that through at some point. also, consider
Expand Down Expand Up @@ -287,3 +276,29 @@ __skybreak_bootstrap__.run_handler = function (socket, data, other_sockets) {

}).run();
};

Sky._stream.register(function (socket) {
socket.sky = {};
socket.sky.subs = [];
socket.sky.cache = {};


socket.on('subscribe', function (data) {
register_subscription(socket, data);
});

socket.on('unsubscribe', function (data) {
unregister_subscription(socket, data);
});

socket.on('handle', function (data) {
run_handler(socket, data, Sky._stream.all_sockets());
});

// 5/sec updates tops, once every 10sec min.
socket.sky.throttled_poll = _.throttle(function () {
Sky._poll_subscriptions(socket)
}, 50); // XXX only 50ms! for great speed. might want higher in prod.
socket.sky.timer = setInterval(socket.sky.throttled_poll, 10000);
});

50 changes: 50 additions & 0 deletions packages/stream/stream_server.js
Original file line number Diff line number Diff line change
@@ -1 +1,51 @@
if (typeof Sky === "undefined") Sky = {};

(function () {

////////// Internals //////////

var registration_callbacks = [];


// basic socketio setup
var socketio = __skybreak_bootstrap__.require('socket.io');

var io = socketio.listen(__skybreak_bootstrap__.app);
io.configure(function() {
// Don't serve static files from socket.io. We serve them separately
// to get gzip and other fun things.
io.set('browser client', false);

io.set('log level', 1);
// XXX disable websockets! they break chrome both debugging
// and node-http-proxy (used in outer app)
io.set('transports', _.without(io.transports(), 'websocket'));
});

// call all our callbacks when we get a new socket. they will do the
// work of setting up handlers and such for specific messages.
io.sockets.on('connection', function (socket) {
_.each(registration_callbacks, function (callback) {
callback(socket);
});
});

////////// API for other packages //////////

Sky._stream = {
// call my callback when a new socket connects.
// also call it for all current connections.
register: function (callback) {
registration_callbacks.push(callback);
_.each(io.sockets.sockets, function (socket) {
callback(socket);
});
},

// get a list of all sockets
all_sockets: function () {
return io.sockets.sockets;
}
};

})();

0 comments on commit 918d15a

Please sign in to comment.