SPARK-26893

Allow partition pruning with subquery filters on file source

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: SQL
    • Labels:
      None

      Description

      The file source doesn't use subquery filters for partition pruning, but it could use them with a minor improvement.

      This query is an example:

      CREATE TABLE a (id INT, p INT) USING PARQUET PARTITIONED BY (p)
      CREATE TABLE b (id INT) USING PARQUET
      SELECT * FROM a WHERE p <= (SELECT MIN(id) FROM b)

      The executed plan of the SELECT is currently:

      *(1) Filter (p#252L <= Subquery subquery250)
      :  +- Subquery subquery250
      :     +- *(2) HashAggregate(keys=[], functions=[min(id#253L)], output=[min(id)#255L])
      :        +- Exchange SinglePartition
      :           +- *(1) HashAggregate(keys=[], functions=[partial_min(id#253L)], output=[min#259L])
      :              +- *(1) FileScan parquet default.b[id#253L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/ptoth/git2/spark2/common/kvstore/spark-warehouse/b], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
      +- *(1) FileScan parquet default.a[id#251L,p#252L] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[file:/Users/ptoth/git2/spark2/common/kvstore/spark-warehouse/a/p=0, file:..., PartitionCount: 2, PartitionFilters: [isnotnull(p#252L)], PushedFilters: [], ReadSchema: struct<id:bigint>
      

      With the subquery filter used for partition pruning, it could be:

      *(1) FileScan parquet default.a[id#251L,p#252L] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[file:/Users/ptoth/git2/spark2/common/kvstore/spark-warehouse/a/p=0, file:..., PartitionFilters: [isnotnull(p#252L), (p#252L <= Subquery subquery250)], PushedFilters: [], ReadSchema: struct<id:bigint>
      +- Subquery subquery250
         +- *(2) HashAggregate(keys=[], functions=[min(id#253L)], output=[min(id)#255L])
            +- Exchange SinglePartition
               +- *(1) HashAggregate(keys=[], functions=[partial_min(id#253L)], output=[min#259L])
                  +- *(1) FileScan parquet default.b[id#253L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/ptoth/git2/spark2/common/kvstore/spark-warehouse/b], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
      

      With the subquery filter in PartitionFilters, partition pruning could work in FileSourceScanExec.
      Note that the PartitionCount metadata can't be computed before execution, so in this case it is no longer part of the plan.
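
      The difference between the two plans can be illustrated with a toy model in plain Python. This is only a sketch of the idea, not Spark's implementation: ToyScan, prune_with_subquery and partitions_read are hypothetical names, and the table contents are made up for illustration. The scalar subquery is evaluated only when the scan executes, which is also why the number of selected partitions is unknown at planning time.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple


@dataclass
class ToyScan:
    """Toy stand-in for a partitioned file scan (hypothetical, not FileSourceScanExec)."""
    partitions: Dict[int, List[int]]        # partition value p -> ids stored in that partition
    subquery: Callable[[], Optional[int]]   # scalar subquery, evaluated only at execution time
    prune_with_subquery: bool               # False: current plan, True: proposed plan
    partitions_read: int = 0                # only meaningful after execute()

    def execute(self) -> List[Tuple[int, int]]:
        bound = self.subquery()  # the subquery result becomes known here, not at planning time
        self.partitions_read = 0
        rows: List[Tuple[int, int]] = []
        for p, ids in self.partitions.items():
            # SQL semantics: p <= NULL is not true, so nothing matches a NULL bound.
            matches = bound is not None and p <= bound
            if self.prune_with_subquery and not matches:
                continue  # proposed plan: skip the whole partition without reading it
            self.partitions_read += 1
            # current plan: the partition is read, then the Filter drops non-matching rows
            rows.extend((i, p) for i in ids if matches)
        return rows


# Made-up data: table a partitioned by p, and table b feeding SELECT MIN(id) FROM b.
table_a = {0: [1, 2], 1: [3], 2: [4, 5]}
table_b = [1, 7, 3]


def min_b() -> Optional[int]:
    return min(table_b) if table_b else None


current = ToyScan(table_a, min_b, prune_with_subquery=False)
proposed = ToyScan(table_a, min_b, prune_with_subquery=True)

# Both variants return the same rows; only the number of partitions read differs.
assert current.execute() == proposed.execute()
print(current.partitions_read, proposed.partitions_read)
```

      In this model both variants produce identical results, while the pruning variant skips the partition with p=2 entirely.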

              People

              • Assignee: Peter Toth (petertoth)
              • Reporter: Peter Toth (petertoth)