Flink / FLINK-18053

Savepoints do not preserve watermarks


Details

    Description

      Flink produces incorrect results when a streaming SQL aggregation is stopped and resumed from a savepoint.


      Steps to reproduce:

      1) Build an assembly from the attached MyApp.scala.

      The job reads CSV files as a stream. The files contain fake stock ticks, which are aggregated with the following tumbling-window query:

      SELECT
        TUMBLE_START(event_time, INTERVAL '1' DAY) as event_time,
        symbol as symbol,
        min(price) as `min`,
        max(price) as `max`
      FROM Tickers
      GROUP BY TUMBLE(event_time, INTERVAL '1' DAY), symbol
      
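To make the grouping concrete, here is a minimal Python sketch (not Flink code) of what the query computes for a fixed batch of rows. Each row is assigned to the epoch-aligned 1-day window containing its event time, and the min and max price are kept per window and symbol. Watermarks are ignored here, and the function names `tumble_start` and `aggregate` are hypothetical.

```python
from datetime import datetime

def tumble_start(ts: datetime) -> datetime:
    # TUMBLE(event_time, INTERVAL '1' DAY) puts each row into the epoch-aligned
    # day-long window containing it; for these timestamps that starts at midnight.
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)

def aggregate(rows):
    # rows: iterable of (event_time, symbol, price)
    # Returns {(window_start, symbol): (min_price, max_price)}.
    result = {}
    for ts, symbol, price in rows:
        key = (tumble_start(ts), symbol)
        lo, hi = result.get(key, (price, price))
        result[key] = (min(lo, price), max(hi, price))
    return result

# The 2000-01-01 rows of 1.csv, including the late A,11 row.
rows = [
    (datetime(2000, 1, 1, 1), "A", 10),
    (datetime(2000, 1, 1, 1), "B", 20),
    (datetime(2000, 1, 1, 2), "A", 10),
    (datetime(2000, 1, 1, 2), "B", 21),
    (datetime(2000, 1, 1, 3), "A", 11),
]
for (start, symbol), (lo, hi) in sorted(aggregate(rows).items()):
    print(f"{start.date()},{symbol},{lo},{hi}")
```

This prints the same two 2000-01-01 rows that the job emits in steps 3 and 4.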

      The stream uses punctuated watermarks with a maximum lateness of 1 day, i.e. the watermark trails the highest event time seen so far by 1 day.
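The following is a minimal Python model of this watermarking. It assumes the assigner emits `max event time - 1 day` after every record. It also assumes the window operator drops a row once the watermark covers the last millisecond of that row's 1-day window. This mirrors Flink's `TimeWindow.maxTimestamp()`, with zero allowed lateness. `PunctuatedWatermarks` and `is_late` are hypothetical names and do not reproduce the code in MyApp.scala.

```python
from datetime import datetime, timedelta

MAX_LATENESS = timedelta(days=1)

class PunctuatedWatermarks:
    # Hypothetical assigner: after each record, the watermark is the highest
    # event time seen so far minus MAX_LATENESS.
    def __init__(self):
        self.max_ts = datetime.min

    def on_event(self, ts: datetime) -> datetime:
        self.max_ts = max(self.max_ts, ts)
        return self.max_ts - MAX_LATENESS

def is_late(ts: datetime, watermark: datetime) -> bool:
    # A row is dropped when the last millisecond of its 1-day window is
    # already covered by the watermark (zero allowed lateness assumed).
    start = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return start + timedelta(days=1) - timedelta(milliseconds=1) <= watermark

events = [
    datetime(2000, 1, 2, 2),  # advances the watermark to 2000-01-01 02:00
    datetime(2000, 1, 1, 3),  # late arrival, still above the watermark
    datetime(2000, 1, 3, 2),  # advances the watermark to 2000-01-02 02:00
    datetime(2000, 1, 1, 4),  # late arrival, below the watermark
]
gen = PunctuatedWatermarks()
watermark = datetime.min  # no watermark has been emitted yet
for ts in events:
    # Each row is checked against the watermark emitted before it.
    print(ts, "dropped" if is_late(ts, watermark) else "accepted")
    watermark = max(watermark, gen.on_event(ts))
```

Run on these rows from 1.csv and 2.csv, the model accepts the 2000-01-01 03:00 row and drops the 2000-01-01 04:00 row. This matches the comments in the CSV files.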

      2) Create two CSV files with fake stock tickers:

      1.csv:

      2000-01-01 01:00:00.0,A,10
      2000-01-01 01:00:00.0,B,20
      2000-01-01 02:00:00.0,A,10
      2000-01-01 02:00:00.0,B,21
      2000-01-02 01:00:00.0,A,12
      2000-01-02 01:00:00.0,B,22
      2000-01-02 02:00:00.0,A,13
      2000-01-02 02:00:00.0,B,23
      2000-01-01 03:00:00.0,A,11 // Late arrival - still above watermark
      2000-01-03 01:00:00.0,A,14
      2000-01-03 01:00:00.0,B,24
      2000-01-03 02:00:00.0,A,15
      2000-01-03 02:00:00.0,B,25
      

      2.csv:

      2000-01-01 04:00:00.0,A,12 // Late arrival - below watermark
      2000-01-04 01:00:00.0,A,16 // This and the following rows won't appear in the result; they only push the watermark up
      2000-01-04 01:00:00.0,B,26
      2000-01-04 02:00:00.0,A,17
      2000-01-04 02:00:00.0,B,27
      2000-01-05 01:00:00.0,A,18
      2000-01-05 01:00:00.0,B,28
      

      3) Run the job on a directory containing both files. The result is as expected:

      2000-01-01,A,10,11
      2000-01-01,B,20,21
      2000-01-02,A,12,13
      2000-01-02,B,22,23
      2000-01-03,A,14,15
      2000-01-03,B,24,25
      

      4) Now run the job with only 1.csv in the directory. The result is still correct:

      2000-01-01,A,10,11
      2000-01-01,B,20,21
      

      5) Cancel the job with a savepoint, move 2.csv into the directory, and restart the job from the savepoint. The result is incorrect:

      2000-01-01,A,12,12
      2000-01-02,A,12,13
      2000-01-02,B,22,23
      2000-01-03,A,14,15
      2000-01-03,B,24,25
      


      Expectation:
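      The 2000-01-01,A,12,12 record should not appear. The row 2000-01-01 04:00:00.0,A,12 arrives after the watermark has already passed the end of the 2000-01-01 window, so the watermark check should have dropped it. This indicates that Flink did not save the watermark in the savepoint.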
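The three runs can be replayed with a toy model of the job to check this diagnosis. The sketch below is plain Python, not Flink. `WindowedMinMax`, `parse` and `run` are hypothetical names. Window contents are assumed to survive the savepoint. `keep_watermark=False` assumes that both the assigner's maximum timestamp and the operator's watermark restart from scratch after restore.

```python
from datetime import datetime, timedelta

DAY = timedelta(days=1)
MS = timedelta(milliseconds=1)

FILE_1 = """
2000-01-01 01:00:00.0,A,10
2000-01-01 01:00:00.0,B,20
2000-01-01 02:00:00.0,A,10
2000-01-01 02:00:00.0,B,21
2000-01-02 01:00:00.0,A,12
2000-01-02 01:00:00.0,B,22
2000-01-02 02:00:00.0,A,13
2000-01-02 02:00:00.0,B,23
2000-01-01 03:00:00.0,A,11
2000-01-03 01:00:00.0,A,14
2000-01-03 01:00:00.0,B,24
2000-01-03 02:00:00.0,A,15
2000-01-03 02:00:00.0,B,25
"""

FILE_2 = """
2000-01-01 04:00:00.0,A,12
2000-01-04 01:00:00.0,A,16
2000-01-04 01:00:00.0,B,26
2000-01-04 02:00:00.0,A,17
2000-01-04 02:00:00.0,B,27
2000-01-05 01:00:00.0,A,18
2000-01-05 01:00:00.0,B,28
"""

def parse(text):
    # Each line: event_time,symbol,price
    rows = []
    for line in text.strip().splitlines():
        ts, symbol, price = line.split(",")
        rows.append((datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f"), symbol, int(price)))
    return rows

class WindowedMinMax:
    # Toy model: punctuated watermark = max event time - 1 day, followed by a
    # 1-day tumbling min/max per symbol with zero allowed lateness.
    def __init__(self):
        self.windows = {}              # (window_start, symbol) -> (min, max)
        self.max_ts = None             # assigner state
        self.watermark = datetime.min  # operator's current watermark

    def savepoint_and_restore(self, keep_watermark):
        restored = WindowedMinMax()
        restored.windows = dict(self.windows)  # window contents always survive
        if keep_watermark:
            restored.max_ts, restored.watermark = self.max_ts, self.watermark
        return restored

    def process(self, ts, symbol, price):
        start = ts.replace(hour=0, minute=0, second=0, microsecond=0)
        # Accept the row only if its window has not fired yet.
        if start + DAY - MS > self.watermark:
            lo, hi = self.windows.get((start, symbol), (price, price))
            self.windows[(start, symbol)] = (min(lo, price), max(hi, price))
        self.max_ts = ts if self.max_ts is None else max(self.max_ts, ts)
        self.watermark = max(self.watermark, self.max_ts - DAY)
        # Fire (emit and clear) every window covered by the new watermark.
        out = []
        for key in sorted(k for k in self.windows if k[0] + DAY - MS <= self.watermark):
            lo, hi = self.windows.pop(key)
            out.append(f"{key[0].date()},{key[1]},{lo},{hi}")
        return out

def run(job, rows):
    out = []
    for row in rows:
        out += job.process(*row)
    return out

# Step 3: both files in a single run.
print(run(WindowedMinMax(), parse(FILE_1) + parse(FILE_2)))
# Step 4: only 1.csv, then step 5 with and without a restored watermark.
job = WindowedMinMax()
print(run(job, parse(FILE_1)))
for keep in (False, True):
    print(keep, run(job.savepoint_and_restore(keep), parse(FILE_2)))
```

With `keep_watermark=False`, the replay reproduces the incorrect output from step 5, including 2000-01-01,A,12,12. With `keep_watermark=True`, the late row is dropped, and steps 4 and 5 together emit the same rows as step 3.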

      Attachments

        1. 2.csv
          0.2 kB
          Sergii Mikhtoniuk
        2. 1.csv
          0.3 kB
          Sergii Mikhtoniuk
        3. MyApp.scala
          2 kB
          Sergii Mikhtoniuk

            People

              Assignee: Unassigned
              Reporter: Sergii Mikhtoniuk (sergiimk)
              Votes: 0
              Watchers: 5