Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
Description
When a job contains YieldingOperatorFactory instances, compiling the JobGraph has a very high time complexity (O(N!)). As a result, job compilation hangs while creating chains when there are many YieldingOperatorFactory instances (e.g., more than 30).
This bug is rare, but we have users running SQL in production that contains many LookupJoins, which use YieldingOperatorFactory. A simple reproducible case is as follows:
@Test
void test() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
    SingleOutputStreamOperator<Long> stream =
            env.fromSource(
                            new NumberSequenceSource(0, 10),
                            WatermarkStrategy.noWatermarks(),
                            "input")
                    .map((x) -> x);
    // add 32 YieldingOperatorFactory
    for (int i = 0; i < 32; i++) {
        stream =
                stream.transform(
                        "test",
                        BasicTypeInfo.LONG_TYPE_INFO,
                        new YieldingTestOperatorFactory<>());
    }
    stream.addSink(new DiscardingSink<>());
    // JobGraph compilation hangs here while the chains are created
    env.getStreamGraph().getJobGraph();
}
The reason is that the result of checking whether an edge is chainable is not cached, so the backward traversal is repeated each time a YieldingOperatorFactory is encountered further down the chain (see code: https://github.com/apache/flink/blob/90fc679df073754b93eb5c220373daad7dca0a32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L1602).
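To illustrate why the missing cache matters, here is a minimal toy model, not the actual StreamingJobGraphGenerator code. It assumes a linear chain in which node i-1 feeds node i, and a chainable check that has to walk backwards to find the head of the upstream chain. The class ChainableCacheSketch and its methods are hypothetical names made up for this sketch; it only counts how often the check is computed with and without a cache.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a linear operator chain (node i-1 feeds node i). Not Flink code. */
class ChainableCacheSketch {

    // Number of times the chainable check is actually computed (cache misses).
    private long checks;
    // Cached results per node index; null means caching is disabled.
    private final Map<Integer, Boolean> cache;

    ChainableCacheSketch(boolean useCache) {
        this.cache = useCache ? new HashMap<>() : null;
    }

    /** Is the input edge of node i chainable? Answering needs the upstream chain head. */
    boolean isChainable(int i) {
        if (cache != null) {
            Boolean known = cache.get(i);
            if (known != null) {
                return known; // reuse the earlier result, no traversal
            }
        }
        checks++;
        headOf(i - 1); // backward traversal to locate the head of the upstream chain
        boolean result = true; // assumption: every edge in this toy model is chainable
        if (cache != null) {
            cache.put(i, result);
        }
        return result;
    }

    /** Walks backwards while input edges are chainable and returns the head node. */
    int headOf(int i) {
        if (i > 0 && isChainable(i)) {
            return headOf(i - 1);
        }
        return i;
    }

    /** Counts how many chainable checks are computed for the edge into node n. */
    static long countChecks(int n, boolean useCache) {
        ChainableCacheSketch sketch = new ChainableCacheSketch(useCache);
        sketch.isChainable(n);
        return sketch.checks;
    }

    public static void main(String[] args) {
        System.out.println("uncached: " + countChecks(20, false));
        System.out.println("cached:   " + countChecks(20, true));
    }
}
```

In this toy model, checking the edge into node 20 computes 524288 checks without the cache and 20 with it. The exact growth rate of the real generator may differ from this simplified model; the point is only that each check re-triggers the checks of all upstream edges unless their results are remembered.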