Flink / FLINK-6708

Don't let the FlinkYarnSessionCli fail if it cannot retrieve the ClusterStatus

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.3.0, 1.4.0
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: YARN
    • Labels:
      None

      Description

      The FlinkYarnSessionCli should not fail if it cannot retrieve the GetClusterStatusResponse. Tolerating such failures hardens Flink's YARN session.
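A minimal sketch of the idea: wrap the status call so that a failure is logged and turned into an empty result, instead of escaping the CLI loop. `StatusSource` and `SafeStatus` are hypothetical names standing in for `yarnCluster.getClusterStatus()`. This is not Flink's code, and `java.util.logging` is used only to keep the example dependency-free.

```java
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the cluster client's status call; not a Flink API.
interface StatusSource<T> {
    T fetch() throws Exception;
}

final class SafeStatus {
    private static final Logger LOG = Logger.getLogger(SafeStatus.class.getName());

    private SafeStatus() {}

    // Returns the status if retrieval succeeds and yields a non-null value.
    // If the call returns null or throws, the result is Optional.empty().
    // Exceptions are logged, not rethrown, so the caller's loop keeps running.
    static <T> Optional<T> tryFetch(StatusSource<T> source) {
        try {
            return Optional.ofNullable(source.fetch());
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Could not retrieve the current cluster status. Skipping.", e);
            return Optional.empty();
        }
    }
}
```

A caller would treat an empty result as "no update this round" and move on to its next regular poll.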


          Activity

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Thanks for the fix Till!

          Fixed for master via fd9146295b7af6ce1a2976a52a71add1c5a37f99.
          Fixed for 1.3 via 2e138f1009b30c91aa73a704cc175f9e61ca52ea.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3982

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3982

          LGTM! I'll rebase and merge this.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3982

          Verified that Flink YARN sessions + HA work nicely now.

          1. Started a non-detached YARN session
          2. Submitted a job
          3. Killed the JobManager
          4. YARN session correctly reports disassociation from the first JobManager
          5. Failure of cluster status retrieval is correctly ignored, and the session does not abort.
          6. New JobManager correctly registered, TMs correctly re-connect.
          7. Job remains intact.

          Logs are sane and nice (minus my comment on the "retrying" wording).

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3982#discussion_r118499283

          --- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---
          @@ -89,5 +92,37 @@ class YarnJobManager(
          flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5),
          TimeUnit.SECONDS)

          + val yarnFilesPath: Option[String] = Option(System.getenv().get(YarnConfigKeys.FLINK_YARN_FILES))
          --- End diff --

          Just to be sure that my understanding is correct:
          this should always exist, correct?
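For context on this question: Scala's `Option(x)` evaluates to `None` when `x` is null, so wrapping `System.getenv().get(...)` means a missing variable yields `None` rather than a stray `null`. A rough Java analogue follows, assuming a hypothetical `EnvLookup` helper that takes the environment as a `Map` so it can be exercised without setting real variables. The `require` variant only illustrates the fail-fast alternative, should the variable be guaranteed to exist; it is not what the diff does.

```java
import java.util.Map;
import java.util.Optional;

final class EnvLookup {
    private EnvLookup() {}

    // Java analogue of Scala's Option(env.get(key)): a missing key (null) becomes Optional.empty().
    static Optional<String> lookup(Map<String, String> env, String key) {
        return Optional.ofNullable(env.get(key));
    }

    // Convenience overload that reads the real process environment.
    static Optional<String> lookup(String key) {
        return lookup(System.getenv(), key);
    }

    // Fail-fast alternative: if the variable must always be present,
    // report its absence immediately with a descriptive error.
    static String require(Map<String, String> env, String key) {
        return lookup(env, key).orElseThrow(
                () -> new IllegalStateException("Environment variable " + key + " is not set"));
    }
}
```

Which behaviour is right depends on whether the variable is guaranteed to be set in every container that runs the JobManager, which is exactly what the reviewer is asking.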

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3982#discussion_r118497398

          --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
          @@ -413,14 +413,18 @@ public static void runInteractiveCli(YarnClusterClient yarnCluster, boolean read
          while (true) {
          // ------------------ check if there are updates by the cluster -----------

          - GetClusterStatusResponse status = yarnCluster.getClusterStatus();
          - LOG.debug("Received status message: {}", status);
          + try {
          + GetClusterStatusResponse status = yarnCluster.getClusterStatus();
          + LOG.debug("Received status message: {}", status);
          - if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
          - System.err.println("Number of connected TaskManagers changed to " +
          + if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
          + System.err.println("Number of connected TaskManagers changed to " +
          status.numRegisteredTaskManagers() + ". " +
          - "Slots available: " + status.totalNumberOfSlots());
          - numTaskmanagers = status.numRegisteredTaskManagers();
          + "Slots available: " + status.totalNumberOfSlots());
          + numTaskmanagers = status.numRegisteredTaskManagers();
          + }
          + } catch (Exception e) {
          + LOG.warn("Could not retrieve the current cluster status. Retrying...", e);
          --- End diff --

          "Skipping" might be a better term here, because we aren't actually retrying to get the cluster status, just ignoring it for this loop attempt.

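The distinction drawn in this comment can be sketched with a hypothetical `StatusPoller` that mirrors the diff's change-detection logic: when retrieval throws, the current pass is skipped and the last known TaskManager count is kept; nothing is retried until the loop's next regular pass. `Status` and `Source` are illustrative stand-ins for `GetClusterStatusResponse` and the cluster client, not Flink types.

```java
import java.util.ArrayList;
import java.util.List;

final class StatusPoller {
    // Hypothetical snapshot of the two fields the CLI reads from the status response.
    static final class Status {
        final int numRegisteredTaskManagers;
        final int totalNumberOfSlots;

        Status(int numRegisteredTaskManagers, int totalNumberOfSlots) {
            this.numRegisteredTaskManagers = numRegisteredTaskManagers;
            this.totalNumberOfSlots = totalNumberOfSlots;
        }
    }

    // Hypothetical status source; like a remote call, it may return null or throw.
    interface Source {
        Status fetch() throws Exception;
    }

    int numTaskManagers = 0;    // last known number of registered TaskManagers
    int skippedIterations = 0;  // passes whose status could not be retrieved
    final List<String> messages = new ArrayList<>();

    // One pass of the polling loop. A failed retrieval skips this pass and keeps
    // the previous count; there is no immediate retry.
    void pollOnce(Source source) {
        try {
            Status status = source.fetch();
            if (status != null && numTaskManagers != status.numRegisteredTaskManagers) {
                messages.add("Number of connected TaskManagers changed to "
                        + status.numRegisteredTaskManagers
                        + ". Slots available: " + status.totalNumberOfSlots);
                numTaskManagers = status.numRegisteredTaskManagers;
            }
        } catch (Exception e) {
            skippedIterations++;
        }
    }
}
```

Skipping keeps the polling cadence fixed, which is why a log message saying "Retrying..." slightly overstates what actually happens.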
          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3982

          FLINK-6708 [yarn] Harden FlinkYarnSessionCli to handle GetClusterStatusResponse exceptions

          This PR is based on #3981.

          This PR hardens the FlinkYarnSessionCli by handling exceptions that occur when
          retrieving the GetClusterStatusResponse. If no response is retrieved and an exception
          is thrown instead, the CLI does not fail but tries again on the next iteration.

          cc @rmetzger.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink hardenYarnSession

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3982.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3982


          commit 72ce39a1752cc19669f003b70cc2708852a06ac5
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-05-24T15:59:51Z

          FLINK-6646 [yarn] Let YarnJobManager delete Yarn application files

          Previously, the YarnClusterClient decided when to delete the Yarn application files.
          This is problematic because the client does not know whether a Yarn application
          is being restarted or terminated. As a result, the files were always deleted. This
          prevents Yarn from restarting a failed ApplicationMaster, effectively thwarting
          Flink's HA capabilities.

          The PR changes the behaviour such that the YarnJobManager deletes the Yarn files
          when it receives a StopCluster message. That way, we can be sure that the Yarn files
          are deleted if and only if the cluster is intended to be shut down.
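The ownership rule described in this commit (only an explicit stop request triggers cleanup) can be sketched in Java. The message types `StopCluster` and `JobManagerLost` and the `ApplicationFilesOwner` class are hypothetical. The real YarnJobManager is written in Scala and works on the YARN application's files; this sketch deletes a local directory with `java.nio.file` purely for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical message types for illustration; not Flink's actual message classes.
interface ClusterMessage {}

final class StopCluster implements ClusterMessage {}

final class JobManagerLost implements ClusterMessage {}

final class ApplicationFilesOwner {
    private final Path yarnFilesDir;

    ApplicationFilesOwner(Path yarnFilesDir) {
        this.yarnFilesDir = yarnFilesDir;
    }

    // Deletes the application files only on a deliberate shutdown. Any other event
    // (such as losing the JobManager) leaves them in place, so a restarted
    // ApplicationMaster can still find them.
    void handle(ClusterMessage message) throws IOException {
        if (message instanceof StopCluster) {
            deleteRecursively(yarnFilesDir);
        }
    }

    private static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return; // already gone: nothing to clean up
        }
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (UncheckedIOException e) {
            throw e.getCause();
        }
    }
}
```

Keeping the delete decision on the side that knows whether shutdown is intentional is the point of the change; the client cannot distinguish a restart from a termination.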

          commit 9227539f97e6dbc77c5367b8c555b4ba0b2ad06d
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-05-24T16:26:57Z

          FLINK-6708 [yarn] Harden FlinkYarnSessionCli to handle GetClusterStatusResponse exceptions

          This PR hardens the FlinkYarnSessionCli by handling exceptions that occur when
          retrieving the GetClusterStatusResponse. If no response is retrieved and an exception
          is thrown instead, the CLI does not fail but tries again on the next iteration.



            People

            • Assignee:
              till.rohrmann Till Rohrmann
              Reporter:
              till.rohrmann Till Rohrmann
            • Votes:
              0
              Watchers:
              3
