Details

    • Type: Improvement
    • Status: Closed
    • Priority: Trivial
    • Resolution: Fixed
    • Affects Version/s: 1.4.0
    • Fix Version/s: 1.3.0, 1.4.0
    • Labels:
      None

      Description

      The ConjunctFuture combines multiple Futures into one. At the moment, it does not return the collection of results of the individual futures. In some cases this information is helpful and should therefore be returned.
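      The requested behaviour can be sketched roughly as follows. This is a minimal illustration built on the JDK's `java.util.concurrent.CompletableFuture`, not on Flink's own `Future` and `FlinkCompletableFuture` types; the class name `ConjunctSketch` and this particular `combineAll` signature are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: a conjunction of futures that also returns their values.
final class ConjunctSketch {

    /** Completes with the values of all inputs, or fails if any input fails. */
    static <T> CompletableFuture<Collection<T>> combineAll(
            Collection<? extends CompletableFuture<? extends T>> futures) {
        // Snapshot the inputs so later changes to the caller's collection have no effect.
        List<CompletableFuture<? extends T>> inputs = new ArrayList<>(futures);

        // allOf completes once every input has completed.
        return CompletableFuture
            .allOf(inputs.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> {
                List<T> results = new ArrayList<>(inputs.size());
                for (CompletableFuture<? extends T> future : inputs) {
                    // join() does not block here: every input is already complete.
                    results.add(future.join());
                }
                return results;
            });
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Collection<Integer>> all = combineAll(Arrays.asList(a, b));

        a.complete(1);
        b.complete(2);
        System.out.println(all.join()); // prints [1, 2]
    }
}
```

      Because the values are read only after every input has completed, this variant returns them in input order and needs no shared mutable state in the completion callbacks.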

          Activity

          till.rohrmann Till Rohrmann added a comment -

          1.4.0: c081201fd6c3c97a932c09d971f24bf42102650f
          1.3.0: 9c6c9654e11dd31fa2323977fc02961811d6e518

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3873

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3873

          Updated the PR to incorporate the PR review. Thanks for the review @StephanEwen.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3873

          You're right about the thread safety; I will change it. I will introduce a `WaitingFuture`, which simply waits for all of its futures to complete and discards their values, thus returning `null` as its result.
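          A waiting future of this kind could look roughly like the sketch below. It uses the JDK's `CompletableFuture` rather than Flink's future types, and the names `WaitingFutureSketch` and `waitForAll` are hypothetical. The counting logic mirrors the `numTotal == numCompleted.incrementAndGet()` check visible in the quoted diff.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: waits for all inputs and discards their values.
final class WaitingFutureSketch {

    /** Completes with null once every input is done; fails on the first failure. */
    static CompletableFuture<Void> waitForAll(Collection<? extends CompletableFuture<?>> futures) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        int numTotal = futures.size();
        if (numTotal == 0) {
            result.complete(null);
            return result;
        }

        // The only shared state is an atomic counter, so concurrent callbacks are safe.
        AtomicInteger numCompleted = new AtomicInteger();
        for (CompletableFuture<?> future : futures) {
            future.whenComplete((value, failure) -> {
                if (failure != null) {
                    result.completeExceptionally(failure);
                } else if (numCompleted.incrementAndGet() == numTotal) {
                    result.complete(null);
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Void> waiting = waitForAll(Arrays.asList(a, b));

        a.complete("ignored");
        System.out.println(waiting.isDone()); // prints false: b is still pending
        b.complete(42);
        System.out.println(waiting.join());   // prints null: the values are discarded
    }
}
```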

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3873#discussion_r116169564

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
          @@ -163,26 +165,31 @@ public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures)
               * <p>Implementation notice: The member fields all have package-private access, because they are
               * either accessed by an inner subclass or by the enclosing class.
               */
          -   private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
          +   private static class ConjunctFutureImpl<T> extends FlinkCompletableFuture<Collection<T>> implements ConjunctFuture<T> {

                  /** The total number of futures in the conjunction */
                  final int numTotal;

                  /** The number of futures in the conjunction that are already complete */
                  final AtomicInteger numCompleted = new AtomicInteger();

          +       final ArrayList<T> results;
          +
                  /** The function that is attached to all futures in the conjunction. Once a future
                   * is complete, this function tracks the completion or fails the conjunct.
                   */
          -       final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() {
          +       final BiFunction<T, Throwable, Void> completionHandler = new BiFunction<T, Throwable, Void>() {

                      @Override
          -           public Void apply(Object o, Throwable throwable) {
          +           public Void apply(T o, Throwable throwable) {
                          if (throwable != null) {
                              completeExceptionally(throwable);
          -               } else if (numTotal == numCompleted.incrementAndGet()) {
          -                   complete(null);
          +               } else {
          +                   results.add(o);

          --- End diff --

          True, I wanted to add an atomic integer to determine the index but forgot about it. Thanks for catching it.
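          One way to realise the atomic-index idea is sketched below: each completion callback claims its own slot in a pre-sized array, so no two callbacks write to the same location. This is an illustration on the JDK's `CompletableFuture` and is not necessarily what the merged commit does; the names `IndexedConjunctSketch` and `nextIndex` are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: thread-safe result collection via an atomic slot index.
final class IndexedConjunctSketch {

    static <T> CompletableFuture<Collection<T>> combineAll(
            Collection<? extends CompletableFuture<? extends T>> futures) {
        List<CompletableFuture<? extends T>> inputs = new ArrayList<>(futures);
        CompletableFuture<Collection<T>> result = new CompletableFuture<>();
        int numTotal = inputs.size();
        if (numTotal == 0) {
            result.complete(new ArrayList<T>());
            return result;
        }

        @SuppressWarnings("unchecked")
        T[] results = (T[]) new Object[numTotal];
        AtomicInteger nextIndex = new AtomicInteger();    // hands out distinct slots
        AtomicInteger numCompleted = new AtomicInteger(); // counts finished writes

        for (CompletableFuture<? extends T> future : inputs) {
            future.whenComplete((value, failure) -> {
                if (failure != null) {
                    result.completeExceptionally(failure);
                    return;
                }
                // Write first, then count: the callback that observes the final
                // count is guaranteed to see every earlier slot write.
                results[nextIndex.getAndIncrement()] = value;
                if (numCompleted.incrementAndGet() == numTotal) {
                    result.complete(Arrays.asList(results));
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<Collection<String>> all = combineAll(Arrays.asList(a, b));

        b.complete("b");
        a.complete("a");
        System.out.println(all.join()); // prints [b, a]: completion order, not input order
    }
}
```

          Two counters are used on purpose. With a single counter, the callback that claims the last slot could complete the conjunction before an earlier callback has finished writing its value.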

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3873#discussion_r116062457

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
          @@ -163,26 +165,31 @@ public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures)
               * <p>Implementation notice: The member fields all have package-private access, because they are
               * either accessed by an inner subclass or by the enclosing class.
               */
          -   private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
          +   private static class ConjunctFutureImpl<T> extends FlinkCompletableFuture<Collection<T>> implements ConjunctFuture<T> {

                  /** The total number of futures in the conjunction */
                  final int numTotal;

                  /** The number of futures in the conjunction that are already complete */
                  final AtomicInteger numCompleted = new AtomicInteger();

          +       final ArrayList<T> results;
          +
                  /** The function that is attached to all futures in the conjunction. Once a future
                   * is complete, this function tracks the completion or fails the conjunct.
                   */
          -       final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() {
          +       final BiFunction<T, Throwable, Void> completionHandler = new BiFunction<T, Throwable, Void>() {

                      @Override
          -           public Void apply(Object o, Throwable throwable) {
          +           public Void apply(T o, Throwable throwable) {
                          if (throwable != null) {
                              completeExceptionally(throwable);
          -               } else if (numTotal == numCompleted.incrementAndGet()) {
          -                   complete(null);
          +               } else {
          +                   results.add(o);

          --- End diff --

          Is this thread safe? My assumption is that many of the completion handlers can be called at the same time.

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3873

          FLINK-6555 [futures] Generalize ConjunctFuture to return results

          The ConjunctFuture now returns the set of values of the individual futures it is composed of once it is completed.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink generalizeConjunctFuture

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3873.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3873


          commit a6fc20d9f8cda04a835459f38ed885e87f3d478b
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-05-11T15:36:17Z

          FLINK-6555 [futures] Generalize ConjunctFuture to return results

          The ConjunctFuture now returns the set of future values once it is completed.



            People

            • Assignee:
              till.rohrmann Till Rohrmann
              Reporter:
              till.rohrmann Till Rohrmann
            • Votes:
              0
              Watchers:
              2