Spark / SPARK-34194

Queries that only touch partition columns shouldn't scan through all files

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Won't Fix
    • Affects Version/s: 3.2.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      When querying only the partition columns of a partitioned table, it seems that Spark nonetheless scans through all files in the table, even though it doesn't need to.

      Here's an example:

      >>> data = spark.read.option('mergeSchema', 'false').parquet('s3a://some/dataset')
      [Stage 0:==================>                                  (407 + 12) / 1158]
      

      Note the 1158 tasks. This matches the number of partitions in the table, which is partitioned on a single field named file_date:

      $ aws s3 ls s3://some/dataset | head -n 3
                                 PRE file_date=2017-05-01/
                                 PRE file_date=2017-05-02/
                                 PRE file_date=2017-05-03/
      
      $ aws s3 ls s3://some/dataset | wc -l
          1158
      

      The table itself has over 138K files, though:

      $ aws s3 ls --recursive --human --summarize s3://some/dataset
      ...
      Total Objects: 138708
         Total Size: 3.7 TiB
      

      Now let's try to query just the file_date field and see what Spark does.

      >>> data.select('file_date').orderBy('file_date', ascending=False).limit(1).explain()
      == Physical Plan ==
      TakeOrderedAndProject(limit=1, orderBy=[file_date#11 DESC NULLS LAST], output=[file_date#11])
      +- *(1) ColumnarToRow
         +- FileScan parquet [file_date#11] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[s3a://some/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>
      
      >>> data.select('file_date').orderBy('file_date', ascending=False).limit(1).show()
      [Stage 2:>                                                   (179 + 12) / 41011]
      

      Notice that Spark has spun up 41,011 tasks. Maybe more will be needed as the job progresses? I'm not sure.

      What I do know is that this operation takes a long time (~20 min) running from my laptop, whereas listing the top-level file_date partitions via the AWS CLI takes a second or two.

      Spark appears to be going through all the files in the table, when it just needs to list the partitions captured in the S3 "directory" structure. The query is only touching file_date, after all.
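
For comparison, here is a minimal sketch of listing only the first-level "directories" straight from S3, with no Spark involved. It assumes boto3 is installed and credentials are configured. The helper names `partition_value` and `list_partition_values` are made up for illustration, and the bucket and prefix are the placeholders from the example above.

```python
def partition_value(prefix, column):
    """Extract '2017-05-01' from 'dataset/file_date=2017-05-01/'.

    Returns None for prefixes that are not `column=value` directories
    (for example a leftover '_temporary/' directory).
    """
    leaf = prefix.rstrip("/").rsplit("/", 1)[-1]
    name, sep, value = leaf.partition("=")
    if not sep or name != column:
        return None
    return value


def list_partition_values(bucket, prefix, column):
    """List first-level partition values without listing any data files."""
    import boto3  # imported lazily so partition_value() works without boto3

    paginator = boto3.client("s3").get_paginator("list_objects_v2")
    values = []
    # Delimiter="/" makes S3 roll everything below each "directory" up into
    # a single CommonPrefixes entry, so only the partition names come back.
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/"):
        for common in page.get("CommonPrefixes", []):
            value = partition_value(common["Prefix"], column)
            if value is not None:
                values.append(value)
    return values
```

With the example dataset, `max(list_partition_values("some", "dataset/", "file_date"))` would give the latest file_date. The plain string comparison is only safe because the values are ISO-formatted dates.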

      The current workaround for this performance problem (or optimizer wastefulness) is to query the catalog directly. That works, but it takes a lot more effort than the elegant query against file_date that users actually intend.
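
A rough sketch of that catalog workaround follows. It assumes the dataset is registered as a partitioned table in the session catalog, which is not the case for the path-based read shown earlier. The table name passed to `latest_file_date` and all helper names are hypothetical. It relies on `SHOW PARTITIONS`, which returns one string column named `partition`.

```python
def parse_partition_spec(spec):
    """Turn 'file_date=2017-05-01' (or 'a=1/b=2') into a dict."""
    return dict(part.split("=", 1) for part in spec.split("/"))


def latest_partition_value(specs, column):
    """Return the largest value of `column` across partition spec strings.

    Values are compared as strings, which is fine for ISO dates.
    """
    return max(parse_partition_spec(spec)[column] for spec in specs)


def latest_file_date(spark, table_name):
    """Read partition metadata from the catalog instead of scanning files.

    `table_name` is a placeholder: it must name a partitioned table that
    is already registered in the catalog of the given SparkSession.
    """
    rows = spark.sql(f"SHOW PARTITIONS {table_name}").collect()
    return latest_partition_value([row["partition"] for row in rows], "file_date")
```

Calling `latest_file_date(spark, "some_db.dataset")` (placeholder name) would then stand in for the `select` / `orderBy` / `limit(1)` query above.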

      Spark should somehow know when it is only querying partition fields and skip iterating through all the individual files in a table.

      Tested on Spark 3.0.1.

            People

              Assignee: Unassigned
              Reporter: Nicholas Chammas (nchammas)