FLINK-19596

Do not recover CompletedCheckpointStore on each failover


Details

    • Type: Improvement
    • Status: Closed
    • Priority: Critical
    • Resolution: Duplicate
    • Affects Version/s: 1.11.2

    Description

      completedCheckpointStore.recover() in restoreLatestCheckpointedStateInternal can be a bottleneck on failover, because the CompletedCheckpointStore has to load files from HDFS to instantiate the CompletedCheckpoint instances.

      The impact is significant in our case:

      • The jobs have high parallelism and no shuffle, and they transfer data from Kafka to other filesystems.
      • If a machine goes down, several containers and tens of tasks are affected. Because those tasks do not share a single failover region, completedCheckpointStore.recover() is called tens of times.

      I also noticed a "TODO" in the source code:

      // Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery
      completedCheckpointStore.recover();
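
      The idea behind that TODO can be sketched roughly as follows. This is only an illustration, not Flink's actual code: the CheckpointStore interface, the RecoverOnceStore wrapper, and the onLeadershipGranted() hook are hypothetical names introduced for the example. It assumes the stored checkpoints cannot change behind the store's back while the same leader is active, which would need to be verified against the real store implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RecoverOnceSketch {

    /** Hypothetical stand-in for a checkpoint store; not Flink's actual interface. */
    interface CheckpointStore {
        void recover() throws Exception;
    }

    /** Forwards recover() to the backing store at most once per leadership term. */
    static final class RecoverOnceStore implements CheckpointStore {
        private final CheckpointStore delegate;
        private boolean recovered;

        RecoverOnceStore(CheckpointStore delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized void recover() throws Exception {
            if (recovered) {
                return; // already loaded under this leader, skip the expensive reload
            }
            delegate.recover();
            recovered = true; // set only after a successful recovery
        }

        /** Hypothetical hook: a new leader must reload the checkpoints once. */
        synchronized void onLeadershipGranted() {
            recovered = false;
        }
    }

    public static void main(String[] args) throws Exception {
        // Counts how often the expensive load (e.g. reading files from HDFS) runs.
        AtomicInteger loads = new AtomicInteger();
        RecoverOnceStore store = new RecoverOnceStore(loads::incrementAndGet);

        // Simulate tens of task failovers under the same leader.
        for (int i = 0; i < 30; i++) {
            store.recover();
        }
        System.out.println("loads after 30 failovers: " + loads.get());

        // Simulate a leader change followed by one more failover.
        store.onLeadershipGranted();
        store.recover();
        System.out.println("loads after leader change: " + loads.get());
    }
}
```

      With such a guard, the cost of loading the checkpoint files would be paid once per leader rather than once per affected task.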
      


          People

            Assignee: Unassigned
            Reporter: Jiayi Liao (wind_ljy)
            Votes: 0
            Watchers: 9
