FLINK-6031

Add parameter for per job yarn clusters to control whether the user code jar is included into the system classloader.

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: YARN
    • Labels:
      None

      Description

      FLINK-4913 added the user jar to the system classloader when starting a Flink per-job YARN cluster.
      Some users experienced issues with the changed behavior.

      I suggest introducing a new YARN-specific configuration parameter (for the flink-conf.yaml file) to control whether the user jar is added to the system classloader.
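
A minimal sketch of what such a setting could look like from the reading side. The key name `yarn.per-job-cluster.include-user-jar` and the default `ORDER` are taken from the review comments further down this issue; the class `FlinkConfSketch`, its methods, and the simplified flat `key: value` parsing are hypothetical illustrations, not Flink's actual configuration loader.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: reads flat "key: value" lines as they appear in flink-conf.yaml.
// This is NOT Flink's configuration loader, only an illustration of the proposed setting.
final class FlinkConfSketch {
    // Key name as given in the PR review comments of this issue.
    static final String KEY = "yarn.per-job-cluster.include-user-jar";
    // Default value as described in the PR ("ORDER").
    static final String DEFAULT = "ORDER";

    private FlinkConfSketch() {}

    // Parses flat "key: value" lines, skipping blank lines and '#' comments.
    static Map<String, String> parse(String text) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : text.split("\\R")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            int sep = trimmed.indexOf(": ");
            if (sep <= 0) {
                continue; // not a "key: value" line
            }
            conf.put(trimmed.substring(0, sep).trim(), trimmed.substring(sep + 2).trim());
        }
        return conf;
    }

    // Returns the configured inclusion mode, or the default if the key is absent.
    static String userJarInclusion(String text) {
        return parse(text).getOrDefault(KEY, DEFAULT);
    }
}
```

With a file containing the line `yarn.per-job-cluster.include-user-jar: FIRST`, `FlinkConfSketch.userJarInclusion(...)` would return `FIRST`; without that line it would return the assumed default.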

          Activity

          rmetzger Robert Metzger added a comment -

          Merged for master (1.4) in http://git-wip-us.apache.org/repos/asf/flink/commit/4a314a80
          Merged for 1.3 in http://git-wip-us.apache.org/repos/asf/flink/commit/ca3e403e

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3931

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3931

          I'll merge the change now ...

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3931

          (for both 1.3 and 1.4)

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3931

          Thank you for addressing my comments so quickly.

          The change is good to merge.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3931

          @rmetzger I've addressed your comments.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3931#discussion_r117204729

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java —
          @@ -665,58 +685,52 @@ public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient
          1));
          }

          + String configuredUserJarInclusion = flinkConfiguration.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
          + YarnConfigOptions.UserJarInclusion userJarInclusion;
          + try {
          + userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase());
          + } catch (IllegalArgumentException e) {
          + LOG.warn("Configuration parameter {} was configured with an invalid value {}. Falling back to default ({}).",
          + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(),
          + configuredUserJarInclusion,
          + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue());
          + userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue());
          — End diff –

          this shouldn't be here actually.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3931#discussion_r117204403

          — Diff: docs/setup/yarn_setup.md —
          @@ -245,6 +245,18 @@ Note: You can use a different configuration directory per job by setting the env

          Note: It is possible to combine `-m yarn-cluster` with a detached YARN submission (`-yd`) to "fire and forget" a Flink job to the YARN cluster. In this case, your application will not get any accumulator results or exceptions from the ExecutionEnvironment.execute() call!

          +### User jars & Classpath
          +
          +By default Flink will include the user jars into the system classpath when running a single job. This behavior can be controlled with the `yarn.per-job-cluster.include-job-jar` parameter.
          — End diff –

          The configuration parameter is actually called `yarn.per-job-cluster.include-user-jar`.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3931#discussion_r117198446

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java —
          @@ -200,6 +214,17 @@ public void setTaskManagerMemory(int memoryMb) {

          public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) {
          this.flinkConfiguration = conf;
          +
          + String configuredUserJarInclusion = flinkConfiguration.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
          + try {
          + userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase());
          + } catch (IllegalArgumentException e) {
          + LOG.warn("Configuration parameter {} was configured with an invalid value {}. Falling back to default ({}).",
          + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(),
          + configuredUserJarInclusion,
          + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue());
          + userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue());
          + }
          — End diff –

          This code and the code above look exactly the same.
          If this is duplicate code, should it be extracted into one method?
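
One way the extraction suggested here might look, as a hedged sketch rather than the code that was actually merged. The nested enum `Inclusion` is a hypothetical stand-in for `YarnConfigOptions.UserJarInclusion` (value names follow the docs diff in this PR), and the class and method names are invented for illustration. Unlike the diff, the sketch trims the input, handles `null`, and uses `Locale.ROOT` for upper-casing so the result does not depend on the JVM's default locale.

```java
import java.util.Locale;

// Hypothetical helper collecting the duplicated parse-with-fallback logic in one place.
final class UserJarInclusionParser {
    // Stand-in for YarnConfigOptions.UserJarInclusion; values as listed in the docs diff.
    enum Inclusion { DISABLED, FIRST, LAST, ORDER }

    private UserJarInclusionParser() {}

    // Parses the configured value case-insensitively.
    // Falls back to defaultValue (with a warning) if the value is missing or invalid.
    static Inclusion parse(String configured, Inclusion defaultValue) {
        if (configured == null) {
            return defaultValue;
        }
        try {
            return Inclusion.valueOf(configured.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            System.err.printf(
                "Configuration parameter was configured with an invalid value %s. "
                    + "Falling back to default (%s).%n",
                configured, defaultValue);
            return defaultValue;
        }
    }
}
```

Both call sites (`setFlinkConfiguration` and `startAppMaster`) could then call a single method of this kind instead of repeating the try/catch block.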

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3931#discussion_r117199375

          — Diff: docs/setup/yarn_setup.md —
          @@ -245,6 +245,18 @@ Note: You can use a different configuration directory per job by setting the env

          Note: It is possible to combine `-m yarn-cluster` with a detached YARN submission (`-yd`) to "fire and forget" a Flink job to the YARN cluster. In this case, your application will not get any accumulator results or exceptions from the ExecutionEnvironment.execute() call!

          +### User jars & Classpath
          +
          +By default Flink will include the user jars into the system classpath when running a single job. This behavior can be controlled with the `yarn.per-job-cluster.include-job-jar` parameter.
          +
          +When setting this to `DISABLED` Flink will include the jar in the user classpath instead.
          +
          +The user-jars position in the class path can be controlled by setting the parameter to one of the following:
          +
          +- `ORDER`: (default) Adds the jar to the system class path based on the lexicographic order.
          +- `FIRST`: Adds the jar to the beginning of the system class path.
          +- `LAST`: Adds the jar to the end of the system class path.
          +
          — End diff –

          Ideally, you would also add the configuration property to the `setup/config.md` page, where all config options are listed.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3931#discussion_r117199176

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java —
          @@ -665,58 +685,52 @@ public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient
          1));
          }

          + String configuredUserJarInclusion = flinkConfiguration.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
          + YarnConfigOptions.UserJarInclusion userJarInclusion;
          + try {
          + userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase());
          + } catch (IllegalArgumentException e) {
          + LOG.warn("Configuration parameter {} was configured with an invalid value {}. Falling back to default ({}).",
          + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(),
          + configuredUserJarInclusion,
          + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue());
          + userJarInclusion = YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue());
          — End diff –

          This also seems very similar to the other duplicates.

          githubbot ASF GitHub Bot added a comment -

          GitHub user zentol opened a pull request:

          https://github.com/apache/flink/pull/3931

          FLINK-6031 [yarn] Add config parameter for user-jar inclusion in cla…

          This PR adds a config parameter to control how user jars are handled with regard to the system class path for per-job YARN clusters.

          The parameter allows you to:

          • disable the inclusion in the system classpath and use the user classpath instead ("DISABLE")
          • prepend the user jars to the system class path ("FIRST")
          • append the user jars to the system class path ("LAST")
          • (default) add the user jars to the system class path based on the lexicographic order ("ORDER")
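
The four modes listed above can be illustrated with a small sketch. Everything here is an assumption made for illustration: the class `ClasspathOrderSketch`, the enum `Mode`, and the choice to leave system entries in their given order for `FIRST` and `LAST` are not taken from the PR. The disabled mode is spelled `DISABLED` here, following the docs diff, whereas the list above spells it "DISABLE".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of how the system class path could be assembled per mode.
final class ClasspathOrderSketch {
    enum Mode { DISABLED, FIRST, LAST, ORDER }

    private ClasspathOrderSketch() {}

    // Returns the system class path entries for the given mode.
    // In DISABLED mode the user jars are left out (they would go on the user classpath instead).
    static List<String> systemClasspath(List<String> systemEntries, List<String> userJars, Mode mode) {
        List<String> result = new ArrayList<>(systemEntries);
        switch (mode) {
            case FIRST:
                result.addAll(0, userJars); // user jars before all system entries
                break;
            case LAST:
                result.addAll(userJars); // user jars after all system entries
                break;
            case ORDER:
                result.addAll(userJars);
                Collections.sort(result); // lexicographic order across all entries
                break;
            case DISABLED:
            default:
                break; // user jars are not part of the system class path
        }
        return result;
    }
}
```

For system entries `b.jar`, `d.jar` and a user jar `c.jar`, this sketch places `c.jar` first under `FIRST`, last under `LAST`, between the two under `ORDER`, and omits it under `DISABLED`.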

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zentol/flink 6031_yarn_userjars

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3931.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3931



          rmetzger Robert Metzger added a comment -

          I responded on the BEAM issue.

          Back to this one:
          From the ML discussion, it seems that we need to have the following three options for controlling the user jar inclusion:

          • "user code first" in the classpath
          • "user code last" in the classpath
          • no user code in the classpath
          aljoscha Aljoscha Krettek added a comment - - edited

          Robert Metzger, do you think this issue in the Beam Flink Runner could be caused by this: BEAM-1640?


            People

            • Assignee:
              Zentol Chesnay Schepler
              Reporter:
              rmetzger Robert Metzger
