[BUG][FLINK] ParquetDecodingException due to decimal column statistics in the checkpoint Parquet file #3909

Open · viniciuspnu opened this issue Nov 29, 2024 · 2 comments · Labels: bug

viniciuspnu commented Nov 29, 2024

Bug

Which Delta project/connector is this regarding?

  • [ ] Spark
  • [ ] Standalone
  • [x] Flink
  • [ ] Kernel
  • [ ] Other (fill in here)

Describe the problem

We have a Flink application that writes from Kafka to Delta tables. Periodically, a maintenance job is run using Spark on these tables, performing OPTIMIZE and VACUUM operations.

  • The maintenance job wrote a *.checkpoint.parquet Delta log checkpoint
  • This checkpoint includes an add.stats_parsed column, which is a struct containing the stats of each column
  • One of these stats is maxValues, which holds the per-column maximum values, including one for a decimal(38,9) column
  • When Flink uses this checkpoint, it employs a library (Parquet4S) to read the Parquet files, and that library rescales the decimal values from scale 9 to scale 18. Increasing the scale limits the number of digits in the integer part to 20 (a sketch of the failing rescale follows this list)
  • Our Delta table has some entries whose integer part exceeds 20 digits, causing a BufferOverflowException during Parquet decoding
  • We confirmed this hypothesis by reading the latest checkpoint written before the maintenance job; that Parquet file does not contain the stats_parsed column
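
The failure mode can be approximated with a short Scala sketch of the rescale-and-encode path named in the stack trace below (Decimals.rescaleBinary → Decimals.binaryFromDecimal); the constants and the omitted sign-extension padding are assumptions for illustration, not the library's exact code:

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}
import java.nio.ByteBuffer

object RescaleOverflowSketch extends App {
  val Scale           = 18 // Parquet4S's fixed target scale (assumed)
  val ByteArrayLength = 16 // 128-bit fixed-length decimal binary (assumed)

  // Approximation of Decimals.binaryFromDecimal: rescale, unscale, pack into 16 bytes
  def binaryFromDecimal(value: JBigDecimal): Array[Byte] = {
    val rescaled      = value.setScale(Scale, RoundingMode.HALF_UP)
    val unscaledBytes = rescaled.unscaledValue.toByteArray // two's complement
    val buf           = ByteBuffer.allocate(ByteArrayLength)
    // (sign-extension padding of shorter arrays omitted for brevity)
    buf.put(unscaledBytes) // BufferOverflowException when unscaledBytes > 16 bytes
    buf.array
  }

  // decimal(38,9) value whose integer part has 21 digits, as in the affected table
  val v = new JBigDecimal("999999999999999999999.999999999")
  binaryFromDecimal(v) // throws java.nio.BufferOverflowException
}
```

Rescaling a decimal(38,9) with a 21-digit integer part to scale 18 produces a 39-digit unscaled value, whose two's-complement encoding needs 17 bytes and therefore overflows the 16-byte buffer.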

Steps to reproduce

  1. Have a Delta table whose checkpoint carries structured statistics (stats_parsed) for a decimal(38,9) column, with entries whose integer part exceeds 20 digits;
  2. Try to write to this Delta table using the Flink Delta connector (a sketch of step 1 follows below).
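
For step 1, a hypothetical Spark shell sketch (the table name and inserted value are placeholders; the use of delta.checkpoint.writeStatsAsStruct reflects how stats_parsed ends up in checkpoints):

```scala
// Run in spark-shell with Delta Lake; `spark` is the session provided by the shell.
// delta.checkpoint.writeStatsAsStruct makes checkpoints carry the parsed
// add.stats_parsed struct that Parquet4S later fails to decode.
spark.sql("""
  CREATE TABLE big_decimals (amount DECIMAL(38, 9)) USING delta
  TBLPROPERTIES ('delta.checkpoint.writeStatsAsStruct' = 'true')
""")

// Integer part of 21 digits: at scale 18 its unscaled form needs more than 16 bytes
spark.sql("""
  INSERT INTO big_decimals
  VALUES (CAST('999999999999999999999.999999999' AS DECIMAL(38, 9)))
""")

// Once Delta writes a *.checkpoint.parquet (every 10 commits by default, or after
// OPTIMIZE-style maintenance), the next Flink write reading this snapshot fails.
```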

Observed results

The Flink connector throws a ParquetDecodingException while decoding the Parquet checkpoint file: after the scale is increased from 9 to 18, decimal values whose integer part exceeds 20 digits no longer fit in the fixed-length binary, and decoding fails with a BufferOverflowException.

Expected results

Flink should be able to handle Parquet checkpoint files that contain structured column statistics (stats_parsed).

Further details

shadedelta.org.apache.parquet.io.ParquetDecodingException: Can not read value at 15 in block 0 in file s3a://some-bucket/some-delta-table/_delta_log/00000000000000000232.checkpoint.parquet
	at shadedelta.org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
	at shadedelta.org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetIterableImpl$$anon$3.hasNext(ParquetReader.scala:144)
	at scala.collection.IterableLike.copyToArray(IterableLike.scala:255)
	at scala.collection.IterableLike.copyToArray$(IterableLike.scala:251)
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetIterableImpl.copyToArray(ParquetReader.scala:126)
	at scala.collection.TraversableOnce.copyToArray(TraversableOnce.scala:334)
	at scala.collection.TraversableOnce.copyToArray$(TraversableOnce.scala:333)
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetIterableImpl.copyToArray(ParquetReader.scala:126)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:342)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetIterableImpl.toArray(ParquetReader.scala:126)
	at io.delta.standalone.internal.SnapshotImpl.$anonfun$loadInMemory$3(SnapshotImpl.scala:284)
        ...
Caused by: java.nio.BufferOverflowException
	at java.base/java.nio.HeapByteBuffer.put(Unknown Source)
	at java.base/java.nio.ByteBuffer.put(Unknown Source)
	at shadedelta.com.github.mjakubowski84.parquet4s.Decimals$.binaryFromDecimal(Decimals.scala:36)
	at shadedelta.com.github.mjakubowski84.parquet4s.Decimals$.rescaleBinary(Decimals.scala:20)
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetRecordConverter$DecimalConverter.addBinary(ParquetReadSupport.scala:133)
	at shadedelta.org.apache.parquet.column.impl.ColumnReaderBase$2$6.writeValue(ColumnReaderBase.java:390)
	at shadedelta.org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)
	at shadedelta.org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
	at shadedelta.org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
	at shadedelta.org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:229)
	...

Environment information

  • Delta Lake version: 3.2.0
  • Spark version: N/A
  • Scala version: N/A

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.
viniciuspnu added the bug label Nov 29, 2024
viniciuspnu (Author) commented:

> Increasing the scale limits the number of digits in the integer part to 20 digits

Sorry, this is not accurate; a better explanation is:

  1. In the Parquet4S library, the number is rescaled to have 18 decimal places (scale = 18)
  2. The rescaled value is then unscaled (the decimal point is dropped, leaving an integer)
  3. This unscaled value must fit in 16 bytes (128 bits)

So, for example:

  • 21 digits in the integer part + 18 digits after the decimal point = 39 digits total in unscaled form
  • The largest such number would be: 999...999 (21 nines).999999999999999999 (18 nines)
  • When unscaled: 999...999 (39 nines)

A 39-digit number sits right at the limit of what 127 bits can store (1 bit is used for the sign): the maximum is 2^127 − 1 ≈ 1.7 × 10^38, itself a 39-digit number, so 39-digit unscaled values above that threshold already overflow, while 38 digits (a 20-digit integer part) always fit.

For 22 digits in the integer part, rescaling to 18 decimal places and unscaling always yields a 40-digit number, which cannot fit in the 16-byte storage. The sketch below checks this arithmetic.
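
The digit and byte counts can be verified with a small standalone Scala sketch (unrelated to the library code):

```scala
import java.math.BigInteger

object DigitLimitCheck extends App {
  // Largest signed value in 16 bytes: 2^127 - 1 (one bit holds the sign)
  val max127 = BigInteger.ONE.shiftLeft(127).subtract(BigInteger.ONE)
  println(max127.toString.length) // 39 digits (~1.7e38)

  // 20 integer digits + 18 decimal digits, all nines -> 38-digit unscaled value
  println(new BigInteger("9" * 38).toByteArray.length) // 16 bytes: fits

  // 21 integer digits + 18 decimal digits -> 39-digit unscaled value
  println(new BigInteger("9" * 39).toByteArray.length) // 17 bytes: overflows
}
```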

viniciuspnu (Author) commented:

Issue created in the Parquet4S library: mjakubowski84/parquet4s#365
