Skip to content

Commit

Permalink
Add EventSource support
Browse files Browse the repository at this point in the history
  • Loading branch information
Olivier Poitrey committed Feb 26, 2013
1 parent 5fce8d6 commit 6c5a5ca
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 65 deletions.
31 changes: 30 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Universal Mobile Push Daemon
Features
--------

- Multi protocols [APNs] \(iOS), C2DM/[GCM] \(Android), [MPNS] \(Windows Phone), [HTTP] POST
- Multi protocols [APNs] \(iOS), C2DM/[GCM] \(Android), [MPNS] \(Windows Phone), [HTTP] POST, [EventSource](#event-source)
- Pluggable protocols
- Register unlimited number of subscribers (device)
- Subscribe to unlimited number of events
Expand Down Expand Up @@ -108,6 +108,35 @@ Here we will send a message to all subscribers subscribed to the `sport` event:

$ curl -d msg=Test%20message http://localhost/event/sport

Event Source
------------

Pushd supports the [Event Source](http://www.w3.org/TR/eventsource/) protocol, also known as Server Sent Events. This allows your web application to benefits from the same pushed event than native apps.

This protocol is very different from other pushd supported protocol because it doesn't involve subsriber registration nor stored subscription. The web service connects to the pushd server and declars which event it is interested for, and pushd will push subscribed events in this same connections until the client stays connected.

You may want to use [Yaffle EventSource polyfill](https://github.com/Yaffle/EventSource) on the client side in order to support CORS requests with older browsers.

When Event Source is enabled, a new `/subscribe` API endpoint is available. You'll have to POST an `events` parameter with a list of events separated by commas:

> POST /subscribe HTTP/1.1
> Content-Type: application/x-www-form-urlencoded
> Accept: text/event-stream
>
> events=event1,event2,event3
>
---
< HTTP/1.1 200 OK
< Content-Type: text/event-stream
< Cache-Control: no-cache
< Access-Control-Allow-Origin: *
< Connection: close
<
... some time passes ...
< data: {"name": "event1", "title": {"default": "Title", "fr": "Titre"}, "message": {...}, "data": {"var1": "val1", "var2": "val2"}}
... some time passes ...
< data: {"name": "event2", "title": {"default": "Title", "fr": "Titre"}, "message": {...}, "data": {"var1": "val1", "var2": "val2"}}

API
---

Expand Down
Binary file modified doc/overview.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions lib/api.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ filterFields = (params) ->
fields[key] = val for own key, val of params when key in ['proto', 'token', 'lang', 'badge', 'version']
return fields

exports.setupRestApi = (app, createSubscriber, getEventFromId, authorize, testSubscriber) ->
exports.setupRestApi = (app, createSubscriber, getEventFromId, authorize, testSubscriber, eventPublisher) ->
authorize ?= (realm) ->

# subscriber registration
Expand Down Expand Up @@ -127,7 +127,7 @@ exports.setupRestApi = (app, createSubscriber, getEventFromId, authorize, testSu
# Publish an event
app.post '/event/:event_id', authorize('publish'), (req, res) ->
res.send 204
req.event.publish(req.body)
eventPublisher.publish(req.event, req.body)

# Delete an event
app.delete '/event/:event_id', authorize('publish'), (req, res) ->
Expand Down
54 changes: 12 additions & 42 deletions lib/event.coffee
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
async = require 'async'
Payload = require('./payload').Payload

class Event
OPTION_IGNORE_MESSAGE: 1
name_format: /^[a-zA-Z0-9:._-]{1,100}$/

constructor: (@redis, @pushservices, @name) ->
constructor: (@redis, @name) ->
throw new Error("Missing redis connection") if not redis?
throw new Error('Invalid event name ' + @name) if not Event::name_format.test @name
@key = "event:#{@name}"
Expand All @@ -28,47 +27,9 @@ class Event
else
cb(null)

publish: (data, cb) ->
try
payload = new Payload(data)
payload.event = @
catch e
# Invalid payload (empty, missing key or invalid key format)
cb(-1) if cb
return

exists: (cb) ->
@redis.sismember "events", @name, (err, exists) =>
if not exists
cb(0) if cb
return

try
# Do not compile templates before to know there's some subscribers for the event
# and do not start serving subscribers if payload won't compile
payload.compile()
catch e
# Invalid payload (templates doesn't compile)
cb(-1) if cb
return

@forEachSubscribers (subscriber, subOptions, done) =>
# action
@pushservices.push(subscriber, subOptions, payload, done)
, (totalSubscribers) =>
# finished
if totalSubscribers > 0
# update some event' stats
@redis.multi()
# account number of sent notification since event creation
.hincrby(@key, "total", 1)
# store last notification date for this event
.hset(@key, "last", Math.round(new Date().getTime() / 1000))
.exec =>
cb(totalSubscribers) if cb
else
# if there is no subscriber, cleanup the event
@delete =>
cb(0) if cb
cb(exists)

delete: (cb) ->
@forEachSubscribers (subscriber, subOptions, done) =>
Expand All @@ -84,6 +45,15 @@ class Event
.exec ->
cb() if cb

log: (cb) ->
@redis.multi()
# account number of sent notification since event creation
.hincrby(@key, "total", 1)
# store last notification date for this event
.hset(@key, "last", Math.round(new Date().getTime() / 1000))
.exec =>
cb() if cb

# Performs an action on each subscriber subsribed to this event
forEachSubscribers: (action, finished) ->
Subscriber = require('./subscriber').Subscriber
Expand Down
46 changes: 46 additions & 0 deletions lib/eventpublisher.coffee
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
events = require 'events'
Payload = require('./payload').Payload

class EventPublisher extends events.EventEmitter
constructor: (@pushServices) ->

publish: (event, data, cb) ->
try
payload = new Payload(data)
payload.event = event
catch e
# Invalid payload (empty, missing key or invalid key format)
cb(-1) if cb
return

@.emit(event.name, event, payload)

event.exists (exists) =>
if not exists
cb(0) if cb
return

try
# Do not compile templates before to know there's some subscribers for the event
# and do not start serving subscribers if payload won't compile
payload.compile()
catch e
# Invalid payload (templates doesn't compile)
cb(-1) if cb
return

event.forEachSubscribers (subscriber, subOptions, done) =>
# action
@pushServices.push(subscriber, subOptions, payload, done)
, (totalSubscribers) =>
# finished
if totalSubscribers > 0
# update some event' stats
event.log =>
cb(totalSubscribers) if cb
else
# if there is no subscriber, cleanup the event
event.delete =>
cb(0) if cb

exports.EventPublisher = EventPublisher
66 changes: 66 additions & 0 deletions lib/eventsource.coffee
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
policyFile = '<?xml version="1.0"?>' +
'<!DOCTYPE cross-domain-policy SYSTEM "http://www.macromedia.com/xml/dtds/cross-domain-policy.dtd">' +
'<cross-domain-policy>' +
'<site-control permitted-cross-domain-policies="master-only"/>' +
'<allow-access-from domain="*" secure="false"/>' +
'<allow-http-request-headers-from domain="*" headers="Accept"/>' +
'</cross-domain-policy>'

exports.setup = (app, authorize, eventPublisher) ->
# In order to support access from flash apps
app.get '/crossdomain.xml', (req, res) ->
res.set 'Content-Type', 'application/xml'
res.send(policyFile)

app.options '/subscribe', authorize('listen'), (req, res) ->
res.set
'Content-Type': 'text/event-stream',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET'
'Access-Control-Max-Age': '86400'
res.end()

app.post '/subscribe', authorize('listen'), (req, res) ->
unless req.accepts('text/event-stream')
res.send 406
return

unless typeof req.body.events is 'string'
res.send 400
return

eventNames = req.body.events.split ','

req.socket.setTimeout(Infinity);
req.socket.setNoDelay(true);
res.set
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Access-Control-Allow-Origin': '*',
'Connection': 'close'

if req.get('User-Agent')?.indexOf('MSIE') != -1
# Work around MSIE bug preventing Progress handler from behing thrown before first 2048 bytes
# See http://forums.adobe.com/message/478731
res.write new Array(2048).join('\n')

sendEvent = (event, payload) ->
data =
event: event.name
title: payload.title
message: payload.msg
data: payload.data

res.write("data: " + JSON.stringify(data) + "\n\n")

antiIdleInterval = setInterval ->
res.write "\n"
, 10000

res.socket.on 'close', =>
clearInterval antiIdleInterval
for eventName in eventNames
eventPublisher.removeListener eventName, sendEvent

for eventName in eventNames
eventPublisher.addListener eventName, sendEvent
4 changes: 2 additions & 2 deletions lib/subscriber.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class Subscriber
# check if some events have been rendered empty
emptyEvents = []
for eventName, i in events when results[4 + i + (i * 1) + 1] is 0
emptyEvents.push new Event(@redis, null, eventName)
emptyEvents.push new Event(@redis, eventName)

async.forEach emptyEvents, ((evt, done) => evt.delete(done)), =>
cb(results[1] is 1) if cb # true if deleted, false if did exist
Expand Down Expand Up @@ -180,7 +180,7 @@ class Subscriber
eventsWithOptions = results[1]
for eventName, i in eventsWithOptions by 2
subscriptions.push
event: new Event(@redis, null, eventName)
event: new Event(@redis, eventName)
options: parseInt(eventsWithOptions[i + 1], 10)
cb(subscriptions)
else
Expand Down
25 changes: 17 additions & 8 deletions pushd.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,30 @@ Netmask = require('netmask').Netmask
redis = require('redis').createClient()
settings = require './settings'
Subscriber = require('./lib/subscriber').Subscriber
EventPublisher = require('./lib/eventpublisher').EventPublisher
Event = require('./lib/event').Event
PushServices = require('./lib/pushservices').PushServices
Payload = require('./lib/payload').Payload
logger = console

createSubscriber = (fields, cb) ->
throw new Error("Invalid value for `proto'") unless service = pushservices.getService(fields.proto)
throw new Error("Invalid value for `proto'") unless service = pushServices.getService(fields.proto)
throw new Error("Invalid value for `token'") unless fields.token = service.validateToken(fields.token)
Subscriber::create(redis, fields, cb)

tokenResolver = (proto, token, cb) ->
Subscriber::getInstanceFromToken redis, proto, token, cb

pushservices = new PushServices()
eventSourceEnabled = no
pushServices = new PushServices()
for name, conf of settings when conf.enabled
logger.log "Registering push service: #{name}"
pushservices.addService(name, new conf.class(conf, logger, tokenResolver))
if name is 'event-source'
# special case for EventSource which isn't a pluggable push protocol
eventSourceEnabled = yes
else
pushServices.addService(name, new conf.class(conf, logger, tokenResolver))
eventPublisher = new EventPublisher(pushServices)

app = express()

Expand All @@ -42,10 +49,10 @@ app.param 'subscriber_id', (req, res, next, id) ->
res.json error: error.message, 400

getEventFromId = (id) ->
return new Event(redis, pushservices, id)
return new Event(redis, id)

testSubscriber = (subscriber) ->
pushservices.push(subscriber, null, new Payload({msg: "Test", "data.test": "ok"}))
pushServices.push(subscriber, null, new Payload({msg: "Test", "data.test": "ok"}))

app.param 'event_id', (req, res, next, id) ->
try
Expand All @@ -70,7 +77,9 @@ authorize = (realm) ->
else
return (req, res, next) -> next()

require('./lib/api').setupRestApi(app, createSubscriber, getEventFromId, authorize, testSubscriber)
require('./lib/api').setupRestApi(app, createSubscriber, getEventFromId, authorize, testSubscriber, eventPublisher)
if eventSourceEnabled
require('./lib/eventsource').setup(app, authorize, eventPublisher)

port = settings?.server?.tcp_port ? 80
app.listen port
Expand All @@ -96,10 +105,10 @@ udpApi.on 'message', (msg, rinfo) ->
status = 404
if m = req.pathname?.match(event_route)
try
event = new Event(redis, pushservices, m[1])
event = new Event(redis, m[1])
status = 204
switch method
when 'POST' then event.publish(req.query)
when 'POST' then eventPublisher.publish(event, req.query)
when 'DELETE' then event.delete()
else status = 404
catch error
Expand Down
3 changes: 3 additions & 0 deletions settings-sample.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ exports.server =
# restrict publish access to private networks
publish: ['127.0.0.1', '10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16']

exports['event-source'] =
enabled: yes

exports['apns'] =
enabled: yes
class: require('./lib/pushservices/apns').PushServiceAPNS
Expand Down
Loading

0 comments on commit 6c5a5ca

Please sign in to comment.