FLINK-5285: CancelCheckpointMarker flood when using at least once mode

    Details

      Description

      In at-least-once mode (BarrierTracker), an interleaved arrival of cancellation barriers for two consecutive checkpoints at the BarrierTracker can trigger a flood of CancelCheckpointMarkers.

      The following sequence is problematic:

      Cancel(1, 0),
      Cancel(2, 0),
      Cancel(1, 1),
      Cancel(2, 1),
      Cancel(1, 2),
      Cancel(2, 2)
      

      where Cancel(checkpointId, channelId) denotes a cancellation barrier for checkpoint checkpointId arriving on input channel channelId.
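
      To make the effect of this sequence concrete, here is a minimal sketch in Java. It is a simplified model loosely reconstructed from the diff excerpt quoted in the comments of this issue, not the actual BarrierTracker class: the names CancelFloodSketch, Entry, onCancel and replay are hypothetical, regular checkpoint barriers and the cap on tracked checkpoints are left out, and the channel index is ignored because only the arrival order matters here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of the pre-fix abort handling (not the real BarrierTracker). */
final class CancelFloodSketch {

    /** Hypothetical stand-in for a pending checkpoint entry that is already marked as aborted. */
    static final class Entry {
        final long checkpointId;
        int barrierCount = 1; // the marker that created the entry counts as the first one

        Entry(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    private final int numChannels;
    private final ArrayDeque<Entry> pending = new ArrayDeque<>();

    /** Every abort notification that would be sent downstream, in order. */
    final List<Long> aborts = new ArrayList<>();

    CancelFloodSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    void onCancel(long checkpointId) {
        // Drop all entries that belong to older checkpoints.
        Entry first;
        while ((first = pending.peekFirst()) != null && first.checkpointId < checkpointId) {
            pending.removeFirst();
        }

        if (first != null && first.checkpointId == checkpointId) {
            // Known, already aborted checkpoint: only count the marker.
            if (++first.barrierCount == numChannels) {
                pending.removeFirst();
            }
        } else {
            // No entry found: the abort is reported, even if this checkpoint
            // was reported before and its entry was dropped in the meantime.
            aborts.add(checkpointId);
            pending.addFirst(new Entry(checkpointId));
        }
    }

    /** Replays the interleaved sequence from the description over three channels. */
    static List<Long> replay() {
        CancelFloodSketch tracker = new CancelFloodSketch(3);
        long[] checkpointIds = {1, 2, 1, 2, 1, 2};
        for (long id : checkpointIds) {
            tracker.onCancel(id);
        }
        return tracker.aborts;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints [1, 2, 1, 1]
    }
}
```

      In this model checkpoint 1 is reported as aborted three times instead of once, because each Cancel(2, x) drops the entry for checkpoint 1 and the next Cancel(1, y) re-creates it. If every notification makes the operator emit cancellation markers downstream, the duplicates multiply from operator to operator.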

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann closed the pull request at:

          https://github.com/apache/flink/pull/2964

          till.rohrmann Till Rohrmann added a comment -

          Fixed in 1.2.0 via d3f19a5bead1d0709da733b75d729afa9341c250
          Fixed in 1.1.4 via afaa27e9faeb0352a49f30de90e719572caa97c5

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2963

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/2963

          Thanks for the review @StephanEwen. Merging...

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/2964

          Test failures are unrelated. Merging...

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2964#discussion_r91556985

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java ---
          @@ -225,17 +230,19 @@ private void processCheckpointAbortBarrier(CancelCheckpointMarker barrier, int c
                           pendingCheckpoints.removeFirst();
                       }
                   }
          -        else {
          +        else if (checkpointId > latestPendingCheckpointID) {
                       notifyAbort(checkpointId);

          -            // first barrier for this checkpoint - remember it as aborted
          -            // since we polled away all entries with lower checkpoint IDs
          -            // this entry will become the new first entry
          -            if (pendingCheckpoints.size() < MAX_CHECKPOINTS_TO_TRACK) {
          -                CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId);
          -                abortedMarker.markAborted();
          -                pendingCheckpoints.addFirst(abortedMarker);
          -            }
          +            latestPendingCheckpointID = checkpointId;
          +
          +            CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId);
          +            abortedMarker.markAborted();
          +            pendingCheckpoints.addLast(abortedMarker);
          --- End diff --

          True, will add `addFirst` again.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/2964

          Thanks for the review @StephanEwen. I will also address your comments from the backport and then merge the PR.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2964#discussion_r91547796

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java ---
          @@ -225,17 +230,19 @@ private void processCheckpointAbortBarrier(CancelCheckpointMarker barrier, int c
                           pendingCheckpoints.removeFirst();
                       }
                   }
          -        else {
          +        else if (checkpointId > latestPendingCheckpointID) {
                       notifyAbort(checkpointId);

          -            // first barrier for this checkpoint - remember it as aborted
          -            // since we polled away all entries with lower checkpoint IDs
          -            // this entry will become the new first entry
          -            if (pendingCheckpoints.size() < MAX_CHECKPOINTS_TO_TRACK) {
          -                CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId);
          -                abortedMarker.markAborted();
          -                pendingCheckpoints.addFirst(abortedMarker);
          -            }
          +            latestPendingCheckpointID = checkpointId;
          +
          +            CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId);
          +            abortedMarker.markAborted();
          +            pendingCheckpoints.addLast(abortedMarker);
          --- End diff --

          Small comment here: I would

          • either keep the `addFirst()` statement here (we can be sure that it is correct, given that we pulled out all older checkpoints)
          • or add a sanity check that `pendingCheckpoints` is empty at that point.

          That way we explicitly guard the assumption that `pendingCheckpoints` contains its entries in ordered sequence (which is currently only implicitly guarded by the `checkpointId > latestPendingCheckpointID` condition).
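
          The guard under discussion can be sketched as follows. This is a simplified model under stated assumptions, not the merged Flink code: AbortOnceSketch, Entry, onCancel and replay are hypothetical names, only cancellation markers are modeled, and the sketch shows both suggestions together (`addFirst()` plus an emptiness check) purely for illustration, although the review proposes them as alternatives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of abort-once handling (not the real BarrierTracker). */
final class AbortOnceSketch {

    /** Hypothetical stand-in for a pending checkpoint entry that is already marked as aborted. */
    static final class Entry {
        final long checkpointId;
        int barrierCount = 1; // the marker that created the entry counts as the first one

        Entry(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    private final int numChannels;
    private final ArrayDeque<Entry> pending = new ArrayDeque<>();
    private long latestPendingCheckpointId = -1L;

    /** Every abort notification that would be sent downstream, in order. */
    final List<Long> aborts = new ArrayList<>();

    AbortOnceSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    void onCancel(long checkpointId) {
        // Drop all entries that belong to older checkpoints.
        Entry first;
        while ((first = pending.peekFirst()) != null && first.checkpointId < checkpointId) {
            pending.removeFirst();
        }

        if (first != null && first.checkpointId == checkpointId) {
            // Known, already aborted checkpoint: only count the marker.
            if (++first.barrierCount == numChannels) {
                pending.removeFirst();
            }
        } else if (checkpointId > latestPendingCheckpointId) {
            // First marker for a checkpoint newer than anything seen so far.
            aborts.add(checkpointId);
            latestPendingCheckpointId = checkpointId;

            // Explicit guard for the ordering assumption: older entries were removed
            // above, and no newer entry can exist, so the queue must be empty here.
            if (!pending.isEmpty()) {
                throw new IllegalStateException("pending checkpoints are out of order");
            }
            pending.addFirst(new Entry(checkpointId));
        }
        // Otherwise: late marker for a checkpoint that was already aborted, ignore it.
    }

    /** Replays the interleaved sequence from the issue description over three channels. */
    static List<Long> replay() {
        AbortOnceSketch tracker = new AbortOnceSketch(3);
        long[] checkpointIds = {1, 2, 1, 2, 1, 2};
        for (long id : checkpointIds) {
            tracker.onCancel(id);
        }
        return tracker.aborts;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints [1, 2]
    }
}
```

          In this model each checkpoint is reported as aborted exactly once, and the emptiness check turns the implicit ordering assumption into a failure that would be visible immediately if it were ever violated.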

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2963#discussion_r91532008

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---
          @@ -422,7 +425,7 @@ private void beginNewAlignment(long checkpointId, int channelIndex) throws IOExc
                   startOfAlignmentTimestamp = System.nanoTime();

                   if (LOG.isDebugEnabled()) {
          -            LOG.debug("Starting stream alignment for checkpoint " + checkpointId);
          +            LOG.debug("Starting stream alignment for checkpoint {}.", checkpointId);
          --- End diff --

          I am not a super big fan of that change: given that this is guarded by a `LOG.isDebugEnabled()` check, the prior version was actually more efficient than the new one. It saves a boxing operation and a parsing step.
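
          The two costs mentioned here can be illustrated without a logging library. The sketch below is only an analogy: `substitute` is a hypothetical stand-in for a logger's `{}` placeholder handling, not the SLF4J implementation, and GuardedLoggingSketch is an invented name.

```java
/** Hypothetical illustration of the two message-building styles (no logging library involved). */
final class GuardedLoggingSketch {

    /** Hypothetical stand-in for placeholder substitution: the pattern has to be scanned. */
    static String substitute(String pattern, Object arg) {
        int idx = pattern.indexOf("{}");
        if (idx < 0) {
            return pattern;
        }
        return pattern.substring(0, idx) + arg + pattern.substring(idx + 2);
    }

    /** Prior version: plain concatenation, the long is appended directly. */
    static String concatenated(long checkpointId) {
        return "Starting stream alignment for checkpoint " + checkpointId;
    }

    /** New version: the long is boxed to a Long to fit the Object parameter, then substituted. */
    static String parameterized(long checkpointId) {
        return substitute("Starting stream alignment for checkpoint {}.", checkpointId);
    }

    public static void main(String[] args) {
        System.out.println(concatenated(7L));  // prints Starting stream alignment for checkpoint 7
        System.out.println(parameterized(7L)); // prints Starting stream alignment for checkpoint 7.
    }
}
```

          Outside an explicit debug guard the parameterized form avoids building the string when the level is disabled. Inside the guard the message is built either way, so the extra boxing and pattern scan are pure overhead, which is the point of the comment.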

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/2964

          [backport] FLINK-5285 Abort checkpoint only once in BarrierTracker

          Backport of #2963 for the release-1.1 branch.

          Prevent an interleaved sequence of cancellation markers for two consecutive checkpoints
          from triggering a flood of cancellation markers for downstream operators. This is done by
          aborting each checkpoint only once and not re-creating checkpoint barrier counts for already
          aborted checkpoints.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink backportFixCheckpointBarrierCancellation

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2964.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2964


          commit debc59177c6e7a32266a12775d3379a36d23f7f6
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2016-12-07T18:05:47Z

          FLINK-5285 Abort checkpoint only once in BarrierTracker

          Prevent an interleaved sequence of cancellation markers for two consecutive checkpoints
          from triggering a flood of cancellation markers for downstream operators. This is done by
          aborting each checkpoint only once and not re-creating checkpoint barrier counts for already
          aborted checkpoints.

          Add test case


          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/2963

          FLINK-5285 Abort checkpoint only once in BarrierTracker

          Prevent an interleaved sequence of cancellation markers for two consecutive checkpoints
          from triggering a flood of cancellation markers for downstream operators. This is done by
          aborting each checkpoint only once and not re-creating checkpoint barrier counts for already
          aborted checkpoints.

          cc @StephanEwen

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink fixCheckpointBarrierCancellation

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2963.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2963


          commit cbf0d6c30a29536315979502b1e8728971441bdf
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2016-12-07T18:05:47Z

          FLINK-5285 Abort checkpoint only once in BarrierTracker

          Prevent an interleaved sequence of cancellation markers for two consecutive checkpoints
          from triggering a flood of cancellation markers for downstream operators. This is done by
          aborting each checkpoint only once and not re-creating checkpoint barrier counts for already
          aborted checkpoints.

          Add test case



            People

            • Assignee:
              till.rohrmann Till Rohrmann
              Reporter:
              till.rohrmann Till Rohrmann