Flink / FLINK-5759

Set an UncaughtExceptionHandler for all Thread Pools in JobManager

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.3.0
    • Component/s: JobManager
    • Labels:
      None

      Description

      Currently, the thread pools of the JobManager do not have any UncaughtExceptionHandler.

      While uncaught exceptions are rare (Flink handles exceptions aggressively in most places), an exception that does slip through in these threads (which execute future responses and delayed actions) may leave the JobManager in an inconsistent state in which it no longer functions properly.

      We should add a handler that kills the process in the case of an uncaught exception. Letting the respective cluster framework restart the JobManager is the only guaranteed way to be safe.
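      A minimal sketch of such a handler, using plain JDK APIs only. The class name `KillProcessOnUncaughtException` and the exit code are hypothetical (this is not Flink's code), stderr stands in for a real logger, and the exit action is injectable so the behavior can be tested without stopping the JVM.

```java
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.function.IntConsumer;

/**
 * Hypothetical sketch: logs an uncaught exception and then terminates the
 * process, so that the cluster framework can restart the JobManager.
 */
final class KillProcessOnUncaughtException implements UncaughtExceptionHandler {

    /** Hypothetical exit code, chosen for illustration only. */
    static final int EXIT_CODE = 1;

    private final IntConsumer exitAction;

    /** Default: terminate the JVM via System.exit (runs shutdown hooks). */
    KillProcessOnUncaughtException() {
        this(System::exit);
    }

    /** Injectable exit action, e.g. for tests. */
    KillProcessOnUncaughtException(IntConsumer exitAction) {
        this.exitAction = exitAction;
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        try {
            // A real implementation would use the logging framework here.
            System.err.println("FATAL: Thread '" + t.getName()
                    + "' produced an uncaught exception. Stopping the process...");
            e.printStackTrace();
        } finally {
            // Terminate even if logging itself throws.
            exitAction.accept(EXIT_CODE);
        }
    }
}
```

      `System.exit` runs shutdown hooks (such as the one registered in `FileCache`), but can block if a hook waits on the dying thread; `Runtime.getRuntime().halt(...)` cannot block on a hook, at the cost of skipping cleanup. Which is preferable is a judgment call.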


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user StephanEwen opened a pull request:

          https://github.com/apache/flink/pull/3290

          FLINK-5759 [jobmanager] Set UncaughtExceptionHandlers for JobManager's Future and I/O thread pools

          Currently, the thread pools of the `JobManager` do not have any `UncaughtExceptionHandler`.

          While uncaught exceptions are rare (Flink handles exceptions aggressively in most places), an exception that does slip through in these threads (which execute future responses and delayed actions) may leave the `JobManager` in an inconsistent state in which it no longer functions properly.

          This pull request adds a handler that kills the process in the case of an uncaught exception. Letting the respective cluster framework restart the JobManager is the only guaranteed way to be safe.
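          One JDK detail is worth checking when relying on such a handler: it only fires when an exception actually terminates a worker thread. In a `ThreadPoolExecutor`, an exception thrown by a task passed to `execute()` does that, whereas `submit()` wraps the task in a `FutureTask` and stores the exception in the returned `Future`. A `ScheduledThreadPoolExecutor` wraps every task, including those passed to `execute()`, so there the exception surfaces through the future rather than through the handler. The following self-contained sketch (hypothetical names, a latch standing in for "kill the process") demonstrates the `execute()` versus `submit()` difference.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Shows which task exceptions reach a thread's UncaughtExceptionHandler. */
final class HandlerReachDemo {

    /** Single-thread pool whose handler counts down the given latch. */
    static ExecutorService newPool(CountDownLatch handlerFired) {
        return Executors.newFixedThreadPool(1, runnable -> {
            Thread t = new Thread(runnable, "demo-pool-thread");
            t.setDaemon(true);
            t.setUncaughtExceptionHandler((thread, error) -> handlerFired.countDown());
            return t;
        });
    }

    /** execute(): the exception kills the worker and reaches the handler. */
    static boolean executeReachesHandler() {
        CountDownLatch fired = new CountDownLatch(1);
        ExecutorService pool = newPool(fired);
        try {
            pool.execute(() -> { throw new IllegalStateException("boom"); });
            return fired.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    /** submit(): the exception is captured in the Future; the handler stays silent. */
    static boolean submitCapturesInFuture() {
        CountDownLatch fired = new CountDownLatch(1);
        ExecutorService pool = newPool(fired);
        try {
            Future<?> f = pool.submit((Runnable) () -> { throw new IllegalStateException("boom"); });
            boolean captured;
            try {
                f.get(5, TimeUnit.SECONDS);
                captured = false;
            } catch (ExecutionException expected) {
                captured = expected.getCause() instanceof IllegalStateException;
            }
            return captured && !fired.await(200, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | TimeoutException e) {
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```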

          This also unifies the `ExecutorThreadFactory` and `NamedThreadFactory`.
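          A sketch of what a single factory covering both roles (naming threads and installing the handler) could look like. `HandlerInstallingThreadFactory` is a hypothetical name and the `<pool>-thread-<n>` naming scheme is an assumption; this is not the merged `ExecutorThreadFactory`.

```java
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch: names every thread "<poolName>-thread-<n>" and
 * installs the same UncaughtExceptionHandler on each of them.
 */
final class HandlerInstallingThreadFactory implements ThreadFactory {

    private final String namePrefix;
    private final UncaughtExceptionHandler exceptionHandler;
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    HandlerInstallingThreadFactory(String poolName, UncaughtExceptionHandler exceptionHandler) {
        this.namePrefix = Objects.requireNonNull(poolName, "poolName") + "-thread-";
        this.exceptionHandler = Objects.requireNonNull(exceptionHandler, "exceptionHandler");
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread t = new Thread(runnable, namePrefix + threadNumber.getAndIncrement());
        // Daemon threads do not keep the JVM alive during shutdown.
        t.setDaemon(true);
        // Every thread created by this factory gets the handler.
        t.setUncaughtExceptionHandler(exceptionHandler);
        return t;
    }
}
```

          Passing such a factory to `Executors.newScheduledThreadPool(n, factory)` gives every pool thread a descriptive name and the handler in one place.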

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StephanEwen/incubator-flink uncaught_handlers

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3290.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3290


          commit 3602631353dbdf230044db7fba1890600e648101
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2017-02-09T13:04:17Z

          FLINK-5759 [jobmanager] Set UncaughtExceptionHandlers for JobManager's Future and I/O thread pools


          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3290#discussion_r100501680

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
          @@ -99,7 +99,8 @@ public FileCache(String[] tempDirectories) throws IOException {
          this.shutdownHook = createShutdownHook(this, LOG);

          this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>>();

          - this.executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE);
          + this.executorService = Executors.newScheduledThreadPool(10,
          --- End diff ---

          Is there any rationale behind the magic number 10, or do we use it because it was 10 before?

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3290#discussion_r100502133

          --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---
          @@ -216,11 +220,11 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie

          futureExecutor = Executors.newScheduledThreadPool(
          numberProcessors,
          - new NamedThreadFactory("mesos-jobmanager-future-", "thread"));
          + new ExecutorThreadFactory("mesos-jobmanager-future"));
          --- End diff ---

          Just wondering if 'akkaExecutor' and 'mesos-jobmanager-akka' (or 'coordinationFutureExecutor' if we want to be more general than 'akka') would carry more information for people not familiar with the code. As far as I can see, this pool is only used by Akka, whereas the name could imply that it is somehow used for general futures or even async user code.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3290#discussion_r100502389

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java ---
          @@ -18,49 +18,112 @@

          package org.apache.flink.runtime.util;

          +import java.lang.Thread.UncaughtExceptionHandler;
          import java.util.concurrent.ThreadFactory;
          import java.util.concurrent.atomic.AtomicInteger;

          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;

          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +/**
          + * A thread
          --- End diff ---

          I think this line is either incomplete or could be removed.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3290#discussion_r100502245

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
          @@ -99,7 +99,8 @@ public FileCache(String[] tempDirectories) throws IOException {
          this.shutdownHook = createShutdownHook(this, LOG);

          this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>>();

          - this.executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE);
          + this.executorService = Executors.newScheduledThreadPool(10,
          --- End diff ---

          Is there any rationale behind the magic number 10? Or is it just because it has been 10 before?

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3290#discussion_r100511182

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java ---
          @@ -116,12 +116,17 @@ public static JobManagerServices fromConfiguration(
          final FiniteDuration timeout;
          try {
              timeout = AkkaUtils.getTimeout(config);
          - } catch (NumberFormatException e) {
          + }
          + catch (NumberFormatException e) {
          --- End diff ---

          unnecessary reformatting that complicates `git blame`

          githubbot ASF GitHub Bot added a comment -

          Github user shijinkui commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3290#discussion_r100516387

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java ---
          @@ -116,12 +116,17 @@ public static JobManagerServices fromConfiguration(
          final FiniteDuration timeout;
          try {
              timeout = AkkaUtils.getTimeout(config);
          - } catch (NumberFormatException e) {
          + }
          + catch (NumberFormatException e) {
          --- End diff ---

          IMO, we should have a discussion about whether a line break is needed after `}`, and then enforce it in checkstyle.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3290#discussion_r100534934

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
          @@ -99,7 +99,8 @@ public FileCache(String[] tempDirectories) throws IOException {
          this.shutdownHook = createShutdownHook(this, LOG);

          this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>>();

          - this.executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE);
          + this.executorService = Executors.newScheduledThreadPool(10,
          --- End diff ---

          This PR deliberately does not change anything beyond its goal.
          The `10` is pretty magic, though, agreed. Intuitively, something relative to the number of cores seems to make more sense.
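          As a sketch of that suggestion, assuming the pool size is simply the number of available cores clamped to hypothetical bounds (2 and 16 here, chosen for illustration only, not values from the Flink code base):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/** Hypothetical sketch: size a scheduled pool relative to the core count. */
final class CoreRelativePoolSizing {

    /** Illustrative bounds only. */
    static final int MIN_THREADS = 2;
    static final int MAX_THREADS = 16;

    /** Clamp the number of available cores into [MIN_THREADS, MAX_THREADS]. */
    static int poolSize(int availableCores) {
        return Math.max(MIN_THREADS, Math.min(MAX_THREADS, availableCores));
    }

    /** Replacement for a fixed newScheduledThreadPool(10, ...) call. */
    static ScheduledExecutorService newPool() {
        int size = poolSize(Runtime.getRuntime().availableProcessors());
        return Executors.newScheduledThreadPool(size);
    }
}
```

          The lower bound keeps some parallelism on single-core containers; the upper bound avoids spawning many mostly idle threads on large machines.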

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3290#discussion_r100535249

          --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---
          @@ -216,11 +220,11 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie

          futureExecutor = Executors.newScheduledThreadPool(
          numberProcessors,
          - new NamedThreadFactory("mesos-jobmanager-future-", "thread"));
          + new ExecutorThreadFactory("mesos-jobmanager-future"));
          --- End diff ---

          The pool is not really tied to Akka. Akka has its own threads for the actors. The JobManager actor uses the "future" pool for futures produced by the actors. The ExecutionGraph also uses that pool for some callbacks.
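          To illustrate the distinction, a sketch that uses `java.util.concurrent.CompletableFuture` as a stand-in (not the future API in the code under review; all names are hypothetical): a future completed on one thread, here a stand-in "actor" pool, can have its callback explicitly scheduled on a separate, descriptively named "future" pool. The thread name is what shows up in thread dumps and in the fatal-exit log message.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: running future callbacks on a dedicated, named pool. */
final class FuturePoolCallbacks {

    /** A small pool whose threads are named "<prefix>-thread-<n>". */
    static ExecutorService newNamedPool(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return Executors.newFixedThreadPool(2, runnable -> {
            Thread t = new Thread(runnable, prefix + "-thread-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        });
    }

    /** Returns the name of the thread that ran the callback. */
    static String callbackThreadName() {
        ExecutorService futurePool = newNamedPool("jobmanager-future");
        ExecutorService actorPool = newNamedPool("actor");
        try {
            // The response is produced on the stand-in "actor" thread...
            CompletableFuture<String> response =
                    CompletableFuture.supplyAsync(() -> "ack", actorPool);
            // ...but the callback is explicitly scheduled on the future pool.
            return response
                    .thenApplyAsync(msg -> Thread.currentThread().getName(), futurePool)
                    .join();
        } finally {
            futurePool.shutdownNow();
            actorPool.shutdownNow();
        }
    }
}
```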

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3290#discussion_r100535279

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java ---
          @@ -18,49 +18,112 @@

          package org.apache.flink.runtime.util;

          +import java.lang.Thread.UncaughtExceptionHandler;
          import java.util.concurrent.ThreadFactory;
          import java.util.concurrent.atomic.AtomicInteger;

          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;

          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +/**
          + * A thread
          --- End diff ---

          True, incomplete, will fix that.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3290#discussion_r100535521

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java ---
          @@ -116,12 +116,17 @@ public static JobManagerServices fromConfiguration(
          final FiniteDuration timeout;
          try {
              timeout = AkkaUtils.getTimeout(config);
          - } catch (NumberFormatException e) {
          + }
          + catch (NumberFormatException e) {
          --- End diff ---

          Yes, that is still my old code style config; IntelliJ sometimes triggers some local reformatting.
          @shijinkui Updating the code style has been under discussion forever. Adding this rule to the style would require reformatting a lot of existing code. But it is ultimately a good idea to have it, agreed.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3290

          Addressing the comments and merging this...

          StephanEwen Stephan Ewen added a comment -

          Fixed in ef77c254dadbe4c04810681fe765f5ec7d2a7400

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3290


            People

            • Assignee:
              StephanEwen Stephan Ewen
              Reporter:
              StephanEwen Stephan Ewen
            • Votes:
              0
              Watchers:
              2
