Details
- Type: Bug
- Status: Resolved
- Priority: Blocker
- Resolution: Duplicate
- Affects Version/s: 3.5.0
- Fix Version/s: None
- Environment: I'm using Spark 3.5.0 on Databricks Runtime 14.1
Description
According to the documentation update (SPARK-42591) resulting from SPARK-42376, Spark 3.5.0 should support time-window aggregations in two separate streams followed by a stream-stream window join.
However, I failed to reproduce this example, and the query I built doesn't return any results:
from pyspark.sql.functions import rand
from pyspark.sql.functions import expr, window, window_time

spark.conf.set("spark.sql.shuffle.partitions", "1")

impressions = (
    spark
    .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
    .selectExpr("value AS adId", "timestamp AS impressionTime")
)

impressionsWithWatermark = impressions \
    .selectExpr("adId AS impressionAdId", "impressionTime") \
    .withWatermark("impressionTime", "10 seconds")

clicks = (
    spark
    .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
    .where((rand() * 100).cast("integer") < 10)  # 10 out of every 100 impressions result in a click
    .selectExpr("(value - 10) AS adId ", "timestamp AS clickTime")  # -10 so that a click with same id as impression is generated later (i.e. delayed data).
    .where("adId > 0")
)

clicksWithWatermark = clicks \
    .selectExpr("adId AS clickAdId", "clickTime") \
    .withWatermark("clickTime", "10 seconds")

clicksWindow = clicksWithWatermark.groupBy(
    window(clicksWithWatermark.clickTime, "1 minute")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
    window(impressionsWithWatermark.impressionTime, "1 minute")
).count()

clicksAndImpressions = clicksWindow.join(impressionsWindow, "window", "inner")

clicksAndImpressions.writeStream \
    .format("memory") \
    .queryName("clicksAndImpressions") \
    .outputMode("append") \
    .start()
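For reference, this is roughly how I checked that nothing reaches the sink and where the watermark stands. It is only a sketch: it assumes the handle returned by start() above is kept in a variable (called query here, which is not shown in the snippet):

import time

# Let a few 1-minute windows close before looking at the sink.
time.sleep(180)

# The in-memory table backing the "memory" sink stays empty in my runs.
print(spark.table("clicksAndImpressions").count())

# lastProgress is the most recent StreamingQueryProgress as a Python dict.
progress = query.lastProgress
if progress:
    # Global watermark after the last micro-batch.
    print(progress["eventTime"].get("watermark"))
    # Rows held in state by each stateful operator (two aggregations + the join).
    for op in progress["stateOperators"]:
        print(op.get("operatorName"), op["numRowsTotal"])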
My intuition is that I'm getting no results because, to output results of the first stateful operator (the time-window aggregation), the watermark needs to pass the end timestamp of the window. And once the watermark is past the end timestamp of the window, that window is dropped at the second stateful operator (the stream-stream join) because it is behind the watermark. Indeed, a small hack applied to the event-time column (adding one minute) between the two stateful operators makes it possible to get results:
clicksWindow2 = clicksWithWatermark.groupBy(
    window(clicksWithWatermark.clickTime, "1 minute")
).count().withColumn("window_time", window_time("window") + expr('INTERVAL 1 MINUTE')).drop("window")

impressionsWindow2 = impressionsWithWatermark.groupBy(
    window(impressionsWithWatermark.impressionTime, "1 minute")
).count().withColumn("window_time", window_time("window") + expr('INTERVAL 1 MINUTE')).drop("window")

clicksAndImpressions2 = clicksWindow2.join(impressionsWindow2, "window_time", "inner")

clicksAndImpressions2.writeStream \
    .format("memory") \
    .queryName("clicksAndImpressions2") \
    .outputMode("append") \
    .start()
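To make the shift explicit: window_time() evaluates to window.end minus one microsecond, so adding one minute moves the derived event time past the original window's end, which is why the rows are no longer behind the watermark at the join. A quick non-streaming sanity check of that (the input row here is made up just for illustration):

from pyspark.sql.functions import col, expr, window, window_time

# One event inside the 12:00-12:01 window.
df = spark.createDataFrame([("2024-01-01 12:00:30",)], ["ts"]) \
    .selectExpr("CAST(ts AS TIMESTAMP) AS ts")

windowed = df.groupBy(window(col("ts"), "1 minute")).count()

windowed.select(
    col("window.end"),                                                      # 12:01:00 (exclusive end)
    window_time("window").alias("event_time"),                              # 12:00:59.999999
    (window_time("window") + expr("INTERVAL 1 MINUTE")).alias("shifted")    # 12:01:59.999999
).show(truncate=False)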
Attachments
Issue Links
- is superseded by SPARK-49829: Issues with optimization on adding input to state store in stream-stream join (Resolved)