commit 7312f8c72200fd9790948e732469550b38da700f
Author: Sahil Takiar
Date:   Mon Mar 5 22:57:16 2018 -0600

    HIVE-18831: Differentiate errors that are thrown by Spark tasks

diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 56dbe78033..9cd9948bca 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1659,9 +1659,11 @@ spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\
   groupby2_multi_distinct.q,\
   groupby3_map_skew_multi_distinct.q,\
   groupby3_multi_distinct.q,\
-  groupby_grouping_sets7.q,\
-  spark_job_max_tasks.q,\
-  spark_stage_max_tasks.q
+  groupby_grouping_sets7.q
+
+spark.only.query.negative.files=spark_job_max_tasks.q,\
+  spark_stage_max_tasks.q,\
+  spark_task_failure.q
 
 spark.perf.disabled.query.files=query14.q,\
   query64.q
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
index 7034c38b90..b6155b60b8 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
@@ -334,9 +334,8 @@ public NegativeCliConfig() {
         setQueryDir("ql/src/test/queries/clientnegative");
 
         excludesFrom(testConfigProps, "minimr.query.negative.files");
+        excludesFrom(testConfigProps, "spark.only.query.negative.files");
         excludeQuery("authorization_uri_import.q");
-        excludeQuery("spark_job_max_tasks.q");
-        excludeQuery("spark_stage_max_tasks.q");
 
         setResultsDir("ql/src/test/results/clientnegative");
         setLogDir("itests/qtest/target/qfile-results/clientnegative");
@@ -571,6 +570,7 @@ public SparkNegativeCliConfig() {
         setQueryDir("ql/src/test/queries/clientnegative");
 
         includesFrom(testConfigProps, "spark.query.negative.files");
+        includesFrom(testConfigProps, "spark.only.query.negative.files");
 
         setResultsDir("ql/src/test/results/clientnegative/spark");
         setLogDir("itests/qtest-spark/target/qfile-results/clientnegative/spark");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 1faa50a86a..120a3054c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -580,8 +580,8 @@
 
   //========================== 40000 range starts here ========================//
 
-  SPARK_JOB_RUNTIME_ERROR(40001,
-      "Spark job failed during runtime. Please check stacktrace for the root cause.")
+  SPARK_JOB_RUNTIME_ERROR(40001, "Spark job failed due to: {0}", true),
+  SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to task failures: {0}", true)
   ;
 
   private int errorCode;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index a8d851fd81..5fee133953 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -602,7 +602,7 @@ public String getJobID() {
   public void shutdown() {
   }
 
-  Throwable getException() {
+  public Throwable getException() {
     return exception;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index c2408844f5..8ce7dc90ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -26,14 +26,19 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
+import org.apache.hive.spark.client.SparkJobExceptionWrapper;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -71,7 +76,6 @@
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.collect.Lists;
-import org.apache.spark.SparkException;
 
 public class SparkTask extends Task<SparkWork> {
   private static final String CLASS_NAME = SparkTask.class.getName();
@@ -152,7 +156,8 @@ public int execute(DriverContext driverContext) {
 
       // Get the final state of the Spark job and parses its job info
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
-      getSparkJobInfo(sparkJobStatus, rc);
+      getSparkJobInfo(sparkJobStatus);
+      setSparkException(sparkJobStatus, rc);
 
       if (rc == 0) {
         sparkStatistics = sparkJobStatus.getSparkStatistics();
@@ -449,7 +454,7 @@ private void printConfigInfo() throws IOException {
     return counters;
   }
 
-  private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) {
+  private void getSparkJobInfo(SparkJobStatus sparkJobStatus) {
     try {
       stageIds = new ArrayList<Integer>();
       int[] ids = sparkJobStatus.getStageIds();
@@ -474,39 +479,53 @@ private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) {
       succeededTaskCount = sumComplete;
       totalTaskCount = sumTotal;
       failedTaskCount = sumFailed;
-      if (rc != 0) {
-        Throwable error = sparkJobStatus.getError();
-        if (error != null) {
-          if ((error instanceof InterruptedException) ||
-              (error instanceof HiveException &&
-              error.getCause() instanceof InterruptedException)) {
-            LOG.info("Killing Spark job since query was interrupted");
-            killJob();
-          }
-          HiveException he;
-          if (isOOMError(error)) {
-            he = new HiveException(error, ErrorMsg.SPARK_RUNTIME_OOM);
-          } else {
-            he = new HiveException(error, ErrorMsg.SPARK_JOB_RUNTIME_ERROR);
-          }
-          setException(he);
-        }
-      }
     } catch (Exception e) {
       LOG.error("Failed to get Spark job information", e);
     }
   }
 
-  private boolean isOOMError(Throwable error) {
-    while (error != null) {
-      if (error instanceof OutOfMemoryError) {
-        return true;
-      } else if (error instanceof SparkException) {
-        String sts = Throwables.getStackTraceAsString(error);
-        return sts.contains("Container killed by YARN for exceeding memory limits");
+  @VisibleForTesting
+  void setSparkException(SparkJobStatus sparkJobStatus, int rc) {
+    if (rc != 0) {
+      Throwable error = sparkJobStatus.getMonitorError();
+      if (error != null) {
+        if ((error instanceof InterruptedException) ||
+            (error instanceof HiveException &&
+            error.getCause() instanceof InterruptedException)) {
+          LOG.info("Killing Spark job since query was interrupted");
+          killJob();
+        } else if (error instanceof HiveException && getException() == null) {
+          setException(error);
+        }
+      }
+
+      SparkJobExceptionWrapper sparkJobException = sparkJobStatus.getSparkJobException();
+      if (sparkJobException != null) {
+        HiveException he;
+        if (isOOMError(sparkJobException)) {
+          he = new HiveException(sparkJobException.getFullStackTrace(), ErrorMsg.SPARK_RUNTIME_OOM);
+        } else if (isTaskFailure(sparkJobException)) {
+          he = new HiveException(sparkJobException.getFullStackTrace(),
+              ErrorMsg.SPARK_TASK_RUNTIME_ERROR,
+              sparkJobException.getRootCause());
+        } else {
+          he = new HiveException(sparkJobException.getFullStackTrace(),
+              ErrorMsg.SPARK_JOB_RUNTIME_ERROR,
+              sparkJobException.getRootCause());
+        }
+        // Prefer to propagate errors from the Spark job rather than the monitor, as errors from
+        // the Spark job are likely to be more relevant
+        setException(he);
       }
-      error = error.getCause();
     }
-    return false;
+  }
+
+  private boolean isTaskFailure(SparkJobExceptionWrapper clientException) {
+    Pattern taskFailedPattern = Pattern.compile("Task.*in stage.*failed.*times");
+    return taskFailedPattern.matcher(clientException.getFullStackTrace()).find();
+  }
+
+  private boolean isOOMError(SparkJobExceptionWrapper error) {
+    return error.getFullStackTrace().contains("Container killed by YARN for exceeding memory limits");
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
index 05253157d0..4ce9f53a37 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
@@ -128,7 +128,7 @@ public int startMonitor() {
         console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
         rc = 1;
         done = true;
-        sparkJobStatus.setError(e);
+        sparkJobStatus.setMonitorError(e);
       } finally {
         if (done) {
           break;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 3467ae4048..3d6a00f938 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -77,7 +77,7 @@ public int startMonitor() {
           HiveException he = new HiveException(ErrorMsg.SPARK_JOB_MONITOR_TIMEOUT,
               Long.toString(timeCount));
           console.printError(he.getMessage());
-          sparkJobStatus.setError(he);
+          sparkJobStatus.setMonitorError(he);
           running = false;
           done = true;
           rc = 2;
@@ -147,29 +147,8 @@ public int startMonitor() {
           done = true;
           break;
         case FAILED:
-          String detail = sparkJobStatus.getError().getMessage();
-          StringBuilder errBuilder = new StringBuilder();
-          errBuilder.append("Job failed with ");
-          if (detail == null) {
-            errBuilder.append("UNKNOWN reason");
errBuilder.append("UNKNOWN reason"); - } else { - // We SerDe the Throwable as String, parse it for the root cause - final String CAUSE_CAPTION = "Caused by: "; - int index = detail.lastIndexOf(CAUSE_CAPTION); - if (index != -1) { - String rootCause = detail.substring(index + CAUSE_CAPTION.length()); - index = rootCause.indexOf(System.getProperty("line.separator")); - if (index != -1) { - errBuilder.append(rootCause.substring(0, index)); - } else { - errBuilder.append(rootCause); - } - } else { - errBuilder.append(detail); - } - detail = System.getProperty("line.separator") + detail; - } - console.printError(errBuilder.toString(), detail); + LOG.error("Spark job[" + sparkJobStatus.getJobId() + "] failed\n" + sparkJobStatus + .getSparkJobException().getFullStackTrace()); running = false; done = true; rc = 3; @@ -203,7 +182,7 @@ public int startMonitor() { } rc = 1; done = true; - sparkJobStatus.setError(finalException); + sparkJobStatus.setMonitorError(finalException); } finally { if (done) { break; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java index 8474afcc2a..df2812ab6b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hive.spark.client.SparkJobExceptionWrapper; import org.apache.hive.spark.counter.SparkCounters; import org.apache.spark.JobExecutionStatus; @@ -47,7 +48,9 @@ void cleanup(); - Throwable getError(); + Throwable getMonitorError(); - void setError(Throwable e); + void setMonitorError(Throwable e); + + SparkJobExceptionWrapper getSparkJobException(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java index 03f8a0b680..e0954e9184 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java @@ -22,6 +22,8 @@ import java.util.Map; import java.util.Set; +import com.google.common.base.Throwables; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress; import org.apache.hive.spark.client.MetricsCollection; +import org.apache.hive.spark.client.SparkJobExceptionWrapper; import org.apache.hive.spark.client.metrics.Metrics; import org.apache.hive.spark.counter.SparkCounters; @@ -181,25 +184,28 @@ public void cleanup() { } @Override - public Throwable getError() { - if (error != null) { - return error; - } + public Throwable getMonitorError() { + return error; + } + + @Override + public void setMonitorError(Throwable e) { + this.error = e; + } + + @Override + public SparkJobExceptionWrapper getSparkJobException() { if (future.isDone()) { try { future.get(); } catch (Throwable e) { - return e; + return new SparkJobExceptionWrapper(Throwables.getRootCause(e).getMessage(), Throwables + .getStackTraceAsString(e)); } } return null; } - @Override - public void setError(Throwable e) { - this.error = e; - } - private SparkJobInfo getJobInfo() { return 
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index ff969e048f..5db2c1d6a2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -35,6 +35,7 @@
 import org.apache.hive.spark.client.JobContext;
 import org.apache.hive.spark.client.JobHandle;
 import org.apache.hive.spark.client.SparkClient;
+import org.apache.hive.spark.client.SparkJobExceptionWrapper;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.JobExecutionStatus;
 
@@ -165,18 +166,20 @@ public void cleanup() {
   }
 
   @Override
-  public Throwable getError() {
-    if (error != null) {
-      return error;
-    }
-    return jobHandle.getError();
+  public Throwable getMonitorError() {
+    return error;
   }
 
   @Override
-  public void setError(Throwable e) {
+  public void setMonitorError(Throwable e) {
     this.error = e;
   }
 
+  @Override
+  public SparkJobExceptionWrapper getSparkJobException() {
+    return jobHandle.getError();
+  }
+
   /**
    * Indicates whether the remote context is active. SparkJobMonitor can use this to decide whether
    * to stop monitoring.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java
index bfbbe3a7d7..b6d66ae4a7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java
@@ -71,6 +71,14 @@ public HiveException(ErrorMsg errorMsg) {
     this(null, null, errorMsg, new String[0]);
   }
 
+  public HiveException(String message, ErrorMsg errorMsg) {
+    this(null, message, errorMsg);
+  }
+
+  public HiveException(String message, ErrorMsg errorMsg, String... msgArgs) {
+    this(null, message, errorMsg, msgArgs);
+  }
+
   /**
    * This is the recommended constructor to use since it helps use
    * canonical messages throughout and propagate remote errors.
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
index 435c6b606b..1044bb511a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
@@ -17,35 +17,50 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.base.Throwables;
 
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.spark.client.JobHandle.State;
+import org.apache.hive.spark.client.SparkJobExceptionWrapper;
+
+import org.apache.spark.SparkException;
+
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 public class TestSparkTask {
 
   @Test
   public void sparkTask_updates_Metrics() throws IOException {
 
-    Metrics mockMetrics = Mockito.mock(Metrics.class);
+    Metrics mockMetrics = mock(Metrics.class);
 
     SparkTask sparkTask = new SparkTask();
     sparkTask.updateTaskMetrics(mockMetrics);
@@ -88,7 +103,7 @@ public void removeEmptySparkTask() {
 
   @Test
   public void testRemoteSparkCancel() {
-    RemoteSparkJobStatus jobSts = Mockito.mock(RemoteSparkJobStatus.class);
+    RemoteSparkJobStatus jobSts = mock(RemoteSparkJobStatus.class);
     when(jobSts.getRemoteJobState()).thenReturn(State.CANCELLED);
     when(jobSts.isRemoteActive()).thenReturn(true);
     HiveConf hiveConf = new HiveConf();
@@ -96,6 +111,111 @@ public void testRemoteSparkCancel() {
     Assert.assertEquals(remoteSparkJobMonitor.startMonitor(), 3);
   }
 
+  @Test
+  public void testSetSparkExceptionWithJobError() {
+    SparkTask sparkTask = new SparkTask();
+    SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
+
+    SparkException se = new SparkException("Job aborted due to stage failure: Not a task or OOM error");
+    ExecutionException ee = new ExecutionException("Exception thrown by job", se);
+
+    SparkJobExceptionWrapper exceptionWrapper = new SparkJobExceptionWrapper(Throwables
+        .getRootCause(ee).getMessage(), Throwables.getStackTraceAsString(ee));
+    when(mockSparkJobStatus.getSparkJobException()).thenReturn(exceptionWrapper);
+
+    sparkTask.setSparkException(mockSparkJobStatus, 3);
+
+    Assert.assertTrue(sparkTask.getException() instanceof HiveException);
+    Assert.assertEquals(((HiveException) sparkTask.getException()).getCanonicalErrorMsg(),
+        ErrorMsg.SPARK_JOB_RUNTIME_ERROR);
+    Assert.assertTrue(sparkTask.getException().getMessage().contains("Not a task or OOM error"));
+  }
+
+  @Test
+  public void testSetSparkExceptionWithTimeoutError() {
+    SparkTask sparkTask = new SparkTask();
+    SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
+    when(mockSparkJobStatus.getMonitorError()).thenReturn(new HiveException(ErrorMsg
+        .SPARK_JOB_MONITOR_TIMEOUT, Long.toString(60)));
+
+    sparkTask.setSparkException(mockSparkJobStatus, 3);
+
+    Assert.assertTrue(sparkTask.getException() instanceof HiveException);
+    Assert.assertEquals(((HiveException) sparkTask.getException()).getCanonicalErrorMsg(),
+        ErrorMsg.SPARK_JOB_MONITOR_TIMEOUT);
+    Assert.assertTrue(sparkTask.getException().getMessage().contains("60s"));
+  }
+
+  @Test
+  public void testSetSparkExceptionWithOOMError() {
+    SparkTask sparkTask = new SparkTask();
+    SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
+
+    Throwable jobError = new Throwable("Caused by: org.apache.spark.SparkException: Container "
+        + "killed by YARN for exceeding memory limits");
+    SparkJobExceptionWrapper exceptionWrapper = new SparkJobExceptionWrapper(Throwables
+        .getRootCause(jobError).getMessage(), Throwables.getStackTraceAsString(jobError));
+    when(mockSparkJobStatus.getSparkJobException()).thenReturn(exceptionWrapper);
+
+    sparkTask.setSparkException(mockSparkJobStatus, 3);
+
+    Assert.assertTrue(sparkTask.getException() instanceof HiveException);
+    Assert.assertEquals(((HiveException) sparkTask.getException()).getCanonicalErrorMsg(),
+        ErrorMsg.SPARK_RUNTIME_OOM);
+  }
+
+  @Test
+  public void testSparkExceptionAndMonitorError() {
+    SparkTask sparkTask = new SparkTask();
+    SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
+    when(mockSparkJobStatus.getMonitorError()).thenReturn(new RuntimeException());
+    when(mockSparkJobStatus.getSparkJobException()).thenReturn(new SparkJobExceptionWrapper("", ""));
+
+    sparkTask.setSparkException(mockSparkJobStatus, 3);
+
+    Assert.assertTrue(sparkTask.getException() instanceof HiveException);
+    Assert.assertEquals(((HiveException) sparkTask.getException()).getCanonicalErrorMsg(),
+        ErrorMsg.SPARK_JOB_RUNTIME_ERROR);
+  }
+
+  @Test
+  public void testHandleInterruptedException() throws Exception {
+    HiveConf hiveConf = new HiveConf();
+
+    SparkTask sparkTask = new SparkTask();
+    sparkTask.setWork(mock(SparkWork.class));
+
+    DriverContext mockDriverContext = mock(DriverContext.class);
+
+    QueryState mockQueryState = mock(QueryState.class);
+    when(mockQueryState.getConf()).thenReturn(hiveConf);
+
+    sparkTask.initialize(mockQueryState, null, mockDriverContext, null);
+
+    SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
+    when(mockSparkJobStatus.getMonitorError()).thenReturn(new InterruptedException());
+
+    SparkSession mockSparkSession = mock(SparkSession.class);
+    SparkJobRef mockSparkJobRef = mock(SparkJobRef.class);
+
+    when(mockSparkJobRef.monitorJob()).thenReturn(2);
+    when(mockSparkJobRef.getSparkJobStatus()).thenReturn(mockSparkJobStatus);
+    when(mockSparkSession.submit(any(), any())).thenReturn(mockSparkJobRef);
+
+    SessionState.start(hiveConf);
+    SessionState.get().setSparkSession(mockSparkSession);
+
+    sparkTask.execute(mockDriverContext);
+
+    verify(mockSparkJobRef, atLeastOnce()).cancelJob();
+
+    when(mockSparkJobStatus.getMonitorError()).thenReturn(
+        new HiveException(new InterruptedException()));
+
+    sparkTask.execute(mockDriverContext);
+
+    verify(mockSparkJobRef, atLeastOnce()).cancelJob();
+  }
 
   private boolean isEmptySparkWork(SparkWork sparkWork) {
     List<BaseWork> allWorks = sparkWork.getAllWork();
diff --git a/ql/src/test/queries/clientnegative/spark_task_failure.q b/ql/src/test/queries/clientnegative/spark_task_failure.q
new file mode 100644
index 0000000000..7bb8c503eb
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/spark_task_failure.q
@@ -0,0 +1,9 @@
+ADD FILE ../../data/scripts/error_script;
+
+EXPLAIN
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src;
+
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src;
+
diff --git a/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out b/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out
new file mode 100644
index 0000000000..1e680024a7
--- /dev/null
+++ b/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out
@@ -0,0 +1,53 @@
+PREHOOK: query: EXPLAIN
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Transform Operator
+                      command: error_script
+                      output info:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Spark job failed due to task failures: [Error 20003]: An error occurred when trying to close the Operator running your custom script.
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
index 4dbc4908bf..6e15333e00 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
@@ -101,13 +101,14 @@
 
     final String id;
     final T result;
-    final String error;
+    final SparkJobExceptionWrapper error;
     final SparkCounters sparkCounters;
 
     JobResult(String id, T result, Throwable error, SparkCounters sparkCounters) {
       this.id = id;
       this.result = result;
-      this.error = error != null ? Throwables.getStackTraceAsString(error) : null;
+      this.error = error != null ? new SparkJobExceptionWrapper(Throwables.getRootCause(error)
+          .getMessage(), Throwables.getStackTraceAsString(error)) : null;
       this.sparkCounters = sparkCounters;
     }
 
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
index dffd60c2fa..46d8501be5 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
@@ -63,7 +63,7 @@
   /**
    * Return the error if the job has failed.
    */
-  Throwable getError();
+  SparkJobExceptionWrapper getError();
 
   /**
    * The current state of the submitted job.
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
index 2881252b0e..4127b2d34a 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
@@ -42,6 +42,7 @@
   private final List<Listener<T>> listeners;
   private volatile State state;
   private volatile SparkCounters sparkCounters;
+  private volatile SparkJobExceptionWrapper jobError;
 
   JobHandleImpl(SparkClientImpl client, Promise<T> promise, String jobId,
       List<Listener<T>> listeners) {
@@ -129,8 +130,8 @@ public State getState() {
   }
 
   @Override
-  public Throwable getError() {
-    return promise.cause();
+  public SparkJobExceptionWrapper getError() {
+    return this.jobError;
   }
 
   public void setSparkCounters(SparkCounters sparkCounters) {
@@ -146,10 +147,10 @@ void setSuccess(Object result) {
     }
   }
 
-  void setFailure(Throwable error) {
+  void setFailure(SparkJobExceptionWrapper jobError) {
     // The synchronization here is not necessary, but tests depend on it.
     synchronized (listeners) {
-      promise.setFailure(error);
+      this.jobError = jobError;
       changeState(State.FAILED);
     }
   }
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 665ed92898..acef3f41f2 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -38,7 +38,6 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
-import java.io.PrintStream;
 import java.io.Serializable;
 import java.io.Writer;
 import java.net.URI;
@@ -64,7 +63,6 @@
 import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.apache.hive.spark.client.rpc.RpcServer;
 import org.apache.spark.SparkContext;
-import org.apache.spark.SparkException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -592,7 +590,7 @@ private void handle(ChannelHandlerContext ctx, JobResult msg) {
       if (handle != null) {
         LOG.info("Received result for {}", msg.id);
         handle.setSparkCounters(msg.sparkCounters);
-        Throwable error = msg.error != null ? new SparkException(msg.error) : null;
+        SparkJobExceptionWrapper error = msg.error;
         if (error == null) {
           handle.setSuccess(msg.result);
         } else {
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkJobExceptionWrapper.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkJobExceptionWrapper.java
new file mode 100644
index 0000000000..fee9f1126a
--- /dev/null
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkJobExceptionWrapper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+public class SparkJobExceptionWrapper {
+
+  private final String rootCause;
+  private final String fullStackTrace;
+
+  public SparkJobExceptionWrapper() {
+    this(null, null);
+  }
+
+  public SparkJobExceptionWrapper(String rootCause, String fullStackTrace) {
+    this.rootCause = rootCause;
+    this.fullStackTrace = fullStackTrace;
+  }
+
+  public String getRootCause() {
+    return rootCause;
+  }
+
+  public String getFullStackTrace() {
+    return fullStackTrace;
+  }
+}
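
A minimal standalone sketch (not part of the patch) of how the task-failure classification above is meant to behave. The regex is the one compiled in SparkTask#isTaskFailure and the wrapper is the SparkJobExceptionWrapper added in this change; the sample stack-trace text and class name are illustrative only.

import java.util.regex.Pattern;

public class SparkTaskFailureClassificationExample {

  public static void main(String[] args) {
    // Same pattern SparkTask#isTaskFailure matches against the wrapped stack trace.
    Pattern taskFailedPattern = Pattern.compile("Task.*in stage.*failed.*times");

    // Illustrative text of the kind Spark reports when a task exhausts its retries.
    String fullStackTrace = "org.apache.spark.SparkException: Job aborted due to stage failure: "
        + "Task 3 in stage 1.0 failed 4 times, most recent failure: ...";

    // Matches, so SparkTask would wrap it as ErrorMsg.SPARK_TASK_RUNTIME_ERROR (40002)
    // rather than the generic SPARK_JOB_RUNTIME_ERROR (40001).
    System.out.println(taskFailedPattern.matcher(fullStackTrace).find());
  }
}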