  Flink / FLINK-16306

Validate YARN session state before job submission


Details

    Description

      To better handle YARN sessions that were not stopped properly, the state of the session should be validated before job submission.
      Currently, if execution.target: yarn-session is set in conf/flink-conf.yaml and the hidden YARN property file /tmp/.yarn-properties-root is present, FlinkYarnSessionCli tries to submit the job regardless of the session’s state.
      Apparently, the property file cannot be cleaned up automatically when the session is killed, e.g. via yarn app -kill <appID>, and this behaviour is pointed out in the logs when the session is started via yarn-session.sh. However, the state of the application recorded in the file could be checked before submitting to it. The current behaviour feels inconsistent with the scenario in which the YARN property file does get cleaned up, e.g. by deleting it manually: in that case, a per-job cluster is spun up and the job is submitted to it.
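      A minimal sketch of the proposed validation, assuming the client can obtain the current state of the recorded application (in a real client this would come from an application report, e.g. via Hadoop's YarnClient#getApplicationReport, which is omitted here to keep the example self-contained). AppState mirrors the values of Hadoop's YarnApplicationState enum; SessionStateCheck, isTerminal and canSubmitTo are hypothetical names, not Flink API.

```java
import java.util.EnumSet;
import java.util.Set;

/**
 * Minimal sketch: decides whether a YARN application can still accept a Flink job.
 * AppState mirrors Hadoop's YarnApplicationState values. All class and method
 * names here are hypothetical and not part of Flink.
 */
public class SessionStateCheck {

    enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    // Terminal states: the application will not run again, so submitting to it cannot succeed.
    private static final Set<AppState> TERMINAL =
            EnumSet.of(AppState.FINISHED, AppState.FAILED, AppState.KILLED);

    static boolean isTerminal(AppState state) {
        return TERMINAL.contains(state);
    }

    // Conservative choice: only a RUNNING session is treated as a valid submission target.
    static boolean canSubmitTo(AppState state) {
        return state == AppState.RUNNING;
    }

    public static void main(String[] args) {
        // Print the classification of every state for a quick overview.
        for (AppState s : AppState.values()) {
            System.out.printf("%-10s terminal=%-5b submittable=%b%n", s, isTerminal(s), canSubmitTo(s));
        }
    }
}
```

      Whether ACCEPTED (submitted but not yet running) should also count as usable is a design choice left open; the sketch requires RUNNING so that a killed session, as in the log below, is always rejected before submission.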

      Steps to reproduce:
      • Start a Flink YARN session via ./bin/yarn-session.sh -d; this writes the application ID to /tmp/.yarn-properties-root.
      • Set execution.target: yarn-session in /etc/flink/conf/flink-conf.yaml.
      • Kill the session via yarn app -kill <appID>.
      • Try to submit a job, e.g.: flink run -d -p 2 examples/streaming/WordCount.jar
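      The first step leaves the application ID behind in /tmp/.yarn-properties-root, and that file outlives the killed session. The sketch below shows one way a client could read the ID back with java.util.Properties. ASSUMPTION: the file is a standard Java properties file storing the ID under the key applicationID; YarnPropertiesReader and readApplicationId are hypothetical names.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Properties;

/**
 * Minimal sketch of reading the session's application ID from the YARN property file.
 * ASSUMPTION: the ID is stored under the key "applicationID". Names are hypothetical.
 */
public class YarnPropertiesReader {

    static final String APPLICATION_ID_KEY = "applicationID"; // assumed key name

    // Parses properties content and returns the application ID if present and non-blank.
    static Optional<String> readApplicationId(Reader reader) throws IOException {
        Properties props = new Properties();
        props.load(reader);
        String id = props.getProperty(APPLICATION_ID_KEY);
        return (id == null || id.isBlank()) ? Optional.empty() : Optional.of(id.trim());
    }

    // Reads the file from disk; a missing file means no session has been recorded.
    static Optional<String> readApplicationId(Path file) throws IOException {
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        try (Reader reader = Files.newBufferedReader(file)) {
            return readApplicationId(reader);
        }
    }

    public static void main(String[] args) throws IOException {
        // Sample content shaped like the file written by yarn-session.sh (assumed format).
        String sample = "#Generated YARN properties file\napplicationID=application_1582646904843_0021\n";
        System.out.println(readApplicationId(new StringReader(sample)));
    }
}
```

      Note that a present ID says nothing about whether the application is still alive; it only identifies which application to check.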

      The logs clearly state that the FlinkYarnSessionCli tries to submit the job to the killed application:

      20/02/26 13:34:26 ERROR yarn.YarnClusterDescriptor: The application application_1582646904843_0021 doesn't run anymore. It has previously completed with final status: KILLED
      ...
      20/02/26 13:34:26 ERROR cli.CliFrontend: Error while running the command.
      org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Couldn't retrieve Yarn cluster
      	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
      	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
      	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
      	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:709)
      	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:258)
      	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:940)
      	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1014)
      	at java.base/java.security.AccessController.doPrivileged(Native Method)
      	at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
      	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
      	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1014)
      Caused by: org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't retrieve Yarn cluster
      	at org.apache.flink.yarn.YarnClusterDescriptor.retrieve(YarnClusterDescriptor.java:365)
      	at org.apache.flink.yarn.YarnClusterDescriptor.retrieve(YarnClusterDescriptor.java:122)
      	at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:63)
      	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1750)
      	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
      	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
      	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1637)
      	at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:96)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
      	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
      	... 11 more
      Caused by: java.lang.RuntimeException: The Yarn application application_1582646904843_0021 doesn't run anymore.
      	at org.apache.flink.yarn.YarnClusterDescriptor.retrieve(YarnClusterDescriptor.java:352)
      	... 23 more
      

      If at this point the property file is deleted, e.g. by simply running rm -f /tmp/.yarn-properties-root, and the job is resubmitted, a per-job cluster is spun up. The same behaviour could be achieved without having to delete the outdated property file.
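      The fallback described here can be sketched as a small target-selection step that runs before submission: if the recorded session is still running, use it; otherwise deploy a per-job cluster and leave the property file untouched. This is a minimal sketch, not Flink's implementation: TargetResolver, resolveTarget and the isSessionRunning predicate are hypothetical, while yarn-session and yarn-per-job are the execution.target values.

```java
import java.util.Optional;
import java.util.function.Predicate;

/**
 * Minimal sketch: pick the execution target from the recorded session's liveness,
 * falling back to a per-job cluster instead of failing. Names are hypothetical.
 */
public class TargetResolver {

    static final String SESSION = "yarn-session";
    static final String PER_JOB = "yarn-per-job";

    /**
     * @param recordedAppId    application ID read from the YARN property file, if any
     * @param isSessionRunning checks the live state of an application (e.g. via a YARN client)
     */
    static String resolveTarget(Optional<String> recordedAppId, Predicate<String> isSessionRunning) {
        if (recordedAppId.isPresent() && isSessionRunning.test(recordedAppId.get())) {
            return SESSION;
        }
        // Stale or missing session: warn on stderr, but do not touch the property file.
        if (recordedAppId.isPresent()) {
            System.err.println("Session " + recordedAppId.get()
                    + " is not running; falling back to " + PER_JOB);
        }
        return PER_JOB;
    }

    public static void main(String[] args) {
        Predicate<String> killed = appId -> false; // simulates the killed application from the log
        System.out.println(resolveTarget(Optional.of("application_1582646904843_0021"), killed));
    }
}
```

      Emitting a warning rather than silently switching keeps the fallback visible to the user, since they explicitly configured execution.target: yarn-session.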

      CC: gyfora


          People

            Assignee: Unassigned
            Reporter: Daniel Laszlo Magyar (dmagyar)
            Votes: 0
            Watchers: 4
