LIVY-620: Spark batch session always ends with success when master is yarn and deploy-mode is client


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.5.0
    • Fix Version/s: 0.7.0
    • Component/s: Batch
    • Labels: None

    Description

      In AWS emr-5.23.0 with Livy 0.5.0 and the following configuration in /etc/livy/conf/livy.conf:

      livy.spark.master                yarn
      livy.spark.deploy-mode           client
      

      The batch session always ends with success because YARN always finishes with status SUCCEEDED. Even if Spark fails for some reason (an exception or otherwise), the batch session ends with success.
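
      For illustration, a job as simple as the following reproduces the behaviour (the class name and packaging are made up for this example): submitted as a Livy batch with the configuration above, it throws, YARN still reports SUCCEEDED, and the batch session ends in success.

      import org.apache.spark.sql.SparkSession

      // Hypothetical minimal job used only to reproduce LIVY-620: it always fails.
      object AlwaysFails {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().appName("livy-620-repro").getOrCreate()
          try {
            // Any failure works; an explicit exception keeps the repro deterministic.
            throw new RuntimeException("deliberate failure to exercise LIVY-620")
          } finally {
            spark.stop()
          }
        }
      }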
      I am not sure, but the fact that YARN always ends with success in client deploy-mode may be related to this JIRA (see linked comment): https://issues.apache.org/jira/browse/SPARK-11058?focusedCommentId=16052520&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16052520

      In client deploy-mode, when Spark errors occur, YARN still ends with status SUCCEEDED, but the process launched by Livy (the one running org.apache.spark.deploy.SparkSubmit) is killed and exits with a non-zero return code. So even though YARN reports success in this case, Livy can detect the failure from that exit code and end the batch session with an error itself.
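
      As a sketch of that idea (not the actual Livy code; paths and arguments are placeholders), the reliable signal in client deploy-mode is the exit code of the spark-submit child process rather than the final YARN status. In Livy the equivalent process is started by builder.start(...) and watched by the batch-session-process-$id daemon thread shown in the patch below.

      import scala.sys.process._

      object ExitCodeCheck {
        def main(args: Array[String]): Unit = {
          // Placeholder spark-submit invocation with invented class and jar names.
          val exitCode = Seq(
            "spark-submit",
            "--master", "yarn",
            "--deploy-mode", "client",
            "--class", "com.example.AlwaysFails",
            "/path/to/app.jar").!

          // YARN may still report SUCCEEDED, but a non-zero exit code here means the
          // driver failed, so the batch session should be marked as failed.
          if (exitCode != 0) {
            Console.err.println(s"spark-submit exited with code $exitCode -> mark batch as dead")
          }
        }
      }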

      I have already implemented a patch (against the master branch) that should fix this issue:

      PR: https://github.com/apache/incubator-livy/pull/192

      diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
      index 4b27058..c215a8e 100644
      --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
      +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
      @@ -93,6 +93,7 @@ object BatchSession extends Logging {
       
             val file = resolveURIs(Seq(request.file), livyConf)(0)
             val sparkSubmit = builder.start(Some(file), request.args)
       
             Utils.startDaemonThread(s"batch-session-process-$id") {
               childProcesses.incrementAndGet()
      @@ -101,6 +102,7 @@ object BatchSession extends Logging {
                   case 0 =>
                   case exitCode =>
                     warn(s"spark-submit exited with code $exitCode")
      +              s.stateChanged(SparkApp.State.KILLED)
                 }
               } finally {
                 childProcesses.decrementAndGet()
      @@ -182,6 +184,14 @@ class BatchSession(
         override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = {
           synchronized {
             debug(s"$this state changed from $oldState to $newState")
      +      if (_state != SessionState.Dead()) {
      +        stateChanged(newState)
      +      }
      +    }
      +  }
      +
      +  private def stateChanged(newState: SparkApp.State): Unit = {
      +    synchronized {
             newState match {
               case SparkApp.State.RUNNING =>
                 _state = SessionState.Running
      
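
      To make the intent of the patch clearer, here is a simplified, self-contained sketch of the state handling (the enumeration and class names are invented for this example; the real BatchSession uses SessionState and SparkApp.State and handles more cases):

      object State extends Enumeration {
        val Starting, Running, Success, Dead, Killed = Value
      }

      class BatchSessionSketch {
        private var _state: State.Value = State.Starting

        // Public callback driven by app-state updates (e.g. from YARN). Once the
        // session is already Dead (the child spark-submit exited with a non-zero
        // code), later updates such as YARN's final SUCCEEDED are ignored.
        def stateChanged(oldState: State.Value, newState: State.Value): Unit = synchronized {
          if (_state != State.Dead) {
            stateChanged(newState)
          }
        }

        // Internal transition, also callable directly when the child process dies.
        private def stateChanged(newState: State.Value): Unit = synchronized {
          newState match {
            case State.Running             => _state = State.Running
            case State.Success             => _state = State.Success
            case State.Killed | State.Dead => _state = State.Dead
            case _                         =>
          }
        }

        def state: State.Value = _state
      }

      object Demo extends App {
        val s = new BatchSessionSketch
        s.stateChanged(State.Starting, State.Running)
        s.stateChanged(State.Running, State.Killed)   // spark-submit exited with a non-zero code
        s.stateChanged(State.Killed, State.Success)   // late SUCCEEDED from YARN is ignored
        assert(s.state == State.Dead)
      }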


          People

            Assignee: gumartinm Gustavo Martin
            Reporter: gumartinm Gustavo Martin
            Votes: 0
            Watchers: 3


            Time Tracking

              Original Estimate: Not Specified
              Remaining Estimate: 0h
              Time Spent: 2.5h