Spark / SPARK-45637

Time window aggregation in separate streams followed by stream-stream join not returning results

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Duplicate
    • Affects Version/s: 3.5.0
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Environment: Spark 3.5.0 on Databricks Runtime 14.1

    Description

      According to the documentation update (SPARK-42591) that resulted from SPARK-42376, Spark 3.5.0 should support time-window aggregations in two separate streams followed by a stream-stream window join:

      https://github.com/apache/spark/blob/261b281e6e57be32eb28bf4e50bea24ed22a9f21/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995

      However, I failed to reproduce this example and the query I built doesn't return any results:

      from pyspark.sql.functions import expr, rand, window, window_time

      # `spark` is assumed to be the SparkSession provided by the notebook.
      spark.conf.set("spark.sql.shuffle.partitions", "1")

      # Impressions: 5 rows per second from the built-in rate source.
      impressions = (
          spark
          .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
          .selectExpr("value AS adId", "timestamp AS impressionTime")
      )

      impressionsWithWatermark = (
          impressions
          .selectExpr("adId AS impressionAdId", "impressionTime")
          .withWatermark("impressionTime", "10 seconds")
      )

      clicks = (
          spark
          .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
          # 10 out of every 100 impressions result in a click
          .where((rand() * 100).cast("integer") < 10)
          # -10 so that a click with the same id as an impression is generated later (i.e. delayed data)
          .selectExpr("(value - 10) AS adId", "timestamp AS clickTime")
          .where("adId > 0")
      )

      clicksWithWatermark = (
          clicks
          .selectExpr("adId AS clickAdId", "clickTime")
          .withWatermark("clickTime", "10 seconds")
      )

      # First stateful operator: 1-minute tumbling-window counts, one per stream.
      clicksWindow = clicksWithWatermark.groupBy(
          window(clicksWithWatermark.clickTime, "1 minute")
      ).count()

      impressionsWindow = impressionsWithWatermark.groupBy(
          window(impressionsWithWatermark.impressionTime, "1 minute")
      ).count()

      # Second stateful operator: stream-stream join on the window column.
      clicksAndImpressions = clicksWindow.join(impressionsWindow, "window", "inner")

      clicksAndImpressions.writeStream \
          .format("memory") \
          .queryName("clicksAndImpressions") \
          .outputMode("append") \
          .start()
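For reference, the result one would expect from the query above can be sketched in plain Python, without Spark. This is only an illustration of the intended semantics (count events per 1-minute tumbling window in each stream, then inner-join the counts on the window). The helper names and the sample timestamps are hypothetical and are not part of the original report.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=1)
EPOCH = datetime(1970, 1, 1)


def window_start(ts):
    """Start of the 1-minute tumbling window that contains ts."""
    return ts - ((ts - EPOCH) % WINDOW)


def windowed_counts(timestamps):
    """Count events per tumbling window, keyed by window start."""
    return Counter(window_start(ts) for ts in timestamps)


def join_counts(clicks, impressions):
    """Inner join of two per-window counts on the window key."""
    shared = sorted(clicks.keys() & impressions.keys())
    return {w: (clicks[w], impressions[w]) for w in shared}


# Hypothetical event times, expressed as seconds after a minute-aligned base.
base = datetime(2023, 10, 23, 12, 0, 0)
impression_times = [base + timedelta(seconds=s) for s in (5, 20, 70, 130)]
click_times = [base + timedelta(seconds=s) for s in (30, 75, 80)]

# Maps window start -> (click count, impression count).
expected = join_counts(windowed_counts(click_times), windowed_counts(impression_times))
```

With this sample data only the first two windows have events in both streams, so only those two windows appear in the joined result. The streaming query should eventually produce rows of the same shape once the corresponding windows are finalized.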

       

      My intuition is that no results appear because the first stateful operator (the time-window aggregation) only outputs a window once the watermark has passed the window's end timestamp. By then the window is already behind the watermark, so the second stateful operator (the stream-stream join) ignores it as late. Indeed, a small hack that shifts the event-time column forward by one minute between the two stateful operators makes it possible to get results:

      clicksWindow2 = (
          clicksWithWatermark
          .groupBy(window(clicksWithWatermark.clickTime, "1 minute"))
          .count()
          # Hack: move the event time one minute forward so it stays ahead of the watermark.
          .withColumn("window_time", window_time("window") + expr("INTERVAL 1 MINUTE"))
          .drop("window")
      )

      impressionsWindow2 = (
          impressionsWithWatermark
          .groupBy(window(impressionsWithWatermark.impressionTime, "1 minute"))
          .count()
          .withColumn("window_time", window_time("window") + expr("INTERVAL 1 MINUTE"))
          .drop("window")
      )

      # Join on the shifted event-time column instead of the window struct.
      clicksAndImpressions2 = clicksWindow2.join(impressionsWindow2, "window_time", "inner")

      clicksAndImpressions2.writeStream \
          .format("memory") \
          .queryName("clicksAndImpressions2") \
          .outputMode("append") \
          .start()
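The hypothesis described above can be checked with a toy model in plain Python. This sketch is not Spark's actual implementation: the two predicates are simplified assumptions about append-mode emission and late-row filtering, and the function names and the watermark value are hypothetical. The only Spark-specific fact used is that window_time returns the window end minus one microsecond.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=1)
MICROSECOND = timedelta(microseconds=1)


def aggregation_emits(window_end, watermark):
    """Assumed append-mode rule: a window is emitted once the watermark reaches its end."""
    return watermark >= window_end


def join_keeps(event_time, watermark):
    """Assumed late-row rule: a row is kept only if its event time is ahead of the watermark."""
    return event_time > watermark


# One tumbling window [12:00:00, 12:01:00) and a hypothetical watermark just past its end.
window_end = datetime(2023, 10, 23, 12, 1, 0)
watermark = window_end + timedelta(seconds=5)

# Event time assigned to the aggregated row: window end minus one microsecond.
row_event_time = window_end - MICROSECOND

emitted = aggregation_emits(window_end, watermark)
kept_plain = join_keeps(row_event_time, watermark)
kept_shifted = join_keeps(row_event_time + WINDOW, watermark)
```

Under these assumptions the aggregation emits the window, the join drops the unshifted row as late, and the row shifted by one minute survives, which matches the behavior observed with the two queries. Whether Spark applies the same watermark to both operators in this way is exactly the open question of this report.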

       

            People

              Unassigned Unassigned
              azera Andrzej Zera
              Jungtaek Lim Jungtaek Lim