Flink / FLINK-33768

FLIP-379: Support dynamic source parallelism inference for batch jobs


Details

      Flink 1.19 supports dynamic source parallelism inference for batch jobs, which allows source connectors to infer their parallelism from the actual amount of data they will consume. This is a significant improvement over previous versions, which only assigned a fixed default parallelism to source vertices.

      To enable dynamic parallelism inference, a source connector must implement the inference interface. The FileSource connector already implements it.
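      To illustrate what implementing the inference interface involves, here is a minimal, self-contained sketch. It does not use Flink's actual API: the interface `ParallelismInference`, the class `SketchFileSource`, and the parameters `upperBound` and `bytesPerTask` are hypothetical stand-ins, and the rule shown (one subtask per `bytesPerTask` bytes of input, clamped to [1, upperBound]) is an assumed heuristic, not FileSource's exact logic.

```java
import java.util.List;

// Hypothetical stand-in for the source-side inference interface described in FLIP-379.
// The real Flink interface and its signature differ.
interface ParallelismInference {
    // upperBound: the maximum parallelism the source may choose.
    // bytesPerTask: the assumed amount of data one subtask should consume.
    int inferParallelism(int upperBound, long bytesPerTask);
}

// Hypothetical file-based source that knows the sizes of its input files.
final class SketchFileSource implements ParallelismInference {
    private final List<Long> fileSizesInBytes;

    SketchFileSource(List<Long> fileSizesInBytes) {
        this.fileSizesInBytes = List.copyOf(fileSizesInBytes);
    }

    @Override
    public int inferParallelism(int upperBound, long bytesPerTask) {
        if (upperBound < 1 || bytesPerTask < 1) {
            throw new IllegalArgumentException("upperBound and bytesPerTask must be positive");
        }
        long totalBytes = 0;
        for (long size : fileSizesInBytes) {
            totalBytes += size;
        }
        // Ceiling division: one subtask per bytesPerTask bytes of input.
        long wanted = (totalBytes + bytesPerTask - 1) / bytesPerTask;
        // Clamp to [1, upperBound] so that an empty input still gets one subtask.
        return (int) Math.max(1, Math.min(upperBound, wanted));
    }
}
```

      For example, three files of 300, 500, and 250 bytes with bytesPerTask = 256 give ceil(1050 / 256) = 5 subtasks; an upper bound of 2 would cap this at 2. The point is that the source, which knows its own input, decides the parallelism instead of receiving a fixed value.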

      Additionally, `execution.batch.adaptive.auto-parallelism.default-source-parallelism` now serves as the upper bound for source parallelism inference, and it no longer defaults to 1. If it is not set, the upper bound of allowed parallelism set via `execution.batch.adaptive.auto-parallelism.max-parallelism` is used instead. If that option is not set either, the default parallelism set via `parallelism.default` or StreamExecutionEnvironment#setParallelism() is used.
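      The fallback order for the upper bound can be sketched as a small resolution function. This is an illustration only, not Flink's implementation: the class `SourceParallelismUpperBound` and its `resolve` method are hypothetical, and an unset option is modeled as `null`.

```java
// Hypothetical helper illustrating how the upper bound of source parallelism
// inference is resolved in Flink 1.19 (not Flink's actual code).
final class SourceParallelismUpperBound {
    private SourceParallelismUpperBound() {}

    /**
     * @param defaultSourceParallelism value of
     *     execution.batch.adaptive.auto-parallelism.default-source-parallelism, or null if unset
     * @param maxParallelism value of
     *     execution.batch.adaptive.auto-parallelism.max-parallelism, or null if unset
     * @param defaultParallelism value of parallelism.default or the value passed to
     *     StreamExecutionEnvironment#setParallelism()
     */
    static int resolve(Integer defaultSourceParallelism, Integer maxParallelism, int defaultParallelism) {
        if (defaultSourceParallelism != null) {
            return defaultSourceParallelism; // 1. explicitly configured source upper bound
        }
        if (maxParallelism != null) {
            return maxParallelism; // 2. upper bound of allowed parallelism
        }
        return defaultParallelism; // 3. job-wide default parallelism
    }
}
```

      Before 1.19, an unset `default-source-parallelism` simply meant a fixed source parallelism of 1; the two later branches are what this change adds.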

    Description

      Currently, for JobVertices without a configured parallelism, the AdaptiveBatchScheduler dynamically infers the vertex parallelism from the volume of input data. Source vertices are the exception: for them, it uses the value of `execution.batch.adaptive.auto-parallelism.default-source-parallelism` as a fixed parallelism. If the user does not set this option, the default value of 1 is used as the source parallelism, which is only a temporary implementation solution.
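      The contrast between the two cases can be sketched as follows. This is a simplified model under stated assumptions: non-source vertices are modeled with a pure data-volume heuristic (one subtask per fixed amount of input, capped at a maximum), the class `LegacyParallelismDecision` and its parameters are hypothetical, and the real AdaptiveBatchScheduler applies additional rules.

```java
// Hypothetical model of pre-FLIP-379 parallelism decisions for vertices without
// a configured parallelism (a simplification, not Flink's actual code).
final class LegacyParallelismDecision {
    private LegacyParallelismDecision() {}

    // Non-source vertex: derive the parallelism from the volume of consumed input data.
    static int forNonSource(long inputBytes, long bytesPerTask, int maxParallelism) {
        if (bytesPerTask < 1 || maxParallelism < 1) {
            throw new IllegalArgumentException("bytesPerTask and maxParallelism must be positive");
        }
        long wanted = (inputBytes + bytesPerTask - 1) / bytesPerTask; // ceiling division
        return (int) Math.max(1, Math.min(maxParallelism, wanted));
    }

    // Source vertex: a fixed value, independent of how much data it will read.
    // configuredDefaultSourceParallelism is null when
    // execution.batch.adaptive.auto-parallelism.default-source-parallelism is unset.
    static int forSource(Integer configuredDefaultSourceParallelism) {
        return configuredDefaultSourceParallelism != null ? configuredDefaultSourceParallelism : 1;
    }
}
```

      `forSource` never looks at the data volume, so a source reading terabytes gets the same parallelism as one reading a few kilobytes. That gap is what FLIP-379 closes.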

      We aim to support dynamic source parallelism inference for batch jobs. For more details, see FLIP-379.

    People

      xiasun xingbe
      xiasun xingbe
      Votes: 0
      Watchers: 2
