diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
index b6d128b..a678228 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
@@ -128,6 +128,7 @@ public int startMonitor() {
         console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
         rc = 1;
         done = true;
+        sparkJobStatus.setError(e);
       } finally {
         if (done) {
           break;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 77038fc..ef3d8f8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -158,6 +158,7 @@ public int startMonitor() {
         console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
         rc = 1;
         done = true;
+        sparkJobStatus.setError(e);
       } finally {
         if (done) {
           break;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
index 72ce439..1ebb1ed 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
@@ -46,4 +46,6 @@
   void cleanup();
 
   Throwable getError();
+
+  void setError(Throwable e);
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index a94d4ed..ab8a9cd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -49,6 +49,7 @@
   private SparkCounters sparkCounters;
   private JavaFutureAction<Void> future;
   private Set<Integer> cachedRDDIds;
+  private Throwable error;
 
   public LocalSparkJobStatus(JavaSparkContext sparkContext, int jobId,
       JobMetricsListener jobMetricsListener, SparkCounters sparkCounters,
@@ -59,6 +60,7 @@ public LocalSparkJobStatus(JavaSparkContext sparkContext, int jobId,
     this.sparkCounters = sparkCounters;
     this.cachedRDDIds = cachedRDDIds;
     this.future = future;
+    this.error = null;
   }
 
   @Override
@@ -161,6 +163,9 @@ public void cleanup() {
 
   @Override
   public Throwable getError() {
+    if (error != null) {
+      return error;
+    }
     if (future.isDone()) {
       try {
         future.get();
@@ -171,6 +176,11 @@
     return null;
   }
 
+  @Override
+  public void setError(Throwable e) {
+    this.error = e;
+  }
+
   private SparkJobInfo getJobInfo() {
     return sparkContext.statusTracker().getJobInfo(jobId);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index e87a21a..0e3e541 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -50,11 +50,13 @@
   private static final Logger LOG = LoggerFactory.getLogger(RemoteSparkJobStatus.class.getName());
   private final SparkClient sparkClient;
   private final JobHandle<Serializable> jobHandle;
+  private Throwable error;
   private final transient long sparkClientTimeoutInSeconds;
 
   public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> jobHandle, long timeoutInSeconds) {
     this.sparkClient = sparkClient;
     this.jobHandle = jobHandle;
+    this.error = null;
     this.sparkClientTimeoutInSeconds = timeoutInSeconds;
   }
 
@@ -138,9 +140,17 @@ public void cleanup() {
 
   @Override
   public Throwable getError() {
+    if (error != null) {
+      return error;
+    }
     return jobHandle.getError();
   }
 
+  @Override
+  public void setError(Throwable e) {
+    this.error = e;
+  }
+
   private SparkJobInfo getSparkJobInfo() throws HiveException {
     Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 ?
         jobHandle.getSparkJobIds().get(0) : null;
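
Note on the pattern (reviewer sketch, not part of the patch): the monitors now record the exception they catch into the job status, so a later getError() call surfaces the monitor-side failure even when the underlying future or job handle reports no error of its own. The standalone Java sketch below illustrates that precedence rule; DemoJobStatus and all names in it are hypothetical stand-ins, not Hive's API.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Standalone illustration of the error-caching pattern introduced above: an
// error recorded via setError() takes precedence over whatever the wrapped
// future would report. DemoJobStatus is a hypothetical stand-in, not Hive code.
class DemoJobStatus {
  private final Future<Void> future;
  private Throwable error; // mirrors the new field added by the patch

  DemoJobStatus(Future<Void> future) {
    this.future = future;
    this.error = null;
  }

  // Mirrors the patched getError(): check the cached error first, then fall
  // back to inspecting the future, and return null if neither has failed.
  Throwable getError() {
    if (error != null) {
      return error;
    }
    if (future.isDone()) {
      try {
        future.get();
      } catch (InterruptedException | ExecutionException e) {
        return e;
      }
    }
    return null;
  }

  void setError(Throwable e) {
    this.error = e;
  }

  public static void main(String[] args) {
    // A future that is still pending, so it carries no failure of its own.
    DemoJobStatus status = new DemoJobStatus(new CompletableFuture<>());
    System.out.println(status.getError()); // null: nothing failed yet

    // The monitor loop hits an exception (e.g., a timeout) and records it;
    // from now on getError() reports that exception to the caller.
    status.setError(new RuntimeException("monitor failed"));
    System.out.println(status.getError()); // java.lang.RuntimeException: monitor failed
  }
}

Caching the Throwable rather than rethrowing keeps startMonitor()'s int return-code contract intact, while a caller that consults getError() afterwards can still see the root cause instead of null.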