  Spark / SPARK-48992

applyInPandas does not respect streaming watermark


Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 3.5.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None
    • Environment: Azure Databricks runtime 14.3 LTS

    Description

      When I use GroupedData.applyInPandas to implement an aggregation in a streaming query, the query does not respect a watermark specified using DataFrame.withWatermark.

      This query reproduces the behaviour I'm seeing:
       

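      from pyspark.sql.functions import window
      from typing import Tuple
      import pandas as pd
      
      # `spark` and `display` are predefined in a Databricks notebook.
      # The rate source emits (timestamp, value) rows; each row is tagged with
      # the end of the 10-second tumbling window that contains its timestamp.
      df_source_stream = (
          spark.readStream
          .format("rate")
          .option("rowsPerSecond", 3)
          .load()
          .withColumn("bucket", window("timestamp", "10 seconds").end)
      )
      
      # Called for each group of rows sharing a bucket value: key holds the
      # grouping value (a timestamp) and df holds the group's rows.
      # Returns a single (bucket, row count) row.
      def my_function(
          key: Tuple[pd.Timestamp], df: pd.DataFrame
      ) -> pd.DataFrame:
          return pd.DataFrame({"bucket": [key[0]], "count": [df.shape[0]]})
      
      df = (
          df_source_stream
          .withWatermark("bucket", "10 seconds")
          .groupBy("bucket")
          .applyInPandas(my_function, "bucket TIMESTAMP, count INT")
      )
      display(df)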
      

      I expect the output of the query to contain one row per bucket value, but instead a new row is emitted for each incoming micro-batch.
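      The reported symptom can be modelled without Spark. This is a hypothetical pure-Python illustration, not Spark's implementation: `MICRO_BATCHES` holds made-up bucket values (window ends, in seconds) for three micro-batches, and the helper `per_batch_counts` groups and counts each micro-batch independently, carrying no state from one batch to the next. The result has the same shape as the observed output, where a bucket appears once for every micro-batch that contains rows for it.

```python
from collections import Counter
from typing import List, Tuple

# Hypothetical micro-batches: each inner list holds the bucket value
# (window end, in seconds) of every row arriving in that micro-batch.
# Buckets 10 and 20 each span two micro-batches.
MICRO_BATCHES: List[List[int]] = [
    [10, 10, 10],
    [10, 10, 20],
    [20, 20, 20],
]


def per_batch_counts(batches: List[List[int]]) -> List[Tuple[int, int]]:
    """Group and count rows within each micro-batch on its own.

    No state is kept between batches, so a bucket that spans several
    micro-batches produces one partial-count row per batch.
    """
    rows: List[Tuple[int, int]] = []
    for batch in batches:
        for bucket, count in sorted(Counter(batch).items()):
            rows.append((bucket, count))
    return rows


print(per_batch_counts(MICRO_BATCHES))
# [(10, 3), (10, 2), (20, 1), (20, 3)]
```

      Bucket 10 is reported twice with partial counts (3, then 2) instead of once with its total of 5.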

      In contrast, a built-in aggregate behaves as expected. For example:

      df = (
          df_source_stream
          .withWatermark("bucket", "10 seconds")
          .groupBy("bucket")
          .count()  # standard aggregate in place of applyInPandas
      )
      

      The output of this query contains one row per bucket value.
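      For comparison, here is a minimal model of what a watermarked aggregate such as `.count()` is expected to do, assuming append output mode: keep a running count per bucket across micro-batches, advance the watermark to the latest event time seen minus the 10-second delay, and emit each bucket exactly once when the watermark reaches it. The data and the `watermarked_counts` helper are hypothetical, and the model is simplified: it evicts in the same step that advances the watermark and drops rows for buckets it has already emitted, whereas Spark applies a newly computed watermark in a later micro-batch and its exact boundary comparisons may differ.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

DELAY_SECONDS = 10  # mirrors withWatermark("bucket", "10 seconds")

# Hypothetical micro-batches: bucket values (window ends, in seconds) of arriving rows.
MICRO_BATCHES: List[List[int]] = [
    [10, 10, 10],
    [10, 10, 20],
    [20, 20, 20],
    [30, 30, 40],
]


def watermarked_counts(batches: List[List[int]], delay: int) -> List[Tuple[int, int]]:
    """Simplified model of a watermarked streaming count in append mode."""
    state: Dict[int, int] = defaultdict(int)  # running count per bucket, kept across batches
    watermark = float("-inf")                 # no watermark before the first batch
    emitted: List[Tuple[int, int]] = []
    for batch in batches:
        for bucket in batch:
            if bucket <= watermark:
                continue  # late row: this model has already finalized its bucket
            state[bucket] += 1
        if batch:
            # Watermark = latest event time seen so far minus the allowed delay.
            watermark = max(watermark, max(batch) - delay)
        # Emit each bucket the watermark has reached exactly once, then drop its state.
        for bucket in sorted(b for b in state if b <= watermark):
            emitted.append((bucket, state.pop(bucket)))
    return emitted


print(watermarked_counts(MICRO_BATCHES, DELAY_SECONDS))
# [(10, 5), (20, 4), (30, 2)]
```

      Each bucket is emitted once with its full count; bucket 40 is still pending because the watermark (30) has not reached it yet.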
       


          People

            Assignee: Unassigned
            Reporter: Richard Swinbank
            Votes: 0
            Watchers: 2
