Details

    • Release Note:
      Fixed in
        - 1.4.0 via 65400bd0a0f6a3bdd3e0bad05e2179534eaf6e9e
        - 1.3.2 via 65400bd0a0f6a3bdd3e0bad05e2179534eaf6e9e

      1.3.2 needed to be fixed because this was a blocker for a critical bug fix

      Description

      Currently, the RestartStrategy is called when the ExecutionGraph should be restarted.

      To facilitate delays before restarting, the strategy simply sleeps, blocking the thread that runs the ExecutionGraph's recovery method.

      I suggest passing a ScheduledExecutorService to the RestartStrategy and letting it schedule the restart call that way, avoiding any sleeps.
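
The proposal above can be sketched roughly as follows. This is a minimal illustration, not Flink's actual code: `Restartable` and `DelayedRestartSketch` are hypothetical names standing in for the ExecutionGraph and a RestartStrategy implementation, and only the standard `java.util.concurrent` API is used.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the ExecutionGraph; only restart() matters here. */
interface Restartable {
    void restart();
}

/** Hypothetical strategy that schedules the restart instead of sleeping. */
class DelayedRestartSketch {
    private final long delayMillis;

    DelayedRestartSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Returns immediately; the restart runs later on the executor's thread. */
    ScheduledFuture<?> restart(Restartable graph, ScheduledExecutorService executor) {
        return executor.schedule(graph::restart, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            DelayedRestartSketch strategy = new DelayedRestartSketch(100);
            ScheduledFuture<?> pending =
                    strategy.restart(() -> System.out.println("restarted"), executor);
            System.out.println("calling thread is free again");
            pending.get(); // only this demo waits, so the JVM does not exit early
        } finally {
            executor.shutdown();
        }
    }
}
```

The point of the sketch is that the thread running the recovery method never sleeps; the delay lives entirely in the executor's queue.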

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user zjureel closed the pull request at:

          https://github.com/apache/flink/pull/4220

          githubbot ASF GitHub Bot added a comment -

          Github user zjureel commented on the issue:

          https://github.com/apache/flink/pull/4220

          @StephanEwen Thank you for merging this.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/4220

          Thank you for the contribution!
          The code from this PR has been improved and merged in this commit: 65400bd0a0f6a3bdd3e0bad05e2179534eaf6e9e

          If you agree, could you close this PR?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/4220

          I have taken and adapted the code for the patch in #4364

          githubbot ASF GitHub Bot added a comment -

          Github user zjureel commented on the issue:

          https://github.com/apache/flink/pull/4220

          @StephanEwen Thank you for your suggestion; you raise an important problem with this issue. I agree with you. I will pick up FLINK-6667 (https://issues.apache.org/jira/browse/FLINK-6667) and try to fix it before this PR. Thanks. cc @tillrohrmann

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/4220

          We also have to consider another aspect: changing this from a synchronous call to an asynchronous callback makes new types of races possible, against which we need to guard.

          We need to make sure that the `restart()` call cannot succeed if there was another cycle of failure that goes into `RESTARTING`.

          I would suggest addressing https://issues.apache.org/jira/browse/FLINK-6667 first, so that the callback can check that the `globalModVersion` is unchanged upon restart. Then there is no danger in moving this to an asynchronous callback.
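
A rough sketch of the guard described above, under loud assumptions: `VersionedGraphSketch` is a hypothetical, heavily simplified stand-in for the ExecutionGraph, and its `fail()`, `restart(long)` and `scheduleRestart(...)` methods are invented for illustration. Only the idea of comparing a `globalModVersion` captured at failure time comes from the comment; how Flink implements it is not shown here.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical simplified graph: a modification version plus a restart counter. */
class VersionedGraphSketch {
    private long globalModVersion;
    private long restarts;

    /** Every global failure bumps the version, invalidating restarts scheduled earlier. */
    synchronized long fail() {
        return ++globalModVersion;
    }

    /** Restarts only if no newer failure happened since expectedVersion was captured. */
    synchronized boolean restart(long expectedVersion) {
        if (globalModVersion != expectedVersion) {
            return false; // stale callback from an earlier failure cycle
        }
        restarts++;
        return true;
    }

    synchronized long restartCount() {
        return restarts;
    }

    /** Schedules an asynchronous restart that carries the version it belongs to. */
    void scheduleRestart(ScheduledExecutorService executor, long version, long delayMillis) {
        executor.schedule(() -> { restart(version); }, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        VersionedGraphSketch graph = new VersionedGraphSketch();
        long first = graph.fail();
        long second = graph.fail(); // another failure before the first restart fires
        graph.scheduleRestart(executor, first, 50);
        graph.scheduleRestart(executor, second, 50);
        executor.shutdown(); // already scheduled delayed tasks still run by default
        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("restarts performed: " + graph.restartCount());
    }
}
```

In this sketch the callback scheduled for the first failure finds a newer version and does nothing, so only the restart belonging to the latest failure cycle takes effect.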

          githubbot ASF GitHub Bot added a comment -

          Github user zjureel commented on the issue:

          https://github.com/apache/flink/pull/4220

          @tillrohrmann Thank you for your review. I have removed `restart(ExecutionGraph executionGraph)` and now use `ScheduledExecutor` instead of `ScheduledExecutorService`, as you mentioned. Thanks

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4220#discussion_r125165262

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java ---
          @@ -42,4 +48,23 @@ public Object call() throws Exception {
          }
          };
          }
          -}
          \ No newline at end of file
          +
          + public static void scheduleRestartWithDelay(final ExecutionGraph executionGraph, final long delayBetweenRestartAttemptsInMillis,
          + final ScheduledExecutorService executorService) {
          + ScheduledFuture<Object> future = executorService.schedule(
          + new Callable<Object>() {
          + @Override
          + public Object call() throws Exception {
          + executionGraph.restart();
          + return null;
          + }
          + }, delayBetweenRestartAttemptsInMillis, TimeUnit.MILLISECONDS
          + );
          +
          + try {
          + future.get();
          --- End diff --

          This `get()` operation defeats the purpose of the executor service because it blocks the calling thread.
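
One way to avoid the blocking `get()` is to handle failures inside the scheduled task itself. The following is a sketch under assumptions, not the code that was eventually merged: `NonBlockingRestartSketch` and its `onFailure` parameter are hypothetical, and a plain `Runnable` stands in for `executionGraph.restart()`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class NonBlockingRestartSketch {
    /**
     * Schedules the restart and returns at once. Failures are reported to
     * onFailure on the executor thread, so nobody has to wait on a Future.
     */
    static void scheduleRestartWithDelay(
            Runnable restartAction,
            long delayMillis,
            ScheduledExecutorService executor,
            Consumer<Throwable> onFailure) {
        executor.schedule(() -> {
            try {
                restartAction.run();
            } catch (Throwable t) {
                onFailure.accept(t); // handled where it happens, no get() needed
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        scheduleRestartWithDelay(
                () -> { throw new IllegalStateException("restart failed"); },
                50,
                executor,
                t -> System.out.println("reported: " + t.getMessage()));
        System.out.println("caller continues without blocking");
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

The design choice here is that the caller never touches the returned future; error handling travels with the task instead.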

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4220#discussion_r125165267

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java ---
          @@ -42,4 +48,23 @@ public Object call() throws Exception {
          }
          };
          }
          -}
          \ No newline at end of file
          +
          + public static void scheduleRestartWithDelay(final ExecutionGraph executionGraph, final long delayBetweenRestartAttemptsInMillis,
          + final ScheduledExecutorService executorService) {
          --- End diff --

          Can we call `delayBetweenRestartAttemptsInMillis` simply `delay`?

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4220#discussion_r125165228

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java ---
          @@ -23,9 +23,15 @@
          import org.slf4j.LoggerFactory;

          import java.util.concurrent.Callable;
          +import java.util.concurrent.ScheduledExecutorService;
          +import java.util.concurrent.ScheduledFuture;
          +import java.util.concurrent.TimeUnit;

          class ExecutionGraphRestarter {
          private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphRestarter.class);
          +
          + /** @deprecated Using {@link #scheduleRestartWithDelay(ExecutionGraph, long, ScheduledExecutorService)} instead. */
          + @Deprecated
          --- End diff --

          Can we remove this method?

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4220#discussion_r125165310

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java ---
          @@ -36,6 +38,15 @@
          * Restarts the given {@link ExecutionGraph}.
          *
          * @param executionGraph The ExecutionGraph to be restarted
          + * @deprecated Use {@link #restart(ExecutionGraph, ScheduledExecutorService)} instead.
          */
          + @Deprecated
          void restart(ExecutionGraph executionGraph);
          +
          + /**
          + * Schedule the restart call of {@link ExecutionGraph} with the given {@link ScheduledExecutorService}.
          + * @param executionGraph The ExecutionGraph to be restarted
          + * @param executorService The ScheduledExecutorService to schedule the restart call
          + */
          + void restart(ExecutionGraph executionGraph, ScheduledExecutorService executorService);
          --- End diff --

          Let's use a `ScheduledExecutor` here.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4220#discussion_r125165302

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java ---
          @@ -36,6 +38,15 @@
          * Restarts the given {@link ExecutionGraph}.
          *
          * @param executionGraph The ExecutionGraph to be restarted
          + * @deprecated Use {@link #restart(ExecutionGraph, ScheduledExecutorService)} instead.
          */
          + @Deprecated
          void restart(ExecutionGraph executionGraph);
          --- End diff --

          Let's remove this method.

          githubbot ASF GitHub Bot added a comment -

          GitHub user zjureel opened a pull request:

          https://github.com/apache/flink/pull/4220

          FLINK-6665 Pass a ScheduledExecutorService to the RestartStrategy

          … avoid any sleeps.

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [ ] General
            • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [ ] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zjureel/flink FLINK-6665

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4220.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4220


          commit 8de578ed761d79969c6ceaf5f696dc926396f137
          Author: zjureel <zjureel@gmail.com>
          Date: 2017-06-29T07:10:34Z

          FLINK-6665 schedule the restart call by ScheduledExecutorService to avoid any sleeps.



            People

            • Assignee:
              zjureel Fang Yong
              Reporter:
              StephanEwen Stephan Ewen
            • Votes:
              0
              Watchers:
              5
