Skip to content

Commit

Permalink
implemented error handling (with retry and exp backoff) when receivin…
Browse files Browse the repository at this point in the history
…g a response from the GCM servers
  • Loading branch information
pjay committed Dec 10, 2012
1 parent f984e9c commit eb8b53f
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 35 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ This project is still under active development. Basic features are already worki

Lots:
* APIs for sending notifications.
* Error handling when receiving a response from the GCM servers.
* Analytics.
* Connecting to the APNS feedback service.
* Pool of GCM workers.
Expand Down
6 changes: 5 additions & 1 deletion app/models/DeviceToken.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import com.redis._
import com.redis.serialization._
import Parse.Implicits.parseLong

case class DeviceToken(appKey: String, value: String, lastRegistrationDate: Date)
case class DeviceToken(appKey: String, value: String, lastRegistrationDate: Date) {

def delete = DeviceToken.delete(appKey, value)

}

object DeviceToken extends RedisConnection {

Expand Down
135 changes: 103 additions & 32 deletions app/models/Push.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package models
import play.api.libs.concurrent._
import play.api.libs.json._
import play.api.libs.ws._
import play.Logger
import akka.actor._
import akka.routing.RoundRobinRouter
import akka.util.Duration
Expand All @@ -14,11 +15,11 @@ case class SendIosBroadcast(app: App, payload: Map[String, Any]) extends PushMes
case class SendIosNotifications(deviceTokens: List[DeviceToken], payload: Map[String, Any]) extends PushMessage
case class StopIosWorkers(app: App) extends PushMessage
case class SendGcmBroadcast(app: App, payload: Map[String, Any]) extends PushMessage
case class SendGcmNotifications(app: App, registrations: List[Registration], payload: Map[String, Any]) extends PushMessage
case class SendGcmMessage(app: App, registrations: List[Registration], payload: Map[String, Any]) extends PushMessage

object Push {

private val system = ActorSystem("PushSystem")
val system = ActorSystem("PushSystem")
private val iosDispatcher = system.actorOf(Props(new IosDispatcher()))
private val gcmDispatcher = system.actorOf(Props(new GcmDispatcher()))

Expand All @@ -31,6 +32,9 @@ object Push {
def sendGcmBroadcast(app: App, payload: Map[String, Any]) =
gcmDispatcher ! SendGcmBroadcast(app, payload)

def sendGcmMessage(app: App, registrations: List[Registration], payload: Map[String, Any]) =
gcmDispatcher ! SendGcmMessage(app, registrations, payload)

}

class IosDispatcher extends Actor {
Expand Down Expand Up @@ -64,9 +68,13 @@ class GcmDispatcher extends Actor {

def receive = {
case SendGcmBroadcast(app, payload) => {
Registration.findAllByAppKey(app.key) grouped(maxMessagesByWorker) foreach { registrations =>
val registrations = Registration.findAllByAppKey(app.key)
self ! SendGcmMessage(app, registrations, payload)
}
case SendGcmMessage(app, registrations, payload) => {
registrations grouped(maxMessagesByWorker) foreach { registrations =>
val worker = context.actorOf(Props(new GcmDispatchWorker()))
worker ! SendGcmNotifications(app, registrations, payload)
worker ! SendGcmMessage(app, registrations, payload)
}
}
}
Expand Down Expand Up @@ -97,34 +105,37 @@ class IosDispatchWorker(app: App) extends Actor {
class GcmDispatchWorker extends Actor {

val apiEndpoint = "https://android.googleapis.com/gcm/send"
var backoffDelay = 500
val maxBackoffDelay = 60000
val startTime = System.currentTimeMillis
var finished = false

def receive = {
case SendGcmNotifications(app, registrations, payload) => {
case SendGcmMessage(app, registrations, payload) => {
val payloadWithRegistrations = payload.updated("registration_ids", registrations map (_.value))
// TODO: set a timeout
WS.url(apiEndpoint).withHeaders(
"Authorization" -> ("key=" + app.gcmApiKey.getOrElse("")),
"Content-Type" -> "application/json"
) post JsonUtil.toJson(payloadWithRegistrations) map { response =>
handleResponse(app, registrations, response)
handleResponse(app, registrations, payload, response)
} extend1 {
// TODO: retry mechanism based on the exception type (e.g. ConnectException)
case Thrown(throwable) => {
play.Logger.debug(throwable.getClass.getName)
Logger.debug(throwable.getClass.getName)
throwable.printStackTrace
val log = "An error occured while sending the GCM notifications - please contact the developers (error: " + throwable.getMessage() + ")"
Event.create(app.key, Event.Severity.ERROR, log)
context.stop(self)
if (finished) context.stop(self)
}
case Redeemed(value) => {
context.stop(self)
if (finished) context.stop(self)
}
}
}
}

def handleResponse(app: App, registrations: List[Registration], response: Response) = response.status match {
def handleResponse(app: App, registrations: List[Registration], payload: Map[String, Any], response: Response) = response.status match {
case 200 => {
// Success
val elapsed = (System.currentTimeMillis - startTime).toFloat / 1000
Expand All @@ -137,68 +148,112 @@ class GcmDispatchWorker extends Actor {
if (failure == 0 && canonicalIds == 0) {
val log = "Successfully delivered %d GCM messages in %.3f seconds".format(registrations.length, elapsed)
Event.create(app.key, Event.Severity.INFO, log)

// TODO: stats (including the delivery time)
// TODO: mark messages as sent (once the temporary messages are persistent)
finished = true
} else {
val log = "Delivered %d GCM messages (%d successes, %d failures) in %.3f seconds".format(registrations.length, success, failure, elapsed)
Event.create(app.key, Event.Severity.INFO, log)

(json \ "results").asOpt[List[Map[String, String]]] match {
case Some(results: List[Map[String, String]]) => {
val retryItems = processResultItems(app, registrations, results)
// TODO: retry items
processResultItems(app, registrations, results) match {
case retryRegistrations if retryRegistrations.length > 0 => Push.sendGcmMessage(app, retryRegistrations, payload)
case _ => finished = true
}
}
case _ => {
// TODO: handle error
Event.create(app.key, Event.Severity.ERROR, "Cannot parse response from GCM servers (%s) - please contact the developers")
finished = true
}
}
}
}
case 400 => {
// Invalid JSON format or invalid fields
// TODO
play.Logger.debug("HTTP 400")
Logger.error("HTTP Status Code 400 received from GCM server (%s)".format(response.body))
Event.create(app.key, Event.Severity.ERROR, "Invalid request (JSON format or invalid fields) - please contact the developers")
finished = true
}
case 401 => {
// Authentication error
// TODO
play.Logger.debug("HTTP 401")
Logger.error("HTTP Status Code 401 received from GCM server(%s)".format(response.body))
Event.create(app.key, Event.Severity.ERROR, "Authentication error - please check the GCM API Key")
finished = true
}
case r if r >= 500 && r < 600 => {
case status if status >= 500 && status < 600 => {
// Internal server error (500) or Server temporary unavailable (503)
// TODO
play.Logger.debug("HTTP 5xx")
response.header("Retry-After") map { retryAfterHeader =>
val Numeric = "\\A\\d+\\Z".r

retryAfterHeader match {
case Numeric => {
val secs = retryAfterHeader.toInt
val log = "GCM server returned HTTP status code %i with Retry-After - will retry in %i seconds".format(status, secs)
Event.create(app.key, Event.Severity.ERROR, log)
retryAfter(secs seconds, app, registrations, payload)
}
case _ => {
DateParser.parseDate(retryAfterHeader) match {
case Some(date) => {
val delta = date.getTime - System.currentTimeMillis
val log = "GCM server returned HTTP status code %i with Retry-After - will retry in %.1f seconds".format(status, delta)
Event.create(app.key, Event.Severity.ERROR, log)
retryAfter(delta milliseconds, app, registrations, payload)
}
case _ => {
val log = "GCM server returned HTTP status code %i with an unparsable Retry-After - will retry in %.1f seconds".format(status, backoffDelay.toFloat / 1000)
Event.create(app.key, Event.Severity.ERROR, log)
retryWithExponentialBackoff(app, registrations, payload)
}
}
}
}
} getOrElse {
// No Retry-After header
val log = "GCM server returned HTTP status code %i without Retry-After - will retry in %.1f seconds".format(status, backoffDelay.toFloat / 1000)
Event.create(app.key, Event.Severity.ERROR, log)
retryWithExponentialBackoff(app, registrations, payload)
}
}
case _ => {
// Unknown status code
// TODO
play.Logger.debug("HTTP " + response.status)
Event.create(app.key, Event.Severity.ERROR, "GCM server returned an unknown error - will retry in %.1f seconds".format(backoffDelay))
retryWithExponentialBackoff(app, registrations, payload)
}
}

def processResultItems(app: App, registrations: List[Registration], resultItems: List[Map[String, String]]) = {
((resultItems zip registrations) foldLeft List[Registration]()) { case (acc, (item, reg)) =>
if (item.get("message_id").isDefined) {
if (item.get("registration_id").isDefined) {
// TODO: check canonical registration ID and remove duplicate
Registration.create(app.key, item.get("registration_id").get) // No duplicates can be created
reg.delete
}
acc
// TODO: stats (including the delivery time)
} else {
item.get("error") match {
case Some("InvalidRegistration") => /* TODO: delete registration */ acc
case Some("InvalidRegistration") => reg.delete; acc
case Some("Unavailable") => reg :: acc
case Some("InternalServerError") => reg :: acc
case Some("NotRegistered") => /* TODO: delete registration */ acc
case Some("MismatchSenderId") => /* TODO: delete registration */ acc
case Some("MessageTooBig") => /* TODO: mark message as sent, log error */ acc
case Some("InvalidTtl") => /* TODO: mark message as sent, log error */ acc
case _ => /* TODO: log error */ acc
case Some("NotRegistered") => reg.delete; acc
case Some("MismatchSenderId") => reg.delete; acc
case Some("MessageTooBig") => Logger.warn("Received error from GCM (" + app.key + "): Payload is too big"); acc
case Some("InvalidTtl") => Logger.warn("Received error from GCM (" + app.key + "): invalid TTL"); acc
case Some(error) => Logger.warn("Received unknown error from GCM (" + app.key + "): " + error); acc
}
}
}
}

def retryAfter(delay: Duration, app: App, registrations: List[Registration], payload: Map[String, Any]) =
Push.system.scheduler.scheduleOnce(delay, self, SendGcmMessage(app, registrations, payload))

def retryWithExponentialBackoff(app: App, registrations: List[Registration], payload: Map[String, Any]) = {
retryAfter(backoffDelay milliseconds, app, registrations, payload)
backoffDelay = List(backoffDelay * 2, maxBackoffDelay).min
}

}

object JsonUtil {
Expand All @@ -216,4 +271,20 @@ object JsonUtil {
case _ => JsNull
}

}

// Partially taken from https://github.com/unfiltered/unfiltered/blob/master/library/src/main/scala/request/headers.scala
object DateParser {

import java.text.SimpleDateFormat
import java.util.{ Date, Locale }

def parseAs(fmt: String)(value: String): Option[Date] =
try { Some(new SimpleDateFormat(fmt, Locale.US).parse(value)) }
catch { case _ => None }

def RFC1123 = parseAs("EEE, dd MMM yyyy HH:mm:ss z")_

def parseDate(raw: String) = RFC1123(raw)

}
6 changes: 5 additions & 1 deletion app/models/Registration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import com.redis._
import com.redis.serialization._
import Parse.Implicits.parseLong

case class Registration(appKey: String, value: String, lastRegistrationDate: Date)
case class Registration(appKey: String, value: String, lastRegistrationDate: Date) {

def delete = Registration.delete(appKey, value)

}

object Registration extends RedisConnection {

Expand Down

0 comments on commit eb8b53f

Please sign in to comment.