PARQUET-980: Cannot read row group larger than 2GB


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.8.0, 1.8.1, 1.8.2
    • Fix Version/s: None
    • Component/s: parquet-mr
    • Labels: None

    Description

      Parquet MR 1.8.2 does not support reading row groups that are larger than 2 GB. See: https://github.com/apache/parquet-mr/blob/parquet-1.8.x/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L1064
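
      For context, the underlying limit is that a JVM array cannot be longer than
      Integer.MAX_VALUE, so any read path that pulls a whole row group into a single byte array
      breaks once the row group passes 2 GB. A minimal sketch of that failure mode (illustrative
      only, not the actual ParquetFileReader code):

      object RowGroupOverflowSketch extends App {
        // Hypothetical combined size of all column chunks in one row group: 3 GB.
        val totalChunkLength: Long = 3L * 1024 * 1024 * 1024

        // JVM arrays are indexed by Int, so the length has to be narrowed to Int. Anything
        // above Integer.MAX_VALUE (~2 GB) wraps around to a negative value...
        val allocationSize: Int = totalChunkLength.toInt // -1073741824

        // ...and the allocation blows up with the NegativeArraySizeException shown in the
        // stack trace further down.
        val buffer = new Array[Byte](allocationSize)
      }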

      We are seeing this when writing skewed records: the skew throws off the InternalParquetRecordWriter's estimate of when to next check the buffered row group size (a rough sketch of that estimate follows the stack trace below). The following Spark code illustrates the problem:

      // Imports needed when running outside spark-shell (spark-shell provides them automatically).
      import org.apache.spark.sql.functions.{count, length, lit, max}
      import spark.implicits._

      /**
       * Create a data frame that will make parquet write a file with a row group larger than 2 GB. Parquet
       * only checks the size of the row group after writing a number of records. This number is based on
       * average row size of the already written records. This is problematic in the following scenario:
       * - The initial (100) records in the row group are relatively small.
       * - When the InternalParquetRecordWriter checks whether it needs to flush to disk (it does not
       *   need to yet), it assumes that the remaining records have a similar size and (greatly)
       *   increases the check interval (usually to 10000).
       * - The remaining records are much larger than expected, making the row group larger than 2 GB
       *   (which makes reading the row group impossible).
       *
       * The data frame below illustrates such a scenario. This creates a row group of approximately 4GB.
       */
      val badDf = spark.range(0, 2200, 1, 1).mapPartitions { iterator =>
        var i = 0
        val random = new scala.util.Random(42)
        val buffer = new Array[Char](750000)
        iterator.map { id =>
          // the first 200 records have a length of 1K and the remaining 2000 have a length of 750K.
          val numChars = if (i < 200) 1000 else 750000
          i += 1
      
          // create a random array
          var j = 0
          while (j < numChars) {
            // Generate a char (borrowed from scala.util.Random)
            buffer(j) = (random.nextInt(0xD800 - 1) + 1).toChar
            j += 1
          }
      
          // create a string: the string constructor will copy the buffer.
          new String(buffer, 0, numChars)
        }
      }
      badDf.write.parquet("somefile")
      val corruptedDf = spark.read.parquet("somefile")
      corruptedDf.select(count(lit(1)), max(length($"value"))).show()
      

      The last statement (the aggregation over the data read back) fails with the following exception:

      java.lang.NegativeArraySizeException
      	at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1064)
      	at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:698)
      ...
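
      The skew matters because of how the writer schedules its size checks. Below is a rough,
      simplified sketch of that estimate (the constants 100 / 10000 / 128 MB are the usual
      defaults; the exact formula in InternalParquetRecordWriter differs slightly, so treat this
      as an approximation): after the first 100 small records the writer extrapolates from their
      average size, decides the next check can wait, and by then the large records have already
      been buffered into a single oversized row group.

      // Back-of-the-envelope version of the writer's "records until next size check" estimate.
      object CheckIntervalSketch extends App {
        val rowGroupSize  = 128L * 1024 * 1024 // target row group size (default 128 MB)
        val maxCheckJump  = 10000L             // cap on how far the next check is pushed out

        // State at the first size check, i.e. after 100 records of roughly 1 KB each:
        val recordCount   = 100L
        val bufferedSize  = 100L * 1024        // ~100 KB buffered so far
        val avgRecordSize = bufferedSize.toDouble / recordCount

        // Records that would still fit into the row group *if* the average held. It does not
        // hold for the skewed data above, where the remaining records are ~750K chars each.
        val estimatedRemaining = (rowGroupSize / avgRecordSize).toLong        // ~131072
        val nextCheck = math.min(recordCount + maxCheckJump,
                                 (recordCount + estimatedRemaining) / 2)      // 10100

        // The writer will not look at the buffered size again before record 10100. The repro
        // only has 2200 records, so the entire ~4 GB of string data lands in one row group.
        println(s"next size check at record $nextCheck")
      }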
      

      This seems to be fixed in Parquet 1.9.x by commit https://github.com/apache/parquet-mr/commit/6b605a4ea05b66e1a6bf843353abcb4834a4ced8. Is there any chance that we can fix this in 1.8.x as well?
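
      For anyone stuck on 1.8.x in the meantime, the general way around the limit on the read
      side is to stop assuming a row group fits into one array and read it into several smaller
      buffers instead (which is, as far as I can tell, the direction the 1.9.x change takes).
      A rough sketch of that idea, independent of the actual parquet-mr internals (the method
      name and parameters below are made up for illustration):

      import java.io.{DataInputStream, InputStream}

      // Read `totalLength` bytes into buffers of at most `maxChunk` bytes each, so no single
      // allocation ever needs to approach the 2 GB array limit.
      def readInChunks(in: InputStream, totalLength: Long,
                       maxChunk: Int = 8 * 1024 * 1024): Seq[Array[Byte]] = {
        val din = new DataInputStream(in)
        val buffers = Seq.newBuilder[Array[Byte]]
        var remaining = totalLength
        while (remaining > 0) {
          val size = math.min(remaining, maxChunk.toLong).toInt // always well below 2 GB
          val buf = new Array[Byte](size)
          din.readFully(buf) // fills the whole buffer or throws EOFException
          buffers += buf
          remaining -= size
        }
        buffers.result()
      }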

    Attachments

    Activity

    People

        Assignee: Unassigned
        Reporter: hvanhovell (Herman van Hövell)
        Votes: 1
        Watchers: 6
