Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-14861

parallelism.default in flink-conf.yaml do not work which is a bug imported by[FLINK-14745]

    XMLWordPrintableJSON

Details

    Description

      I set parameter "parallelism.default" in flink-conf.yaml, but it's do not work any more when I rebased my branch to master. I debug and find it's a bug imported  by FLINK-14745(https://issues.apache.org/jira/browse/FLINK-14745).

      Detail: 

      // ExecutionConfigAccessor#fromProgramOptions
      public static ExecutionConfigAccessor fromProgramOptions(final ProgramOptions options, final List<URL> jobJars) {
         checkNotNull(options);
         checkNotNull(jobJars);
      
         final Configuration configuration = new Configuration();
      
         if (options.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT) {
            configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, options.getParallelism());
         }
      
         configuration.setBoolean(DeploymentOptions.ATTACHED, !options.getDetachedMode());
         configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, options.isShutdownOnAttachedExit());
      
         ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, options.getClasspaths(), URL::toString);
         ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, jobJars, URL::toString);
      
         SavepointRestoreSettings.toConfiguration(options.getSavepointRestoreSettings(), configuration);
      
         return new ExecutionConfigAccessor(configuration);
      }

       
      [1]. function executionConfigAccessor.getParallelism() will return 1 rather than -1 when options.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT because 
      when getParallelism() function will return the defaultValue of CoreOptions.DEFAULT_PARALLELISM.

       

      // ExecutionConfigAccessor.java
      public int getParallelism() {
       return configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
      } 
      // Configuration.java
      public int getInteger(ConfigOption<Integer> configOption) {
       return getOptional(configOption)
       .orElseGet(configOption::defaultValue);
      }

       

      And function executionConfigAccessor.getParallelism()  still return 1 when options.getParallelism() == 1.

      So, the following code in  CliFrontend.java will never reach if user not set parallelism in flink run command line.

      // CliFrontend.java
      int parallelism = executionParameters.getParallelism() == -1 ? defaultParallelism : executionParameters.getParallelism();

      [2]and another  position, I think we should keep three lines which deleted in FLINK-14745--. 

      // 
      int userParallelism = executionParameters.getParallelism();
      LOG.debug("User parallelism is set to {}", userParallelism);
      //if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
      //userParallelism = defaultParallelism;
      // }
      executeProgram(program, client, userParallelism, executionParameters.getDetachedMode());
       
      

       

      Attachments

        Issue Links

          Activity

            People

              leonard Leonard Xu
              leonard Leonard Xu
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 10m
                  10m