  1. Spark
  2. SPARK-39347

Generate wrong time window when (timestamp-startTime) % slideDuration < 0


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.3.0
    • Fix Version/s: 3.4.0, 3.5.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      The generation strategy of the sliding window was changed to the current one in PR 35362(https://github.com/apache/spark/pull/35362), and that change introduces a new problem.

      Windows are generated incorrectly when the timestamp of the record being processed is negative and the remainder of that timestamp modulo the window length is less than 0. The current test cases do not expose this bug.

      [test("negative timestamps")](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala#L299)

       

      val df1 = Seq(
        ("1970-01-01 00:00:02", 1),
        ("1970-01-01 00:00:12", 2)).toDF("time", "value")
      val df2 = Seq(
        (LocalDateTime.parse("1970-01-01T00:00:02"), 1),
        (LocalDateTime.parse("1970-01-01T00:00:12"), 2)).toDF("time", "value")
      
      Seq(df1, df2).foreach { df =>
        checkAnswer(
          df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
            .orderBy($"window.start".asc)
            .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
          Seq(
            Row("1969-12-31 23:59:55", "1970-01-01 00:00:05", 1),
            Row("1970-01-01 00:00:05", "1970-01-01 00:00:15", 2))
        )
      } 

       

       

      The timestamps in the test data above are not negative, and their remainders modulo the window length are not negative either, so the test case passes.
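
      The sign of the remainder matters because `%` on the JVM truncates toward zero, so a negative timestamp yields a negative remainder. A minimal sketch of that behavior follows. The object and method names are made up for illustration, and the timestamps are taken as epoch seconds in UTC, which is an assumption:

```scala
object RemainderSign {
  // Window length in seconds, matching the "10 seconds" used in the test.
  val windowSeconds: Long = 10L

  // Epoch seconds (UTC assumed) for 1970-01-01 00:00:02 and 1969-12-31 00:00:02.
  val positiveTs: Long = 2L
  val negativeTs: Long = -86398L

  // `%` truncates toward zero: the result takes the sign of the dividend.
  def truncatedRemainder(ts: Long): Long = ts % windowSeconds

  // Math.floorMod rounds toward negative infinity: the result is in [0, windowSeconds).
  def flooredRemainder(ts: Long): Long = Math.floorMod(ts, windowSeconds)

  def main(args: Array[String]): Unit = {
    println(truncatedRemainder(positiveTs)) // 2
    println(truncatedRemainder(negativeTs)) // -8
    println(flooredRemainder(negativeTs))   // 2
  }
}
```

      For non-negative timestamps both remainders agree, which is why test data on or after 1970-01-01 cannot tell the two apart.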

      The wrong result appears when the timestamps are changed to something like this:

       

      val df3 = Seq(
        ("1969-12-31 00:00:02", 1),
        ("1969-12-31 00:00:12", 2)).toDF("time", "value")
      val df4 = Seq(
        (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
        (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")
      
      Seq(df3, df4).foreach { df =>
        checkAnswer(
          df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
            .orderBy($"window.start".asc)
            .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
          Seq(
            Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
            Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
        )
      } 

       

      Running this test gives an unexpected result:

       

      == Results ==
      !== Correct Answer - 2 ==                      == Spark Answer - 2 ==
      !struct<>                                      struct<CAST(window.start AS STRING):string,CAST(window.end AS STRING):string,value:int>
      ![1969-12-30 23:59:55,1969-12-31 00:00:05,1]   [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
      ![1969-12-31 00:00:05,1969-12-31 00:00:15,2]   [1969-12-31 00:00:15,1969-12-31 00:00:25,2] 
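
      The shifted windows above can be reproduced with plain arithmetic on epoch seconds. The sketch below is only an illustration: it assumes the window start is computed as `ts - (ts - startTime + slide) % slide`, a formula chosen here because it reproduces the reported output, not one quoted from the Spark source. The `Math.floorMod` variant is shown for comparison and is not necessarily the fix that was merged. All names are hypothetical and UTC is assumed.

```scala
import java.time.{LocalDateTime, ZoneOffset}

object WindowStartSketch {
  val slide: Long = 10L     // slideDuration in seconds
  val startTime: Long = 5L  // startTime offset in seconds

  // Convert between ISO local date-times and epoch seconds (UTC assumed).
  def toEpoch(s: String): Long =
    LocalDateTime.parse(s).toEpochSecond(ZoneOffset.UTC)
  def fromEpoch(sec: Long): String =
    LocalDateTime.ofEpochSecond(sec, 0, ZoneOffset.UTC).toString

  // Assumed formula: `%` truncates toward zero, so the remainder is
  // negative for these pre-epoch timestamps and the start moves forward.
  def truncatedStart(ts: Long): Long =
    ts - (ts - startTime + slide) % slide

  // Comparison variant: floorMod keeps the remainder in [0, slide).
  def flooredStart(ts: Long): Long =
    ts - Math.floorMod(ts - startTime, slide)

  def main(args: Array[String]): Unit = {
    Seq("1969-12-31T00:00:02", "1969-12-31T00:00:12").foreach { s =>
      val ts = toEpoch(s)
      println(s"$s truncated=${fromEpoch(truncatedStart(ts))} floored=${fromEpoch(flooredStart(ts))}")
    }
  }
}
```

      Under these assumptions the truncated variant yields window starts of 00:00:05 and 00:00:15 on 1969-12-31, matching the Spark answer, while the floored variant yields 1969-12-30 23:59:55 and 1969-12-31 00:00:05, matching the expected answer.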

       

      Benchmark results

       

      Old logic from PR 18364(https://github.com/apache/spark/pull/18364) vs. the fixed version

      Running benchmark: tumbling windows
      Running case: old logic
      Stopped after 407 iterations, 10012 ms
      Running case: new logic
      Stopped after 615 iterations, 10007 ms
      Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Windows 10 10.0
      Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
      tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------------------------------
      old logic                                            17             25           9        580.1           1.7       1.0X
      new logic                                            15             16           2        680.8           1.5       1.2X
      Running benchmark: sliding windows
      Running case: old logic
      Stopped after 10 iterations, 10296 ms
      Running case: new logic
      Stopped after 15 iterations, 10391 ms
      Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Windows 10 10.0
      Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
      sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------------------------------
      old logic                                          1000           1030          19         10.0         100.0       1.0X
      new logic                                           668            693          21         15.0          66.8       1.5X
      

       

       

      The fixed version is slightly slower than PR 38069(https://github.com/apache/spark/pull/35362).

          People

            WweiL Wei Liu
            nyingping nyingping