SPARK-22482: Unreadable Parquet array columns


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Environment:
      Spark 2.1.0
      Parquet 1.8.1
      Hive 1.2
      Hive 2.1.0
      Presto 0.157
      Presto 0.180

    Description

      We have seen an issue with Parquet data written out by Spark: int and boolean array columns throw exceptions when the resulting Parquet files are read from Hive and Presto.

      I've logged PARQUET-1157 with the Parquet project, but I'm not sure whether the issue lies in their project or in Spark itself.

      Spark reads parquet-avro data produced by a MapReduce job and writes it back out as Parquet.

      The inbound Parquet schema defines the column as:

        optional group playerpositions_ai (LIST) {
          repeated int32 array;
        }
      

      Spark redefines this column as:

        optional group playerpositions_ai (LIST) {
          repeated group list {
            optional int32 element;
          }
        }
      

      and, with the legacy format enabled, as:

        optional group playerpositions_ai (LIST) {
          repeated group bag {
            optional int32 array;
          }
        }
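
      In case it helps compare against the layouts above, one way to dump the footer schema Spark actually wrote is to read it back with parquet-hadoop; a minimal sketch (the file path below is a placeholder for one of Spark's part files):

        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.fs.Path
        import org.apache.parquet.format.converter.ParquetMetadataConverter
        import org.apache.parquet.hadoop.ParquetFileReader

        // Read only the footer of one output file and print the Parquet schema
        // it declares; the path is a placeholder, not from the original report.
        val footer = ParquetFileReader.readFooter(
          new Configuration(),
          new Path("/tmp/output/part-00000.snappy.parquet"),
          ParquetMetadataConverter.NO_FILTER)
        println(footer.getFileMetaData.getSchema)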
      

      The Parquet data was tested in Hive 1.2, Hive 2.1, Presto 0.157, Presto 0.180, and Spark 2.1, as well as Amazon Athena (which is based on Presto).

      I believe the schema above is valid for writing out Parquet.

      The Spark code that writes it out is simple:

            data.repartition(((data.count() / 10000000) + 1).toInt).write.format("parquet")
              .mode("append")
              .partitionBy(partitionColumns: _*)
              .save(path)
      

      We initially wrote this data with the legacy format turned off, later turned it on, and have seen the same error occur either way.
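
      A minimal sketch of that toggle, assuming the "legacy format" switch referred to here is Spark's spark.sql.parquet.writeLegacyFormat option:

        // Assumption: "legacy format" means spark.sql.parquet.writeLegacyFormat.
        // false (the default) yields the "list"/"element" layout shown above;
        // true yields the "bag"/"array" layout.
        spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")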

      Spark's stack trace from reading this is:

      java.lang.IllegalArgumentException: Reading past RLE/BitPacking stream.
      	at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
      	at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readNext(RunLengthBitPackingHybridDecoder.java:82)
      	at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readInt(RunLengthBitPackingHybridDecoder.java:64)
      	at org.apache.parquet.column.values.dictionary.DictionaryValuesReader.readInteger(DictionaryValuesReader.java:112)
      	at org.apache.parquet.column.impl.ColumnReaderImpl$2$3.read(ColumnReaderImpl.java:243)
      	at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:464)
      	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:370)
      	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
      	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:218)
      	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
      	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
      	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
      	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
      	at org.apache.spark.scheduler.Task.run(Task.scala:99)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      

      Also note that our data is stored on S3, in case that matters.
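
      For completeness, the failing read is an ordinary Spark scan of the written path; a minimal sketch (the S3 URI is a placeholder), where touching the array column is expected to hit the RLE/BitPacking error above:

        // Hypothetical reproduction: read the data back from S3 and scan the
        // int array column; the bucket and path below are placeholders.
        val readBack = spark.read.parquet("s3a://some-bucket/some/path")
        readBack.select("playerpositions_ai").show()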

People

    Assignee: Unassigned
    Reporter: cpiliotis (Costas Piliotis)