Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-5652

Memory leak in AsyncDataStream

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Fixed
    • 1.3.0
    • 1.2.1, 1.3.0
    • API / DataStream
    • None

    Description

      When async operation timeout is > 0, the number of StreamRecordQueueEntry instances keeps growing.

      It can be easily reproduced with the following code:

        val src: DataStream[Int] = env.fromCollection((1 to Int.MaxValue).iterator)
        
        val asyncFunction = new AsyncFunction[Int, Int] with Serializable {
          override def asyncInvoke(input: Int, collector: AsyncCollector[Int]): Unit = {
            collector.collect(List(input))
          }
        }
        
        AsyncDataStream.unorderedWait(src, asyncFunction, 1, TimeUnit.MINUTES, 1).print()
      

      Attachments

        Issue Links

          Activity

            People

              trohrmann Till Rohrmann
              dgolubets Dmitry Golubets
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: