forked from lichess-org/lila
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBoardApiHookStream.scala
61 lines (47 loc) · 1.53 KB
/
BoardApiHookStream.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package lila.lobby
import akka.actor._
import akka.stream.scaladsl._
import play.api.libs.json._
import scala.concurrent.duration._
import lila.common.Bus
/** Exposes the lifetime of a lobby [[Hook]] as an Akka stream.
  *
  * Each materialization of the source returned by [[apply]] spawns a dedicated
  * actor that:
  *  - sends `AddHook` to the lobby trouper when it starts,
  *  - subscribes to the `hookRemove:<hook id>` Bus classifier and stops itself
  *    when a `RemoveHook` message arrives,
  *  - offers `None` to the stream queue every 3 seconds so that a disconnected
  *    client gets noticed,
  *  - on stop, unsubscribes from the Bus, sends `CancelHook` to the trouper
  *    and completes the queue.
  *
  * @param trouper lobby trouper receiving the `AddHook` / `CancelHook` messages
  * @param ec      execution context used for future callbacks and scheduled tasks
  * @param system  actor system in which the per-stream actors are created
  */
final class BoardApiHookStream(
    trouper: LobbyTrouper
)(implicit ec: scala.concurrent.ExecutionContext, system: ActorSystem) {

  /** Internal tick that (re)arms the periodic keep-alive. */
  private case object SetOnline

  // Small buffer; when it is full the oldest element is dropped (dropHead).
  private val blueprint =
    Source.queue[Option[JsObject]](16, akka.stream.OverflowStrategy.dropHead)

  /** Builds the stream for the given hook.
    *
    * @param hook the hook to register in the lobby for as long as the stream runs
    * @return a source of keep-alive elements (`None`); it completes once the
    *         backing actor has stopped
    */
  def apply(hook: Hook): Source[Option[JsObject], _] =
    blueprint mapMaterializedValue { queue =>
      val actor = system.actorOf(Props(mkActor(hook, queue)))
      // onComplete rather than foreach: the completion future may also end with a
      // failure (stream error), and the actor must be stopped in that case too,
      // otherwise the hook and its keep-alive loop would outlive the stream.
      queue.watchCompletion().onComplete { _ =>
        actor ! PoisonPill
      }
    }

  /** Creates the actor tying the hook's lifetime to the stream queue. */
  private def mkActor(hook: Hook, queue: SourceQueueWithComplete[Option[JsObject]]) =
    new Actor {

      val classifiers = List(s"hookRemove:${hook.id}")

      // Handle on the currently scheduled keep-alive, so it can be cancelled on stop.
      // Only read and written from the actor's own callbacks (receive / postStop).
      private var pendingPing: Option[Cancellable] = None

      override def preStart(): Unit = {
        super.preStart()
        Bus.subscribe(self, classifiers)
        trouper ! actorApi.AddHook(hook)
      }

      override def postStop(): Unit = {
        super.postStop()
        // Without this the last scheduled task would still fire after the stop,
        // offering to a completed queue and messaging a dead actor.
        pendingPing foreach { _.cancel() }
        pendingPing = None
        Bus.unsubscribe(self, classifiers)
        trouper ! actorApi.CancelHook(hook.sri)
        queue.complete()
      }

      // Kick off the keep-alive loop as soon as the actor is constructed.
      self ! SetOnline

      def receive = {

        case actorApi.RemoveHook(_) => self ! PoisonPill

        case SetOnline =>
          pendingPing = Some(
            context.system.scheduler.scheduleOnce(3.seconds) {
              // gotta send a message to check if the client has disconnected
              queue offer None
              self ! SetOnline
            }
          )
      }
    }
}