Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-20159

[FLIP-27 source] FutureNotifier does not return a new future when Future::future() is invoked within the returned future's callback



    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Invalid
    • Affects Version/s: 1.11.2
    • Fix Version/s: 1.12.0
    • Component/s: API / DataStream
    • Labels:


      Here's the problem. FutureNotifier::future should return a new future every time the previous future was completed. That's the expectation. However, if the future is being requested from within the completion callback of the previous future, then it, instead of returning a new future, returns the existing future. This could potentially result in infinite recursions depending on how the callback method is implemented. Here's an example:


      Consumer code:
      void consumeDataOnce() {
        // get the data from the producer and check if it was empty
        Data data = producer.getData();
        // if data was empty, then grab the future and attach a callback as below
        if (data.isEmpty()) {
          producer.getCompletableFuture().thenRun(() -> consumeDataOnce());


      In the above method, let's say the producer notified the consumer (produced by FutureNofier::future), thinking that some data was available to be consumed. Now let's say the data returned from the producer was instead empty during the callback. In this case, the method goes on in an infinite loop when the future is completed. 


      Issue: If you observe FutureNotifier::notifyComplete's implementation closely, you realize that the future is completed before the futureRef is swapped with null. 

      public void notifyComplete() {
        CompletableFuture<Void> future = futureRef.get();
        // If there are multiple threads trying to complete the future, only the first one succeeds.
        if (future != null && future.complete(null)) {
          futureRef.compareAndSet(future, null);

      If we can change the ordering instead, where the future is swapped atomically first before being completed, then we can guarantee that the future returned by FutureNotifier::future will always be a new one if the previous one had completed. 


      Stephan Ewen Becket Qin Steven Zhen Wu




            • Assignee:
              sundaram Sundaram Ananthanarayanan
            • Votes:
              0 Vote for this issue
              2 Start watching this issue


              • Created: