forked from lichess-org/lila
-
Notifications
You must be signed in to change notification settings - Fork 0
/
GameStateStream.scala
145 lines (125 loc) · 4.54 KB
/
GameStateStream.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package lila.bot
import akka.actor._
import akka.stream.scaladsl._
import play.api.i18n.Lang
import play.api.libs.json._
import scala.concurrent.duration._
import lila.chat.Chat
import lila.chat.UserLine
import lila.common.Bus
import lila.game.actorApi.{
AbortedBy,
BoardDrawOffer,
BoardTakeback,
BoardTakebackOffer,
FinishGame,
MoveGameEvent
}
import lila.game.{ Game, Pov }
import lila.hub.actorApi.map.Tell
import lila.round.actorApi.BotConnected
import lila.round.actorApi.round.QuietFlag
final class GameStateStream(
onlineApiUsers: OnlineApiUsers,
jsonView: BotJsonView
)(implicit
ec: scala.concurrent.ExecutionContext,
system: ActorSystem
) {
import GameStateStream._
private val blueprint =
Source.queue[Option[JsObject]](32, akka.stream.OverflowStrategy.dropHead)
def apply(init: Game.WithInitialFen, as: chess.Color, u: lila.user.User)(implicit
lang: Lang
): Source[Option[JsObject], _] = {
// terminate previous one if any
Bus.publish(PoisonPill, uniqChan(init.game pov as))
blueprint mapMaterializedValue { queue =>
val actor = system.actorOf(
Props(mkActor(init, as, User(u.id, u.isBot), queue)),
name = s"GameStateStream:${init.game.id}:${lila.common.ThreadLocalRandom nextString 8}"
)
queue.watchCompletion().foreach { _ =>
actor ! PoisonPill
}
}
}
private def uniqChan(pov: Pov) = s"gameStreamFor:${pov.fullId}"
private def mkActor(
init: Game.WithInitialFen,
as: chess.Color,
user: User,
queue: SourceQueueWithComplete[Option[JsObject]]
)(implicit lang: Lang) =
new Actor {
val id = init.game.id
var gameOver = false
private val classifiers = List(
MoveGameEvent makeChan id,
BoardDrawOffer makeChan id,
BoardTakeback makeChan id,
"finishGame",
"abortGame",
uniqChan(init.game pov as),
Chat chanOf Chat.Id(id)
) :::
user.isBot.option(Chat chanOf Chat.Id(s"$id/w")).toList
override def preStart(): Unit = {
super.preStart()
Bus.subscribe(self, classifiers)
jsonView gameFull init foreach { json =>
// prepend the full game JSON at the start of the stream
queue offer json.some
// close stream if game is over
if (init.game.finished) onGameOver(none)
else self ! SetOnline
}
lila.mon.bot.gameStream("start").increment()
Bus.publish(Tell(init.game.id, BotConnected(as, v = true)), "roundSocket")
}
override def postStop(): Unit = {
super.postStop()
Bus.unsubscribe(self, classifiers)
// hang around if game is over
// so the opponent has a chance to rematch
context.system.scheduler.scheduleOnce(if (gameOver) 10 second else 1 second) {
Bus.publish(Tell(init.game.id, BotConnected(as, v = false)), "roundSocket")
}
queue.complete()
lila.mon.bot.gameStream("stop").increment().unit
}
def receive = {
case MoveGameEvent(g, _, _) if g.id == id && !g.finished => pushState(g).unit
case lila.chat.actorApi.ChatLine(chatId, UserLine(username, _, _, text, false, false)) =>
pushChatLine(username, text, chatId.value.lengthIs == Game.gameIdSize).unit
case FinishGame(g, _, _) if g.id == id => onGameOver(g.some).unit
case AbortedBy(pov) if pov.gameId == id => onGameOver(pov.game.some).unit
case BoardDrawOffer(g) if g.id == id => pushState(g).unit
case BoardTakebackOffer(g) if g.id == id => pushState(g).unit
case BoardTakeback(g) if g.id == id => pushState(g).unit
case SetOnline =>
onlineApiUsers.setOnline(user.id)
context.system.scheduler
.scheduleOnce(6 second) {
// gotta send a message to check if the client has disconnected
queue offer None
self ! SetOnline
Bus.publish(Tell(id, QuietFlag), "roundSocket")
}
.unit
}
def pushState(g: Game): Funit =
jsonView gameState Game.WithInitialFen(g, init.fen) dmap some flatMap queue.offer void
def pushChatLine(username: String, text: String, player: Boolean): Funit =
queue offer jsonView.chatLine(username, text, player).some void
def onGameOver(g: Option[Game]) =
g ?? pushState >>- {
gameOver = true
self ! PoisonPill
}
}
}
private object GameStateStream {
private case object SetOnline
private case class User(id: lila.user.User.ID, isBot: Boolean)
}