Flink / FLINK-4540

Detached job execution may prevent cluster shutdown

    Details

    • Type: Bug
    • Status: In Progress
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.2.0, 1.1.2
    • Fix Version/s: None
    • Component/s: YARN
    • Labels:
      None

      Description

      Detached job execution can prevent cluster shutdown in two cases: 1) when an eager job is executed, i.e. the job calls `collect()` or `count()`, and 2) when the user jar doesn't contain a job.

      1) For example, `./flink run -d -m yarn-cluster -yn 1 ../examples/batch/WordCount.jar` throws an exception and only afterwards disconnects the YarnClusterClient. In detached mode, the code assumes the cluster is shut down through the shutdownAfterJob method, which ensures that the YarnJobManager shuts down after the job completes. Because of the exception thrown when executing eager jobs, the JobManager never receives a job and thus never shuts down the cluster.

      2) The same problem also occurs in detached execution when the user jar doesn't contain a job.

      A good solution would be to defer cluster startup until the job has been fully assembled.

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user MayerRoman opened a pull request:

          https://github.com/apache/flink/pull/3287

          FLINK-4540[yarn] Delayed cluster startup until the job has been fully assembled, in the case of launching detached jobs

          FLINK-4540 Detached job execution may prevent cluster shutdown.

          *workflow description:*
          When a detached job is launched on YARN, `FlinkYarnSessionCli#createCluster` calls `AbstractYarnClusterDescriptor#deploy`, which triggers deployment of a Flink cluster on YARN.
          After that, `CliFrontend#executeProgram` calls `ClusterClient#run`, which starts the job preparation process.
          Then `YarnClusterClient#submitJob` calls `YarnClusterClient#stopAfterJob`, which ensures that the YarnJobManager shuts down after the job completes, and calls `ClusterClient#runDetached`, which sends the job to the cluster.

          *how the bug occurs:*
          If an error is thrown after `AbstractYarnClusterDescriptor#deploy` but before the job is sent to the cluster, the JobManager never shuts down the cluster.

          *changes:*
          This pull request defers cluster startup until the job has been fully assembled.
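The ordering issue described above can be illustrated with a small, self-contained sketch. It does not use Flink's classes: `FakeCluster`, `DetachedSubmitSketch`, and their methods are hypothetical stand-ins, and the toy model assumes a submitted job finishes immediately. It only shows why assembling the job before deploying avoids a cluster that is left running.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a YARN cluster; not a Flink class. */
final class FakeCluster {
    private boolean running;

    void deploy() { running = true; }

    /** Models the JobManager stopping the cluster once the job completes. */
    void shutdownAfterJob() { running = false; }

    boolean isRunning() { return running; }
}

final class DetachedSubmitSketch {

    /** Ordering described in the bug: deploy first, then assemble the job. */
    static void deployThenAssemble(FakeCluster cluster, Supplier<String> assembleJob) {
        cluster.deploy();
        String job = assembleJob.get(); // may throw, leaving the cluster running
        submit(cluster, job);
    }

    /** Deferred ordering: assemble first, deploy only if assembly succeeded. */
    static void assembleThenDeploy(FakeCluster cluster, Supplier<String> assembleJob) {
        String job = assembleJob.get(); // a failure here means nothing was deployed
        cluster.deploy();
        submit(cluster, job);
    }

    private static void submit(FakeCluster cluster, String job) {
        // Toy assumption: the job finishes immediately and the cluster stops.
        cluster.shutdownAfterJob();
    }

    public static void main(String[] args) {
        // Simulates a program whose assembly fails, e.g. an eager job in detached mode.
        Supplier<String> failingJob = () -> {
            throw new IllegalStateException("job could not be assembled");
        };

        FakeCluster a = new FakeCluster();
        try {
            deployThenAssemble(a, failingJob);
        } catch (IllegalStateException e) {
            // the client exits here
        }
        System.out.println("deploy-then-assemble, cluster still running: " + a.isRunning());

        FakeCluster b = new FakeCluster();
        try {
            assembleThenDeploy(b, failingJob);
        } catch (IllegalStateException e) {
            // the client exits here
        }
        System.out.println("assemble-then-deploy, cluster still running: " + b.isRunning());
    }
}
```

In this model the first ordering leaves the cluster running after the failure, while the second never deploys it.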

          *additional Information:*

          • these changes do not affect the work of FLIP-6
          • explanation for lines 330-331 in `FlinkYarnSessionCli`:

          After `flink run ...` is entered on the command line, Flink creates an instance of `CliFrontend`.
          Before the instance is created, the static block in this class is executed. It creates instances of `FlinkYarnSessionCli`, `FlinkYarnCli`, and `DefaultCli` and puts them in a static LinkedList.

          In real use, a new JVM starts for every invocation.

          When the tests in flink-yarn-tests are executed, jobs are launched in different modes within one JVM, and the static block in `CliFrontend` is executed only once.
          Therefore the `FlinkYarnSessionCli` instance, once created, is reused by all tests in the class.

          `YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnCluster` and `#testDetachedPerJobYarnClusterWithStreamingJob` set `private boolean detachedMode` in `FlinkYarnSessionCli` to true during execution.

          If `YARNSessionCapacitySchedulerITCase#perJobYarnCluster` or `#perJobYarnClusterWithParallelism` runs after them, the changed `detachedMode` in `FlinkYarnSessionCli` sends them down the wrong execution path.

          The previous version of this code assumed that the field is initially false and only checked whether it needed to be set to true.
          ```
          if (cmd.hasOption(DETACHED.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
              this.detachedMode = true;
              yarnClusterDescriptor.setDetachedMode(true);
          }
          ```
          The new version always assigns `detachedMode`, so if it was true and the current launch is not a detached job, it is reset to false.

          ```
          this.detachedMode = cmd.hasOption(DETACHED.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt());
          yarnClusterDescriptor.setDetachedMode(this.detachedMode);
          ```
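The effect of the two variants on a reused instance can be shown with a minimal sketch. `SessionCliSketch` and `SharedCliDemo` are hypothetical classes written only for this illustration, and a plain boolean parameter stands in for the command-line option check. The sketch assumes, as described above, that one CLI object parses several command lines in the same JVM.

```java
/** Hypothetical stand-in for a CLI object that is created once and reused; not the Flink class. */
final class SessionCliSketch {
    private boolean detachedMode; // false by default, as the previous code assumed

    /** Previous behaviour: the flag is only ever switched to true. */
    void parseSticky(boolean detachedOptionPresent) {
        if (detachedOptionPresent) {
            detachedMode = true;
        }
    }

    /** New behaviour: the flag is always overwritten with the current value. */
    void parseOverwrite(boolean detachedOptionPresent) {
        detachedMode = detachedOptionPresent;
    }

    boolean isDetachedMode() { return detachedMode; }
}

final class SharedCliDemo {
    public static void main(String[] args) {
        SessionCliSketch sticky = new SessionCliSketch();
        sticky.parseSticky(true);   // first test launches a detached job
        sticky.parseSticky(false);  // second test launches an attached job on the same instance
        System.out.println("sticky variant, detachedMode after attached launch: "
                + sticky.isDetachedMode());

        SessionCliSketch overwrite = new SessionCliSketch();
        overwrite.parseOverwrite(true);
        overwrite.parseOverwrite(false);
        System.out.println("overwrite variant, detachedMode after attached launch: "
                + overwrite.isDetachedMode());
    }
}
```

With the sticky variant the second, attached launch still sees `detachedMode` as true. With the overwrite variant each launch sees the value from its own command line.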

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/MayerRoman/flink FLINK-4540

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3287.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3287


          commit cca5c31767a76a560e66193f28e023210d592dbf
          Author: Roman Maier <roman_maier@epam.com>
          Date: 2017-02-08T08:04:15Z

          FLINK-4540[yarn] Delayed cluster startup until the job has been fully assembled, in the case of launching detached jobs


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3287

          @rmetzger I think you would be the best person to look at this change...

          githubbot ASF GitHub Bot added a comment -

          Github user MayerRoman closed the pull request at:

          https://github.com/apache/flink/pull/3287

          githubbot ASF GitHub Bot added a comment -

          Github user DmytroShkvyra commented on the issue:

          https://github.com/apache/flink/pull/3287

          @MayerRoman Thanks Roman for your efforts!


            People

            • Assignee:
              roman_maier Roman Maier
              Reporter:
              mxm Maximilian Michels
            • Votes:
              0
              Watchers:
              5
