Welcome to Eva! It is a Kotlin open-source framework, which helps you to write your code in DDD style and using CQRS approach.
Declare Eva dependencies in your project:
dependencies {
implementation("team.razz.eva:eva-uow:$eva_version")
implementation("team.razz.eva:eva-repository:$eva_version")
implementation("team.razz.eva:eva-persistence-jdbc:$eva_version")
}
Define events for your business domain. You can read more about domain and integration events here.
sealed class WalletEvent : ModelEvent<Wallet.Id> {
override val modelName = "Wallet"
data class Created(
override val modelId: Wallet.Id,
val currency: Currency,
val amount: ULong,
val expireAt: Instant
) : WalletEvent(), ModelCreatedEvent<Wallet.Id> {
override fun integrationEvent() = buildJsonObject {
put("currency", currency.currencyCode)
put("amount", amount.toLong())
put("expireAt", expireAt.epochSecond)
}
}
data class Deposit(
override val modelId: Wallet.Id,
val walletAmount: ULong,
val depositAmount: ULong
) : WalletEvent(), ModelCreatedEvent<Wallet.Id> {
override fun integrationEvent() = buildJsonObject {
put("walletAmount", walletAmount.toLong())
put("depositAmount", depositAmount.toLong())
}
}
}
Define a model and methods that change model's state. On any model's modification we should raise an event about it.
class Wallet(
id: Id,
val currency: Currency,
val amount: ULong,
val expireAt: Instant,
entityState: EntityState<Id, WalletEvent>
) : Model<Wallet.Id, WalletEvent>(id, entityState) {
data class Id(override val id: UUID) : ModelId<UUID>
fun deposit(toDeposit: ULong) = Wallet(
amount = amount - toDeposit,
currency = currency,
id = id(),
expireAt = expireAt,
entityState = entityState()
.raiseEvent(WalletEvent.Deposit(id(), amount, toDeposit))
)
}
We need queries interface, so we can query our existing models
interface WalletQueries {
suspend fun find(id: Wallet.Id): Wallet?
}
Now we can write our first unit of work. In our framework unit of work stands for Command in CQRS pattern. You can read more about CQRS here and here. Unit of work is a transactional operation. Here the unit of work either returns an existing wallet by ID or creates a new one and returns it.
class CreateWalletUow(
private val queries: WalletQueries,
clock: Clock
) : UnitOfWork<ServicePrincipal, Params, Wallet>(clock) {
@Serializable
data class Params(val id: String, val currency: String) : UowParams<Params> {
override fun serialization() = serializer()
}
override suspend fun tryPerform(principal: ServicePrincipal, params: Params): Changes<Wallet> {
val walletId = Wallet.Id(UUID.fromString(params.id))
val wallet = queries.find(walletId)
return if (wallet != null) {
noChanges(wallet)
} else {
val amount = ULong.MIN_VALUE
val currency = Currency.getInstance(params.currency)
val expireAt = clock.instant().plus(timeToExpire)
val newWallet = Wallet(
id = walletId,
currency = currency,
amount = amount,
expireAt = expireAt,
entityState = newState(WalletEvent.Created(walletId, currency, amount, expireAt))
)
changes {
add(newWallet)
}
}
}
companion object {
private val timeToExpire = Duration.ofDays(600)
}
}
In this example we return noChanges in case model with such id already exists, otherwise we create new model and using ChangesDsl we add new model.
You can also update your models in scope of unit of work -
changes {
update(wallet.deposit(amount))
}
By default, update allows passing model with no changes (model was not updated after calling deposit() method). Sometimes it can lead to some inconsistency in your domain logic - you expected model to be changed, but it wasn't. You can force verification for your model, that it was changed in scope of your unit of work:
changes {
update(wallet.deposit(amount), required = true)
}
To persist our model we need to add repository for it. We use jOOQ to have a type-safe DB querying. You need to generate jOOQ tables/records based on your DB schema to have a type-safe mapping of your model to DB record. You can use different Gradle plugins to generate jOOQ tables, f.e. check this plugin.
Your generated records should extend BaseEntityRecord. To achieve it use jOOQ matcher strategies.
When you create tables for your models you need to add next fields to your schema, so we can persist your model properly -
record_updated_at TIMESTAMP NOT NULL ,
record_created_at TIMESTAMP NOT NULL ,
version BIGINT NOT NULL
After you created DB schema for you data, we can implement Repository for your model.
class WalletRepository(
queryExecutor: QueryExecutor,
dslContext: DSLContext
) : WalletQueries, JooqBaseModelRepository<UUID, Wallet.Id, Wallet, WalletEvent, WalletRecord>(
queryExecutor = queryExecutor,
dslContext = dslContext,
table = WALLET
) {
override fun toRecord(model: Wallet) = WalletRecord().apply {
currency = model.currency.currencyCode
amount = model.amount.toLong()
expireAt = model.expireAt
}
override fun fromRecord(
record: WalletRecord,
entityState: PersistentState<Wallet.Id, WalletEvent>
) = Wallet(
id = Wallet.Id(record.id),
currency = Currency.getInstance(record.currency),
amount = record.amount.toULong(),
expireAt = record.expireAt,
entityState = entityState
)
}
We have unit of work and repository, so we can set up everything together. In this example we are going to use JDBC implementation for our transactional manager.
class WalletModule(databaseConfig: DatabaseConfig) {
/**
* Query executor definition
*/
val transactionManager = JdbcTransactionManager(
primaryProvider = HikariPoolConnectionProvider(dataSource(databaseConfig, isPrimary = true)),
replicaProvider = HikariPoolConnectionProvider(dataSource(databaseConfig, isPrimary = false)),
blockingJdbcContext = newFixedThreadPool(databaseConfig.maxPoolSize.value()).asCoroutineDispatcher()
)
val queryExecutor = JdbcQueryExecutor(transactionManager)
val dslContext: DSLContext = DSL.using(
POSTGRES,
Settings().withRenderNamedParamPrefix("$").withParamType(ParamType.NAMED)
)
/**
* Persisting definition
*/
val tracer = NoopTracerFactory.create()
val walletRepo = WalletRepository(queryExecutor, dslContext)
val persisting = Persisting(
transactionManager = transactionManager,
modelRepos = ModelRepos(Wallet::class hasRepo walletRepo),
eventRepository = JooqEventRepository(queryExecutor, dslContext, tracer)
)
/**
* Unit of work executor definition
*/
val clock = Clock.tickMillis(UTC)
val uowx: UnitOfWorkExecutor = UnitOfWorkExecutor(
persisting = persisting,
tracer = tracer,
meterRegistry = SimpleMeterRegistry(),
factories = listOf(
CreateWalletUow::class withFactory { CreateWalletUow(walletRepo, clock) }
)
)
}
Please don't forget to create tables for your models and table for storing events. You can find script to create event's table here
val module = WalletModule(config)
val principal = ServicePrincipal(Principal.Id("eva-id"), Principal.Name("eva"))
val createdWallet = module.uowx.execute(CreateWalletUow::class, principal) {
CreateWalletUow.Params(
id = "45dfd599-4d62-47f1-8e47-a779df4f6bbc",
currency = "USD"
)
}
Eva employs outbox pattern for event distribution. In short: events are written to the same database and in the same transaction with models. Same transactional guarantees applied to both models and events. Events schema and migrations are provided by eva, you can find sql sources here and persistence logic here. Eva is not in charge of further distribution of such events, however there are several opensource frameworks available, for instance Kafka Connect and Debezium.
When desired, events can be published through custom implementation of EventPublisher. This publisher has to be passed to Persisting
as optional parameter like demonstrated below:
val persisting: Persisting = Persisting(
transactionManager = persistenceModule.transactionManager,
modelRepos = repositoryModule.modelRepos,
eventRepository = eventRepository,
eventPublisher = eventPublisher
)
Events are passed to the publisher out of the scope of transaction once models are persisted. If persisting of models fails, no events are passed to the publisher. Publisher failure does not affect models persisting. Eva provides simple in-memory eventbus implementation for your convenience. This eventbus implements Publisher
interface and accepts multiple EventConsumer
s to which it distributes published events. This implementation provides fifo guarantees for published events and does not provide any guarantees regarding distribution resilience. We strongly suggest to follow transactional outbox
approach if at-least-once event delivery is a requirement.
After you wrote your first unit of work, you probably want to ask - how I can test it?
We provide verification DSL, so you can write unit tests and verify results of your unit of work.
Use verifyInOrder
function to start verification process.
CreateWalletUow(queries, clock).tryPerform(principal, params) verifyInOrder {
adds<Wallet> { model -> ... }
addsEq(expectedModel)
updates<Wallet> { model -> ... }
updatesEq(expectedModel)
emits<WalletEvent> { event -> ... }
emitsEq(expectedEvent)
returns { result -> ... }
}
You can check some examples here
Sometimes something goes wrong, your service doesn't respond within deadline. You want to make a retry, but you are afraid of creating duplicates or new unnecessary models, so your DB becomes inconsistent.
To prevent it people use idempotency key pattern. Unit of work allows you to define idempotency key in params, so you can safely make retries.
@Serializable
data class Params(
val id: String,
val currency: String,
override val idempotencyKey: IdempotencyKey
) : UowParams<Params> {
override fun serialization() = serializer()
}
Idempotency key can be shipped as a standalone artifact outside your service, if you f.e. want to pass it via http request.
implementation("team.razz.eva:eva-idempotency-key:$eva_version")
Out of the box Eva supports paging for your data, when it is not possible to return all results in one request. First, you need to add paging module to your dependencies:
implementation("team.razz.eva:eva-paging:$eva_version")
Second, you need to define your PagingStrategy. For now, we support paging by some timestamp only.
object WalletPaging : PagingStrategy<UUID, Wallet.Id, Wallet, Wallet, WalletRecord>(Wallet::class) {
override fun tableTimestamp() = WALLET.EXPIRE_AT
override fun tableId() = WALLET.ID
override fun tableOffset(modelOffset: ModelOffset) = UUID.fromString(modelOffset)
override fun modelTimestamp(model: Wallet) = model.expireAt
override fun modelOffset(model: Wallet) = model.id().stringValue()
}
Now we can implement method in our repository to get pages.
suspend fun wallets(currency: Currency, page: TimestampPage) = findPage(
condition = WALLET.CURRENCY.eq(currency.currencyCode),
page = page,
pagingStrategy = WalletPaging
)
That's all! This method returns object of PagedList. It provides a part of your requested results and the next page to query the next part of results.
One day you are going face a lot of concurrent unit of works. It leads to concurrent modification of same models. But our units of work are transactional, so we guarantee consistency of your models. In case of concurrent modification unit of work throws StaleRecordException. By default, we will do a one retry for such kind of exception, but you can change this strategy
val configuration = UnitOfWork.Configuration(
retry = StaleRecordFixedRetry(attempts = 3, staleRecordDelay = Duration.ofMillis(100))
)
class CreateWalletUow(
private val queries: WalletQueries,
clock: Clock
) : UnitOfWork<ServicePrincipal, Params, Wallet>(clock, configuration = configuration) {
Your database schema can also have some constraints, f.e. unique index. We don't want you to deal with such kind of exception out of your unit of work. You can intercept these exceptions and throw your business exception or return some fallback result.
In our CreateWalletUow
we check if wallet with same id already exists.
But we can face situation, when wallet with same id was created during unit of work execution.
Let's intercept this error and return already created wallet. We also can handle DB constraint and throw some more meaningful exception.
override suspend fun onFailure(params: Params, ex: PersistenceException): Wallet = when(ex) {
is UniqueModelRecordViolationException -> checkNotNull(queries.find(Wallet.Id(UUID.fromString(params.id))))
is ModelRecordConstraintViolationException -> throw IllegalArgumentException("${params.currency} is invalid")
else -> throw ex
}
If you care about your system performance - you want to collect some metrics, so you can create alerts and investigate poor performance. We allow you to collect some metrics via Micrometer framework and do instrumentation with Opentracing. Both frameworks provide you interfaces, and you can choose your own implementation, how you want to collect metrics. In this example we use Prometheus and Jaeger implementations.
Don't forget to add Jaeger and Prometheus dependencies to your project
val meterRegistry = PrometheusMeterRegistry(DEFAULT)
val uowx: UnitOfWorkExecutor = UnitOfWorkExecutor(
persisting = persisting,
tracer = tracer("wallet-service"),
meterRegistry = meterRegistry,
factories = listOf(
CreateWalletUow::class withFactory { CreateWalletUow(walletRepo, clock) }
)
)
In the begging we suggested you to add eva-persistence-jdbc to your dependencies and explained how to configure JdbcTransactionManager. Under the hood it uses classic blocking Java JDBC driver.
But we also ship non-blocking version of TransactionManager - VertxTransactionManager, based on Vert.x. Add this implementation to your dependencies
implementation("team.razz.eva:eva-persistence-vertx:$eva_version")
Configuration:
val transactionManager = VertxTransactionManager(
primaryProvider = PgPoolConnectionProvider(
poolProvider(primaryConfig, true, meterRegistry)
),
replicaProvider = PgPoolConnectionProvider(
poolProvider(replicaConfig, false, meterRegistry)
)
)
val queryExecutor = VertxQueryExecutor(transactionManager)
private fun poolProvider(config: DatabaseConfig, isPrimary: Boolean, meterRegistry: MeterRegistry): PgPool {
val vertx = vertx(
VertxOptions()
.setMetricsOptions(
MicrometerMetricsOptions()
.setMicrometerRegistry(meterRegistry)
.setLabels(setOf(POOL_NAME, POOL_TYPE, REMOTE, NAMESPACE))
.setEnabled(true)
)
)
check(config.nodes.size == 1 || !isPrimary) {
"Primary pool must be configured with single db node"
}
val options = config.nodes.map { node ->
PgConnectOptions().apply {
cachePreparedStatements = true
preparedStatementCacheMaxSize = 2048
preparedStatementCacheSqlFilter = Predicate { sql -> sql.length < 10_000 }
pipeliningLimit = 256
user = config.user.toString()
password = config.password.showPassword()
host = node.host()
database = config.name.toString()
port = node.port()
}
}
return PgPool.pool(vertx, options, PoolOptions().apply { maxSize = config.maxPoolSize.value() })
}
Eva is distributed under the terms of the Apache License (Version 2.0). See license file for details.