[SPARK-45797] Discrepancies in PySpark DataFrame Results When Using Window Functions and Filters


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.5.0
    • Fix Version/s: 3.5.0
    • Component/s: PySpark
    • Labels: None
    • Environment: Python 3.10, PySpark 3.5.0, Ubuntu 22.04.3 LTS
    • Important

    Description

      When performing certain transformations on a DataFrame that involve window functions combined with filters, I get incorrect results. Here is a minimal example showing the results I get with my code:

       

      from pyspark.sql import SparkSession
      import pyspark.sql.functions as f
      from pyspark.sql.window import Window as w
      from datetime import datetime, date
      
      spark = SparkSession.builder.config("spark.sql.repl.eagerEval.enabled", True).getOrCreate()
      
      # Base dataframe
      df = spark.createDataFrame(
          [
              (1, date(2023, 10, 1), date(2023, 10, 2), "open"),
              (1, date(2023, 10, 2), date(2023, 10, 3), "close"),
              (2, date(2023, 10, 1), date(2023, 10, 2), "close"),
              (2, date(2023, 10, 2), date(2023, 10, 4), "close"),
              (3, date(2023, 10, 2), date(2023, 10, 4), "open"),
              (3, date(2023, 10, 3), date(2023, 10, 6), "open"),
          ],
          schema="id integer, date_start date, date_end date, status string"
      )
      
      # Define two window specifications: one spanning the entire id partition, one ordered within the id for ranking
      partition = w.partitionBy("id").orderBy("date_start", "date_end").rowsBetween(w.unboundedPreceding, w.unboundedFollowing)
      partition2 = w.partitionBy("id").orderBy("date_start", "date_end")
      
      # Define dataframe A
      A = df.withColumn(
          "date_end_of_last_close",
          f.max(f.when(f.col("status") == "close", f.col("date_end"))).over(partition)
      ).withColumn(
          "rank",
          f.row_number().over(partition2)
      )
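      # Per id, date_end_of_last_close should be the maximum date_end over that
      # id's "close" rows (NULL when the id has no "close" row, as for id 3),
      # and rank numbers the rows within each id by (date_start, date_end).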
      A.show()
      
      | id | date_start | date_end   | status | date_end_of_last_close | rank |
      |----|------------|------------|--------|------------------------|------|
      | 1  | 2023-10-01 | 2023-10-02 | open   | 2023-10-03             | 1    |
      | 1  | 2023-10-02 | 2023-10-03 | close  | 2023-10-03             | 2    |
      | 2  | 2023-10-01 | 2023-10-02 | close  | 2023-10-04             | 1    |
      | 2  | 2023-10-02 | 2023-10-04 | close  | 2023-10-04             | 2    |
      | 3  | 2023-10-02 | 2023-10-04 | open   | NULL                   | 1    |
      | 3  | 2023-10-03 | 2023-10-06 | open   | NULL                   | 2    |
      
      # Filtering on rank = 1 returns this unexpected result
      A_result = A.filter(f.col("rank") == 1).drop("rank")
      A_result.show()
      
      | id | date_start | date_end   | status | date_end_of_last_close |
      |----|------------|------------|--------|------------------------|
      | 1  | 2023-10-01 | 2023-10-02 | open   | NULL                   |
      | 2  | 2023-10-01 | 2023-10-02 | close  | 2023-10-02             |
      | 3  | 2023-10-02 | 2023-10-04 | open   | NULL                   | 
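
      For reference, the expected result of the rank = 1 filter is simply the rank-1 rows of table A above, with the rank column dropped:

      | id | date_start | date_end   | status | date_end_of_last_close |
      |----|------------|------------|--------|------------------------|
      | 1  | 2023-10-01 | 2023-10-02 | open   | 2023-10-03             |
      | 2  | 2023-10-01 | 2023-10-02 | close  | 2023-10-04             |
      | 3  | 2023-10-02 | 2023-10-04 | open   | NULL                   |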

      I think the Spark engine might be mishandling the internal window partitions. If the DataFrame is created from scratch with these same values (without applying the window transformations), the filtering operation returns the right result. In PySpark 3.4.0 this error does not happen.
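
      As a diagnostic, here is a minimal sketch (not a confirmed fix) of two checks, assuming the problem is introduced when the filter over the window columns is rewritten by the optimizer: compare the extended query plans, and rebuild A from its already-computed rows before filtering, which mimics the "created from scratch" case described above. The variable name A_rebuilt is illustrative.

      # Inspect how the rank = 1 filter over the window columns is planned;
      # comparing this plan between 3.4.0 and 3.5.0 should show where they diverge.
      A.filter(f.col("rank") == 1).drop("rank").explain(True)

      # Rebuild A from its computed rows (the "from scratch" case); if the issue
      # is in the plan rewrite rather than in the data, this filter should
      # return the expected rows.
      A_rebuilt = spark.createDataFrame(A.collect(), schema=A.schema)
      A_rebuilt.filter(f.col("rank") == 1).drop("rank").show()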

       

      For more details, please see the same question on Stack Overflow: stackoverflow question

      I am marking this issue as important because it affects basic operations that are used daily.

    People

    • Assignee: Unassigned
    • Reporter: Daniel Diego Horcajuelo (dadiego91)