diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 8cbee23..1abd2ac 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -104,7 +104,8 @@ public int execute(DriverContext driverContext) { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB); addToHistory(jobRef); - rc = jobRef.monitorJob(); + SparkJobRef.JobResult jobResult = jobRef.monitorJob(); + rc = jobResult.rc; SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus(); if (rc == 0) { sparkCounters = sparkJobStatus.getCounter(); @@ -118,6 +119,7 @@ public int execute(DriverContext driverContext) { } else if (rc == 2) { // Cancel job if the monitor found job submission timeout. jobRef.cancelJob(); } + setException(jobResult.error); sparkJobStatus.cleanup(); } catch (Exception e) { String msg = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + "'"; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java index d109c6f..332a549 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java @@ -138,4 +138,8 @@ public int startMonitor() { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB); return rc; } + + public Throwable getError() { + return null; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index 6990e80..e76a2e4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.spark.status; +import java.io.Serializable; import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; @@ -32,12 +33,14 @@ * It print current job status to console and sleep current thread between monitor interval. */ public class RemoteSparkJobMonitor extends SparkJobMonitor { - + private JobHandle jobHandler; private RemoteSparkJobStatus sparkJobStatus; + private final HiveConf hiveConf; - public RemoteSparkJobMonitor(HiveConf hiveConf, RemoteSparkJobStatus sparkJobStatus) { + public RemoteSparkJobMonitor(HiveConf hiveConf, JobHandle jobHandler, RemoteSparkJobStatus sparkJobStatus) { super(hiveConf); + this.jobHandler = jobHandler; this.sparkJobStatus = sparkJobStatus; this.hiveConf = hiveConf; } @@ -110,7 +113,7 @@ public int startMonitor() { done = true; break; case FAILED: - console.printError("Status: Failed"); + console.printError("Status: Failed: " + jobHandler.getLastError()); running = false; done = true; rc = 3; @@ -152,4 +155,8 @@ private void printAppInfo() { } } } + +public Throwable getError() { + return jobHandler.getLastError(); +} } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java index fcf5368..f8c27b0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java @@ -25,5 +25,16 @@ public boolean cancelJob(); - public int monitorJob(); + public JobResult monitorJob(); + + public static class JobResult { + public int rc; + public Throwable error; + + public JobResult(int rc, Throwable error) { + this.rc = rc; + this.error = error; + } + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java index ce4d932..b90527b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java @@ -60,8 +60,10 @@ public boolean cancelJob() { } @Override - public int monitorJob() { + public JobResult monitorJob() { LocalSparkJobMonitor localSparkJobMonitor = new LocalSparkJobMonitor(hiveConf, sparkJobStatus); - return localSparkJobMonitor.startMonitor(); + int rc = localSparkJobMonitor.startMonitor(); + Throwable error = localSparkJobMonitor.getError(); + return new JobResult(rc, error); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java index 4c0993c..01cf00d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java @@ -55,8 +55,10 @@ public boolean cancelJob() { } @Override - public int monitorJob() { - RemoteSparkJobMonitor remoteSparkJobMonitor = new RemoteSparkJobMonitor(hiveConf, sparkJobStatus); - return remoteSparkJobMonitor.startMonitor(); + public JobResult monitorJob() { + RemoteSparkJobMonitor remoteSparkJobMonitor = new RemoteSparkJobMonitor(hiveConf, jobHandler, sparkJobStatus); + int rc = remoteSparkJobMonitor.startMonitor(); + Throwable error = remoteSparkJobMonitor.getError(); + return new JobResult(rc, error); } } diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java index 44aa255..fa2f594 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java +++ spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java @@ -67,6 +67,8 @@ * @param l The listener to add. */ void addListener(Listener l); + + Throwable getLastError(); /** * The current state of the submitted job. diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java index 17c8f40..ea3a069 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java @@ -234,4 +234,9 @@ protected void finalize() { } } + @Override + public Throwable getLastError() { + return promise.cause(); + } + }