Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-34167

Reading parquet with Decimal(8,2) written as a Decimal64 blows up

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.1
    • Fix Version/s: 3.2.0
    • Component/s: Input/Output
    • Labels:
      None
    • Target Version/s:

      Description

      When reading a parquet file written with Decimals with precision < 10 as a 64-bit representation, Spark tries to read it as an INT and fails. I generated this file using https://github.com/rapidsai/cudf. It allowed me to create a Decimal(8,2) backed by a 64-bit representation (LongDecimal). I have attached the files that can be read successfully using a 3rd party parquet reader (I used nathonhowell/parquet-tools)

       

      Steps to reproduce:

      Read the attached file that has a single Decimal(8,2) column with 10 values

      scala> spark.read.parquet("/tmp/pyspark_tests/936454/PARQUET_DATA").show
      
      ...
      Caused by: java.lang.NullPointerException
        at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putLong(OnHeapColumnVector.java:327)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readLongs(VectorizedRleValuesReader.java:370)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readLongBatch(VectorizedColumnReader.java:514)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:256)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:273)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:171)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:497)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:756)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:480)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1426)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:483)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
      ...
      
      

       

       

      Here are my findings. The VectorizedParquetRecordReader starts to read in the long value from parquet file correctly because its basing the read on the requestedSchema which is a MessageType and has the underlying data stored correctly as INT64 where as the WritableColumnVector  is initialized based on the batchSchema which is coming from org.apache.spark.sql.parquet.row.requested_schema that is set by the reader which is a StructType and only has Decimal(_,_) in it.

      https://github.com/apache/spark/blob/a44e008de3ae5aecad9e0f1a7af6a1e8b0d97f4e/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L224

       

      So we can see the problem above is the WritableColumnVector is initialized to store an int array, while the VectorizedParquetReader method calls the readLongBatch method which in turn calls the VectorizedRleValuesReader.readLongs which reads the long values and tries to call WritableColumnVector.putLong which will throw a NPE because WritableColumnVector wasn't initialized to store a long array.

       In the case where the file has a dictionaryPage a different exception is thrown

       

      Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary
        at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:45)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToInt(ParquetDictionary.java:31)
        at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:298)
        at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getDecimal(WritableColumnVector.java:353)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
      
      

      In this case we have to make sure the correct dictionary is initialized i.e. PlainIntDictionary by setting the correct type in the ColumnDescriptor

       

      Attached are two files, one with Decimal(8,2) ther other with Decimal(1,1) both written as Decimal backed by INT64. Decimal(1,1) results in a different exception but same for the same reason

       

       

        Attachments

          Activity

            People

            • Assignee:
              razajafri Raza Jafri
              Reporter:
              razajafri Raza Jafri
              Shepherd:
              Robert Joseph Evans
            • Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: