Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0, 1.2.1
    • Component/s: DataStream API
    • Labels: None

      Description

      When the async operation timeout is greater than 0, the number of StreamRecordQueueEntry instances on the heap keeps growing.

      It can easily be reproduced with the following code:

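        import java.util.concurrent.TimeUnit
        import org.apache.flink.streaming.api.scala._
        import org.apache.flink.streaming.api.scala.async.{AsyncCollector, AsyncFunction}

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val src: DataStream[Int] = env.fromCollection((1 to Int.MaxValue).iterator)

        // Each element completes immediately, long before its 1-minute timeout fires
        val asyncFunction = new AsyncFunction[Int, Int] with Serializable {
          override def asyncInvoke(input: Int, collector: AsyncCollector[Int]): Unit = {
            collector.collect(List(input))
          }
        }

        // timeout = 1 minute, capacity = 1
        AsyncDataStream.unorderedWait(src, asyncFunction, 1, TimeUnit.MINUTES, 1).print()
        env.execute()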
      

          Activity

          Stephan Ewen added a comment:

          Till Rohrmann, is this a known issue?

          Till Rohrmann added a comment:

          No, this is not known to me. I will investigate it. Thanks for reporting the issue, Dmitry Golubets.

          Till Rohrmann added a comment (edited):

          The problem is the following: When a timeout is defined, the AsyncWaitOperator registers a TriggerTask at the ProcessingTimeService which will complete the StreamRecordQueueEntry with a TimeoutException. Since the TriggerTask holds a strong reference to the entry and there is currently no way to unregister timers, such a TriggerTask prevents the StreamRecordQueueEntry from being garbage collected until the TriggerTask has been executed. If we now have a high-throughput stream where each element is processed quickly, together with a large timeout (as is the case with the example job), then we can amass a large number of TriggerTasks which refer to already completed StreamRecordQueueEntries.
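
The mechanism can be illustrated outside Flink. The following is a minimal sketch, assuming a plain java.util.concurrent.ScheduledThreadPoolExecutor as a stand-in for the ProcessingTimeService and a hypothetical LeakyEntry class (just a CompletableFuture) in place of StreamRecordQueueEntry; it is not Flink's actual code. Every entry completes immediately, yet each timeout task stays queued and keeps its entry reachable:

```scala
import java.util.concurrent.{CompletableFuture, ScheduledThreadPoolExecutor, TimeUnit, TimeoutException}

// Hypothetical stand-in for a StreamRecordQueueEntry: a future that is completed
// either with a result or with a TimeoutException.
final class LeakyEntry(val value: Int) {
  val future = new CompletableFuture[Int]()
}

object LeakDemo {
  // Registers one timeout task per entry, completes every entry right away,
  // and returns how many timeout tasks are still queued afterwards.
  def pendingTimersAfter(numEntries: Int, timeoutMinutes: Long): Int = {
    val timerService = new ScheduledThreadPoolExecutor(1)
    try {
      for (i <- 1 to numEntries) {
        val entry = new LeakyEntry(i)
        // The anonymous Runnable captures `entry`, so the queued task keeps it reachable.
        timerService.schedule(new Runnable {
          def run(): Unit = entry.future.completeExceptionally(new TimeoutException())
        }, timeoutMinutes, TimeUnit.MINUTES)
        // The async result arrives immediately; the timer is now useless.
        entry.future.complete(i)
      }
      timerService.getQueue.size
    } finally timerService.shutdownNow()
  }
}
```

With 1000 entries and a one-minute timeout, all 1000 tasks are still queued even though every entry has already completed.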

          This problem can be mitigated by using WeakReferences, but the underlying problem of a potentially large set of useless timers remains. In order to properly solve the problem, I think it would be necessary to unregister timers for already completed StreamRecordQueueEntries. Alternatively, we could register a periodic TriggerTask which iterates over all StreamRecordQueueEntries and checks which of them have timed out. That way, we would avoid creating a dedicated TriggerTask for each StreamRecordQueueEntry.
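
The periodic-check alternative could look roughly like the sketch below. TimedEntry and TimeoutSweeper are hypothetical names, not Flink classes, and the current time is passed in explicitly to keep the logic easy to follow; in practice a single periodic timer (for example one registered via ScheduledExecutorService.scheduleAtFixedRate) would call sweep with the current processing time.

```scala
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, TimeoutException}

// Hypothetical stand-in for a StreamRecordQueueEntry with an absolute deadline (ms).
final class TimedEntry(val deadline: Long) {
  val future = new CompletableFuture[Int]()
}

// Hypothetical registry: one periodic task calls sweep(now) instead of
// registering a dedicated timeout timer per entry.
final class TimeoutSweeper {
  private val pending = ConcurrentHashMap.newKeySet[TimedEntry]()

  def register(entry: TimedEntry): Unit = pending.add(entry)

  def pendingCount: Int = pending.size

  // Drops entries that already completed, times out entries past their
  // deadline, and returns how many entries were timed out in this sweep.
  def sweep(now: Long): Int = {
    var timedOut = 0
    val it = pending.iterator()
    while (it.hasNext) {
      val entry = it.next()
      if (entry.future.isDone) {
        it.remove() // completed normally: no longer needs tracking
      } else if (now >= entry.deadline) {
        entry.future.completeExceptionally(new TimeoutException())
        it.remove()
        timedOut += 1
      }
    }
    timedOut
  }
}
```

Completed entries are only dropped on the next sweep, so they can stay reachable for up to one sweep interval; in exchange there is one timer in total rather than one per entry.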

          Till Rohrmann added a comment:

          Actually, I think we can simply register a completion callback on the StreamRecordQueueEntry which cancels the ScheduledFuture of the TriggerTask. Given that the ProcessingTimeService removes trigger tasks on cancellation, this should fix the problem.
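
A minimal sketch of the completion-callback idea, using a plain ScheduledThreadPoolExecutor in place of the ProcessingTimeService and a hypothetical QueueEntry class; it is not the code from the actual fix. setRemoveOnCancelPolicy(true) is enabled explicitly to mirror the assumption that cancelled trigger tasks are removed from the timer queue:

```scala
import java.util.concurrent.{CompletableFuture, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit, TimeoutException}
import java.util.function.BiConsumer

// Hypothetical stand-in for a StreamRecordQueueEntry.
final class QueueEntry(val value: Int) {
  val future = new CompletableFuture[Int]()
}

object TimerCancellationDemo {
  // Same setup as a per-entry timeout, but each entry cancels its own timer
  // on completion. Returns how many timeout tasks are still queued afterwards.
  def pendingTimersAfter(numEntries: Int, timeoutMinutes: Long): Int = {
    val timerService = new ScheduledThreadPoolExecutor(1)
    // Without this, cancelled tasks stay in the queue until their delay elapses.
    timerService.setRemoveOnCancelPolicy(true)
    try {
      for (i <- 1 to numEntries) {
        val entry = new QueueEntry(i)
        val timer: ScheduledFuture[_] = timerService.schedule(new Runnable {
          def run(): Unit = entry.future.completeExceptionally(new TimeoutException())
        }, timeoutMinutes, TimeUnit.MINUTES)
        // Completion callback: once the entry is done (result or timeout),
        // cancel its timer so the executor drops the task and its reference.
        entry.future.whenComplete(new BiConsumer[Int, Throwable] {
          def accept(result: Int, failure: Throwable): Unit = timer.cancel(false)
        })
        entry.future.complete(i)
      }
      timerService.getQueue.size
    } finally timerService.shutdownNow()
  }
}
```

Here pendingTimersAfter(1000, 1) returns 0: every timer is cancelled, and therefore removed from the executor's queue, as soon as its entry completes.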

          ASF GitHub Bot added a comment:

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3264

          FLINK-5652 [asyncIO] Cancel timers when completing a StreamRecordQueueEntry

          Whenever a StreamRecordQueueEntry has been completed, we no longer need the registered timeout.
          Therefore, we have to cancel the corresponding ScheduledFuture so that the system knows that
          it can remove the associated TriggerTask. This is important since the TriggerTask contains a
          reference to the StreamRecordQueueEntry. Consequently, such a task will prevent the
          StreamRecordQueueEntry from being garbage collected until the timeout expires.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink asyncIOFixTimers

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3264.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3264

          ASF GitHub Bot added a comment:

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3264

          Travis passed. Merging this PR.

          ASF GitHub Bot added a comment:

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3264

          Till Rohrmann added a comment:

          1.3.0: 215776b81a52cd380e8ccabd65da612f77da25e6
          1.2.1: 36c7de1aef7b349b9d66c9d92398f50ebec9d186


            People

            • Assignee: Till Rohrmann
            • Reporter: Dmitry Golubets
            • Votes: 0
            • Watchers: 5