commit ffb32c7ab10b7c9fd4cbcd5ef79cfc8f1baed3cc
Author: Sahil Takiar
Date:   Mon Mar 5 22:57:16 2018 -0600

    HIVE-18831: Differentiate errors that are thrown by Spark tasks

diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index e8aa827523..32251ac8ef 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1653,7 +1653,8 @@ spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\
   groupby3_multi_distinct.q,\
   groupby_grouping_sets7.q,\
   spark_job_max_tasks.q,\
-  spark_stage_max_tasks.q
+  spark_stage_max_tasks.q,\
+  spark_task_failure.q
 
 spark.perf.disabled.query.files=query14.q,\
   query64.q

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index d3546d4b9b..1438387c3b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -582,7 +582,8 @@
   //========================== 40000 range starts here ========================//
 
   SPARK_JOB_RUNTIME_ERROR(40001,
-      "Spark job failed during runtime. Please check stacktrace for the root cause.")
+      "Spark job failed during runtime. Please check stacktrace for the root cause."),
+  SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to Spark task failures: {0}", true)
   ;
 
   private int errorCode;

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 76f6ecc2b4..f4a2aedca5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -28,6 +28,7 @@ import java.util.Map;
 
 import com.google.common.base.Throwables;
+
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.slf4j.Logger;
@@ -433,6 +434,8 @@ private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) {
       HiveException he;
       if (isOOMError(error)) {
         he = new HiveException(error, ErrorMsg.SPARK_RUNTIME_OOM);
+      } else if (isTaskFailure(error)) {
+        he = new HiveException(error, ErrorMsg.SPARK_TASK_RUNTIME_ERROR, getTaskFailure(error));
       } else {
         he = new HiveException(error, ErrorMsg.SPARK_JOB_RUNTIME_ERROR);
       }
@@ -444,6 +447,42 @@ private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) {
     }
   }
 
+  private boolean isTaskFailure(Throwable error) {
+    while (error != null) {
+      if (error instanceof SparkException) {
+        String sts = Throwables.getStackTraceAsString(error);
+        return sts.contains("Job aborted due to stage failure");
+      }
+      error = error.getCause();
+    }
+    return false;
+  }
+
+  private String getTaskFailure(Throwable error) {
+    String detail = error.getMessage();
+    StringBuilder errBuilder = new StringBuilder();
+    errBuilder.append("Tasks failed with ");
+    if (detail == null) {
+      errBuilder.append("UNKNOWN reason");
+    } else {
+      // The Throwable was serialized as a string; parse it for the root cause
+      final String CAUSE_CAPTION = "Caused by: ";
+      int index = detail.lastIndexOf(CAUSE_CAPTION);
+      if (index != -1) {
+        String rootCause = detail.substring(index + CAUSE_CAPTION.length());
+        index = rootCause.indexOf(System.getProperty("line.separator"));
+        if (index != -1) {
+          errBuilder.append(rootCause.substring(0, index));
+        } else {
+          errBuilder.append(rootCause);
+        }
+      } else {
+        errBuilder.append(detail);
+      }
+    }
+    return errBuilder.toString();
+  }
+
   private boolean isOOMError(Throwable error) {
     while (error != null) {
       if (error instanceof OutOfMemoryError) {
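The classification above hinges on isTaskFailure walking the cause chain for a SparkException whose serialized trace carries Spark's stage-failure marker. Below is a minimal, self-contained sketch of that check, runnable outside Hive; the TaskFailureCheck class, its main method, and the sample messages are illustrative only, and it assumes the same Guava Throwables and org.apache.spark.SparkException classes the patch itself uses.

import com.google.common.base.Throwables;

import org.apache.spark.SparkException;

public class TaskFailureCheck {

  // Mirrors SparkTask#isTaskFailure: an error counts as a task failure only if
  // some SparkException in its cause chain carries the stage-failure marker text.
  static boolean isTaskFailure(Throwable error) {
    while (error != null) {
      if (error instanceof SparkException) {
        return Throwables.getStackTraceAsString(error)
            .contains("Job aborted due to stage failure");
      }
      error = error.getCause();
    }
    return false;
  }

  public static void main(String[] args) {
    // Hypothetical failure messages, shaped like what Spark's scheduler reports.
    Throwable taskFailure = new RuntimeException(
        new SparkException("Job aborted due to stage failure: Task 3 failed 4 times"));
    Throwable connectionFailure = new RuntimeException(
        new SparkException("RPC channel closed"));
    System.out.println(isTaskFailure(taskFailure));       // prints: true
    System.out.println(isTaskFailure(connectionFailure)); // prints: false
  }
}

The string match is the only signal available here: Spark reports stage failures as a plain SparkException, so the message text, not the exception type, is what distinguishes task failures from other job-level errors.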
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 3467ae4048..b9e3610f97 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -147,29 +147,8 @@ public int startMonitor() {
           done = true;
           break;
         case FAILED:
-          String detail = sparkJobStatus.getError().getMessage();
-          StringBuilder errBuilder = new StringBuilder();
-          errBuilder.append("Job failed with ");
-          if (detail == null) {
-            errBuilder.append("UNKNOWN reason");
-          } else {
-            // We SerDe the Throwable as String, parse it for the root cause
-            final String CAUSE_CAPTION = "Caused by: ";
-            int index = detail.lastIndexOf(CAUSE_CAPTION);
-            if (index != -1) {
-              String rootCause = detail.substring(index + CAUSE_CAPTION.length());
-              index = rootCause.indexOf(System.getProperty("line.separator"));
-              if (index != -1) {
-                errBuilder.append(rootCause.substring(0, index));
-              } else {
-                errBuilder.append(rootCause);
-              }
-            } else {
-              errBuilder.append(detail);
-            }
-            detail = System.getProperty("line.separator") + detail;
-          }
-          console.printError(errBuilder.toString(), detail);
+          LOG.error("Spark job[" + sparkJobStatus.getJobId() + "] failed", sparkJobStatus
+              .getError());
          running = false;
          done = true;
          rc = 3;
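With the monitor's FAILED branch reduced to logging, the user-facing text now comes from the SPARK_TASK_RUNTIME_ERROR template added in ErrorMsg. As a rough illustration of the substitution only (ErrorMsg has its own formatting machinery; this standalone snippet, including the sample detail string, is not part of the patch), the {0} placeholder receives the summary built by getTaskFailure:

import java.text.MessageFormat;

public class ErrorMsgFormatDemo {
  public static void main(String[] args) {
    // The 40002 template from ErrorMsg, paired with a made-up task-failure detail.
    String template = "Spark job failed due to Spark task failures: {0}";
    String detail = "Tasks failed with java.lang.RuntimeException: boom";
    // Prints: Spark job failed due to Spark task failures: Tasks failed with
    // java.lang.RuntimeException: boom
    System.out.println(MessageFormat.format(template, detail));
  }
}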
diff --git a/ql/src/test/queries/clientnegative/spark_task_failure.q b/ql/src/test/queries/clientnegative/spark_task_failure.q
new file mode 100644
index 0000000000..7bb8c503eb
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/spark_task_failure.q
@@ -0,0 +1,9 @@
+ADD FILE ../../data/scripts/error_script;
+
+EXPLAIN
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src;
+
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src;
+

diff --git a/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out b/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out
new file mode 100644
index 0000000000..3aaa5fa53c
--- /dev/null
+++ b/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out
@@ -0,0 +1,53 @@
+PREHOOK: query: EXPLAIN
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Transform Operator
+                      command: error_script
+                      output info:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Spark job failed due to Spark task failures: Tasks failed with org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20003]: An error occurred when trying to close the Operator running your custom script.
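The final FAILED line above shows the other half of the message: the "Tasks failed with ..." detail that getTaskFailure extracts from the serialized stack trace. Here is a minimal standalone sketch of that extraction under the same "last Caused by:" convention; the TaskFailureDetail class and the sample exceptions are hypothetical, not part of the patch.

import com.google.common.base.Throwables;

public class TaskFailureDetail {

  private static final String CAUSE_CAPTION = "Caused by: ";

  // The failed task's Throwable arrives flattened to a string; take the text
  // after the last "Caused by: " up to the next line break as the root cause.
  static String taskFailureDetail(String serializedTrace) {
    StringBuilder errBuilder = new StringBuilder("Tasks failed with ");
    if (serializedTrace == null) {
      return errBuilder.append("UNKNOWN reason").toString();
    }
    int index = serializedTrace.lastIndexOf(CAUSE_CAPTION);
    if (index == -1) {
      return errBuilder.append(serializedTrace).toString();
    }
    String rootCause = serializedTrace.substring(index + CAUSE_CAPTION.length());
    int newline = rootCause.indexOf(System.lineSeparator());
    return errBuilder
        .append(newline == -1 ? rootCause : rootCause.substring(0, newline))
        .toString();
  }

  public static void main(String[] args) {
    // A nested failure, serialized the same way the patch serializes it.
    Exception root = new IllegalStateException("script exited with code 1");
    String trace = Throwables.getStackTraceAsString(
        new RuntimeException("Job aborted due to stage failure", root));
    // Prints: Tasks failed with java.lang.IllegalStateException: script exited with code 1
    System.out.println(taskFailureDetail(trace));
  }
}

Searching for the last "Caused by:" rather than the first means a deeply wrapped error still surfaces its innermost cause, which is exactly what the expected output above relies on: the HiveException [Error 20003] is the deepest cause in the trace shipped back from the failed task.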