Skip to content

Commit

Permalink
schedule bulk challenges WIP - for lichess-org#8059
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Feb 1, 2021
1 parent 830daae commit 639a765
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 65 deletions.
19 changes: 10 additions & 9 deletions app/controllers/Challenge.scala
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ final class Challenge(
.fold(
newJsonFormError,
data =>
env.setup.bulk(data) map {
env.setup.bulk(data, me) flatMap {
case Left(badTokens) =>
import lila.setup.SetupBulk.BadToken
import play.api.libs.json._
Expand All @@ -329,15 +329,16 @@ final class Challenge(
}
}
)
)
).fuccess
case Right(bulk) =>
env.challenge.bulk(me, bulk).thenPp
Ok(Json.obj("games" -> bulk.games.map { g =>
Json.obj(
"gameId" -> g.id,
"userIds" -> Json.arr(g.white, g.black)
)
})) as JSON
env.challenge.bulk.schedule(bulk) inject {
Ok(Json.obj("games" -> bulk.games.map { g =>
Json.obj(
"gameId" -> g.id,
"userIds" -> Json.arr(g.white, g.black)
)
})) as JSON
}
}
)
}
Expand Down
1 change: 0 additions & 1 deletion modules/challenge/src/main/BSONHandlers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ private object BSONHandlers {
{ case BSONInteger(v) => Status(v) toTry s"No such status: $v" },
x => BSONInteger(x.id)
)
implicit val ModeBSONHandler = BSONBooleanHandler.as[Mode](Mode.apply, _.rated)
implicit val RatingBSONHandler = new BSON[Rating] {
def reads(r: Reader) = Rating(r.int("i"), r.boolD("p"))
def writes(w: Writer, r: Rating) =
Expand Down
83 changes: 65 additions & 18 deletions modules/challenge/src/main/ChallengeBulk.scala
Original file line number Diff line number Diff line change
@@ -1,33 +1,80 @@
package lila.challenge

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import chess.variant.Variant
import chess.{ Clock, Mode, Situation, Speed }
import org.joda.time.DateTime
import scala.util.chaining._
import reactivemongo.api.bson.Macros
import scala.concurrent.duration._

import lila.common.Bus
import lila.common.LilaStream
import lila.db.dsl._
import lila.game.{ Game, Player }
import lila.hub.actorApi.map.TellMany
import lila.hub.DuctSequencers
import lila.rating.PerfType
import lila.setup.SetupBulk.ScheduledBulk
import lila.setup.SetupBulk.{ ScheduledBulk, ScheduledGame }
import lila.user.User

final class ChallengeBulkApi(
colls: ChallengeColls,
gameRepo: lila.game.GameRepo,
userRepo: lila.user.UserRepo,
onStart: lila.round.OnStart
)(implicit
ec: scala.concurrent.ExecutionContext,
mat: akka.stream.Materializer
mat: akka.stream.Materializer,
system: ActorSystem,
mode: play.api.Mode
) {

def apply(
by: User,
scheduled: ScheduledBulk
): Fu[Int] = {
val perfType = PerfType(scheduled.variant, Speed(scheduled.clock))
val startClock = scheduled.startClocksAt isBefore DateTime.now
Source(scheduled.games)
implicit private val gameHandler = Macros.handler[ScheduledGame]
implicit private val variantHandler = variantByKeyHandler
implicit private val clockHandler = clockConfigHandler
implicit private val bulkHandler = Macros.handler[ScheduledBulk]

private val coll = colls.bulk

private val workQueue =
new DuctSequencers(maxSize = 64, expiration = 1 minute, timeout = 10 seconds, name = "challenge.bulk")

def schedule(bulk: ScheduledBulk): Fu[Option[String]] = workQueue(bulk.by) {
if (bulk.pairAt.isBeforeNow) makePairings(bulk) inject none
else
coll.list[ScheduledBulk]($doc("by" -> bulk.by, "pairedAt" $exists false)) flatMap { bulks =>
val nbGames = bulks.map(_.games.size).sum
if (bulks.sizeIs >= 10) fuccess("Already too many bulks queued".some)
else if (bulks.map(_.games.size).sum >= 1000) fuccess("Already too many games queued".some)
else if (bulks.exists(_ collidesWith bulk))
fuccess("A bulk containing the same players is scheduled at the same time".some)
else coll.insert.one(bulk) inject none
}
}

private[challenge] def tick: Funit =
checkForPairing >> checkForClocks

private def checkForPairing: Funit =
coll.one[ScheduledBulk]($doc("pairAt" $lte DateTime.now, "pairedAt" $exists false)) flatMap {
_ ?? makePairings
}

private def checkForClocks: Funit =
coll.one[ScheduledBulk]($doc("startClocksAt" $lte DateTime.now, "pairedAt" $exists true)) flatMap {
_ ?? startClocks
}

private def startClocks(bulk: ScheduledBulk): Funit = workQueue(bulk.by) {
fuccess {
Bus.publish(TellMany(bulk.games.map(_.id), lila.round.actorApi.round.StartClock), "roundSocket")
}
}

private def makePairings(bulk: ScheduledBulk): Funit = workQueue(bulk.by) {
val perfType = PerfType(bulk.variant, Speed(bulk.clock))
Source(bulk.games)
.mapAsyncUnordered(8) { game =>
userRepo.pair(game.white, game.black) map2 { case (white, black) =>
(game.id, white, black)
Expand All @@ -37,27 +84,27 @@ final class ChallengeBulkApi(
.map[Game] { case (id, white, black) =>
Game
.make(
chess =
chess.Game(situation = Situation(scheduled.variant), clock = scheduled.clock.toClock.some),
chess = chess.Game(situation = Situation(bulk.variant), clock = bulk.clock.toClock.some),
whitePlayer = Player.make(chess.White, white.some, _(perfType)),
blackPlayer = Player.make(chess.Black, black.some, _(perfType)),
mode = scheduled.mode,
mode = bulk.mode,
source = lila.game.Source.Api,
pgnImport = None
)
.withId(id)
.start
.pipe { g =>
if (startClock) g.startClock.fold(g)(_.game) else g
}
}
.mapAsyncUnordered(8) { game =>
(gameRepo insertDenormalized game) >>- onStart(game.id)
}
.toMat(LilaStream.sinkCount)(Keep.right)
.run()
.addEffect { nb =>
lila.mon.api.challenge.bulk.createNb(by.id).increment(nb).unit
}
lila.mon.api.challenge.bulk.createNb(bulk.by).increment(nb).unit
} >> {
if (bulk.startClocksAt.isDefined)
coll.updateField($id(bulk._id), "pairedAt", DateTime.now)
else coll.delete.one($id(bulk._id))
}.void
}
}
4 changes: 3 additions & 1 deletion modules/challenge/src/main/ChallengeRepo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import scala.annotation.nowarn
import lila.common.config.Max
import lila.db.dsl._

final private class ChallengeRepo(coll: Coll, maxPerUser: Max)(implicit
final private class ChallengeRepo(colls: ChallengeColls, maxPerUser: Max)(implicit
ec: scala.concurrent.ExecutionContext
) {

import BSONHandlers._
import Challenge._

private val coll = colls.challenge

def byId(id: Challenge.ID) = coll.find($id(id)).one[Challenge]

def byIdFor(id: Challenge.ID, dest: lila.user.User) =
Expand Down
18 changes: 13 additions & 5 deletions modules/challenge/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ final class Env(

private lazy val maxPlaying = appConfig.get[Max]("setup.max_playing")

private val colls = wire[ChallengeColls]

def version(challengeId: Challenge.ID): Fu[SocketVersion] =
socket.rooms.ask[SocketVersion](challengeId)(GetVersion)

Expand All @@ -43,18 +45,24 @@ final class Env(

lazy val granter = wire[ChallengeGranter]

private lazy val repo = new ChallengeRepo(
coll = db(CollName("challenge")),
maxPerUser = maxPlaying
)
private lazy val repo = wire[ChallengeRepo]

lazy val jsonView = wire[JsonView]

lazy val bulk = wire[ChallengeBulkApi]

val forms = new ChallengeForm

system.scheduler.scheduleWithFixedDelay(10 seconds, 3 seconds) { () =>
system.scheduler.scheduleWithFixedDelay(10 seconds, 3343 millis) { () =>
api.sweep.unit
}

system.scheduler.scheduleWithFixedDelay(20 seconds, 2897 millis) { () =>
bulk.tick.unit
}
}

private class ChallengeColls(db: lila.db.Db) {
val challenge = db(CollName("challenge"))
val bulk = db(CollName("challenge_bulk"))
}
25 changes: 25 additions & 0 deletions modules/db/src/main/Handlers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import scala.util.{ Failure, Success, Try }
import lila.common.Iso._
import lila.common.{ EmailAddress, IpAddress, Iso, NormalizedEmailAddress }
import chess.format.FEN
import chess.variant.Variant

trait Handlers {

Expand Down Expand Up @@ -117,4 +118,28 @@ trait Handlers {
implicit val colorBoolHandler = BSONBooleanHandler.as[chess.Color](chess.Color.fromWhite, _.white)

implicit val FENHandler: BSONHandler[FEN] = stringAnyValHandler[FEN](_.value, FEN.apply)

implicit val modeHandler = BSONBooleanHandler.as[chess.Mode](chess.Mode.apply, _.rated)

val variantByKeyHandler: BSONHandler[Variant] = quickHandler[Variant](
{
case BSONString(v) => Variant orDefault v
case _ => Variant.default
},
v => BSONString(v.key)
)

val clockConfigHandler = tryHandler[chess.Clock.Config](
{ case doc: BSONDocument =>
for {
limit <- doc.getAsTry[Int]("limit")
inc <- doc.getAsTry[Int]("increment")
} yield chess.Clock.Config(limit, inc)
},
c =>
BSONDocument(
"limit" -> c.limitSeconds,
"increment" -> c.incrementSeconds
)
)
}
66 changes: 56 additions & 10 deletions modules/setup/src/main/SetupBulk.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,18 @@ object SetupBulk {

val maxGames = 500

case class BulkFormData(tokens: String, variant: Variant, clock: Clock.Config, rated: Boolean)
case class BulkFormData(
tokens: String,
variant: Variant,
clock: Clock.Config,
rated: Boolean,
pairAt: Option[DateTime],
startClocksAt: Option[DateTime]
)

val form = Form[BulkFormData](
private def timestampInNearFuture = longNumber(max = DateTime.now.plusDays(1).getMillis)

def form = Form[BulkFormData](
mapping(
"tokens" -> nonEmptyText
.verifying("Not enough tokens", t => extractTokenPairs(t).nonEmpty)
Expand All @@ -34,10 +43,27 @@ object SetupBulk {
}
),
SetupForm.api.variant,
"clock" -> SetupForm.api.clockMapping,
"rated" -> boolean
) { (tokens: String, variant: Option[String], clock: Clock.Config, rated: Boolean) =>
BulkFormData(tokens, Variant orDefault ~variant, clock, rated)
"clock" -> SetupForm.api.clockMapping,
"rated" -> boolean,
"pairAt" -> optional(timestampInNearFuture),
"startClocksAt" -> optional(timestampInNearFuture)
) {
(
tokens: String,
variant: Option[String],
clock: Clock.Config,
rated: Boolean,
pairTs: Option[Long],
clockTs: Option[Long]
) =>
BulkFormData(
tokens,
Variant orDefault ~variant,
clock,
rated,
pairTs.map { new DateTime(_) },
clockTs.map { new DateTime(_) }
)
}(_ => None)
)

Expand All @@ -59,13 +85,22 @@ object SetupBulk {
case class ScheduledGame(id: Game.ID, white: User.ID, black: User.ID)

case class ScheduledBulk(
_id: String,
by: User.ID,
games: List[ScheduledGame],
variant: Variant,
clock: Clock.Config,
mode: Mode,
pairAt: DateTime,
startClocksAt: DateTime
)
startClocksAt: Option[DateTime],
scheduledAt: DateTime,
pairedAt: Option[DateTime] = None
) {
def userSet = Set(games.flatMap(g => List(g.white, g.black)))
def collidesWith(other: ScheduledBulk) = {
pairAt == other.pairAt || startClocksAt == startClocksAt
} && userSet.exists(other.userSet.contains)
}
}

final class BulkChallengeApi(oauthServer: OAuthServer, idGenerator: IdGenerator)(implicit
Expand All @@ -75,7 +110,7 @@ final class BulkChallengeApi(oauthServer: OAuthServer, idGenerator: IdGenerator)

import SetupBulk._

def apply(data: BulkFormData): Fu[Either[List[BadToken], ScheduledBulk]] =
def apply(data: BulkFormData, me: User): Fu[Either[List[BadToken], ScheduledBulk]] =
Source(extractTokenPairs(data.tokens))
.mapConcat { case (whiteToken, blackToken) =>
List(whiteToken, blackToken) // flatten now, re-pair later!
Expand All @@ -98,6 +133,7 @@ final class BulkChallengeApi(oauthServer: OAuthServer, idGenerator: IdGenerator)
.grouped(2)
.collect { case List(w, b) => (w, b) }
.toList
lila.mon.api.challenge.bulk.scheduleNb(me.id).increment(pairs.size).unit
idGenerator
.games(pairs.size)
.map {
Expand All @@ -109,7 +145,17 @@ final class BulkChallengeApi(oauthServer: OAuthServer, idGenerator: IdGenerator)
}
}
.dmap {
ScheduledBulk(_, data.variant, data.clock, Mode(data.rated), DateTime.now, DateTime.now)
ScheduledBulk(
_id = lila.common.ThreadLocalRandom nextString 8,
by = me.id,
_,
data.variant,
data.clock,
Mode(data.rated),
pairAt = data.pairAt | DateTime.now,
startClocksAt = data.startClocksAt,
scheduledAt = DateTime.now
)
}
.dmap(Right.apply)
}
Expand Down
Loading

0 comments on commit 639a765

Please sign in to comment.