Details
- Type: Bug
- Priority: Major
- Status: Resolved
- Resolution: Fixed
- Versions: 3.4.0, 3.5.0, 4.0.0
Description
The scenario is a bit more involved than the title suggests, but it's not far-fetched:
- Write a Parquet file with one column.
- Evolve the schema: add a new column with a DecimalType wide enough that it doesn't fit in a long, and give it a default value.
- Try to read the old file with the new schema.
- The read fails with a NullPointerException.
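The steps above can be sketched as a Spark SQL session on an affected version (a hedged illustration only — the table name, column names, and default value are made up for this example):

```sql
-- 1. Write a Parquet file with one column.
CREATE TABLE t (id BIGINT) USING parquet;
INSERT INTO t VALUES (1), (2);

-- 2. Evolve the schema: add a decimal too wide for a long (precision > 18)
--    with a default value.
ALTER TABLE t ADD COLUMN d DECIMAL(30, 2) DEFAULT 12.34;

-- 3. Re-read the old file with the new schema through the vectorized
--    Parquet reader: NPE before the fix.
SELECT * FROM t;
```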
The issue lies in how the column vector stores DecimalType values: it incorrectly assumes they fit in a long and tries to write them to the associated long array.
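To see why "wide enough that it doesn't fit in a long" matters: Long.MAX_VALUE is 9223372036854775807, which has 19 digits, so every unscaled decimal value of up to 18 digits fits in a long, while wider values may overflow. A minimal sketch of that width check (the 18-digit constant mirrors Spark's Decimal.MAX_LONG_DIGITS; the class and method names here are made up for illustration):

```java
import java.math.BigInteger;

public class DecimalWidth {
    // Mirrors Spark's Decimal.MAX_LONG_DIGITS: unscaled values of up to
    // 18 digits always fit in a 64-bit long.
    static final int MAX_LONG_DIGITS = 18;

    static boolean fitsInLong(int precision) {
        return precision <= MAX_LONG_DIGITS;
    }

    public static void main(String[] args) {
        // The unscaled value of a DECIMAL(22, 2) such as
        // 12345678901234567890.12 has 22 digits and overflows a long.
        BigInteger unscaled = new BigInteger("1234567890123456789012");
        try {
            unscaled.longValueExact();
        } catch (ArithmeticException e) {
            System.out.println("too wide for a long: " + unscaled);
        }
    }
}
```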
In OnHeapColumnVector, which extends WritableColumnVector, reserveInternal() checks whether the type is too wide for a long and initializes the backing array accordingly; isArray() returns true if the type is a byte-array-backed decimal (byteArrayDecimalType).
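The interaction can be modeled in miniature (a hedged, simplified sketch — not Spark's actual classes): allocation routes wide decimals to byte storage, so a write path that unconditionally assumes "decimal == long" dereferences a long array that was never allocated and throws a NullPointerException.

```java
import java.math.BigDecimal;

public class ToyDecimalVector {
    static final int MAX_LONG_DIGITS = 18;  // mirrors Spark's Decimal.MAX_LONG_DIGITS

    private final int precision;
    private long[] longData;    // allocated only for narrow decimals
    private byte[][] byteData;  // allocated only for wide decimals

    ToyDecimalVector(int capacity, int precision) {
        this.precision = precision;
        // Mirrors reserveInternal(): allocate the store that matches the width.
        if (isArray()) {
            byteData = new byte[capacity][];
        } else {
            longData = new long[capacity];
        }
    }

    // Mirrors isArray(): wide decimals are byte-array-backed.
    boolean isArray() {
        return precision > MAX_LONG_DIGITS;
    }

    // Buggy write path: assumes every decimal fits in a long. For a wide
    // decimal, longData was never allocated, so the store throws an NPE.
    void putLongUnconditionally(int row, BigDecimal v) {
        longData[row] = v.unscaledValue().longValue();
    }

    // Fixed write path: route by width.
    void put(int row, BigDecimal v) {
        if (isArray()) {
            byteData[row] = v.unscaledValue().toByteArray();
        } else {
            longData[row] = v.unscaledValue().longValueExact();
        }
    }

    public static void main(String[] args) {
        ToyDecimalVector wide = new ToyDecimalVector(4, 30);  // e.g. DECIMAL(30, 2)
        BigDecimal dflt = new BigDecimal("12345678901234567890.12");
        try {
            wide.putLongUnconditionally(0, dflt);
        } catch (NullPointerException e) {
            System.out.println("buggy path: NPE, long array was never allocated");
        }
        wide.put(0, dflt);  // fixed path stores the unscaled bytes
        System.out.println("fixed path: ok");
    }
}
```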
Without the fix:

Cause: java.lang.NullPointerException:
  at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putLongs(OnHeapColumnVector.java:370)
  at org.apache.spark.sql.execution.vectorized.WritableColumnVector.appendLongs(WritableColumnVector.java:611)
  at org.apache.spark.sql.execution.vectorized.WritableColumnVector.appendObjects(WritableColumnVector.java:745)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetColumnVector.<init>(ParquetColumnVector.java:95)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:286)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:306)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:293)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:218)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:280)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:130)
  at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:614)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:891)
Fix PR: https://github.com/apache/spark/pull/43960