Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.15.0
-
This can be reproduced in an IDE with the attached sample program.
Description
Copied mostly from email trail on the flink user mailing list:
I done a lot of experimentation and I’m convinced there is a problem with Flink handling Finished sources and recovery.
The program consists of:
- Two sources:
- One “Long Running Source” – stays alive and emits a watermark of DateTime.now() every 10 seconds.
- Prints the console a message saying the watermark has been emitted.
- Throws an exception every 5 or 10 iterations to force a recovery.
- One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a message to the console and returns.
- One “Long Running Source” – stays alive and emits a watermark of DateTime.now() every 10 seconds.
- The “Short Live Source” feeds into a map() and then it joins with the “Long Running Source” with a KeyedCoProcessFunction. Moves to “FINISHED” state by Flink.
The problem here is that the “Join” receives no Long.MAX_VALUE watermark from the map() in some situations after a recovery. The dashboard goes from showing this:
To the below after a recovery (with the currentInput1/2Watermark metrics showing input 2 having not received a watermark from the map, saying –Long.MAX_VALUE):
The program is currently set to checkpoint every 5 seconds. By experimenting with 70 seconds, it seems that if only one checkpoint has been taken with the “Short Lived Source” in a FINISHED state since the last recovery then everything works fine and the restarted “Short Lived Source” emits its watermark and I see the “ShortedLivedEmptySource emitting Long.MAX_VALUE watermark” message on the console meaning the run() definitely executed. However, I found that if 2 or more checkpoints are taken since the last recovery with the source in a FINISHED state then the console message does not appear and the watermark is not emitted.
To repeat – the Join does not get a Long.MAX_VALUE watermark from my source or Flink if I see two or more checkpoints logged in between recoveries. If zero or checkpoints are made, everything is fine – the join gets the watermark and I see my console message. You can play with the checkpointing frequency as per the code comments:
// Useful checkpoint interval options:
// 5 - see the problem after the first recovery
// 70 - useful to see bad behaviour kick in after a recovery or two
// 120 - won't see the problem as we don't have 2 checkpoints within a single recovery session
If I merge the Triggering/Completed checkpoint messages in the log with my console output I see something like this clearly showing the “Short Lived Source” run() method is not executed after 2 checkpoints with the operators marked as FINISHED:
2022-06-29T11:52:31.268Z: ShortLivedEmptySource emitting Long.MAX_VALUE watermark.
2022-06-29T11:52:31.293Z: LongRunningSource emitting initial watermark=1656503551268
2022-06-29T11:52:41.302Z: LongRunningSource emitting loop watermark=1656503561302
2022-06-29T11:52:51.302Z: LongRunningSource emitting loop watermark=1656503571302
2022-06-29T11:53:01.303Z: LongRunningSource emitting loop watermark=1656503581303
2022-06-29 11:53:02.772 INFO [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD})
2022-06-29 11:53:02.870 INFO [jobmanager-io-thread-10] o.a.f.r.c.CheckpointCoordinator Completed checkpoint 1 for job 877656d7752bc1304c2cb92790e6aefb
2022-06-29T11:53:11.303Z: LongRunningSource emitting loop watermark=1656503591303
2022-06-29T11:53:21.304Z: LongRunningSource emitting loop watermark=1656503601304
2022-06-29T11:53:21.304Z: ------------------ Recovery ------------------
2022-06-29T11:53:22.405Z: LongRunningSource emitting initial watermark=1656503602405
2022-06-29T11:53:22.408Z: ShortLivedEmptySource emitting Long.MAX_VALUE watermark.
2022-06-29T11:53:32.406Z: LongRunningSource emitting loop watermark=1656503612406
2022-06-29T11:53:42.406Z: LongRunningSource emitting loop watermark=1656503622406
2022-06-29 11:53:51.048 INFO [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator Triggering checkpoint 2 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD})
2022-06-29 11:53:51.067 INFO [jobmanager-io-thread-4] o.a.f.r.c.CheckpointCoordinator Completed checkpoint 2 for job 877656d7752bc1304c2cb92790e6aefb
2022-06-29T11:53:52.407Z: LongRunningSource emitting loop watermark=1656503632407
2022-06-29T11:54:02.407Z: LongRunningSource emitting loop watermark=1656503642407
2022-06-29T11:54:12.408Z: LongRunningSource emitting loop watermark=1656503652408
2022-06-29T11:54:22.408Z: LongRunningSource emitting loop watermark=1656503662408
2022-06-29T11:54:32.409Z: LongRunningSource emitting loop watermark=1656503672409
2022-06-29T11:54:42.409Z: LongRunningSource emitting loop watermark=1656503682409
2022-06-29T11:54:52.410Z: LongRunningSource emitting loop watermark=1656503692410
2022-06-29 11:55:01.048 INFO [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator Triggering checkpoint 3 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD})
2022-06-29 11:55:01.057 INFO [jobmanager-io-thread-10] o.a.f.r.c.CheckpointCoordinator Completed checkpoint 3 for job 877656d7752bc1304c2cb92790e6aefb
2022-06-29T11:55:02.410Z: LongRunningSource emitting loop watermark=1656503702410
2022-06-29T11:55:02.411Z: ------------------ Recovery ------------------
2022-06-29T11:55:03.445Z: LongRunningSource emitting initial watermark=1656503703444 <<<<< NO “ShortLivedEmptySource” message after recovery
2022-06-29T11:55:13.446Z: LongRunningSource emitting loop watermark=1656503713445
2022-06-29T11:55:23.446Z: LongRunningSource emitting loop watermark=1656503723446
2022-06-29T11:55:33.446Z: LongRunningSource emitting loop watermark=1656503733446
I have also attached a longer example with shows everything working fine after 5 recoveries, and then breaking after the 6th.
I am guessing here it has something to do with the checkpointing and recovery of a FINISHED source.
Finally, here are some ways that allows the code to work:
- Change the code so the “Short Lived Source” doesn’t return from run() and stays RUNNING (uncomment the Thread.sleep)
- As I mentioned before, if I remove the map() operator the problem in the join also goes away. (I don’t see the console output but the join is happy)
- Use a long enough checkpoint interval (e.g. 120 seconds) so we don’t have two checkpoints with FINISHED state per recovery.
The fact these changes prevent the issue means I really think there’s some bug or inconsistency here – if somebody could explain I would really appreciate it.