
SPARK-47177: Cached SQL plan does not display final AQE plan in explain string


Details

    Description

      An AQE plan is expected to display the final plan after execution. This is not true for a cached SQL plan: it shows the initial plan instead. This behavior change was introduced in https://github.com/apache/spark/pull/40812, which tried to fix a concurrency issue with cached plans.

      In short, the plan used to execute and the plan used to explain are not the same instance, which causes the inconsistency.
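
      To make the mismatch concrete, here is a minimal, self-contained Scala sketch. The classes and names are toys invented for illustration, not Spark's actual AdaptiveSparkPlanExec or InMemoryRelation APIs: explain reads a clone captured up front, while execution finalizes the original instance, so explain keeps printing the initial plan.

      // Toy classes, NOT Spark internals: illustrates explain reading a
      // stale clone while execute() finalizes the live instance.
      case class ToyPlan(var isFinalPlan: Boolean) {
        def cloned: ToyPlan = ToyPlan(isFinalPlan)
      }

      class ToyCachedRelation(livePlan: ToyPlan) {
        // Eager snapshot, taken so explain never races with plan mutation.
        private val explainSnapshot: ToyPlan = livePlan.cloned

        def execute(): Unit = livePlan.isFinalPlan = true  // finalizes the live instance
        def explain(): String = s"AdaptiveSparkPlan isFinalPlan=${explainSnapshot.isFinalPlan}"
      }

      object InstanceMismatchDemo extends App {
        val rel = new ToyCachedRelation(ToyPlan(isFinalPlan = false))
        rel.execute()
        println(rel.explain())  // prints isFinalPlan=false: the stale initial plan
      }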

       

      I don't have a clear idea how to fix it yet. Some options:

      • maybe we just add a coarse-granularity lock in explain?
      • make innerChildren a function: clone the initial plan once; on every call, check whether the original AQE plan is finalized (making the final flag atomic first, of course). If not, return the cloned initial plan; if it is finalized, clone the final plan and return that one. This still won't reflect the AQE plan in real time under concurrency, but at least we would have an initial version and a final version (see the sketch after this list).
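
      A rough sketch of the second option, again with toy classes and hypothetical names rather than Spark's real APIs, assuming the finalized flag is made atomic as noted above:

      import java.util.concurrent.atomic.AtomicBoolean

      // Toy sketch of option 2: innerChildren becomes a def that re-checks
      // an atomic finalized flag on every call.
      case class ToyPlan(label: String) {
        def cloned: ToyPlan = copy()
      }

      class ToyAqePlan(initialPlan: ToyPlan) {
        private val finalized = new AtomicBoolean(false)
        @volatile private var finalPlan: ToyPlan = _

        // Clone the initial plan once, up front.
        private val clonedInitial: ToyPlan = initialPlan.cloned

        def markFinal(plan: ToyPlan): Unit = {
          finalPlan = plan     // publish the final plan first...
          finalized.set(true)  // ...then flip the flag readers check
        }

        // Before finalization, return the cloned initial plan; afterwards,
        // clone and return the final plan. Not real-time under concurrency,
        // but explain at least distinguishes initial vs. final.
        def innerChildren: Seq[ToyPlan] =
          if (finalized.get()) Seq(finalPlan.cloned) else Seq(clonedInitial)
      }

      The ordering in markFinal (publish the final plan, then flip the flag) is what would let a concurrent innerChildren call safely clone finalPlan once it observes finalized=true.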

       

      A simple repro (note that the nested AdaptiveSparkPlan under InMemoryRelation still shows isFinalPlan=false even after the cached plan has been executed):

      from pyspark.sql.functions import expr

      d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
      cached_d2 = d1.cache()
      df = cached_d2.filter("key > 10")
      df.collect()  # materialize the cache so the AQE plan gets finalized
      df.explain()
      == Physical Plan ==
      AdaptiveSparkPlan isFinalPlan=true
      +- == Final Plan ==
         *(1) Filter (isnotnull(key#4L) AND (key#4L > 10))
         +- TableCacheQueryStage 0
            +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
                  +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24]
                                 +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10)
      +- == Initial Plan ==
         Filter (isnotnull(key#4L) AND (key#4L > 10))
         +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
               +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- AdaptiveSparkPlan isFinalPlan=false
                        +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                           +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24]
                              +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                 +- Project [(id#2L % 100) AS key#4L]
                                    +- Range (0, 1000, step=1, splits=10)


            People

              Assignee: XiDuo You (ulysses)
              Reporter: Ziqi Liu (liuzq12)
              Votes: 0
              Watchers: 3
