Flink / FLINK-20396

Add "OperatorCoordinator.resetSubtask()" to fix order problems of "subtaskFailed()"

Details

    Description

      There are no strong order guarantees between OperatorCoordinator.subtaskFailed() and OperatorCoordinator.notifyCheckpointComplete().

      It can happen that a checkpoint completes after the notification for task failure is sent:

      • OperatorCoordinator.checkpoint()
      • OperatorCoordinator.subtaskFailed()
      • OperatorCoordinator.checkpointComplete()

      When subtaskFailed() is called here, the coordinator does not know whether the preceding checkpoint will complete, so it cannot decide what state the subtask will be in after recovery.
      There is currently no easy way to strictly guarantee the order of these method calls. As an alternative, we need to give the coordinator the information it needs to reason about the state of the tasks.
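To make the ambiguity concrete, here is a minimal sketch. It assumes a toy class, AmbiguousFailureCoordinator, with simplified method signatures modeled on the call names in the list above. It is not Flink's actual OperatorCoordinator interface, and it tracks only the most recently triggered checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy coordinator for illustration; not Flink's OperatorCoordinator. */
final class AmbiguousFailureCoordinator {
    private long lastCompletedCheckpoint = -1L; // -1 means "no checkpoint completed yet"
    private long lastTriggeredCheckpoint = -1L; // -1 means "no checkpoint triggered yet"

    void checkpoint(long checkpointId) {
        lastTriggeredCheckpoint = checkpointId;
    }

    void checkpointComplete(long checkpointId) {
        lastCompletedCheckpoint = Math.max(lastCompletedCheckpoint, checkpointId);
    }

    /**
     * Returns every checkpoint the failed subtask might later be restored from.
     * More than one entry means the coordinator cannot know the restored state.
     */
    List<Long> subtaskFailed(int subtask) {
        List<Long> candidates = new ArrayList<>();
        candidates.add(lastCompletedCheckpoint);
        if (lastTriggeredCheckpoint > lastCompletedCheckpoint) {
            // A checkpoint is still in flight: it may or may not complete.
            candidates.add(lastTriggeredCheckpoint);
        }
        return candidates;
    }
}
```

With the call order checkpoint(2), subtaskFailed(0), checkpointComplete(2), the failure notification sees two candidate restore points, and only the later completion call resolves which one applies.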

      We should replace OperatorCoordinator.subtaskFailed(int subtask) with OperatorCoordinator.subtaskRestored(int subtask, long checkpoint). That way, implementations get the explicit checkpoint ID for the subtask recovery and can align it with the IDs of the checkpoints that were taken.

      It is still possible (in rare cases) that, for a specific checkpoint C, OperatorCoordinator.subtaskRestored(subtaskIndex, C) comes before OperatorCoordinator.checkpointComplete(C).
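As an illustration of how an implementation could use the explicit checkpoint ID, here is a rough sketch. It assumes a hypothetical coordinator, RestoreAwareCoordinator, that hands out work items ("splits") to subtasks. The class, its fields, and the assign() method are invented for this example. Only the subtaskRestored(int, long) shape comes from the proposal above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical coordinator for illustration; not part of Flink. */
final class RestoreAwareCoordinator {
    private static final class Assignment {
        final int subtask;
        final String split;

        Assignment(int subtask, String split) {
            this.subtask = subtask;
            this.split = split;
        }
    }

    /** Splits not currently assigned to any subtask. */
    private final Deque<String> unassigned = new ArrayDeque<>();
    /** Assignments made since the most recent checkpoint() call. */
    private List<Assignment> sinceLastCheckpoint = new ArrayList<>();
    /** Assignments keyed by the first checkpoint whose subtask state contains them. */
    private final TreeMap<Long, List<Assignment>> coveredBy = new TreeMap<>();

    RestoreAwareCoordinator(List<String> splits) {
        unassigned.addAll(splits);
    }

    /** Hands the next split to the subtask, or returns null if none is left. */
    String assign(int subtask) {
        String split = unassigned.poll();
        if (split != null) {
            sinceLastCheckpoint.add(new Assignment(subtask, split));
        }
        return split;
    }

    void checkpoint(long checkpointId) {
        coveredBy.put(checkpointId, sinceLastCheckpoint);
        sinceLastCheckpoint = new ArrayList<>();
    }

    void checkpointComplete(long checkpointId) {
        // Assignments covered by a completed checkpoint never need to be reclaimed.
        coveredBy.headMap(checkpointId, true).clear();
    }

    void subtaskRestored(int subtask, long checkpointId) {
        // Everything assigned after the restored checkpoint is lost on the subtask.
        for (List<Assignment> later : coveredBy.tailMap(checkpointId, false).values()) {
            reclaim(later, subtask);
        }
        reclaim(sinceLastCheckpoint, subtask);
    }

    private void reclaim(List<Assignment> assignments, int subtask) {
        Iterator<Assignment> it = assignments.iterator();
        while (it.hasNext()) {
            Assignment a = it.next();
            if (a.subtask == subtask) {
                unassigned.add(a.split);
                it.remove();
            }
        }
    }

    List<String> unassignedSplits() {
        return new ArrayList<>(unassigned);
    }
}
```

Because the reclaim logic depends only on the checkpoint ID passed in, the result in this sketch is the same whether checkpointComplete(C) arrives before or after subtaskRestored(subtaskIndex, C).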

      Background

      The checkpointing procedure is partially asynchronous on the JobManager / CheckpointCoordinator: after all subtasks have acknowledged the checkpoint, the finalization (writing out metadata and registering the checkpoint in ZooKeeper) happens in an I/O thread, and the checkpoint completes only after that.

      This sequence of events can happen:

      • task acks checkpoint
      • checkpoint fully acknowledged, finalization starts
      • task fails
      • task failure notification is dispatched
      • checkpoint completes

      For task failures and checkpoint completion, no order is defined.

      However, for task restore and checkpoint completion, the order is well defined: when a task is restored, every pending checkpoint has either been canceled or has completed. None can still be in finalization. This is currently guaranteed by a lock in the CheckpointCoordinator.
      (One implication is that restores can be blocking operations in the scheduler. That is not ideal from the perspective of making the scheduler async/non-blocking, but it is currently essential for correctness.)
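The lock-based guarantee can be sketched roughly as follows. This is a toy model under the assumption that finalization and restore both run while holding the same lock. ToyCheckpointCoordinator and all of its members are hypothetical names and do not reflect the real CheckpointCoordinator code.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model for illustration; not Flink's CheckpointCoordinator. */
final class ToyCheckpointCoordinator {
    enum Status { PENDING, COMPLETED, CANCELED }

    private final Object lock = new Object();
    private final Map<Long, Status> checkpoints = new HashMap<>();
    private long latestCompleted = -1L; // -1 means "no completed checkpoint"

    void trigger(long checkpointId) {
        synchronized (lock) {
            checkpoints.put(checkpointId, Status.PENDING);
        }
    }

    /**
     * Stands in for the finalization step that would run on an I/O thread.
     * Returns false if a restore canceled the checkpoint first.
     */
    boolean finalizeCheckpoint(long checkpointId) {
        synchronized (lock) {
            if (checkpoints.get(checkpointId) != Status.PENDING) {
                return false;
            }
            // Metadata writing and registration would happen here, under the lock.
            checkpoints.put(checkpointId, Status.COMPLETED);
            latestCompleted = Math.max(latestCompleted, checkpointId);
            return true;
        }
    }

    /**
     * Cancels all pending checkpoints and returns the checkpoint to restore from.
     * It blocks while a finalization holds the lock, so no checkpoint can be
     * halfway through finalization when this method returns.
     */
    long restore() {
        synchronized (lock) {
            checkpoints.replaceAll((id, s) -> s == Status.PENDING ? Status.CANCELED : s);
            return latestCompleted;
        }
    }

    Status status(long checkpointId) {
        synchronized (lock) {
            return checkpoints.get(checkpointId);
        }
    }
}
```

In this model the blocking behavior mentioned above is visible directly: restore() has to wait for any finalization that currently holds the lock.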

            People

              sewen Stephan Ewen