
FLINK-5193: Recovering all jobs fails completely if a single recovery fails

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0, 1.1.3
    • Fix Version/s: 1.2.0, 1.1.4
    • Component/s: JobManager
    • Labels: None

Description

      In the HA case, where the JobManager tries to recover all submitted job graphs, e.g. after regaining leadership, it can happen that none of the submitted jobs is recovered if a single recovery fails. Instead of failing the complete recovery procedure, the JobManager should still try to recover the remaining (non-failing) jobs and log a proper error message for the failed recoveries.
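      A minimal, self-contained Scala sketch of the intended behaviour (the job ids and `recoverJob` below are illustrative stand-ins, not the actual JobManager API): each job is recovered inside its own `Try`, so a corrupt job graph fails only its own recovery while the remaining jobs still come back.

      ```
      import scala.util.{Failure, Success, Try}

      object IndependentRecovery {
        // Illustrative stand-ins; the real JobManager recovers job graphs
        // from a SubmittedJobGraphStore.
        val jobIds = Seq("job-a", "job-b", "job-c")

        def recoverJob(id: String): Unit =
          if (id == "job-b") throw new RuntimeException(s"corrupt state for $id")

        def main(args: Array[String]): Unit = {
          // Each recovery runs in its own Try: a single failure is logged
          // and skipped instead of aborting the whole loop.
          jobIds.foreach { id =>
            Try(recoverJob(id)) match {
              case Success(_) => println(s"Recovered $id")
              case Failure(t) => println(s"Failed to recover $id: ${t.getMessage}")
            }
          }
        }
      }
      ```

      Running this recovers job-a and job-c and prints an error for job-b, which is exactly the behaviour the fix aims for.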

Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/2909

          FLINK-5193 [jm] Harden job recovery in case of recovery failures

          When recovering multiple jobs, a single recovery failure caused all jobs to remain unrecovered.
          This PR makes the recovery of jobs independent of one another, so that a single
          failure won't make the complete recovery fail. Furthermore, this PR improves the error reporting
          for failures originating in the ZooKeeperSubmittedJobGraphStore.

          Add test case

          Fix failing JobManagerHACheckpointRecoveryITCase

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink fixJobRecoveryFailure

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2909.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2909


          commit d61636d0465e0e0f274871a883d8d376c223a1f3
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2016-11-29T16:31:08Z

          FLINK-5193 [jm] Harden job recovery in case of recovery failures

          When recovering multiple jobs, a single recovery failure caused all jobs to remain unrecovered.
          This PR makes the recovery of jobs independent of one another, so that a single
          failure won't stall the complete recovery. Furthermore, this PR improves the error reporting
          for failures originating in the ZooKeeperSubmittedJobGraphStore.

          Add test case

          Fix failing JobManagerHACheckpointRecoveryITCase


          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/2910

          [backport] FLINK-5193 [jm] Harden job recovery in case of recovery failures

          This is a backport of #2909 to the release 1.1 branch.

          When recovering multiple jobs, a single recovery failure caused all jobs to remain unrecovered.
          This PR makes the recovery of jobs independent of one another, so that a single
          failure won't stall the complete recovery. Furthermore, this PR improves the error reporting
          for failures originating in the ZooKeeperSubmittedJobGraphStore.

          cc @uce

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink backportFixJobRecoveryFailure

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2910.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2910


          commit 01620e88ca5a963941ced979c143ab95777249d8
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2016-11-29T16:31:08Z

          FLINK-5193 [jm] Harden job recovery in case of recovery failures

          When recovering multiple jobs, a single recovery failure caused all jobs to remain unrecovered.
          This PR makes the recovery of jobs independent of one another, so that a single
          failure won't stall the complete recovery. Furthermore, this PR improves the error reporting
          for failures originating in the ZooKeeperSubmittedJobGraphStore.

          Add test case

          Fix failing JobManagerHACheckpointRecoveryITCase


          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2910#discussion_r90237601

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala —

@@ -505,37 +507,31 @@ class JobManager(
             }
           }
         } catch {
-          case t: Throwable => log.error(s"Failed to recover job $jobId.", t)
+          case t: Throwable => log.warn(s"Failed to recover job $jobId.", t)
         }
       }(context.dispatcher)

     case RecoverAllJobs =>
       future {
-        try {
-          // The ActorRef, which is part of the submitted job graph, can only be
-          // de-serialized in the scope of an actor system.
-          akka.serialization.JavaSerializer.currentSystem.withValue(
-            context.system.asInstanceOf[ExtendedActorSystem]) {
+        log.info("Attempting to recover all jobs.")
-            log.info(s"Attempting to recover all jobs.")
-
-            val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala
+        try {
+          val jobIdsToRecover = submittedJobGraphs.getJobIds().asScala
-            if (!leaderElectionService.hasLeadership()) {
-              // we've lost leadership. mission: abort.
-              log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " +
-                s"jobs.")
-            } else {
-              log.info(s"Re-submitting ${jobGraphs.size} job graphs.")
+          if (jobIdsToRecover.isEmpty) {
+            log.info("There are no jobs to recover.")
+          } else {
+            log.info(s"There are ${jobIdsToRecover.size} jobs to recover. Starting the job " +

          — End diff —

          Should we do an `if-else` on the log level here and print the job IDs on debug?
          ```
          if (isDebug()) {
            // There are ${jobIdsToRecover.size} jobs to recover, plus their IDs
          } else {
            // What you already have
          }
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2910#discussion_r90235223

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java —
@@ -64,6 +57,14 @@
   void removeJobGraph(JobID jobId) throws Exception;

+  /**
+   * Get all job ids of submitted job graphs to the submitted job graph store.

          — End diff —

          Good idea to separate it this way.
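          The design point being praised, as a Scala sketch (signatures paraphrased; the real interface is the Java `SubmittedJobGraphStore`, and `JobId`/`JobGraph` below are illustrative stand-ins): listing the submitted job ids is separated from recovering an individual graph, so materializing one corrupt graph can only fail its own `recoverJobGraph` call.

          ```
          // Sketch only: paraphrased from the Java SubmittedJobGraphStore interface.
          final case class JobId(value: String)
          final case class JobGraph(id: JobId)

          trait JobGraphStore {
            /** Cheap listing of the ids of all submitted job graphs. */
            @throws[Exception]
            def getJobIds(): Iterable[JobId]

            /** Recover a single graph; a failure here affects only this job. */
            @throws[Exception]
            def recoverJobGraph(jobId: JobId): JobGraph
          }
          ```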

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2910#discussion_r90235653

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java —
@@ -275,6 +242,25 @@ public void removeJobGraph(JobID jobId) throws Exception {
     }
   }

+  @Override
+  public Collection<JobID> getJobIds() throws Exception {
+    Collection<String> paths;
+
+    try {
+      paths = jobGraphsInZooKeeper.getAllPaths();
+    } catch (Exception e) {
+      throw new Exception("Failed to retrieve entry paths from ZooKeeperStateHandleStore.", e);
+    }
+
+    List<JobID> jobIds = new ArrayList<>(paths.size());
+
+    for (String path : paths) {
+      jobIds.add(jobIdfromPath(path));

          — End diff —

          If we have a malformed sub node, this will skip recovery of all jobs again, right? Wondering if we should wrap this line in a `try-catch`. It's quite unlikely, though, as someone would need to put the node there manually. Feel free to not address this.
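          A Scala sketch of the suggested hardening (the `jobIdFromPath` parser below is hypothetical; the real store is the Java `ZooKeeperSubmittedJobGraphStore`): wrap the per-path parse so a malformed child node is logged and skipped rather than failing the whole listing.

          ```
          import scala.util.{Failure, Success, Try}

          object SkipMalformedNodes {
            // Hypothetical parser: a node name is expected to be a
            // 32-character hex job id.
            def jobIdFromPath(path: String): String = {
              val name = path.stripPrefix("/")
              require(name.matches("[0-9a-fA-F]{32}"), s"Malformed job id node: $path")
              name
            }

            def main(args: Array[String]): Unit = {
              val paths = Seq("/" + "a" * 32, "/manually-created-junk", "/" + "b" * 32)

              // Wrapping the parse in Try means one bad node no longer
              // prevents recovery of all jobs.
              val jobIds = paths.flatMap { path =>
                Try(jobIdFromPath(path)) match {
                  case Success(id) => Some(id)
                  case Failure(t) =>
                    println(s"Skipping malformed node $path: ${t.getMessage}")
                    None
                }
              }

              println(s"Job ids eligible for recovery: $jobIds")
            }
          }
          ```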

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2910#discussion_r90446364

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala —

@@ -505,37 +507,31 @@ class JobManager(
             }
           }
         } catch {
-          case t: Throwable => log.error(s"Failed to recover job $jobId.", t)
+          case t: Throwable => log.warn(s"Failed to recover job $jobId.", t)
         }
       }(context.dispatcher)

     case RecoverAllJobs =>
       future {
-        try {
-          // The ActorRef, which is part of the submitted job graph, can only be
-          // de-serialized in the scope of an actor system.
-          akka.serialization.JavaSerializer.currentSystem.withValue(
-            context.system.asInstanceOf[ExtendedActorSystem]) {
+        log.info("Attempting to recover all jobs.")
-            log.info(s"Attempting to recover all jobs.")
-
-            val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala
+        try {
+          val jobIdsToRecover = submittedJobGraphs.getJobIds().asScala
-            if (!leaderElectionService.hasLeadership()) {
-              // we've lost leadership. mission: abort.
-              log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " +
-                s"jobs.")
-            } else {
-              log.info(s"Re-submitting ${jobGraphs.size} job graphs.")
+          if (jobIdsToRecover.isEmpty) {
+            log.info("There are no jobs to recover.")
+          } else {
+            log.info(s"There are ${jobIdsToRecover.size} jobs to recover. Starting the job " +

          — End diff —

          I think logging all job IDs on info level is fine.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/2910

          Thanks for the review @StephanEwen & @uce. Will address your comment regarding the malformed job id.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/2909

          Forwarding @uce's and @StephanEwen's review from the backport to this PR.

          Rebasing on the latest master; if Travis gives the green light, I will merge this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/2909

          Merging...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2909

          till.rohrmann Till Rohrmann added a comment -

          Fixed in 1.2.0 via add3765d1626a04fb98b8f36cb725eb32806d8b6
          Fixed in 1.1.4 via d314bc5235e2573ff77f45d327bc62f521063b71

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann closed the pull request at:

          https://github.com/apache/flink/pull/2910


People

  • Assignee: till.rohrmann Till Rohrmann
  • Reporter: till.rohrmann Till Rohrmann
  • Votes: 0
  • Watchers: 2
