SPARK-33482

V2 Datasources that extend FileScan preclude exchange reuse


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
    • Fix Version/s: 3.0.3, 3.1.2, 3.2.0
    • Component/s: SQL
    • Labels: None

      Description

      Sample query:

      spark.read.parquet("tbl").createOrReplaceTempView("tbl")
      spark.read.parquet("lookup").createOrReplaceTempView("lookup")
      
      sql("""
         select tbl.col1, fk1, fk2
         from tbl, lookup l1, lookup l2
         where fk1 = l1.key
         and fk2 = l2.key
      """).explain
      

      Test files can be created as follows:

      import scala.util.Random
      
      val rand = Random
      
      val tbl = spark.range(1, 10000).map { x =>
        (rand.nextLong.abs % 20,
         rand.nextLong.abs % 20,
         x)
      }.toDF("fk1", "fk2", "col1")
      
      tbl.write.mode("overwrite").parquet("tbl")
      
      val lookup = spark.range(0, 20).map { x =>
        (x + 1, x * 10000, (x + 1) * 10000)
      }.toDF("key", "col1", "col2")
      lookup.write.mode("overwrite").parquet("lookup")
      

      Output with V1 Parquet reader:

       == Physical Plan ==
      *(3) Project [col1#2L, fk1#0L, fk2#1L]
      +- *(3) BroadcastHashJoin [fk2#1L], [key#12L], Inner, BuildRight, false
         :- *(3) Project [fk1#0L, fk2#1L, col1#2L]
         :  +- *(3) BroadcastHashJoin [fk1#0L], [key#6L], Inner, BuildRight, false
         :     :- *(3) Filter (isnotnull(fk1#0L) AND isnotnull(fk2#1L))
         :     :  +- *(3) ColumnarToRow
         :     :     +- FileScan parquet [fk1#0L,fk2#1L,col1#2L] Batched: true, DataFilters: [isnotnull(fk1#0L), isnotnull(fk2#1L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/tbl], PartitionFilters: [], PushedFilters: [IsNotNull(fk1), IsNotNull(fk2)], ReadSchema: struct<fk1:bigint,fk2:bigint,col1:bigint>
         :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
         :        +- *(1) Filter isnotnull(key#6L)
         :           +- *(1) ColumnarToRow
         :              +- FileScan parquet [key#6L] Batched: true, DataFilters: [isnotnull(key#6L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint>
         +- ReusedExchange [key#12L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
      

      With the V1 Parquet reader, the exchange for lookup is reused (see the last line of the plan).
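
      The V2 plan was captured after clearing the V1 fallback list so that parquet goes through the DataSource V2 path. One way to do that in the same spark-shell session (the setting can also be passed with --conf at launch):

      spark.conf.set("spark.sql.sources.useV1SourceList", "")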

      Output with V2 Parquet reader (spark.sql.sources.useV1SourceList=""):

       == Physical Plan ==
      *(3) Project [col1#2L, fk1#0L, fk2#1L]
      +- *(3) BroadcastHashJoin [fk2#1L], [key#12L], Inner, BuildRight, false
         :- *(3) Project [fk1#0L, fk2#1L, col1#2L]
         :  +- *(3) BroadcastHashJoin [fk1#0L], [key#6L], Inner, BuildRight, false
         :     :- *(3) Filter (isnotnull(fk1#0L) AND isnotnull(fk2#1L))
         :     :  +- *(3) ColumnarToRow
         :     :     +- BatchScan[fk1#0L, fk2#1L, col1#2L] ParquetScan DataFilters: [isnotnull(fk1#0L), isnotnull(fk2#1L)], Format: parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/tbl], PartitionFilters: [], PushedFilers: [IsNotNull(fk1), IsNotNull(fk2)], ReadSchema: struct<fk1:bigint,fk2:bigint,col1:bigint>, PushedFilters: [IsNotNull(fk1), IsNotNull(fk2)]
         :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
         :        +- *(1) Filter isnotnull(key#6L)
         :           +- *(1) ColumnarToRow
         :              +- BatchScan[key#6L] ParquetScan DataFilters: [isnotnull(key#6L)], Format: parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], PartitionFilters: [], PushedFilers: [IsNotNull(key)], ReadSchema: struct<key:bigint>, PushedFilters: [IsNotNull(key)]
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#83]
            +- *(2) Filter isnotnull(key#12L)
               +- *(2) ColumnarToRow
                  +- BatchScan[key#12L] ParquetScan DataFilters: [isnotnull(key#12L)], Format: parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], PartitionFilters: [], PushedFilers: [IsNotNull(key)], ReadSchema: struct<key:bigint>, PushedFilters: [IsNotNull(key)]
      

      With the V2 Parquet reader, the exchange for lookup is not reused: the second scan of lookup gets its own BroadcastExchange instead of a ReusedExchange (see the last four lines).

      You can see the same issue with the ORC reader (and, I assume, any other datasource that extends FileScan).

      The issue appears to be this check in FileScan#equals:

      ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters) &&
      ExpressionSet(dataFilters) == ExpressionSet(f.dataFilters)
      

      partitionFilters and dataFilters are not normalized, so their exprIds differ between the two scans of lookup. As a result, two FileScan instances that read the same files with the same filters never compare equal, and the exchange above them cannot be reused.
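
      A minimal, self-contained illustration of the root cause (runnable in spark-shell; the attribute and value names are made up for the example):

      import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpressionSet, IsNotNull}
      import org.apache.spark.sql.types.LongType

      // Two scans of the same table produce attributes with the same name but fresh exprIds.
      val keyFromScan1 = AttributeReference("key", LongType)()
      val keyFromScan2 = AttributeReference("key", LongType)()

      // Semantically identical filters built on different exprIds.
      val filters1 = Seq(IsNotNull(keyFromScan1))
      val filters2 = Seq(IsNotNull(keyFromScan2))

      // The comparison used by FileScan#equals evaluates to false, so the two scans
      // (and the exchanges built on top of them) are never considered equal.
      ExpressionSet(filters1) == ExpressionSet(filters2)   // false

      Normalizing the exprIds in partitionFilters and dataFilters relative to the scan's output before building the sets (similar to what canonicalization does for other plan nodes) would make semantically equal scans compare equal.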

      As a side note, FileScan#equals has a dangling boolean expression:

      fileIndex == f.fileIndex && readSchema == f.readSchema
      

      The result of that expression is not actually used anywhere. It should probably be folded into the final decision, even though it is not the cause of this issue.
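
      A rough sketch of what the method could look like with both points addressed. normalize stands in for a hypothetical helper that rewrites filter exprIds relative to the scan's output; this is only an illustration of the shape, not the actual fix:

      override def equals(obj: Any): Boolean = obj match {
        case f: FileScan =>
          // Fold the previously dangling expression into the result...
          fileIndex == f.fileIndex && readSchema == f.readSchema &&
            // ...and compare the filters only after normalizing their exprIds.
            ExpressionSet(normalize(partitionFilters)) == ExpressionSet(normalize(f.partitionFilters)) &&
            ExpressionSet(normalize(dataFilters)) == ExpressionSet(normalize(f.dataFilters))
        case _ => false
      }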

    People

    • Assignee: petertoth (Peter Toth)
    • Reporter: bersprockets (Bruce Robbins)
