FLINK-31114

Batch job fails with IllegalStateException when using adaptive batch scheduler

    Description

      This is caused by FLINK-30942. Currently, if two job vertices have the same input and the same parallelism (even if that parallelism is -1), they share partitions. However, since FLINK-30942 the scheduler may change a job vertex's parallelism before scheduling. As a result, two job vertices can have the same parallelism in the compilation phase (and therefore share partitions) but different parallelism in the scheduling phase, which causes the following exception:

      Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Consumers must have the same max parallelism.
              at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
              at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
              at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
              at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
              ... 37 more
      Caused by: java.lang.IllegalStateException: Consumers must have the same max parallelism.
              at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
              at org.apache.flink.runtime.executiongraph.IntermediateResult.getConsumersMaxParallelism(IntermediateResult.java:219)
              at org.apache.flink.runtime.executiongraph.Execution.getPartitionMaxParallelism(Execution.java:501)
              at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:472)
              at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:431)
              at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$registerProducedPartitions$5(DefaultExecutionDeployer.java:277)
              at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
              ... 38 more
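
The failing precondition can be illustrated with a small standalone sketch. This is not Flink's implementation: `SharedPartitionSketch` and `consumersMaxParallelism` are hypothetical names, and the values 8 and 4 are made up for illustration. It only assumes, as the exception message suggests, that all consumers of one shared result must agree on their max parallelism when the partition is registered.

```java
import java.util.Arrays;
import java.util.List;

public class SharedPartitionSketch {

    /**
     * Hypothetical stand-in for the check that fails in the stack trace:
     * all consumers of one shared result must report the same max parallelism.
     */
    static int consumersMaxParallelism(List<Integer> consumerMaxParallelisms) {
        if (consumerMaxParallelisms.isEmpty()) {
            throw new IllegalArgumentException("At least one consumer is required.");
        }
        int expected = consumerMaxParallelisms.get(0);
        for (int maxParallelism : consumerMaxParallelisms) {
            if (maxParallelism != expected) {
                throw new IllegalStateException(
                        "Consumers must have the same max parallelism.");
            }
        }
        return expected;
    }

    public static void main(String[] args) {
        // Compilation phase (assumed values): both consumers look identical,
        // so sharing one set of partitions appears safe.
        System.out.println(consumersMaxParallelism(Arrays.asList(8, 8)));

        // Scheduling phase (assumed values): one consumer was adjusted,
        // so the shared result now sees two different values.
        try {
            consumersMaxParallelism(Arrays.asList(4, 8));
        } catch (IllegalStateException e) {
            System.out.println("Check failed: " + e.getMessage());
        }
    }
}
```

The point of the sketch is that the decision to share partitions is made once, at compilation time, while the consistency check runs later, after the consumers may have diverged.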
      

      Adding the following test to AdaptiveBatchSchedulerITCase reproduces the problem:

          @Test
          void testDifferentConsumerParallelism() throws Exception {
              final Configuration configuration = createConfiguration();
              final StreamExecutionEnvironment env =
                      StreamExecutionEnvironment.createLocalEnvironment(configuration);
              env.setRuntimeMode(RuntimeExecutionMode.BATCH);
              env.setParallelism(8);
      
              final DataStream<Long> source1 =
                      env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                              .setParallelism(8)
                              .name("source1")
                              .slotSharingGroup("group1");
      
              final DataStream<Long> source2 =
                      env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                              .setParallelism(8)
                              .name("source2")
                              .slotSharingGroup("group2");
      
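              // map1 consumes source1 (forward) together with source2.
              source1.forward()
                      .union(source2)
                      .map(new NumberCounter())
                      .name("map1")
                      .slotSharingGroup("group3");
      
              // map2 also consumes source2, so per the description above map1 and map2
              // are expected to share source2's partitions at compilation time.
              source2.map(new NumberCounter()).name("map2").slotSharingGroup("group4");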
      
              env.execute();
          }
      

            People

              wanglijie Lijie Wang