Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0, 1.2.1
    • Component/s: DataStream API
    • Labels:
      None

      Description

      When async operation timeout is > 0, the number of StreamRecordQueueEntry instances keeps growing.

      It can be easily reproduced with the following code:

        val src: DataStream[Int] = env.fromCollection((1 to Int.MaxValue).iterator)
        
        val asyncFunction = new AsyncFunction[Int, Int] with Serializable {
          override def asyncInvoke(input: Int, collector: AsyncCollector[Int]): Unit = {
            collector.collect(List(input))
          }
        }
        
        AsyncDataStream.unorderedWait(src, asyncFunction, 1, TimeUnit.MINUTES, 1).print()
      

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                till.rohrmann Till Rohrmann
                Reporter:
                dgolubets Dmitry Golubets
              • Votes:
                0 Vote for this issue
                Watchers:
                5 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: