
FLINK-5748: Make the ExecutionGraph's FutureExecutor a ScheduledExecutorService

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Labels: None

      Description

      To handle timeouts and other scheduled actions more efficiently, the ExecutionGraph should use a ScheduledExecutorService for its futures and callbacks.
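To illustrate why a scheduled executor helps here, the following is a minimal sketch of a timeout attached to a future through `ScheduledExecutorService.schedule`. It is not taken from the Flink code base: the class name `TimeoutSketch` and the helper `withTimeout` are hypothetical, and `CompletableFuture` stands in for whatever future type the ExecutionGraph actually uses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutSketch {

    /**
     * Hypothetical helper: fails the given future with a TimeoutException
     * if it has not completed within the given time.
     */
    static <T> CompletableFuture<T> withTimeout(
            CompletableFuture<T> future,
            long timeout,
            TimeUnit unit,
            ScheduledExecutorService scheduler) {

        // One scheduled task per timeout; no thread blocks or sleeps while waiting.
        ScheduledFuture<?> timer = scheduler.schedule(
                () -> { future.completeExceptionally(new TimeoutException("timed out")); },
                timeout,
                unit);

        // Once the future completes (for any reason), the timer is no longer needed.
        future.whenComplete((value, error) -> timer.cancel(false));
        return future;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // This future is never completed by anyone, so the timeout must fire.
            CompletableFuture<String> pending =
                    withTimeout(new CompletableFuture<>(), 50, TimeUnit.MILLISECONDS, scheduler);
            try {
                pending.get();
            } catch (ExecutionException e) {
                System.out.println("failed with: " + e.getCause().getClass().getSimpleName());
            }
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

With a plain `Executor`, the same behavior would typically need a dedicated timer thread or a task that sleeps for the duration of the timeout, which is the inefficiency the description refers to.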

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user StephanEwen opened a pull request:

          https://github.com/apache/flink/pull/3289

          FLINK-5748 [jobmanager] Make the 'future executor' a ScheduledExecutorService

          This pull request changes the JobManager's Executor for Futures and Callbacks to be a `ScheduledExecutorService`. That is needed for more efficient handling of scheduled actions, like delayed restarts, timeouts, etc.

          The main changes in this pull request are

          1. Adjusting the setup logic of the JobManager (and its derived classes for Yarn / Mesos / ...) to use a ScheduledExecutorService

          2. Adjusting all the tests that manually create a JobManager to create a `ScheduledExecutor` instead
          of an `Executor`. There are a lot of tests that needed to be touched...

          Tests

          There are some tests that relied on the fact that the future executor is a direct executor that synchronously invokes the `Runnable`. This PR hence adds a `ScheduledDirectExecutorService` that executes immediately executable tasks synchronously, but supports scheduling of tasks as well (which will naturally not be executed synchronously).
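The behavior described for the test executor can be sketched roughly as follows. This is not the `ScheduledDirectExecutorService` added by the pull request; it is a hypothetical stand-in (`DirectScheduledExecutorSketch`) that subclasses the JDK's `ScheduledThreadPoolExecutor` and overrides only `execute`, so `submit` and the `schedule*` methods still run on the timer thread.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch: execute() runs tasks synchronously in the calling thread,
 * while schedule() keeps the normal asynchronous behavior of the JDK executor.
 */
public class DirectScheduledExecutorSketch extends ScheduledThreadPoolExecutor {

    public DirectScheduledExecutorSketch() {
        super(1); // a single timer thread, used only for delayed tasks
    }

    @Override
    public void execute(Runnable command) {
        // Immediately executable work is run inline, like a direct executor.
        command.run();
    }

    public static void main(String[] args) throws Exception {
        DirectScheduledExecutorSketch executor = new DirectScheduledExecutorSketch();
        try {
            Thread caller = Thread.currentThread();
            Thread[] ranOn = new Thread[2];

            // Runs before execute() returns, on the calling thread.
            executor.execute(() -> ranOn[0] = Thread.currentThread());
            System.out.println("direct task ran on caller thread: " + (ranOn[0] == caller));

            // Runs later, on the executor's timer thread.
            ScheduledFuture<?> delayed = executor.schedule(
                    () -> { ranOn[1] = Thread.currentThread(); }, 20, TimeUnit.MILLISECONDS);
            delayed.get(); // wait until the delayed task has finished
            System.out.println("delayed task ran on caller thread: " + (ranOn[1] == caller));
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Tests that assume callbacks have already run when `execute` returns keep working with such an executor, while code under test can still register delayed actions.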

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StephanEwen/incubator-flink scheduled_future_executor

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3289.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3289


          commit bb05c5cd633a5e978defdd70a43090ba60624e02
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2017-02-08T19:51:46Z

          FLINK-5748 [jobmanager] Make the 'future executor' a ScheduledExecutorService


          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3289#discussion_r100303560

          --- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---
          @@ -384,12 +434,12 @@ object TestingUtils {
               * @return
               */
             def createJobManager(
          -    actorSystem: ActorSystem,
          -    futureExecutor: Executor,
          -    ioExecutor: Executor,
          -    configuration: Configuration,
          -    jobManagerClass: Class[_ <: JobManager],
          -    prefix: String)
          +    actorSystem: ActorSystem,
          +    futureExecutor: ScheduledExecutorService,
          +    ioExecutor: Executor,
          +    configuration: Configuration,
          +    jobManagerClass: Class[_ <: JobManager],
          +    prefix: String)
          --- End diff --

          I think the standard in Scala is double indentation.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3289#discussion_r100304013

          --- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---
          @@ -384,12 +434,12 @@ object TestingUtils {
               * @return
               */
             def createJobManager(
          -    actorSystem: ActorSystem,
          -    futureExecutor: Executor,
          -    ioExecutor: Executor,
          -    configuration: Configuration,
          -    jobManagerClass: Class[_ <: JobManager],
          -    prefix: String)
          +    actorSystem: ActorSystem,
          +    futureExecutor: ScheduledExecutorService,
          +    ioExecutor: Executor,
          +    configuration: Configuration,
          +    jobManagerClass: Class[_ <: JobManager],
          +    prefix: String)
          --- End diff --

          Yeah, I think that was an IDE auto-format action. I never figured out how to turn that off. Will undo that...

          StephanEwen Stephan Ewen added a comment -

          Fixed via 665c7e399928188b22a7963cc05654589d47941c

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3289


            People

            • Assignee: Stephan Ewen
            • Reporter: Stephan Ewen
            • Votes: 0
            • Watchers: 2