Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-28357

Watermark issue when recovering Finished sources




      Copied mostly from email trail on the flink user mailing list:

      I done a lot of experimentation and I’m convinced there is a problem with Flink handling Finished sources and recovery. 

      The program consists of:

      • Two sources:
        • One “Long Running Source” – stays alive and emits a watermark of DateTime.now() every 10 seconds.
          • Prints the console a message saying the watermark has been emitted.
          • Throws an exception every 5 or 10 iterations to force a recovery.
        • One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a message to the console and returns.
      • The “Short Live Source” feeds into a map() and then it joins with the “Long Running Source” with a KeyedCoProcessFunction. Moves to “FINISHED” state by Flink.

      The problem here is that the “Join” receives no Long.MAX_VALUE watermark from the map() in some situations after a recovery. The dashboard goes from showing this:

      To the below after a recovery (with the currentInput1/2Watermark metrics showing input 2 having not received a watermark from the map, saying –Long.MAX_VALUE):

      The program is currently set to checkpoint every 5 seconds. By experimenting with 70 seconds, it seems that if only one checkpoint has been taken with the “Short Lived Source” in a FINISHED state since the last recovery then everything works fine and the restarted “Short Lived Source” emits its watermark and I see the “ShortedLivedEmptySource emitting Long.MAX_VALUE watermark” message on the console meaning the run() definitely executed. However, I found that if 2 or more checkpoints are taken since the last recovery with the source in a FINISHED state then the console message does not appear and the watermark is not emitted.

      To repeat – the Join does not get a Long.MAX_VALUE watermark from my source or Flink if I see two or more checkpoints logged in between recoveries. If zero or checkpoints are made, everything is fine – the join gets the watermark and I see my console message. You can play with the checkpointing frequency as per the code comments:

              // Useful checkpoint interval options:

              //    5 - see the problem after the first recovery

              //   70 - useful to see bad behaviour kick in after a recovery or two

              //  120 - won't see the problem as we don't have 2 checkpoints within a single recovery session

      If I merge the Triggering/Completed checkpoint messages in the log with my console output I see something like this clearly showing the “Short Lived Source” run() method is not executed after 2 checkpoints with the operators marked as FINISHED:


      2022-06-29T11:52:31.268Z: ShortLivedEmptySource emitting Long.MAX_VALUE watermark.

      2022-06-29T11:52:31.293Z: LongRunningSource emitting initial watermark=1656503551268

      2022-06-29T11:52:41.302Z: LongRunningSource emitting loop watermark=1656503561302

      2022-06-29T11:52:51.302Z: LongRunningSource emitting loop watermark=1656503571302

      2022-06-29T11:53:01.303Z: LongRunningSource emitting loop watermark=1656503581303

      2022-06-29 11:53:02.772 INFO  [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator           Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD})

      2022-06-29 11:53:02.870 INFO  [jobmanager-io-thread-10] o.a.f.r.c.CheckpointCoordinator    Completed checkpoint 1 for job 877656d7752bc1304c2cb92790e6aefb

      2022-06-29T11:53:11.303Z: LongRunningSource emitting loop watermark=1656503591303

      2022-06-29T11:53:21.304Z: LongRunningSource emitting loop watermark=1656503601304

      2022-06-29T11:53:21.304Z: ------------------ Recovery ------------------

      2022-06-29T11:53:22.405Z: LongRunningSource emitting initial watermark=1656503602405

      2022-06-29T11:53:22.408Z: ShortLivedEmptySource emitting Long.MAX_VALUE watermark.

      2022-06-29T11:53:32.406Z: LongRunningSource emitting loop watermark=1656503612406

      2022-06-29T11:53:42.406Z: LongRunningSource emitting loop watermark=1656503622406

      2022-06-29 11:53:51.048 INFO  [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator           Triggering checkpoint 2 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD})

      2022-06-29 11:53:51.067 INFO  [jobmanager-io-thread-4] o.a.f.r.c.CheckpointCoordinator     Completed checkpoint 2 for job 877656d7752bc1304c2cb92790e6aefb

      2022-06-29T11:53:52.407Z: LongRunningSource emitting loop watermark=1656503632407

      2022-06-29T11:54:02.407Z: LongRunningSource emitting loop watermark=1656503642407

      2022-06-29T11:54:12.408Z: LongRunningSource emitting loop watermark=1656503652408

      2022-06-29T11:54:22.408Z: LongRunningSource emitting loop watermark=1656503662408

      2022-06-29T11:54:32.409Z: LongRunningSource emitting loop watermark=1656503672409

      2022-06-29T11:54:42.409Z: LongRunningSource emitting loop watermark=1656503682409

      2022-06-29T11:54:52.410Z: LongRunningSource emitting loop watermark=1656503692410

      2022-06-29 11:55:01.048 INFO  [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator           Triggering checkpoint 3 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD})

      2022-06-29 11:55:01.057 INFO  [jobmanager-io-thread-10] o.a.f.r.c.CheckpointCoordinator    Completed checkpoint 3 for job 877656d7752bc1304c2cb92790e6aefb

      2022-06-29T11:55:02.410Z: LongRunningSource emitting loop watermark=1656503702410

      2022-06-29T11:55:02.411Z: ------------------ Recovery ------------------

      2022-06-29T11:55:03.445Z: LongRunningSource emitting initial watermark=1656503703444       <<<<< NO “ShortLivedEmptySource” message after recovery

      2022-06-29T11:55:13.446Z: LongRunningSource emitting loop watermark=1656503713445

      2022-06-29T11:55:23.446Z: LongRunningSource emitting loop watermark=1656503723446

      2022-06-29T11:55:33.446Z: LongRunningSource emitting loop watermark=1656503733446


      I have also attached a longer example with shows everything working fine after 5 recoveries, and then breaking after the 6th.

      I am guessing here it has something to do with the checkpointing and recovery of a FINISHED source.

      Finally, here are some ways that allows the code to work:

      • Change the code so the “Short Lived Source” doesn’t return from run() and stays RUNNING (uncomment the Thread.sleep)
      • As I mentioned before, if I remove the map() operator the problem in the join also goes away. (I don’t see the console output but the join is happy)
      • Use a long enough checkpoint interval (e.g. 120 seconds) so we don’t have two checkpoints with FINISHED state per recovery.

      The fact these changes prevent the issue means I really think there’s some bug or inconsistency here – if somebody could explain I would really appreciate it.



        1. WatermarkDemoMain.java
          7 kB
        2. longExample.txt
          10 kB
        3. image-2022-07-08-17-06-01-256.png
          51 kB
        4. image-2022-07-01-16-18-14-768.png
          30 kB



            pnowojski Piotr Nowojski
            Sandys-Lumsdaine James
            0 Vote for this issue
            3 Start watching this issue