Description
I will use an example to illustrate the problem. The cause of the problem and a proposed approach to fixing it are described below.
Assume the following Protocol Buffers schema:
message Model {
  string name = 1;
  repeated string keywords = 2;
}
Suppose a Parquet file is created from a set of records in this format using the parquet-protobuf library.
With Spark 3.0.2 or older, we could run the following query in spark-shell:
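For context, here is a minimal sketch of how such a file might be written with parquet-protobuf (assuming the message above has been compiled with protoc into a Java class named Model; the file path is only an example, and depending on the parquet-protobuf version a builder API may be required instead of this constructor):

import org.apache.hadoop.fs.Path
import org.apache.parquet.proto.ProtoParquetWriter

// Write a few Model records to a Parquet file using the protobuf-generated class.
val writer = new ProtoParquetWriter[Model](new Path("/path/to/parquet/part-00000.parquet"), classOf[Model])
writer.write(Model.newBuilder().setName("A").addKeywords("X").addKeywords("Y").build())
writer.write(Model.newBuilder().setName("B").addKeywords("Z").build())
writer.close()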
val data = spark.read.parquet("/path/to/parquet")
data.registerTempTable("models")
spark.sql("select * from models where array_contains(keywords, 'X')").show(false)
But after updating Spark, we get the following error:
Caused by: java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column keywords is repeated.
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:176)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)
at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:870)
at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:789)
at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:373)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
...
At first glance the problem appears to be in the Parquet library, but in fact the failure comes from the following check, present since 2014 (based on the Git history), which rejects filter predicates on any repeated column:
Parquet Schema Compatibility Validator
After some investigation, I noticed that the cause is a change in the filter conditions Spark generates for the query:
spark.sql("select * from log where array_contains(keywords, 'X')").explain(true); // Spark 3.0.2 and older == Physical Plan == ... +- FileScan parquet [link#0,keywords#1] DataFilters: [array_contains(keywords#1, Google)] PushedFilters: [] ... // Spark 3.1.0 and newer == Physical Plan == ... +- FileScan parquet [link#0,keywords#1] DataFilters: [isnotnull(keywords#1), array_contains(keywords#1, Google)] PushedFilters: [IsNotNull(keywords)] ...
It is good that the filter pushdown has become smarter. Unfortunately, due to my unfamiliarity with the code base, I could not find the exact location of the change or the related pull request. In general this change is fine for non-repeated Parquet fields, but for repeated fields it triggers the Parquet-library error shown above (see the comparison below).
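As an illustration (a hypothetical spark-shell session against the models table from the example above), a predicate on the non-repeated name column is pushed down and accepted, while the same kind of predicate on the repeated keywords column now fails:

// Filter on a non-repeated column: IsNotNull(name) is pushed down and accepted by Parquet.
spark.sql("select * from models where name = 'A'").show(false)
// Filter on the repeated column: IsNotNull(keywords) is pushed down and rejected by
// SchemaCompatibilityValidator on Spark 3.1.0+, producing the exception shown above.
spark.sql("select * from models where array_contains(keywords, 'X')").show(false)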
The only temporary workaround I see is to disable the following setting, which in general significantly reduces performance:
SET spark.sql.parquet.filterPushdown=false
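For example, in spark-shell the workaround can be applied to the current session as follows (the query then runs, but without any Parquet filter pushdown):

// Disable Parquet filter pushdown for the current session.
spark.conf.set("spark.sql.parquet.filterPushdown", "false")
// Equivalent SQL form:
spark.sql("SET spark.sql.parquet.filterPushdown=false")
// The query from the example above now succeeds, at the cost of reading more row groups.
spark.sql("select * from models where array_contains(keywords, 'X')").show(false)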
I have created a patch for this bug and will send a pull request soon.