Flink / FLINK-20396

Add "OperatorCoordinator.resetSubtask()" to fix order problems of "subtaskFailed()"


Details

    Description

      There are no strong order guarantees between OperatorCoordinator.subtaskFailed() and OperatorCoordinator.notifyCheckpointComplete().

      A checkpoint can complete after the task failure notification has been sent:

      • OperatorCoordinator.checkpoint()
      • OperatorCoordinator.subtaskFailed()
      • OperatorCoordinator.checkpointComplete()

      When the coordinator handles the subtask failure, it does not know whether the previous checkpoint completed, so it cannot decide what state the subtask will be in after recovery.
      There is no easy way right now to strictly guarantee the order of these method calls, so instead we need to provide the information necessary to reason about the status of the tasks.
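The ambiguity can be made concrete with a minimal sketch. `FailureOnlyCoordinator` is a hypothetical class (not Flink code) that only receives the failure and completion notifications, and therefore has to guess the restore point from whatever it has seen so far.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch, not Flink code: a coordinator that only learns
 * "subtask failed" and must guess which checkpoint the subtask restores to.
 */
class FailureOnlyCoordinator {
    private long lastCompletedCheckpoint = -1L;

    /** The restore point guessed at each failure notification, in arrival order. */
    final List<Long> guessedRestorePoints = new ArrayList<>();

    void checkpointComplete(long checkpointId) {
        lastCompletedCheckpoint = Math.max(lastCompletedCheckpoint, checkpointId);
    }

    void subtaskFailed(int subtask) {
        // The best available guess is the latest checkpoint reported as complete.
        // A checkpoint that is fully acknowledged but still finalizing is invisible here.
        guessedRestorePoints.add(lastCompletedCheckpoint);
    }

    long lastCompletedCheckpoint() {
        return lastCompletedCheckpoint;
    }
}
```

Replaying the order above (checkpoint 1 complete, checkpoint 2 still finalizing, then the failure, then the completion of checkpoint 2), the guess recorded at failure time is checkpoint 1, although checkpoint 2 completes right afterwards and may well be the one the subtask is restored to.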

      We should replace OperatorCoordinator.subtaskFailed(int subtask) with OperatorCoordinator.subtaskRestored(int subtask, long checkpoint). That way, implementations get the explicit ID of the checkpoint the subtask is restored to, and can align it with the IDs of the checkpoints that were taken.
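To illustrate what the explicit checkpoint ID buys, here is a minimal sketch assuming a coordinator that hands out work items ("splits") to subtasks. All names (`AssigningCoordinator`, `assignNext`, `checkpoint`, the split strings) are hypothetical, and `subtaskRestored(int, long)` only mirrors the proposed signature; it is not an existing Flink API. Each assignment is filed under the ID of the first checkpoint taken after it; on restore, everything filed under later checkpoints (or not yet checkpointed) goes back to the pool.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch, not Flink's API: tracks per checkpoint which items each subtask received. */
class AssigningCoordinator {
    /** Work items not yet assigned to any subtask. */
    final Deque<String> pool = new ArrayDeque<>();
    /** Items assigned since the last checkpoint, per subtask. */
    private final Map<Integer, List<String>> uncheckpointed = new HashMap<>();
    /** Items that became part of each checkpoint, keyed by checkpoint ID, then by subtask. */
    private final TreeMap<Long, Map<Integer, List<String>>> byCheckpoint = new TreeMap<>();

    String assignNext(int subtask) {
        String item = pool.poll();
        if (item != null) {
            uncheckpointed.computeIfAbsent(subtask, k -> new ArrayList<>()).add(item);
        }
        return item;
    }

    /** Checkpoint taken: seal the assignments made since the previous checkpoint under this ID. */
    void checkpoint(long checkpointId) {
        byCheckpoint.put(checkpointId, new HashMap<>(uncheckpointed));
        uncheckpointed.clear();
    }

    /** Assignments up to a completed checkpoint no longer need tracking. */
    void checkpointComplete(long checkpointId) {
        byCheckpoint.headMap(checkpointId, true).clear();
    }

    /** The subtask restarts from checkpointId: items it received after that go back to the pool. */
    void subtaskRestored(int subtask, long checkpointId) {
        for (Map<Integer, List<String>> perSubtask : byCheckpoint.tailMap(checkpointId, false).values()) {
            List<String> lost = perSubtask.remove(subtask);
            if (lost != null) {
                pool.addAll(lost);
            }
        }
        List<String> lost = uncheckpointed.remove(subtask);
        if (lost != null) {
            pool.addAll(lost);
        }
    }
}
```

Because the history is keyed by checkpoint ID rather than by completion notifications, `subtaskRestored(0, 1)` returns exactly the items handed out after checkpoint 1, even when checkpoint 2 was taken and its completion is still unknown.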

      It is still possible (in rare cases) that for a specific checkpoint C, OperatorCoordinator.subtaskRestored(subtaskIndex, C) is called before OperatorCoordinator.checkpointComplete(C).
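Under one assumption, this remaining case can be handled without ordering guarantees: a subtask is only ever restored to a completed checkpoint, so subtaskRestored(subtaskIndex, C) already implies that C completed, and a late checkpointComplete(C) carries no new information. The hypothetical `CompletionTracker` below (not Flink code) sketches this by treating both calls as "C is complete" and taking the maximum, which makes the handling idempotent and independent of call order.

```java
/**
 * Hypothetical sketch, not Flink code: tracks the latest checkpoint known to be complete.
 * Assumption: subtasks are only restored to completed checkpoints, so a restore to C
 * counts as an implicit completion of C, and a late checkpointComplete(C) is a no-op.
 */
class CompletionTracker {
    private long latestCompleted = -1L;

    void checkpointComplete(long checkpointId) {
        advanceTo(checkpointId);
    }

    void subtaskRestored(int subtask, long checkpointId) {
        advanceTo(checkpointId);
    }

    private void advanceTo(long checkpointId) {
        // Taking the maximum makes both notifications idempotent and order-insensitive.
        latestCompleted = Math.max(latestCompleted, checkpointId);
    }

    long latestCompleted() {
        return latestCompleted;
    }
}
```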

      Background

      The checkpointing procedure is partially asynchronous on the JobManager / CheckpointCoordinator: after all subtasks have acknowledged the checkpoint, the finalization (writing out the metadata and registering the checkpoint in ZooKeeper) happens in an I/O thread, and the checkpoint completes only after that.

      This sequence of events can happen:

      • tasks ack the checkpoint
      • the checkpoint is fully acknowledged, finalization starts
      • a task fails
      • the task failure notification is dispatched
      • the checkpoint completes

      For task failures and checkpoint completion, no order is defined.
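A small simulation of this sequence, assuming nothing about Flink internals: the hypothetical `FinalizationRace` runs the finalization on a separate "I/O" executor and uses a `CountDownLatch` to stand in for the slow metadata and ZooKeeper writes. The latch forces the interleaving from the list, so the failure notification is recorded before the checkpoint completes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical simulation, not Flink code: finalization on an I/O thread races with a failure. */
class FinalizationRace {
    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch metadataWritten = new CountDownLatch(1);
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        try {
            events.add("checkpoint fully acknowledged, finalization starts");
            CompletableFuture<Void> finalization = CompletableFuture.runAsync(() -> {
                try {
                    metadataWritten.await(); // stands in for slow metadata / ZooKeeper writes
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                events.add("checkpoint completes");
            }, ioExecutor);

            // Meanwhile, on the calling thread: a task fails and the notification goes out.
            events.add("task failure notification dispatched");

            metadataWritten.countDown(); // let the I/O thread finish the checkpoint
            finalization.join();
        } finally {
            ioExecutor.shutdown();
        }
        return new ArrayList<>(events);
    }
}
```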

      However, for task restore and checkpoint completion, the order is well defined: when a task is restored, every pending checkpoint is either canceled or complete; none can still be in finalization. This is currently guaranteed by a lock in the CheckpointCoordinator.
      (One implication is that restores can be blocking operations in the scheduler. That is not ideal from the perspective of making the scheduler async/non-blocking, but it is currently essential for correctness.)
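The lock-based guarantee can be sketched as follows. `LockedCheckpointStore` is hypothetical and far simpler than the real CheckpointCoordinator (cancellation is not modeled): finalization and restore take the same `ReentrantLock`, so a restore requested while checkpoint 2 is being finalized blocks until finalization ends and then sees checkpoint 2 as complete. The wait inside `restoreLatest()` is the blocking cost described in the parenthetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch, not Flink code: finalization and restore share one lock. */
class LockedCheckpointStore {
    private final ReentrantLock lock = new ReentrantLock();
    private long latestCompleted = -1L;

    /** Finalizes a fully acknowledged checkpoint; writeMetadata stands in for the slow I/O. */
    void finalizeCheckpoint(long checkpointId, Runnable writeMetadata) {
        lock.lock();
        try {
            writeMetadata.run();
            latestCompleted = checkpointId;
        } finally {
            lock.unlock();
        }
    }

    /** Returns the checkpoint to restore from; blocks while a finalization is in progress. */
    long restoreLatest() {
        lock.lock();
        try {
            return latestCompleted;
        } finally {
            lock.unlock();
        }
    }

    /** Demo: a restore requested during finalization of checkpoint 2 waits and then sees 2. */
    static long restoreDuringFinalization() {
        LockedCheckpointStore store = new LockedCheckpointStore();
        store.finalizeCheckpoint(1L, () -> { });
        CountDownLatch finalizationStarted = new CountDownLatch(1);
        CountDownLatch metadataWritten = new CountDownLatch(1);
        ExecutorService threads = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Void> finalization = CompletableFuture.runAsync(
                    () -> store.finalizeCheckpoint(2L, () -> {
                        finalizationStarted.countDown(); // the lock is held from here on
                        awaitOrFail(metadataWritten);     // slow metadata write
                    }),
                    threads);
            awaitOrFail(finalizationStarted);
            // The restore is requested while checkpoint 2 is in finalization, so it must wait.
            CompletableFuture<Long> restore = CompletableFuture.supplyAsync(store::restoreLatest, threads);
            metadataWritten.countDown();
            finalization.join();
            return restore.join();
        } finally {
            threads.shutdown();
        }
    }

    private static void awaitOrFail(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```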


          People

            sewen Stephan Ewen