FLINK-18053

Savepoints do not preserve watermarks

    Description

      Flink produces an incorrect result when a streaming SQL aggregation is stopped and resumed from a savepoint.

       

      Steps to reproduce:

      1) Build an assembly from the attached file (MyApp.scala).

      The job reads CSV files as a stream. The files contain fake stock tickers, which are aggregated with the following tumbling-window query:

      SELECT
        TUMBLE_START(event_time, INTERVAL '1' DAY) as event_time,
        symbol as symbol,
        min(price) as `min`,
        max(price) as `max`
      FROM Tickers
      GROUP BY TUMBLE(event_time, INTERVAL '1' DAY), symbol
      

      The stream uses punctuated watermarks with a maximum lateness of 1 day.
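To make the watermark rule concrete, here is a minimal sketch in plain Python (not Flink code). It assumes the watermark is simply the highest event time seen so far minus the 1-day lateness; the names `MAX_LATENESS`, `watermarks`, and `parse` are hypothetical, and the actual assigner in MyApp.scala may differ in detail.

```python
from datetime import datetime, timedelta

# Assumption: watermark = highest event time seen so far - maximum lateness.
MAX_LATENESS = timedelta(days=1)


def parse(text):
    """Parse a 'YYYY-MM-DD HH:MM:SS' timestamp."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


def watermarks(event_times):
    """Yield (event_time, watermark) pairs; the watermark never moves backwards."""
    max_seen = None
    for ts in event_times:
        max_seen = ts if max_seen is None else max(max_seen, ts)
        yield ts, max_seen - MAX_LATENESS


# A few event times in the order they appear in 1.csv.
events = [parse(s) for s in (
    "2000-01-01 01:00:00",
    "2000-01-02 02:00:00",
    "2000-01-01 03:00:00",  # the late arrival in 1.csv
    "2000-01-03 01:00:00",
)]

trace = list(watermarks(events))
for ts, wm in trace:
    print(ts, "-> watermark", wm)
```

Under this assumption, the late row at 2000-01-01 03:00 arrives while the watermark is 2000-01-01 02:00, so it is still accepted.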

      2) Create two CSV files with fake stock tickers:

      1.csv:

      2000-01-01 01:00:00.0,A,10
      2000-01-01 01:00:00.0,B,20
      2000-01-01 02:00:00.0,A,10
      2000-01-01 02:00:00.0,B,21
      2000-01-02 01:00:00.0,A,12
      2000-01-02 01:00:00.0,B,22
      2000-01-02 02:00:00.0,A,13
      2000-01-02 02:00:00.0,B,23
      2000-01-01 03:00:00.0,A,11 // Late arrival - still above watermark
      2000-01-03 01:00:00.0,A,14
      2000-01-03 01:00:00.0,B,24
      2000-01-03 02:00:00.0,A,15
      2000-01-03 02:00:00.0,B,25
      

      2.csv:

      2000-01-01 04:00:00.0,A,12 // Late arrival - under watermark
      2000-01-04 01:00:00.0,A,16 // This and the following rows won't appear in the result; they only push the watermark up
      2000-01-04 01:00:00.0,B,26
      2000-01-04 02:00:00.0,A,17
      2000-01-04 02:00:00.0,B,27
      2000-01-05 01:00:00.0,A,18
      2000-01-05 01:00:00.0,B,28
      

      3) Run the job on the directory containing both files. The observed result is as expected:

      2000-01-01,A,10,11
      2000-01-01,B,20,21
      2000-01-02,A,12,13
      2000-01-02,B,22,23
      2000-01-03,A,14,15
      2000-01-03,B,24,25
      

      4) Now run the job with only 1.csv in the directory. The result is still correct:

      2000-01-01,A,10,11
      2000-01-01,B,20,21
      

      5) Cancel the job with a savepoint, move 2.csv into the directory, and restart the job from the savepoint. This produces an incorrect result:

      2000-01-01,A,12,12
      2000-01-02,A,12,13
      2000-01-02,B,22,23
      2000-01-03,A,14,15
      2000-01-03,B,24,25
      

       

      Expectation:
      We should not see the 2000-01-01,A,12,12 record, because the late row in 2.csv (2000-01-01 04:00:00.0,A,12) should not have passed the watermark check. This suggests that Flink did not save the watermark in the savepoint.
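The reporter's hypothesis can be checked against the data with a toy model. The sketch below is plain Python, not Flink's window operator. It assumes that a window fires once the watermark reaches its end, that rows for already-closed windows are dropped, and that a restore keeps window state but forgets the watermark. The class `TumblingMinMax` and the method `restore_without_watermark` are hypothetical names introduced for illustration.

```python
from datetime import datetime, timedelta

DAY = timedelta(days=1)  # window size and maximum lateness are both 1 day


def rows(text):
    """Parse 'timestamp,symbol,price' lines into (datetime, str, int) tuples."""
    out = []
    for line in text.strip().splitlines():
        ts, symbol, price = line.strip().split(",")
        out.append((datetime.strptime(ts, "%Y-%m-%d %H:%M"), symbol, int(price)))
    return out


FILE_1 = rows("""
2000-01-01 01:00,A,10
2000-01-01 01:00,B,20
2000-01-01 02:00,A,10
2000-01-01 02:00,B,21
2000-01-02 01:00,A,12
2000-01-02 01:00,B,22
2000-01-02 02:00,A,13
2000-01-02 02:00,B,23
2000-01-01 03:00,A,11
2000-01-03 01:00,A,14
2000-01-03 01:00,B,24
2000-01-03 02:00,A,15
2000-01-03 02:00,B,25
""")

FILE_2 = rows("""
2000-01-01 04:00,A,12
2000-01-04 01:00,A,16
2000-01-04 01:00,B,26
2000-01-04 02:00,A,17
2000-01-04 02:00,B,27
2000-01-05 01:00,A,18
2000-01-05 01:00,B,28
""")


class TumblingMinMax:
    """Toy 1-day tumbling MIN/MAX aggregation (an assumption, not Flink's operator)."""

    def __init__(self):
        self.windows = {}     # (window_start, symbol) -> (min, max): the saved state
        self.max_seen = None  # highest event time seen; drives the watermark
        self.output = []

    def process(self, ts, symbol, price):
        start = datetime(ts.year, ts.month, ts.day)
        if self.max_seen is not None and start + DAY <= self.max_seen - DAY:
            return  # window already closed: the late row is dropped
        lo, hi = self.windows.get((start, symbol), (price, price))
        self.windows[(start, symbol)] = (min(lo, price), max(hi, price))
        self.max_seen = ts if self.max_seen is None else max(self.max_seen, ts)
        watermark = self.max_seen - DAY
        # Emit and purge every window whose end the watermark has reached.
        for key in sorted(k for k in self.windows if k[0] + DAY <= watermark):
            lo, hi = self.windows.pop(key)
            self.output.append(f"{key[0]:%Y-%m-%d},{key[1]},{lo},{hi}")

    def restore_without_watermark(self):
        """Hypothetical restore: window state survives, the watermark does not."""
        self.max_seen = None
        self.output = []  # only show what is emitted after the restart


# Step 3: one continuous run over both files.
continuous = TumblingMinMax()
for row in FILE_1 + FILE_2:
    continuous.process(*row)

# Steps 4-5: run 1.csv, "restore" without the watermark, then run 2.csv.
restarted = TumblingMinMax()
for row in FILE_1:
    restarted.process(*row)
restarted.restore_without_watermark()
for row in FILE_2:
    restarted.process(*row)

print("\n".join(continuous.output))
print("---")
print("\n".join(restarted.output))
```

In this model, the continuous run reproduces the six rows from step 3, and the run that forgets the watermark reproduces the five rows from step 5, including the unexpected 2000-01-01,A,12,12. That is consistent with the hypothesis, though it does not prove what Flink does internally.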

      Attachments

        1. MyApp.scala (2 kB, Sergii Mikhtoniuk)
        2. 2.csv (0.2 kB, Sergii Mikhtoniuk)
        3. 1.csv (0.3 kB, Sergii Mikhtoniuk)

            People

              Assignee: Unassigned
              Reporter: Sergii Mikhtoniuk (sergiimk)
              Votes: 0
              Watchers: 5
