Details
- Bug
- Status: Resolved
- Major
- Resolution: Fixed
- 3.4.2, 3.5.0, 4.0.0, 3.5.1, 3.5.2
Description
An AQE plan is expected to display the final plan after execution. This is not true for a cached SQL plan: explain shows the initial plan instead. This behavior change was introduced in https://github.com/apache/spark/pull/40812, which tried to fix a concurrency issue with cached plans.
In short, the plan that is executed and the plan that is explained are not the same instance, which causes the inconsistency.
I don't have a clear idea of how to fix this yet. Two options:
- maybe we just use a coarse-grained lock in explain?
- make innerChildren a function: first make the final flag atomic, then on every call check whether the original AQE plan is finalized. If it is not, clone the initial plan and return the clone; if it is, clone the final plan and return that one. This still won't reflect the AQE plan in real time under concurrency, but at least we get the initial version and the final version.
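The second option can be sketched outside Spark as a toy model. This is only an illustration of the idea, not Spark code: `PlanNode`, `AdaptivePlanHolder`, `finalize` and `inner_children` are hypothetical names, and a `threading.Lock` stands in for the atomic final flag.

```python
import copy
import threading


class PlanNode:
    """Toy stand-in for a physical plan node (hypothetical, not a Spark class)."""

    def __init__(self, name, children=()):
        self.name = name
        self.children = list(children)


class AdaptivePlanHolder:
    """Toy stand-in for the AQE plan wrapper (hypothetical, not a Spark class)."""

    def __init__(self, initial_plan):
        self._lock = threading.Lock()  # plays the role of the atomic final flag
        self._initial_plan = initial_plan
        self._current_plan = initial_plan
        self._is_final = False

    def finalize(self, final_plan):
        # Called by the executing side once adaptive execution has finished.
        with self._lock:
            self._current_plan = final_plan
            self._is_final = True

    def inner_children(self):
        # Evaluated on every call instead of being captured once:
        # returns a clone of the final plan if finalized, else of the initial plan.
        with self._lock:
            is_final = self._is_final
            plan = self._current_plan if is_final else self._initial_plan
            snapshot = copy.deepcopy(plan)
        return is_final, snapshot
```

A snapshot taken before `finalize` keeps showing the initial plan, while any call made afterwards sees the final plan. As noted above, nothing in between is visible.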
A simple repro:
from pyspark.sql.functions import expr
d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.filter("key > 10")
df.collect()
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Filter (isnotnull(key#4L) AND (key#4L > 10))
   +- TableCacheQueryStage 0
      +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
            +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- AdaptiveSparkPlan isFinalPlan=false
                     +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                        +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24]
                           +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                              +- Project [(id#2L % 100) AS key#4L]
                                 +- Range (0, 1000, step=1, splits=10)
+- == Initial Plan ==
   Filter (isnotnull(key#4L) AND (key#4L > 10))
   +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)]
         +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- AdaptiveSparkPlan isFinalPlan=false
                  +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                     +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24]
                        +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                           +- Project [(id#2L % 100) AS key#4L]
                              +- Range (0, 1000, step=1, splits=10)
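The symptom in this output is that the outer plan reports isFinalPlan=true while the AdaptiveSparkPlan nested under InMemoryRelation still reports isFinalPlan=false. A small text check like the following could flag that pattern in captured explain output. It is a rough sketch: the helper names are made up for illustration, and it only inspects the printed string, not Spark's plan objects.

```python
import re

# Matches the flag that explain prints for each AdaptiveSparkPlan node.
_FLAG = re.compile(r"AdaptiveSparkPlan isFinalPlan=(true|false)")


def final_plan_flags(explain_text):
    """Return the isFinalPlan flags in the order they appear in the text."""
    return [m.group(1) == "true" for m in _FLAG.finditer(explain_text)]


def has_stale_nested_plan(explain_text):
    """True if the outermost plan is final but some nested AQE plan is not."""
    flags = final_plan_flags(explain_text)
    return bool(flags) and flags[0] and not all(flags[1:])
```

Applied to the output above, `final_plan_flags` would find one `true` followed by two `false` entries, so `has_stale_nested_plan` would return `True`.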
Issue Links
- is caused by SPARK-43157 TreeNode tags can become corrupted and hang driver when the dataset is cached (Resolved)