Description
VectorizedParquetRecordReader.initializeInternal loops through each column, and for each column it calls
requestedSchema.getColumns().get(i)
However, MessageType.getColumns rebuilds the entire column list from getPaths(0) on every call:
public List<ColumnDescriptor> getColumns() {
  List<String[]> paths = this.getPaths(0);
  List<ColumnDescriptor> columns = new ArrayList<ColumnDescriptor>(paths.size());
  for (String[] path : paths) {
    // TODO: optimize this
    PrimitiveType primitiveType = getType(path).asPrimitiveType();
    columns.add(new ColumnDescriptor(
        path,
        primitiveType,
        getMaxRepetitionLevel(path),
        getMaxDefinitionLevel(path)));
  }
  return columns;
}
This means that for each parquet file, this routine indirectly performs on the order of colCount * colCount iterations.
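The quadratic pattern can be sketched as follows. This is a simplified, hypothetical stand-in (`NaiveSchema`, `QuadraticDemo` are illustration names, not the real Parquet classes): `getColumns()` rebuilds the whole column list on every call, and the per-column loop calls it once per column, just as initializeInternal does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Parquet's MessageType: like the real class,
// getColumns() rebuilds the whole column list on every call.
class NaiveSchema {
    final int colCount;
    long buildSteps = 0; // counts per-column work done while (re)building the list

    NaiveSchema(int colCount) { this.colCount = colCount; }

    List<String> getColumns() {
        List<String> columns = new ArrayList<>(colCount);
        for (int i = 0; i < colCount; i++) {
            buildSteps++; // models getType / getMaxRepetitionLevel / getMaxDefinitionLevel
            columns.add("col" + i);
        }
        return columns;
    }
}

public class QuadraticDemo {
    // Mirrors initializeInternal: one getColumns() call per column.
    static long run(int colCount) {
        NaiveSchema schema = new NaiveSchema(colCount);
        for (int i = 0; i < colCount; i++) {
            schema.getColumns().get(i);
        }
        return schema.buildSteps;
    }

    public static void main(String[] args) {
        // 6000 columns -> 6000 * 6000 = 36,000,000 build steps for a single file.
        System.out.println(QuadraticDemo.run(6000));
    }
}
```

With 6000 columns, that is 36 million list-building steps per file, repeated for each of the 67 files.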
This is actually not particularly noticeable unless you have:
- many parquet files
- many columns
To verify that this is an issue, I created a 1 million record parquet table with 6000 columns of type double and 67 files (so initializeInternal is called 67 times). I ran the following query:
sql("select * from 6000_1m_double where id1 = 1").collect
I used Spark from the master branch. I had 8 executor threads. The filter returns only a few thousand records. The query ran (on average) for 6.4 minutes.
Then I cached the column list at the top of initializeInternal as follows:
List<ColumnDescriptor> columnCache = requestedSchema.getColumns();
Then I changed initializeInternal to use columnCache rather than requestedSchema.getColumns().
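The cached variant can be sketched with the same kind of simplified stand-in (`CountingSchema`, `ColumnCacheDemo` are illustration names, not the real Spark or Parquet classes): the column list is built once before the loop, and the loop indexes into the cached copy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MessageType: the column list is rebuilt per call.
class CountingSchema {
    final int colCount;
    long buildSteps = 0; // counts per-column work done while building the list

    CountingSchema(int colCount) { this.colCount = colCount; }

    List<String> getColumns() {
        List<String> columns = new ArrayList<>(colCount);
        for (int i = 0; i < colCount; i++) {
            buildSteps++;
            columns.add("col" + i);
        }
        return columns;
    }
}

public class ColumnCacheDemo {
    static long run(int colCount) {
        CountingSchema schema = new CountingSchema(colCount);
        // Build the list once, as in the proposed fix, then reuse it in the loop.
        List<String> columnCache = schema.getColumns();
        for (int i = 0; i < colCount; i++) {
            columnCache.get(i);
        }
        return schema.buildSteps;
    }

    public static void main(String[] args) {
        // With the cache, only colCount build steps per file instead of colCount^2.
        System.out.println(ColumnCacheDemo.run(6000));
    }
}
```

This drops the per-file cost from colCount * colCount build steps to colCount.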
With the column cache variable, the same query runs in 5 minutes. So on this simple query, not rebuilding the column list for each column saves 22% of the time.
You get additional savings by also caching the paths list; with both caches the total saving on the above query rises to 34%.
Issue Links
- is related to SPARK-24316: Spark sql queries stall for column width more than 6k for parquet based table (Open)
- relates to SPARK-25643: Performance issues querying wide rows (Open)