Flink / FLINK-5747

Eager Scheduling should deploy all Tasks together

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.3.0
    • Component/s: JobManager
    • Labels:
      None

      Description

      Currently, eager scheduling immediately triggers the scheduling for all vertices and their subtasks in topological order.

      This has two problems:

• This only works as long as resource acquisition is "synchronous". With dynamic resource acquisition in FLIP-6, the resources are returned as Futures which may complete out of order. This results in out-of-order (not topological) scheduling of tasks, which does not work for streaming.
• Deploying tasks that depend on other tasks before it is clear that those other tasks have resources as well leads to many deploy/recovery cycles before enough resources are available to get the job fully running.

      For eager scheduling, we should allocate all resources in one chunk and then deploy once we know that all are available.

      As a follow-up, the same should be done per pipelined component in lazy batch scheduling as well. That way we get lazy scheduling across blocking boundaries, and bulk (gang) scheduling in pipelined subgroups.

This also does not apply to fine-grained recovery efforts, where individual tasks request replacement resources.
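To make the intended behavior concrete, here is a minimal sketch of bulk ("gang") allocation followed by deployment. The Vertex and Slot types and the use of plain java.util.concurrent.CompletableFuture are illustrative stand-ins, not Flink's actual ExecutionJobVertex, SimpleSlot, or internal Future abstraction.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only: eager scheduling first gathers one slot future per task,
// combines them, and deploys only once the combined future has completed successfully.
class EagerSchedulingSketch {

    interface Slot { void release(); }

    interface Vertex {
        CompletableFuture<Slot> allocateSlot();   // non-blocking, only returns a future
        void deploy();
    }

    static void scheduleEager(List<Vertex> verticesInTopologicalOrder) {
        // 1) Request a slot for every task; nothing is deployed yet.
        List<CompletableFuture<Slot>> slotFutures = new ArrayList<>();
        for (Vertex v : verticesInTopologicalOrder) {
            slotFutures.add(v.allocateSlot());
        }

        // 2) Combine the futures: completes when all slots are there, fails if any request fails.
        CompletableFuture<Void> allSlots =
                CompletableFuture.allOf(slotFutures.toArray(new CompletableFuture[0]));

        // 3) Deploy only once every slot is available; otherwise release whatever was allocated.
        allSlots.whenComplete((ignored, failure) -> {
            if (failure == null) {
                verticesInTopologicalOrder.forEach(Vertex::deploy);
            } else {
                slotFutures.forEach(f -> f.thenAccept(Slot::release));
            }
        });
    }
}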

        Issue Links

          Activity

Sihua Zhou added a comment -

Hi Stephan Ewen, there are some problems I found with Eager Scheduling in Flink 1.3.x. I would appreciate it if you have time to review what I've posted in FLINK-7153; I will close that issue if I am wrong.
Thanks,
Sihua Zhou

ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3295

Stephan Ewen added a comment -

          Fixed via f113d79451ba88c487358861cc3e20aac3d19257

ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101829659

— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java —
@@ -754,6 +759,139 @@ public void scheduleForExecution(SlotProvider slotProvider) throws JobException
         }
     }

+    private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
+        // simply take the vertices without inputs.
+        for (ExecutionJobVertex ejv : this.tasks.values()) {
+            if (ejv.getJobVertex().isInputVertex()) {
+                ejv.scheduleAll(slotProvider, allowQueuedScheduling);
+            }
+        }
+    }
+
+    /**
+     *
+     *
+     * @param slotProvider The resource provider from which the slots are allocated
+     * @param timeout The maximum time that the deployment may take, before a
+     *                TimeoutException is thrown.
+     */
+    private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
+        checkState(state == JobStatus.RUNNING, "job is not running currently");
+
+        // Important: reserve all the space we need up front.
+        // that way we do not have any operation that can fail between allocating the slots
+        // and adding them to the list. If we had a failure in between there, that would
+        // cause the slots to get lost
+        final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
+        final boolean queued = allowQueuedScheduling;
+
+        // we use this flag to handle failures in a 'finally' clause
+        // that allows us to not go through clumsy cast-and-rethrow logic
+        boolean successful = false;
+
+        try {
+            // collecting all the slots may resize and fail in that operation without slots getting lost
+            final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+
+            // allocate the slots (obtain all their futures
+            for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+                // these calls are not blocking, they only return futures
+                ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
+
+                // we need to first add the slots to this list, to be safe on release
+                resources.add(slots);
+
+                for (ExecutionAndSlot ens : slots) {
+                    slotFutures.add(ens.slotFuture);
+                }
+            }
+
+            // this future is complete once all slot futures are complete.
+            // the future fails once one slot future fails.
+            final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures);
— End diff –

True, it is not incorrect. But some tasks would already be deployed if we start as soon as some futures are ready. They would need to be canceled again, which leads to those not-so-nice deploy/out-of-resource/cancel/wait-for-cancellation/retry loops.

ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101829019

— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java —
@@ -88,4 +100,104 @@ public RetryException(Throwable cause) {
             super(cause);
         }
     }
+
+    // ------------------------------------------------------------------------
+    //  composing futures
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates a future that is complete once multiple other futures completed.
+     * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
+     * conjunction fails.
+     *
+     * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
+     * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
+     *
+     * @param futures The futures that make up the conjunction. No null entries are allowed.
+     * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
+     */
+    public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
+        checkNotNull(futures, "futures");
+        checkArgument(!futures.isEmpty(), "futures is empty");
+
+        final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size());
+
+        for (Future<?> future : futures) {
+            future.handle(conjunct.completionHandler);
+        }
+
+        return conjunct;
+    }
+
+    /**
+     * A future that is complete once multiple other futures completed. The futures are not
+     * necessarily of the same type, which is why the type of this Future is {@code Void}.
+     * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
+     * conjunction fails.
+     *
+     * <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via
+     * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how
+     * many of the Futures are already complete.
+     */
+    public interface ConjunctFuture extends CompletableFuture<Void> {
+
+        /**
+         * Gets the total number of Futures in the conjunction.
+         * @return The total number of Futures in the conjunction.
+         */
+        int getNumFuturesTotal();
+
+        /**
+         * Gets the number of Futures in the conjunction that are already complete.
+         * @return The number of Futures in the conjunction that are already complete
+         */
+        int getNumFuturesCompleted();
+    }
+
+    /**
+     * The implementation of the {@link ConjunctFuture}.
+     *
+     * <p>Implementation notice: The member fields all have package-private access, because they are
+     * either accessed by an inner subclass or by the enclosing class.
+     */
+    private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
— End diff –

Yes, with set rather than add it should work. Since the list gets initialized with an array, I would actually just use an array in the first place.

          Followup
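As a side note on the set-vs-add point, here is a tiny stand-alone illustration (hypothetical values, not code from the pull request) of why set(index, element) is the right operation for a pre-sized result list, while add(index, element) would shift elements:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SetVsAdd {
    public static void main(String[] args) {
        // pre-sized list with one slot per expected result
        List<String> results = new ArrayList<>(Collections.nCopies(3, (String) null));

        results.set(1, "b");   // [null, b, null]    - position 1 overwritten, size stays 3
        results.add(1, "x");   // [null, x, b, null] - tail shifted right, size grows to 4

        System.out.println(results);
    }
}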

ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101827724

— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java —
@@ -88,4 +100,104 @@ public RetryException(Throwable cause) {
             super(cause);
         }
     }
+
+    // ------------------------------------------------------------------------
+    //  composing futures
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates a future that is complete once multiple other futures completed.
+     * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
+     * conjunction fails.
+     *
+     * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
+     * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
+     *
+     * @param futures The futures that make up the conjunction. No null entries are allowed.
+     * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
+     */
+    public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
+        checkNotNull(futures, "futures");
+        checkArgument(!futures.isEmpty(), "futures is empty");
— End diff –

          Yes, will change that...

ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3295

          +1 for merging

ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101766916

— Diff: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java —
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the utility methods in {@link FutureUtils}
+ */
+public class FutureUtilsTest {
+
+    @Test
+    public void testConjunctFutureFailsOnEmptyAndNull() throws Exception {
+        try {
+            FutureUtils.combineAll(null);
+            fail();
+        } catch (NullPointerException ignored) {}
+
+        try {
+            FutureUtils.combineAll(Collections.<Future<?>>emptyList());
+            fail();
+        } catch (IllegalArgumentException ignored) {}
+
+        try {
+            FutureUtils.combineAll(Arrays.asList(
+                    new FlinkCompletableFuture<Object>(),
+                    null,
+                    new FlinkCompletableFuture<Object>()));
+            fail();
+        } catch (NullPointerException ignored) {}
+    }
+
+    @Test
+    public void testConjunctFutureCompletion() throws Exception {
+        // some futures that we combine
+        CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
+        CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
+        CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
+        CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
+
+        // some future is initially completed
+        future2.complete(new Object());
+
+        // build the conjunct future
+        ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
+
+        Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
+            @Override
+            public void accept(Void value) {}
+        });
— End diff –

          ah makes sense

ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101766842

— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java —
@@ -754,6 +759,139 @@ public void scheduleForExecution(SlotProvider slotProvider) throws JobException
         }
     }

+    private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
+        // simply take the vertices without inputs.
+        for (ExecutionJobVertex ejv : this.tasks.values()) {
+            if (ejv.getJobVertex().isInputVertex()) {
+                ejv.scheduleAll(slotProvider, allowQueuedScheduling);
+            }
+        }
+    }
+
+    /**
+     *
+     *
+     * @param slotProvider The resource provider from which the slots are allocated
+     * @param timeout The maximum time that the deployment may take, before a
+     *                TimeoutException is thrown.
+     */
+    private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
+        checkState(state == JobStatus.RUNNING, "job is not running currently");
+
+        // Important: reserve all the space we need up front.
+        // that way we do not have any operation that can fail between allocating the slots
+        // and adding them to the list. If we had a failure in between there, that would
+        // cause the slots to get lost
+        final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
+        final boolean queued = allowQueuedScheduling;
+
+        // we use this flag to handle failures in a 'finally' clause
+        // that allows us to not go through clumsy cast-and-rethrow logic
+        boolean successful = false;
+
+        try {
+            // collecting all the slots may resize and fail in that operation without slots getting lost
+            final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+
+            // allocate the slots (obtain all their futures
+            for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+                // these calls are not blocking, they only return futures
+                ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
+
+                // we need to first add the slots to this list, to be safe on release
+                resources.add(slots);
+
+                for (ExecutionAndSlot ens : slots) {
+                    slotFutures.add(ens.slotFuture);
+                }
+            }
+
+            // this future is complete once all slot futures are complete.
+            // the future fails once one slot future fails.
+            final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures);
— End diff –

          Shouldn't the `fail` operations be idempotent and only take effect for the first failure?
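For reference, a CompletableFuture-style future already gives "first completion wins" semantics: later exceptional completions are ignored. A minimal illustration with plain java.util.concurrent.CompletableFuture (assuming Flink's own Future abstraction behaves analogously):

import java.util.concurrent.CompletableFuture;

public class FirstFailureWins {
    public static void main(String[] args) {
        CompletableFuture<Void> conjunct = new CompletableFuture<>();

        boolean first = conjunct.completeExceptionally(new RuntimeException("slot request 3 failed"));
        boolean second = conjunct.completeExceptionally(new RuntimeException("slot request 7 failed"));

        System.out.println(first);   // true  -> this call moved the future to the failed state
        System.out.println(second);  // false -> ignored, the future keeps the first failure
    }
}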

ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101765953

— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java —
@@ -88,4 +100,104 @@ public RetryException(Throwable cause) {
             super(cause);
         }
     }
+
+    // ------------------------------------------------------------------------
+    //  composing futures
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates a future that is complete once multiple other futures completed.
+     * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
+     * conjunction fails.
+     *
+     * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
+     * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
+     *
+     * @param futures The futures that make up the conjunction. No null entries are allowed.
+     * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
+     */
+    public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
+        checkNotNull(futures, "futures");
+        checkArgument(!futures.isEmpty(), "futures is empty");
+
+        final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size());
+
+        for (Future<?> future : futures) {
+            future.handle(conjunct.completionHandler);
+        }
+
+        return conjunct;
+    }
+
+    /**
+     * A future that is complete once multiple other futures completed. The futures are not
+     * necessarily of the same type, which is why the type of this Future is {@code Void}.
+     * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
+     * conjunction fails.
+     *
+     * <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via
+     * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how
+     * many of the Futures are already complete.
+     */
+    public interface ConjunctFuture extends CompletableFuture<Void> {
+
+        /**
+         * Gets the total number of Futures in the conjunction.
+         * @return The total number of Futures in the conjunction.
+         */
+        int getNumFuturesTotal();
+
+        /**
+         * Gets the number of Futures in the conjunction that are already complete.
+         * @return The number of Futures in the conjunction that are already complete
+         */
+        int getNumFuturesCompleted();
+    }
+
+    /**
+     * The implementation of the {@link ConjunctFuture}.
+     *
+     * <p>Implementation notice: The member fields all have package-private access, because they are
+     * either accessed by an inner subclass or by the enclosing class.
+     */
+    private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
— End diff –

Doesn't the `AtomicInteger` make it thread safe? Every call to the BiFunction manipulates a distinct array element, and the array has a fixed size, so no resizing can take place.

          Btw: I think it should be `set(index, element)` instead of `add(index, element)` because of the fixed nature of the array list.

          Yes please do it as a follow-up.

ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101754578

— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java —
@@ -88,4 +100,104 @@ public RetryException(Throwable cause) {
             super(cause);
         }
     }
+
+    // ------------------------------------------------------------------------
+    //  composing futures
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates a future that is complete once multiple other futures completed.
+     * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
+     * conjunction fails.
+     *
+     * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
+     * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
+     *
+     * @param futures The futures that make up the conjunction. No null entries are allowed.
+     * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
+     */
+    public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
+        checkNotNull(futures, "futures");
+        checkArgument(!futures.isEmpty(), "futures is empty");
+
+        final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size());
+
+        for (Future<?> future : futures) {
+            future.handle(conjunct.completionHandler);
+        }
+
+        return conjunct;
+    }
+
+    /**
+     * A future that is complete once multiple other futures completed. The futures are not
+     * necessarily of the same type, which is why the type of this Future is {@code Void}.
+     * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
+     * conjunction fails.
+     *
+     * <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via
+     * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how
+     * many of the Futures are already complete.
+     */
+    public interface ConjunctFuture extends CompletableFuture<Void> {
+
+        /**
+         * Gets the total number of Futures in the conjunction.
+         * @return The total number of Futures in the conjunction.
+         */
+        int getNumFuturesTotal();
+
+        /**
+         * Gets the number of Futures in the conjunction that are already complete.
+         * @return The number of Futures in the conjunction that are already complete
+         */
+        int getNumFuturesCompleted();
+    }
+
+    /**
+     * The implementation of the {@link ConjunctFuture}.
+     *
+     * <p>Implementation notice: The member fields all have package-private access, because they are
+     * either accessed by an inner subclass or by the enclosing class.
+     */
+    private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
— End diff –

          Interesting idea. I think the linked implementation is not yet thread safe, because the BiFunction that adds the results to the collection is called concurrently as the different original futures complete. For this particular use case, we'd need to also preserve the order. This is easy to change by simply pre-allocating a target array and setting the results to the positions (the completion function would need to get the target index).

          I would actually like to do that as a separate follow-up, unless you object there.
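A minimal sketch of that follow-up idea (a hypothetical class, not the code that was merged, written against plain java.util.concurrent.CompletableFuture rather than Flink's Future/FlinkCompletableFuture): every input future gets a fixed target index into a pre-allocated array, so concurrent handlers never write the same cell and the original order is preserved; an AtomicInteger tracks how many inputs have completed.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class WaitingConjunctFuture<T> extends CompletableFuture<Void> {

    private final Object[] results;                    // one fixed cell per input future
    private final AtomicInteger numCompleted = new AtomicInteger();

    WaitingConjunctFuture(List<? extends CompletableFuture<? extends T>> futures) {
        this.results = new Object[futures.size()];

        for (int i = 0; i < futures.size(); i++) {
            final int index = i;                       // each handler writes its own cell
            futures.get(i).whenComplete((value, failure) -> {
                if (failure != null) {
                    completeExceptionally(failure);    // first failure wins, later ones are ignored
                } else {
                    results[index] = value;            // no shared position -> no race on the array
                    if (numCompleted.incrementAndGet() == results.length) {
                        complete(null);                // all inputs are done
                    }
                }
            });
        }
    }

    int getNumFuturesCompleted() {
        return numCompleted.get();
    }
}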

ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101753992

— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java —
@@ -292,7 +305,8 @@ public ExecutionGraph(
         this.stateTimestamps = new long[JobStatus.values().length];
         this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();

-        this.timeout = timeout;
+        this.rpcCallTimeout = timeout;
— End diff –

          will do

ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101753888

— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java —
@@ -754,6 +759,139 @@ public void scheduleForExecution(SlotProvider slotProvider) throws JobException
         }
     }

+    private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
+        // simply take the vertices without inputs.
+        for (ExecutionJobVertex ejv : this.tasks.values()) {
+            if (ejv.getJobVertex().isInputVertex()) {
+                ejv.scheduleAll(slotProvider, allowQueuedScheduling);
+            }
+        }
+    }
+
+    /**
+     *
+     *
+     * @param slotProvider The resource provider from which the slots are allocated
+     * @param timeout The maximum time that the deployment may take, before a
+     *                TimeoutException is thrown.
+     */
+    private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
+        checkState(state == JobStatus.RUNNING, "job is not running currently");
+
+        // Important: reserve all the space we need up front.
+        // that way we do not have any operation that can fail between allocating the slots
+        // and adding them to the list. If we had a failure in between there, that would
+        // cause the slots to get lost
+        final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
+        final boolean queued = allowQueuedScheduling;
+
+        // we use this flag to handle failures in a 'finally' clause
+        // that allows us to not go through clumsy cast-and-rethrow logic
+        boolean successful = false;
+
+        try {
+            // collecting all the slots may resize and fail in that operation without slots getting lost
+            final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+
+            // allocate the slots (obtain all their futures
+            for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+                // these calls are not blocking, they only return futures
+                ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
+
+                // we need to first add the slots to this list, to be safe on release
+                resources.add(slots);
+
+                for (ExecutionAndSlot ens : slots) {
+                    slotFutures.add(ens.slotFuture);
+                }
+            }
+
+            // this future is complete once all slot futures are complete.
+            // the future fails once one slot future fails.
+            final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures);
— End diff –

          I thought about that, and one purpose of this change is to avoid many partial deployments / failures when not all resources are available.

          In the "FLIP-1" work, we would introduce something like "domains of tasks that schedule and fail together". We can schedule them topologically independently.

ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101753597

— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java —
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.util.List;
+
+/**
+ * Utilities for dealing with the execution graphs and scheduling.
+ */
+public class ExecutionGraphUtils {
+
+    /**
+     * Releases the slot represented by the given future. If the future is complete, the
+     * slot is immediately released. Otherwise, the slot is released as soon as the future
+     * is completed.
+     *
+     * <p>Note that releasing the slot means cancelling any task execution currently
+     * associated with that slot.
+     *
+     * @param slotFuture The future for the slot to release.
+     */
+    public static void releaseSlotFuture(Future<SimpleSlot> slotFuture) {
+        slotFuture.handle(ReleaseSlotFunction.INSTANCE);
+    }
+
+    /**
+     * Releases the all the slots in the list of arrays of {@code ExecutionAndSlot}.
+     * For each future in that collection holds: If the future is complete, its slot is
+     * immediately released. Otherwise, the slot is released as soon as the future
+     * is completed.
+     *
+     * <p>This methods never throws any exceptions (subclasses of {@code Exception})
+     * and continues to release the remaining slots if one slot release failed. We only
+     * catch Exceptions here (and not other throwables) because the code executed while
+     * releasing slot does not involve any dynamic
— End diff –

          true, will fix that

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101753433

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java —
          @@ -0,0 +1,190 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.concurrent;
          +
          +import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
          +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
          +
          +import org.junit.Test;
          +
          +import java.io.IOException;
          +import java.util.Arrays;
          +import java.util.Collections;
          +import java.util.concurrent.ExecutionException;
          +
          +import static org.junit.Assert.*;
          +
          +/**
          + * Tests for the utility methods in {@link FutureUtils}
          + */
          +public class FutureUtilsTest {
          +
          + @Test
          + public void testConjunctFutureFailsOnEmptyAndNull() throws Exception {
          + try {
          + FutureUtils.combineAll(null);
          + fail();
          + } catch (NullPointerException ignored) {}
          +
          + try {
          + FutureUtils.combineAll(Collections.<Future<?>>emptyList());
          + fail();
          + } catch (IllegalArgumentException ignored) {}
          +
          + try {
          + FutureUtils.combineAll(Arrays.asList(
          + new FlinkCompletableFuture<Object>(),
          + null,
          + new FlinkCompletableFuture<Object>()));
          + fail();
          + } catch (NullPointerException ignored) {}
          + }
          +
          + @Test
          + public void testConjunctFutureCompletion() throws Exception {
          + // some futures that we combine
          + CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
          + CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
          + CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
          + CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
          +
          + // some future is initially completed
          + future2.complete(new Object());
          +
          + // build the conjunct future
          + ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
          +
          + Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
          + @Override
          + public void accept(Void value) {}
          + });
          — End diff –

          This is simply added as a test that applied functions (and their result futures) are not called before completion.
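          A small self-contained illustration of what that test checks, assuming the JDK CompletableFuture in place of Flink's ConjunctFuture and AcceptFunction: the dependent stage must only fire once the whole conjunction has completed.

          import java.util.concurrent.CompletableFuture;

          public final class ConjunctCompletionDemo {
              public static void main(String[] args) {
                  CompletableFuture<Object> a = new CompletableFuture<>();
                  CompletableFuture<Object> b = new CompletableFuture<>();

                  CompletableFuture<Void> all = CompletableFuture.allOf(a, b);
                  // plays the role of the thenAccept(...) result future in the test
                  CompletableFuture<Void> dependent = all.thenAccept(ignored -> { });

                  a.complete(new Object());
                  if (dependent.isDone()) {
                      throw new IllegalStateException("dependent stage fired before all futures completed");
                  }

                  b.complete(new Object());
                  if (!dependent.isDone()) {
                      throw new IllegalStateException("dependent stage should fire once all futures completed");
                  }
              }
          }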

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101737070

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java —
          @@ -88,4 +100,104 @@ public RetryException(Throwable cause) {
          super(cause);
          }
          }
          +
          + // ------------------------------------------------------------------------
          + // composing futures
          + // ------------------------------------------------------------------------
          +
          + /**
          + * Creates a future that is complete once multiple other futures completed.
          + * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
          + * conjunction fails.
          + *
          + * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
          + * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
          + *
          + * @param futures The futures that make up the conjunction. No null entries are allowed.
          + * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
          + */
          + public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
          + checkNotNull(futures, "futures");
          + checkArgument(!futures.isEmpty(), "futures is empty");
          +
          + final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size());
          +
          + for (Future<?> future : futures) {
          + future.handle(conjunct.completionHandler);
          + }
          +
          + return conjunct;
          + }
          +
          + /**
          + * A future that is complete once multiple other futures completed. The futures are not
          + * necessarily of the same type, which is why the type of this Future is {@code Void}.
          + * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
          + * conjunction fails.
          + *
          + * <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via
          + * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how
          + * many of the Futures are already complete.
          + */
          + public interface ConjunctFuture extends CompletableFuture<Void> {
          +
          + /**
          + * Gets the total number of Futures in the conjunction.
          + * @return The total number of Futures in the conjunction.
          + */
          + int getNumFuturesTotal();
          +
          + /**
          + * Gets the number of Futures in the conjunction that are already complete.
          + * @return The number of Futures in the conjunction that are already complete
          + */
          + int getNumFuturesCompleted();
          + }

          +
          + /**
          + * The implementation of the {@link ConjunctFuture}.
          + *
          + * <p>Implementation notice: The member fields all have package-private access, because they are
          + * either accessed by an inner subclass or by the enclosing class.
          + */
          + private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
          — End diff –

          I like the idea of the ConjunctFuture. I was wondering whether we can generalize it a little bit more by also collecting the actual values. Then the ConjunctFuture would effectively return a collection of the common base type of all registered futures. Here is a commit where I tried it out: https://github.com/tillrohrmann/flink/commit/f1f5ab63bfe75d629230e0fc2cf37d2499d85548. What do you think?
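          A rough sketch of such a value-collecting conjunct future, written against the JDK CompletableFuture rather than Flink's runtime futures (class and method names are illustrative only):

          import java.util.ArrayList;
          import java.util.Collection;
          import java.util.Collections;
          import java.util.List;
          import java.util.concurrent.CompletableFuture;
          import java.util.concurrent.atomic.AtomicInteger;

          public final class ValueConjunctSketch {

              public static <T> CompletableFuture<Collection<T>> combineAll(
                      Collection<? extends CompletableFuture<? extends T>> futures) {

                  final int total = futures.size();
                  final CompletableFuture<Collection<T>> result = new CompletableFuture<>();
                  if (total == 0) {
                      result.complete(Collections.<T>emptyList());
                      return result;
                  }

                  final AtomicInteger remaining = new AtomicInteger(total);
                  final List<T> values = Collections.synchronizedList(new ArrayList<T>(total));

                  for (CompletableFuture<? extends T> future : futures) {
                      future.whenComplete((value, failure) -> {
                          if (failure != null) {
                              // the first failure fails the whole conjunction
                              result.completeExceptionally(failure);
                          } else {
                              values.add(value);
                              if (remaining.decrementAndGet() == 0) {
                                  result.complete(values);
                              }
                          }
                      });
                  }
                  return result;
              }
          }

          Note that this sketch collects the values in completion order rather than in the order of the input futures, which ties into the ordering concern raised further below.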

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101739062

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java —
          @@ -754,6 +759,139 @@ public void scheduleForExecution(SlotProvider slotProvider) throws JobException
          }
          }

          + private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
          + // simply take the vertices without inputs.
          + for (ExecutionJobVertex ejv : this.tasks.values()) {
          + if (ejv.getJobVertex().isInputVertex()) {
          + ejv.scheduleAll(slotProvider, allowQueuedScheduling);
          + }
          + }
          + }
          +
          + /**
          + *
          + *
          + * @param slotProvider The resource provider from which the slots are allocated
          + * @param timeout The maximum time that the deployment may take, before a
          + * TimeoutException is thrown.
          + */
          + private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
          + checkState(state == JobStatus.RUNNING, "job is not running currently");
          +
          + // Important: reserve all the space we need up front.
          + // that way we do not have any operation that can fail between allocating the slots
          + // and adding them to the list. If we had a failure in between there, that would
          + // cause the slots to get lost
          + final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
          + final boolean queued = allowQueuedScheduling;
          +
          + // we use this flag to handle failures in a 'finally' clause
          + // that allows us to not go through clumsy cast-and-rethrow logic
          + boolean successful = false;
          +
          + try {
          + // collecting all the slots may resize and fail in that operation without slots getting lost
          + final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
          +
          + // allocate the slots (obtain all their futures
          + for (ExecutionJobVertex ejv : getVerticesTopologically()) {
          + // these calls are not blocking, they only return futures
          + ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
          — End diff –

          With the generalized `ConjunctFuture` we could return a collection of `Future<ExecutionAndSlots>` which could then be combined into a `ConjunctFuture<ExecutionAndSlots>`. When completed, it would pass a `Collection<ExecutionAndSlots>` to the handle method.
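          Sketched as a call site, with ExecutionAndSlot as a placeholder type and combineAll referring to a value-collecting conjunct as sketched above, the completion handler would then receive the whole collection at once:

          import java.util.Collection;
          import java.util.concurrent.CompletableFuture;

          final class HandleAllSketch {

              static final class ExecutionAndSlot { }

              static void deployWhenAllAllocated(CompletableFuture<Collection<ExecutionAndSlot>> allAllocated) {
                  allAllocated.thenAccept(executionsAndSlots -> {
                      // every slot is known to be available here; deploy in bulk
                      for (ExecutionAndSlot eas : executionsAndSlots) {
                          System.out.println("deploying " + eas);
                      }
                  });
              }
          }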

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101746547

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java —
          @@ -754,6 +759,139 @@ public void scheduleForExecution(SlotProvider slotProvider) throws JobException
          }
          }

          + private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
          + // simply take the vertices without inputs.
          + for (ExecutionJobVertex ejv : this.tasks.values()) {
          + if (ejv.getJobVertex().isInputVertex()) {
          + ejv.scheduleAll(slotProvider, allowQueuedScheduling);
          + }
          + }
          + }
          +
          + /**
          + *
          + *
          + * @param slotProvider The resource provider from which the slots are allocated
          + * @param timeout The maximum time that the deployment may take, before a
          + * TimeoutException is thrown.
          + */
          + private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
          + checkState(state == JobStatus.RUNNING, "job is not running currently");
          +
          + // Important: reserve all the space we need up front.
          + // that way we do not have any operation that can fail between allocating the slots
          + // and adding them to the list. If we had a failure in between there, that would
          + // cause the slots to get lost
          + final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
          + final boolean queued = allowQueuedScheduling;
          +
          + // we use this flag to handle failures in a 'finally' clause
          + // that allows us to not go through clumsy cast-and-rethrow logic
          + boolean successful = false;
          +
          + try {
          + // collecting all the slots may resize and fail in that operation without slots getting lost
          + final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
          +
          + // allocate the slots (obtain all their futures
          + for (ExecutionJobVertex ejv : getVerticesTopologically()) {
          + // these calls are not blocking, they only return futures
          + ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
          +
          + // we need to first add the slots to this list, to be safe on release
          + resources.add(slots);
          +
          + for (ExecutionAndSlot ens : slots) {
          + slotFutures.add(ens.slotFuture);
          + }
          + }
          +
          + // this future is complete once all slot futures are complete.
          + // the future fails once one slot future fails.
          + final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures);
          — End diff –

          Just a thought: Right now we're waiting for all futures to complete. But couldn't we also create a graph of dependencies mirroring the topological ordering by combining multiple `ConjunctFutures`, where each `ConjunctFuture` represents the inputs of a given vertex? That way, we could speed up the deployment a little bit.
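          A sketch of that per-vertex gating, with all names as placeholders and the JDK CompletableFuture standing in for Flink's futures: each vertex waits only for its own slot and the slots of its producers instead of the global conjunction.

          import java.util.ArrayList;
          import java.util.List;
          import java.util.concurrent.CompletableFuture;

          final class PerVertexGatingSketch {

              static final class Slot { }

              static CompletableFuture<Void> deployWhenInputsReady(
                      CompletableFuture<Slot> ownSlot,
                      List<CompletableFuture<Slot>> producerSlots,
                      Runnable deployAction) {

                  List<CompletableFuture<Slot>> dependencies = new ArrayList<>(producerSlots);
                  dependencies.add(ownSlot);

                  // completes once the vertex's own slot and all producer slots are available
                  return CompletableFuture
                          .allOf(dependencies.toArray(new CompletableFuture<?>[0]))
                          .thenRun(deployAction);
              }
          }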

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101739380

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java —
          @@ -754,6 +759,139 @@ public void scheduleForExecution(SlotProvider slotProvider) throws JobException
          }
          }

          + private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
          + // simply take the vertices without inputs.
          + for (ExecutionJobVertex ejv : this.tasks.values()) {
          + if (ejv.getJobVertex().isInputVertex()) {
          + ejv.scheduleAll(slotProvider, allowQueuedScheduling);
          + }
          + }
          + }
          +
          + /**
          + *
          + *
          + * @param slotProvider The resource provider from which the slots are allocated
          + * @param timeout The maximum time that the deployment may take, before a
          + * TimeoutException is thrown.
          + */
          + private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
          + checkState(state == JobStatus.RUNNING, "job is not running currently");
          +
          + // Important: reserve all the space we need up front.
          + // that way we do not have any operation that can fail between allocating the slots
          + // and adding them to the list. If we had a failure in between there, that would
          + // cause the slots to get lost
          + final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
          + final boolean queued = allowQueuedScheduling;
          +
          + // we use this flag to handle failures in a 'finally' clause
          + // that allows us to not go through clumsy cast-and-rethrow logic
          + boolean successful = false;
          +
          + try {
          + // collecting all the slots may resize and fail in that operation without slots getting lost
          + final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
          +
          + // allocate the slots (obtain all their futures
          + for (ExecutionJobVertex ejv : getVerticesTopologically()) {
          + // these calls are not blocking, they only return futures
          + ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
          — End diff –

          However, as I see it, we would have to reflect the same order in the collection because the deployment requires a topological order.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101746831

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java —
          @@ -0,0 +1,108 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.executiongraph;
          +
          +import org.apache.flink.runtime.concurrent.BiFunction;
          +import org.apache.flink.runtime.concurrent.Future;
          +import org.apache.flink.runtime.instance.SimpleSlot;
          +import org.apache.flink.util.ExceptionUtils;
          +
          +import java.util.List;
          +
          +/**
          + * Utilities for dealing with the execution graphs and scheduling.
          + */
          +public class ExecutionGraphUtils {
          +
          + /**
          + * Releases the slot represented by the given future. If the future is complete, the
          + * slot is immediately released. Otherwise, the slot is released as soon as the future
          + * is completed.
          + *
          + * <p>Note that releasing the slot means cancelling any task execution currently
          + * associated with that slot.
          + *
          + * @param slotFuture The future for the slot to release.
          + */
          + public static void releaseSlotFuture(Future<SimpleSlot> slotFuture) {
          + slotFuture.handle(ReleaseSlotFunction.INSTANCE);
          + }
          +
          + /**
          + * Releases the all the slots in the list of arrays of {@code ExecutionAndSlot}.
          + * For each future in that collection holds: If the future is complete, its slot is
          + * immediately released. Otherwise, the slot is released as soon as the future
          + * is completed.
          + *
          + * <p>This methods never throws any exceptions (subclasses of {@code Exception})
          + * and continues to release the remaining slots if one slot release failed. We only
          + * catch Exceptions here (and not other throwables) because the code executed while
          + * releasing slot does not involve any dynamic
          — End diff –

          sentence incomplete

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101747520

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java —
          @@ -0,0 +1,190 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.concurrent;
          +
          +import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
          +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
          +
          +import org.junit.Test;
          +
          +import java.io.IOException;
          +import java.util.Arrays;
          +import java.util.Collections;
          +import java.util.concurrent.ExecutionException;
          +
          +import static org.junit.Assert.*;
          +
          +/**
          + * Tests for the utility methods in {@link FutureUtils}
          + */
          +public class FutureUtilsTest {
          +
          + @Test
          + public void testConjunctFutureFailsOnEmptyAndNull() throws Exception {
          + try {
          + FutureUtils.combineAll(null);
          + fail();
          + } catch (NullPointerException ignored) {}
          +
          + try {
          + FutureUtils.combineAll(Collections.<Future<?>>emptyList());
          + fail();
          + } catch (IllegalArgumentException ignored) {}
          +
          + try {
          + FutureUtils.combineAll(Arrays.asList(
          + new FlinkCompletableFuture<Object>(),
          + null,
          + new FlinkCompletableFuture<Object>()));
          + fail();
          + } catch (NullPointerException ignored) {}
          + }
          +
          + @Test
          + public void testConjunctFutureCompletion() throws Exception {
          + // some futures that we combine
          + CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
          + CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
          + CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
          + CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
          +
          + // some future is initially completed
          + future2.complete(new Object());
          +
          + // build the conjunct future
          + ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
          +
          + Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
          + @Override
          + public void accept(Void value) {}
          + });
          — End diff –

          Why do we add this `AcceptFunction`?

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101747876

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java —
          @@ -41,7 +41,7 @@

           * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
           * parallelism can be set via {@link #setParallelism(int)}.
           */
          -@Public
          +@Internal
          — End diff –

          :+1:

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101737967

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java —
          @@ -292,7 +305,8 @@ public ExecutionGraph(
          this.stateTimestamps = new long[JobStatus.values().length];
          this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();

          - this.timeout = timeout;
          + this.rpcCallTimeout = timeout;
          — End diff –

          Maybe we could add a null check here. I think I forgot it initially.
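          The suggested fail-fast check, sketched with the JDK's requireNonNull on a hypothetical holder class (Flink's Preconditions.checkNotNull would serve the same purpose):

          import static java.util.Objects.requireNonNull;

          public final class TimeoutHolder {

              private final java.time.Duration rpcCallTimeout;

              public TimeoutHolder(java.time.Duration timeout) {
                  // fail in the constructor rather than with a late NullPointerException at the first RPC call
                  this.rpcCallTimeout = requireNonNull(timeout, "timeout");
              }

              public java.time.Duration getRpcCallTimeout() {
                  return rpcCallTimeout;
              }
          }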

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3295#discussion_r101731399

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java —
          @@ -88,4 +100,104 @@ public RetryException(Throwable cause) {
          super(cause);
          }
          }
          +
          + // ------------------------------------------------------------------------
          + // composing futures
          + // ------------------------------------------------------------------------
          +
          + /**
          + * Creates a future that is complete once multiple other futures completed.
          + * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
          + * conjunction fails.
          + *
          + * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
          + * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
          + *
          + * @param futures The futures that make up the conjunction. No null entries are allowed.
          + * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
          + */
          + public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
          + checkNotNull(futures, "futures");
          + checkArgument(!futures.isEmpty(), "futures is empty");
          — End diff –

          Couldn't an empty futures list return a completed `ConjunctFuture`? That would resemble the ∀ semantics a little bit more.
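          Sketched against the JDK CompletableFuture, the relaxed contract could look like this (class and method names are placeholders):

          import java.util.Collection;
          import java.util.Objects;
          import java.util.concurrent.CompletableFuture;

          final class CombineAllSketch {

              static CompletableFuture<Void> combineAll(Collection<? extends CompletableFuture<?>> futures) {
                  Objects.requireNonNull(futures, "futures");
                  if (futures.isEmpty()) {
                      // ∀ over the empty set: nothing to wait for, complete immediately
                      return CompletableFuture.completedFuture(null);
                  }
                  return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
              }
          }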

          githubbot ASF GitHub Bot added a comment -

          GitHub user StephanEwen opened a pull request:

          https://github.com/apache/flink/pull/3295

          FLINK-5747 [distributed coordination] Eager scheduling allocates slots and deploys tasks in bulk

            1. Problem Addressed

          Currently, eager scheduling immediately triggers the scheduling for all vertices and their subtasks in topological order.

          This has two problems:

          • This works only, as long as resource acquisition is "synchronous". With dynamic resource acquisition in FLIP-6, the resources are returned as Futures which may complete out of order. This results in out-of-order (not in topological order) scheduling of tasks which does not work for streaming.
          • Deploying some tasks that depend on other tasks before it is clear that the other tasks have resources as well leads to situations where many deploy/recovery cycles happen before enough resources are available to get the job running fully.
            2. Implemented Change
          • The `Execution` has separate methods to allocate a resource and to deploy the task to that resource
          • The *eager* scheduling mode allocates all resources in one chunk and then deploys once all resources are available.

          As a utility, this implements the `FutureUtils.combineAll` method that combines the Futures of the individual resources to a combined Future.
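          A minimal, self-contained sketch of that two-phase flow, using the JDK CompletableFuture in place of Flink's internal futures; Slot, allocateSlot() and deploy() are stand-ins, not Flink API:

          import java.util.ArrayList;
          import java.util.List;
          import java.util.concurrent.CompletableFuture;

          public final class BulkEagerSchedulingSketch {

              static final class Slot { }

              static CompletableFuture<Slot> allocateSlot(int subtask) {
                  // in the real scheduler this goes to the SlotProvider and may complete much later
                  return CompletableFuture.completedFuture(new Slot());
              }

              static void deploy(int subtask, Slot slot) {
                  System.out.println("deploying subtask " + subtask);
              }

              public static void main(String[] args) {
                  final int parallelism = 4;
                  final List<CompletableFuture<Slot>> slotFutures = new ArrayList<>();

                  // phase 1: only request the slots and collect the futures, do not block
                  for (int i = 0; i < parallelism; i++) {
                      slotFutures.add(allocateSlot(i));
                  }

                  // completes once every slot future completed, fails if any of them fails
                  CompletableFuture<Void> allAllocated =
                          CompletableFuture.allOf(slotFutures.toArray(new CompletableFuture<?>[0]));

                  // phase 2: deploy strictly after all resources are known to be available
                  allAllocated.thenRun(() -> {
                      for (int i = 0; i < parallelism; i++) {
                          deploy(i, slotFutures.get(i).join());
                      }
                  }).exceptionally(failure -> {
                      // on any allocation failure, release whatever was already allocated (omitted here)
                      return null;
                  });
              }
          }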

            3. Tests

          The main tests are in `ExecutionGraphSchedulingTest`. The used utilities are tested in `FutureUtilsTest` and in `ExecutionGraphUtilsTest`.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StephanEwen/incubator-flink slot_scheduling

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3295.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3295


          commit 1f18cbb0d6d119fa5e5c4803201c28887b90cef5
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2017-02-03T19:26:23Z

          FLINK-5747 [distributed coordination] Eager scheduling allocates slots and deploys tasks in bulk

          That way, strictly topological deployment can be guaranteed.

          Also, many quick deploy/not-enough-resources/fail/recover cycles can be
          avoided in the cases where resources need some time to appear.



            People

            • Assignee:
              StephanEwen Stephan Ewen
              Reporter:
              StephanEwen Stephan Ewen
            • Votes:
              0
              Watchers:
              6
