PARQUET-1531

Page row count limit causes empty pages to be written from MessageColumnIO

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.11.0
    • Fix Version/s: 1.11.0
    • None

    Description

      This originally manifested as https://issues.apache.org/jira/browse/SPARK-26874, but it is fundamentally a problem in how the fix for PARQUET-1414 interacts with MessageColumnIO. Spark is just one user of that API.

      In MessageColumnIO#endMessage(), we first check whether any fields are missing and fill them in with nulls via MessageColumnIO#writeNullForMissingFieldsAtCurrentLevel. However, this method might not actually write any nulls to the underlying page: MessageColumnIO can buffer nulls in memory and flush them to the page store lazily.

      Regardless of whether the nulls are flushed to the page store, MessageColumnIO#endMessage always calls columns#endRecord(), which signals to the ColumnWriteStore that a record was written. At that point, the write store increments the row count for the current page by 1 and then checks whether the page needs to be flushed because it has hit the page row count limit.

      The problem is that with this writing scheme, MessageColumnIO can cause empty pages to be written to Parquet files, and empty pages are not readable by Parquet readers. Suppose the page row count limit is N, and the MessageColumnIO receives N nulls for a column. The MessageColumnIO buffers the nulls in memory and doesn't necessarily flush them to the writer yet. On the Nth call to endMessage(), however, the column store counts N rows for the current page and concludes that the page has hit the row count limit, even though nothing has actually been written to the page. The underlying page writer then writes an empty page.
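
      The interaction described above can be sketched with a small toy model. This is only an illustration, not Parquet's actual code: ToyPageWriter, ToyColumnWriter, ToyMessageWriter and EmptyPageDemo are hypothetical names, and the model assumes a single column whose nulls are cached until flushNulls() is called explicitly.

```scala
// Toy model only: every class here is a hypothetical stand-in,
// not part of Parquet's real API.
import scala.collection.mutable.ArrayBuffer

// Stand-in for the page store: remembers how many values each flushed page held.
final class ToyPageWriter {
  val pageValueCounts: ArrayBuffer[Int] = ArrayBuffer.empty[Int]
  def writePage(valueCount: Int): Unit = pageValueCounts += valueCount
}

// Stand-in for a column writer combined with the write store's row count check.
final class ToyColumnWriter(rowCountLimit: Int, pages: ToyPageWriter) {
  private var valuesInPage = 0 // values that actually reached the page buffer
  private var rowsInPage = 0   // rows signalled through endRecord()

  def writeNull(): Unit = valuesInPage += 1

  def endRecord(): Unit = {
    rowsInPage += 1
    // The check looks only at the row count, never at valuesInPage.
    if (rowsInPage >= rowCountLimit) {
      pages.writePage(valuesInPage)
      valuesInPage = 0
      rowsInPage = 0
    }
  }
}

// Stand-in for the record-level writer that caches nulls instead of writing them.
final class ToyMessageWriter(column: ToyColumnWriter) {
  private var bufferedNulls = 0

  def writeNullRecord(): Unit = {
    bufferedNulls += 1 // the null is cached, nothing reaches the column writer
    column.endRecord() // the end of the record is signalled regardless
  }

  // Lazy flush of the cached nulls; the demo below never reaches it in time.
  def flushNulls(): Unit = {
    (1 to bufferedNulls).foreach(_ => column.writeNull())
    bufferedNulls = 0
  }
}

object EmptyPageDemo {
  // Returns the value count of every page flushed while writing null-only records.
  def run(limit: Int, nullRecords: Int): List[Int] = {
    val pages = new ToyPageWriter
    val writer = new ToyMessageWriter(new ToyColumnWriter(limit, pages))
    (1 to nullRecords).foreach(_ => writer.writeNullRecord())
    pages.pageValueCounts.toList
  }

  def main(args: Array[String]): Unit =
    println(EmptyPageDemo.run(limit = 3, nullRecords = 3))
}
```

      In this model, run(limit = 3, nullRecords = 3) returns List(0): one page is flushed and it holds zero values, because the row counter reached the limit while every null was still sitting in the cache.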

      To illustrate the problem, one can insert the following test into Spark's ParquetIOSuite once Spark has been upgraded to use the master branch of Parquet. Attach a debugger to MessageColumnIO#endMessage() and trace the logic: the column writer will push a page with 0 values.

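      // Imports needed if they are not already present in the suite:
      //   import scala.collection.mutable.ListBuffer
      //   import org.apache.hadoop.fs.Path
      //   import org.apache.spark.sql.Row
      //   import org.apache.spark.sql.types._
      test("PARQUET-1414 Problems") {
        // Lower the page row count limit so the issue reproduces on small data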
        sparkContext.hadoopConfiguration.set("parquet.page.row.count.limit", "1")
        withTempPath { location =>
          val path = new Path(location.getCanonicalPath + "/parquet-data")
          val schema = StructType(
            Array(StructField("timestamps1", ArrayType(TimestampType))))
          val rows = ListBuffer[Row]()
          for (j <- 0 until 10) {
            rows += Row(
              null.asInstanceOf[Array[java.sql.Timestamp]])
          }
          val srcDf = spark.createDataFrame(
            sparkContext.parallelize(rows, 3),
            schema,
            true)
          srcDf.write.parquet(path.toString)
          assert(spark.read.parquet(path.toString).collect.size > 0)
        }
      }

            People

              Assignee: gszadovszky Gabor Szadovszky
              Reporter: mcheah Matt Cheah