Description
Sample query:
spark.read.parquet("tbl").createOrReplaceTempView("tbl") spark.read.parquet("lookup").createOrReplaceTempView("lookup") sql(""" select tbl.col1, fk1, fk2 from tbl, lookup l1, lookup l2 where fk1 = l1.key and fk2 = l2.key """).explain
Test files can be created like so:
import scala.util.Random

val rand = Random

val tbl = spark.range(1, 10000).map { x =>
  (rand.nextLong.abs % 20, rand.nextLong.abs % 20, x)
}.toDF("fk1", "fk2", "col1")
tbl.write.mode("overwrite").parquet("tbl")

val lookup = spark.range(0, 20).map { x =>
  (x + 1, x * 10000, (x + 1) * 10000)
}.toDF("key", "col1", "col2")
lookup.write.mode("overwrite").parquet("lookup")
Output with V1 Parquet reader:
== Physical Plan ==
*(3) Project [col1#2L, fk1#0L, fk2#1L]
+- *(3) BroadcastHashJoin [fk2#1L], [key#12L], Inner, BuildRight, false
   :- *(3) Project [fk1#0L, fk2#1L, col1#2L]
   :  +- *(3) BroadcastHashJoin [fk1#0L], [key#6L], Inner, BuildRight, false
   :     :- *(3) Filter (isnotnull(fk1#0L) AND isnotnull(fk2#1L))
   :     :  +- *(3) ColumnarToRow
   :     :     +- FileScan parquet [fk1#0L,fk2#1L,col1#2L] Batched: true, DataFilters: [isnotnull(fk1#0L), isnotnull(fk2#1L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/tbl], PartitionFilters: [], PushedFilters: [IsNotNull(fk1), IsNotNull(fk2)], ReadSchema: struct<fk1:bigint,fk2:bigint,col1:bigint>
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
   :        +- *(1) Filter isnotnull(key#6L)
   :           +- *(1) ColumnarToRow
   :              +- FileScan parquet [key#6L] Batched: true, DataFilters: [isnotnull(key#6L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint>
   +- ReusedExchange [key#12L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
With V1 Parquet reader, the exchange for lookup is reused (see last line).
Output with V2 Parquet reader (spark.sql.sources.useV1SourceList=""):
== Physical Plan ==
*(3) Project [col1#2L, fk1#0L, fk2#1L]
+- *(3) BroadcastHashJoin [fk2#1L], [key#12L], Inner, BuildRight, false
   :- *(3) Project [fk1#0L, fk2#1L, col1#2L]
   :  +- *(3) BroadcastHashJoin [fk1#0L], [key#6L], Inner, BuildRight, false
   :     :- *(3) Filter (isnotnull(fk1#0L) AND isnotnull(fk2#1L))
   :     :  +- *(3) ColumnarToRow
   :     :     +- BatchScan[fk1#0L, fk2#1L, col1#2L] ParquetScan DataFilters: [isnotnull(fk1#0L), isnotnull(fk2#1L)], Format: parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/tbl], PartitionFilters: [], PushedFilers: [IsNotNull(fk1), IsNotNull(fk2)], ReadSchema: struct<fk1:bigint,fk2:bigint,col1:bigint>, PushedFilters: [IsNotNull(fk1), IsNotNull(fk2)]
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
   :        +- *(1) Filter isnotnull(key#6L)
   :           +- *(1) ColumnarToRow
   :              +- BatchScan[key#6L] ParquetScan DataFilters: [isnotnull(key#6L)], Format: parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], PartitionFilters: [], PushedFilers: [IsNotNull(key)], ReadSchema: struct<key:bigint>, PushedFilters: [IsNotNull(key)]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#83]
      +- *(2) Filter isnotnull(key#12L)
         +- *(2) ColumnarToRow
            +- BatchScan[key#12L] ParquetScan DataFilters: [isnotnull(key#12L)], Format: parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], PartitionFilters: [], PushedFilers: [IsNotNull(key)], ReadSchema: struct<key:bigint>, PushedFilters: [IsNotNull(key)]
With the V2 Parquet reader, the exchange for lookup is not reused (see last 4 lines).
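For what it's worth, the missing reuse can also be checked programmatically rather than by eyeballing the plan. A small sketch, assuming the temp views created above (the count is 1 with the V1 reader and 0 with the V2 reader):

import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

val plan = sql("""
  select tbl.col1, fk1, fk2
  from tbl, lookup l1, lookup l2
  where fk1 = l1.key
  and fk2 = l2.key
""").queryExecution.executedPlan

// Count the ReusedExchangeExec nodes in the executed plan.
println(plan.collect { case r: ReusedExchangeExec => r }.size)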
You can see the same issue with the ORC reader (and presumably any other data source that extends FileScan).
The issue appears to be this check in FileScan#equals:
ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters) && ExpressionSet(dataFilters) == ExpressionSet(f.dataFilters)
partitionFilters and dataFilters are not normalized, so the exprIds embedded in their attribute references differ between otherwise identical scans (e.g. key#6L vs. key#12L above). As a result, two semantically identical FileScan instances compare as unequal, and the exchange reuse rule cannot match them.
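A minimal sketch of one possible direction (an illustration only, not an actual patch; normalizeExprs is a hypothetical helper): rewrite each attribute's exprId to its ordinal in the scan's output before building the ExpressionSets, so equality no longer depends on which exprIds a particular plan instance was assigned.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId, Expression}

// Hypothetical helper: replace the per-plan exprId of each attribute
// reference with the attribute's position in the scan output, which is
// stable across plan instances. (indexWhere returns -1 for attributes
// not in the output; a real implementation would handle that case.)
def normalizeExprs(
    exprs: Seq[Expression],
    attributes: Seq[AttributeReference]): Seq[Expression] = {
  exprs.map(_.transform {
    case a: AttributeReference =>
      a.withExprId(ExprId(attributes.indexWhere(_.name == a.name)))
  })
}

The comparison in FileScan#equals would then be made over normalizeExprs(partitionFilters, output) and normalizeExprs(dataFilters, output), where output stands for however the scan's attribute list is obtained in context.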
As a side note, FileScan#equals has a dangling boolean expression:
fileIndex == f.fileIndex && readSchema == f.readSchema
The result of that expression is discarded, so fileIndex and readSchema are never actually compared. We might want to fold it into the final decision, even though it is not the cause of the issue reported here.
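For illustration, this is roughly what the method could look like with that expression folded into the result (a sketch; the actual method body in Spark may differ):

override def equals(obj: Any): Boolean = obj match {
  case f: FileScan =>
    // Include the previously dangling fileIndex/readSchema checks
    // in the value that is actually returned.
    fileIndex == f.fileIndex && readSchema == f.readSchema &&
      ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters) &&
      ExpressionSet(dataFilters) == ExpressionSet(f.dataFilters)
  case _ => false
}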