Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.2.0, 1.3.0, 1.4.0
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: YARN
    • Labels:
      None

      Description

      While testing Flink 1.3.0 RC1, I ran into the following issue on the JobManager.

      2017-05-19 14:41:38,030 INFO  org.apache.flink.runtime.webmonitor.JobManagerRetriever       - New leader reachable under akka.tcp://flink@permanent-qa-cluster-i7c9.c.astral-sorter-757.internal:36528/user/jobmanager:6539dc04-d7fe-4f85-a0b6-09bfb0de8a58.
      2017-05-19 14:41:38,033 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager#1602741108] - leader session 6539dc04-d7fe-4f85-a0b6-09bfb0de8a58
      2017-05-19 14:41:38,033 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Requesting new TaskManager container with 1024 megabytes memory. Pending requests: 1
      2017-05-19 14:41:38,781 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_1494870922226_0061_02_000002 - Remaining pending container requests: 0
      2017-05-19 14:41:38,782 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Launching TaskManager in container ContainerInLaunch @ 1495204898782: Container: [ContainerId: container_1494870922226_0061_02_000002, NodeId: permanent-qa-cluster-d3iz.c.astral-sorter-757.internal:8041, NodeHttpAddress: permanent-qa-cluster-d3iz.c.astral-sorter-757.internal:8042, Resource: <memory:1024, vCores:1>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.240.0.32:8041 }, ] on host permanent-qa-cluster-d3iz.c.astral-sorter-757.internal
      2017-05-19 14:41:38,788 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - Opening proxy : permanent-qa-cluster-d3iz.c.astral-sorter-757.internal:8041
      2017-05-19 14:41:44,284 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Container container_1494870922226_0061_02_000002 failed, with a TaskManager in launch or registration. Exit status: -1000
      2017-05-19 14:41:44,284 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics for container container_1494870922226_0061_02_000002 in state COMPLETE : exitStatus=-1000 diagnostics=File does not exist: hdfs://nameservice1/user/robert/.flink/application_1494870922226_0061/cf9287fe-ac75-4066-a648-91787d946890-taskmanager-conf.yaml
      java.io.FileNotFoundException: File does not exist: hdfs://nameservice1/user/robert/.flink/application_1494870922226_0061/cf9287fe-ac75-4066-a648-91787d946890-taskmanager-conf.yaml
      	at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1219)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1211)
      	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1211)
      	at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:251)
      	at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
      	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
      	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:415)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
      	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
      	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      

      The problem is the following:

      • JobManager1 starts from a yarn-session.sh
      • Job1 gets submitted to JobManager1
      • JobManager1 dies
      • YARN starts a new JM: JobManager2
      • In the meantime, errors appear on the yarn-session.sh client, which shuts down the session. This includes deleting the YARN staging directory in HDFS.
      • JobManager2 is unable to start a new TaskManager because the files in the staging directory were deleted by the client (see the sketch below).
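
      For illustration only, here is a minimal sketch (not Flink's actual code) of the pre-fix client-side cleanup,
      using Hadoop's plain FileSystem API: the session client recursively deletes the staging directory on shutdown,
      which is exactly what leaves JobManager2 without files such as the taskmanager-conf.yaml referenced in the
      exception above. The object and method names are illustrative.

      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.fs.{FileSystem, Path}

      object StagingDirCleanupSketch {
        // Recursively delete the YARN staging directory,
        // e.g. hdfs://nameservice1/user/robert/.flink/application_.../
        def deleteStagingDir(stagingDir: String): Boolean = {
          val path = new Path(stagingDir)
          val fs: FileSystem = path.getFileSystem(new Configuration())
          // After this call, a restarted JobManager can no longer localize its container
          // resources, which surfaces as the FileNotFoundException in the log above.
          fs.delete(path, true) // recursive = true; returns false if nothing was deleted
        }
      }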


          Activity

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for master via fd9146295b7af6ce1a2976a52a71add1c5a37f99.
          Fixed for 1.3 via 2e138f1009b30c91aa73a704cc175f9e61ca52ea.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3981

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3981

          Merging this. My review of this change is covered in #3981.
          @tedyu I'll address your comments while merging, thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user tedyu commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3981#discussion_r118304278

          — Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala —
          @@ -89,5 +92,37 @@ class YarnJobManager(
                flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5),
                TimeUnit.SECONDS)

          +  val yarnFilesPath: Option[String] = Option(System.getenv().get(YarnConfigKeys.FLINK_YARN_FILES))
          +
             override val jobPollingInterval = YARN_HEARTBEAT_DELAY
          +
          +  override def handleMessage: Receive = {
          +    handleYarnShutdown orElse super.handleMessage
          +  }
          +
          +  def handleYarnShutdown: Receive = {
          +    case msg: StopCluster =>
          +      super.handleMessage(msg)
          +
          +      // do global cleanup if the yarn files path has been set
          +      yarnFilesPath match {
          +        case Some(filePath) =>
          +          log.info(s"Deleting yarn application files under $filePath.")
          +
          +          val path = new Path(filePath)
          +
          +          try {
          +            val fs = path.getFileSystem
          +            fs.delete(path, true)
          — End diff –

          Please check the return value of the delete() call.
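
          Below is a minimal sketch of the deletion with the return value of delete() checked, as
          requested above. It is simplified from the diff; the helper object, the None branch and
          the log wording are illustrative assumptions, not the merged change.

          import org.apache.flink.core.fs.Path
          import org.slf4j.LoggerFactory

          object YarnFilesCleanupSketch {
            private val log = LoggerFactory.getLogger(getClass)

            def deleteYarnFiles(yarnFilesPath: Option[String]): Unit = yarnFilesPath match {
              case Some(filePath) =>
                log.info(s"Deleting yarn application files under $filePath.")
                val path = new Path(filePath)
                try {
                  val fs = path.getFileSystem      // as in the diff: Flink's Path resolves its FileSystem
                  if (!fs.delete(path, true)) {    // recursive delete; check the boolean result
                    log.warn(s"Deleting yarn application files under $filePath was not successful.")
                  }
                } catch {
                  case t: Throwable =>
                    log.error(s"Could not delete yarn application files under $filePath.", t)
                }
              case None =>
                log.debug("No yarn application files path set, skipping cleanup.")
            }
          }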

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3981

          FLINK-6646 [yarn] Let YarnJobManager delete Yarn application files

          Before, the YarnClusterClient decided when to delete the Yarn application files.
          This is problematic because the client does not know whether a Yarn application
          is being restarted or terminated. Due to this, the files were always deleted, which
          prevents Yarn from restarting a failed ApplicationMaster, effectively thwarting
          Flink's HA capabilities.

          The PR changes the behaviour such that the YarnJobManager deletes the Yarn files
          when it receives a StopCluster message. That way, we can be sure that the yarn files
          are deleted only if the cluster is intended to be shut down.
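
          For context, a hedged sketch (not Flink's actual client code) of the hand-off this relies on:
          the client records the staging directory in the ApplicationMaster's container environment, and
          the JobManager later reads it via YarnConfigKeys.FLINK_YARN_FILES, as shown in the diff above.
          The helper name and the literal key string below are illustrative.

          import scala.collection.JavaConverters._
          import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
          import org.apache.hadoop.yarn.util.Records

          object AmLaunchContextSketch {
            def withYarnFilesEnv(stagingDir: String): ContainerLaunchContext = {
              val ctx = Records.newRecord(classOf[ContainerLaunchContext])
              // The JobManager side later calls System.getenv().get(...) on this key.
              ctx.setEnvironment(Map("FLINK_YARN_FILES" -> stagingDir).asJava)
              ctx
            }
          }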

          cc @rmetzger

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink fixYarnSession

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3981.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3981


          commit 72ce39a1752cc19669f003b70cc2708852a06ac5
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-05-24T15:59:51Z

          FLINK-6646 [yarn] Let YarnJobManager delete Yarn application files

          Before, the YarnClusterClient decided when to delete the Yarn application files.
          This is problematic because the client does not know whether a Yarn application
          is being restarted or terminated. Due to this, the files were always deleted, which
          prevents Yarn from restarting a failed ApplicationMaster, effectively thwarting
          Flink's HA capabilities.

          The PR changes the behaviour such that the YarnJobManager deletes the Yarn files
          when it receives a StopCluster message. That way, we can be sure that the yarn files
          are deleted only if the cluster is intended to be shut down.


          till.rohrmann Till Rohrmann added a comment -

          Actually, I have to correct myself with respect to the severity of the problem. Given that the ClusterClient deletes crucial files which are needed to recover from an ApplicationMaster failure, effectively thwarting Flink's HA capabilities, I would make this issue a blocker.

          till.rohrmann Till Rohrmann added a comment -

          This is actually not limited to the yarn-session but also applies to the yarn-cluster mode. The only reason it didn't surface so far is that the yarn-cluster mode uses a higher connection timeout (60 s, compared to 10 s in the session mode).

          The underlying problem is, however, the wrong resource lifecycle management of the application files. In the current version, the ClusterClient decides when to delete the Flink cluster files even though this should be the responsibility of the YarnApplicationMaster. The YarnApplicationMaster should decide when the Yarn application has terminated and when the files can be deleted. This wrong separation of concerns is also the reason why the uploaded application files are never deleted in case of a detached execution.

          Correcting the resource lifecycle management is actually a bigger task and should be properly implemented as part of the upcoming FLIP-6 work. Therefore, I propose to mitigate the current problem by increasing the connection timeout for the yarn session as well, similar to the yarn-cluster mode.

          till.rohrmann Till Rohrmann added a comment -

          This issue was already present in 1.2. Thus, strictly speaking, this is not a regression. I will try to fix it ASAP. Maybe it still makes it into the release.

          aljoscha Aljoscha Krettek added a comment -

          Robert Metzger, shouldn't this be a blocker?


            People

            • Assignee:
              till.rohrmann Till Rohrmann
            • Reporter:
              rmetzger Robert Metzger
            • Votes:
              0
            • Watchers:
              7
