  1. Flink
  2. FLINK-5246

Don't discard unknown checkpoint messages in the CheckpointCoordinator

    Details

      Description

      The delicate interplay of the CheckpointCoordinator and the SavepointCoordinator requires that unknown checkpoint messages are not discarded but handed to the other coordinator. If neither coordinator accepts the checkpoint message, the JobManager discards the associated state.
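The hand-off described above can be sketched roughly as follows. This is a minimal illustration, not the actual Flink code: `StateHandle`, `AckMessage`, `Coordinator`, and `AckRouter.route` are hypothetical names introduced here, and the real logic lives in JobManager.scala and the two coordinator classes.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for checkpoint state that can be discarded. */
class StateHandle {
    private boolean discarded;
    void discard() { discarded = true; }
    boolean isDiscarded() { return discarded; }
}

/** Hypothetical acknowledge message: a checkpoint id plus optional state. */
class AckMessage {
    final long checkpointId;
    final StateHandle state; // may be null
    AckMessage(long checkpointId, StateHandle state) {
        this.checkpointId = checkpointId;
        this.state = state;
    }
}

/** Hypothetical coordinator that accepts only messages for checkpoints it knows. */
class Coordinator {
    private final Set<Long> pendingCheckpointIds;
    Coordinator(Set<Long> pendingCheckpointIds) {
        this.pendingCheckpointIds = pendingCheckpointIds;
    }
    /** Returns false for unknown checkpoints and leaves their state untouched. */
    boolean receiveAcknowledgeMessage(AckMessage message) {
        return pendingCheckpointIds.contains(message.checkpointId);
    }
}

class AckRouter {
    /**
     * Offers the message to each coordinator in turn. Only if none of them
     * accepts it is the attached state discarded by the caller.
     */
    static boolean route(AckMessage message, List<Coordinator> coordinators) {
        for (Coordinator coordinator : coordinators) {
            if (coordinator.receiveAcknowledgeMessage(message)) {
                return true;
            }
        }
        if (message.state != null) {
            message.state.discard();
        }
        return false;
    }
}
```

The point of the design is that a coordinator rejecting a message has no side effects, so the decision to discard state is made in exactly one place.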

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann closed the pull request at:

          https://github.com/apache/flink/pull/2930

          till.rohrmann Till Rohrmann added a comment - Fixed via https://github.com/apache/flink/commit/da09d418c1add17169368a38aeb9d793f9a2324c
          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2930#discussion_r90775412

          --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
          @@ -1376,6 +1376,24 @@ class JobManager(
           // addressed to the periodic checkpoint coordinator.
           log.info("Received message for non-existing checkpoint " +
             ackMessage.getCheckpointId)
          +
          + val classLoader = Option(libraryCacheManager.getClassLoader(jid)) match {
          +   case Some(userCodeClassLoader) => userCodeClassLoader
          +   case None => getClass.getClassLoader
          + }
          +
          + future {
          +   Option(ackMessage.getState()) match {
          --- End diff --

          Should we check the option before submitting the future and only do it if it's non empty?
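One way to read this suggestion, sketched in Java rather than the Scala of the diff: test for the state first and submit asynchronous work only when there is something to discard. `LateStateCleanup`, `Discardable`, and `discardAsync` are hypothetical names used for illustration and are not part of the pull request.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class LateStateCleanup {

    /** Hypothetical stand-in for state whose disposal may fail. */
    interface Discardable {
        void discard() throws Exception;
    }

    /**
     * Submits a discard task only if state is present. Returns an empty
     * Optional, without touching the executor, when the state is null.
     */
    static Optional<CompletableFuture<Void>> discardAsync(Discardable state, Executor executor) {
        return Optional.ofNullable(state).map(s ->
            CompletableFuture.runAsync(() -> {
                try {
                    s.discard();
                } catch (Exception e) {
                    // Surface the failure through the returned future.
                    throw new RuntimeException("Could not discard state", e);
                }
            }, executor));
    }
}
```

Compared with checking inside the future, this avoids scheduling a task that would do nothing.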

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2930#discussion_r90775397

          --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
          @@ -1376,6 +1376,24 @@ class JobManager(
           // addressed to the periodic checkpoint coordinator.
           log.info("Received message for non-existing checkpoint " +
             ackMessage.getCheckpointId)
          +
          + val classLoader = Option(libraryCacheManager.getClassLoader(jid)) match {
          --- End diff --

          Should we add a comment here explaining that we do this to prevent lingering state if both coordinators have already cleared the pending checkpoint?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2930#discussion_r90775426

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
          @@ -735,15 +735,15 @@ else if (checkpoint != null) {
           if (recentPendingCheckpoints.contains(checkpointId)) {
               wasPendingCheckpoint = true;
               LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId);
          +
          +    // try to discard the state so that we don't have lingering state lying around
          +    discardState(message.getState());
           }
           else {
          --- End diff --

          We should add a comment here for 1.1 that it's important to not discard the state here if this was not a known pending checkpoint.
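The rule in this comment can be illustrated with a small sketch: a coordinator discards state only for checkpoints it remembers as recently pending, and otherwise reports the message as unknown so the caller can offer it elsewhere. Only the field name `recentPendingCheckpoints` comes from the diff; `ExpiredCheckpointTracker`, `rememberExpired`, `handleLateMessage`, and the capacity of 16 are assumptions made for the example.

```java
import java.util.ArrayDeque;

class ExpiredCheckpointTracker {

    // Hypothetical bound on how many expired checkpoint ids are remembered.
    private static final int MAX_REMEMBERED = 16;

    private final ArrayDeque<Long> recentPendingCheckpoints = new ArrayDeque<>();

    /** Records a checkpoint id that was pending but has since expired. */
    void rememberExpired(long checkpointId) {
        if (recentPendingCheckpoints.size() >= MAX_REMEMBERED) {
            recentPendingCheckpoints.removeFirst();
        }
        recentPendingCheckpoints.addLast(checkpointId);
    }

    /**
     * Handles a late message. State is discarded only if the checkpoint was
     * one of ours; for an unknown id the state is left alone, because another
     * coordinator may still own it.
     */
    boolean handleLateMessage(long checkpointId, Runnable discardState) {
        if (recentPendingCheckpoints.contains(checkpointId)) {
            discardState.run();
            return true; // was a pending checkpoint of this coordinator
        }
        return false; // unknown: do not discard here
    }
}
```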

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/2930

          Failing test cases are unrelated. Merging this PR.

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/2930

          FLINK-5246 Don't discard checkpoint messages if they are unknown

          This is the case if the savepoint coordinator has triggered a checkpoint. The corresponding
          checkpoint messages are not known to the checkpoint coordinator and thus should not be
          discarded. Instead, the JobManager will now discard all messages which have been accepted
          by neither the CheckpointCoordinator nor the SavepointCoordinator.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink fixCheckpointMessages

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2930.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2930


          commit e872b5fbc454e379eab6788bae77c6bf3e2e98af
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2016-12-03T19:15:35Z

          FLINK-5246 Don't discard checkpoint messages if they are unknown

          This is the case if the savepoint coordinator has triggered a checkpoint. The corresponding
          checkpoint messages are not known to the checkpoint coordinator and thus should not be
          discarded. Instead, the JobManager will now discard all messages which have been accepted
          by neither the CheckpointCoordinator nor the SavepointCoordinator.



            People

            • Assignee:
              till.rohrmann Till Rohrmann
              Reporter:
              till.rohrmann Till Rohrmann