diff --git a/app/controllers/Challenge.scala b/app/controllers/Challenge.scala index cc8f67bbe6891..40b9b361587b5 100644 --- a/app/controllers/Challenge.scala +++ b/app/controllers/Challenge.scala @@ -317,7 +317,7 @@ final class Challenge( .fold( newJsonFormError, data => - env.setup.bulk(data) map { + env.setup.bulk(data, me) flatMap { case Left(badTokens) => import lila.setup.SetupBulk.BadToken import play.api.libs.json._ @@ -329,15 +329,16 @@ final class Challenge( } } ) - ) + ).fuccess case Right(bulk) => - env.challenge.bulk(me, bulk).thenPp - Ok(Json.obj("games" -> bulk.games.map { g => - Json.obj( - "gameId" -> g.id, - "userIds" -> Json.arr(g.white, g.black) - ) - })) as JSON + env.challenge.bulk.schedule(bulk) inject { + Ok(Json.obj("games" -> bulk.games.map { g => + Json.obj( + "gameId" -> g.id, + "userIds" -> Json.arr(g.white, g.black) + ) + })) as JSON + } } ) } diff --git a/modules/challenge/src/main/BSONHandlers.scala b/modules/challenge/src/main/BSONHandlers.scala index f5e586a87a264..70cd728dfc478 100644 --- a/modules/challenge/src/main/BSONHandlers.scala +++ b/modules/challenge/src/main/BSONHandlers.scala @@ -48,7 +48,6 @@ private object BSONHandlers { { case BSONInteger(v) => Status(v) toTry s"No such status: $v" }, x => BSONInteger(x.id) ) - implicit val ModeBSONHandler = BSONBooleanHandler.as[Mode](Mode.apply, _.rated) implicit val RatingBSONHandler = new BSON[Rating] { def reads(r: Reader) = Rating(r.int("i"), r.boolD("p")) def writes(w: Writer, r: Rating) = diff --git a/modules/challenge/src/main/ChallengeBulk.scala b/modules/challenge/src/main/ChallengeBulk.scala index 4e413538da5d3..ed3bd05494883 100644 --- a/modules/challenge/src/main/ChallengeBulk.scala +++ b/modules/challenge/src/main/ChallengeBulk.scala @@ -1,33 +1,80 @@ package lila.challenge +import akka.actor.ActorSystem import akka.stream.scaladsl._ import chess.variant.Variant import chess.{ Clock, Mode, Situation, Speed } import org.joda.time.DateTime -import scala.util.chaining._ +import reactivemongo.api.bson.Macros +import scala.concurrent.duration._ +import lila.common.Bus import lila.common.LilaStream +import lila.db.dsl._ import lila.game.{ Game, Player } +import lila.hub.actorApi.map.TellMany +import lila.hub.DuctSequencers import lila.rating.PerfType -import lila.setup.SetupBulk.ScheduledBulk +import lila.setup.SetupBulk.{ ScheduledBulk, ScheduledGame } import lila.user.User final class ChallengeBulkApi( + colls: ChallengeColls, gameRepo: lila.game.GameRepo, userRepo: lila.user.UserRepo, onStart: lila.round.OnStart )(implicit ec: scala.concurrent.ExecutionContext, - mat: akka.stream.Materializer + mat: akka.stream.Materializer, + system: ActorSystem, + mode: play.api.Mode ) { - def apply( - by: User, - scheduled: ScheduledBulk - ): Fu[Int] = { - val perfType = PerfType(scheduled.variant, Speed(scheduled.clock)) - val startClock = scheduled.startClocksAt isBefore DateTime.now - Source(scheduled.games) + implicit private val gameHandler = Macros.handler[ScheduledGame] + implicit private val variantHandler = variantByKeyHandler + implicit private val clockHandler = clockConfigHandler + implicit private val bulkHandler = Macros.handler[ScheduledBulk] + + private val coll = colls.bulk + + private val workQueue = + new DuctSequencers(maxSize = 64, expiration = 1 minute, timeout = 10 seconds, name = "challenge.bulk") + + def schedule(bulk: ScheduledBulk): Fu[Option[String]] = workQueue(bulk.by) { + if (bulk.pairAt.isBeforeNow) makePairings(bulk) inject none + else + coll.list[ScheduledBulk]($doc("by" -> bulk.by, "pairedAt" $exists false)) flatMap { bulks => + val nbGames = bulks.map(_.games.size).sum + if (bulks.sizeIs >= 10) fuccess("Already too many bulks queued".some) + else if (bulks.map(_.games.size).sum >= 1000) fuccess("Already too many games queued".some) + else if (bulks.exists(_ collidesWith bulk)) + fuccess("A bulk containing the same players is scheduled at the same time".some) + else coll.insert.one(bulk) inject none + } + } + + private[challenge] def tick: Funit = + checkForPairing >> checkForClocks + + private def checkForPairing: Funit = + coll.one[ScheduledBulk]($doc("pairAt" $lte DateTime.now, "pairedAt" $exists false)) flatMap { + _ ?? makePairings + } + + private def checkForClocks: Funit = + coll.one[ScheduledBulk]($doc("startClocksAt" $lte DateTime.now, "pairedAt" $exists true)) flatMap { + _ ?? startClocks + } + + private def startClocks(bulk: ScheduledBulk): Funit = workQueue(bulk.by) { + fuccess { + Bus.publish(TellMany(bulk.games.map(_.id), lila.round.actorApi.round.StartClock), "roundSocket") + } + } + + private def makePairings(bulk: ScheduledBulk): Funit = workQueue(bulk.by) { + val perfType = PerfType(bulk.variant, Speed(bulk.clock)) + Source(bulk.games) .mapAsyncUnordered(8) { game => userRepo.pair(game.white, game.black) map2 { case (white, black) => (game.id, white, black) @@ -37,19 +84,15 @@ final class ChallengeBulkApi( .map[Game] { case (id, white, black) => Game .make( - chess = - chess.Game(situation = Situation(scheduled.variant), clock = scheduled.clock.toClock.some), + chess = chess.Game(situation = Situation(bulk.variant), clock = bulk.clock.toClock.some), whitePlayer = Player.make(chess.White, white.some, _(perfType)), blackPlayer = Player.make(chess.Black, black.some, _(perfType)), - mode = scheduled.mode, + mode = bulk.mode, source = lila.game.Source.Api, pgnImport = None ) .withId(id) .start - .pipe { g => - if (startClock) g.startClock.fold(g)(_.game) else g - } } .mapAsyncUnordered(8) { game => (gameRepo insertDenormalized game) >>- onStart(game.id) @@ -57,7 +100,11 @@ final class ChallengeBulkApi( .toMat(LilaStream.sinkCount)(Keep.right) .run() .addEffect { nb => - lila.mon.api.challenge.bulk.createNb(by.id).increment(nb).unit - } + lila.mon.api.challenge.bulk.createNb(bulk.by).increment(nb).unit + } >> { + if (bulk.startClocksAt.isDefined) + coll.updateField($id(bulk._id), "pairedAt", DateTime.now) + else coll.delete.one($id(bulk._id)) + }.void } } diff --git a/modules/challenge/src/main/ChallengeRepo.scala b/modules/challenge/src/main/ChallengeRepo.scala index 82a919e81ef46..91ec0e4b46f10 100644 --- a/modules/challenge/src/main/ChallengeRepo.scala +++ b/modules/challenge/src/main/ChallengeRepo.scala @@ -6,13 +6,15 @@ import scala.annotation.nowarn import lila.common.config.Max import lila.db.dsl._ -final private class ChallengeRepo(coll: Coll, maxPerUser: Max)(implicit +final private class ChallengeRepo(colls: ChallengeColls, maxPerUser: Max)(implicit ec: scala.concurrent.ExecutionContext ) { import BSONHandlers._ import Challenge._ + private val coll = colls.challenge + def byId(id: Challenge.ID) = coll.find($id(id)).one[Challenge] def byIdFor(id: Challenge.ID, dest: lila.user.User) = diff --git a/modules/challenge/src/main/Env.scala b/modules/challenge/src/main/Env.scala index 69762acacbdb6..b031f37c09ef5 100644 --- a/modules/challenge/src/main/Env.scala +++ b/modules/challenge/src/main/Env.scala @@ -30,6 +30,8 @@ final class Env( private lazy val maxPlaying = appConfig.get[Max]("setup.max_playing") + private val colls = wire[ChallengeColls] + def version(challengeId: Challenge.ID): Fu[SocketVersion] = socket.rooms.ask[SocketVersion](challengeId)(GetVersion) @@ -43,10 +45,7 @@ final class Env( lazy val granter = wire[ChallengeGranter] - private lazy val repo = new ChallengeRepo( - coll = db(CollName("challenge")), - maxPerUser = maxPlaying - ) + private lazy val repo = wire[ChallengeRepo] lazy val jsonView = wire[JsonView] @@ -54,7 +53,16 @@ final class Env( val forms = new ChallengeForm - system.scheduler.scheduleWithFixedDelay(10 seconds, 3 seconds) { () => + system.scheduler.scheduleWithFixedDelay(10 seconds, 3343 millis) { () => api.sweep.unit } + + system.scheduler.scheduleWithFixedDelay(20 seconds, 2897 millis) { () => + bulk.tick.unit + } +} + +private class ChallengeColls(db: lila.db.Db) { + val challenge = db(CollName("challenge")) + val bulk = db(CollName("challenge_bulk")) } diff --git a/modules/db/src/main/Handlers.scala b/modules/db/src/main/Handlers.scala index f7941f7c6b40a..9a894ba93f3ab 100644 --- a/modules/db/src/main/Handlers.scala +++ b/modules/db/src/main/Handlers.scala @@ -9,6 +9,7 @@ import scala.util.{ Failure, Success, Try } import lila.common.Iso._ import lila.common.{ EmailAddress, IpAddress, Iso, NormalizedEmailAddress } import chess.format.FEN +import chess.variant.Variant trait Handlers { @@ -117,4 +118,28 @@ trait Handlers { implicit val colorBoolHandler = BSONBooleanHandler.as[chess.Color](chess.Color.fromWhite, _.white) implicit val FENHandler: BSONHandler[FEN] = stringAnyValHandler[FEN](_.value, FEN.apply) + + implicit val modeHandler = BSONBooleanHandler.as[chess.Mode](chess.Mode.apply, _.rated) + + val variantByKeyHandler: BSONHandler[Variant] = quickHandler[Variant]( + { + case BSONString(v) => Variant orDefault v + case _ => Variant.default + }, + v => BSONString(v.key) + ) + + val clockConfigHandler = tryHandler[chess.Clock.Config]( + { case doc: BSONDocument => + for { + limit <- doc.getAsTry[Int]("limit") + inc <- doc.getAsTry[Int]("increment") + } yield chess.Clock.Config(limit, inc) + }, + c => + BSONDocument( + "limit" -> c.limitSeconds, + "increment" -> c.incrementSeconds + ) + ) } diff --git a/modules/setup/src/main/SetupBulk.scala b/modules/setup/src/main/SetupBulk.scala index 35a9ebbabae7f..ab0f592e08631 100644 --- a/modules/setup/src/main/SetupBulk.scala +++ b/modules/setup/src/main/SetupBulk.scala @@ -19,9 +19,18 @@ object SetupBulk { val maxGames = 500 - case class BulkFormData(tokens: String, variant: Variant, clock: Clock.Config, rated: Boolean) + case class BulkFormData( + tokens: String, + variant: Variant, + clock: Clock.Config, + rated: Boolean, + pairAt: Option[DateTime], + startClocksAt: Option[DateTime] + ) - val form = Form[BulkFormData]( + private def timestampInNearFuture = longNumber(max = DateTime.now.plusDays(1).getMillis) + + def form = Form[BulkFormData]( mapping( "tokens" -> nonEmptyText .verifying("Not enough tokens", t => extractTokenPairs(t).nonEmpty) @@ -34,10 +43,27 @@ object SetupBulk { } ), SetupForm.api.variant, - "clock" -> SetupForm.api.clockMapping, - "rated" -> boolean - ) { (tokens: String, variant: Option[String], clock: Clock.Config, rated: Boolean) => - BulkFormData(tokens, Variant orDefault ~variant, clock, rated) + "clock" -> SetupForm.api.clockMapping, + "rated" -> boolean, + "pairAt" -> optional(timestampInNearFuture), + "startClocksAt" -> optional(timestampInNearFuture) + ) { + ( + tokens: String, + variant: Option[String], + clock: Clock.Config, + rated: Boolean, + pairTs: Option[Long], + clockTs: Option[Long] + ) => + BulkFormData( + tokens, + Variant orDefault ~variant, + clock, + rated, + pairTs.map { new DateTime(_) }, + clockTs.map { new DateTime(_) } + ) }(_ => None) ) @@ -59,13 +85,22 @@ object SetupBulk { case class ScheduledGame(id: Game.ID, white: User.ID, black: User.ID) case class ScheduledBulk( + _id: String, + by: User.ID, games: List[ScheduledGame], variant: Variant, clock: Clock.Config, mode: Mode, pairAt: DateTime, - startClocksAt: DateTime - ) + startClocksAt: Option[DateTime], + scheduledAt: DateTime, + pairedAt: Option[DateTime] = None + ) { + def userSet = Set(games.flatMap(g => List(g.white, g.black))) + def collidesWith(other: ScheduledBulk) = { + pairAt == other.pairAt || startClocksAt == startClocksAt + } && userSet.exists(other.userSet.contains) + } } final class BulkChallengeApi(oauthServer: OAuthServer, idGenerator: IdGenerator)(implicit @@ -75,7 +110,7 @@ final class BulkChallengeApi(oauthServer: OAuthServer, idGenerator: IdGenerator) import SetupBulk._ - def apply(data: BulkFormData): Fu[Either[List[BadToken], ScheduledBulk]] = + def apply(data: BulkFormData, me: User): Fu[Either[List[BadToken], ScheduledBulk]] = Source(extractTokenPairs(data.tokens)) .mapConcat { case (whiteToken, blackToken) => List(whiteToken, blackToken) // flatten now, re-pair later! @@ -98,6 +133,7 @@ final class BulkChallengeApi(oauthServer: OAuthServer, idGenerator: IdGenerator) .grouped(2) .collect { case List(w, b) => (w, b) } .toList + lila.mon.api.challenge.bulk.scheduleNb(me.id).increment(pairs.size).unit idGenerator .games(pairs.size) .map { @@ -109,7 +145,17 @@ final class BulkChallengeApi(oauthServer: OAuthServer, idGenerator: IdGenerator) } } .dmap { - ScheduledBulk(_, data.variant, data.clock, Mode(data.rated), DateTime.now, DateTime.now) + ScheduledBulk( + _id = lila.common.ThreadLocalRandom nextString 8, + by = me.id, + _, + data.variant, + data.clock, + Mode(data.rated), + pairAt = data.pairAt | DateTime.now, + startClocksAt = data.startClocksAt, + scheduledAt = DateTime.now + ) } .dmap(Right.apply) } diff --git a/modules/swiss/src/main/BsonHandlers.scala b/modules/swiss/src/main/BsonHandlers.scala index 0a7841a70bdb9..d799f7c165362 100644 --- a/modules/swiss/src/main/BsonHandlers.scala +++ b/modules/swiss/src/main/BsonHandlers.scala @@ -3,7 +3,6 @@ package lila.swiss import chess.Clock.{ Config => ClockConfig } import chess.Color import chess.format.FEN -import chess.variant.Variant import reactivemongo.api.bson._ import scala.concurrent.duration._ @@ -13,26 +12,8 @@ import lila.user.User private object BsonHandlers { - implicit val clockHandler = tryHandler[ClockConfig]( - { case doc: BSONDocument => - for { - limit <- doc.getAsTry[Int]("limit") - inc <- doc.getAsTry[Int]("increment") - } yield ClockConfig(limit, inc) - }, - c => - BSONDocument( - "limit" -> c.limitSeconds, - "increment" -> c.incrementSeconds - ) - ) - implicit val variantHandler = lila.db.dsl.quickHandler[Variant]( - { - case BSONString(v) => Variant orDefault v - case _ => Variant.default - }, - v => BSONString(v.key) - ) + implicit val variantHandler = variantByKeyHandler + implicit val clockHandler = clockConfigHandler implicit val swissPointsHandler = intAnyValHandler[Swiss.Points](_.double, Swiss.Points.apply) implicit val swissTieBreakHandler = doubleAnyValHandler[Swiss.TieBreak](_.value, Swiss.TieBreak.apply) implicit val swissPerformanceHandler =