Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protect schema migrations from running in parallel #199

Merged
merged 3 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add a lock implementation based on SOLRs optimstic locking
For code that should not run in parallel when multiple deployments are
active, synchronisation must happen outside the app. For this SOLR can
be used, utilising it's optimistic locking feature. Based on this, a
`Resource` is provided to be able to run code sequentially.
  • Loading branch information
eikek committed Sep 6, 2024
commit 310cc0c7905242bf640e63a732b4e3e29728c30f
58 changes: 58 additions & 0 deletions modules/commons/src/test/scala/io/renku/search/Threading.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search

import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference

import scala.util.Try

import cats.effect.*

import munit.*

trait Threading:
self: CatsEffectSuite =>

// using "real" threads to have higher chance of parallelism

def runParallel[A](block: => IO[A], blockn: IO[A]*): IO[List[A]] =
IO.blocking {
val code = block +: blockn
val latch = CountDownLatch(1)
val result = new AtomicReference[List[A]](Nil)
val done = new CountDownLatch(code.size)
code.foreach { ioa =>
val t = new Thread(new Runnable {
def run() =
latch.await()
val ta = Try(ioa.unsafeRunSync())
ta.fold(_ => (), a => result.updateAndGet(list => a :: list))
done.countDown()
ta.fold(throw _, _ => ())
()
})
t.setDaemon(true)
t.start()
}
latch.countDown()
done.await(munitIOTimeout.toMillis, TimeUnit.MILLISECONDS)
result.get()
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

package io.renku.solr.client

import cats.MonadThrow
import cats.data.NonEmptyList
import cats.effect.{Async, Resource}
import fs2.io.net.Network

import io.bullet.borer.{Decoder, Encoder}
import io.renku.search.http.{ClientBuilder, ResponseLogging, RetryConfig}
import io.renku.solr.client.schema.SchemaCommand
import io.renku.solr.client.util.{DocumentLockResource, LockDocument}
import org.http4s.ember.client.EmberClientBuilder

trait SolrClient[F[_]]:
Expand Down Expand Up @@ -52,6 +54,29 @@ trait SolrClient[F[_]]:
def createCore(name: String, configSet: Option[String] = None): F[Unit]
def deleteCore(name: String): F[Unit]

/** Returns a `Resource` that yields `true` if a lock for `id` could be obtained. It
* yields `false` if the lock `id` is already held.
*
* It uses a solr document of the given `id`.
*/
def lockOn(id: String)(using MonadThrow[F]): Resource[F, Boolean] =
DocumentLockResource.create[F](this)(id)

/** Returns a `Resource` that yields a `Some` if the lock represented by `A` could be
* obtained and `None` if not.
*
* The lock is represented by a solr document `A`. The `acquire` function either
* returns a new document in "acquired" state or sets the acquired state should the
* document already exist. Analogous, `release` puts the document back into free state
* or return `None` to remove the document from SOLR. The function `isFree` is used to
* determine the state if a document already exists with that id. If it doesn't exist,
* the lock is free to obtain.
*/
def lockBy[A](
id: String
)(using MonadThrow[F], LockDocument[F, A]): Resource[F, Option[A]] =
DocumentLockResource[F, A](this).make(id)

object SolrClient:
def apply[F[_]: Async: Network](config: SolrConfig): Resource[F, SolrClient[F]] =
ClientBuilder(EmberClientBuilder.default[F])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.solr.client.util

import cats.MonadThrow
import cats.data.NonEmptyList
import cats.effect.*
import cats.syntax.all.*

import io.bullet.borer.Decoder
import io.bullet.borer.Encoder
import io.bullet.borer.derivation.MapBasedCodecs
import io.bullet.borer.derivation.key
import io.renku.solr.client.*

/** Utilising SOLRs optimistic locking, create a [[cats.effect.Resource]] implementing a
* lock.
*
* NOTE: The document represented as `A` must supply the necessary properties to enable
* SOLRs optimistic locking, it must have at least an `id` and `_version_` property!
*/
final class DocumentLockResource[F[_]: MonadThrow, A](
client: SolrClient[F]
)(using e: LockDocument[F, A]):
private given Encoder[A] = e.encoder
private given Decoder[A] = e.decoder

/** Creates a resource that is represented by a solr document with the given
* `documentId`. The resource is considered acquired, if the document either can be
* inserted (it doesn't exist) or passes the `isFree` check if it exists. If this is
* fulfilled, the `acquire` function must return the "acquired" state of the document
* that will be upserted to SOLR (this state must not pass the `isFree` check).
* Finally, the `release` function determines the next free state of the resource. If
* `release` returns `None` the document will be deleted from SOLR.
*
* The resource yields a `Some` if the lock was acquired successfully. It yields a
* `None` if the document was in "non-free" state and therefore was not acquired
* successfully.
*/
def make(documentId: String): Resource[F, Option[A]] = {
val acq = for
existing <- getDocument(documentId)
res <-
if (existing.forall(e.isFree))
e.acquire(existing, documentId).map(Seq(_)).flatMap(client.upsert(_)).flatMap {
case UpsertResponse.VersionConflict => None.pure[F]
case UpsertResponse.Success(_) => getDocument(documentId)
}
else None.pure[F]
yield res

def rel(a: Option[A]) = a match {
case None => ().pure[F]
case Some(_) =>
requireDocument(documentId)
.map(e.release)
.flatMap {
case Some(d) => client.upsertSuccess(Seq(d))
case None => client.deleteIds(NonEmptyList.of(documentId))
}
}
Resource.make(acq)(rel)
}

private def getDocument(docId: String): F[Option[A]] =
client
.query[A](QueryString(s"id:$docId"))
.map(_.responseBody.docs.headOption)

private def requireDocument(docId: String) =
getDocument(docId).flatMap {
case None =>
MonadThrow[F].raiseError(
new Exception("No document available during resource release!")
)
case Some(d) => d.pure[F]
}

object DocumentLockResource:

def apply[F[_]: MonadThrow, A](
client: SolrClient[F]
)(using LockDocument[F, A]): DocumentLockResource[F, A] =
new DocumentLockResource[F, A](client)

final private case class SimpleLock(
id: String,
@key("_version_") version: DocVersion
)
private object SimpleLock:
given Encoder[SimpleLock] = MapBasedCodecs.deriveEncoder
given Decoder[SimpleLock] = MapBasedCodecs.deriveDecoder
given [F[_]: MonadThrow]: LockDocument[F, SimpleLock] =
LockDocument.whenExists(id =>
MonadThrow[F].pure(SimpleLock(id, DocVersion.NotExists))
)

def create[F[_]: MonadThrow](client: SolrClient[F])(id: String): Resource[F, Boolean] =
DocumentLockResource[F, SimpleLock](client)
.make(id)
.map(_.isDefined)
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.solr.client.util

import io.bullet.borer.*

/** Documents implementing this interface can be used for locking with
* `DocumentLockResource`.
*
* These documents must also provide at least an `id` and `_version_` property to enable
* SOLRs optimistic locking.
*/
trait LockDocument[F[_], A]:
def encoder: Encoder[A]
def decoder: Decoder[A]

/** Determines if an existing document is in "free" state. */
def isFree(value: A): Boolean

/** Return a document in "acquired" state, either set it given an existing document or
* create a new document in this state using the provided `id`.
*/
def acquire(existing: Option[A], id: String): F[A]

/** Given the document in "acquired" state, return it in "free" state. Return `None` to
* have it deleted from the index.
*/
def release(value: A): Option[A]

object LockDocument:

def apply[F[_], A: Encoder: Decoder](
_isFree: A => Boolean,
_acquire: (Option[A], String) => F[A],
_release: A => Option[A]
): LockDocument[F, A] = new LockDocument[F, A] {
val encoder: Encoder[A] = summon[Encoder[A]]
val decoder: Decoder[A] = summon[Decoder[A]]
def isFree(value: A): Boolean = _isFree(value)
def acquire(existing: Option[A], id: String): F[A] = _acquire(existing, id)
def release(value: A): Option[A] = _release(value)
}

/** A version that is based only on existence of a document.
*
* If the document exists in SOLR, the resource yields a `None` (not acquired). If the
* document doesn't exist, it is inserted and the current version returned as a `Some`.
*/
def whenExists[F[_], A: Encoder: Decoder](create: String => F[A]): LockDocument[F, A] =
apply(_isFree = _ => false, _acquire = (_, id) => create(id), _release = _ => None)
Loading