commit 0ccfdf35355767371757fb1fa3b6effd6748c954
Author: Sahil Takiar
Date:   Mon Mar 5 22:57:16 2018 -0600

    HIVE-18831: Differentiate errors that are thrown by Spark tasks

diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 4cf12e4d1c..92c2f70686 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1657,7 +1657,8 @@ spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\
   groupby3_multi_distinct.q,\
   groupby_grouping_sets7.q,\
   spark_job_max_tasks.q,\
-  spark_stage_max_tasks.q
+  spark_stage_max_tasks.q,\
+  spark_task_failure.q
 
 spark.perf.disabled.query.files=query14.q,\
   query64.q
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 1faa50a86a..d0a7ac978a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -580,8 +580,9 @@
 
   //========================== 40000 range starts here ========================//
 
-  SPARK_JOB_RUNTIME_ERROR(40001,
-      "Spark job failed during runtime. Please check stacktrace for the root cause.")
+  SPARK_JOB_RUNTIME_ERROR(40001, "Spark job failed due to: {0}", true),
+  SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to Spark task failures: Tasks failed with"
+      + " {0}", true)
   ;
 
   private int errorCode;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index a8d851fd81..5fee133953 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -602,7 +602,7 @@ public String getJobID() {
   public void shutdown() {
   }
 
-  Throwable getException() {
+  public Throwable getException() {
     return exception;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 76f6ecc2b4..efb73d451e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -26,8 +26,11 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
+
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.slf4j.Logger;
@@ -396,7 +399,8 @@ private void printConfigInfo() throws IOException {
     return counters;
   }
 
-  private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) {
+  @VisibleForTesting
+  void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) {
     try {
       stageIds = new ArrayList<Integer>();
       int[] ids = sparkJobStatus.getStageIds();
@@ -433,8 +437,12 @@ private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) {
       HiveException he;
       if (isOOMError(error)) {
         he = new HiveException(error, ErrorMsg.SPARK_RUNTIME_OOM);
+      } else if (isTaskFailure(error)) {
+        he = new HiveException(error, ErrorMsg.SPARK_TASK_RUNTIME_ERROR,
+            getNestedFailureMessage(error));
       } else {
-        he = new HiveException(error, ErrorMsg.SPARK_JOB_RUNTIME_ERROR);
+        he = new HiveException(error, ErrorMsg.SPARK_JOB_RUNTIME_ERROR,
+            getNestedFailureMessage(error));
       }
       setException(he);
     }
@@ -444,6 +452,42 @@ private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) {
     }
   }
 
+  private boolean isTaskFailure(Throwable error) {
+    Pattern taskFailedPattern = Pattern.compile("Task.*in stage.*failed.*times");
+    while (error != null) {
+      if (error instanceof SparkException) {
+        String sts = Throwables.getStackTraceAsString(error);
+        return taskFailedPattern.matcher(sts).find();
+      }
+      error = error.getCause();
+    }
+    return false;
+  }
+
+  private String getNestedFailureMessage(Throwable error) {
+    String detail = error.getMessage();
+    StringBuilder errBuilder = new StringBuilder();
+    if (detail == null) {
+      errBuilder.append("UNKNOWN");
+    } else {
+      // We SerDe the Throwable as String, parse it for the root cause
+      final String CAUSE_CAPTION = "Caused by: ";
+      int index = detail.lastIndexOf(CAUSE_CAPTION);
+      if (index != -1) {
+        String rootCause = detail.substring(index + CAUSE_CAPTION.length());
+        index = rootCause.indexOf(System.getProperty("line.separator"));
+        if (index != -1) {
+          errBuilder.append(rootCause.substring(0, index));
+        } else {
+          errBuilder.append(rootCause);
+        }
+      } else {
+        errBuilder.append(detail);
+      }
+    }
+    return errBuilder.toString();
+  }
+
   private boolean isOOMError(Throwable error) {
     while (error != null) {
       if (error instanceof OutOfMemoryError) {
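The two helpers added to SparkTask.java above drive the new classification: isTaskFailure() walks the cause chain for a SparkException whose serialized stack trace contains Spark's task-retry wording, and getNestedFailureMessage() keeps only the first line after the last "Caused by: " caption. The standalone sketch below shows the same two steps on a representative error string; the task/stage ids, the ScriptOperator frame, and the class name are illustrative assumptions, not taken from this commit.

import java.util.regex.Pattern;

public class SparkErrorClassificationDemo {

  // Same pattern the patch uses to recognize Spark task failures.
  private static final Pattern TASK_FAILED =
      Pattern.compile("Task.*in stage.*failed.*times");

  public static void main(String[] args) {
    String sep = System.getProperty("line.separator");

    // Illustrative error text in the shape Spark reports when a stage gives up on a
    // task; the ids and wording are assumptions, not taken from this commit.
    String serializedError =
        "org.apache.spark.SparkException: Job aborted due to stage failure: "
            + "Task 2 in stage 1.0 failed 4 times, most recent failure: ..." + sep
            + "Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: "
            + "[Error 20003]: An error occurred when trying to close the Operator "
            + "running your custom script." + sep
            + "\tat org.apache.hadoop.hive.ql.exec.ScriptOperator.close(ScriptOperator.java)";

    // What isTaskFailure() checks: does the serialized trace contain the task-retry wording?
    boolean isTaskFailure = TASK_FAILED.matcher(serializedError).find();

    // What getNestedFailureMessage() keeps: the first line after the last "Caused by: ".
    final String CAUSE_CAPTION = "Caused by: ";
    String rootCause = serializedError;
    int index = serializedError.lastIndexOf(CAUSE_CAPTION);
    if (index != -1) {
      rootCause = serializedError.substring(index + CAUSE_CAPTION.length());
      int eol = rootCause.indexOf(sep);
      if (eol != -1) {
        rootCause = rootCause.substring(0, eol);
      }
    }

    System.out.println("task failure? " + isTaskFailure); // true
    System.out.println("root cause:   " + rootCause);     // the nested HiveException line
  }
}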
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 3467ae4048..b9e3610f97 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -147,29 +147,8 @@ public int startMonitor() {
             done = true;
             break;
           case FAILED:
-            String detail = sparkJobStatus.getError().getMessage();
-            StringBuilder errBuilder = new StringBuilder();
-            errBuilder.append("Job failed with ");
-            if (detail == null) {
-              errBuilder.append("UNKNOWN reason");
-            } else {
-              // We SerDe the Throwable as String, parse it for the root cause
-              final String CAUSE_CAPTION = "Caused by: ";
-              int index = detail.lastIndexOf(CAUSE_CAPTION);
-              if (index != -1) {
-                String rootCause = detail.substring(index + CAUSE_CAPTION.length());
-                index = rootCause.indexOf(System.getProperty("line.separator"));
-                if (index != -1) {
-                  errBuilder.append(rootCause.substring(0, index));
-                } else {
-                  errBuilder.append(rootCause);
-                }
-              } else {
-                errBuilder.append(detail);
-              }
-              detail = System.getProperty("line.separator") + detail;
-            }
-            console.printError(errBuilder.toString(), detail);
+            LOG.error("Spark job[" + sparkJobStatus.getJobId() + "] failed", sparkJobStatus
+                .getError());
             running = false;
             done = true;
             rc = 3;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
index 435c6b606b..b2908e03cf 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
@@ -17,35 +17,36 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hive.spark.client.JobHandle.State;
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 public class TestSparkTask {
 
   @Test
   public void sparkTask_updates_Metrics() throws IOException {
 
-    Metrics mockMetrics = Mockito.mock(Metrics.class);
+    Metrics mockMetrics = mock(Metrics.class);
 
     SparkTask sparkTask = new SparkTask();
     sparkTask.updateTaskMetrics(mockMetrics);
@@ -88,7 +89,7 @@ public void removeEmptySparkTask() {
 
   @Test
   public void testRemoteSparkCancel() {
-    RemoteSparkJobStatus jobSts = Mockito.mock(RemoteSparkJobStatus.class);
+    RemoteSparkJobStatus jobSts = mock(RemoteSparkJobStatus.class);
     when(jobSts.getRemoteJobState()).thenReturn(State.CANCELLED);
     when(jobSts.isRemoteActive()).thenReturn(true);
     HiveConf hiveConf = new HiveConf();
@@ -96,6 +97,19 @@ public void testRemoteSparkCancel() {
     Assert.assertEquals(remoteSparkJobMonitor.startMonitor(), 3);
   }
 
+  @Test
+  public void testGetSparkJobInfoWithJobError() {
+    SparkTask sparkTask = new SparkTask();
+    SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
+    when(mockSparkJobStatus.getError()).thenReturn(new Throwable("Caused by: "
+        + "org.apache.spark.SparkException: Job aborted due to stage failure: "
+        + "Not a task or OOM error"));
+    sparkTask.getSparkJobInfo(mockSparkJobStatus, 3);
+
+    Assert.assertTrue(sparkTask.getException() instanceof HiveException);
+    Assert.assertEquals(((HiveException) sparkTask.getException()).getCanonicalErrorMsg(),
+        ErrorMsg.SPARK_JOB_RUNTIME_ERROR);
+  }
 
   private boolean isEmptySparkWork(SparkWork sparkWork) {
     List<BaseWork> allWorks = sparkWork.getAllWork();
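The new testGetSparkJobInfoWithJobError above only exercises the fallback branch that maps to SPARK_JOB_RUNTIME_ERROR. A companion test for the task-failure branch could look roughly like the sketch below; the test name, the SparkException message, and the task/stage ids are illustrative assumptions and are not part of this diff.

  @Test
  public void testGetSparkJobInfoWithTaskError() {
    SparkTask sparkTask = new SparkTask();
    SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
    // A SparkException whose text matches "Task.*in stage.*failed.*times" should be
    // classified by isTaskFailure() as a task-level failure.
    when(mockSparkJobStatus.getError()).thenReturn(new org.apache.spark.SparkException(
        "Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, "
            + "most recent failure: ..."));
    sparkTask.getSparkJobInfo(mockSparkJobStatus, 3);

    Assert.assertTrue(sparkTask.getException() instanceof HiveException);
    Assert.assertEquals(ErrorMsg.SPARK_TASK_RUNTIME_ERROR,
        ((HiveException) sparkTask.getException()).getCanonicalErrorMsg());
  }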
diff --git a/ql/src/test/queries/clientnegative/spark_task_failure.q b/ql/src/test/queries/clientnegative/spark_task_failure.q
new file mode 100644
index 0000000000..7bb8c503eb
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/spark_task_failure.q
@@ -0,0 +1,9 @@
+ADD FILE ../../data/scripts/error_script;
+
+EXPLAIN
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src;
+
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src;
+
diff --git a/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out b/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out
new file mode 100644
index 0000000000..3aaa5fa53c
--- /dev/null
+++ b/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out
@@ -0,0 +1,53 @@
+PREHOOK: query: EXPLAIN
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Transform Operator
+                      command: error_script
+                      output info:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Spark job failed due to Spark task failures: Tasks failed with org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20003]: An error occurred when trying to close the Operator running your custom script.
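With the message split into 40001 (generic Spark job failure) and 40002 (Spark task failure), a caller no longer has to parse the error text to tell the two apart. A rough sketch of how a caller might branch on the canonical error after a failed SparkTask; the surrounding variable names are illustrative, only getException() and getCanonicalErrorMsg() come from this patch and the existing HiveException API.

    // 'sparkTask' is a SparkTask whose execute() returned a non-zero code (illustrative).
    Throwable t = sparkTask.getException();
    if (t instanceof HiveException) {
      ErrorMsg canonical = ((HiveException) t).getCanonicalErrorMsg();
      if (canonical == ErrorMsg.SPARK_TASK_RUNTIME_ERROR) {
        // 40002: individual Spark tasks failed; the nested message usually points at
        // user code (e.g. a failing TRANSFORM script), so a blind retry rarely helps.
      } else if (canonical == ErrorMsg.SPARK_JOB_RUNTIME_ERROR) {
        // 40001: the job failed for some other runtime reason reported by Spark.
      }
    }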