Details
- Bug
- Status: Closed
- Blocker
- Resolution: Fixed
- None
Description
This is caused by FLINK-30942. Currently, if two job vertices have the same input and the same parallelism (even if that parallelism is -1), they share partitions. After FLINK-30942, however, the scheduler may change a job vertex's parallelism before scheduling. As a result, two job vertices can have the same parallelism in the compilation phase (and therefore share partitions) but different parallelism in the scheduling phase, which causes the following exception:
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Consumers must have the same max parallelism.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    ... 37 more
Caused by: java.lang.IllegalStateException: Consumers must have the same max parallelism.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
    at org.apache.flink.runtime.executiongraph.IntermediateResult.getConsumersMaxParallelism(IntermediateResult.java:219)
    at org.apache.flink.runtime.executiongraph.Execution.getPartitionMaxParallelism(Execution.java:501)
    at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:472)
    at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:431)
    at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$registerProducedPartitions$5(DefaultExecutionDeployer.java:277)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
    ... 38 more
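The mismatch between the two phases can be illustrated with a small toy model. This is only a sketch and not Flink code: `SharedPartitionSketch`, `Consumer`, `sharesPartition` and `checkConsumersAgree` are hypothetical names, the values 4 and 8 are made up, and the model assumes for simplicity that a single number stands in for both parallelism and max parallelism.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of the failure mode; none of these names are Flink APIs. */
public class SharedPartitionSketch {

    /** Hypothetical stand-in for a consumer job vertex; -1 means "not decided yet". */
    public static final class Consumer {
        public final String name;
        public int parallelism;

        public Consumer(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    /** Compile-time rule from the description: same input and same parallelism means sharing. */
    public static boolean sharesPartition(Consumer a, Consumer b) {
        // Both consumers are assumed to read the same input; -1 == -1 also counts as "same".
        return a.parallelism == b.parallelism;
    }

    /** Simplified stand-in for the scheduling-time precondition seen in the stack trace. */
    public static int checkConsumersAgree(List<Consumer> consumers) {
        int expected = consumers.get(0).parallelism;
        for (Consumer c : consumers) {
            if (c.parallelism != expected) {
                throw new IllegalStateException("Consumers must have the same max parallelism.");
            }
        }
        return expected;
    }

    public static void main(String[] args) {
        Consumer map1 = new Consumer("map1", -1);
        Consumer map2 = new Consumer("map2", -1);

        // Compilation phase: both are -1, so they are wired to one shared partition.
        System.out.println("shared at compile time: " + sharesPartition(map1, map2));

        // Scheduling phase: the scheduler picks different values (numbers are made up).
        map1.parallelism = 4;
        map2.parallelism = 8;

        try {
            checkConsumersAgree(Arrays.asList(map1, map2));
        } catch (IllegalStateException e) {
            System.out.println("scheduling failed: " + e.getMessage());
        }
    }
}
```

In this model the sharing decision is taken once, while both values are still -1, and is never revisited after the values diverge. That is the gap the description points at.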
Adding the following test to AdaptiveBatchSchedulerITCase reproduces the problem:
    @Test
    void testDifferentConsumerParallelism() throws Exception {
        final Configuration configuration = createConfiguration();
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(configuration);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setParallelism(8);

        final DataStream<Long> source1 =
                env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(8)
                        .name("source1")
                        .slotSharingGroup("group1");

        final DataStream<Long> source2 =
                env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(8)
                        .name("source2")
                        .slotSharingGroup("group2");

        source1.forward()
                .union(source2)
                .map(new NumberCounter())
                .name("map1")
                .slotSharingGroup("group3");

        source2.map(new NumberCounter()).name("map2").slotSharingGroup("group4");

        env.execute();
    }
Issue Links
- is caused by: FLINK-30942 Fix the bug that the decided parallelism by adaptive batch scheduler may be larger than the max parallelism (Closed)