  1. Spark
  2. SPARK-35367

Window group-key and pandas inconsistent

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.0.1
    • Fix Version/s: None
    • Component/s: PySpark
    • Labels: None

    Description

      Not completely sure whether this is a bug or a configuration/usage issue:

      We are seeing inconsistent timezone handling when using a windowed group-by aggregation in combination with a pandas UDF.

      A minimal example:

       

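      import datetime

      import pandas as pd
      from pyspark.sql.functions import window

      # Assumes an active SparkSession is available as `spark`.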
      
      def a_udf(group_key, pdf: pd.DataFrame) -> pd.DataFrame:
          w_start = group_key[0]["start"]
          w_end = group_key[0]["end"]
      
          print(f"Pandas   : {pdf['window_start'].iloc[0]} to {pdf['window_end'].iloc[0]}")
          print(f"Group key: {w_start} to {w_end}")
          print(f"Data     : {pdf['time'].min()} to {pdf['time'].max()}")
      
          assert (pdf["time"] >= w_start).all()
          assert (pdf["time"] < w_end).all()        
      
          # some result
      
          return pd.DataFrame.from_records([{"result": 1}])
      
      
      df = spark.createDataFrame([(datetime.datetime(2020, 1, 1, 12, 30, 0),)], schema=["time"])
      
      w = window("time", "60 minutes")
      df.withColumn("window_start", w.start)\
        .withColumn("window_end", w.end)\
        .groupby(w).applyInPandas(a_udf, schema="result int")\
        .show()
      

       

      Produces:

      Pandas   : 2020-01-01 12:00:00 to 2020-01-01 13:00:00
      Group key: 2020-01-01 11:00:00 to 2020-01-01 12:00:00
      Data     : 2020-01-01 12:30:00 to 2020-01-01 12:30:00
      
      

      And the assertions fail: the group key appears to go through some timezone (and DST?) conversion that leaves it one hour off from the window columns.
      This is without any specific timezone configuration and with CEST as the local timezone.
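
      To make the mismatch concrete, here is a small standalone sketch that replays the two assertions using only the values printed above (no Spark or pandas needed). The variable names are just for illustration. The last step assumes that Europe/Berlin is at UTC+1 on 2020-01-01 and checks whether the window start, read as local time, lands on the group key when converted to UTC. That would be consistent with the group key arriving in UTC, but it does not confirm the cause.

```python
import datetime

# Values copied from the printed output above.
key_start = datetime.datetime(2020, 1, 1, 11, 0, 0)   # group key start
key_end = datetime.datetime(2020, 1, 1, 12, 0, 0)     # group key end
col_start = datetime.datetime(2020, 1, 1, 12, 0, 0)   # window_start column as seen in pandas
row_time = datetime.datetime(2020, 1, 1, 12, 30, 0)   # the single data row

lower_ok = row_time >= key_start   # first assertion in a_udf
upper_ok = row_time < key_end      # second assertion in a_udf
shift = col_start - key_start      # offset between window column and group key

# Assumption for illustration: Europe/Berlin is at UTC+1 (CET) on 2020-01-01.
cet = datetime.timezone(datetime.timedelta(hours=1))
col_start_utc = (
    col_start.replace(tzinfo=cet)
    .astimezone(datetime.timezone.utc)
    .replace(tzinfo=None)
)

print(f"lower bound holds : {lower_ok}")
print(f"upper bound holds : {upper_ok}")
print(f"shift             : {shift}")
print(f"column start in UTC equals group key start: {col_start_utc == key_start}")
```

      With these values the lower-bound check passes and the upper-bound check is the one that fails, since 12:30 is not before 12:00.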

      Is this working as expected?

      Setting

      "spark.sql.session.timeZone": "UTC"
      "spark.driver.extraJavaOptions": "-Duser.timezone=UTC"
      "spark.executor.extraJavaOptions": "-Duser.timezone=UTC"
      

      seems to be a workaround.
      Using "Europe/Berlin" for all three timezone settings, however, reproduces the inconsistent behaviour.
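
      For reference, a minimal sketch of how the three settings can be generated consistently for one zone and turned into `--conf key=value` arguments for spark-submit. The helper names `timezone_conf` and `as_submit_args` are hypothetical and not part of any Spark API. Passing them at launch is a choice made here because, as far as I understand, JVM options for the driver need to be in place before the driver JVM starts.

```python
def timezone_conf(tz: str) -> dict:
    """Return the three settings from this report, all pointing at the same zone."""
    jvm_flag = f"-Duser.timezone={tz}"
    return {
        "spark.sql.session.timeZone": tz,
        "spark.driver.extraJavaOptions": jvm_flag,
        "spark.executor.extraJavaOptions": jvm_flag,
    }


def as_submit_args(conf: dict) -> list:
    """Flatten a settings dict into spark-submit style `--conf key=value` pairs."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# The combination that works around the problem ...
utc_args = as_submit_args(timezone_conf("UTC"))
# ... and the one that still reproduces it.
berlin_args = as_submit_args(timezone_conf("Europe/Berlin"))

print(" ".join(utc_args))
print(" ".join(berlin_args))
```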

      I would assume, though, that running this in a non-UTC timezone should generally be possible?

       

          People

            Assignee: Unassigned
            Reporter: Sebastian Eckweiler (sebastian_eckweiler)