Details
Type: Bug
Status: Open
Priority: Minor
Resolution: Unresolved
Affects Version/s: 3.5.0
None
None
Environment: Azure Databricks runtime 14.3 LTS
Description
When I use GroupedData.applyInPandas to implement aggregation in a streaming query, it fails to respect a watermark specified using DataFrame.withWatermark.
This query reproduces the behaviour I'm seeing:
from pyspark.sql.functions import window
from typing import Tuple
import pandas as pd

df_source_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 3)
    .load()
    .withColumn("bucket", window("timestamp", "10 seconds").end)
)

def my_function(
    key: Tuple[str], df: pd.DataFrame
) -> pd.DataFrame:
    return pd.DataFrame({"bucket": [key[0]], "count": [df.shape[0]]})

df = (
    df_source_stream
    .withWatermark("bucket", "10 seconds")
    .groupBy("bucket")
    .applyInPandas(my_function, "bucket TIMESTAMP, count INT")
)

display(df)
I expect the output of the query to contain one row per bucket value, but a new row is emitted for each incoming microbatch.
In contrast, an out-of-the-box aggregate behaves as expected. For example:
df = (
    df_source_stream
    .withWatermark("bucket", "10 seconds")
    .groupBy("bucket")
    .count()  # standard aggregate in place of applyInPandas
)
The output of this query contains one row per bucket value.
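To make the difference between the two outputs concrete, here is a small pure-Python sketch. It is not Spark code and makes no claim about Spark internals. It assumes (my reading of the symptom, not confirmed) that the applyInPandas query groups each microbatch in isolation, while the built-in count carries state across microbatches. The helper names, the 5-second microbatch size and the integer timestamps are all hypothetical, and watermark-based finalisation is left out for brevity.

```python
from collections import Counter

WINDOW_SECONDS = 10  # matches the "10 seconds" window in the report


def bucket_end(ts: int) -> int:
    """End of the 10-second tumbling window containing ts (in seconds)."""
    return (ts // WINDOW_SECONDS + 1) * WINDOW_SECONDS


def per_batch_counts(batches):
    """Group every microbatch on its own: one row per (microbatch, bucket)."""
    rows = []
    for batch in batches:
        counts = Counter(bucket_end(ts) for ts in batch)
        rows.extend(sorted(counts.items()))
    return rows


def stateful_counts(batches):
    """Carry counts across microbatches: one row per bucket overall."""
    state = Counter()
    for batch in batches:
        state.update(bucket_end(ts) for ts in batch)
    return sorted(state.items())


# Hypothetical input: 3 rows per second for 20 seconds,
# delivered in microbatches that each cover 5 seconds.
batches = [
    [second for second in range(start, start + 5) for _ in range(3)]
    for start in range(0, 20, 5)
]

print(per_batch_counts(batches))  # bucket values repeat across microbatches
print(stateful_counts(batches))   # each bucket value appears once
```

Under these assumptions the first function yields two rows for each bucket (what I observe with applyInPandas), and the second yields one row per bucket (what I expect, and what count() produces).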