Description
On AWS EMR emr-5.23.0 with Livy 0.5.0 and the following configuration in /etc/livy/conf/livy.conf:
livy.spark.master yarn
livy.spark.deploy-mode client
A batch session always ends with success because YARN always finishes with the status SUCCEEDED. Even if Spark fails for some reason (an exception or anything else), the batch session still ends with success.
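For example (a hypothetical minimal job, not taken from this report): submitting a batch whose main method simply throws still leaves the session in a success state, because YARN reports SUCCEEDED in client deploy mode.

object AlwaysFails {
  // Hypothetical entry point that fails on purpose; submitted as a Livy batch in
  // client deploy mode, the session nevertheless ends with success.
  def main(args: Array[String]): Unit = {
    throw new RuntimeException("intentional failure to reproduce the issue")
  }
}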
I am not sure, but the issue of YARN always ending with success in client deploy mode might be related to this Jira (see the linked comment): https://issues.apache.org/jira/browse/SPARK-11058?focusedCommentId=16052520&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16052520
In client deploy mode, when there are Spark errors, YARN still ends with status SUCCEEDED, but the process launched by Livy (the one running org.apache.spark.deploy.SparkSubmit) is killed and exits with a non-zero return code. So even though YARN always ends with success in this case, Livy can detect that the application ended with an error and end with an error itself.
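A minimal standalone sketch of that idea (the names below are illustrative, not Livy's actual API): launch spark-submit in client deploy mode, wait on the child process, and treat a non-zero exit code as a failure even though YARN reports SUCCEEDED.

object SparkSubmitWatcher {
  // Illustrative sketch: derive the batch outcome from the spark-submit exit code,
  // because in client deploy mode the YARN final status is SUCCEEDED either way.
  def run(args: Seq[String]): Boolean = {
    val cmd = Seq("spark-submit", "--master", "yarn", "--deploy-mode", "client") ++ args
    val process = new ProcessBuilder(cmd: _*).inheritIO().start()
    val exitCode = process.waitFor()
    if (exitCode != 0) {
      // The SparkSubmit process died with an error; treat the batch as failed.
      Console.err.println(s"spark-submit exited with code $exitCode")
    }
    exitCode == 0
  }
}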
I have already implemented a patch (against the master branch) that could fix this issue:
PR: https://github.com/apache/incubator-livy/pull/192
diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
index 4b27058..c215a8e 100644
--- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
@@ -93,6 +93,7 @@ object BatchSession extends Logging {
       val file = resolveURIs(Seq(request.file), livyConf)(0)
       val sparkSubmit = builder.start(Some(file), request.args)

       Utils.startDaemonThread(s"batch-session-process-$id") {
         childProcesses.incrementAndGet()
@@ -101,6 +102,7 @@ object BatchSession extends Logging {
             case 0 =>
             case exitCode =>
               warn(s"spark-submit exited with code $exitCode")
+              s.stateChanged(SparkApp.State.KILLED)
           }
         } finally {
           childProcesses.decrementAndGet()
@@ -182,6 +184,14 @@ class BatchSession(
   override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = {
     synchronized {
       debug(s"$this state changed from $oldState to $newState")
+      if (_state != SessionState.Dead()) {
+        stateChanged(newState)
+      }
+    }
+  }
+
+  private def stateChanged(newState: SparkApp.State): Unit = {
+    synchronized {
       newState match {
         case SparkApp.State.RUNNING =>
           _state = SessionState.Running