Affects Version/s: 1.7.0
Fix Version/s: None
Component/s: Runtime / Coordination
When a JobMaster in an HA setup loses leadership, it suspends the execution of its job via JobMaster.suspend(Exception, Time). This operation involves transitioning to the SUSPENDING job state and cancelling all running tasks. In some executions, the job does not reach the terminal SUSPENDED job state.
This is because suspending the job stops related RPC endpoints such as the JobMaster or SlotPool (in JobMaster.suspend(Exception, Time) and JobMaster.suspendExecution(Exception)) immediately after initiating the suspension, without waiting for the tasks to terminate. Whenever the endpoints are stopped before the TaskExecutor instances have cancelled or failed the respective tasks, the job does not transition to SUSPENDED, because the ExecutionGraph never receives all of the Execution state transitions.
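The ordering problem can be illustrated with a small, deterministic model. This is a hypothetical sketch, not Flink code: `JobMasterModel`, `stopEndpoint()`, `onTaskTerminal()` and `run()` are invented names that only mimic the behavior described above (the job becomes SUSPENDED once every task has reported a terminal state, and messages arriving after the RPC endpoint has stopped are dropped).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the race; none of these types are Flink's API.
class SuspendRaceSketch {

    enum JobStatus { RUNNING, SUSPENDING, SUSPENDED }

    static final class JobMasterModel {
        private final Set<String> pendingTasks = new HashSet<>();
        private JobStatus status = JobStatus.RUNNING;
        private boolean endpointStopped = false;

        JobMasterModel(Set<String> tasks) {
            pendingTasks.addAll(tasks);
        }

        // Enter SUSPENDING; in the real system this also triggers task cancellation.
        void suspend() {
            status = JobStatus.SUSPENDING;
        }

        // Models stopping the RPC endpoint: later messages are never delivered.
        void stopEndpoint() {
            endpointStopped = true;
        }

        // Models a TaskExecutor reporting a terminal (cancelled/failed) task state.
        void onTaskTerminal(String taskId) {
            if (endpointStopped) {
                return; // update is lost, so the job can never leave SUSPENDING
            }
            pendingTasks.remove(taskId);
            if (status == JobStatus.SUSPENDING && pendingTasks.isEmpty()) {
                status = JobStatus.SUSPENDED;
            }
        }

        JobStatus status() {
            return status;
        }
    }

    // Runs one interleaving: endpoint stopped before or after the task reports.
    static JobStatus run(boolean stopBeforeReports) {
        JobMasterModel jm = new JobMasterModel(Set.of("task-1", "task-2"));
        jm.suspend();
        if (stopBeforeReports) {
            jm.stopEndpoint();
        }
        jm.onTaskTerminal("task-1");
        jm.onTaskTerminal("task-2");
        if (!stopBeforeReports) {
            jm.stopEndpoint();
        }
        return jm.status();
    }

    public static void main(String[] args) {
        System.out.println("reports first: " + run(false)); // prints SUSPENDED
        System.out.println("stop first:    " + run(true));  // prints SUSPENDING
    }
}
```

In the model, only the relative order of `stopEndpoint()` and the task reports decides whether the terminal state is reached, which matches the timing dependency described in this issue.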
In practice, this should not happen frequently, because JobMaster and TaskExecutor instances are notified about the loss of leadership (or loss of ZooKeeper connection or similar events) at around the same time. In this scenario, the TaskExecutor instances proactively fail the executing tasks and notify the JobMaster. Overall, the impact is limited, since a new JobMaster leader will eventually recover the job.
Steps to reproduce:
- Start ZooKeeper
- Start a Flink cluster in HA mode and submit a job
- Stop ZooKeeper
In some executions you will find that the job does not reach the terminal state SUSPENDED. Furthermore, you may see log messages similar to the following in this case:
I've attached logs of a local run that does not transition to SUSPENDED, as well as a sequence diagram of what I believe may be the problematic timing.