Details
Bug
Status: Resolved
Critical
Resolution: Fixed
3.5.0
Description
This is my first bug report, so I hope I'm doing it right. I'm also not very knowledgeable about Spark internals, so I hope I diagnosed the problem correctly.
I found this regression in Spark version 3.5.0:
When using multiple windows that share the same partition and ordering but have different "frame boundaries", where one window is used by a ranking function, "WindowGroupLimit" is added to the plan, causing the other windows to produce wrong values.
This behavior didn't exist in versions 3.3 and 3.4.
Example:
import pyspark
from pyspark.sql import functions as F, Window

df = spark.createDataFrame([
    {'row_id': 1, 'name': 'Dave', 'score': 1, 'year': 2020},
    {'row_id': 1, 'name': 'Dave', 'score': 2, 'year': 2022},
    {'row_id': 1, 'name': 'Dave', 'score': 3, 'year': 2023},
    {'row_id': 2, 'name': 'Amy', 'score': 6, 'year': 2021},
])

# Create first window for row number
window_spec = Window.partitionBy('row_id', 'name').orderBy(F.desc('year'))

# Create additional window from the first window with unbounded frame
unbound_spec = window_spec.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Try to keep the first row by year, and also collect all scores into a list
df2 = df.withColumn(
    'rn',
    F.row_number().over(window_spec)
).withColumn(
    'all_scores',
    F.collect_list('score').over(unbound_spec)
)
So far everything works, and if we display df2:
+----+------+-----+----+---+----------+
|name|row_id|score|year|rn |all_scores|
+----+------+-----+----+---+----------+
|Dave|1     |3    |2023|1  |[3, 2, 1] |
|Dave|1     |2    |2022|2  |[3, 2, 1] |
|Dave|1     |1    |2020|3  |[3, 2, 1] |
|Amy |2     |6    |2021|1  |[6]       |
+----+------+-----+----+---+----------+
However, once we filter to keep only the first row number:
df2.filter("rn=1").show(truncate=False)

+----+------+-----+----+---+----------+
|name|row_id|score|year|rn |all_scores|
+----+------+-----+----+---+----------+
|Dave|1     |3    |2023|1  |[3]       |
|Amy |2     |6    |2021|1  |[6]       |
+----+------+-----+----+---+----------+
As you can see, just filtering changed the "all_scores" array for Dave.
(This example uses `collect_list`, but the same happens with other functions, such as max, min, count, etc.)
Now, if instead of these two windows I use the first window together with a window that has a different ordering, or create a completely new window with the same partition but no ordering, it works fine:
new_window = Window.partitionBy('row_id', 'name').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df3 = df.withColumn(
    'rn',
    F.row_number().over(window_spec)
).withColumn(
    'all_scores',
    F.collect_list('score').over(new_window)
)

df3.filter("rn=1").show(truncate=False)

+----+------+-----+----+---+----------+
|name|row_id|score|year|rn |all_scores|
+----+------+-----+----+---+----------+
|Dave|1     |3    |2023|1  |[3, 2, 1] |
|Amy |2     |6    |2021|1  |[6]       |
+----+------+-----+----+---+----------+
In addition, if we use all 3 windows to create 3 different columns, it also works fine. So it seems the issue happens only when all the windows used share the same partition and ordering.
Here is the final plan for the faulty dataframe:
df2.filter("rn=1").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Filter (rn#9 = 1)
   +- Window [row_number() windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#9, collect_list(score#2L, 0, 0) windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS all_scores#16], [row_id#1L, name#0], [year#3L DESC NULLS LAST]
      +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], row_number(), 1, Final
         +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L DESC NULLS LAST], false, 0
            +- Exchange hashpartitioning(row_id#1L, name#0, 200), ENSURE_REQUIREMENTS, [plan_id=425]
               +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], row_number(), 1, Partial
                  +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L DESC NULLS LAST], false, 0
                     +- Scan ExistingRDD[name#0,row_id#1L,score#2L,year#3L]
I suspect the issue is caused by the "WindowGroupLimit" node in the plan.
This node doesn't appear in the plans of the dataframes that work fine, nor in the plan before filtering on rn.
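To compare the plans of the working and faulty dataframes, a small helper like the one below could list the operator names in a plan's text. This is only a sketch: `plan_operators` is a hypothetical helper of mine, not a Spark API, and `sample_plan` is an abbreviated copy of the plan above (the long argument lists are replaced with `[...]`).

```python
def plan_operators(plan_text):
    """Return the operator name of each node in an explain() text dump.

    Hypothetical helper (not part of Spark); it only parses plain text.
    """
    ops = []
    for line in plan_text.splitlines():
        # Drop the tree-drawing prefix such as "+- " or ":  +- ".
        stripped = line.lstrip(" :+-").strip()
        # Skip empty lines and section headers like "== Physical Plan ==".
        if not stripped or stripped.startswith("=="):
            continue
        # The operator name is the first whitespace-separated token.
        ops.append(stripped.split()[0])
    return ops


# Abbreviated copy of the faulty plan; "[...]" stands for elided arguments.
sample_plan = """== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Filter (rn#9 = 1)
   +- Window [...]
      +- WindowGroupLimit [...], row_number(), 1, Final
         +- Sort [...]
"""

ops = plan_operators(sample_plan)
print("WindowGroupLimit" in ops)
```

Matching on whole operator names, rather than a substring search, keeps "Window" and "WindowGroupLimit" apart.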
So I assume that since the optimizer detects that we only want to keep the first row of the ranking function, it removes all other rows before the remaining window computations run, which causes the incorrect result or loss of data.
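To illustrate what I think is happening, here is a plain-Python simulation (no Spark involved) of the two orders of operations. It is only a model of my assumption, not Spark's actual implementation; `partitions`, `window_then_filter`, and the `prune_first` flag are names I made up for this sketch.

```python
from collections import defaultdict

rows = [
    {"row_id": 1, "name": "Dave", "score": 1, "year": 2020},
    {"row_id": 1, "name": "Dave", "score": 2, "year": 2022},
    {"row_id": 1, "name": "Dave", "score": 3, "year": 2023},
    {"row_id": 2, "name": "Amy", "score": 6, "year": 2021},
]


def partitions(rows):
    """Group rows by (row_id, name) and sort each group by year descending."""
    groups = defaultdict(list)
    for r in rows:
        groups[(r["row_id"], r["name"])].append(r)
    return {
        key: sorted(part, key=lambda r: r["year"], reverse=True)
        for key, part in groups.items()
    }


def window_then_filter(rows, prune_first):
    """Return {partition key: all_scores} as seen on the rn = 1 row.

    prune_first=True models the suspected behaviour: rows with rn > 1 are
    dropped before the unbounded-frame aggregate is computed.
    """
    result = {}
    for key, part in partitions(rows).items():
        visible = part[:1] if prune_first else part
        result[key] = [r["score"] for r in visible]
    return result


expected = window_then_filter(rows, prune_first=False)
observed = window_then_filter(rows, prune_first=True)
print(expected[(1, "Dave")])  # [3, 2, 1]
print(observed[(1, "Dave")])  # [3]
```

With pruning applied first, Dave's list shrinks to the single surviving row, which matches the wrong output shown earlier.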
I think the bug comes from this change (and the attached PRs):
https://issues.apache.org/jira/browse/SPARK-44340
It was added in Spark 3.5.0. In addition, I noticed that it was included in Databricks release 13.3, which included Spark 3.4.0 but also lists this change in its release notes. And evidently, this bug happens on Databricks 13 with Spark 3.4, but not on my local Spark 3.4.
Tagging user beliefer, as I believe you would know most about this.