FLINK-5962: Cancel checkpoint canceller tasks in CheckpointCoordinator

    Details

      Description

      The CheckpointCoordinator registers a canceller task for each running checkpoint. The canceller task's responsibility is to cancel a checkpoint if it takes too long to complete. We should cancel this task as soon as the checkpoint has completed; otherwise many canceller tasks accumulate, which can eventually lead to an OutOfMemoryError.
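The retention problem can be reproduced outside Flink with `java.util.Timer`, which the thread below says the coordinator used before the fix: `TimerTask.cancel()` only marks a task as cancelled, and the task object stays in the timer's queue until its deadline comes around or `Timer.purge()` is called. The following self-contained sketch (class and method names are hypothetical, not Flink code) schedules many long-timeout "canceller" tasks, cancels each one as if its checkpoint had completed, and uses the return value of `purge()` to count how many cancelled tasks were still retained. A live sentinel task with an earlier deadline keeps the timer thread from discarding cancelled tasks at the head of the queue, which makes the count deterministic.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical demo class, not Flink code.
class TimerLeakDemo {

    /**
     * Schedules `count` long-timeout "canceller" tasks, cancels each one as if its
     * checkpoint had completed, and returns how many cancelled tasks Timer.purge()
     * still found in the timer's queue.
     */
    static int cancelledButRetained(int count) {
        Timer timer = new Timer("checkpoint-timer-sketch", true); // daemon thread
        try {
            // A live task with an earlier deadline stays at the head of the queue, so the
            // timer thread just waits on it and never touches the cancelled tasks behind it.
            timer.schedule(new NoOpTask(), 5 * 60 * 1000L);

            for (int i = 0; i < count; i++) {
                TimerTask canceller = new NoOpTask(); // would abort the checkpoint on timeout
                timer.schedule(canceller, 10 * 60 * 1000L);
                canceller.cancel(); // only marks the task as cancelled; it stays queued
            }
            // purge() removes cancelled tasks from the queue and returns how many it removed.
            return timer.purge();
        } finally {
            timer.cancel();
        }
    }

    private static final class NoOpTask extends TimerTask {
        @Override
        public void run() {
            // no-op placeholder
        }
    }
}
```

Without the `purge()` call, every one of those cancelled task objects would stay reachable until its ten-minute deadline; with frequent checkpoints, new ones are created faster than old ones expire.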

          Activity

          ram_krish ramkrishna.s.vasudevan added a comment -

          I can work on this, Till Rohrmann, if you have not already started on it.

          till.rohrmann Till Rohrmann added a comment -

          Hi ramkrishna.s.vasudevan, as far as I know, Stephan Ewen wanted to take a look because he's currently working on the CheckpointCoordinator anyway. But maybe you two can split the work. Let's see what he says.

          StephanEwen Stephan Ewen added a comment -

          I have a pretty big change to the PendingCheckpoint and CheckpointCoordinator coming up, which should go in first so that we don't have to completely redo the timer patch afterwards.

          I think the fix for this issue is actually very small: it simply means adding the cancellation timer to the PendingCheckpoint and cancelling it when the pending checkpoint is disposed.
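The proposal above can be sketched as a pending checkpoint that stores the handle of its timeout task and cancels it on disposal, which covers both abort and successful completion. This is a minimal sketch under stated assumptions: `SketchPendingCheckpoint` and `demo()` are hypothetical, heavily simplified stand-ins rather than Flink's `PendingCheckpoint`, and the timeout is scheduled on a JDK `ScheduledThreadPoolExecutor`.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical, heavily simplified stand-in for Flink's PendingCheckpoint.
class SketchPendingCheckpoint {
    private final Object lock = new Object();
    private ScheduledFuture<?> cancellerHandle; // guarded by lock
    private boolean discarded;                  // guarded by lock

    void setCancellerHandle(ScheduledFuture<?> handle) {
        synchronized (lock) {
            this.cancellerHandle = handle;
        }
    }

    /** Called on abort and also after the checkpoint completes successfully. */
    void dispose() {
        synchronized (lock) {
            discarded = true;
            if (cancellerHandle != null) {
                cancellerHandle.cancel(false); // do not interrupt a canceller that is already running
                cancellerHandle = null;        // drop our reference as well
            }
        }
    }

    /** Schedules a 10-minute timeout, completes the checkpoint at once, and reports whether the timeout was cancelled. */
    static boolean demo() {
        ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
        try {
            SketchPendingCheckpoint checkpoint = new SketchPendingCheckpoint();
            ScheduledFuture<?> timeout = timer.schedule(() -> { /* would abort the checkpoint */ }, 10, TimeUnit.MINUTES);
            checkpoint.setCancellerHandle(timeout);
            checkpoint.dispose(); // checkpoint completed
            return timeout.isCancelled();
        } finally {
            timer.shutdownNow();
        }
    }
}
```

Keeping the cancellation inside `dispose()` means the coordinator does not need a separate cleanup call on each of its completion and abort paths.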

          My change will only go into master, so creating a patch for the release-1.2 branch should be fine.

          githubbot ASF GitHub Bot added a comment -

          GitHub user StephanEwen opened a pull request:

          https://github.com/apache/flink/pull/3548

          FLINK-5962 [checkpoints] Remove scheduled cancel-task from timer queue to prevent memory leaks

          Bug

          Timer tasks that cancel checkpoints are not eagerly removed from the Timer when checkpoints abort/complete. This can lead to memory leaks in the presence of very frequent checkpoints.

          Changes
          • This converts the `Timer` to a `ScheduledThreadPoolExecutor`, which has the ability to remove canceled timers from its priority queue.
          • The `PendingCheckpoint` now cancels (i.e., removes) the timer when it is disposed (which also happens upon successful completion).

          Tests
          • Adds checks to the `CheckpointCoordinatorTest`
          • Adds a test that the timer is canceled in the `PendingCheckpoint`
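A note on the `ScheduledThreadPoolExecutor` change: the JDK executor removes a cancelled task from its work queue immediately only when `setRemoveOnCancelPolicy(true)` has been set (available since Java 7; the default is `false`). Otherwise a cancelled task stays queued until its delay elapses, just as with `Timer`. The sketch below (hypothetical class name, JDK behavior only; whether and how the patch configures this policy is not shown in this thread) compares the queue size after cancelling many long-delay tasks under both settings.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class illustrating JDK behavior, not Flink code.
class RemoveOnCancelDemo {

    /** Schedules `count` long-delay tasks, cancels them all, and returns the executor's queue size afterwards. */
    static int queuedAfterCancel(boolean removeOnCancel, int count) {
        ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
        timer.setRemoveOnCancelPolicy(removeOnCancel);
        try {
            for (int i = 0; i < count; i++) {
                ScheduledFuture<?> canceller = timer.schedule(() -> { }, 10, TimeUnit.MINUTES);
                canceller.cancel(false); // the "checkpoint completed" path
            }
            // With the policy off, cancelled tasks remain queued until their delay elapses.
            return timer.getQueue().size();
        } finally {
            timer.shutdownNow();
        }
    }
}
```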

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StephanEwen/incubator-flink timer_leak

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3548.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3548



          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3548#discussion_r106208791

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java —
          @@ -819,20 +839,25 @@ private void triggerQueuedRequests() {
                   // trigger the checkpoint from the trigger timer, to finish the work of this thread before
                   // starting with the next checkpoint
          -        ScheduledTrigger trigger = new ScheduledTrigger();
                   if (periodicScheduling) {
                       if (currentPeriodicTrigger != null) {
          -                currentPeriodicTrigger.cancel();
          +                currentPeriodicTrigger.cancel(false);
                       }
          -            currentPeriodicTrigger = trigger;
          -            timer.scheduleAtFixedRate(trigger, 0L, baseInterval);
          +            currentPeriodicTrigger = timer.scheduleAtFixedRate(
          +                    new ScheduledTrigger(),
          +                    0L, baseInterval, TimeUnit.MILLISECONDS);
                   }
                   else {
          -            timer.schedule(trigger, 0L);
          +            timer.execute(new ScheduledTrigger());
          — End diff –

          Maybe we can create a singleton `ScheduledTrigger`, then we would save some object creation.
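The singleton suggestion can be sketched as follows. It assumes the trigger holds no per-run state, so a single `Runnable` instance can be passed to both `scheduleAtFixedRate` and `execute`. `TriggerSketch`, `rescheduleTrigger` and `shutdownAndCount` are hypothetical names, and the counter merely stands in for "trigger a checkpoint".

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical coordinator sketch; names loosely mirror the diff, but this is not Flink code.
class TriggerSketch {
    private final ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
    private final AtomicInteger triggerCount = new AtomicInteger();
    // One shared, stateless trigger instead of a new ScheduledTrigger() per (re)scheduling.
    private final Runnable trigger = triggerCount::incrementAndGet;
    private ScheduledFuture<?> currentPeriodicTrigger;

    synchronized ScheduledFuture<?> rescheduleTrigger(boolean periodicScheduling, long baseIntervalMillis) {
        if (periodicScheduling) {
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false); // let an in-flight trigger finish
            }
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    trigger, 0L, baseIntervalMillis, TimeUnit.MILLISECONDS);
            return currentPeriodicTrigger;
        }
        timer.execute(trigger); // one-shot trigger, reusing the same instance
        return null;
    }

    /** Shuts the timer down (already-submitted one-shot tasks still run) and returns how often the trigger ran. */
    int shutdownAndCount() {
        timer.shutdown();
        try {
            timer.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return triggerCount.get();
    }
}
```

Sharing one instance is only safe while the trigger stays stateless; if it ever carried per-invocation fields, a fresh object per scheduling would again be needed.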

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3548#discussion_r106209061

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java —
          @@ -427,8 +446,23 @@ public void run() {
                           discarded = true;
                           notYetAcknowledgedTasks.clear();
                           acknowledgedTasks.clear();
          +                cancelCanceller();
          +            }
          +        }
          +    }
          +
          +    private void cancelCanceller() {
          — End diff –

          two whitespaces

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3548#discussion_r106209461

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java —
          @@ -427,8 +446,23 @@ public void run() {
                           discarded = true;
                           notYetAcknowledgedTasks.clear();
                           acknowledgedTasks.clear();
          +                cancelCanceller();
          +            }
          +        }
          +    }
          +
          +    private void cancelCanceller() {
          +        try {
          +            final ScheduledFuture<?> canceller = this.cancellerHandle;
          +            if (canceller != null) {
          +                this.cancellerHandle = null;
          +                canceller.cancel(false);
          +            }
          +        }
          +        catch (Exception e) {
          +            // this code should not throw exceptions
          +            LOG.warn("Error while cancelling checkpoint timeout task");
          — End diff –

          `e` is swallowed.
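With SLF4J, which Flink logs through, the fix for the swallowed exception is to pass it as the last argument: `LOG.warn("Error while cancelling checkpoint timeout task", e);`. The dependency-free sketch below shows the same idea with `java.util.logging` and checks that the cause actually reaches the log record. It is a hypothetical stand-in, not Flink's `PendingCheckpoint`; to keep the failing stub short, it uses `Future<?>` instead of `ScheduledFuture<?>` for the handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical sketch; uses java.util.logging instead of SLF4J so it has no dependencies.
class CancelCancellerSketch {
    private static final Logger LOG = Logger.getLogger(CancelCancellerSketch.class.getName());

    private volatile Future<?> cancellerHandle;

    CancelCancellerSketch(Future<?> cancellerHandle) {
        this.cancellerHandle = cancellerHandle;
    }

    void cancelCanceller() {
        try {
            final Future<?> canceller = this.cancellerHandle;
            if (canceller != null) {
                this.cancellerHandle = null;
                canceller.cancel(false);
            }
        } catch (Exception e) {
            // should not happen, but if it does, keep the cause and its stack trace
            LOG.log(Level.WARNING, "Error while cancelling checkpoint timeout task", e);
        }
    }

    /** Runs cancelCanceller() against a future whose cancel() throws; returns the throwable attached to the log record. */
    static Throwable loggedCause() {
        List<LogRecord> records = new ArrayList<>();
        Handler capture = new Handler() {
            @Override public void publish(LogRecord record) { records.add(record); }
            @Override public void flush() { }
            @Override public void close() { }
        };
        LOG.setUseParentHandlers(false); // keep the demo quiet on stderr
        LOG.addHandler(capture);
        try {
            Future<Void> failing = new FutureTask<Void>(() -> null) {
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    throw new IllegalStateException("simulated cancel failure");
                }
            };
            new CancelCancellerSketch(failing).cancelCanceller();
        } finally {
            LOG.removeHandler(capture);
        }
        return records.isEmpty() ? null : records.get(0).getThrown();
    }
}
```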

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3548#discussion_r106211648

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java —
          @@ -191,6 +194,22 @@ void setStatsCallback(@Nullable PendingCheckpointStats trackerCallback) {
                   this.statsCallback = checkNotNull(trackerCallback);
               }
          +    /**
          +     * Sets the handle for the canceller to this pending checkoint.
          +     *
          +     * @return true, if the handle was set, false, if the checkpoint is already disposed;
          +     */
          +    public boolean setCancellerHandle(ScheduledFuture<?> cancellerHandle) {
          +        synchronized (lock) {
          +            if (!discarded) {
          +                this.cancellerHandle = cancellerHandle;
          — End diff –

          I know this method will only be called once in `CheckpointCoordinator`. However, to make this class self-contained, I think it would be good either to fail if a `cancellerHandle` has already been set, or to cancel the old canceller to avoid an outdated discard call.
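The reviewer's first option (fail on a second handle) can be sketched as below. `GuardedPendingCheckpoint` is a hypothetical, simplified class, not Flink's code; the return contract follows the Javadoc in the diff (`false` if the checkpoint is already disposed), and the `IllegalStateException` is an assumed way to "fail".

```java
import java.util.concurrent.ScheduledFuture;

// Hypothetical sketch of the "fail on a second handle" option; not Flink code.
class GuardedPendingCheckpoint {
    private final Object lock = new Object();
    private ScheduledFuture<?> cancellerHandle; // guarded by lock
    private boolean discarded;                  // guarded by lock

    /**
     * Sets the handle for the canceller of this pending checkpoint.
     *
     * @return true if the handle was set, false if the checkpoint is already disposed
     * @throws IllegalStateException if a canceller handle has already been set
     */
    boolean setCancellerHandle(ScheduledFuture<?> handle) {
        synchronized (lock) {
            if (discarded) {
                return false; // the caller still owns the handle and should cancel it
            }
            if (cancellerHandle != null) {
                throw new IllegalStateException("A canceller handle has already been set");
            }
            cancellerHandle = handle;
            return true;
        }
    }

    void dispose() {
        synchronized (lock) {
            discarded = true;
            if (cancellerHandle != null) {
                cancellerHandle.cancel(false);
                cancellerHandle = null;
            }
        }
    }
}
```

The reviewer's second option would replace the `throw` with `cancellerHandle.cancel(false)` before storing the new handle. Either way, a caller that receives `false` presumably needs to cancel the handle it just scheduled, since the checkpoint will not do so.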

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3548

          Thanks for the fast review. Will fix the comments and merge!

          StephanEwen Stephan Ewen added a comment -

          Fixed in 1.2 via 9d59e008d8849cdfe2daf302e251454435bb997f

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3548

          StephanEwen Stephan Ewen added a comment -

          Fixed in

          • 1.3.0 via 70252f3468916758e8bc456bbf482549c38ad7ff
          • 1.2.1 via 9d59e008d8849cdfe2daf302e251454435bb997f
          githubbot ASF GitHub Bot added a comment -

          GitHub user zentol opened a pull request:

          https://github.com/apache/flink/pull/3679

          FLINK-5962 [runtime-web] [tests] Replace deprecated usage of JsonNo…

          This PR replaces the usage of `JsonNode#getValueAsText()` with `JsonNode#asText()` in the `JobCancellationWithSavepointHandlersTest`.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zentol/flink 5952_json_deprecated

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3679.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3679


          commit a92199efc40579d861cdbc97b717e0db7b775c9c
          Author: zentol <chesnay@apache.org>
          Date: 2017-04-05T21:50:12Z

          FLINK-5962 [runtime-web] [tests] Replace deprecated usage of JsonNode#getValueAsText


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3679

          Good fix, merging this...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3679


            People

            • Assignee: Stephan Ewen
            • Reporter: Till Rohrmann
            • Votes: 0
            • Watchers: 5