diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index b86e335..5d62596 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -22,6 +22,8 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
@@ -43,6 +45,7 @@ public class LocalSparkJobStatus implements SparkJobStatus {
 
   private final JavaSparkContext sparkContext;
+  private static final Log LOG = LogFactory.getLog(LocalSparkJobStatus.class.getName());
   private int jobId;
   // After SPARK-2321, we only use JobMetricsListener to get job metrics
   // TODO: remove it when the new API provides equivalent functionality
 
@@ -69,16 +72,20 @@ public int getJobId() {
 
   @Override
   public JobExecutionStatus getState() {
+    SparkJobInfo sparkJobInfo = getJobInfo();
     // For spark job with empty source data, it's not submitted actually, so we would never
     // receive JobStart/JobEnd event in JobStateListener, use JavaFutureAction to get current
     // job state.
-    if (future.isDone()) {
+    if (sparkJobInfo == null && future.isDone()) {
+      try {
+        future.get();
+      } catch (Exception e) {
+        LOG.error("Failed to run job " + jobId, e);
+        return JobExecutionStatus.FAILED;
+      }
       return JobExecutionStatus.SUCCEEDED;
-    } else {
-      // SparkJobInfo may not be available yet
-      SparkJobInfo sparkJobInfo = getJobInfo();
-      return sparkJobInfo == null ? null : sparkJobInfo.status();
-    }
+    }
+    return sparkJobInfo == null ? null : sparkJobInfo.status();
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index a8ac482..c336413 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -175,7 +175,15 @@ public SparkJobInfo call(JobContext jc) throws Exception {
       if (list != null && list.size() == 1) {
         JavaFutureAction futureAction = list.get(0);
         if (futureAction.isDone()) {
-          jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.SUCCEEDED);
+          boolean futureSucceed = true;
+          try {
+            futureAction.get();
+          } catch (Exception e) {
+            LOG.error("Failed to run job " + sparkJobId, e);
+            futureSucceed = false;
+          }
+          jobInfo = getDefaultJobInfo(sparkJobId,
+              futureSucceed ? JobExecutionStatus.SUCCEEDED : JobExecutionStatus.FAILED);
         }
       }
 