Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
3.0.1
-
None
Description
When reading a parquet file written with Decimals with precision < 10 as a 64-bit representation, Spark tries to read it as an INT and fails. I generated this file using https://github.com/rapidsai/cudf. It allowed me to create a Decimal(8,2) backed by a 64-bit representation (LongDecimal). I have attached the files that can be read successfully using a 3rd party parquet reader (I used nathonhowell/parquet-tools)
Steps to reproduce:
Read the attached file that has a single Decimal(8,2) column with 10 values
scala> spark.read.parquet("/tmp/pyspark_tests/936454/PARQUET_DATA").show ... Caused by: java.lang.NullPointerException at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putLong(OnHeapColumnVector.java:327) at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readLongs(VectorizedRleValuesReader.java:370) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readLongBatch(VectorizedColumnReader.java:514) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:256) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:273) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:171) at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93) at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:497) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:756) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:480) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1426) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:483) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ...
Here are my findings. The VectorizedParquetRecordReader starts to read in the long value from parquet file correctly because its basing the read on the requestedSchema which is a MessageType and has the underlying data stored correctly as INT64 where as the WritableColumnVector is initialized based on the batchSchema which is coming from org.apache.spark.sql.parquet.row.requested_schema that is set by the reader which is a StructType and only has Decimal(_,_) in it.
So we can see the problem above is the WritableColumnVector is initialized to store an int array, while the VectorizedParquetReader method calls the readLongBatch method which in turn calls the VectorizedRleValuesReader.readLongs which reads the long values and tries to call WritableColumnVector.putLong which will throw a NPE because WritableColumnVector wasn't initialized to store a long array.
In the case where the file has a dictionaryPage a different exception is thrown
Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:45) at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToInt(ParquetDictionary.java:31) at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:298) at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getDecimal(WritableColumnVector.java:353) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
In this case we have to make sure the correct dictionary is initialized i.e. PlainIntDictionary by setting the correct type in the ColumnDescriptor
Attached are two files, one with Decimal(8,2) ther other with Decimal(1,1) both written as Decimal backed by INT64. Decimal(1,1) results in a different exception but same for the same reason