Details
- Type: Sub-task
- Status: Closed
- Priority: Blocker
- Resolution: Done
Description
Documentation: https://github.com/apache/flink/pull/18757
Run DataStream / SQL batch jobs with the Adaptive Batch Scheduler and verify:
1. Whether the automatically decided parallelism is correct
2. Whether the job result is correct
For example:
final Configuration configuration = new Configuration();
configuration.set(
        JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.AdaptiveBatch);
configuration.setInteger(
        JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM, 4);
configuration.set(
        JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK,
        MemorySize.parse("8kb"));
configuration.setInteger("parallelism.default", -1);

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(configuration);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

env.fromSequence(0, 1000).setParallelism(1)
        .keyBy(num -> num % 10)
        .sum(0)
        .addSink(new PrintSinkFunction<>());
env.execute();
You can run the above job and check:
1. The parallelism of "Keyed Aggregation -> Sink: Unnamed" should be 3. The JobManager log should contain the following line:
Parallelism of JobVertex: Keyed Aggregation -> Sink: Unnamed (20ba6b65f97481d5570070de90e4e791) is decided to be 3.
2. The job result should be:
50500 49600 49700 49800 49900 50000 50100 50200 50300 50400
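The expected sums in item 2 can be cross-checked independently of Flink. The following is a minimal sketch in plain Java, assuming that fromSequence(0, 1000) emits both endpoints and that the job groups values by num % 10; the class and method names are hypothetical and not part of Flink.

```java
import java.util.Arrays;

public class ExpectedSums {

    // Sums all values in [from, to] (both inclusive), grouped by value % buckets.
    // This mirrors keyBy(num -> num % 10).sum(0) in the example job.
    static long[] sumsByKey(long from, long to, int buckets) {
        long[] sums = new long[buckets];
        for (long n = from; n <= to; n++) {
            sums[(int) (n % buckets)] += n;
        }
        return sums;
    }

    public static void main(String[] args) {
        // Index i of the printed array holds the sum for key i.
        System.out.println(Arrays.toString(sumsByKey(0, 1000, 10)));
    }
}
```

The order in which the job prints these values may differ from the key order used here, so compare the results as a set.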
You can change the amount of data produced by the source and the config options of the adaptive batch scheduler as you wish.
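When experimenting with different settings, the decided parallelism can be extracted from the JobManager log rather than read by eye. The following is a minimal sketch, assuming the log line keeps the format quoted in this description; the class name, method name, and regular expression are hypothetical helpers that use only the JDK.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DecidedParallelism {

    // Assumed format: "Parallelism of JobVertex: <name> (<hex id>) is decided to be <n>."
    private static final Pattern LINE = Pattern.compile(
            "Parallelism of JobVertex: (.+) \\(([0-9a-f]+)\\) is decided to be (\\d+)\\.");

    // Returns the decided parallelism if the line reports it for the given vertex name.
    static OptionalInt parse(String logLine, String vertexName) {
        Matcher m = LINE.matcher(logLine);
        if (m.find() && m.group(1).equals(vertexName)) {
            return OptionalInt.of(Integer.parseInt(m.group(3)));
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        String line = "Parallelism of JobVertex: Keyed Aggregation -> Sink: Unnamed "
                + "(20ba6b65f97481d5570070de90e4e791) is decided to be 3.";
        System.out.println(parse(line, "Keyed Aggregation -> Sink: Unnamed"));
    }
}
```

Matching on the vertex name rather than the vertex ID keeps the check stable across runs, since the ID shown in the log may change between job graphs.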
Issue Links
- is related to FLINK-26517 Normalize the decided parallelism to power of 2 when using adaptive batch scheduler (Closed)