FLINK-7216

ExecutionGraph can perform concurrent global restarts to scheduling

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.2.1, 1.3.1
    • Fix Version/s: 1.4.0, 1.3.2
    • Labels:
      None
    • Release Note:
      Fixed in
        - 1.4.0 via 74a6cbab4e736cdb353d100cdd29f51809325796
        - 1.3.2 via e6348fbde1fc0ee8ea682063a4d6503ba3b68864

      Description

      Because ExecutionGraph restarts happen asynchronously and may be delayed, in rare corner cases two restarts can be attempted concurrently. When that happens, some structures on the ExecutionGraph are accessed concurrently.

      Sample stack trace:

      WARN  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Failed to restart the job.
      java.lang.IllegalStateException: SlotSharingGroup cannot clear task assignment, group still has allocated resources.
          at org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup.clearTaskAssignment(SlotSharingGroup.java:78)
          at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.resetForNewExecution(ExecutionJobVertex.java:535)
          at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1151)
          at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestarter$1.call(ExecutionGraphRestarter.java:40)
          at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:95)
          at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
          at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:748)
      

      The solution is to strictly guard against "subsumed" restarts via the globalModVersion, similar to how local restarts are fenced against global restarts.
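
      The fencing idea can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual ExecutionGraph code: the class VersionFencedGraph and all of its methods are hypothetical, and only the idea of checking a restart against a global modification version comes from the description above.

```java
// Hypothetical, simplified stand-in for the ExecutionGraph (not the Flink class).
final class VersionFencedGraph {

    // Bumped on every global failure; plays the role of globalModVersion.
    private long globalModVersion = 1L;

    // Version for which a restart has already been performed (0 = none yet).
    private long restartedVersion = 0L;

    private int restartsPerformed = 0;

    // Records a global failure and returns the version a later restart must present.
    synchronized long failGlobal() {
        return ++globalModVersion;
    }

    // Restarts only if the attempt is neither stale nor a duplicate.
    synchronized boolean restart(long expectedGlobalVersion) {
        if (expectedGlobalVersion != globalModVersion) {
            return false; // subsumed: a newer global failure owns the restart
        }
        if (expectedGlobalVersion == restartedVersion) {
            return false; // this version has already been restarted
        }
        restartedVersion = expectedGlobalVersion;
        restartsPerformed++;
        return true;
    }

    synchronized int getRestartsPerformed() {
        return restartsPerformed;
    }
}
```

      In this sketch, a delayed restart that was scheduled for an older version finds a mismatch and does nothing, so only the restart belonging to the most recent global failure touches the shared structures.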

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen closed the pull request at:

          https://github.com/apache/flink/pull/4364

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4364

          +1 for merging!

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128515844

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java —
          @@ -0,0 +1,128 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.executiongraph.restart;
          +
          +import org.apache.flink.api.common.time.Time;
          +import org.apache.flink.core.testutils.OneShotLatch;
          +import org.apache.flink.runtime.concurrent.ScheduledExecutor;
          +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
          +
          +import org.junit.After;
          +import org.junit.Test;
          +
          +import java.util.concurrent.Executors;
          +import java.util.concurrent.ScheduledExecutorService;
          +
          +import static org.junit.Assert.assertFalse;
          +import static org.junit.Assert.assertTrue;
          +
          +/**
          + * Unit test for the {@link FailureRateRestartStrategy}.
          + */
          +public class FailureRateRestartStrategyTest {
          +
          + public final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
          +
          + public final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
          +
          + @After
          + public void shutdownExecutor() {
          + executorService.shutdownNow();
          + }
          +
          + // ------------------------------------------------------------------------
          +
          + @Test
          + public void testManyFailuresWithinRate() throws Exception {
          + final int numAttempts = 10;
          + final int intervalMillis = 1;
          +
          + final FailureRateRestartStrategy restartStrategy =
          + new FailureRateRestartStrategy(1, Time.milliseconds(intervalMillis), Time.milliseconds(0));
          +
          + for (int attempsLeft = numAttempts; attempsLeft > 0; --attempsLeft) {
          + assertTrue(restartStrategy.canRestart());
          + restartStrategy.restart(new NoOpRestarter(), executor);
          + sleepGuaranteed(2 * intervalMillis);
          + }
          +
          + assertTrue(restartStrategy.canRestart());
          + }
          +
          + @Test
          + public void testFailuresExceedingRate() throws Exception {
          + final int numFailures = 3;
          + final int intervalMillis = 10_000;
          +
          + final FailureRateRestartStrategy restartStrategy =
          + new FailureRateRestartStrategy(numFailures, Time.milliseconds(intervalMillis), Time.milliseconds(0));
          +
          + for (int failuresLeft = numFailures; failuresLeft > 0; --failuresLeft) {
          + assertTrue(restartStrategy.canRestart());
          + restartStrategy.restart(new NoOpRestarter(), executor);
          + }
          +
          + // now the rate should be exceeded
          + assertFalse(restartStrategy.canRestart());
          + }
          +
          + @Test
          + public void testDelay() throws Exception {
          + final long restartDelay = 2;
          + final int numberRestarts = 10;
          +
          + final FailureRateRestartStrategy strategy =
          + new FailureRateRestartStrategy(numberRestarts + 1, Time.milliseconds(1), Time.milliseconds(restartDelay));
          +
          + for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) {
          + assertTrue(strategy.canRestart());
          +
          + final OneShotLatch sync = new OneShotLatch();
          + final RestartCallback restarter = new LatchedRestarter(sync);
          +
          + final long time = System.nanoTime();
          + strategy.restart(restarter, executor);
          + sync.await();
          +
          + final long elapsed = System.nanoTime() - time;
          + assertTrue("Not enough delay", elapsed >= restartDelay * 1_000_000);
          + }
          + }
          +
          + // ------------------------------------------------------------------------
          +
          + /**
          + * This method makes sure that the actual interval and is not spuriously waking up.
          — End diff –

          Perfect

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128515593

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java —
          @@ -581,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception {
          assertEquals(JobStatus.SUSPENDED, eg.getState());
          }

          + @Test
          + public void testConcurrentLocalFailAndRestart() throws Exception {
          + final ExecutionGraph eg = createSimpleTestGraph(new FixedDelayRestartStrategy(10, 0L));
          + eg.setScheduleMode(ScheduleMode.EAGER);
          + eg.scheduleForExecution();
          +
          + waitUntilDeployedAndSwitchToRunning(eg, 1000);
          +
          + final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next();
          + final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
          + final Execution last = vertex.getTaskVertices()[vertex.getParallelism() - 1].getCurrentExecutionAttempt();
          +
          + final OneShotLatch failTrigger = new OneShotLatch();
          + final CountDownLatch readyLatch = new CountDownLatch(2);
          +
          + Thread failure1 = new Thread() {
          + @Override
          + public void run() {
          + readyLatch.countDown();
          + try {
          + failTrigger.await();
          + } catch (InterruptedException ignored) {}
          +
          + first.fail(new Exception("intended test failure 1"));
          + }
          + };
          +
          + Thread failure2 = new Thread() {
          + @Override
          + public void run() {
          + readyLatch.countDown();
          + try {
          + failTrigger.await();
          + } catch (InterruptedException ignored) {}
          +
          + last.fail(new Exception("intended test failure 2"));
          + }
          + };
          +
          + // make sure both threads start simultaneously
          + failure1.start();
          + failure2.start();
          + readyLatch.await();
          + failTrigger.trigger();
          +
          + waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
          + completeCancellingForAllVertices(eg);
          +
          + waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
          + waitUntilDeployedAndSwitchToRunning(eg, 1000);
          + finishAllVertices(eg);
          +
          + eg.waitUntilTerminal();
          + assertEquals(JobStatus.FINISHED, eg.getState());
          + }
          +
          + @Test
          + public void testConcurrentGlobalFailAndRestarts() throws Exception {
          — End diff –

          Jip, I think so too.
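
          The "start both threads simultaneously" pattern from the quoted test can be shown in isolation. The sketch below is an assumption-laden illustration rather than code from the pull request: SimultaneousStart and runConcurrently are hypothetical names, and a plain JDK CountDownLatch stands in for Flink's OneShotLatch.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical helper (not part of the pull request) that releases two actions at once.
final class SimultaneousStart {

    // Runs both actions on their own threads, released together, and waits for both.
    static void runConcurrently(Runnable first, Runnable second) {
        final CountDownLatch ready = new CountDownLatch(2); // both threads have started
        final CountDownLatch go = new CountDownLatch(1);    // common start signal

        final Thread t1 = waitingThread(first, ready, go);
        final Thread t2 = waitingThread(second, ready, go);
        t1.start();
        t2.start();

        try {
            ready.await();  // wait until both threads are parked on the start signal
            go.countDown(); // release them together
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while coordinating threads", e);
        }
    }

    private static Thread waitingThread(Runnable action, CountDownLatch ready, CountDownLatch go) {
        return new Thread(() -> {
            ready.countDown();
            try {
                go.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // skip the action if interrupted before the start signal
            }
            action.run();
        });
    }
}
```

          Waiting on the ready latch before triggering narrows the window between the two actions, which makes a race between two failures more likely to surface in a test run.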

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/4364

          Thanks for the reviews. Addressing the comments, rerunning tests, and merging...

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128512830

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java —
          @@ -189,6 +189,35 @@ public static void finishAllVertices(ExecutionGraph eg) {
          }
          }

          + /**
          + * Turns a newly scheduled execution graph into a state where all vertices run.
          + * This waits until all executions have reached state 'DEPLOYING' and then switches them to running.
          + */
          + public static void waitUntilDeployedAndSwitchToRunning(ExecutionGraph eg, long timeout) throws TimeoutException {
          + // wait until everything is running
          + for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
          + final Execution exec = ev.getCurrentExecutionAttempt();
          + waitUntilExecutionState(exec, ExecutionState.DEPLOYING, timeout);
          + }
          +
          + // Note: As ugly as it is, we need this minor sleep, because between switching
          + // to 'DEPLOYED' and when the 'switchToRunning()' may be called lies a race check
          + // against concurrent modifications (cancel / fail). We can only switch this to running
          + // once that check is passed. For the actual runtime, this switch is triggered by a callback
          + // from the TaskManager, which comes strictly after that. For tests, we use mock TaskManagers
          + // which cannot easily tell us when that condition has happened, unfortunately.
          + try {
          + Thread.sleep(2);
          — End diff –

          In very rare cases, it might. I want to change the `Execution` a bit on the `master` to make this unnecessary.

          However, that is too much surgery in a critical part for a bugfix release, so I decided to be conservative in the runtime code and rather pay this price in the tests.
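
          The helper waitUntilExecutionState used in the quoted diff is not shown in this excerpt. A polling helper of that general shape might look like the sketch below; WaitUtil and waitUntil are hypothetical names, and the 1 ms poll interval is an arbitrary choice for illustration.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical test utility (not the Flink helper): polls a condition with a deadline.
final class WaitUtil {

    // Returns once the condition holds; throws TimeoutException if it never does in time.
    static void waitUntil(BooleanSupplier condition, long timeoutMillis)
            throws TimeoutException, InterruptedException {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            // Compare via subtraction so that nanoTime overflow cannot break the check.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Condition not met within " + timeoutMillis + " ms");
            }
            Thread.sleep(1); // assumed poll interval
        }
    }
}
```

          Polling like this only helps when the condition is observable from the test. The comment above describes a case where it is not, which is why the diff falls back to a short fixed sleep.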

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128512192

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java —
          @@ -0,0 +1,128 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.executiongraph.restart;
          +
          +import org.apache.flink.api.common.time.Time;
          +import org.apache.flink.core.testutils.OneShotLatch;
          +import org.apache.flink.runtime.concurrent.ScheduledExecutor;
          +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
          +
          +import org.junit.After;
          +import org.junit.Test;
          +
          +import java.util.concurrent.Executors;
          +import java.util.concurrent.ScheduledExecutorService;
          +
          +import static org.junit.Assert.assertFalse;
          +import static org.junit.Assert.assertTrue;
          +
          +/**
          + * Unit test for the {@link FailureRateRestartStrategy}.
          + */
          +public class FailureRateRestartStrategyTest {
          +
          + public final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
          +
          + public final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
          +
          + @After
          + public void shutdownExecutor() {
          + executorService.shutdownNow();
          + }
          +
          + // ------------------------------------------------------------------------
          +
          + @Test
          + public void testManyFailuresWithinRate() throws Exception {
          + final int numAttempts = 10;
          + final int intervalMillis = 1;
          +
          + final FailureRateRestartStrategy restartStrategy =
          + new FailureRateRestartStrategy(1, Time.milliseconds(intervalMillis), Time.milliseconds(0));
          +
          + for (int attempsLeft = numAttempts; attempsLeft > 0; --attempsLeft) {
          + assertTrue(restartStrategy.canRestart());
          + restartStrategy.restart(new NoOpRestarter(), executor);
          + sleepGuaranteed(2 * intervalMillis);
          + }
          +
          + assertTrue(restartStrategy.canRestart());
          + }
          +
          + @Test
          + public void testFailuresExceedingRate() throws Exception {
          + final int numFailures = 3;
          + final int intervalMillis = 10_000;
          +
          + final FailureRateRestartStrategy restartStrategy =
          + new FailureRateRestartStrategy(numFailures, Time.milliseconds(intervalMillis), Time.milliseconds(0));
          +
          + for (int failuresLeft = numFailures; failuresLeft > 0; --failuresLeft) {
          + assertTrue(restartStrategy.canRestart());
          + restartStrategy.restart(new NoOpRestarter(), executor);
          + }
          +
          + // now the rate should be exceeded
          + assertFalse(restartStrategy.canRestart());
          + }
          +
          + @Test
          + public void testDelay() throws Exception {
          + final long restartDelay = 2;
          + final int numberRestarts = 10;
          +
          + final FailureRateRestartStrategy strategy =
          + new FailureRateRestartStrategy(numberRestarts + 1, Time.milliseconds(1), Time.milliseconds(restartDelay));
          +
          + for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) {
          + assertTrue(strategy.canRestart());
          +
          + final OneShotLatch sync = new OneShotLatch();
          + final RestartCallback restarter = new LatchedRestarter(sync);
          +
          + final long time = System.nanoTime();
          + strategy.restart(restarter, executor);
          + sync.await();
          +
          + final long elapsed = System.nanoTime() - time;
          + assertTrue("Not enough delay", elapsed >= restartDelay * 1_000_000);
          + }
          + }
          +
          + // ------------------------------------------------------------------------
          +
          + /**
          + * This method makes sure that the actual interval and is not spuriously waking up.
          — End diff –

          Then the whole method and test abort exceptionally anyway.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128511963

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java —
          @@ -581,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception

          assertEquals(JobStatus.SUSPENDED, eg.getState());
          }

          + @Test
          + public void testConcurrentLocalFailAndRestart() throws Exception {
          + final ExecutionGraph eg = createSimpleTestGraph(new FixedDelayRestartStrategy(10, 0L));
          + eg.setScheduleMode(ScheduleMode.EAGER);
          + eg.scheduleForExecution();
          +
          + waitUntilDeployedAndSwitchToRunning(eg, 1000);
          +
          + final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next();
          + final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
          + final Execution last = vertex.getTaskVertices()[vertex.getParallelism() - 1].getCurrentExecutionAttempt();
          +
          + final OneShotLatch failTrigger = new OneShotLatch();
          + final CountDownLatch readyLatch = new CountDownLatch(2);
          +
          + Thread failure1 = new Thread() {
          + @Override
          + public void run() {
          + readyLatch.countDown();
          + try {
          + failTrigger.await();
          + } catch (InterruptedException ignored) {}
          +
          + first.fail(new Exception("intended test failure 1"));
          + }
          + };
          +
          + Thread failure2 = new Thread() {
          + @Override
          + public void run() {
          + readyLatch.countDown();
          + try {
          + failTrigger.await();
          + } catch (InterruptedException ignored) {}
          +
          + last.fail(new Exception("intended test failure 2"));
          + }
          + };
          +
          + // make sure both threads start simultaneously
          + failure1.start();
          + failure2.start();
          + readyLatch.await();
          + failTrigger.trigger();
          +
          + waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
          + completeCancellingForAllVertices(eg);
          +
          + waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
          + waitUntilDeployedAndSwitchToRunning(eg, 1000);
          + finishAllVertices(eg);
          +
          + eg.waitUntilTerminal();
          + assertEquals(JobStatus.FINISHED, eg.getState());
          + }
          +
          + @Test
          + public void testConcurrentGlobalFailAndRestarts() throws Exception {
          — End diff –

          From the offline chat: I think you are missing the asynchrony in the restarting, which leads to a lock in the cherry-picked code.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128510738

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java —
          @@ -581,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception

          assertEquals(JobStatus.SUSPENDED, eg.getState());
          }

          + @Test
          + public void testConcurrentLocalFailAndRestart() throws Exception {
          + final ExecutionGraph eg = createSimpleTestGraph(new FixedDelayRestartStrategy(10, 0L));
          + eg.setScheduleMode(ScheduleMode.EAGER);
          + eg.scheduleForExecution();
          +
          + waitUntilDeployedAndSwitchToRunning(eg, 1000);
          +
          + final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next();
          + final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
          + final Execution last = vertex.getTaskVertices()[vertex.getParallelism() - 1].getCurrentExecutionAttempt();
          +
          + final OneShotLatch failTrigger = new OneShotLatch();
          + final CountDownLatch readyLatch = new CountDownLatch(2);
          +
          + Thread failure1 = new Thread() {
          + @Override
          + public void run() {
          + readyLatch.countDown();
          + try {
          + failTrigger.await();
          + } catch (InterruptedException ignored) {}
          +
          + first.fail(new Exception("intended test failure 1"));
          + }
          + };
          +
          + Thread failure2 = new Thread() {
          + @Override
          + public void run() {
          + readyLatch.countDown();
          + try {
          + failTrigger.await();
          + } catch (InterruptedException ignored) {}
          +
          + last.fail(new Exception("intended test failure 2"));
          + }
          + };
          +
          + // make sure both threads start simultaneously
          + failure1.start();
          + failure2.start();
          + readyLatch.await();
          + failTrigger.trigger();
          +
          + waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
          + completeCancellingForAllVertices(eg);
          — End diff –

          True, those docs were wrong because of copy/paste. I fixed them.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128509863

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java —
          @@ -581,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception

          assertEquals(JobStatus.SUSPENDED, eg.getState());
          }

          + @Test
          + public void testConcurrentLocalFailAndRestart() throws Exception {
          — End diff –

          Right, this one was a test that should have been there in the first place and I took this chance to add it.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128509787

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java —
          @@ -159,34 +161,6 @@ public void testRestartAutomatically() throws Exception {
          }

          @Test
          - public void taskShouldFailWhenFailureRateLimitExceeded() throws Exception {
          — End diff –

          Yes, as part of introducing the "callback" indirection, we can now also test the restart strategies much better, without always setting up a full ExecutionGraph. I added it to the refactoring.
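
To illustrate the point about the callback indirection, here is a minimal sketch of a restart strategy that only talks to a callback and an executor, so a test can drive it without any ExecutionGraph. All names in it (`SketchRestartCallback`, `onRestart`, `SketchFixedAttemptsStrategy`) are hypothetical stand-ins chosen for this example and are not Flink's actual interfaces; only the `canRestart()` / `restart(callback, executor)` shape is taken from the test code quoted in this thread.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the restart callback; not Flink's actual interface. */
interface SketchRestartCallback {
    void onRestart();
}

/** Simplified strategy (an assumption for illustration): a fixed number of delayed restarts. */
final class SketchFixedAttemptsStrategy {

    private final int maxAttempts;
    private final long delayMillis;
    private int attempts;

    SketchFixedAttemptsStrategy(int maxAttempts, long delayMillis) {
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    /** True while the attempt budget is not used up. */
    synchronized boolean canRestart() {
        return attempts < maxAttempts;
    }

    /** Counts the attempt and schedules the callback after the configured delay. */
    synchronized void restart(SketchRestartCallback callback, ScheduledExecutorService executor) {
        attempts++;
        // The strategy never touches a graph; it only invokes the callback later.
        final Runnable task = callback::onRestart;
        executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
    }
}
```

A test can then pass a callback that merely counts down a latch and assert on `canRestart()` before and after the attempts are used up, which is the pattern the new `FailureRateRestartStrategyTest` appears to follow.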

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128475216

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java —
          @@ -0,0 +1,128 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.executiongraph.restart;
          +
          +import org.apache.flink.api.common.time.Time;
          +import org.apache.flink.core.testutils.OneShotLatch;
          +import org.apache.flink.runtime.concurrent.ScheduledExecutor;
          +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
          +
          +import org.junit.After;
          +import org.junit.Test;
          +
          +import java.util.concurrent.Executors;
          +import java.util.concurrent.ScheduledExecutorService;
          +
          +import static org.junit.Assert.assertFalse;
          +import static org.junit.Assert.assertTrue;
          +
          +/**
          + * Unit test for the {@link FailureRateRestartStrategy}.
          + */
          +public class FailureRateRestartStrategyTest {
          +
          + public final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
          +
          + public final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
          +
          + @After
          + public void shutdownExecutor() {
          + executorService.shutdownNow();
          + }
          +
          + // ------------------------------------------------------------------------
          +
          + @Test
          + public void testManyFailuresWithinRate() throws Exception {
          + final int numAttempts = 10;
          + final int intervalMillis = 1;
          +
          + final FailureRateRestartStrategy restartStrategy =
          + new FailureRateRestartStrategy(1, Time.milliseconds(intervalMillis), Time.milliseconds(0));
          +
          + for (int attempsLeft = numAttempts; attempsLeft > 0; --attempsLeft) {
          + assertTrue(restartStrategy.canRestart());
          + restartStrategy.restart(new NoOpRestarter(), executor);
          + sleepGuaranteed(2 * intervalMillis);
          + }
          +
          + assertTrue(restartStrategy.canRestart());
          + }
          +
          + @Test
          + public void testFailuresExceedingRate() throws Exception {
          + final int numFailures = 3;
          + final int intervalMillis = 10_000;
          +
          + final FailureRateRestartStrategy restartStrategy =
          + new FailureRateRestartStrategy(numFailures, Time.milliseconds(intervalMillis), Time.milliseconds(0));
          +
          + for (int failuresLeft = numFailures; failuresLeft > 0; --failuresLeft) {
          + assertTrue(restartStrategy.canRestart());
          + restartStrategy.restart(new NoOpRestarter(), executor);
          + }
          +
          + // now the rate should be exceeded
          + assertFalse(restartStrategy.canRestart());
          + }
          +
          + @Test
          + public void testDelay() throws Exception {
          + final long restartDelay = 2;
          + final int numberRestarts = 10;
          +
          + final FailureRateRestartStrategy strategy =
          + new FailureRateRestartStrategy(numberRestarts + 1, Time.milliseconds(1), Time.milliseconds(restartDelay));
          +
          + for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) {
          + assertTrue(strategy.canRestart());
          +
          + final OneShotLatch sync = new OneShotLatch();
          + final RestartCallback restarter = new LatchedRestarter(sync);
          +
          + final long time = System.nanoTime();
          + strategy.restart(restarter, executor);
          + sync.await();
          +
          + final long elapsed = System.nanoTime() - time;
          + assertTrue("Not enough delay", elapsed >= restartDelay * 1_000_000);
          + }
          + }
          +
          + // ------------------------------------------------------------------------
          +
          + /**
          + * This method makes sure that the actual interval and is not spuriously waking up.
          — End diff –

          "This method makes sure to sleep for the required interval and that we don't spuriously wake up."?

          Also, what happens if `Thread.sleep()` is interrupted?
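
For the interruption question, one possible shape of such a helper is sketched below. It is an assumption for illustration, not the helper from the pull request: the class name `GuaranteedSleep` is made up, and the choice to keep sleeping after an interrupt (restoring the flag at the end) is only one option. The other obvious option is to let the `InterruptedException` propagate and fail the test.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper class; a sketch, not the code under review. */
final class GuaranteedSleep {

    private GuaranteedSleep() {}

    /**
     * Blocks until at least {@code millis} have elapsed according to System.nanoTime().
     * Early wake-ups and interrupts do not shorten the wait; if an interrupt occurred,
     * the thread's interrupt flag is set again before returning.
     */
    static void sleepGuaranteed(long millis) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
        boolean interrupted = false;
        long remainingNanos;

        while ((remainingNanos = deadline - System.nanoTime()) > 0) {
            try {
                // Sleep at least 1 ms so the loop does not spin on sub-millisecond remainders.
                Thread.sleep(Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
            } catch (InterruptedException e) {
                // Remember the interrupt and keep waiting for the deadline.
                interrupted = true;
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Measuring against a `System.nanoTime()` deadline rather than trusting a single `Thread.sleep()` call is what guards against waking up too early.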

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128477515

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java —
          @@ -727,4 +837,46 @@ private static void haltExecution(ExecutionGraph eg)

          assertEquals(JobStatus.FINISHED, eg.getState());
          }

          +
          + // ------------------------------------------------------------------------
          +
          + /**
          + * A TaskManager gateway that does not ack cancellations.
          + */
          + private static final class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
          +
          + @Override
          + public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
          + return new FlinkCompletableFuture<>();
          + }
          + }
          +
          + private static final class TriggeredRestartStrategy implements RestartStrategy {
          — End diff –

          "A {@link RestartStrategy} that blocks restarting on a given {@link OneShotLatch}."?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128494097

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java —
          @@ -581,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception

          assertEquals(JobStatus.SUSPENDED, eg.getState());
          }

          + @Test
          + public void testConcurrentLocalFailAndRestart() throws Exception {
          + final ExecutionGraph eg = createSimpleTestGraph(new FixedDelayRestartStrategy(10, 0L));
          + eg.setScheduleMode(ScheduleMode.EAGER);
          + eg.scheduleForExecution();
          +
          + waitUntilDeployedAndSwitchToRunning(eg, 1000);
          +
          + final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next();
          + final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
          + final Execution last = vertex.getTaskVertices()[vertex.getParallelism() - 1].getCurrentExecutionAttempt();
          +
          + final OneShotLatch failTrigger = new OneShotLatch();
          + final CountDownLatch readyLatch = new CountDownLatch(2);
          +
          + Thread failure1 = new Thread() {
          + @Override
          + public void run() {
          + readyLatch.countDown();
          + try {
          + failTrigger.await();
          + } catch (InterruptedException ignored) {}
          +
          + first.fail(new Exception("intended test failure 1"));
          + }
          + };
          +
          + Thread failure2 = new Thread() {
          + @Override
          + public void run() {
          + readyLatch.countDown();
          + try {
          + failTrigger.await();
          + } catch (InterruptedException ignored) {}
          +
          + last.fail(new Exception("intended test failure 2"));
          + }
          + };
          +
          + // make sure both threads start simultaneously
          + failure1.start();
          + failure2.start();
          + readyLatch.await();
          + failTrigger.trigger();
          +
          + waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
          + completeCancellingForAllVertices(eg);
          +
          + waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
          + waitUntilDeployedAndSwitchToRunning(eg, 1000);
          + finishAllVertices(eg);
          +
          + eg.waitUntilTerminal();
          + assertEquals(JobStatus.FINISHED, eg.getState());
          + }
          +
          + @Test
          + public void testConcurrentGlobalFailAndRestarts() throws Exception {
          — End diff –

          I tried running this on current master and the test failed, but I didn't see a "storm of restarts".

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128477775

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java —
          @@ -189,6 +189,35 @@ public static void finishAllVertices(ExecutionGraph eg) {
          }
          }

          + /**
          + * Turns a newly scheduled execution graph into a state where all vertices run.
          + * This waits until all executions have reached state 'DEPLOYING' and then switches them to running.
          + */
          + public static void waitUntilDeployedAndSwitchToRunning(ExecutionGraph eg, long timeout) throws TimeoutException {
          + // wait until everything is running
          + for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
          + final Execution exec = ev.getCurrentExecutionAttempt();
          + waitUntilExecutionState(exec, ExecutionState.DEPLOYING, timeout);
          + }
          +
          + // Note: As ugly as it is, we need this minor sleep, because between switching
          + // to 'DEPLOYED' and when the 'switchToRunning()' may be called lies a race check
          + // against concurrent modifications (cancel / fail). We can only switch this to running
          + // once that check is passed. For the actual runtime, this switch is triggered by a callback
          + // from the TaskManager, which comes strictly after that. For tests, we use mock TaskManagers
          + // which cannot easily tell us when that condition has happened, unfortunately.
          + try {
          + Thread.sleep(2);
          — End diff –

          😢 but it seems there's no way around it. Could this lead to flaky tests?
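
On the flakiness question: where a condition can be observed from the test, the usual alternative to a fixed sleep is to poll it against a deadline. The sketch below shows that pattern with JDK types only. `PollingWait` and `waitUntil` are hypothetical names for this example, and the sketch does not solve the race described in the quoted code comment, since the mock TaskManagers there reportedly cannot expose the condition in the first place.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

/** Hypothetical test utility; a sketch of deadline-based polling, not code from the pull request. */
final class PollingWait {

    private PollingWait() {}

    /**
     * Re-checks the condition roughly every millisecond until it holds.
     * Throws TimeoutException if it still does not hold once timeoutMillis have passed.
     */
    static void waitUntil(BooleanSupplier condition, long timeoutMillis)
            throws TimeoutException, InterruptedException {

        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;

        while (!condition.getAsBoolean()) {
            // Compare via subtraction so the check stays correct if nanoTime overflows.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Condition not met within " + timeoutMillis + " ms");
            }
            Thread.sleep(1);
        }
    }
}
```

With this pattern the timeout only bounds the worst case, so a slow machine makes the test slower rather than wrong.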

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128491763

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java —
          @@ -581,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception {
          assertEquals(JobStatus.SUSPENDED, eg.getState());
          }

          + @Test
          + public void testConcurrentLocalFailAndRestart() throws Exception {
          + final ExecutionGraph eg = createSimpleTestGraph(new FixedDelayRestartStrategy(10, 0L));
          + eg.setScheduleMode(ScheduleMode.EAGER);
          + eg.scheduleForExecution();
          +
          + waitUntilDeployedAndSwitchToRunning(eg, 1000);
          +
          + final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next();
          + final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
          + final Execution last = vertex.getTaskVertices()[vertex.getParallelism() - 1].getCurrentExecutionAttempt();
          +
          + final OneShotLatch failTrigger = new OneShotLatch();
          + final CountDownLatch readyLatch = new CountDownLatch(2);
          +
          + Thread failure1 = new Thread() {
          + @Override
          + public void run() {
          + readyLatch.countDown();
          + try {
          + failTrigger.await();
          + } catch (InterruptedException ignored) {}
          +
          + first.fail(new Exception("intended test failure 1"));
          + }
          + };
          +
          + Thread failure2 = new Thread() {
          + @Override
          + public void run() {
          + readyLatch.countDown();
          + try {
          + failTrigger.await();
          + } catch (InterruptedException ignored) {}
          +
          + last.fail(new Exception("intended test failure 2"));
          + }
          + };
          +
          + // make sure both threads start simultaneously
          + failure1.start();
          + failure2.start();
          + readyLatch.await();
          + failTrigger.trigger();
          +
          + waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
          + completeCancellingForAllVertices(eg);
          — End diff –

          By the way, I noticed that `completeCancellingForAllVertices()` and `finishAllVertices()` have slightly misleading Javadoc. That threw me off a bit when reviewing.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128474733

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java —
          @@ -159,34 +161,6 @@ public void testRestartAutomatically() throws Exception {
          }

          @Test
          - public void taskShouldFailWhenFailureRateLimitExceeded() throws Exception {
          — End diff –

          These tests are superseded by the newly added tests in `FailureRateRestartStrategyTest`?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128476571

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java —
          @@ -1120,10 +1125,16 @@ public void accept(Void value) {
          }
          }

          - public void restart() {
          + public void restart(long expectedGlobalVersion) {
          try {
          synchronized (progressLock) {
          - JobStatus current = state;
          + // check and increment the global version to move this recovery up
          — End diff –

          "check the current global version to determine whether our recovery attempt is still current"?

          It's not incrementing the global version here.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128480263

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java —
          @@ -581,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception {
          assertEquals(JobStatus.SUSPENDED, eg.getState());
          }

          + @Test
          + public void testConcurrentLocalFailAndRestart() throws Exception {
          — End diff –

          This only verifies that we don't break the existing and working local failover, right? This test should also succeed on the current master and I checked and it indeed does.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/4364

          Okay, will update the periods. The linguist in my heart cries a bit, but I guess it makes sense that we cannot expect checkstyle to figure out if a sentence is a complete sentence or not...

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/4364

          With the current rules, the first sentence of any *javadoc* must end in a period.

          So, this is invalid:
          ```
          /** some parameter */
          private final int myParameter
          ```

          But, this is fine:
          ```
          // some parameter
          private final int myParameter
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/4364

          Concerning the 'period' check style rule:

          I think that the common language rules (not JavaDoc specific) are to add a period after complete sentences. That would mean that parameter descriptions, when not complete sentences, are not terminated by a period.

          Are we rolling a rule that every text line has to be terminated in a period/fullstop?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128197299

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java —
          @@ -37,15 +37,26 @@
          /** Atomic flag to make sure this is used only once */
          private final AtomicBoolean used;

          - public ExecutionGraphRestartCallback(ExecutionGraph execGraph) {
          + /** The globalModVersion that the ExecutionGraph needs to have for the restart to go through */
          — End diff –

          Please add a period.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128196982

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java —
          @@ -33,9 +34,14 @@
          boolean canRestart();

          /**
          - * Restarts the given {@link ExecutionGraph}.
          + * Called by the ExecutionGraph to eventually trigger a full recovery.
          + * The recovery must be triggered on the given callback object, and may be delayed
          + * with the help of the given scheduled executor.
          + *
          — End diff –

          Please remove the trailing space.
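The contract described in that Javadoc (the strategy triggers recovery on a callback, optionally delayed via a scheduled executor) can be sketched roughly as follows. This is an illustration under assumptions, not the Flink code: `SketchRestartCallback`, its method `triggerFullRecovery()`, and `SketchFixedDelayStrategy` are hypothetical names, and the JDK's `ScheduledExecutorService` stands in for whatever executor type the real interface takes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the callback type; the method name is an assumption. */
interface SketchRestartCallback {
    void triggerFullRecovery();
}

/** Sketch of a fixed-delay strategy: it only schedules the callback and never touches the graph. */
final class SketchFixedDelayStrategy {
    private final long delayMillis;

    SketchFixedDelayStrategy(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    void restart(SketchRestartCallback callback, ScheduledExecutorService executor) {
        // The delay is handled by the executor instead of a sleeping thread.
        Runnable task = callback::triggerFullRecovery;
        executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
    }
}

final class RestartStrategyDemo {
    /** Schedules one delayed recovery and returns how many recoveries ran. */
    static int runDemo() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger recoveries = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        try {
            SketchRestartCallback callback = () -> {
                recoveries.incrementAndGet();
                done.countDown();
            };
            new SketchFixedDelayStrategy(10).restart(callback, executor);
            done.await(5, TimeUnit.SECONDS); // wait for the delayed task to run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return recoveries.get();
    }

    public static void main(String[] args) {
        System.out.println("recoveries triggered: " + runDemo());
    }
}
```

Compared with the removed `ExecutionGraphRestarter`, which slept inside a `Callable`, scheduling the callback keeps the delay off the worker thread.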

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128196537

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java —
          @@ -19,27 +19,33 @@
          package org.apache.flink.runtime.executiongraph.restart;

          import org.apache.flink.runtime.executiongraph.ExecutionGraph;
          -import org.slf4j.Logger;
          -import org.slf4j.LoggerFactory;
          -
          -import java.util.concurrent.Callable;
          -
          -class ExecutionGraphRestarter {

          - private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphRestarter.class);
          - public static Callable<Object> restartWithDelay(final ExecutionGraph executionGraph, final long delayBetweenRestartAttemptsInMillis) {
          - return new Callable<Object>() {
          - @Override
          - public Object call() throws Exception {
          - try {
          - LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttemptsInMillis);
          - // do the delay
          - Thread.sleep(delayBetweenRestartAttemptsInMillis);
          - } catch(InterruptedException e) {
          - // should only happen on shutdown
          - }
          - executionGraph.restart();
          - return null;
          - }
          - };
          +
          +import java.util.concurrent.atomic.AtomicBoolean;
          +
          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +/**
          + * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}.
          + *
          + * <p>This callback implementation is one-shot; it can only be used once.
          + */
          +public class ExecutionGraphRestartCallback implements RestartCallback {
          +
          + /** The ExecutionGraph to restart */
          + private final ExecutionGraph execGraph;
          +
          + /** Atomic flag to make sure this is used only once */
          — End diff –

          Please add a period here.
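The one-shot behaviour that the diff's Javadoc describes ("it can only be used once", guarded by an atomic flag) is typically built on `AtomicBoolean.compareAndSet`. A minimal sketch, assuming a generic `Runnable` action in place of the real restart call; `OneShotCallback` and `trigger()` are hypothetical names and not the class under review:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical one-shot wrapper: the wrapped action runs at most once. */
final class OneShotCallback {
    private final Runnable action;

    /** Atomic flag to make sure the action is used only once. */
    private final AtomicBoolean used = new AtomicBoolean(false);

    OneShotCallback(Runnable action) {
        if (action == null) {
            throw new NullPointerException("action");
        }
        this.action = action;
    }

    /** Runs the action on the first call only; returns whether this call ran it. */
    boolean trigger() {
        // compareAndSet lets exactly one caller win, even under concurrent calls.
        if (used.compareAndSet(false, true)) {
            action.run();
            return true;
        }
        return false;
    }
}
```

Because the flag is flipped atomically, two threads calling `trigger()` at the same time cannot both run the action.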

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128196559

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java —
          @@ -19,27 +19,33 @@
          package org.apache.flink.runtime.executiongraph.restart;

          import org.apache.flink.runtime.executiongraph.ExecutionGraph;
          -import org.slf4j.Logger;
          -import org.slf4j.LoggerFactory;
          -
          -import java.util.concurrent.Callable;
          -
          -class ExecutionGraphRestarter {

          - private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphRestarter.class);
          - public static Callable<Object> restartWithDelay(final ExecutionGraph executionGraph, final long delayBetweenRestartAttemptsInMillis) {
          - return new Callable<Object>() {
          - @Override
          - public Object call() throws Exception {
          - try {
          - LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttemptsInMillis);
          - // do the delay
          - Thread.sleep(delayBetweenRestartAttemptsInMillis);
          - } catch(InterruptedException e) {
          - // should only happen on shutdown
          - }
          - executionGraph.restart();
          - return null;
          - }
          - };
          +
          +import java.util.concurrent.atomic.AtomicBoolean;
          +
          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +/**
          + * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}.
          + *
          + * <p>This callback implementation is one-shot; it can only be used once.
          + */
          +public class ExecutionGraphRestartCallback implements RestartCallback {
          +
          + /** The ExecutionGraph to restart */
          — End diff –

          Please add a period here.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4364#discussion_r128196611

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java —
          @@ -19,27 +19,33 @@
          package org.apache.flink.runtime.executiongraph.restart;

          import org.apache.flink.runtime.executiongraph.ExecutionGraph;
          -import org.slf4j.Logger;
          -import org.slf4j.LoggerFactory;
          -
          -import java.util.concurrent.Callable;
          -
          -class ExecutionGraphRestarter {

          - private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphRestarter.class);
          - public static Callable<Object> restartWithDelay(final ExecutionGraph executionGraph, final long delayBetweenRestartAttemptsInMillis) {
          - return new Callable<Object>() {
          - @Override
          - public Object call() throws Exception {
          - try {
          - LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttemptsInMillis);
          - // do the delay
          - Thread.sleep(delayBetweenRestartAttemptsInMillis);
          - } catch(InterruptedException e) {
          - // should only happen on shutdown
          - }
          - executionGraph.restart();
          - return null;
          - }
          - };
          +
          +import java.util.concurrent.atomic.AtomicBoolean;
          +
          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +/**
          + * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}.
          + *
          — End diff –

          Please remove the trailing space.

          githubbot ASF GitHub Bot added a comment -

          GitHub user StephanEwen opened a pull request:

          https://github.com/apache/flink/pull/4364

          FLINK-7216 [distr. coordination] Guard against concurrent global failover

          *This is one of the blocker issues for the 1.3.2 release.*

          What is the purpose of the change

          This fixes the bug FLINK-7216 (https://issues.apache.org/jira/browse/FLINK-7216), where race conditions can trigger concurrent failovers, which in turn cause a restart storm.

          The heart of the bug is the fact that we allow initiating another restart while already being in state `RESTARTING`. That was introduced as a safety net to catch exceptions (implementation bugs) that are reported in that state and need a full recovery to ensure consistency.

          However, this means that multiple restarts may accidentally be triggered/queued and then execute one after another. While one attempt is executing the failover, the next one will interfere or abort (because it detects the conflict) and schedule another recovery, leading to the above-mentioned restart storm. The restart storm subsides once one restart attempt makes enough progress (before the next one interferes) to actually finish the scheduling phase.

          Brief change log

          This contains three issues, because the first two were needed to prepare the fix.

          • FLINK-6665 (https://issues.apache.org/jira/browse/FLINK-6665) and FLINK-6667 (https://issues.apache.org/jira/browse/FLINK-6667) introduce an indirection where the `RestartStrategy` no longer calls `restart()` on the `ExecutionGraph` directly. Instead, it invokes a callback to initiate the restart.
          • The actual fix makes sure that the `globalModVersion` (which tracks global changes such as full restarts in the ExecutionGraph) is unchanged between triggering the restart and executing it. When scheduling multiple restart requests, only one will actually take effect, while the others detect being subsumed.
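The version check described in the last bullet can be sketched as follows. This is a simplified illustration under assumptions, not the `ExecutionGraph` code: `VersionedRestartSketch`, `failGlobal()`, and the counter field are hypothetical, and only the names `globalModVersion`, `progressLock`, and `restart(long expectedGlobalVersion)` come from the diff under review.

```java
/** Hypothetical model of a restart that is guarded by a global modification version. */
final class VersionedRestartSketch {
    private final Object progressLock = new Object();
    private long globalModVersion = 1;
    private int restartsExecuted;

    /** A global failure bumps the version; the queued restart remembers the returned value. */
    long failGlobal() {
        synchronized (progressLock) {
            return ++globalModVersion;
        }
    }

    /**
     * Executes the restart only if no other global change happened since it was queued.
     * Returns false if this attempt was subsumed by a later one.
     */
    boolean restart(long expectedGlobalVersion) {
        synchronized (progressLock) {
            // Check only; the version is not incremented here.
            if (globalModVersion != expectedGlobalVersion) {
                return false;
            }
            restartsExecuted++;
            return true;
        }
    }

    int restartsExecuted() {
        synchronized (progressLock) {
            return restartsExecuted;
        }
    }
}
```

With two failures queued back to back, the restart carrying the older version returns without doing anything, and only the one carrying the latest version runs.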
          Verifying this change

          This change added the following tests:

          • `ExecutionGraphRestartTest#testConcurrentGlobalFailAndRestarts()` explicitly tests the scenario of concurrent global failures and restarts
          • `ExecutionGraphRestartTest#testConcurrentLocalFailAndRestart()` tests a similar setup

          The general working of that mechanism is also covered by various existing tests in `org.apache.flink.runtime.executiongraph.restart`.

          Does this pull request potentially affect one of the following parts:
          • Dependencies (does it add or upgrade a dependency): *no*
          • The public API, i.e., is any changed class annotated with `@Public(Evolving)`: *no*
          • The serializers: *no*
          • The runtime per-record code paths (performance sensitive): *no*
          • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: *yes*:

          The change affects the restart logic on the `JobManager`.

          Documentation
          • Does this pull request introduce a new feature? *no*
          • If yes, how is the feature documented? *not applicable*

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StephanEwen/incubator-flink concurrent_restarts_13

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4364.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4364


          commit 1abb816d664bdac9d8b9af438769b9f685e768ce
          Author: zjureel <zjureel@gmail.com>
          Date: 2017-07-18T17:27:56Z

          FLINK-6665 FLINK-6667 [distributed coordination] Use a callback and a ScheduledExecutor for ExecutionGraph restarts

          Initial work by zjureel@gmail.com, improved by sewen@apache.org.

          commit ef88524c808766e08d990f3bb69c45b04807c7c2
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2017-07-18T17:49:56Z

          FLINK-7216 [distr. coordination] Guard against concurrent global failover



            People

            • Assignee:
              StephanEwen Stephan Ewen
              Reporter:
              StephanEwen Stephan Ewen