commit 3c0c36fdf1d847142fd9ef1a01b36233e7c89d57 Author: Sahil Takiar Date: Mon Mar 5 22:57:16 2018 -0600 HIVE-18831: Differentiate errors that are thrown by Spark tasks diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 5985dcfab9..aa6a249a43 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1661,9 +1661,11 @@ spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\ groupby2_multi_distinct.q,\ groupby3_map_skew_multi_distinct.q,\ groupby3_multi_distinct.q,\ - groupby_grouping_sets7.q,\ - spark_job_max_tasks.q,\ - spark_stage_max_tasks.q + groupby_grouping_sets7.q + +spark.only.query.negative.files=spark_job_max_tasks.q,\ + spark_stage_max_tasks.q,\ + spark_task_failure.q spark.perf.disabled.query.files=query14.q,\ query64.q diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java index 7034c38b90..b6155b60b8 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java @@ -334,9 +334,8 @@ public NegativeCliConfig() { setQueryDir("ql/src/test/queries/clientnegative"); excludesFrom(testConfigProps, "minimr.query.negative.files"); + excludesFrom(testConfigProps, "spark.only.query.negative.files"); excludeQuery("authorization_uri_import.q"); - excludeQuery("spark_job_max_tasks.q"); - excludeQuery("spark_stage_max_tasks.q"); setResultsDir("ql/src/test/results/clientnegative"); setLogDir("itests/qtest/target/qfile-results/clientnegative"); @@ -571,6 +570,7 @@ public SparkNegativeCliConfig() { setQueryDir("ql/src/test/queries/clientnegative"); includesFrom(testConfigProps, "spark.query.negative.files"); + includesFrom(testConfigProps, "spark.only.query.negative.files"); setResultsDir("ql/src/test/results/clientnegative/spark"); setLogDir("itests/qtest-spark/target/qfile-results/clientnegative/spark"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 1faa50a86a..120a3054c0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -580,8 +580,8 @@ //========================== 40000 range starts here ========================// - SPARK_JOB_RUNTIME_ERROR(40001, - "Spark job failed during runtime. 
Please check stacktrace for the root cause.") + SPARK_JOB_RUNTIME_ERROR(40001, "Spark job failed due to: {0}", true), + SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to task failures: {0}", true) ; private int errorCode; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index a8d851fd81..5fee133953 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -602,7 +602,7 @@ public String getJobID() { public void shutdown() { } - Throwable getException() { + public Throwable getException() { return exception; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index c2408844f5..80fd9b6306 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -26,14 +26,21 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; import com.google.common.base.Strings; +import com.google.common.annotations.VisibleForTesting; + import com.google.common.base.Throwables; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; +import org.apache.hive.spark.client.SparkJobExceptionWrapper; + +import org.apache.spark.SparkException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; @@ -71,7 +78,6 @@ import org.apache.hadoop.util.StringUtils; import com.google.common.collect.Lists; -import org.apache.spark.SparkException; public class SparkTask extends Task { private static final String CLASS_NAME = SparkTask.class.getName(); @@ -152,7 +158,8 @@ public int execute(DriverContext driverContext) { // Get the final state of the Spark job and parses its job info SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus(); - getSparkJobInfo(sparkJobStatus, rc); + getSparkJobInfo(sparkJobStatus); + setSparkException(sparkJobStatus, rc); if (rc == 0) { sparkStatistics = sparkJobStatus.getSparkStatistics(); @@ -449,7 +456,7 @@ private void printConfigInfo() throws IOException { return counters; } - private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) { + private void getSparkJobInfo(SparkJobStatus sparkJobStatus) { try { stageIds = new ArrayList(); int[] ids = sparkJobStatus.getStageIds(); @@ -474,36 +481,65 @@ private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) { succeededTaskCount = sumComplete; totalTaskCount = sumTotal; failedTaskCount = sumFailed; - if (rc != 0) { - Throwable error = sparkJobStatus.getError(); - if (error != null) { - if ((error instanceof InterruptedException) || - (error instanceof HiveException && - error.getCause() instanceof InterruptedException)) { - LOG.info("Killing Spark job since query was interrupted"); - killJob(); - } - HiveException he; - if (isOOMError(error)) { - he = new HiveException(error, ErrorMsg.SPARK_RUNTIME_OOM); - } else { - he = new HiveException(error, ErrorMsg.SPARK_JOB_RUNTIME_ERROR); - } - setException(he); - } - } } catch (Exception e) { LOG.error("Failed to get Spark job information", e); } } + @VisibleForTesting + void setSparkException(SparkJobStatus 
sparkJobStatus, int rc) { + if (rc != 0) { + Throwable error = sparkJobStatus.getMonitorError(); + if (error != null) { + if ((error instanceof InterruptedException) || + (error instanceof HiveException && + error.getCause() instanceof InterruptedException)) { + LOG.info("Killing Spark job since query was interrupted"); + killJob(); + } else if (error instanceof HiveException && getException() == null) { + setException(error); + } + } + + Throwable sparkJobException = sparkJobStatus.getSparkJobException(); + if (sparkJobException != null) { + HiveException he; + if (isOOMError(sparkJobException)) { + he = new HiveException(sparkJobException, ErrorMsg.SPARK_RUNTIME_OOM); + } else if (isTaskFailure(sparkJobException)) { + he = new HiveException(sparkJobException, ErrorMsg.SPARK_TASK_RUNTIME_ERROR, + Throwables.getRootCause(sparkJobException).getMessage()); + } else { + he = new HiveException(sparkJobException, ErrorMsg.SPARK_JOB_RUNTIME_ERROR, + Throwables.getRootCause(sparkJobException).getMessage()); + } + // Prefer to propagate errors from the Spark job rather than the monitor, as errors from + // the Spark job are likely to be more relevant + setException(he); + } + } + } + + private boolean isTaskFailure(Throwable error) { + Pattern taskFailedPattern = Pattern.compile("Task.*in stage.*failed.*times"); + while (error != null) { + if (error instanceof OutOfMemoryError) { + return true; + } else if (taskFailedPattern.matcher(error.getMessage()).find()) { + return true; + } + error = error.getCause(); + } + return false; + } + private boolean isOOMError(Throwable error) { while (error != null) { if (error instanceof OutOfMemoryError) { return true; - } else if (error instanceof SparkException) { - String sts = Throwables.getStackTraceAsString(error); - return sts.contains("Container killed by YARN for exceeding memory limits"); + } else if (error.getMessage().contains("Container killed by YARN for exceeding memory " + + "limits")) { + return true; } error = error.getCause(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java index 05253157d0..4ce9f53a37 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java @@ -128,7 +128,7 @@ public int startMonitor() { console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); rc = 1; done = true; - sparkJobStatus.setError(e); + sparkJobStatus.setMonitorError(e); } finally { if (done) { break; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index a132f74f9e..98c228b268 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -77,7 +77,7 @@ public int startMonitor() { HiveException he = new HiveException(ErrorMsg.SPARK_JOB_MONITOR_TIMEOUT, Long.toString(timeCount)); console.printError(he.getMessage()); - sparkJobStatus.setError(he); + sparkJobStatus.setMonitorError(he); running = false; done = true; rc = 2; @@ -147,29 +147,7 @@ public int startMonitor() { done = true; break; case FAILED: - String detail = sparkJobStatus.getError().getMessage(); - StringBuilder errBuilder = new StringBuilder(); - 
errBuilder.append("Job failed with "); - if (detail == null) { - errBuilder.append("UNKNOWN reason"); - } else { - // We SerDe the Throwable as String, parse it for the root cause - final String CAUSE_CAPTION = "Caused by: "; - int index = detail.lastIndexOf(CAUSE_CAPTION); - if (index != -1) { - String rootCause = detail.substring(index + CAUSE_CAPTION.length()); - index = rootCause.indexOf(System.getProperty("line.separator")); - if (index != -1) { - errBuilder.append(rootCause.substring(0, index)); - } else { - errBuilder.append(rootCause); - } - } else { - errBuilder.append(detail); - } - detail = System.getProperty("line.separator") + detail; - } - console.printError(errBuilder.toString(), detail); + LOG.error("Spark job[" + sparkJobStatus.getJobId() + "] failed", sparkJobStatus.getSparkJobException()); running = false; done = true; rc = 3; @@ -202,7 +180,7 @@ public int startMonitor() { } rc = 1; done = true; - sparkJobStatus.setError(finalException); + sparkJobStatus.setMonitorError(finalException); } finally { if (done) { break; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java index 8474afcc2a..93406c9bc9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hive.spark.client.SparkJobExceptionWrapper; import org.apache.hive.spark.counter.SparkCounters; import org.apache.spark.JobExecutionStatus; @@ -47,7 +48,9 @@ void cleanup(); - Throwable getError(); + Throwable getMonitorError(); - void setError(Throwable e); + void setMonitorError(Throwable e); + + Throwable getSparkJobException(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java index 03f8a0b680..e0954e9184 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java @@ -22,6 +22,8 @@ import java.util.Map; import java.util.Set; +import com.google.common.base.Throwables; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress; import org.apache.hive.spark.client.MetricsCollection; +import org.apache.hive.spark.client.SparkJobExceptionWrapper; import org.apache.hive.spark.client.metrics.Metrics; import org.apache.hive.spark.counter.SparkCounters; @@ -181,25 +184,28 @@ public void cleanup() { } @Override - public Throwable getError() { - if (error != null) { - return error; - } + public Throwable getMonitorError() { + return error; + } + + @Override + public void setMonitorError(Throwable e) { + this.error = e; + } + + @Override + public SparkJobExceptionWrapper getSparkJobException() { if (future.isDone()) { try { future.get(); } catch (Throwable e) { - return e; + return new SparkJobExceptionWrapper(Throwables.getRootCause(e).getMessage(), Throwables + .getStackTraceAsString(e)); } } return null; } - @Override - public void setError(Throwable e) { - this.error = e; - } - private SparkJobInfo getJobInfo() { return 
sparkContext.statusTracker().getJobInfo(jobId); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index ff969e048f..7866099f04 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -35,6 +35,7 @@ import org.apache.hive.spark.client.JobContext; import org.apache.hive.spark.client.JobHandle; import org.apache.hive.spark.client.SparkClient; +import org.apache.hive.spark.client.SparkJobExceptionWrapper; import org.apache.hive.spark.counter.SparkCounters; import org.apache.spark.JobExecutionStatus; @@ -165,18 +166,20 @@ public void cleanup() { } @Override - public Throwable getError() { - if (error != null) { - return error; - } - return jobHandle.getError(); + public Throwable getMonitorError() { + return error; } @Override - public void setError(Throwable e) { + public void setMonitorError(Throwable e) { this.error = e; } + @Override + public Throwable getSparkJobException() { + return jobHandle.getError(); + } + /** * Indicates whether the remote context is active. SparkJobMonitor can use this to decide whether * to stop monitoring. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java index bfbbe3a7d7..b6d66ae4a7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java @@ -71,6 +71,14 @@ public HiveException(ErrorMsg errorMsg) { this(null, null, errorMsg, new String[0]); } + public HiveException(String message, ErrorMsg errorMsg) { + this(null, message, errorMsg); + } + + public HiveException(String message, ErrorMsg errorMsg, String... msgArgs) { + this(null, message, errorMsg, msgArgs); + } + /** * This is the recommended constructor to use since it helps use * canonical messages throughout and propagate remote errors. 
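
Taken together, the ErrorMsg and SparkTask changes above split Spark job failures into three buckets: out-of-memory errors (SPARK_RUNTIME_OOM), task failures (the new SPARK_TASK_RUNTIME_ERROR, error code 40002), and all remaining job failures (SPARK_JOB_RUNTIME_ERROR, 40001). The standalone sketch below is not part of the patch (the class, enum, and method names are invented for illustration), but it mirrors that classification order: OOM is checked across the whole cause chain first, then the task-failure message pattern, with a defensive null-message guard added.

// Illustrative only: mirrors the classification added to SparkTask; not code from the patch.
import java.util.regex.Pattern;

public class SparkErrorClassifierSketch {

  // Same pattern the patch uses to recognize Spark's "Task X in stage Y failed N times" messages.
  private static final Pattern TASK_FAILED = Pattern.compile("Task.*in stage.*failed.*times");

  enum Kind { OOM, TASK_FAILURE, JOB_FAILURE }

  static Kind classify(Throwable error) {
    // OOM takes precedence, as in setSparkException().
    for (Throwable t = error; t != null; t = t.getCause()) {
      if (t instanceof OutOfMemoryError
          || (t.getMessage() != null
              && t.getMessage().contains("Container killed by YARN for exceeding memory limits"))) {
        return Kind.OOM;            // -> ErrorMsg.SPARK_RUNTIME_OOM
      }
    }
    for (Throwable t = error; t != null; t = t.getCause()) {
      if (t.getMessage() != null && TASK_FAILED.matcher(t.getMessage()).find()) {
        return Kind.TASK_FAILURE;   // -> ErrorMsg.SPARK_TASK_RUNTIME_ERROR (40002)
      }
    }
    return Kind.JOB_FAILURE;        // -> ErrorMsg.SPARK_JOB_RUNTIME_ERROR (40001)
  }

  public static void main(String[] args) {
    System.out.println(classify(new RuntimeException(
        "Job aborted due to stage failure: Task 3 in stage 2.0 failed 4 times")));  // TASK_FAILURE
    System.out.println(classify(new RuntimeException("driver disassociated")));     // JOB_FAILURE
  }
}

The new spark_task_failure.q negative test further down exercises the task-failure bucket end to end.
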
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java index 435c6b606b..1044bb511a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java @@ -17,35 +17,50 @@ */ package org.apache.hadoop.hive.ql.exec.spark; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; -import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutionException; + +import com.google.common.base.Throwables; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; import org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.spark.client.JobHandle.State; +import org.apache.hive.spark.client.SparkJobExceptionWrapper; + +import org.apache.spark.SparkException; + import org.junit.Assert; import org.junit.Test; -import org.mockito.Mockito; public class TestSparkTask { @Test public void sparkTask_updates_Metrics() throws IOException { - Metrics mockMetrics = Mockito.mock(Metrics.class); + Metrics mockMetrics = mock(Metrics.class); SparkTask sparkTask = new SparkTask(); sparkTask.updateTaskMetrics(mockMetrics); @@ -88,7 +103,7 @@ public void removeEmptySparkTask() { @Test public void testRemoteSparkCancel() { - RemoteSparkJobStatus jobSts = Mockito.mock(RemoteSparkJobStatus.class); + RemoteSparkJobStatus jobSts = mock(RemoteSparkJobStatus.class); when(jobSts.getRemoteJobState()).thenReturn(State.CANCELLED); when(jobSts.isRemoteActive()).thenReturn(true); HiveConf hiveConf = new HiveConf(); @@ -96,6 +111,111 @@ public void testRemoteSparkCancel() { Assert.assertEquals(remoteSparkJobMonitor.startMonitor(), 3); } + @Test + public void testSetSparkExceptionWithJobError() { + SparkTask sparkTask = new SparkTask(); + SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class); + + SparkException se = new SparkException("Job aborted due to stage failure: Not a task or OOM error"); + ExecutionException ee = new ExecutionException("Exception thrown by job", se); + + SparkJobExceptionWrapper exceptionWrapper = new SparkJobExceptionWrapper(Throwables + .getRootCause(ee).getMessage(), Throwables.getStackTraceAsString(ee)); + when(mockSparkJobStatus.getSparkJobException()).thenReturn(exceptionWrapper); + + sparkTask.setSparkException(mockSparkJobStatus, 3); + + 
Assert.assertTrue(sparkTask.getException() instanceof HiveException); + Assert.assertEquals(((HiveException) sparkTask.getException()).getCanonicalErrorMsg(), + ErrorMsg.SPARK_JOB_RUNTIME_ERROR); + Assert.assertTrue(sparkTask.getException().getMessage().contains("Not a task or OOM error")); + } + + @Test + public void testSetSparkExceptionWithTimeoutError() { + SparkTask sparkTask = new SparkTask(); + SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class); + when(mockSparkJobStatus.getMonitorError()).thenReturn(new HiveException(ErrorMsg + .SPARK_JOB_MONITOR_TIMEOUT, Long.toString(60))); + + sparkTask.setSparkException(mockSparkJobStatus, 3); + + Assert.assertTrue(sparkTask.getException() instanceof HiveException); + Assert.assertEquals(((HiveException) sparkTask.getException()).getCanonicalErrorMsg(), + ErrorMsg.SPARK_JOB_MONITOR_TIMEOUT); + Assert.assertTrue(sparkTask.getException().getMessage().contains("60s")); + } + + @Test + public void testSetSparkExceptionWithOOMError() { + SparkTask sparkTask = new SparkTask(); + SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class); + + Throwable jobError = new Throwable("Caused by: org.apache.spark.SparkException: Container " + + "killed by YARN for exceeding memory limits"); + SparkJobExceptionWrapper exceptionWrapper = new SparkJobExceptionWrapper(Throwables + .getRootCause(jobError).getMessage(), Throwables.getStackTraceAsString(jobError)); + when(mockSparkJobStatus.getSparkJobException()).thenReturn(exceptionWrapper); + + sparkTask.setSparkException(mockSparkJobStatus, 3); + + Assert.assertTrue(sparkTask.getException() instanceof HiveException); + Assert.assertEquals(((HiveException) sparkTask.getException()).getCanonicalErrorMsg(), + ErrorMsg.SPARK_RUNTIME_OOM); + } + + @Test + public void testSparkExceptionAndMonitorError() { + SparkTask sparkTask = new SparkTask(); + SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class); + when(mockSparkJobStatus.getMonitorError()).thenReturn(new RuntimeException()); + when(mockSparkJobStatus.getSparkJobException()).thenReturn(new SparkJobExceptionWrapper("", "")); + + sparkTask.setSparkException(mockSparkJobStatus, 3); + + Assert.assertTrue(sparkTask.getException() instanceof HiveException); + Assert.assertEquals(((HiveException) sparkTask.getException()).getCanonicalErrorMsg(), + ErrorMsg.SPARK_JOB_RUNTIME_ERROR); + } + + @Test + public void testHandleInterruptedException() throws Exception { + HiveConf hiveConf = new HiveConf(); + + SparkTask sparkTask = new SparkTask(); + sparkTask.setWork(mock(SparkWork.class)); + + DriverContext mockDriverContext = mock(DriverContext.class); + + QueryState mockQueryState = mock(QueryState.class); + when(mockQueryState.getConf()).thenReturn(hiveConf); + + sparkTask.initialize(mockQueryState, null, mockDriverContext, null); + + SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class); + when(mockSparkJobStatus.getMonitorError()).thenReturn(new InterruptedException()); + + SparkSession mockSparkSession = mock(SparkSession.class); + SparkJobRef mockSparkJobRef = mock(SparkJobRef.class); + + when(mockSparkJobRef.monitorJob()).thenReturn(2); + when(mockSparkJobRef.getSparkJobStatus()).thenReturn(mockSparkJobStatus); + when(mockSparkSession.submit(any(), any())).thenReturn(mockSparkJobRef); + + SessionState.start(hiveConf); + SessionState.get().setSparkSession(mockSparkSession); + + sparkTask.execute(mockDriverContext); + + verify(mockSparkJobRef, atLeastOnce()).cancelJob(); + + when(mockSparkJobStatus.getMonitorError()).thenReturn( + new 
HiveException(new InterruptedException())); + + sparkTask.execute(mockDriverContext); + + verify(mockSparkJobRef, atLeastOnce()).cancelJob(); + } private boolean isEmptySparkWork(SparkWork sparkWork) { List allWorks = sparkWork.getAllWork(); diff --git a/ql/src/test/queries/clientnegative/spark_task_failure.q b/ql/src/test/queries/clientnegative/spark_task_failure.q new file mode 100644 index 0000000000..7bb8c503eb --- /dev/null +++ b/ql/src/test/queries/clientnegative/spark_task_failure.q @@ -0,0 +1,9 @@ +ADD FILE ../../data/scripts/error_script; + +EXPLAIN +SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue) +FROM src; + +SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue) +FROM src; + diff --git a/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out b/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out new file mode 100644 index 0000000000..1e680024a7 --- /dev/null +++ b/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out @@ -0,0 +1,53 @@ +PREHOOK: query: EXPLAIN +SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue) +FROM src +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue) +FROM src +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Spark +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Transform Operator + command: error_script + output info: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue) +FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Spark job failed due to task failures: [Error 20003]: An error occurred when trying to close the Operator running your custom script. 
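
The spark-client changes that follow make JobResult carry the original Throwable back over the RPC channel instead of a stack-trace string, and fall back to a serializable SparkException chain (message, stack trace, and suppressed exceptions preserved) when the original exception cannot be serialized. The sketch below shows the same fallback idea using plain java.io serialization rather than the Kryo codec; the class and method names are invented and it is not code from the patch.

// Illustrative only: the "serialize, or substitute a serializable copy" idea, outside Kryo.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.Objects;

public class SerializableErrorFallbackSketch {

  // Rebuild the cause chain out of plain Exceptions, keeping message, stack trace, and suppressed.
  static Exception toSerializableCopy(Throwable error) {
    Exception copy = new Exception(
        error.getClass().getName() + ": " + Objects.toString(error.getMessage(), ""),
        error.getCause() == null ? null : toSerializableCopy(error.getCause()));
    copy.setStackTrace(error.getStackTrace());
    Arrays.stream(error.getSuppressed()).forEach(copy::addSuppressed);
    return copy;
  }

  static byte[] serializeOrFallBack(Throwable error) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(error);          // works only if the whole cause chain is serializable
    } catch (NotSerializableException e) {
      bos.reset();                     // discard the partial write, as JobResultSerializer does
      try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
        oos.writeObject(toSerializableCopy(error));
      }
    }
    return bos.toByteArray();
  }
}

On the consuming side, this is what lets the TestSparkClient change near the end of the patch assert that handle.get() now surfaces the original IllegalStateException rather than a SparkException wrapping its stack-trace string.
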
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java index 4dbc4908bf..7baf3d2ddc 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java @@ -25,7 +25,7 @@ import com.google.common.base.Throwables; -abstract class BaseProtocol extends RpcDispatcher { +public abstract class BaseProtocol extends RpcDispatcher { protected static class CancelJob implements Serializable { @@ -97,17 +97,17 @@ } - protected static class JobResult implements Serializable { + public static class JobResult implements Serializable { final String id; final T result; - final String error; + final Throwable error; final SparkCounters sparkCounters; JobResult(String id, T result, Throwable error, SparkCounters sparkCounters) { this.id = id; this.result = result; - this.error = error != null ? Throwables.getStackTraceAsString(error) : null; + this.error = error; this.sparkCounters = sparkCounters; } @@ -115,6 +115,15 @@ this(null, null, null, null); } + @Override + public String toString() { + return "JobResult{" + + "id='" + id + '\'' + + ", result=" + result + + ", error=" + error + + ", sparkCounters=" + sparkCounters + + '}'; + } } protected static class JobStarted implements Serializable { diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobResultSerializer.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobResultSerializer.java new file mode 100644 index 0000000000..c815f40e9c --- /dev/null +++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobResultSerializer.java @@ -0,0 +1,59 @@ +package org.apache.hive.spark.client; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.serializers.JavaSerializer; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.hive.spark.counter.SparkCounters; +import org.apache.spark.SparkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.Arrays; +import java.util.Objects; + +public class JobResultSerializer extends Serializer> { + + private static final Logger LOG = LoggerFactory.getLogger(JobResultSerializer.class); + + private final JavaSerializer javaSerializer = new JavaSerializer(); + + @Override + public BaseProtocol.JobResult read(Kryo kryo, Input input, Class type) { + return (BaseProtocol.JobResult) javaSerializer.read(kryo, input, type); + } + + @Override + public void write(Kryo kryo, Output output, BaseProtocol.JobResult object) { + try { + javaSerializer.write(kryo, output, object); + } catch (Exception e) { + LOG.warn("Unable to serialize JobResult object " + object, e); + output.clear(); + ((ByteArrayOutputStream) output.getOutputStream()).reset(); + BaseProtocol.JobResult serializableJobResult = new BaseProtocol.JobResult<>(object.id, + object.result, convertToSerializableSparkException(object.error), object.sparkCounters); + new JavaSerializer().write(kryo, output, serializableJobResult); + } + } + + @VisibleForTesting + static SparkException convertToSerializableSparkException(Throwable error) { + SparkException serializableThrowable = new 
SparkException( + error instanceof SparkException ? error.getMessage() : + error.getClass().getName() + ": " + Objects.toString(error.getMessage(), ""), + error.getCause() == null ? null : convertToSerializableSparkException(error.getCause())); + serializableThrowable.setStackTrace(error.getStackTrace()); + Arrays.stream(error.getSuppressed()).forEach(serializableThrowable::addSuppressed); + return serializableThrowable; + } +} diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index 665ed92898..e4f72a3531 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -38,7 +38,6 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; -import java.io.PrintStream; import java.io.Serializable; import java.io.Writer; import java.net.URI; @@ -64,7 +63,6 @@ import org.apache.hive.spark.client.rpc.RpcConfiguration; import org.apache.hive.spark.client.rpc.RpcServer; import org.apache.spark.SparkContext; -import org.apache.spark.SparkException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -592,7 +590,7 @@ private void handle(ChannelHandlerContext ctx, JobResult msg) { if (handle != null) { LOG.info("Received result for {}", msg.id); handle.setSparkCounters(msg.sparkCounters); - Throwable error = msg.error != null ? new SparkException(msg.error) : null; + Throwable error = msg.error; if (error == null) { handle.setSuccess(msg.result); } else { diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkJobExceptionWrapper.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkJobExceptionWrapper.java new file mode 100644 index 0000000000..2f6182cca7 --- /dev/null +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkJobExceptionWrapper.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.spark.client; + +public class SparkJobExceptionWrapper extends Exception { + + private boolean serialized; + private String rootCause; + private String fullStackTrace; + + public SparkJobExceptionWrapper() { + this(null, null); + } + + public SparkJobExceptionWrapper(Throwable error) { + super(error); + this.serialized = true; + } + + public SparkJobExceptionWrapper(String rootCause, String fullStackTrace) { + this.rootCause = rootCause; + this.fullStackTrace = fullStackTrace; + } + + public String getRootCause() { + return rootCause; + } + + public String getFullStackTrace() { + return fullStackTrace; + } + + public boolean getSerialized() { + return serialized; + } +} diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java index 9e789cf5be..8f7b9e0fb9 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java @@ -23,6 +23,8 @@ import java.util.Arrays; import java.util.List; +import org.apache.hive.spark.client.BaseProtocol; +import org.apache.hive.spark.client.JobResultSerializer; import org.objenesis.strategy.StdInstantiatorStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +62,7 @@ protected Kryo initialValue() { kryo.register(klass, REG_ID_BASE + count); count++; } + kryo.addDefaultSerializer(BaseProtocol.JobResult.class, new JobResultSerializer()); kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); return kryo; } diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestJobResultSerializer.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestJobResultSerializer.java new file mode 100644 index 0000000000..aa658fa3df --- /dev/null +++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestJobResultSerializer.java @@ -0,0 +1,117 @@ +package org.apache.hive.spark.client; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.ByteBufferInputStream; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import org.apache.hive.spark.counter.SparkCounters; +import org.apache.spark.SparkException; +import org.junit.Assert; +import org.junit.Test; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +public class TestJobResultSerializer { + + @Test + public void testSerializablableExceptionSingleBlankException() { + RuntimeException blankRuntimeException = new RuntimeException(); + + SparkException serializableException = JobResultSerializer.convertToSerializableSparkException( + blankRuntimeException); + + Assert.assertTrue(serializableException.getMessage().contains(blankRuntimeException.getClass + ().getName())); + Assert.assertArrayEquals(blankRuntimeException.getStackTrace(), + serializableException.getStackTrace()); + } + + @Test + public void testSerializablableExceptionSingleException() { + String message = "hello"; + RuntimeException blankRuntimeException = new RuntimeException(message); + + SparkException serializableException = JobResultSerializer.convertToSerializableSparkException( + blankRuntimeException); + + Assert.assertTrue(serializableException.getMessage().contains(blankRuntimeException.getMessage())); + 
Assert.assertArrayEquals(blankRuntimeException.getStackTrace(), + serializableException.getStackTrace()); + } + + @Test + public void testSerializablableExceptionNestedBlankException() { + RuntimeException nestedBlankRuntimeException = new RuntimeException(); + RuntimeException blankRuntimeException = new RuntimeException(nestedBlankRuntimeException); + + SparkException serializableException = JobResultSerializer.convertToSerializableSparkException( + blankRuntimeException); + + Assert.assertEquals(serializableException.getMessage(), blankRuntimeException + .getClass().getName() + ": " + blankRuntimeException.getMessage()); + Assert.assertArrayEquals(blankRuntimeException.getStackTrace(), + serializableException.getStackTrace()); + + Assert.assertEquals(nestedBlankRuntimeException.getClass().getName() + ": ", + serializableException.getCause().getMessage()); + Assert.assertArrayEquals(blankRuntimeException.getCause().getStackTrace(), + serializableException.getCause().getStackTrace()); + } + + @Test + public void testSerializeNonSerializableObjectWriteClassAndObject() { + Kryo kryo = new Kryo(); + kryo.addDefaultSerializer(BaseProtocol.JobResult.class, new JobResultSerializer()); + + ByteArrayOutputStream boas = new ByteArrayOutputStream(); + Output output = new Output(boas); + + BaseProtocol.JobResult jobResult = new BaseProtocol.JobResult<>("1", "result", new + NonSerializableException("content"), new SparkCounters(null)); + + kryo.writeClassAndObject(output, jobResult); + output.flush(); + + Input kryoIn = new Input(new ByteArrayInputStream(boas.toByteArray())); + kryo.readClassAndObject(kryoIn); + } + + @Test + public void testSerializeNonSerializableObjectWriteObject() { + Kryo kryo = new Kryo(); + ByteArrayOutputStream boas = new ByteArrayOutputStream(); + Output output = new Output(boas); + + BaseProtocol.JobResult jobResult = new BaseProtocol.JobResult<>("1", "result", new + NonSerializableException("content"), new SparkCounters(null)); + + new JobResultSerializer().write(kryo, output, jobResult); + output.flush(); + + new JobResultSerializer().read(kryo, new Input(new ByteArrayInputStream(boas.toByteArray())), + BaseProtocol.JobResult.class); + } + + private static final class NonSerializableException extends Exception { + + private static final long serialVersionUID = 2548414562750016219L; + + private final NonSerializableObject nonSerializableObject; + + private NonSerializableException(String content) { + this.nonSerializableObject = new NonSerializableObject(content); + } + } + + private static final class NonSerializableObject { + + private String content; + + private NonSerializableObject(String content) { + this.content = content; + } + } +} diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java index fb31c933ca..89ffeec1c3 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java @@ -133,8 +133,36 @@ public void call(SparkClient client) throws Exception { handle.get(TIMEOUT, TimeUnit.SECONDS); fail("Should have thrown an exception."); } catch (ExecutionException ee) { - assertTrue(ee.getCause() instanceof SparkException); - assertTrue(ee.getCause().getMessage().contains("IllegalStateException: Hello")); + assertTrue(ee.getCause() instanceof IllegalStateException); + assertTrue(ee.getCause().getMessage().contains("Hello")); + } + + // Try an invalid 
state transition on the handle. This ensures that the actual state + // change we're interested in actually happened, since internally the handle serializes + // state changes. + assertFalse(((JobHandleImpl)handle).changeState(JobHandle.State.SENT)); + + verify(listener).onJobQueued(handle); + verify(listener).onJobStarted(handle); + verify(listener).onJobFailed(same(handle), any(Throwable.class)); + } + }); + } + + @Test + public void testErrorJobNotSerializable() throws Exception { + runTest(new TestFunction() { + @Override + public void call(SparkClient client) throws Exception { + JobHandle.Listener listener = newListener(); + List> listeners = Lists.newArrayList(listener); + JobHandle handle = client.submit(new ErrorJobNotSerializable(), listeners); + try { + handle.get(TIMEOUT, TimeUnit.SECONDS); + fail("Should have thrown an exception."); + } catch (ExecutionException ee) { + assertTrue(ee.getCause() instanceof ErrorJobNotSerializable.NonSerializableException); + assertTrue(ee.getCause().getMessage().contains("Hello")); } // Try an invalid state transition on the handle. This ensures that the actual state @@ -331,6 +359,34 @@ public String call(JobContext jc) { } + private static class ErrorJobNotSerializable implements Job { + + private static final class NonSerializableException extends Exception { + + private static final long serialVersionUID = 2548414562750016219L; + + private final NonSerializableObject nonSerializableObject; + + private NonSerializableException(String content) { + this.nonSerializableObject = new NonSerializableObject(content); + } + } + + private static final class NonSerializableObject { + + String content; + + private NonSerializableObject(String content) { + this.content = content; + } + } + + @Override + public String call(JobContext jc) throws NonSerializableException { + throw new NonSerializableException("Hello"); + } + } + private static class SparkJob implements Job { @Override