
SPARK-45543: InferWindowGroupLimit causes a bug if the other window functions don't have the same window frame as the rank-like functions


    Description

First, this is my first bug report, so I hope I'm doing it right. Also, since I'm not very knowledgeable about Spark internals, I hope I diagnosed the problem correctly.

I found this regression in Spark 3.5.0:

When using multiple windows that share the same partitioning and ordering (but with different "frame boundaries"), where one window is a ranking function, a "WindowGroupLimit" node is added to the plan, causing wrong values to be produced by the other windows.

      This behavior didn't exist in versions 3.3 and 3.4.

      Example:

       

import pyspark
from pyspark.sql import SparkSession, functions as F, Window

# Get or create a session so `spark` is defined outside of a pyspark shell
spark = SparkSession.builder.getOrCreate()
      
      df = spark.createDataFrame([
          {'row_id': 1, 'name': 'Dave', 'score': 1, 'year': 2020},
          {'row_id': 1, 'name': 'Dave', 'score': 2, 'year': 2022},
          {'row_id': 1, 'name': 'Dave', 'score': 3, 'year': 2023},
          {'row_id': 2, 'name': 'Amy', 'score': 6, 'year': 2021},
      ])
      
      # Create first window for row number
      window_spec = Window.partitionBy('row_id', 'name').orderBy(F.desc('year'))
      
      # Create additional window from the first window with unbounded frame
      unbound_spec = window_spec.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
      
      # Try to keep the first row by year, and also collect all scores into a list
      df2 = df.withColumn(
          'rn', 
          F.row_number().over(window_spec)
      ).withColumn(
          'all_scores', 
          F.collect_list('score').over(unbound_spec)
      )

      So far everything works, and if we display df2:

       

      +----+------+-----+----+---+----------+
      |name|row_id|score|year|rn |all_scores|
      +----+------+-----+----+---+----------+
      |Dave|1     |3    |2023|1  |[3, 2, 1] |
      |Dave|1     |2    |2022|2  |[3, 2, 1] |
      |Dave|1     |1    |2020|3  |[3, 2, 1] |
      |Amy |2     |6    |2021|1  |[6]       |
      +----+------+-----+----+---+----------+

       

      However, once we filter to keep only the first row number:

       

      df2.filter("rn=1").show(truncate=False)
      +----+------+-----+----+---+----------+
      |name|row_id|score|year|rn |all_scores|
      +----+------+-----+----+---+----------+
      |Dave|1     |3    |2023|1  |[3]       |
      |Amy |2     |6    |2021|1  |[6]       |
      +----+------+-----+----+---+----------+

As you can see, just filtering changed the "all_scores" array for Dave.

(This example uses `collect_list`; however, the same thing happens with other window functions, such as max, min, count, etc., as the sketch below illustrates with `min`.)
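
Here is a minimal sketch of that (reusing `df`, `window_spec`, and `unbound_spec` from above; `df_min` and `min_score` are names I made up). If the diagnosis below is right, `min` makes the discrepancy obvious, since Dave's true minimum score is 1:

df_min = df.withColumn(
    'rn',
    F.row_number().over(window_spec)
).withColumn(
    'min_score',
    F.min('score').over(unbound_spec)
)
# Without the filter, min_score is 1 on every one of Dave's rows.
# After filter("rn=1"), the pruned plan only sees Dave's first row,
# so min_score comes out as 3 instead of 1.
df_min.filter("rn=1").show(truncate=False)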

       

Now, if instead of the two windows above I use the first window together with a window that has a different ordering, or create a completely new window with the same partitioning but no ordering, it works fine:

      new_window = Window.partitionBy('row_id', 'name').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
      
      df3 = df.withColumn(
          'rn',
          F.row_number().over(window_spec)
      ).withColumn(
          'all_scores',
          F.collect_list('score').over(new_window)
      )
      df3.filter("rn=1").show(truncate=False)
      +----+------+-----+----+---+----------+
      |name|row_id|score|year|rn |all_scores|
      +----+------+-----+----+---+----------+
      |Dave|1     |3    |2023|1  |[3, 2, 1] |
      |Amy |2     |6    |2021|1  |[6]       |
      +----+------+-----+----+---+----------+
      

In addition, if we use all 3 windows to create 3 different columns, it also works correctly (see the sketch below). So it seems the issue happens only when all the windows used share the same partitioning and ordering.
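
A sketch of that three-window case (reusing `new_window` from above; `df4` and the list column names are mine):

df4 = df.withColumn(
    'rn',
    F.row_number().over(window_spec)
).withColumn(
    'scores_same_order',
    F.collect_list('score').over(unbound_spec)
).withColumn(
    'scores_no_order',
    F.collect_list('score').over(new_window)
)
# Both list columns come out as [3, 2, 1] for Dave, i.e. correct.
df4.filter("rn=1").show(truncate=False)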

Here is the final physical plan for the faulty dataframe:

      df2.filter("rn=1").explain()
      == Physical Plan ==
      AdaptiveSparkPlan isFinalPlan=false
      +- Filter (rn#9 = 1)
         +- Window [row_number() windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#9, collect_list(score#2L, 0, 0) windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS all_scores#16], [row_id#1L, name#0], [year#3L DESC NULLS LAST]
            +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], row_number(), 1, Final
               +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L DESC NULLS LAST], false, 0
                  +- Exchange hashpartitioning(row_id#1L, name#0, 200), ENSURE_REQUIREMENTS, [plan_id=425]
                     +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], row_number(), 1, Partial
                        +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L DESC NULLS LAST], false, 0
                           +- Scan ExistingRDD[name#0,row_id#1L,score#2L,year#3L]

I suspect the issue is caused by the "WindowGroupLimit" node in the plan.

This node doesn't appear in the plans of the dataframes that work fine, nor before filtering on rn.

So I assume that since the optimizer detects that we only want to keep the first row of the ranking function, it removes all other rows before the subsequent window computations run, which causes the incorrect results or loss of data (illustrated in the sketch below).
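
The following sketch reproduces the buggy output manually, as if the pruning ran first (hypothetical, just to illustrate the diagnosis):

# Prune each partition to its top-1 row by year first...
pruned = df.withColumn(
    'rn', F.row_number().over(window_spec)
).filter('rn = 1')

# ...then evaluate the unbounded window over what is left.
# Dave's all_scores becomes [3], matching the buggy output above.
pruned.withColumn(
    'all_scores', F.collect_list('score').over(unbound_spec)
).show(truncate=False)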

      I think the bug comes from this change (and the attached PRs):

      https://issues.apache.org/jira/browse/SPARK-44340

It was added in Spark 3.5.0. In addition, I noticed it was included in Databricks Runtime 13.3, which ships Spark 3.4.0 but lists this change in its release notes. And indeed, this bug happens on Databricks 13.3 (Spark 3.4), but not on my local Spark 3.4.
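
Until this is fixed, excluding the optimizer rule may work around it (a sketch; the fully-qualified class name below is my assumption, based on the rule name InferWindowGroupLimit):

# Tell the optimizer to skip the rule that injects WindowGroupLimit nodes.
spark.conf.set(
    "spark.sql.optimizer.excludedRules",
    "org.apache.spark.sql.catalyst.optimizer.InferWindowGroupLimit",
)
# df2.filter("rn=1").show() should then keep [3, 2, 1] for Dave.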

Tagging user beliefer, as I believe you would know the most about this.


            People

Assignee: Jiaan Geng (beliefer)
Reporter: Ron Serruya (ronserruya)