SPARK-19299: Nulls in non-nullable columns cause data corruption in Parquet

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 1.6.0, 2.0.0, 2.0.1, 2.0.2, 2.1.0
    • Fix Version/s: None
    • Component/s: PySpark, Spark Core

    Description

      The problem we're seeing is that if a null occurs in a non-nullable field and is written to Parquet, the resulting file is corrupted and cannot be read back correctly.

      One way this can occur is when a long value in Python overflows the SQL LongType; this results in a null value inside the DataFrame.
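
      For reference, LongType is a signed 64-bit integer, so any Python int outside [-2^63, 2^63 - 1] silently becomes a null when the DataFrame is created. A minimal check (plain Python, nothing Spark-specific assumed):

      # Bounds of Spark SQL's LongType (a signed 64-bit integer).
      LONG_MIN, LONG_MAX = -(2 ** 63), 2 ** 63 - 1

      def fits_long(value):
          # True if the value survives a round trip through LongType.
          return LONG_MIN <= value <= LONG_MAX

      print(fits_long(2 ** 64))  # False -- this is the value that becomes null below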

      We're also seeing that the behaviour differs depending on whether the vectorized reader is enabled.
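
      (The reader can be flipped via the spark.sql.parquet.enableVectorizedReader setting; a minimal sketch of how we switched between the two code paths, assuming a Spark 2.x session named spark:)

      # Toggle the Parquet vectorized reader to reproduce both failure modes.
      spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")   # silently corrupted rows
      spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")  # ParquetDecodingException on read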

      Here's an example in PySpark:

      from pyspark.sql import types
      
      data = [
        (1, 6),
        (2, 7),
        (3, 2 ** 64), # this value overflows LongType and silently becomes null
        (4, 8),
        (5, 9)
      ]
      
      schema = types.StructType([
        types.StructField("index", types.LongType(), False),
        types.StructField("long", types.LongType(), False),
      ])
      
      df = sqlCtx.createDataFrame(data, schema)

      df.collect()  # the overflowed value already appears as None

      df.write.parquet("corrupt_parquet")

      df_parquet = sqlCtx.read.parquet("corrupt_parquet/*.parquet")

      df_parquet.collect()  # corrupted rows or a read failure, depending on the reader
      

      With the vectorized reader enabled, this produces:

      In [2]: df.collect()
      Out[2]:
      [Row(index=1, long=6),
       Row(index=2, long=7),
       Row(index=3, long=None),
       Row(index=4, long=8),
       Row(index=5, long=9)]
      
      In [3]: df_parquet.collect()
      Out[3]:
      [Row(index=1, long=6),
       Row(index=2, long=7),
       Row(index=3, long=8),
       Row(index=4, long=9),
       Row(index=5, long=5)]
      

      As you can see, reading the data back from disk silently drops the null and shifts the remaining values up, with the final slot filled from the neighbouring column (the last row's long=5 is actually its index value).

      With the vectorized reader disabled, we are completely unable to read the file:

      Py4JJavaError: An error occurred while calling o143.collectToPython.
      : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, localhost): org.apache.parquet.io.ParquetDecodingException: Can not read value at 4 in block 0 in file file:/Users/franklyndsouza/dev/starscream/corrupt/part-r-00000-4fa5aee8-2138-4e0c-b6d8-22a418d90fd3.snappy.parquet
      	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
      	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
      	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
      	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
      	at org.apache.spark.scheduler.Task.run(Task.scala:86)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.parquet.io.ParquetDecodingException: Can't read value in column [long] INT64 at value 5 out of 5, 5 out of 5 in currentPage. repetition level: 0, definition level: 0
      	at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:462)
      	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:364)
      	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
      	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:209)
      	... 19 more
      Caused by: org.apache.parquet.io.ParquetDecodingException: could not read long
      	at org.apache.parquet.column.values.plain.PlainValuesReader$LongPlainValuesReader.readLong(PlainValuesReader.java:131)
      	at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.read(ColumnReaderImpl.java:258)
      	at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:458)
      	... 22 more
      Caused by: java.io.EOFException
      	at org.apache.parquet.bytes.LittleEndianDataInputStream.readFully(LittleEndianDataInputStream.java:90)
      	at org.apache.parquet.bytes.LittleEndianDataInputStream.readLong(LittleEndianDataInputStream.java:377)
      	at org.apache.parquet.column.values.plain.PlainValuesReader$LongPlainValuesReader.readLong(PlainValuesReader.java:129)
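
      Until this is fixed, a defensive workaround is to check that no column declared non-nullable actually contains nulls before writing. A minimal sketch (the helper name is ours, not an existing API):

      from pyspark.sql import functions as F

      def assert_no_unexpected_nulls(df):
          # Fail fast instead of letting the Parquet writer produce a corrupt file.
          for field in df.schema.fields:
              if not field.nullable:
                  nulls = df.filter(F.col(field.name).isNull()).count()
                  if nulls:
                      raise ValueError("non-nullable column %r contains %d nulls"
                                       % (field.name, nulls))

      assert_no_unexpected_nulls(df)  # raises for the example above, before df.write.parquet(...)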
      

People

    • Assignee: Unassigned
    • Reporter: Franklyn Dsouza
    • Votes: 10
    • Watchers: 10
