diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index f836065..87d80a3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -114,7 +114,7 @@ public int execute(DriverContext driverContext) { this.jobID = jobRef.getSparkJobStatus().getAppID(); rc = jobRef.monitorJob(); SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus(); - getSparkJobInfo(sparkJobStatus); + getSparkJobInfo(sparkJobStatus, rc); if (rc == 0) { sparkStatistics = sparkJobStatus.getSparkStatistics(); if (LOG.isInfoEnabled() && sparkStatistics != null) { @@ -139,6 +139,7 @@ public int execute(DriverContext driverContext) { // org.apache.commons.lang.StringUtils console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); LOG.error(msg, e); + setException(e); rc = 1; } finally { startTime = perfLogger.getEndTime(PerfLogger.SPARK_SUBMIT_TO_RUNNING); @@ -196,6 +197,7 @@ private int close(int rc) { String mesg = "Job Commit failed with exception '" + Utilities.getNameMessage(e) + "'"; console.printError(mesg, "\n" + StringUtils.stringifyException(e)); + setException(e); } } return rc; @@ -330,7 +332,7 @@ private void printConfigInfo() throws IOException { return counters; } - private void getSparkJobInfo(SparkJobStatus sparkJobStatus) { + private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) { try { stageIds = new ArrayList(); int[] ids = sparkJobStatus.getStageIds(); @@ -355,6 +357,12 @@ private void getSparkJobInfo(SparkJobStatus sparkJobStatus) { succeededTaskCount = sumComplete; totalTaskCount = sumTotal; failedTaskCount = sumFailed; + if (rc != 0) { + Throwable error = sparkJobStatus.getError(); + if (error != null) { + setException(error); + } + } } catch (Exception e) { LOG.error("Failed to get Spark job information", e); }