Spark / SPARK-25164

Parquet reader builds entire list of columns once for each column


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.4.0
    • Fix Version/s: 2.2.3, 2.3.2, 2.4.0
    • Component/s: SQL
    • Labels: None

    Description

      VectorizedParquetRecordReader.initializeInternal loops through each column, and for each column it calls

      requestedSchema.getColumns().get(i)
      

      However, MessageType.getColumns rebuilds the entire column list from getPaths(0) on every call:

        public List<ColumnDescriptor> getColumns() {
          List<String[]> paths = this.getPaths(0);
          List<ColumnDescriptor> columns = new ArrayList<ColumnDescriptor>(paths.size());
          for (String[] path : paths) {
          // TODO: optimize this
            PrimitiveType primitiveType = getType(path).asPrimitiveType();
            columns.add(new ColumnDescriptor(
                            path,
                            primitiveType,
                            getMaxRepetitionLevel(path),
                            getMaxDefinitionLevel(path)));
          }
          return columns;
        }
      

      This means that for each Parquet file, initializeInternal indirectly does colCount * colCount work: each of the colCount loop iterations rebuilds the full colCount-entry column list just to fetch a single entry.
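
      In sketch form, the hot pattern looks like this (a simplified illustration, not the exact Spark source; the per-column body is elided):

        for (int i = 0; i < requestedSchema.getFieldCount(); i++) {
          // getColumns() rebuilds the full colCount-entry list on every
          // iteration, so the loop as a whole does colCount * colCount work.
          ColumnDescriptor column = requestedSchema.getColumns().get(i);
          // ... per-column initialization ...
        }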

      This is actually not particularly noticeable unless you have:

      • many parquet files
      • many columns

      To verify that this is an issue, I created a 1-million-record Parquet table with 6000 columns of type double, split across 67 files (so initializeInternal is called 67 times). Each call therefore builds 6000 lists of 6000 ColumnDescriptor entries each, about 36 million per file. I ran the following query:

      sql("select * from 6000_1m_double where id1 = 1").collect
      

      I used Spark built from the master branch, with 8 executor threads. The filter returns only a few thousand records. The query ran for 6.4 minutes on average.

      Then I cached the column list at the top of initializeInternal as follows:

      List<ColumnDescriptor> columnCache = requestedSchema.getColumns();
      

      Then I changed initializeInternal to use columnCache rather than requestedSchema.getColumns().
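
      In sketch form (simplified, with the real per-column work elided), the change hoists the list construction out of the loop:

        List<ColumnDescriptor> columnCache = requestedSchema.getColumns();
        for (int i = 0; i < requestedSchema.getFieldCount(); i++) {
          // O(1) list lookup; the list is no longer rebuilt per column.
          ColumnDescriptor column = columnCache.get(i);
          // ... per-column initialization as before ...
        }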

      With the column cache variable, the same query runs in 5 minutes. So on this simple query, you save 22% of the run time just by not rebuilding the column list for each column.

      You get additional savings with a paths cache variable as well, bringing the total savings on the above query to 34%.
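
      A sketch of both caches together, assuming initializeInternal also consults requestedSchema.getPaths() inside the loop (getPaths(), like getColumns(), rebuilds its list on every call):

        // Build both lists once, up front.
        List<String[]> pathsCache = requestedSchema.getPaths();
        List<ColumnDescriptor> columnCache = requestedSchema.getColumns();
        for (int i = 0; i < requestedSchema.getFieldCount(); i++) {
          String[] colPath = pathsCache.get(i);
          ColumnDescriptor column = columnCache.get(i);
          // ... per-column initialization as before ...
        }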

            People

              Assignee: Bruce Robbins (bersprockets)
              Reporter: Bruce Robbins (bersprockets)