Flink / FLINK-4715

TaskManager should commit suicide after cancellation failure

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.2.0, 1.1.4
    • Labels:
      None

      Description

      In case of a failed cancellation, e.g. the task cannot be cancelled after a given time, the TaskManager should kill itself. That way we guarantee that there is no resource leak.

      This behaviour acts as a safety-net against faulty user code.
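The safety net described above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not Flink's implementation: the class `CancellationSafetyNet`, the method `cancelOrEscalate`, and the two helper tasks are hypothetical names, and the fatal-error handler is a stand-in for whatever makes the TaskManager process exit.

```java
import java.util.concurrent.TimeUnit;

public class CancellationSafetyNet {

    /**
     * Interrupts the task thread once per interval until it exits.
     * Returns true if the thread terminated before the timeout; otherwise
     * runs the fatal-error handler once and returns false.
     */
    public static boolean cancelOrEscalate(
            Thread taskThread, long intervalMillis, long timeoutMillis, Runnable fatalErrorHandler) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        boolean callerInterrupted = false;
        try {
            while (taskThread.isAlive()) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    // Hypothetical stand-in for "the TaskManager kills itself".
                    fatalErrorHandler.run();
                    return false;
                }
                taskThread.interrupt();
                // Wait at most one interval, never past the deadline, and never 0
                // (Thread.join(0) would wait forever).
                long waitMillis = Math.max(1L,
                        Math.min(intervalMillis, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
                try {
                    taskThread.join(waitMillis);
                } catch (InterruptedException e) {
                    callerInterrupted = true; // restore the flag once we are done
                }
            }
            return true;
        } finally {
            if (callerInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Task that stops as soon as it is interrupted. */
    public static Thread startCooperativeTask() {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // cancelled: leave run()
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    /** Task that swallows every interrupt, like faulty user code. */
    public static Thread startStubbornTask() {
        Thread t = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(60_000);
                } catch (InterruptedException ignored) {
                    // keeps running despite the interrupt
                }
            }
        });
        t.setDaemon(true); // daemon, so the demo JVM can still exit
        t.start();
        return t;
    }

    public static void main(String[] args) {
        boolean ok = cancelOrEscalate(startCooperativeTask(), 5, 2_000,
                () -> System.out.println("fatal error (unexpected)"));
        System.out.println("cooperative task cancelled: " + ok);
        boolean stuck = cancelOrEscalate(startStubbornTask(), 5, 50,
                () -> System.out.println("fatal error: task ignored cancellation"));
        System.out.println("stubborn task cancelled: " + stuck);
    }
}
```

The interval and timeout values mirror the 5 ms / 50 ms settings used in the tests quoted further down; a production setup would presumably use much larger values.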

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2652

          uce Ufuk Celebi added a comment -

          Fixed in cc6655b (release-1.1), 27fd249 (master).

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2652

          OK thanks. I addressed one last issue with shutting down the watch dog thread (it was lingering around sleeping and only noticed that the task has terminated after that sleep... now the task canceler interrupts the watchdog when everything works as expected and the watch dog thread terminates early).

          I've also backported this to the `release-1.1` branch.
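The early-termination fix described in this comment can be illustrated with a small sketch. It assumes a watchdog that simply sleeps before checking the task; `EarlyExitWatchdog`, `startWatchdog`, and `cancelAndMeasure` are hypothetical names, not the classes from the pull request.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class EarlyExitWatchdog {

    /**
     * Starts a watchdog that sleeps for the initial delay and then checks the task.
     * An interrupt during the sleep makes it re-check immediately.
     */
    public static Thread startWatchdog(Thread taskThread, long initialDelayMillis, Runnable onStuck) {
        Thread watchdog = new Thread(() -> {
            try {
                Thread.sleep(initialDelayMillis);
            } catch (InterruptedException e) {
                // Woken early by the canceler; fall through to the liveness check.
            }
            if (taskThread.isAlive()) {
                onStuck.run();
            }
        }, "cancellation-watchdog");
        watchdog.setDaemon(true);
        watchdog.start();
        return watchdog;
    }

    /**
     * Cancels a cooperative task, then interrupts the watchdog.
     * Returns the milliseconds the watchdog needed to exit, or -1 if the
     * watchdog reported a stuck task or was still alive after the join.
     */
    public static long cancelAndMeasure(long watchdogDelayMillis) {
        AtomicBoolean stuck = new AtomicBoolean(false);
        Thread task = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // cancelled
            }
        });
        task.setDaemon(true);
        task.start();
        Thread watchdog = startWatchdog(task, watchdogDelayMillis, () -> stuck.set(true));

        long start = System.nanoTime();
        try {
            task.interrupt();      // the cancellation itself
            task.join();           // the task has terminated
            watchdog.interrupt();  // tell the watchdog it is no longer needed
            watchdog.join(watchdogDelayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        if (stuck.get() || watchdog.isAlive()) {
            return -1;
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        long elapsed = cancelAndMeasure(10_000);
        System.out.println("watchdog exited after " + elapsed + " ms instead of sleeping 10000 ms");
    }
}
```

Without the `watchdog.interrupt()` call, the watchdog in this sketch would sleep for the full delay before noticing that the task is already gone, which is the lingering behaviour the comment describes.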

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2652

          +1 go ahead

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2652

          OK, I would like to go ahead and merge this if there are no objections.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2652

          Concerning the Kafka test: From the logs, the test fails because a topic cannot be deleted. The ZooKeeper operation blocks and the test times out. I am pretty sure that this is unrelated, as no Flink is running at that point.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2652

          Travis gives the green light except for a Kafka failure. @StephanEwen @rmetzger Do you know whether this is a known issue? Or might it be a regression from this change? https://s3.amazonaws.com/archive.travis-ci.org/jobs/168613643/log.txt

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2652

          Haha, that picture convinced me to actually add the test :smile:

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2652

          Yeah, this sort of covers it. Just afraid of such a situation here: https://twitter.com/thepracticaldev/status/687672086152753152

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2652

          We have the `TaskManagerProcessReapingTest` which tests that the TaskManager process properly exits when the TaskManager actor dies. In addition, there is `TaskManagerTest#testTerminationOnFatalError`, which tests that the `FatalError` message terminates the actor.

          Do you think we are already covered by this? We can certainly add a process reaper test variant that sends a `FatalError` message instead of the `PoisonPill`.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2652

          What do you think about a followup test, where we ensure that a fatal error notification on the TaskManager actually results in a process kill?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2652

          Renamed the `TaskOptions` class to `TaskManagerOptions` so that we can easily migrate the task manager options as a follow up.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2652#discussion_r83842005

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java —
          @@ -565,6 +568,128 @@ public void testOnPartitionStateUpdate() throws Exception {
          verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
          }

          + /**
          + * Tests that interrupt happens via watch dog if canceller is stuck in cancel.
          + * Task cancellation blocks the task canceller. Interrupt after cancel via
          + * cancellation watch dog.
          + */
          + @Test
          + public void testWatchDogInterruptsTask() throws Exception {
          + Configuration config = new Configuration();
          + config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
          + config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
          +
          + Task task = createTask(InvokableBlockingInCancel.class, config);
          + task.startTaskThread();
          +
          + awaitLatch.await();
          +
          + task.cancelExecution();
          +
          + triggerLatch.await();
          +
          + // No fatal error
          + for (Object msg : taskManagerMessages) {
          + assertEquals(false, msg instanceof TaskManagerMessages.FatalError);
          + }
          + }
          +
          + /**
          + * The invoke() method holds a lock (trigger awaitLatch after acquisition)
          + * and cancel cannot complete because it also tries to acquire the same lock.
          + * This is resolved by the watch dog, no fatal error.
          + */
          + @Test
          + public void testInterruptableSharedLockInInvokeAndCancel() throws Exception {
          + Configuration config = new Configuration();
          + config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
          + config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
          +
          + Task task = createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
          + task.startTaskThread();
          +
          + awaitLatch.await();
          +
          + task.cancelExecution();
          +
          + triggerLatch.await();
          +
          + // No fatal error
          + for (Object msg : taskManagerMessages) {
          + assertEquals(false, msg instanceof TaskManagerMessages.FatalError);
          — End diff –

          You can also give a message to `assertFalse` - I like assertEquals for printing the expected value, but if the expected value is false, the former seems more natural to me...
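Both styles aim at a failure that explains itself. In JUnit 4, `assertFalse(String message, boolean condition)` accepts such a message. The sketch below shows the same idea without a JUnit dependency: a helper that throws an `AssertionError` naming the offending message. `FatalErrorCheck`, `assertNoFatalError`, and the nested `FatalError` class are hypothetical stand-ins (the latter for `TaskManagerMessages.FatalError`), and the string messages are placeholders.

```java
import java.util.Arrays;
import java.util.List;

public class FatalErrorCheck {

    /** Hypothetical stand-in for TaskManagerMessages.FatalError. */
    public static final class FatalError {
        private final String description;

        public FatalError(String description) {
            this.description = description;
        }

        @Override
        public String toString() {
            return "FatalError(" + description + ")";
        }
    }

    /** Throws an AssertionError that names the first fatal error found. */
    public static void assertNoFatalError(List<?> messages) {
        for (Object msg : messages) {
            if (msg instanceof FatalError) {
                throw new AssertionError("Unexpected fatal error message: " + msg);
            }
        }
    }

    public static void main(String[] args) {
        // Passes silently: no fatal error among the placeholder messages.
        assertNoFatalError(Arrays.asList("some message", "another message"));

        // Fails with a message that says what went wrong.
        try {
            assertNoFatalError(Arrays.asList("some message", new FatalError("task stuck")));
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Either way, the person reading a failed build sees which message violated the expectation without opening the stack trace.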

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2652#discussion_r83841451

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java —
          @@ -1251,33 +1299,124 @@ public void run() {
          catch (InterruptedException e) {
          // we can ignore this
          }
          + }
          + catch (Throwable t) {
          + logger.error("Error in the task canceler", t);
          + }
          + }
          + }
          +
          + /**
          + * Watchdog for the cancellation. If the task is stuck in cancellation,
          + * we notify the task manager about a fatal error.
          + */
          + private static class TaskCancellationWatchDog extends TimerTask {
          +
          + /**
          + * Pass logger in order to prevent that the compiler needs to inject static bridge methods
          + * to access it.
          + */
          + private final Logger logger;
          +
          + /** Thread executing the Task. */
          + private final Thread executor;
          +
          + /** Interrupt interval. */
          + private final long interruptInterval;
          +
          + /** Timeout after which a fatal error notification happens. */
          + private final long interruptTimeout;
          +
          + /** TaskManager to notify about a timeout */
          + private final TaskManagerConnection taskManager;
          +
          + /** Task name (for logging and error messages). */
          + private final String taskName;
          +
          + /** Synchronization with the {@link TaskCanceler} thread. */
          + private final CountDownLatch taskCancellerLatch;
          +
          + public TaskCancellationWatchDog(
          + Logger logger,
          + Thread executor,
          + long interruptInterval,
          + long interruptTimeout,
          + TaskManagerConnection taskManager,
          + String taskName,
          + CountDownLatch taskCancellerLatch) {
          +
          + this.logger = checkNotNull(logger);
          + this.executor = checkNotNull(executor);
          + this.interruptInterval = checkNotNull(interruptInterval);
          + this.interruptTimeout = checkNotNull(interruptTimeout);
          + this.taskManager = checkNotNull(taskManager);
          + this.taskName = checkNotNull(taskName);
          + this.taskCancellerLatch = checkNotNull(taskCancellerLatch);
          + }
          +
          + @Override
          + public void run() {
          + try {
          + // Synchronize with task canceler
          + taskCancellerLatch.await();
          + } catch (Exception e) {
          + String msg = String.format("Exception while waiting on task " +
          + "canceller to cancel task '%s'.", taskName);
          + taskManager.notifyFatalError(msg, e);
          + return;
          + }
          +
          + long intervalNanos = TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
          + long timeoutNanos = TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS);
          + long deadline = System.nanoTime() + timeoutNanos;
          +
          + try {
          + // Initial wait before interrupting periodically
          + Thread.sleep(interruptInterval);
          + } catch (InterruptedException ignored) {
          + }
          +
          + // It is possible that the user code does not react to the task canceller.
          + // for that reason, we spawn this separate thread that repeatedly interrupts
          + // the user code until it exits. If the suer user code does not exit within
          + // the timeout, we notify the job manager about a fatal error.
          + while (executor.isAlive()) {
          + long now = System.nanoTime();
          +
          + // build the stack trace of where the thread is stuck, for the log
          + StringBuilder bld = new StringBuilder();
          + StackTraceElement[] stack = executor.getStackTrace();
          + for (StackTraceElement e : stack) {
          + bld.append(e).append('\n');
          + }
          - // it is possible that the user code does not react immediately. for that
          - // reason, we spawn a separate thread that repeatedly interrupts the user code until
          - // it exits
          - while (executer.isAlive()) {
          - // build the stack trace of where the thread is stuck, for the log
          - StringBuilder bld = new StringBuilder();
          - StackTraceElement[] stack = executer.getStackTrace();
          - for (StackTraceElement e : stack) {
          - bld.append(e).append('\n');
          - }

          + if (now >= deadline) {
          + long duration = TimeUnit.SECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
          + String msg = String.format("Task '%s' did not react to cancelling signal in " +
          + "the last %d seconds, but is stuck in method:\n %s",
          + taskName,
          + duration,
          + bld.toString());
          +
          + taskManager.notifyFatalError(msg, null);
          + return; // done, don't forget to leave the loop
          + } else {
          logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}",
          taskName, bld.toString());

          - executer.interrupt();
          + executor.interrupt();
          try {
          - executer.join(taskCancellationIntervalMillis);
          - }
          - catch (InterruptedException e) {
          - // we can ignore this
          + long timeLeftNanos = Math.min(intervalNanos, deadline - now - intervalNanos);
          --- End diff --

          No, your suggestion is correct.
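The suggestion being confirmed here is not quoted in this thread, so the following is only a general illustration of the arithmetic on the commented line, not the reviewer's proposal or the merged code. With the quoted expression `Math.min(intervalNanos, deadline - now - intervalNanos)`, the result turns negative once less than one interval remains (for example, interval 5 ms, deadline at 50 ms, now at 48 ms gives -3 ms). One way to keep a wait bounded, assuming a hypothetical helper named `nextWaitNanos`, is to take the smaller of one interval and the time left, clamped at zero:

```java
import java.util.concurrent.TimeUnit;

public class BoundedWait {

    /**
     * Nanoseconds to wait before the next interrupt: at most one interval,
     * never past the deadline, never negative.
     */
    public static long nextWaitNanos(long intervalNanos, long deadlineNanos, long nowNanos) {
        long untilDeadline = deadlineNanos - nowNanos;
        return Math.max(0L, Math.min(intervalNanos, untilDeadline));
    }

    public static void main(String[] args) {
        long interval = TimeUnit.MILLISECONDS.toNanos(5);
        long deadline = TimeUnit.MILLISECONDS.toNanos(50);
        for (long nowMillis : new long[] {0, 48, 60}) {
            long wait = nextWaitNanos(interval, deadline, TimeUnit.MILLISECONDS.toNanos(nowMillis));
            System.out.println("now=" + nowMillis + " ms, wait="
                    + TimeUnit.NANOSECONDS.toMillis(wait) + " ms");
        }
    }
}
```

A caller passing the result to `Thread.join(long)` would still need to treat zero specially, because `join(0)` waits indefinitely.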

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2652#discussion_r83840872

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java —
          @@ -565,6 +568,128 @@ public void testOnPartitionStateUpdate() throws Exception {
          verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
          }

          + /**
          + * Tests that interrupt happens via watch dog if canceller is stuck in cancel.
          + * Task cancellation blocks the task canceller. Interrupt after cancel via
          + * cancellation watch dog.
          + */
          + @Test
          + public void testWatchDogInterruptsTask() throws Exception {
          + Configuration config = new Configuration();
          + config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
          + config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
          +
          + Task task = createTask(InvokableBlockingInCancel.class, config);
          + task.startTaskThread();
          +
          + awaitLatch.await();
          +
          + task.cancelExecution();
          +
          + triggerLatch.await();
          +
          + // No fatal error
          + for (Object msg : taskManagerMessages) {
          + assertEquals(false, msg instanceof TaskManagerMessages.FatalError);
          + }
          + }
          +
          + /**
          + * The invoke() method holds a lock (trigger awaitLatch after acquisition)
          + * and cancel cannot complete because it also tries to acquire the same lock.
          + * This is resolved by the watch dog, no fatal error.
          + */
          + @Test
          + public void testInterruptableSharedLockInInvokeAndCancel() throws Exception {
          + Configuration config = new Configuration();
          + config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
          + config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
          +
          + Task task = createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
          + task.startTaskThread();
          +
          + awaitLatch.await();
          +
          + task.cancelExecution();
          +
          + triggerLatch.await();
          +
          + // No fatal error
          + for (Object msg : taskManagerMessages) {
          + assertEquals(false, msg instanceof TaskManagerMessages.FatalError);
          — End diff –

          `assertFalse` fails with only a bare `AssertionError`, so you have to check the stack trace, whereas `assertEquals` prints a message with the expected and actual values.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2652#discussion_r83840545

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java —
          @@ -565,6 +568,128 @@ public void testOnPartitionStateUpdate() throws Exception {
          verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
          }

          + /**
          + * Tests that interrupt happens via watch dog if canceller is stuck in cancel.
          + * Task cancellation blocks the task canceller. Interrupt after cancel via
          + * cancellation watch dog.
          + */
          + @Test
          + public void testWatchDogInterruptsTask() throws Exception {
          + Configuration config = new Configuration();
          + config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
          + config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
          +
          + Task task = createTask(InvokableBlockingInCancel.class, config);
          + task.startTaskThread();
          +
          + awaitLatch.await();
          +
          + task.cancelExecution();
          +
          + triggerLatch.await();
          +
          + // No fatal error
          + for (Object msg : taskManagerMessages) {
          + assertEquals(false, msg instanceof TaskManagerMessages.FatalError);
          + }
          + }
          +
          + /**
          + * The invoke() method holds a lock (trigger awaitLatch after acquisition)
          + * and cancel cannot complete because it also tries to acquire the same lock.
          + * This is resolved by the watch dog, no fatal error.
          + */
          + @Test
          + public void testInterruptableSharedLockInInvokeAndCancel() throws Exception {
          + Configuration config = new Configuration();
          + config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
          + config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
          +
          + Task task = createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
          + task.startTaskThread();
          +
          + awaitLatch.await();
          +
          + task.cancelExecution();
          +
          + triggerLatch.await();
          — End diff –

          How about adding `task.getExecutingThread().join()` instead of using the trigger latch? Seems more intuitive and safer.
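The difference between the two waiting strategies can be sketched in plain Java. A latch only tells the test that the task reached the `countDown()` call, while `join()` returns once the thread has fully terminated, including its `finally` blocks. The class and method names below are hypothetical, and the task is a simple stand-in for the invokables used in `TaskTest`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class JoinInsteadOfLatch {

    /**
     * Cancels a task thread and waits for it with join().
     * Returns true if the thread has terminated and its cleanup has run.
     */
    public static boolean cancelAndJoin() {
        AtomicBoolean cleanedUp = new AtomicBoolean(false);
        Thread task = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // cancelled
            } finally {
                cleanedUp.set(true); // cleanup that a latch-based wait might race with
            }
        });
        task.setDaemon(true);
        task.start();

        task.interrupt();
        try {
            // Bounded join so that a broken task cannot hang the test forever.
            task.join(10_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        // After a successful join, everything the thread did is visible here.
        return !task.isAlive() && cleanedUp.get();
    }

    public static void main(String[] args) {
        System.out.println("task terminated and cleaned up: " + cancelAndJoin());
    }
}
```

Assertions placed after the join can therefore rely on the task's final state, which is presumably what makes this variant "safer" in the reviewer's sense.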

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2652#discussion_r83839720

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java —
          @@ -1251,33 +1299,124 @@ public void run() {
          catch (InterruptedException e) {
          // we can ignore this
          }
          + }
          + catch (Throwable t) {
          + logger.error("Error in the task canceler", t);
          + }
          + }
          + }
          +
          + /**
          + * Watchdog for the cancellation. If the task is stuck in cancellation,
          + * we notify the task manager about a fatal error.
          + */
          + private static class TaskCancellationWatchDog extends TimerTask {
          +
          + /**
          + * Pass logger in order to prevent that the compiler needs to inject static bridge methods
          + * to access it.
          + */
          + private final Logger logger;
          +
          + /** Thread executing the Task. */
          + private final Thread executor;
          +
          + /** Interrupt interval. */
          + private final long interruptInterval;
          +
          + /** Timeout after which a fatal error notification happens. */
          + private final long interruptTimeout;
          +
          + /** TaskManager to notify about a timeout */
          + private final TaskManagerConnection taskManager;
          +
          + /** Task name (for logging and error messages). */
          + private final String taskName;
          +
          + /** Synchronization with the {@link TaskCanceler} thread. */
          + private final CountDownLatch taskCancellerLatch;
          +
          + public TaskCancellationWatchDog(
          + Logger logger,
          + Thread executor,
          + long interruptInterval,
          + long interruptTimeout,
          + TaskManagerConnection taskManager,
          + String taskName,
          + CountDownLatch taskCancellerLatch) {
          +
          + this.logger = checkNotNull(logger);
          + this.executor = checkNotNull(executor);
          + this.interruptInterval = checkNotNull(interruptInterval);
          + this.interruptTimeout = checkNotNull(interruptTimeout);
          + this.taskManager = checkNotNull(taskManager);
          + this.taskName = checkNotNull(taskName);
          + this.taskCancellerLatch = checkNotNull(taskCancellerLatch);
          + }
          +
          + @Override
          + public void run() {
          + try {
          + // Synchronize with task canceler
          + taskCancellerLatch.await();
          + } catch (Exception e) {
          + String msg = String.format("Exception while waiting on task " +
          + "canceller to cancel task '%s'.", taskName);
          + taskManager.notifyFatalError(msg, e);
          + return;
          + }
          +
          + long intervalNanos = TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
          + long timeoutNanos = TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS);
          + long deadline = System.nanoTime() + timeoutNanos;
          +
          + try {
          + // Initial wait before interrupting periodically
          + Thread.sleep(interruptInterval);
          + } catch (InterruptedException ignored) {
          + }
          +
          + // It is possible that the user code does not react to the task canceller.
          + // for that reason, we spawn this separate thread that repeatedly interrupts
          + // the user code until it exits. If the suer user code does not exit within
          + // the timeout, we notify the job manager about a fatal error.
          + while (executor.isAlive()) {
          + long now = System.nanoTime();
          +
          + // build the stack trace of where the thread is stuck, for the log
          + StringBuilder bld = new StringBuilder();
          + StackTraceElement[] stack = executor.getStackTrace();
          + for (StackTraceElement e : stack) {
          + bld.append(e).append('\n');
          + }
          - // it is possible that the user code does not react immediately. for that
          - // reason, we spawn a separate thread that repeatedly interrupts the user code until
          - // it exits
          - while (executer.isAlive()) {
          - // build the stack trace of where the thread is stuck, for the log
          - StringBuilder bld = new StringBuilder();
          - StackTraceElement[] stack = executer.getStackTrace();
          - for (StackTraceElement e : stack) {
          - bld.append(e).append('\n');
          - }
          + if (now >= deadline) {
          + long duration = TimeUnit.SECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
          + String msg = String.format("Task '%s' did not react to cancelling signal in " +
          + "the last %d seconds, but is stuck in method:\n %s",
          + taskName,
          + duration,
          + bld.toString());
          +
          + taskManager.notifyFatalError(msg, null);
          + return; // done, don't forget to leave the loop
          + } else {
          logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}",
          taskName, bld.toString());

          - executer.interrupt();
          + executor.interrupt();
          try {
          - executer.join(taskCancellationIntervalMillis);
          - }
          - catch (InterruptedException e) {
          - // we can ignore this
          + long timeLeftNanos = Math.min(intervalNanos, deadline - now - intervalNanos);
          — End diff –

          Is this line correct? Should it not be `Math.min(intervalNanos, deadline - now)`?
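To make the question concrete, here is a small sketch comparing the two expressions. It assumes the intent is to wait one interrupt interval at a time without waiting past the deadline. `WatchDogWaitSketch` and its method names are hypothetical and not part of the pull request.

```java
/** Hypothetical helper for illustration; not part of Flink. All values are in nanoseconds. */
final class WatchDogWaitSketch {

    private WatchDogWaitSketch() {}

    /** Form suggested in the review: one interval, capped by the time left until the deadline. */
    static long suggestedWaitNanos(long intervalNanos, long deadlineNanos, long nowNanos) {
        return Math.min(intervalNanos, deadlineNanos - nowNanos);
    }

    /** Form from the diff: subtracts the interval from the remaining time once more. */
    static long diffWaitNanos(long intervalNanos, long deadlineNanos, long nowNanos) {
        return Math.min(intervalNanos, deadlineNanos - nowNanos - intervalNanos);
    }
}
```

With 3 ms left and a 5 ms interval, the suggested form yields 3 ms while the form in the diff yields -2 ms. A negative wait would be a problem if it were passed on to a call such as `Thread.join(long)`, which rejects negative timeouts.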

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2652

          Should we have a single `TaskManagerOptions` class, so that the config is not spread over too many classes?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2652

          Thanks for the valuable feedback. Some of the errors were a little sloppy on my side. Sorry for that. I addressed all your comments.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2652#discussion_r83824898

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java —
          @@ -565,6 +568,50 @@ public void testOnPartitionStateUpdate() throws Exception {
          verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
          }
          + /**
          + * Task cancellation blocks the task canceller. Interrupt after cancel via
          + * cancellation watch dog.
          + */
          + @Test
          + public void testTaskCancelWatchDog() throws Exception {
          + Configuration config = new Configuration();
          + config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 100);
          + config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 1000);
          +
          + Task task = createTask(InvokableBlockingInCancel.class, config);
          + task.startTaskThread();
          +
          + awaitLatch.await();
          +
          + task.cancelExecution();
          +
          + triggerLatch.await();
          + }
          +
          + @Test
          + public void testReportFatalErrorAfterCancellationTimeout() throws Exception {
          + Configuration config = new Configuration();
          + config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 10);
          + config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 200);
          +
          + Task task = createTask(InvokableBlockingInvokeAndCancel.class, config);
          + task.startTaskThread();
          +
          + awaitLatch.await();
          +
          + task.cancelExecution();
          +
          + for (int i = 0; i < 10; i++) {
          + Object msg = taskManagerMessages.poll(1, TimeUnit.SECONDS);
          + if (msg instanceof TaskManagerMessages.FatalError) {
          + System.out.println(msg);
          + return; // success
          + }
          + }
          +
          + fail("Did not receive expected task manager message");
          — End diff –

          Good catch. I will let the test wait for the thread to shut down.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2652#discussion_r83824787

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java —
          @@ -1251,33 +1289,113 @@ public void run() {
          catch (InterruptedException e) {
          // we can ignore this
          }
          + }
          + catch (Throwable t) {
          + LOG.error("Error in the task canceler", t);
          + }
          - // it is possible that the user code does not react immediately. for that
          - // reason, we spawn a separate thread that repeatedly interrupts the user code until
          - // it exits
          - while (executer.isAlive()) {
          - // build the stack trace of where the thread is stuck, for the log
          - StringBuilder bld = new StringBuilder();
          - StackTraceElement[] stack = executer.getStackTrace();
          - for (StackTraceElement e : stack) {
          - bld.append(e).append('\n');
          - }
            + System.out.println("Canceler done");
            + }
            + }
            +
            + /**
            + * Watchdog for the cancellation. If the task is stuck in cancellation,
            + * we notify the task manager about a fatal error.
            + */
            + private static class TaskCancellationWatchDog extends TimerTask {
            +
            + /** Thread executing the Task. */
            + private final Thread executor;
            +
            + /** Interrupt interval. */
            + private final long interruptInterval;
            +
            + /** Timeout after which a fatal error notification happens. */
            + private final long interruptTimeout;
            +
            + /** TaskManager to notify about a timeout */
            + private final TaskManagerConnection taskManager;

            - logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}",
            + /** Task name (for logging and error messages). */
            + private final String taskName;
            +
            + /** Synchronization with the {@link TaskCanceler} thread. */
            + private final CountDownLatch taskCancellerLatch;
            +
            + public TaskCancellationWatchDog(
            + Thread executor,
            + long interruptInterval,
            + long interruptTimeout,
            + TaskManagerConnection taskManager,
            + String taskName,
            + CountDownLatch taskCancellerLatch) {
            +
            + this.executor = checkNotNull(executor);
            + this.interruptInterval = checkNotNull(interruptInterval);
            + this.interruptTimeout = checkNotNull(interruptTimeout);
            + this.taskManager = checkNotNull(taskManager);
            + this.taskName = checkNotNull(taskName);
            + this.taskCancellerLatch = checkNotNull(taskCancellerLatch);
            + }
            +
            + @Override
            + public void run() {
            + try {
            + // Synchronize with task canceler
            + if (!taskCancellerLatch.await(interruptTimeout, TimeUnit.MILLISECONDS)) {
            + return; // Did not return
            + }
            + } catch (InterruptedException e) {
            + return;
            + }
            +
            + long deadline = System.currentTimeMillis() + interruptTimeout;
          — End diff –

          OK, will change

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2652#discussion_r83824750

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java —
          @@ -1251,33 +1289,113 @@ public void run() {
          catch (InterruptedException e) {
          // we can ignore this
          }
          + }
          + catch (Throwable t) {
          + LOG.error("Error in the task canceler", t);
          + }
          - // it is possible that the user code does not react immediately. for that
          - // reason, we spawn a separate thread that repeatedly interrupts the user code until
          - // it exits
          - while (executer.isAlive()) {
          - // build the stack trace of where the thread is stuck, for the log
          - StringBuilder bld = new StringBuilder();
          - StackTraceElement[] stack = executer.getStackTrace();
          - for (StackTraceElement e : stack) {
          - bld.append(e).append('\n');
          - }
            + System.out.println("Canceler done");
            + }
            + }
            +
            + /**
            + * Watchdog for the cancellation. If the task is stuck in cancellation,
            + * we notify the task manager about a fatal error.
            + */
            + private static class TaskCancellationWatchDog extends TimerTask {
            +
            + /** Thread executing the Task. */
            + private final Thread executor;
            +
            + /** Interrupt interval. */
            + private final long interruptInterval;
            +
            + /** Timeout after which a fatal error notification happens. */
            + private final long interruptTimeout;
            +
            + /** TaskManager to notify about a timeout */
            + private final TaskManagerConnection taskManager;

            - logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}",
            + /** Task name (for logging and error messages). */
            + private final String taskName;
            +
            + /** Synchronization with the {@link TaskCanceler} thread. */
            + private final CountDownLatch taskCancellerLatch;
            +
            + public TaskCancellationWatchDog(
            + Thread executor,
            + long interruptInterval,
            + long interruptTimeout,
            + TaskManagerConnection taskManager,
            + String taskName,
            + CountDownLatch taskCancellerLatch) {
            +
            + this.executor = checkNotNull(executor);
            + this.interruptInterval = checkNotNull(interruptInterval);
            + this.interruptTimeout = checkNotNull(interruptTimeout);
            + this.taskManager = checkNotNull(taskManager);
            + this.taskName = checkNotNull(taskName);
            + this.taskCancellerLatch = checkNotNull(taskCancellerLatch);
            + }
            +
            + @Override
            + public void run() {
            + try {
            + // Synchronize with task canceler
            + if (!taskCancellerLatch.await(interruptTimeout, TimeUnit.MILLISECONDS)) {
            + return; // Did not return
          — End diff –

          Leftover, changed to trigger fatal error as well.
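A minimal sketch of that change, assuming the watchdog should report a fatal error rather than return silently when the canceller latch is not released in time. `FatalErrorReporter` is a hypothetical stand-in for the `TaskManagerConnection` in the diff, and `CancellerSyncSketch` is an illustrative name; neither is Flink code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration; not the code from the pull request. */
final class CancellerSyncSketch {

    private CancellerSyncSketch() {}

    /** Hypothetical stand-in for the TaskManager connection used in the diff. */
    interface FatalErrorReporter {
        void notifyFatalError(String message, Throwable cause);
    }

    /**
     * Waits for the canceller to count down the latch. Returns true if it did
     * so within the timeout; otherwise reports a fatal error and returns false.
     */
    static boolean awaitCanceller(
            CountDownLatch latch,
            long timeoutMillis,
            String taskName,
            FatalErrorReporter reporter) {
        try {
            if (latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            // Timed out: do not return silently, escalate instead.
            reporter.notifyFatalError(
                "Task canceller for task '" + taskName + "' did not finish within "
                    + timeoutMillis + " ms.", null);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and escalate as well.
            Thread.currentThread().interrupt();
            reporter.notifyFatalError(
                "Interrupted while waiting on task canceller for task '" + taskName + "'.", e);
        }
        return false;
    }
}
```

Returning a boolean lets the caller skip the interrupt loop after a fatal error has been reported; that return convention is a choice made for this sketch only.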

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2652#discussion_r83824704

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java —
          @@ -1251,33 +1289,113 @@ public void run() {
          catch (InterruptedException e) {
          // we can ignore this
          }
          + }
          + catch (Throwable t) {
          + LOG.error("Error in the task canceler", t);
          + }
          - // it is possible that the user code does not react immediately. for that
          - // reason, we spawn a separate thread that repeatedly interrupts the user code until
          - // it exits
          - while (executer.isAlive()) {
          - // build the stack trace of where the thread is stuck, for the log
          - StringBuilder bld = new StringBuilder();
          - StackTraceElement[] stack = executer.getStackTrace();
          - for (StackTraceElement e : stack) {
          - bld.append(e).append('\n');
          - }
          + System.out.println("Canceler done");
          — End diff –

          Yes, removed

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2652#discussion_r83800506

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java —
          @@ -1251,33 +1289,113 @@ public void run() {
          catch (InterruptedException e) {
          // we can ignore this
          }
          + }
          + catch (Throwable t) {
          + LOG.error("Error in the task canceler", t);
          + }
          - // it is possible that the user code does not react immediately. for that
          - // reason, we spawn a separate thread that repeatedly interrupts the user code until
          - // it exits
          - while (executer.isAlive()) {
          - // build the stack trace of where the thread is stuck, for the log
          - StringBuilder bld = new StringBuilder();
          - StackTraceElement[] stack = executer.getStackTrace();
          - for (StackTraceElement e : stack) {
          - bld.append(e).append('\n');
          - }
          + System.out.println("Canceler done");
          — End diff –

          Leftover?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2652#discussion_r83800264

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java —
          @@ -289,6 +294,10 @@ public Task(
          ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
          ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS);

          + this.taskCancellationTimeout = jobConfiguration.getLong(
          — End diff –

          I think the `jobConfiguration` is the wrong configuration here. It should probably be the TaskManager's configuration in the `taskManagerConfig`. Same probably holds for the cancellation interval above.

          Seems neither were ever tested?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2652#discussion_r83800845

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java —
          @@ -1251,33 +1289,113 @@ public void run() {
          catch (InterruptedException e) {
          // we can ignore this
          }
          + }
          + catch (Throwable t) {
          + LOG.error("Error in the task canceler", t);
          + }
          - // it is possible that the user code does not react immediately. for that
          - // reason, we spawn a separate thread that repeatedly interrupts the user code until
          - // it exits
          - while (executer.isAlive()) {
          - // build the stack trace of where the thread is stuck, for the log
          - StringBuilder bld = new StringBuilder();
          - StackTraceElement[] stack = executer.getStackTrace();
          - for (StackTraceElement e : stack) {
          - bld.append(e).append('\n');
          - }
          + System.out.println("Canceler done");
          + }
          + }
          +
          + /**
          + * Watchdog for the cancellation. If the task is stuck in cancellation,
          + * we notify the task manager about a fatal error.
          + */
          + private static class TaskCancellationWatchDog extends TimerTask {
          +
          + /** Thread executing the Task. */
          + private final Thread executor;
          +
          + /** Interrupt interval. */
          + private final long interruptInterval;
          +
          + /** Timeout after which a fatal error notification happens. */
          + private final long interruptTimeout;
          +
          + /** TaskManager to notify about a timeout */
          + private final TaskManagerConnection taskManager;
          - logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}",
          + /** Task name (for logging and error messages). */
          + private final String taskName;
          +
          + /** Synchronization with the {@link TaskCanceler} thread. */
          + private final CountDownLatch taskCancellerLatch;
          +
          + public TaskCancellationWatchDog(
          + Thread executor,
          + long interruptInterval,
          + long interruptTimeout,
          + TaskManagerConnection taskManager,
          + String taskName,
          + CountDownLatch taskCancellerLatch) {
          +
          + this.executor = checkNotNull(executor);
          + this.interruptInterval = checkNotNull(interruptInterval);
          + this.interruptTimeout = checkNotNull(interruptTimeout);
          + this.taskManager = checkNotNull(taskManager);
          + this.taskName = checkNotNull(taskName);
          + this.taskCancellerLatch = checkNotNull(taskCancellerLatch);
          + }
          +
          + @Override
          + public void run() {
          + try {
          + // Synchronize with task canceler
          + if (!taskCancellerLatch.await(interruptTimeout, TimeUnit.MILLISECONDS)) {
          + return; // Did not return
          + }
          + }
          + catch (InterruptedException e) {
          + return;
          + }
          +
          + long deadline = System.currentTimeMillis() + interruptTimeout;
          — End diff –

          Using `System.nanoTime()` is more stable than `System.currentTimeMillis()`. Would be good to use, especially if we are dealing with timeouts that want to kill the process.
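To illustrate the suggestion above, here is a minimal sketch of a deadline computed from `System.nanoTime()`. The class `MonotonicDeadline` and its method are hypothetical names made up for this example and are not part of the pull request. The idea is that `nanoTime()` is not affected by wall-clock adjustments, so a timeout that may end in killing the process does not fire early or late when the system clock is changed.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical helper, not Flink code: polls a condition until a monotonic deadline. */
final class MonotonicDeadline {

    /**
     * Returns true if {@code done} reported true before the timeout elapsed.
     * nanoTime() readings are only meaningful as differences, so the remaining
     * time is computed as (deadline - now), which also stays correct on overflow.
     */
    static boolean awaitCondition(BooleanSupplier done, long timeoutMillis, long pollMillis) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!done.getAsBoolean()) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // timed out
            }
            try {
                // Sleep for one poll interval, but never past the deadline.
                long sleepNanos = Math.min(remainingNanos, TimeUnit.MILLISECONDS.toNanos(pollMillis));
                TimeUnit.NANOSECONDS.sleep(sleepNanos);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return done.getAsBoolean();
            }
        }
        return true;
    }
}
```

A call such as `MonotonicDeadline.awaitCondition(() -> !thread.isAlive(), 180_000, 100)` would wait up to three minutes for a thread to terminate.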

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2652#discussion_r83803660

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java —
          @@ -565,6 +568,50 @@ public void testOnPartitionStateUpdate() throws Exception {
          verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
          }

          + /**
          + * Task cancellation blocks the task canceller. Interrupt after cancel via
          + * cancellation watch dog.
          + */
          + @Test
          + public void testTaskCancelWatchDog() throws Exception {
          + Configuration config = new Configuration();
          + config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 100);
          + config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 1000);
          +
          + Task task = createTask(InvokableBlockingInCancel.class, config);
          + task.startTaskThread();
          +
          + awaitLatch.await();
          +
          + task.cancelExecution();
          +
          + triggerLatch.await();
          + }
          +
          + @Test
          + public void testReportFatalErrorAfterCancellationTimeout() throws Exception {
          + Configuration config = new Configuration();
          + config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 10);
          + config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 200);
          +
          + Task task = createTask(InvokableBlockingInvokeAndCancel.class, config);
          + task.startTaskThread();
          +
          + awaitLatch.await();
          +
          + task.cancelExecution();
          +
          + for (int i = 0; i < 10; i++) {
          + Object msg = taskManagerMessages.poll(1, TimeUnit.SECONDS);
          + if (msg instanceof TaskManagerMessages.FatalError) {
          + System.out.println(msg);
          + return; // success
          + }
          + }
          +
          + fail("Did not receive expected task manager message");
          — End diff –

          Does this test leave a lingering endlessly looped execution thread?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2652#discussion_r83800757

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java —
          @@ -1251,33 +1289,113 @@ public void run() {
          catch (InterruptedException e) {
          // we can ignore this
          }
          + }
          + catch (Throwable t) {
          + LOG.error("Error in the task canceler", t);
          + }
          - // it is possible that the user code does not react immediately. for that
          - // reason, we spawn a separate thread that repeatedly interrupts the user code until
          - // it exits
          - while (executer.isAlive()) {
          - // build the stack trace of where the thread is stuck, for the log
          - StringBuilder bld = new StringBuilder();
          - StackTraceElement[] stack = executer.getStackTrace();
          - for (StackTraceElement e : stack) {
          - bld.append(e).append('\n');
          - }
          + System.out.println("Canceler done");
          + }
          + }
          +
          + /**
          + * Watchdog for the cancellation. If the task is stuck in cancellation,
          + * we notify the task manager about a fatal error.
          + */
          + private static class TaskCancellationWatchDog extends TimerTask {
          +
          + /** Thread executing the Task. */
          + private final Thread executor;
          +
          + /** Interrupt interval. */
          + private final long interruptInterval;
          +
          + /** Timeout after which a fatal error notification happens. */
          + private final long interruptTimeout;
          +
          + /** TaskManager to notify about a timeout */
          + private final TaskManagerConnection taskManager;
          - logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}",
          + /** Task name (for logging and error messages). */
          + private final String taskName;
          +
          + /** Synchronization with the {@link TaskCanceler} thread. */
          + private final CountDownLatch taskCancellerLatch;
          +
          + public TaskCancellationWatchDog(
          + Thread executor,
          + long interruptInterval,
          + long interruptTimeout,
          + TaskManagerConnection taskManager,
          + String taskName,
          + CountDownLatch taskCancellerLatch) {
          +
          + this.executor = checkNotNull(executor);
          + this.interruptInterval = checkNotNull(interruptInterval);
          + this.interruptTimeout = checkNotNull(interruptTimeout);
          + this.taskManager = checkNotNull(taskManager);
          + this.taskName = checkNotNull(taskName);
          + this.taskCancellerLatch = checkNotNull(taskCancellerLatch);
          + }
          +
          + @Override
          + public void run() {
          + try {
          + // Synchronize with task canceler
          + if (!taskCancellerLatch.await(interruptTimeout, TimeUnit.MILLISECONDS)) {
          + return; // Did not return
          — End diff –

          I think returning early here sort of defeats the idea of the watch dog...

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2652#discussion_r83800396

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java —
          @@ -1183,47 +1220,48 @@ public String toString() {
          */
          private static class TaskCanceler implements Runnable {

          - private final Logger logger;
          private final AbstractInvokable invokable;
          private final Thread executer;
          - private final String taskName;
          private final long taskCancellationIntervalMillis;
          private final ResultPartition[] producedPartitions;
          private final SingleInputGate[] inputGates;
          + private final CountDownLatch watchDogLatch;

          public TaskCanceler(
          - Logger logger,
          — End diff –

          I think the logger was added to prevent that the compiler needs to inject static bridge methods to access it. Why not keep it?
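One way to read this remark: before Java 11, when a nested class touched a private member of its enclosing class, javac generated a synthetic accessor method on the enclosing class. Handing the logger to the nested class through its constructor avoids that indirection. The sketch below only illustrates the pattern. `OuterWithLogger` and `Canceler` are hypothetical stand-ins, and `java.util.logging` is used instead of Flink's logging setup to keep the example self-contained.

```java
import java.util.logging.Logger;

/** Hypothetical stand-in for an enclosing class such as Task; not Flink code. */
final class OuterWithLogger {

    // Private static member of the enclosing class.
    private static final Logger LOG = Logger.getLogger(OuterWithLogger.class.getName());

    /** Nested class that is handed the logger instead of reading OuterWithLogger.LOG. */
    static final class Canceler implements Runnable {
        private final Logger logger;
        private final String taskName;
        private boolean done;

        Canceler(Logger logger, String taskName) {
            this.logger = logger;
            this.taskName = taskName;
        }

        @Override
        public void run() {
            // Uses its own field, so the nested class never reads the outer private member.
            logger.info("Canceling task " + taskName);
            done = true;
        }

        boolean isDone() {
            return done;
        }
    }

    /** The enclosing class passes its own logger when it creates the nested instance. */
    static Canceler newCanceler(String taskName) {
        return new Canceler(LOG, taskName);
    }
}
```

On Java 11 and later, nest-based access control removes the need for such accessors, so there the choice is mostly a matter of style.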

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2652#discussion_r83801277

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java —
          @@ -565,6 +568,50 @@ public void testOnPartitionStateUpdate() throws Exception {
          verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
          }

          + /**
          + * Task cancellation blocks the task canceller. Interrupt after cancel via
          + * cancellation watch dog.
          + */
          + @Test
          + public void testTaskCancelWatchDog() throws Exception {
          + Configuration config = new Configuration();
          + config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 100);
          + config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 1000);
          +
          + Task task = createTask(InvokableBlockingInCancel.class, config);
          + task.startTaskThread();
          +
          + awaitLatch.await();
          +
          + task.cancelExecution();
          +
          + triggerLatch.await();
          + }

          +
          + @Test
          + public void testReportFatalErrorAfterCancellationTimeout() throws Exception {
          + Configuration config = new Configuration();
          + config.setLong(ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, 10);
          + config.setLong(ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, 200);
          +
          + Task task = createTask(InvokableBlockingInvokeAndCancel.class, config);
          + task.startTaskThread();
          +
          + awaitLatch.await();
          +
          + task.cancelExecution();
          +
          + for (int i = 0; i < 10; i++) {
          + Object msg = taskManagerMessages.poll(1, TimeUnit.SECONDS);
          + if (msg instanceof TaskManagerMessages.FatalError) {
          + System.out.println(msg);
          — End diff –

          Leftover?

          githubbot ASF GitHub Bot added a comment -

          GitHub user uce opened a pull request:

          https://github.com/apache/flink/pull/2652

          FLINK-4715 Fail TaskManager with fatal error if task cancellation is stuck

          • Splits the cancellation up into two threads:
            • The `TaskCanceler` calls `cancel` on the invokable and `interrupt` on the executing Thread. It then exits.
            • The `TaskCancellationWatchDog` kicks in after the task cancellation interval (current default: 30 secs) and periodically calls `interrupt` on the executing Thread. If the Thread does not terminate within the task cancellation timeout (new config value, default 3 mins), the task manager is notified about a fatal error, leading to termination of the JVM.
          • The new configuration is exposed via `ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS` (default: 3 mins) and the `ExecutionConfig` (similar to the cancellation interval).
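The two-thread split described above can be sketched roughly as follows. This is not the code from the pull request: `CancellationSketch`, `cancelWithWatchDog`, and the boolean return value are assumptions made for illustration, and the "fatal error" is only a flag here, where the real change notifies the TaskManager.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative sketch only; names and structure are assumptions, not Flink's implementation. */
final class CancellationSketch {

    /**
     * Cancels {@code executor} and returns true if it was still alive once
     * {@code timeoutMillis} had passed, i.e. if a fatal error would be reported.
     */
    static boolean cancelWithWatchDog(
            Thread executor, Runnable cancelHook, long intervalMillis, long timeoutMillis) {

        final AtomicBoolean fatalError = new AtomicBoolean(false);
        final long step = Math.max(1L, intervalMillis); // join(0) would wait forever

        // Thread 1: the canceler. It may block inside cancelHook, which is why
        // the watchdog below must not depend on it making progress.
        Thread canceler = new Thread(() -> {
            try {
                cancelHook.run();
            } finally {
                executor.interrupt();
            }
        }, "canceler");
        canceler.setDaemon(true);

        // Thread 2: the watchdog. It re-interrupts the executor once per interval
        // and flags a fatal error if the executor outlives the timeout.
        Thread watchDog = new Thread(() -> {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            try {
                executor.join(step); // give the canceler one interval first
                while (executor.isAlive()) {
                    if (deadline - System.nanoTime() <= 0) {
                        fatalError.set(true);
                        return;
                    }
                    executor.interrupt();
                    executor.join(step);
                }
            } catch (InterruptedException e) {
                // The watchdog itself was asked to stop early.
            }
        }, "watchdog");
        watchDog.setDaemon(true);

        canceler.start();
        watchDog.start();
        try {
            watchDog.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return fatalError.get();
    }
}
```

Because the watchdog sleeps in whole intervals, the fatal error in this sketch can be flagged up to one interval after the timeout has elapsed.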




          StephanEwen Stephan Ewen added a comment -

          I think we should do the following:

          Split the cancellation up into two threads:

          1. The first thread calls cancel() on the task and interrupt() on the main thread. It then exits.
          2. The second thread is a watchdog that kicks in after n seconds (default is 10, I think) and then calls interrupt() every n seconds. After a maximum duration (let's say 1 minute) it notifies the TaskManager of a fatal error. In most setups, this leads to a process kill.

          The reason to separate this into two threads is that we have seen cases where cancel() blocks waiting on a lock held by the main thread. In that case, neither an interrupt() call would come, nor would the "task manager exit" safety net ever kick in.
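The blocked cancel() case can be reproduced in a small, self-contained sketch. Everything here is hypothetical (`BlockedCancelDemo`, the shared `lock`, the empty stand-in for cancel()); it only shows that a canceler stuck on a lock never reaches its interrupt() call, while a separate thread can still deliver one.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical demo, not Flink code: cancel() needs a lock that the task thread holds. */
final class BlockedCancelDemo {

    /** Returns true if the task thread terminated within {@code waitMillis}. */
    static boolean taskTerminates(boolean withWatchDog, long waitMillis) {
        final Object lock = new Object();
        final CountDownLatch lockHeld = new CountDownLatch(1);

        // "Main" task thread: holds the lock until it is interrupted.
        Thread task = new Thread(() -> {
            synchronized (lock) {
                lockHeld.countDown();
                try {
                    Thread.sleep(Long.MAX_VALUE);
                } catch (InterruptedException e) {
                    // Exit on interrupt, which releases the lock.
                }
            }
        });
        task.setDaemon(true);
        task.start();

        // Canceler: its cancel() step needs the same lock, so it blocks before interrupt().
        Thread canceler = new Thread(() -> {
            synchronized (lock) {
                // stand-in for the body of cancel()
            }
            task.interrupt();
        });
        canceler.setDaemon(true);

        try {
            lockHeld.await();
            canceler.start();
            if (withWatchDog) {
                // Separate thread that interrupts without touching the lock.
                Thread watchDog = new Thread(task::interrupt);
                watchDog.setDaemon(true);
                watchDog.start();
            }
            task.join(waitMillis);
            return !task.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return !task.isAlive();
        } finally {
            task.interrupt(); // clean up so both helper threads can finish
        }
    }
}
```

With `withWatchDog` set to false the task is still alive when the wait expires; with it set to true the task exits promptly, after which the canceler acquires the lock and finishes as well.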

          zjwang zhijiang added a comment - edited

          Yes, we have already hit this problem many times in production, because user code cannot be controlled. If the thread is waiting on a synchronized lock, or is stuck in a similar way, it cannot be cancelled. Our approach is that if the job master fails to cancel the task several times, it tells the task manager to exit.
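The synchronized-lock case mentioned above is easy to demonstrate: a thread that is blocked entering a `synchronized` block does not react to `interrupt()`; the call only sets its interrupt flag. The class below is a hypothetical, self-contained demo and not code from Flink.

```java
/** Hypothetical demo: interrupt() does not unblock a thread waiting for a monitor. */
final class MonitorInterruptDemo {

    /** Returns the waiter's thread state shortly after it has been interrupted. */
    static Thread.State stateAfterInterrupt() {
        final Object lock = new Object();

        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                // Only reached once the caller releases the lock.
            }
        });
        waiter.setDaemon(true);

        synchronized (lock) {
            waiter.start();
            // Wait until the waiter is actually blocked on the monitor.
            while (waiter.getState() != Thread.State.BLOCKED) {
                Thread.yield();
            }
            waiter.interrupt();
            try {
                Thread.sleep(50); // give the interrupt time to (not) take effect
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return waiter.getState();
        }
    }
}
```

The waiter only proceeds once the lock is released, which is why repeated interrupts alone cannot rescue such a task and a process-level safety net is needed.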


            People

            • Assignee:
              uce Ufuk Celebi
              Reporter:
              till.rohrmann Till Rohrmann
            • Votes:
              0
              Watchers:
              7
