  Spark / SPARK-29266

Optimize Dataset.isEmpty for base relations / unfiltered datasets


Details

    • Type: Improvement
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 3.1.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      SPARK-23627 added a Dataset.isEmpty method. This is currently implemented as

      def isEmpty: Boolean = withAction("isEmpty", limit(1).groupBy().count().queryExecution) { plan =>
        plan.executeCollect().head.getLong(0) == 0
      }
      

      which has a global limit of 1 embedded in the middle of the query plan.

      As a result, this will end up computing all partitions of the Dataset, but each task can stop early once it has computed a single record (due to how global limits are planned / executed in this query).
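
      For reference, the same query that isEmpty builds can be reproduced from the user API to inspect its plan (the path below is illustrative and assumes an existing SparkSession named spark):

      // Rebuild the query that isEmpty runs internally and print its physical plan.
      // The global limit sits below the final aggregate, so Spark still schedules a
      // task for every partition of the input.
      val df = spark.read.parquet("/path/to/data")
      df.limit(1).groupBy().count().explain()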

      We could instead implement this as ds.limit(1).collect().isEmpty, but that would go through the CollectLimit execution strategy, which scans partitions incrementally (one partition first, then progressively larger batches). That approach is faster in some cases but slower in others: if the Dataset is indeed empty, it is slower than one which checks all partitions in parallel, but if it's non-empty (and most tasks' output is non-empty) then it can be much faster.
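
      For comparison, that alternative would look roughly like this (a sketch only; the helper name is made up):

      import org.apache.spark.sql.Dataset

      // limit(1) followed by collect() is planned with CollectLimit, which scans
      // partitions incrementally instead of launching a task per partition up front.
      def isEmptyViaCollectLimit[T](ds: Dataset[T]): Boolean =
        ds.limit(1).collect().isEmpty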

      There's not an obviously best implementation here. However, I think there's high value (and low downside) in optimizing for the special case where the Dataset is an unfiltered, untransformed input dataset (e.g. the output of spark.read.parquet):

      I found a production job which calls isEmpty on the output of spark.read.parquet(), and the isEmpty call took several minutes to complete because it needed to launch hundreds of thousands of tasks to compute a single record from each partition (this is an enormous dataset, hence the long runtime).

      I could instruct the job author to use a different, more efficient method of checking for non-emptiness, but this feels like the type of optimization that Spark should handle itself (reducing the burden placed on users to understand internal details of Spark's execution model).

      Maybe we can special-case isEmpty for the case where the plan consists of only a file source scan (or a file source scan followed by a projection, but without any filters, etc.). In those cases, we could use either the .limit(1).take() implementation (under the assumption that we don't have a ton of empty input files) or something fancier (a metadata-only query, looking at Parquet footers, delegating to some datasource API, etc.).
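
      A rough sketch of what that special case could look like, written here as a user-level helper (it leans on Spark-internal plan classes such as LogicalRelation, and the helper names are made up for illustration):

      import org.apache.spark.sql.Dataset
      import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
      import org.apache.spark.sql.execution.datasources.LogicalRelation

      // Only treat the plan as eligible when it is a bare file-source scan, possibly
      // under projections (projections never drop rows, so limit(1) stays cheap).
      def isUnfilteredScan(plan: LogicalPlan): Boolean = plan match {
        case Project(_, child)  => isUnfilteredScan(child)
        case _: LogicalRelation => true
        case _                  => false
      }

      // Hypothetical helper; inside Spark this choice would live in Dataset.isEmpty itself.
      def isEmptyOptimized[T](ds: Dataset[T]): Boolean =
        if (isUnfilteredScan(ds.queryExecution.analyzed)) {
          ds.limit(1).collect().isEmpty   // incremental CollectLimit-style scan
        } else {
          ds.isEmpty                      // existing all-partitions-in-parallel implementation
        }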

       


          People

            Assignee: Unassigned
            Reporter: Josh Rosen (joshrosen)
            Votes: 0
            Watchers: 2
