Skip to content

Commit

Permalink
Merge branch 'fishnet' into ios-push-rm-cursor-fishnet
Browse files Browse the repository at this point in the history
* fishnet:
  fix duplicated logging on dev
  fishnet: proper transactional move handling
  use shared transactional memory for fishnet moves
  fishnet: in-memory move database for greater performances
  fishnet: request AI move on socket connection
  fishnet: average analysis evaluation stats before kamon
  show analysis in progress on analysis page
  fishnet: monitor acquirement
  improve wsmonitor
  fishnet: refine types
  fishnet: only monitor client play movetime on level 8
  fishnet: stop monitoring offline clients
  only monitor fishnet on stage [REVERT ME]
  more fishnet monitoring improvement
  don't block the future sequencer!
  {fishnet} parse only what's required for the current request
  monitor fishnet analysis engine hash size and threads
  • Loading branch information
ornicar committed Mar 16, 2016
2 parents 3b68709 + 77a7362 commit 4b1c500
Show file tree
Hide file tree
Showing 14 changed files with 118 additions and 109 deletions.
12 changes: 0 additions & 12 deletions conf/application-logger.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,6 @@
<appender-ref ref="STDOUT" />
</root>

<logger name="play" level="INFO">
<appender-ref ref="STDOUT" />
</logger>
<logger name="reactivemongo" level="INFO">
<appender-ref ref="STDOUT" />
</logger>
<logger name="application" level="INFO">
<appender-ref ref="STDOUT" />
</logger>
<logger name="lila" level="INFO">
<appender-ref ref="STDOUT" />
</logger>
<logger name="boot" level="INFO">
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>${application.home}/logs/boot.log</file>
Expand Down
1 change: 0 additions & 1 deletion conf/base.conf
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,6 @@ history {
}
fishnet {
collection {
move = fishnet_move
analysis = fishnet_analysis
client = fishnet_client
}
Expand Down
4 changes: 2 additions & 2 deletions modules/fishnet/src/main/Analyser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import lila.game.{ Game, GameRepo, UciMemo }
final class Analyser(
repo: FishnetRepo,
uciMemo: UciMemo,
sequencer: Sequencer,
sequencer: lila.hub.FutureSequencer,
limiter: Limiter) {

val maxPlies = 200
Expand All @@ -18,7 +18,7 @@ final class Analyser(
limiter(sender) flatMap { accepted =>
accepted ?? {
makeWork(game, sender) flatMap { work =>
sequencer.analysis {
sequencer {
repo getSimilarAnalysis work flatMap {
// already in progress, do nothing
case Some(similar) if similar.isAcquired => funit
Expand Down
21 changes: 10 additions & 11 deletions modules/fishnet/src/main/Cleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import lila.db.Implicits._

private final class Cleaner(
repo: FishnetRepo,
moveColl: Coll,
moveDb: MoveDB,
analysisColl: Coll,
monitor: Monitor,
scheduler: lila.common.Scheduler) {
Expand All @@ -22,16 +22,15 @@ private final class Cleaner(

private def durationAgo(d: FiniteDuration) = DateTime.now.minusSeconds(d.toSeconds.toInt)

private def cleanMoves: Funit = moveColl.find(BSONDocument(
"acquired.date" -> BSONDocument("$lt" -> durationAgo(moveTimeout))
)).sort(BSONDocument("acquired.date" -> 1)).cursor[Work.Move]().collect[List](100).flatMap {
_.map { move =>
repo.updateOrGiveUpMove(move.timeout) >>- {
clientTimeout(move)
log.warn(s"Timeout move ${move.game.id}")
}
}.sequenceFu.void
} andThenAnyway scheduleMoves
private def cleanMoves: Unit = {
val since = durationAgo(moveTimeout)
moveDb.find(_ acquiredBefore since).map { move =>
moveDb updateOrGiveUp move
clientTimeout(move)
log.warn(s"Timeout move ${move.game.id}")
}
scheduleMoves
}

private def cleanAnalysis: Funit = analysisColl.find(BSONDocument(
"acquired.date" -> BSONDocument("$lt" -> durationAgo(analysisTimeoutBase))
Expand Down
28 changes: 11 additions & 17 deletions modules/fishnet/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,26 @@ final class Env(
private val ActorName = config getString "actor.name"
private val OfflineMode = config getBoolean "offline_mode"

private val moveColl = db(config getString "collection.move")
private val analysisColl = db(config getString "collection.analysis")
private val clientColl = db(config getString "collection.client")

private val repo = new FishnetRepo(
moveColl = moveColl,
analysisColl = analysisColl,
clientColl = clientColl)

private val sequencer = new Sequencer(
move = new lila.hub.FutureSequencer(
system = system,
receiveTimeout = None,
executionTimeout = Some(200 millis)),
analysis = new lila.hub.FutureSequencer(
system = system,
receiveTimeout = None,
executionTimeout = Some(500 millis)))
private val moveDb = new MoveDB

private val monitor = new Monitor(repo, sequencer, scheduler)
private val sequencer = new lila.hub.FutureSequencer(
system = system,
receiveTimeout = None,
executionTimeout = Some(500 millis))

private val monitor = new Monitor(moveDb, repo, sequencer, scheduler)

val api = new FishnetApi(
hub = hub,
repo = repo,
moveColl = moveColl,
moveDb = moveDb,
analysisColl = analysisColl,
clientColl = clientColl,
sequencer = sequencer,
Expand All @@ -51,9 +46,8 @@ final class Env(
offlineMode = OfflineMode)

val player = new Player(
repo = repo,
uciMemo = uciMemo,
sequencer = sequencer)
moveDb = moveDb,
uciMemo = uciMemo)

val analyser = new Analyser(
repo = repo,
Expand All @@ -65,7 +59,7 @@ final class Env(

private val cleaner = new Cleaner(
repo = repo,
moveColl = moveColl,
moveDb = moveDb,
analysisColl = analysisColl,
monitor = monitor,
scheduler = scheduler)
Expand Down
57 changes: 29 additions & 28 deletions modules/fishnet/src/main/FishnetApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import lila.hub.{ actorApi => hubApi }
final class FishnetApi(
hub: lila.hub.Env,
repo: FishnetRepo,
moveColl: Coll,
moveDb: MoveDB,
analysisColl: Coll,
clientColl: Coll,
sequencer: Sequencer,
sequencer: lila.hub.FutureSequencer,
monitor: Monitor,
saveAnalysis: lila.analyse.Analysis => Funit,
offlineMode: Boolean) {
Expand All @@ -35,17 +35,17 @@ final class FishnetApi(
case Skill.All => acquireMove(client) orElse acquireAnalysis(client)
}) >>- monitor.acquire(client)

private def acquireMove(client: Client): Fu[Option[JsonApi.Work]] = sequencer.move {
moveColl.find(BSONDocument(
"acquired" -> BSONDocument("$exists" -> false)
)).sort(BSONDocument("createdAt" -> 1)).one[Work.Move].flatMap {
_ ?? { work =>
repo.updateMove(work assignTo client) inject work.some
}
}
} map { _ map JsonApi.fromWork }
private def acquireMove(client: Client): Fu[Option[JsonApi.Work]] = fuccess {
moveDb.transaction { implicit tnx =>
moveDb.find(_.nonAcquired).toList.sortBy(_.createdAt).headOption
.map(_ assignTo client) ?? { work =>
moveDb.update(work)
work.some
}
} map JsonApi.fromWork
}

private def acquireAnalysis(client: Client): Fu[Option[JsonApi.Work]] = sequencer.analysis {
private def acquireAnalysis(client: Client): Fu[Option[JsonApi.Work]] = sequencer {
analysisColl.find(BSONDocument(
"acquired" -> BSONDocument("$exists" -> false)
)).sort(BSONDocument(
Expand All @@ -58,25 +58,26 @@ final class FishnetApi(
}
} map { _ map JsonApi.fromWork }

def postMove(workId: Work.Id, client: Client, data: JsonApi.Request.PostMove): Funit = sequencer.move {
repo.getMove(workId).map(_.filter(_ isAcquiredBy client)) flatMap {
case None =>
log.warn(s"Received unknown or unacquired move $workId by ${client.fullId}")
funit
case Some(work) => data.move.uci match {
case Some(uci) =>
monitor.move(work, client)
hub.actor.roundMap ! hubApi.map.Tell(work.game.id, hubApi.round.FishnetPlay(uci, work.currentFen))
repo.deleteMove(work)
case _ =>
monitor.failure(work, client)
log.warn(s"Received invalid move ${data.move} by ${client.fullId}")
repo.updateOrGiveUpMove(work.invalid)
def postMove(workId: Work.Id, client: Client, data: JsonApi.Request.PostMove): Funit = fuccess {
moveDb.transaction { implicit txn =>
moveDb.get(workId).filter(_ isAcquiredBy client) match {
case None =>
log.warn(s"Received unknown or unacquired move $workId by ${client.fullId}")
case Some(work) => data.move.uci match {
case Some(uci) =>
monitor.move(work, client)
hub.actor.roundMap ! hubApi.map.Tell(work.game.id, hubApi.round.FishnetPlay(uci, work.currentFen))
moveDb.delete(work)
case _ =>
monitor.failure(work, client)
log.warn(s"Received invalid move ${data.move} by ${client.fullId}")
moveDb.updateOrGiveUp(work.invalid)
}
}
}
}

def postAnalysis(workId: Work.Id, client: Client, data: JsonApi.Request.PostAnalysis): Funit = sequencer.analysis {
def postAnalysis(workId: Work.Id, client: Client, data: JsonApi.Request.PostAnalysis): Funit = sequencer {
repo.getAnalysis(workId).map(_.filter(_ isAcquiredBy client)) flatMap {
case None =>
log.warn(s"Received unknown or unacquired analysis $workId by ${client.fullId}")
Expand All @@ -93,7 +94,7 @@ final class FishnetApi(
}
} flatMap { _ ?? saveAnalysis }

def abort(workId: Work.Id, client: Client): Funit = sequencer.analysis {
def abort(workId: Work.Id, client: Client): Funit = sequencer {
repo.getAnalysis(workId).map(_.filter(_ isAcquiredBy client)) flatMap {
_ ?? { work =>
monitor.abort(work, client)
Expand Down
16 changes: 0 additions & 16 deletions modules/fishnet/src/main/FishnetRepo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import lila.db.BSON.BSONJodaDateTimeHandler
import lila.db.Implicits._

private final class FishnetRepo(
moveColl: Coll,
analysisColl: Coll,
clientColl: Coll) {

Expand All @@ -28,16 +27,6 @@ private final class FishnetRepo(
"instance.seenAt" -> BSONDocument("$gt" -> DateTime.now.minusMinutes(15))
)).cursor[Client]().collect[List]()

def addMove(move: Work.Move) = moveColl.insert(move).void
def getMove(id: Work.Id) = moveColl.find(selectWork(id)).one[Work.Move]
def updateMove(move: Work.Move) = moveColl.update(selectWork(move.id), move).void
def deleteMove(move: Work.Move) = moveColl.remove(selectWork(move.id)).void
def giveUpMove(move: Work.Move) = deleteMove(move) >>- log.warn(s"Give up on move $move")
def updateOrGiveUpMove(move: Work.Move) = if (move.isOutOfTries) giveUpMove(move) else updateMove(move)
def countMove(acquired: Boolean) = moveColl.count(BSONDocument(
"acquired" -> BSONDocument("$exists" -> acquired)
).some)

def addAnalysis(ana: Work.Analysis) = analysisColl.insert(ana).void
def getAnalysis(id: Work.Id) = analysisColl.find(selectWork(id)).one[Work.Analysis]
def updateAnalysis(ana: Work.Analysis) = analysisColl.update(selectWork(ana.id), ana).void
Expand All @@ -48,11 +37,6 @@ private final class FishnetRepo(
"acquired" -> BSONDocument("$exists" -> acquired)
).some)

def similarMoveExists(work: Work.Move): Fu[Boolean] = moveColl.count(BSONDocument(
"game.id" -> work.game.id,
"currentFen" -> work.currentFen
).some) map (0 !=)

def getSimilarAnalysis(work: Work.Analysis): Fu[Option[Work.Analysis]] =
analysisColl.find(BSONDocument("game.id" -> work.game.id)).one[Work.Analysis]

Expand Down
13 changes: 7 additions & 6 deletions modules/fishnet/src/main/Monitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import scala.concurrent.duration._
import lila.db.Implicits._

private final class Monitor(
moveDb: MoveDB,
repo: FishnetRepo,
sequencer: Sequencer,
sequencer: lila.hub.FutureSequencer,
scheduler: lila.common.Scheduler) {

private[fishnet] def acquire(client: Client) =
Expand Down Expand Up @@ -88,12 +89,12 @@ private final class Monitor(
import lila.mon.fishnet.work._
import Client.Skill._

sequencer.move.withQueueSize(lila.mon.fishnet.queue.sequencer(Move.key)(_))
sequencer.analysis.withQueueSize(lila.mon.fishnet.queue.sequencer(Analysis.key)(_))
sequencer.withQueueSize(lila.mon.fishnet.queue.sequencer(Analysis.key)(_))

repo.countMove(acquired = false).map { queued(Move.key)(_) } >>
repo.countMove(acquired = true).map { acquired(Move.key)(_) } >>
repo.countAnalysis(acquired = false).map { queued(Analysis.key)(_) } >>
queued(Move.key)(moveDb.count(_.nonAcquired))
acquired(Move.key)(moveDb.count(_.isAcquired))

repo.countAnalysis(acquired = false).map { queued(Analysis.key)(_) } >>
repo.countAnalysis(acquired = true).map { acquired(Analysis.key)(_) }

} andThenAnyway scheduleWork
Expand Down
47 changes: 47 additions & 0 deletions modules/fishnet/src/main/MoveDB.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package lila.fishnet

import scala.concurrent.stm._

private final class MoveDB {

import Work.Move

private val maxSize = 1000

type CollType = TMap[Work.Id, Move]

private val coll: CollType = TMap.empty

def get = coll.single.get _

def add(move: Move)(implicit txn: InTxn): Unit =
if (coll.size < maxSize) coll.put(move.id, move)

def update(move: Move)(implicit tnx: InTxn): Unit =
if (coll.contains(move.id)) coll.put(move.id, move)

def delete(move: Move)(implicit tnx: InTxn): Unit =
coll.remove(move.id)

def transaction[A](f: InTxn => A): A = atomic { txn =>
f(txn)
}

def contains = coll.single.contains _

def exists = coll.single.values.exists _

def find = coll.single.values.filter _

def count = coll.single.values.count _

def size = coll.single.size

def updateOrGiveUp(move: Move) = transaction { implicit txn =>
if (move.isOutOfTries) {
log.warn(s"Give up on move $move")
delete(move)
}
else update(move)
}
}
13 changes: 5 additions & 8 deletions modules/fishnet/src/main/Player.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,15 @@ import chess.format.{ FEN, Forsyth }
import lila.game.{ Game, GameRepo, UciMemo }

final class Player(
repo: FishnetRepo,
uciMemo: UciMemo,
sequencer: Sequencer) {
moveDb: MoveDB,
uciMemo: UciMemo) {

val maxPlies = 300

def apply(game: Game): Funit = game.aiLevel ?? { level =>
makeWork(game, level) flatMap { move =>
sequencer.move {
repo similarMoveExists move flatMap {
_.fold(funit, repo addMove move)
}
makeWork(game, level) map { move =>
moveDb.transaction { implicit txn =>
if (!moveDb.exists(_ similar move)) moveDb.add(move)
}
}
}
Expand Down
7 changes: 0 additions & 7 deletions modules/fishnet/src/main/Sequencer.scala

This file was deleted.

5 changes: 5 additions & 0 deletions modules/fishnet/src/main/Work.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ sealed trait Work {
def acquiredByKey = acquired.map(_.clientKey)
def isAcquiredBy(client: Client) = acquiredByKey contains client.key
def isAcquired = acquired.isDefined
def nonAcquired = !isAcquired

def acquiredBefore(date: DateTime) = acquiredAt.??(_ isBefore date)
}

object Work {
Expand Down Expand Up @@ -70,6 +73,8 @@ object Work {

def isOutOfTries = tries >= 3

def similar(to: Move) = game.id == to.game.id && currentFen == to.currentFen

override def toString = s"id:$id game:${game.id} level:$level tries:$tries currentFen:$currentFen acquired:$acquired"
}

Expand Down
Loading

0 comments on commit 4b1c500

Please sign in to comment.