Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-32513

Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource()

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Critical
    • Resolution: Fixed
    • 1.15.3, 1.16.1, 1.17.1
    • 1.18.2, 1.20.0, 1.19.1
    • None
    • All modes (local, k8s session, k8s application, ...)

      Flink 1.15.3
      Flink 1.16.1
      Flink 1.17.1

    Description

      Flink job executed in BATCH mode with a significant number of transformations (more than 30 in my case) takes very long time to start due to the method StreamGraphGenerator.existsUnboundedSource(). Also, during the execution of the method, a lot of memory is consumed, which causes the GC to fire frequently.

      Thread Dump:

      "main@1" prio=5 tid=0x1 nid=NA runnable
        java.lang.Thread.State: RUNNABLE
            at java.util.ArrayList.addAll(ArrayList.java:702)
            at org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
            at org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
            at org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
            at org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
            at org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
            at org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
            at org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
            at org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
            at org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
            at org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
            at org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
            at org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
            at org.apache.flink.streaming.api.graph.StreamGraphGenerator.lambda$existsUnboundedSource$1(StreamGraphGenerator.java:509)
            at org.apache.flink.streaming.api.graph.StreamGraphGenerator$$Lambda$1988.1989814391.test(Unknown Source:-1)
            at java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
            at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1632)
            at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
            at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
            at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488)
            at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
            at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
            at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
            at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
            at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:528)
            at org.apache.flink.streaming.api.graph.StreamGraphGenerator.existsUnboundedSource(StreamGraphGenerator.java:506)
            at org.apache.flink.streaming.api.graph.StreamGraphGenerator.shouldExecuteInBatchMode(StreamGraphGenerator.java:487)
            at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:313)
            at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2248)
            at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2239)
            at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2225)
            at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2052)

      Attachments

        1. image-2023-07-10-17-26-46-544.png
          29 kB
          Vladislav Keda

        Issue Links

          Activity

            People

              jeyhunkarimov Jeyhun Karimov
              vladislav.keda Vladislav Keda
              Votes:
              0 Vote for this issue
              Watchers:
              8 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: