Spark / SPARK-47177

Cached SQL plans do not display the final AQE plan in the explain string


Details

    Description

      An AQE plan is expected to display the final plan after execution. This is not true for a cached SQL plan: it shows the initial plan instead. This behavior change was introduced in https://github.com/apache/spark/pull/40812, which tried to fix a concurrency issue with cached plans.

      In short, the plan used for execution and the plan used for explain are not the same instance, which causes the inconsistency.
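      The mechanism can be illustrated with a toy model. This is a minimal sketch, not Spark code: the class `AdaptivePlan` and its `execute`/`explain` methods are hypothetical stand-ins. It only shows that when explain reads a copy taken before execution, updates made on the executed instance (finalizing the plan) are never visible to explain.

```python
import copy


class AdaptivePlan:
    """Hypothetical stand-in for an AQE plan node (not Spark's real class)."""

    def __init__(self, name):
        self.name = name
        self.is_final = False
        self.final_plan = None

    def execute(self):
        # Execution re-optimizes and records the final plan on *this* instance only.
        self.final_plan = f"optimized({self.name})"
        self.is_final = True

    def explain(self):
        return f"AdaptiveSparkPlan isFinalPlan={str(self.is_final).lower()}"


cached = AdaptivePlan("HashAggregate")
# Hypothetical: the explain path holds a separate copy taken before execution.
plan_for_explain = copy.deepcopy(cached)
cached.execute()

print(cached.explain())            # AdaptiveSparkPlan isFinalPlan=true
print(plan_for_explain.explain())  # AdaptiveSparkPlan isFinalPlan=false
```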

       

      I don't have a clear idea of how to fix this yet:

      • Maybe we could just use a coarse-grained lock in explain?
      • Make innerChildren a function: clone the initial plan once; then, every time it is called, check whether the original AQE plan is finalized (making the final flag atomic first, of course). If it is not, return the cloned initial plan; if it is, clone the final plan and return that one. This still won't reflect the AQE plan in real time in a concurrent situation, but at least we get both the initial version and the final version.
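      The second option can be sketched in plain Python. This is an illustration under assumptions, not Spark's implementation: `AdaptivePlanHolder`, `finalize`, and `inner_children` are hypothetical names, plans are modeled as dicts, and `threading.Event` plays the role of the atomic "final" flag. The key idea is that `inner_children` is evaluated on every call rather than captured once.

```python
import copy
import threading


class AdaptivePlanHolder:
    """Hypothetical model of an AQE node whose plan may be shared by concurrent queries."""

    def __init__(self, initial_plan):
        # Clone the initial plan once, up front.
        self._initial_snapshot = copy.deepcopy(initial_plan)
        self._final_plan = None
        # Set-once flag standing in for an atomic "isFinalPlan" boolean.
        self._is_final = threading.Event()

    def finalize(self, final_plan):
        # Publish the final plan before raising the flag, so readers that
        # observe the flag also observe the plan.
        self._final_plan = final_plan
        self._is_final.set()

    def inner_children(self):
        # Re-evaluated on every explain call instead of being a fixed value.
        if self._is_final.is_set():
            return [copy.deepcopy(self._final_plan)]
        return [self._initial_snapshot]


holder = AdaptivePlanHolder({"node": "HashAggregate", "final": False})
before = holder.inner_children()

# Another thread finishes execution and finalizes the plan.
worker = threading.Thread(
    target=holder.finalize, args=({"node": "HashAggregate", "final": True},)
)
worker.start()
worker.join()

after = holder.inner_children()
print(before[0]["final"], after[0]["final"])  # False True
```

As the description notes, a reader that calls `inner_children` while execution is still in progress gets the initial snapshot, so this trades real-time accuracy for a consistent initial-or-final view without holding a lock during explain.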

       

      A simple repro:

      from pyspark.sql.functions import expr
      d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
      cached_d2 = d1.cache()
      df = cached_d2.filter("key > 10")
      df.collect()
      >>> df.explain()
      == Physical Plan ==
      AdaptiveSparkPlan isFinalPlan=true
      +- == Final Plan ==
         *(1) Filter (isnotnull(key#4L) AND (key#4L > 10))
         +- TableCacheQueryStage 0
            +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
                  +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24]
                                 +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10)
      +- == Initial Plan ==
         Filter (isnotnull(key#4L) AND (key#4L > 10))
         +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
               +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- AdaptiveSparkPlan isFinalPlan=false
                        +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                           +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24]
                              +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                 +- Project [(id#2L % 100) AS key#4L]
                                    +- Range (0, 1000, step=1, splits=10)
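      In the output above, the `== Final Plan ==` section still contains a nested `AdaptiveSparkPlan isFinalPlan=false` under `InMemoryRelation`, which is the symptom this issue describes. A small text check can flag it. This is a hypothetical helper (`stale_nested_aqe_lines` is not a Spark API) that only scans an explain string; the sample is a trimmed excerpt of the output above.

```python
def stale_nested_aqe_lines(explain_text):
    """Return lines inside the '== Final Plan ==' section that still show a
    non-final nested AdaptiveSparkPlan node (hypothetical helper)."""
    stale = []
    in_final = False
    for line in explain_text.splitlines():
        if "== Final Plan ==" in line:
            in_final = True
            continue
        if "== Initial Plan ==" in line:
            in_final = False
            continue
        if in_final and "AdaptiveSparkPlan isFinalPlan=false" in line:
            stale.append(line.strip())
    return stale


# Trimmed excerpt of the explain output from the repro.
sample = """\
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Filter (isnotnull(key#4L) AND (key#4L > 10))
   +- TableCacheQueryStage 0
      +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
            +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   Filter (isnotnull(key#4L) AND (key#4L > 10))
"""

print(stale_nested_aqe_lines(sample))  # ['+- AdaptiveSparkPlan isFinalPlan=false']
```

Since PySpark's `df.explain()` prints to Python's stdout, the text can be captured with `contextlib.redirect_stdout` into an `io.StringIO` buffer and passed to the helper.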

          People

            ulysses XiDuo You
            liuzq12 Ziqi Liu