Skip to content

Commit

Permalink
More detailed store transaction metrics (digital-asset#6163)
Browse files Browse the repository at this point in the history
* Add additional metrics when storing transactions

Since event witnesses will soon be denormalized into the participant_events
table, I did not include metrics right now.

CHANGELOG_BEGIN
[DAML Ledger Integration Kit] Add additional metrics for storing transactions. The overall time is measured by ``daml.index.db.store_ledger_entry``.
- Timer ``daml.index.db.store_ledger_entry.prepare_batches``: measures the time for preparing batch insert/delete statements
- Timer ``daml.index.db.store_ledger_entry.events_batch``: measures the time for inserting events
- Timer ``daml.index.db.store_ledger_entry.delete_contract_witnesses_batch``:  measures the time for deleting contract witnesses
- Timer ``daml.index.db.store_ledger_entry.delete_contracts_batch``: measures the time for deleting contracts
- Timer ``daml.index.db.store_ledger_entry.insert_contracts_batch``: measures the time for inserting contracts
- Timer ``daml.index.db.store_ledger_entry.insert_contract_witnesses_batch``: measures the time for inserting contract witnesses
- Timer ``daml.index.db.store_ledger_entry.insert_completion``: measures the time for inserting the completion
- Timer ``daml.index.db.store_ledger_entry.update_ledger_end``: measures the time for updating the ledger end
[Sandbox Classic] Added Timer ``daml.index.db.store_ledger_entry.commit_validation``: measure the time for commit validation in Sandbox Classic

CHANGELOG_END

* Refactoring: rename metrics *dao to *DbMetrics
  • Loading branch information
gerolf-da authored May 29, 2020
1 parent 12c05c4 commit 070bd18
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,14 @@ package com.daml.metrics

import com.codahale.metrics.{MetricRegistry, Timer}

final class DatabaseMetrics private[metrics] (
class DatabaseMetrics private[metrics] (
registry: MetricRegistry,
prefix: MetricName,
val name: String,
) {
val dbPrefix = prefix :+ name

val waitTimer: Timer = registry.timer(prefix :+ name :+ "wait")
val executionTimer: Timer = registry.timer(prefix :+ name :+ "exec")
val translationTimer: Timer = registry.timer(prefix :+ name :+ "translation")

}

object DatabaseMetrics {

private[metrics] def apply(registry: MetricRegistry, prefix: MetricName)(
name: String,
): DatabaseMetrics =
new DatabaseMetrics(registry, prefix, name)

val waitTimer: Timer = registry.timer(dbPrefix :+ "wait")
val executionTimer: Timer = registry.timer(dbPrefix :+ "exec")
val translationTimer: Timer = registry.timer(dbPrefix :+ "translation")
}
94 changes: 55 additions & 39 deletions ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -313,59 +313,75 @@ class Metrics(val registry: MetricRegistry) {
val stopDeduplicatingCommand: Timer =
registry.timer(prefix :+ "stop_deduplicating_command")

private val createDatabaseMetrics: String => DatabaseMetrics =
DatabaseMetrics(registry, prefix)(_)
private val createDbMetrics: String => DatabaseMetrics =
new DatabaseMetrics(registry, prefix, _)

private val overall = createDatabaseMetrics("all")
private val overall = createDbMetrics("all")
val waitAll: Timer = overall.waitTimer
val execAll: Timer = overall.executionTimer

val getCompletions: DatabaseMetrics = createDatabaseMetrics("get_completions")
val getLedgerId: DatabaseMetrics = createDatabaseMetrics("get_ledger_id")
val getLedgerEnd: DatabaseMetrics = createDatabaseMetrics("get_ledger_end")
val getInitialLedgerEnd: DatabaseMetrics = createDatabaseMetrics("get_initial_ledger_end")
val initializeLedgerParameters: DatabaseMetrics = createDatabaseMetrics(
val getCompletions: DatabaseMetrics = createDbMetrics("get_completions")
val getLedgerId: DatabaseMetrics = createDbMetrics("get_ledger_id")
val getLedgerEnd: DatabaseMetrics = createDbMetrics("get_ledger_end")
val getInitialLedgerEnd: DatabaseMetrics = createDbMetrics("get_initial_ledger_end")
val initializeLedgerParameters: DatabaseMetrics = createDbMetrics(
"initialize_ledger_parameters")
val lookupConfiguration: DatabaseMetrics = createDatabaseMetrics("lookup_configuration")
val loadConfigurationEntries: DatabaseMetrics = createDatabaseMetrics(
val lookupConfiguration: DatabaseMetrics = createDbMetrics("lookup_configuration")
val loadConfigurationEntries: DatabaseMetrics = createDbMetrics(
"load_configuration_entries")
val storeConfigurationEntryDao: DatabaseMetrics = createDatabaseMetrics(
val storeConfigurationEntryDbMetrics: DatabaseMetrics = createDbMetrics(
"store_configuration_entry") // FIXME Base name conflicts with storeConfigurationEntry
val storePartyEntryDao
: DatabaseMetrics = createDatabaseMetrics("store_party_entry") // FIXME Base name conflicts with storePartyEntry
val loadPartyEntries: DatabaseMetrics = createDatabaseMetrics("load_party_entries")
val storeTransactionDao
: DatabaseMetrics = createDatabaseMetrics("store_ledger_entry") // FIXME Base name conflicts with storeTransaction
val storeRejectionDao
: DatabaseMetrics = createDatabaseMetrics("store_rejection") // FIXME Base name conflicts with storeRejection
val storeInitialStateFromScenario: DatabaseMetrics = createDatabaseMetrics(
val storePartyEntryDbMetrics
: DatabaseMetrics = createDbMetrics("store_party_entry") // FIXME Base name conflicts with storePartyEntry
val loadPartyEntries: DatabaseMetrics = createDbMetrics("load_party_entries")
object storeTransactionDbMetrics
extends DatabaseMetrics(registry, prefix, "store_ledger_entry") {
// outside of SQL transaction
val prepareBatches = registry.timer(dbPrefix :+ "prepare_batches")

// in order within SQL transaction
val commitValidation = registry.timer(dbPrefix :+ "commit_validation")
val eventsBatch = registry.timer(dbPrefix :+ "events_batch")
val deleteContractWitnessesBatch =
registry.timer(dbPrefix :+ "delete_contract_witnesses_batch")
val deleteContractsBatch = registry.timer(dbPrefix :+ "delete_contracts_batch")
val insertContractsBatch = registry.timer(dbPrefix :+ "insert_contracts_batch")
val insertContractWitnessesBatch =
registry.timer(dbPrefix :+ "insert_contract_witnesses_batch")

val insertCompletion = registry.timer(dbPrefix :+ "insert_completion")
val updateLedgerEnd = registry.timer(dbPrefix :+ "update_ledger_end")
}
val storeRejectionDbMetrics
: DatabaseMetrics = createDbMetrics("store_rejection") // FIXME Base name conflicts with storeRejection
val storeInitialStateFromScenario: DatabaseMetrics = createDbMetrics(
"store_initial_state_from_scenario")
val loadParties: DatabaseMetrics = createDatabaseMetrics("load_parties")
val loadAllParties: DatabaseMetrics = createDatabaseMetrics("load_all_parties")
val loadPackages: DatabaseMetrics = createDatabaseMetrics("load_packages")
val loadArchive: DatabaseMetrics = createDatabaseMetrics("load_archive")
val storePackageEntryDao
: DatabaseMetrics = createDatabaseMetrics("store_package_entry") // FIXME Base name conflicts with storePackageEntry
val loadPackageEntries: DatabaseMetrics = createDatabaseMetrics("load_package_entries")
val deduplicateCommandDao
: DatabaseMetrics = createDatabaseMetrics("deduplicate_command") // FIXME Base name conflicts with deduplicateCommand
val removeExpiredDeduplicationDataDao: DatabaseMetrics = createDatabaseMetrics(
val loadParties: DatabaseMetrics = createDbMetrics("load_parties")
val loadAllParties: DatabaseMetrics = createDbMetrics("load_all_parties")
val loadPackages: DatabaseMetrics = createDbMetrics("load_packages")
val loadArchive: DatabaseMetrics = createDbMetrics("load_archive")
val storePackageEntryDbMetrics
: DatabaseMetrics = createDbMetrics("store_package_entry") // FIXME Base name conflicts with storePackageEntry
val loadPackageEntries: DatabaseMetrics = createDbMetrics("load_package_entries")
val deduplicateCommandDbMetrics
: DatabaseMetrics = createDbMetrics("deduplicate_command") // FIXME Base name conflicts with deduplicateCommand
val removeExpiredDeduplicationDataDbMetrics: DatabaseMetrics = createDbMetrics(
"remove_expired_deduplication_data") // FIXME Base name conflicts with removeExpiredDeduplicationData
val stopDeduplicatingCommandDao: DatabaseMetrics = createDatabaseMetrics(
val stopDeduplicatingCommandDbMetrics: DatabaseMetrics = createDbMetrics(
"stop_deduplicating_command") // FIXME Base name conflicts with stopDeduplicatingCommand
val truncateAllTables: DatabaseMetrics = createDatabaseMetrics("truncate_all_tables")
val lookupActiveContractDao: DatabaseMetrics = createDatabaseMetrics(
val truncateAllTables: DatabaseMetrics = createDbMetrics("truncate_all_tables")
val lookupActiveContractDbMetrics: DatabaseMetrics = createDbMetrics(
"lookup_active_contract") // FIXME Base name conflicts with lookupActiveContract
val lookupContractByKey: DatabaseMetrics = createDatabaseMetrics("lookup_contract_by_key")
val lookupMaximumLedgerTimeDao: DatabaseMetrics = createDatabaseMetrics(
val lookupContractByKey: DatabaseMetrics = createDbMetrics("lookup_contract_by_key")
val lookupMaximumLedgerTimeDbMetrics: DatabaseMetrics = createDbMetrics(
"lookup_maximum_ledger_time") // FIXME Base name conflicts with lookupActiveContract
val getFlatTransactions: DatabaseMetrics = createDatabaseMetrics("get_flat_transactions")
val lookupFlatTransactionById: DatabaseMetrics = createDatabaseMetrics(
val getFlatTransactions: DatabaseMetrics = createDbMetrics("get_flat_transactions")
val lookupFlatTransactionById: DatabaseMetrics = createDbMetrics(
"lookup_flat_transaction_by_id")
val getTransactionTrees: DatabaseMetrics = createDatabaseMetrics("get_transaction_trees")
val lookupTransactionTreeById: DatabaseMetrics = createDatabaseMetrics(
val getTransactionTrees: DatabaseMetrics = createDbMetrics("get_transaction_trees")
val lookupTransactionTreeById: DatabaseMetrics = createDbMetrics(
"lookup_transaction_tree_by_id")
val getActiveContracts: DatabaseMetrics = createDatabaseMetrics("get_active_contracts")
val getActiveContracts: DatabaseMetrics = createDbMetrics("get_active_contracts")

object translation {
val prefix: MetricName = db.prefix :+ "translation"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import com.daml.lf.data.Ref.{PackageId, Party}
import com.daml.lf.transaction.Node
import com.daml.lf.value.Value.{ContractId, NodeId}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.metrics.{Metrics, Timed}
import com.daml.platform.ApiOffset.ApiOffsetConverter
import com.daml.platform.configuration.ServerRole
import com.daml.platform.events.EventIdFormatter.split
Expand Down Expand Up @@ -263,7 +263,7 @@ private class JdbcLedgerDao(
rejectionReason: Option[String]
): Future[PersistenceResponse] = {
dbDispatcher.executeSql(
metrics.daml.index.db.storeConfigurationEntryDao,
metrics.daml.index.db.storeConfigurationEntryDbMetrics,
Some(s"submissionId=$submissionId"),
) { implicit conn =>
val optCurrentConfig = selectLedgerConfiguration
Expand Down Expand Up @@ -338,7 +338,7 @@ private class JdbcLedgerDao(
offset: Offset,
partyEntry: PartyLedgerEntry,
): Future[PersistenceResponse] = {
dbDispatcher.executeSql(metrics.daml.index.db.storePartyEntryDao) { implicit conn =>
dbDispatcher.executeSql(metrics.daml.index.db.storePartyEntryDbMetrics) { implicit conn =>
updateLedgerEnd(offset)

partyEntry match {
Expand Down Expand Up @@ -477,35 +477,47 @@ private class JdbcLedgerDao(
divulged: Iterable[DivulgedContract],
): Future[PersistenceResponse] = {
val preparedTransactionInsert =
transactionsWriter.prepare(
submitterInfo = submitterInfo,
workflowId = workflowId,
transactionId = transactionId,
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
transaction = transaction,
divulgedContracts = divulged,
Timed.value(
metrics.daml.index.db.storeTransactionDbMetrics.prepareBatches,
transactionsWriter.prepare(
submitterInfo = submitterInfo,
workflowId = workflowId,
transactionId = transactionId,
ledgerEffectiveTime = ledgerEffectiveTime,
offset = offset,
transaction = transaction,
divulgedContracts = divulged,
)
)
dbDispatcher
.executeSql(metrics.daml.index.db.storeTransactionDao) { implicit conn =>
.executeSql(metrics.daml.index.db.storeTransactionDbMetrics) { implicit conn =>
val error =
postCommitValidation.validate(
transaction = transaction,
transactionLedgerEffectiveTime = ledgerEffectiveTime,
divulged = divulged.iterator.map(_.contractId).toSet,
Timed.value(
metrics.daml.index.db.storeTransactionDbMetrics.commitValidation,
postCommitValidation.validate(
transaction = transaction,
transactionLedgerEffectiveTime = ledgerEffectiveTime,
divulged = divulged.iterator.map(_.contractId).toSet,
)
)
if (error.isEmpty) {
preparedTransactionInsert.write()
submitterInfo
.map(prepareCompletionInsert(_, offset, transactionId, recordTime))
.foreach(_.execute())
preparedTransactionInsert.write(metrics)
Timed.value(
metrics.daml.index.db.storeTransactionDbMetrics.insertCompletion,
submitterInfo
.map(prepareCompletionInsert(_, offset, transactionId, recordTime))
.foreach(_.execute())
)
} else {
for (info @ SubmitterInfo(submitter, _, commandId, _) <- submitterInfo) {
stopDeduplicatingCommandSync(domain.CommandId(commandId), submitter)
prepareRejectionInsert(info, offset, recordTime, error.get).execute()
}
}
updateLedgerEnd(offset)
Timed.value(
metrics.daml.index.db.storeTransactionDbMetrics.updateLedgerEnd,
updateLedgerEnd(offset)
)
Ok
}
}
Expand All @@ -516,7 +528,7 @@ private class JdbcLedgerDao(
offset: Offset,
reason: RejectionReason,
): Future[PersistenceResponse] =
dbDispatcher.executeSql(metrics.daml.index.db.storeRejectionDao) { implicit conn =>
dbDispatcher.executeSql(metrics.daml.index.db.storeRejectionDbMetrics) { implicit conn =>
for (info @ SubmitterInfo(submitter, _, commandId, _) <- submitterInfo) {
stopDeduplicatingCommandSync(domain.CommandId(commandId), submitter)
prepareRejectionInsert(info, offset, recordTime, reason).execute()
Expand Down Expand Up @@ -551,7 +563,7 @@ private class JdbcLedgerDao(
transaction = tx.transaction.mapNodeId(splitOrThrow),
divulgedContracts = Nil,
)
.write()
.write(metrics)
submitterInfo
.map(prepareCompletionInsert(_, offset, tx.transactionId, tx.recordedAt))
.foreach(_.execute())
Expand Down Expand Up @@ -704,7 +716,7 @@ private class JdbcLedgerDao(
optEntry: Option[PackageLedgerEntry]
): Future[PersistenceResponse] = {
dbDispatcher.executeSql(
metrics.daml.index.db.storePackageEntryDao,
metrics.daml.index.db.storePackageEntryDbMetrics,
Some(s"packages: ${packages.map(_._1.getHash).mkString(", ")}")) { implicit conn =>
updateLedgerEnd(offset)

Expand Down Expand Up @@ -815,7 +827,7 @@ private class JdbcLedgerDao(
submitter: Ref.Party,
submittedAt: Instant,
deduplicateUntil: Instant): Future[CommandDeduplicationResult] =
dbDispatcher.executeSql(metrics.daml.index.db.deduplicateCommandDao) { implicit conn =>
dbDispatcher.executeSql(metrics.daml.index.db.deduplicateCommandDbMetrics) { implicit conn =>
val key = deduplicationKey(commandId, submitter)
// Insert a new deduplication entry, or update an expired entry
val updated = SQL(queries.SQL_INSERT_COMMAND)
Expand Down Expand Up @@ -844,7 +856,7 @@ private class JdbcLedgerDao(
""".stripMargin)

override def removeExpiredDeduplicationData(currentTime: Instant): Future[Unit] =
dbDispatcher.executeSql(metrics.daml.index.db.removeExpiredDeduplicationDataDao) {
dbDispatcher.executeSql(metrics.daml.index.db.removeExpiredDeduplicationDataDbMetrics) {
implicit conn =>
SQL_DELETE_EXPIRED_COMMANDS
.on("currentTime" -> currentTime)
Expand All @@ -869,8 +881,9 @@ private class JdbcLedgerDao(
override def stopDeduplicatingCommand(
commandId: domain.CommandId,
submitter: Party): Future[Unit] =
dbDispatcher.executeSql(metrics.daml.index.db.stopDeduplicatingCommandDao) { implicit conn =>
stopDeduplicatingCommandSync(commandId, submitter)
dbDispatcher.executeSql(metrics.daml.index.db.stopDeduplicatingCommandDbMetrics) {
implicit conn =>
stopDeduplicatingCommandSync(commandId, submitter)
}

override def reset(): Future[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private[dao] sealed abstract class ContractsReader(
contractId: ContractId,
): Future[Option[Contract]] =
dispatcher
.executeSql(metrics.daml.index.db.lookupActiveContractDao) { implicit connection =>
.executeSql(metrics.daml.index.db.lookupActiveContractDbMetrics) { implicit connection =>
SQL"select participant_contracts.contract_id, template_id, create_argument from #$contractsTable where contract_witness = $submitter and participant_contracts.contract_id = $contractId"
.as(contractRowParser.singleOpt)
}
Expand All @@ -50,7 +50,8 @@ private[dao] sealed abstract class ContractsReader(
contractId = contractId,
templateId = templateId,
createArgument = createArgument,
deserializationTimer = metrics.daml.index.db.lookupActiveContractDao.translationTimer,
deserializationTimer =
metrics.daml.index.db.lookupActiveContractDbMetrics.translationTimer,
)
})

Expand All @@ -64,7 +65,7 @@ private[dao] sealed abstract class ContractsReader(

override def lookupMaximumLedgerTime(ids: Set[ContractId]): Future[Option[Instant]] =
dispatcher
.executeSql(metrics.daml.index.db.lookupMaximumLedgerTimeDao) { implicit connection =>
.executeSql(metrics.daml.index.db.lookupMaximumLedgerTimeDbMetrics) { implicit connection =>
committedContracts.lookupMaximumLedgerTime(ids)
}
.map(_.get)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,32 @@ private[dao] object TransactionsWriter {
deleteWitnessesBatch: Option[BatchSql],
insertWitnessesBatch: Option[BatchSql],
) {
def write()(implicit connection: Connection): Unit = {
eventBatches.foreach(_.execute())
def write(metrics: Metrics)(implicit connection: Connection): Unit = {
import metrics.daml.index.db.storeTransactionDbMetrics

Timed.value(storeTransactionDbMetrics.eventsBatch, eventBatches.foreach(_.execute()))
flatTransactionWitnessesBatch.foreach(_.execute())
complementWitnessesBatch.foreach(_.execute())

// Delete the witnesses of contracts that being removed first, to
// respect the foreign key constraint of the underlying storage
deleteWitnessesBatch.map(_.execute())
Timed.value(
storeTransactionDbMetrics.deleteContractWitnessesBatch,
deleteWitnessesBatch.map(_.execute()))
for ((_, deleteContractsBatch) <- contractBatches.deletions) {
deleteContractsBatch.execute()
Timed.value(storeTransactionDbMetrics.deleteContractsBatch, deleteContractsBatch.execute())
}
for ((_, insertContractsBatch) <- contractBatches.insertions) {
insertContractsBatch.execute()
Timed.value(storeTransactionDbMetrics.insertContractsBatch, insertContractsBatch.execute())
}

// Insert the witnesses last to respect the foreign key constraint of the underlying storage.
// Compute and insert new witnesses regardless of whether the current transaction adds new
// contracts because it may be the case that we are only adding new witnesses to existing
// contracts (e.g. via divulging a contract with fetch).
insertWitnessesBatch.foreach(_.execute())
Timed.value(
storeTransactionDbMetrics.insertContractWitnessesBatch,
insertWitnessesBatch.foreach(_.execute()))
}
}

Expand Down Expand Up @@ -193,7 +199,7 @@ private[dao] final class TransactionsWriter(

val (serializedEventBatches, serializedContractBatches) =
Timed.value(
metrics.daml.index.db.storeTransactionDao.translationTimer,
metrics.daml.index.db.storeTransactionDbMetrics.translationTimer,
(
rawEventBatches.applySerialization(lfValueTranslation),
rawContractBatches.applySerialization(lfValueTranslation)
Expand Down

0 comments on commit 070bd18

Please sign in to comment.