[Spark][Version Checksum] Add DV-related metrics (#3953)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

1. Adds Deletion Vector (DV)-related metrics to the Version Checksum.
2. Adds both full and incremental computation of these metrics (the incremental update rule is sketched below).
3. Updates checksum verification so that it takes the new metrics into account.
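To make the incremental path concrete, here is a minimal, self-contained sketch of the per-action update rule (simplified from `RecordChecksum.computeNewChecksum` in the diff below; `DvInfo` is a stand-in for Delta's `DeletionVectorDescriptor`, introduced only for illustration):

```scala
// Sketch: an AddFile carrying a deletion vector contributes
// (+1 vector, +cardinality deleted records); a RemoveFile contributes the
// negation. The Option-typed counters stay None when the previous checksum
// carried no DV statistics, so nothing is ever fabricated.
final case class DvInfo(cardinality: Long) // stand-in, not part of the PR

def updateDvMetrics(
    numDeletedRecords: Option[Long],
    numDeletionVectors: Option[Long],
    dv: Option[DvInfo],
    sign: Long // +1 for AddFile, -1 for RemoveFile
): (Option[Long], Option[Long]) = {
  val (dvCount, dvCardinality) =
    dv.map(d => (1L, d.cardinality)).getOrElse((0L, 0L))
  (numDeletedRecords.map(_ + sign * dvCardinality),
   numDeletionVectors.map(_ + sign * dvCount))
}
```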

## How was this patch tested?

Added a dedicated test suite, `ChecksumDVMetricsSuite`.

## Does this PR introduce _any_ user-facing changes?

No
dhruvarya-db authored Jan 2, 2025 · 1 parent 80dfb74 · commit 98dd01e
Showing 10 changed files with 807 additions and 6 deletions.
6 changes: 6 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -845,6 +845,12 @@
],
"sqlState" : "42601"
},
"DELTA_DV_HISTOGRAM_DESERIALIZATON" : {
"message" : [
"Could not deserialize the deleted record counts histogram during table integrity verification."
],
"sqlState" : "22000"
},
"DELTA_DYNAMIC_PARTITION_OVERWRITE_DISABLED" : {
"message" : [
"Dynamic partition overwrite mode is specified by session config or write options, but it is disabled by `delta.dynamicPartitionOverwrite.enabled=false`."
86 changes: 84 additions & 2 deletions spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala
@@ -27,9 +27,11 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.DeletionVectorUtils
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.DeletedRecordCountsHistogram
import org.apache.spark.sql.delta.stats.FileSizeHistogram
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
@@ -50,14 +52,22 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
* @param txnId Optional transaction identifier
* @param tableSizeBytes The size of the table in bytes
* @param numFiles Number of `AddFile` actions in the snapshot
* @param numDeletedRecordsOpt The number of deleted records with Deletion Vectors.
* @param numDeletionVectorsOpt The number of Deletion Vectors present in the snapshot.
* @param numMetadata Number of `Metadata` actions in the snapshot
* @param numProtocol Number of `Protocol` actions in the snapshot
* @param histogramOpt Optional file size histogram
* @param deletedRecordCountsHistogramOpt A histogram of the deleted records count distribution
* for all the files in the snapshot.
*/
case class VersionChecksum(
txnId: Option[String],
tableSizeBytes: Long,
numFiles: Long,
@JsonDeserialize(contentAs = classOf[Long])
numDeletedRecordsOpt: Option[Long],
@JsonDeserialize(contentAs = classOf[Long])
numDeletionVectorsOpt: Option[Long],
numMetadata: Long,
numProtocol: Long,
@JsonDeserialize(contentAs = classOf[Long])
@@ -67,6 +77,7 @@ case class VersionChecksum(
metadata: Metadata,
protocol: Protocol,
histogramOpt: Option[FileSizeHistogram],
deletedRecordCountsHistogramOpt: Option[DeletedRecordCountsHistogram],
allFiles: Option[Seq[AddFile]])

@@ -202,6 +213,10 @@ trait RecordChecksum extends DeltaLogging {
// Incrementally compute the new version checksum, if the old one is available.
val ignoreAddFilesInOperation =
RecordChecksum.operationNamesWhereAddFilesIgnoredForIncrementalCrc.contains(operationName)
val persistentDVsOnTableReadable =
DeletionVectorUtils.deletionVectorsReadable(protocol, metadata)
val persistentDVsOnTableWritable =
DeletionVectorUtils.deletionVectorsWritable(protocol, metadata)

computeNewChecksum(
versionToCompute,
@@ -211,7 +226,9 @@
oldSnapshot,
actions,
ignoreAddFilesInOperation,
-      includeAddFilesInCrc
+      includeAddFilesInCrc,
+      persistentDVsOnTableReadable,
+      persistentDVsOnTableWritable
)
}

@@ -227,6 +244,10 @@
* @param actions used to incrementally compute new checksum.
* @param ignoreAddFiles for transactions whose add file actions refer to already-existing files
* e.g., [[DeltaOperations.ComputeStats]] transactions.
* @param persistentDVsOnTableReadable Indicates whether commands modifying this table are allowed
* to read deletion vectors.
* @param persistentDVsOnTableWritable Indicates whether commands modifying this table are allowed
* to create new deletion vectors.
* @return Either the new checksum or error code string if the checksum could not be computed
* incrementally due to some reason.
*/
@@ -239,7 +260,9 @@
oldSnapshot: Option[Snapshot],
actions: Seq[Action],
ignoreAddFiles: Boolean,
-      includeAllFilesInCRC: Boolean
+      includeAllFilesInCRC: Boolean,
+      persistentDVsOnTableReadable: Boolean,
+      persistentDVsOnTableWritable: Boolean
) : Either[String, VersionChecksum] = {
// scalastyle:on argcount
oldSnapshot.foreach(s => require(s.version == (attemptVersion - 1)))
@@ -248,19 +271,66 @@
var protocol = oldVersionChecksum.protocol
var metadata = oldVersionChecksum.metadata

// In incremental computation, tables initialized with DVs disabled contain None DV
// statistics. DV statistics remain None even if DVs are enabled at a random point
// during the lifecycle of a table. That can only change if a full snapshot recomputation
// is invoked while DVs are enabled for the table.
val conf = spark.sessionState.conf
val isFirstVersion = oldSnapshot.forall(_.version == -1)
val checksumDVMetricsEnabled = conf.getConf(DeltaSQLConf.DELTA_CHECKSUM_DV_METRICS_ENABLED)
val deletedRecordCountsHistogramEnabled =
conf.getConf(DeltaSQLConf.DELTA_DELETED_RECORD_COUNTS_HISTOGRAM_ENABLED)

// For tables where DVs were disabled later on in the table lifecycle we want to maintain DV
// statistics.
val computeDVMetricsWhenDVsNotWritable = persistentDVsOnTableReadable &&
oldVersionChecksum.numDeletionVectorsOpt.isDefined && !isFirstVersion

val computeDVMetrics = checksumDVMetricsEnabled &&
(persistentDVsOnTableWritable || computeDVMetricsWhenDVsNotWritable)

// DV-related metrics. When the old checksum does not contain DV statistics, we attempt to
// pick them up from the old snapshot.
var numDeletedRecordsOpt = if (computeDVMetrics) {
oldVersionChecksum.numDeletedRecordsOpt
.orElse(oldSnapshot.flatMap(_.numDeletedRecordsOpt))
} else None
var numDeletionVectorsOpt = if (computeDVMetrics) {
oldVersionChecksum.numDeletionVectorsOpt
.orElse(oldSnapshot.flatMap(_.numDeletionVectorsOpt))
} else None
val deletedRecordCountsHistogramOpt =
if (computeDVMetrics && deletedRecordCountsHistogramEnabled) {
oldVersionChecksum.deletedRecordCountsHistogramOpt
.orElse(oldSnapshot.flatMap(_.deletedRecordCountsHistogramOpt))
.map(h => DeletedRecordCountsHistogram(h.deletedRecordCounts.clone()))
} else None

var inCommitTimestamp : Option[Long] = None
actions.foreach {
case a: AddFile if !ignoreAddFiles =>
tableSizeBytes += a.size
numFiles += 1

// Only accumulate DV statistics when base stats are not None.
val (dvCount, dvCardinality) =
Option(a.deletionVector).map(1L -> _.cardinality).getOrElse(0L -> 0L)
numDeletedRecordsOpt = numDeletedRecordsOpt.map(_ + dvCardinality)
numDeletionVectorsOpt = numDeletionVectorsOpt.map(_ + dvCount)
deletedRecordCountsHistogramOpt.foreach(_.insert(dvCardinality))

// extendedFileMetadata == true implies fields partitionValues, size, and tags are present
case r: RemoveFile if r.extendedFileMetadata == Some(true) =>
val size = r.size.get
tableSizeBytes -= size
numFiles -= 1

// Only accumulate DV statistics when base stats are not None.
val (dvCount, dvCardinality) =
Option(r.deletionVector).map(1L -> _.cardinality).getOrElse(0L -> 0L)
numDeletedRecordsOpt = numDeletedRecordsOpt.map(_ - dvCardinality)
numDeletionVectorsOpt = numDeletionVectorsOpt.map(_ - dvCount)
deletedRecordCountsHistogramOpt.foreach(_.remove(dvCardinality))

case r: RemoveFile =>
// Report the failure to usage logs.
@@ -359,6 +429,8 @@
txnId = txnIdOpt,
tableSizeBytes = tableSizeBytes,
numFiles = numFiles,
numDeletedRecordsOpt = numDeletedRecordsOpt,
numDeletionVectorsOpt = numDeletionVectorsOpt,
numMetadata = 1,
numProtocol = 1,
inCommitTimestampOpt = inCommitTimestamp,
Expand All @@ -367,6 +439,7 @@ trait RecordChecksum extends DeltaLogging {
setTransactions = setTransactions,
domainMetadata = domainMetadata,
allFiles = allFiles,
deletedRecordCountsHistogramOpt = deletedRecordCountsHistogramOpt,
histogramOpt = None
))
}
@@ -794,6 +867,15 @@ trait ValidateChecksum extends DeltaLogging { self: Snapshot =>
detailedErrorMapForUsageLogs += ("domainMetadata" -> JsonUtils.toJson(eventData))
}
}
// Deletion vectors metrics.
if (DeletionVectorUtils.deletionVectorsReadable(self)) {
(checksum.numDeletedRecordsOpt zip computedState.numDeletedRecordsOpt).foreach {
case (a, b) => compare(a, b, "Number of deleted records", "numDeletedRecordsOpt")
}
(checksum.numDeletionVectorsOpt zip computedState.numDeletionVectorsOpt).foreach {
case (a, b) => compare(a, b, "Number of deleted vectors", "numDeletionVectorsOpt")
}
}

compareAction(checksum.metadata, computedStateToCheckAgainst.metadata, "Metadata", "metadata")
compareAction(checksum.protocol, computedStateToCheckAgainst.protocol, "Protocol", "protocol")
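One detail of the verification change worth calling out: `zip` on two `Option`s yields a pair only when both sides are defined, so the DV comparisons above are silently skipped, not failed, whenever either the stored checksum or the computed state lacks DV statistics. A small illustration of that behavior:

```scala
// Option#zip produces a value only when both operands are defined, so a
// missing side skips the DV check instead of reporting a mismatch.
def checkDvMetric(stored: Option[Long], computed: Option[Long]): Unit =
  (stored zip computed).foreach { case (a, b) =>
    if (a != b) println(s"DV metric mismatch: stored=$a, computed=$b")
  }

checkDvMetric(Some(10L), Some(12L)) // both defined: mismatch reported
checkDvMetric(Some(10L), None)      // one side missing: check skipped
```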
@@ -447,6 +447,13 @@ trait DeltaErrorsBase
DeltaConfigs.CHANGE_DATA_FEED.key))
}

def deletedRecordCountsHistogramDeserializationException(): Throwable = {
new DeltaChecksumException(
errorClass = "DELTA_DV_HISTOGRAM_DESERIALIZATON",
messageParameters = Array.empty,
pos = 0)
}

/**
* Throwable used for invalid CDC 'start' and 'end' options, where end < start
*/
@@ -17,6 +17,7 @@
package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.DeletedRecordCountsHistogram

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.encoderFor
@@ -45,6 +46,10 @@ object DeltaUDF {
def stringFromMap(f: Map[String, String] => String): UserDefinedFunction =
createUdfFromTemplateUnsafe(stringFromMapTemplate, f, udf(f))

def deletedRecordCountsHistogramFromArrayLong(
f: Array[Long] => DeletedRecordCountsHistogram): UserDefinedFunction =
createUdfFromTemplateUnsafe(deletedRecordCountsHistogramFromArrayLongTemplate, f, udf(f))

def booleanFromMap(f: Map[String, String] => Boolean): UserDefinedFunction =
createUdfFromTemplateUnsafe(booleanFromMapTemplate, f, udf(f))

@@ -65,6 +70,10 @@
private lazy val stringFromMapTemplate =
udf((_: Map[String, String]) => "").asInstanceOf[SparkUserDefinedFunction]

private lazy val deletedRecordCountsHistogramFromArrayLongTemplate =
udf((_: Array[Long]) => DeletedRecordCountsHistogram(Array.empty))
.asInstanceOf[SparkUserDefinedFunction]

private lazy val booleanFromMapTemplate =
udf((_: Map[String, String]) => true).asInstanceOf[SparkUserDefinedFunction]

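The new `deletedRecordCountsHistogramFromArrayLong` helper follows the same template pattern as the other `DeltaUDF` factories. A hypothetical usage sketch (the DataFrame and column name are illustrative assumptions, not taken from this PR):

```scala
import org.apache.spark.sql.delta.DeltaUDF
import org.apache.spark.sql.delta.stats.DeletedRecordCountsHistogram
import org.apache.spark.sql.functions.col

// Hypothetical: turn an aggregated Array[Long] of per-bucket deleted-record
// counts back into a DeletedRecordCountsHistogram during full recomputation.
val toHistogram = DeltaUDF.deletedRecordCountsHistogramFromArrayLong { counts =>
  DeletedRecordCountsHistogram(counts)
}
// df.select(toHistogram(col("deletedRecordCounts"))) // illustrative column
```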
@@ -577,6 +577,15 @@ class Snapshot(
},
domainMetadata = checksumOpt.flatMap(_.domainMetadata)
.orElse(Option.when(_computedStateTriggered)(domainMetadata)),
numDeletedRecordsOpt = checksumOpt.flatMap(_.numDeletedRecordsOpt)
.orElse(Option.when(_computedStateTriggered)(numDeletedRecordsOpt).flatten)
.filter(_ => deletionVectorsReadableAndMetricsEnabled),
numDeletionVectorsOpt = checksumOpt.flatMap(_.numDeletionVectorsOpt)
.orElse(Option.when(_computedStateTriggered)(numDeletionVectorsOpt).flatten)
.filter(_ => deletionVectorsReadableAndMetricsEnabled),
deletedRecordCountsHistogramOpt = checksumOpt.flatMap(_.deletedRecordCountsHistogramOpt)
.orElse(Option.when(_computedStateTriggered)(deletedRecordCountsHistogramOpt).flatten)
.filter(_ => deletionVectorsReadableAndHistogramEnabled),
histogramOpt = checksumOpt.flatMap(_.histogramOpt)
)

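The `Snapshot` change above applies the same three-step precedence to each new field: prefer the value persisted in the checksum file; otherwise fall back to the freshly computed value, but only if state computation has already been triggered; finally, drop the metric entirely when DVs are not readable or the corresponding config is disabled. Restated generically (the helper below is mine, not part of the diff):

```scala
// Generic restatement of the fallback chain used for the three new
// Snapshot fields (hypothetical helper, not from the diff).
def resolveMetric[T](
    fromChecksum: Option[T],
    computed: => Option[T],
    computedStateTriggered: Boolean,
    metricEnabled: Boolean): Option[T] =
  fromChecksum
    .orElse(Option.when(computedStateTriggered)(computed).flatten)
    .filter(_ => metricEnabled)
```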
(Remaining file diffs not shown.)
