Details
- Bug
- Status: Closed
- Major
- Resolution: Fixed
- 1.3.0
- None
Description
When the async operation timeout is greater than 0, the number of StreamRecordQueueEntry instances keeps growing, even though each async operation completes.
It can be easily reproduced with the following code:
val src: DataStream[Int] = env.fromCollection((1 to Int.MaxValue).iterator)
val asyncFunction = new AsyncFunction[Int, Int] with Serializable {
  override def asyncInvoke(input: Int, collector: AsyncCollector[Int]): Unit = {
    // Complete each element immediately with its own value.
    collector.collect(List(input))
  }
}
// Timeout of 1 minute, capacity of 1.
AsyncDataStream.unorderedWait(src, asyncFunction, 1, TimeUnit.MINUTES, 1).print()
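One possible explanation, which this report does not confirm, is that a timeout timer registered per element keeps a reference to its queue entry until the timer fires, even if the element has already completed. The sketch below shows that general pattern with plain JDK classes only. `Entry`, `TimeoutLeakSketch`, and `pendingTimers` are hypothetical names for illustration and are not Flink code.

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

object TimeoutLeakSketch {
  // Hypothetical stand-in for a queue entry; NOT Flink's StreamRecordQueueEntry.
  final class Entry(val value: Int)

  // Registers one 1-minute timeout task per entry and returns how many
  // tasks are still pending after all entries have "completed".
  def pendingTimers(n: Int, cancelOnComplete: Boolean): Int = {
    val timers = new ScheduledThreadPoolExecutor(1)
    // Make cancelled tasks leave the work queue immediately.
    timers.setRemoveOnCancelPolicy(true)
    try {
      for (i <- 1 to n) {
        val entry = new Entry(i)
        // The task captures `entry`, so the entry stays reachable
        // for as long as the task is pending.
        val timeoutTask = new Runnable {
          override def run(): Unit = println(s"timeout for ${entry.value}")
        }
        val timer = timers.schedule(timeoutTask, 1L, TimeUnit.MINUTES)
        // The simulated async operation completes right away.
        if (cancelOnComplete) timer.cancel(false)
      }
      timers.getQueue.size
    } finally {
      timers.shutdownNow()
    }
  }

  def main(args: Array[String]): Unit = {
    println(pendingTimers(1000, cancelOnComplete = false)) // prints 1000
    println(pendingTimers(1000, cancelOnComplete = true))  // prints 0
  }
}
```

If this is indeed the mechanism, cancelling the timer when an element completes would let the entries be collected without waiting for the timeout to expire.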