SPARK-33482: V2 Datasources that extend FileScan preclude exchange reuse


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
    • Fix Version/s: 3.0.3, 3.1.2, 3.2.0
    • Component/s: SQL
    • Labels: None

    Description

      Sample query:

      spark.read.parquet("tbl").createOrReplaceTempView("tbl")
      spark.read.parquet("lookup").createOrReplaceTempView("lookup")
      
      sql("""
         select tbl.col1, fk1, fk2
         from tbl, lookup l1, lookup l2
         where fk1 = l1.key
         and fk2 = l2.key
      """).explain
      

      The test files can be created as follows:

      import scala.util.Random
      
      val rand = Random
      
      // "tbl": ~10,000 rows, each with two foreign keys (fk1, fk2) in the range 0-19
      val tbl = spark.range(1, 10000).map { x =>
        (rand.nextLong.abs % 20,
         rand.nextLong.abs % 20,
         x)
      }.toDF("fk1", "fk2", "col1")
      
      tbl.write.mode("overwrite").parquet("tbl")
      
      // "lookup": 20 rows keyed 1-20, joined twice in the sample query
      val lookup = spark.range(0, 20).map { x =>
        (x + 1, x * 10000, (x + 1) * 10000)
      }.toDF("key", "col1", "col2")
      lookup.write.mode("overwrite").parquet("lookup")
      

      Output with V1 Parquet reader:

       == Physical Plan ==
      *(3) Project [col1#2L, fk1#0L, fk2#1L]
      +- *(3) BroadcastHashJoin [fk2#1L], [key#12L], Inner, BuildRight, false
         :- *(3) Project [fk1#0L, fk2#1L, col1#2L]
         :  +- *(3) BroadcastHashJoin [fk1#0L], [key#6L], Inner, BuildRight, false
         :     :- *(3) Filter (isnotnull(fk1#0L) AND isnotnull(fk2#1L))
         :     :  +- *(3) ColumnarToRow
         :     :     +- FileScan parquet [fk1#0L,fk2#1L,col1#2L] Batched: true, DataFilters: [isnotnull(fk1#0L), isnotnull(fk2#1L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/tbl], PartitionFilters: [], PushedFilters: [IsNotNull(fk1), IsNotNull(fk2)], ReadSchema: struct<fk1:bigint,fk2:bigint,col1:bigint>
         :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
         :        +- *(1) Filter isnotnull(key#6L)
         :           +- *(1) ColumnarToRow
         :              +- FileScan parquet [key#6L] Batched: true, DataFilters: [isnotnull(key#6L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint>
         +- ReusedExchange [key#12L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
      

      With the V1 Parquet reader, the exchange built for lookup is reused: note the ReusedExchange node on the last line.
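      Reuse can also be checked programmatically instead of reading the explain output. The sketch below is illustrative only (not part of the original report); it collects ReusedExchange nodes from the executed plan of the sample query, which should find one with the V1 reader and none with the V2 reader:

      // Illustrative check: count ReusedExchange nodes in the physical plan.
      // (With adaptive query execution enabled, the final plan may only be
      // available after the query has actually been run.)
      import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
      
      val df = sql("""
         select tbl.col1, fk1, fk2
         from tbl, lookup l1, lookup l2
         where fk1 = l1.key
         and fk2 = l2.key
      """)
      
      val reused = df.queryExecution.executedPlan.collect {
        case r: ReusedExchangeExec => r
      }
      println(s"reused exchanges: ${reused.size}")  // 1 with the V1 reader, 0 with V2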

      Output with V2 Parquet reader (spark.sql.sources.useV1SourceList=""):

       == Physical Plan ==
      *(3) Project [col1#2L, fk1#0L, fk2#1L]
      +- *(3) BroadcastHashJoin [fk2#1L], [key#12L], Inner, BuildRight, false
         :- *(3) Project [fk1#0L, fk2#1L, col1#2L]
         :  +- *(3) BroadcastHashJoin [fk1#0L], [key#6L], Inner, BuildRight, false
         :     :- *(3) Filter (isnotnull(fk1#0L) AND isnotnull(fk2#1L))
         :     :  +- *(3) ColumnarToRow
         :     :     +- BatchScan[fk1#0L, fk2#1L, col1#2L] ParquetScan DataFilters: [isnotnull(fk1#0L), isnotnull(fk2#1L)], Format: parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/tbl], PartitionFilters: [], PushedFilers: [IsNotNull(fk1), IsNotNull(fk2)], ReadSchema: struct<fk1:bigint,fk2:bigint,col1:bigint>, PushedFilters: [IsNotNull(fk1), IsNotNull(fk2)]
         :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
         :        +- *(1) Filter isnotnull(key#6L)
         :           +- *(1) ColumnarToRow
         :              +- BatchScan[key#6L] ParquetScan DataFilters: [isnotnull(key#6L)], Format: parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], PartitionFilters: [], PushedFilers: [IsNotNull(key)], ReadSchema: struct<key:bigint>, PushedFilters: [IsNotNull(key)]
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#83]
            +- *(2) Filter isnotnull(key#12L)
               +- *(2) ColumnarToRow
                  +- BatchScan[key#12L] ParquetScan DataFilters: [isnotnull(key#12L)], Format: parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], PartitionFilters: [], PushedFilers: [IsNotNull(key)], ReadSchema: struct<key:bigint>, PushedFilters: [IsNotNull(key)]
      

      With the V2 Parquet reader, the exchange for lookup is not reused (see last 4 lines).

      You can see the same issue with the Orc reader (and, I assume, any other datasource that extends FileScan).
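      For reference, switching the built-in file sources to their V2 implementations (as used for the second plan above) only requires clearing the V1 fallback list; this is the same setting shown in the heading of the V2 plan:

      // Route parquet, orc, etc. through the V2 DataSource code path. By default
      // spark.sql.sources.useV1SourceList lists the built-in file formats, which
      // keeps them on the V1 path.
      spark.conf.set("spark.sql.sources.useV1SourceList", "")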

      The issue appears to be this check in FileScan#equals:

      ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters) &&
      ExpressionSet(dataFilters) == ExpressionSet(f.dataFilters)
      

      partitionFilters and dataFilters are not normalized, so the exprIds of the attribute references they contain differ from one scan instance to the next. As a result, two FileScan objects that read the same files with semantically identical filters still compare as unequal, and the exchange built on top of one of them is never reused for the other.
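      To illustrate the exprId problem (this snippet is not from the report, just a demonstration): two attribute references that differ only in their exprId are distinct expressions to Catalyst, so the ExpressionSet comparison above fails even for textually identical filters:

      // Illustration only: equal-looking filters built from attributes with
      // different exprIds do not compare equal.
      import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpressionSet, IsNotNull}
      import org.apache.spark.sql.types.LongType
      
      val key1 = AttributeReference("key", LongType)()  // fresh exprId
      val key2 = AttributeReference("key", LongType)()  // another fresh exprId
      
      ExpressionSet(Seq(IsNotNull(key1))) == ExpressionSet(Seq(IsNotNull(key2)))  // false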

      As a side note, FileScan#equals has a dangling boolean expression:

      fileIndex == f.fileIndex && readSchema == f.readSchema
      

      The result of that expression is not actually used anywhere; we might want to include it in the final decision, even though that is not the cause of the issue reported here.
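      For illustration, that comparison could be folded into the result along the following lines (a sketch against the fields already quoted above, not the actual patch, and it does not address the exprId normalization problem):

      // Sketch only: combine the fileIndex/readSchema comparison into the
      // returned value instead of discarding it.
      override def equals(obj: Any): Boolean = obj match {
        case f: FileScan =>
          fileIndex == f.fileIndex && readSchema == f.readSchema &&
            ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters) &&
            ExpressionSet(dataFilters) == ExpressionSet(f.dataFilters)
        case _ => false
      }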

          People

            Assignee: petertoth Peter Toth
            Reporter: bersprockets Bruce Robbins