Flink / FLINK-35766

When the job contains many YieldingOperatorFactory instances, compiling the JobGraph hangs


Details

    Description

      When a job contains YieldingOperatorFactory instances, compiling the JobGraph has a very high time complexity (O(N!)). As a result, job compilation hangs while creating operator chains when the job contains many YieldingOperatorFactory instances (e.g., more than 30).
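To illustrate why the cost explodes, here is a toy model in plain Java. It is not Flink code, and the class and method names (`NaiveChainCheck`, `isChainable`, `headOf`, `checkAllEdges`) are hypothetical. It assumes that checking an edge whose downstream operator is yielding requires walking backward to the head of the upstream chain, and that this walk re-checks every upstream edge without caching. Under that assumption, checking the n edges in order costs 2^n - 1 calls in total.

```java
import java.util.Arrays;

/**
 * Toy model of the uncached chainability check (hypothetical names, not Flink code).
 * Node 0 is a source; nodes 1..n are yielding operators; edge i connects node i-1 to node i.
 */
final class NaiveChainCheck {
    private final boolean[] yielding;
    private long chainableCalls = 0;

    NaiveChainCheck(int yieldingOperators) {
        yielding = new boolean[yieldingOperators + 1];
        // Every node after the source is a yielding operator.
        Arrays.fill(yielding, 1, yielding.length, true);
    }

    // Stand-in for the legacy-source check on the chain head; this model has none.
    private static boolean isLegacySource(int node) {
        return false;
    }

    // Checks edge (edge - 1) -> edge. A yielding downstream operator triggers a
    // backward walk to the head of the upstream chain, and nothing is cached.
    boolean isChainable(int edge) {
        chainableCalls++;
        return !(yielding[edge] && isLegacySource(headOf(edge - 1)));
    }

    // Walks backward while the incoming edge is chainable, re-checking each edge.
    int headOf(int node) {
        if (node > 0 && isChainable(node)) {
            return headOf(node - 1);
        }
        return node;
    }

    // Mimics chain creation: check every edge once, from the source downstream.
    long checkAllEdges() {
        for (int edge = 1; edge < yielding.length; edge++) {
            isChainable(edge);
        }
        return chainableCalls;
    }

    public static void main(String[] args) {
        for (int n : new int[] {5, 10, 15, 20}) {
            long calls = new NaiveChainCheck(n).checkAllEdges();
            System.out.println(n + " yielding operators -> " + calls + " isChainable calls");
        }
    }
}
```

Running `main` prints 31, 1023, 32767 and 1048575 calls for 5, 10, 15 and 20 operators. At 32 operators, the same model would need 2^32 - 1 (about 4.3 billion) calls, which is consistent with compilation appearing to hang.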

      This is a rare bug, but some of our users run SQL jobs in production that contain many lookup joins backed by YieldingOperatorFactory. A simple reproduction is as follows:

      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
      import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
      import org.junit.jupiter.api.Test;

      // YieldingTestOperatorFactory is a test-only factory implementing YieldingOperatorFactory,
      // assumed to be defined in the enclosing test class.
      @Test
      void test() {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
          DataStream<Long> stream =
                  env.fromSource(
                                  new NumberSequenceSource(0, 10),
                                  WatermarkStrategy.noWatermarks(),
                                  "input")
                          .map((x) -> x);
          // Add 32 operators created by YieldingOperatorFactory instances.
          for (int i = 0; i < 32; i++) {
              stream =
                      stream.transform(
                              "test", BasicTypeInfo.LONG_TYPE_INFO, new YieldingTestOperatorFactory<>());
          }
          stream.addSink(new DiscardingSink<>());
          // Hangs while creating operator chains.
          env.getStreamGraph().getJobGraph();
      }

      The root cause is that the result of checking whether an edge is chainable is not cached. Each time a YieldingOperatorFactory is encountered, the generator traverses backward through the upstream operators again, so the same traversals are repeated for every subsequent operator (see code: https://github.com/apache/flink/blob/90fc679df073754b93eb5c220373daad7dca0a32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L1602).
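A minimal sketch of the caching idea, as a toy model rather than Flink's actual fix: the class and method names (`CachedChainCheck`, `isChainable`, `headOf`, `checkAllEdges`) are hypothetical, every node after the source is assumed to be yielding, and the legacy-source check always returns false. Each edge's chainability result is memoized in a `HashMap`, so every edge is evaluated exactly once no matter how often the backward walk revisits it.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of a cached chainability check (hypothetical names, not Flink code).
 * Node 0 is a source; nodes 1..n are yielding operators; edge i connects node i-1 to node i.
 */
final class CachedChainCheck {
    private final int yieldingOperators;
    // Memo: edge id -> chainable result, filled on first evaluation.
    private final Map<Integer, Boolean> chainableCache = new HashMap<>();
    private long evaluations = 0;

    CachedChainCheck(int yieldingOperators) {
        this.yieldingOperators = yieldingOperators;
    }

    // Stand-in for the legacy-source check on the chain head; this model has none.
    private static boolean isLegacySource(int node) {
        return false;
    }

    // Checks edge (edge - 1) -> edge, evaluating it at most once.
    boolean isChainable(int edge) {
        Boolean cached = chainableCache.get(edge);
        if (cached != null) {
            return cached;
        }
        evaluations++;
        // Every downstream node is yielding here, so look up the upstream chain head.
        boolean chainable = !isLegacySource(headOf(edge - 1));
        chainableCache.put(edge, chainable);
        return chainable;
    }

    // Walks backward while the incoming edge is chainable; revisits are cache hits.
    int headOf(int node) {
        if (node > 0 && isChainable(node)) {
            return headOf(node - 1);
        }
        return node;
    }

    // Mimics chain creation: check every edge once, from the source downstream.
    long checkAllEdges() {
        for (int edge = 1; edge <= yieldingOperators; edge++) {
            isChainable(edge);
        }
        return evaluations;
    }

    public static void main(String[] args) {
        for (int n : new int[] {5, 10, 32, 1000}) {
            long evals = new CachedChainCheck(n).checkAllEdges();
            System.out.println(n + " yielding operators -> " + evals + " evaluations");
        }
    }
}
```

Running `main` reports 5, 10, 32 and 1000 evaluations. The backward walk in `headOf` still visits each upstream edge (now as a cache hit), so total work in this model is quadratic rather than exponential; caching the chain head per node as well would make it linear. The sketch uses `get`/`put` instead of `computeIfAbsent` because the computation recursively inserts other keys, which `HashMap.computeIfAbsent` does not allow.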


            People

              JunRuiLi Junrui Li