diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 6b949d2..982fc41 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.ASTNodeOrigin; import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes; @@ -464,7 +465,9 @@ HIVE_GROUPING_FUNCTION_EXPR_NOT_IN_GROUPBY(10409, "Expression in GROUPING function not present in GROUP BY"), ALTER_TABLE_NON_PARTITIONED_TABLE_CASCADE_NOT_SUPPORTED(10410, "Alter table with non-partitioned table does not support cascade"), + //========================== 20000 range starts here ========================// + SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."), SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. " + "It may have crashed with an error."), @@ -492,10 +495,15 @@ FILE_NOT_FOUND(20012, "File not found: {0}", "64000", true), WRONG_FILE_FORMAT(20013, "Wrong file format. Please check the file's format.", "64000", true), + SPARK_CREATE_CLIENT_INVALID_QUEUE(20014, "Spark job is submitted to an invalid queue: {0}." + + " Please fix and try again.", true), + SPARK_RUNTIME_OOM(20015, "Spark job failed because of out of memory."), + // An exception from runtime that will show the full stack to client UNRESOLVED_RT_EXCEPTION(29999, "Runtime Error: {0}", "58004", true), //========================== 30000 range starts here ========================// + STATSPUBLISHER_NOT_OBTAINED(30000, "StatsPublisher cannot be obtained. 
" + "There was a error to retrieve the StatsPublisher, and retrying " + "might help. If you dont want the query to fail because accurate statistics " + @@ -535,7 +543,6 @@ "to fail because of this, set hive.stats.atomic=false", true), STATS_SKIPPING_BY_ERROR(30017, "Skipping stats aggregation by error {0}", true), - INVALID_FILE_FORMAT_IN_LOAD(30019, "The file that you are trying to load does not match the" + " file format of the destination table."), @@ -555,7 +562,29 @@ CONCATENATE_UNSUPPORTED_PARTITION_ARCHIVED(30032, "Concatenate/Merge can not be performed on archived partitions"), CONCATENATE_UNSUPPORTED_TABLE_NON_NATIVE(30033, "Concatenate/Merge can not be performed on non-native tables"), CONCATENATE_UNSUPPORTED_TABLE_NOT_MANAGED(30034, "Concatenate/Merge can only be performed on managed tables"), - CONCATENATE_UNSUPPORTED_TABLE_TRANSACTIONAL(30035, "Concatenate/Merge can not be performed on transactional tables") + CONCATENATE_UNSUPPORTED_TABLE_TRANSACTIONAL(30035, "Concatenate/Merge can not be performed on transactional tables"), + + SPARK_GET_JOB_INFO_TIMEOUT(30036, "Spark job timed out after {0} seconds while getting job info", true), + SPARK_JOB_MONITOR_TIMEOUT(30037, "Job hasn't been submitted after {0}s." 
+ + " Aborting it.\nPossible reasons include network issues, " + + "errors in remote driver or the cluster has no available resources, etc.\n" + + "Please check YARN or Spark driver's logs for further information.\n" + + "The timeout is controlled by " + HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT + ".", true), + + // Various errors when creating Spark client + SPARK_CREATE_CLIENT_TIMEOUT(30038, "Timed out while creating Spark client for session {0}.", true), + SPARK_CREATE_CLIENT_QUEUE_FULL(30039, "Failed to create Spark client because job queue is full: {0}.", true), + SPARK_CREATE_CLIENT_INTERRUPTED(30040, "Interrupted while creating Spark client for session {0}", true), + SPARK_CREATE_CLIENT_ERROR(30041, "Failed to create Spark client for Spark session {0}", true), + SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST(30042, "Failed to create Spark client due to invalid " + + "resource request: {0}", true), + SPARK_CREATE_CLIENT_CLOSED_SESSION(30043, "Cannot create Spark client on a closed session {0}", true), + + SPARK_JOB_INTERRUPTED(30044, "Spark job was interrupted while executing"), + + //========================== 40000 range starts here ========================// + + SPARK_JOB_RUNTIME_ERROR(40001, "Spark job failed during runtime. 
Please check stacktrace for the root cause.") ; private int errorCode; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 0f5f708..0083ef3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -26,7 +26,10 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; +import com.google.common.base.Throwables; +import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.slf4j.Logger; @@ -34,6 +37,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; @@ -67,6 +71,7 @@ import org.apache.hadoop.util.StringUtils; import com.google.common.collect.Lists; +import org.apache.spark.SparkException; public class SparkTask extends Task { private static final String CLASS_NAME = SparkTask.class.getName(); @@ -155,7 +160,12 @@ public int execute(DriverContext driverContext) { console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); LOG.error(msg, e); setException(e); - rc = 1; + if (e instanceof HiveException) { + HiveException he = (HiveException) e; + rc = he.getCanonicalErrorMsg().getErrorCode(); + } else { + rc = 1; + } } finally { startTime = perfLogger.getEndTime(PerfLogger.SPARK_SUBMIT_TO_RUNNING); // The startTime may not be set if the sparkTask finished too fast, @@ -417,11 +427,30 @@ private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) { error.getCause() instanceof 
InterruptedException)) { killJob(); } - setException(error); + HiveException he; + if (isOOMError(error)) { + he = new HiveException(error, ErrorMsg.SPARK_RUNTIME_OOM); + } else { + he = new HiveException(error, ErrorMsg.SPARK_JOB_RUNTIME_ERROR); + } + setException(he); } } } catch (Exception e) { LOG.error("Failed to get Spark job information", e); } } + + private boolean isOOMError(Throwable error) { + while (error != null) { + if (error instanceof OutOfMemoryError) { + return true; + } else if (error instanceof SparkException) { + String sts = Throwables.getStackTraceAsString(error); + return sts.contains("Container killed by YARN for exceeding memory limits"); + } + error = error.getCause(); + } + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java index ba61868..b2e5947 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java @@ -19,11 +19,16 @@ import java.io.IOException; import java.util.UUID; +import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.session.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +55,7 @@ private HiveSparkClient hiveSparkClient; private Path scratchDir; private final Object dirLock = new Object(); + private String matchedString = null; public SparkSessionImpl() { sessionId = makeSessionId(); @@ -64,9 +70,13 @@ public void open(HiveConf conf) throws HiveException { hiveSparkClient = 
HiveSparkClientFactory.createHiveSparkClient(conf, sessionId); } catch (Throwable e) { // It's possible that user session is closed while creating Spark client. - String msg = isOpen ? "Failed to create Spark client for Spark session " + sessionId : - "Spark Session " + sessionId + " is closed before Spark client is created"; - throw new HiveException(msg, e); + HiveException he; + if (isOpen) { + he = getHiveException(e); + } else { + he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId); + } + throw he; } LOG.info("Spark session {} is successfully opened", sessionId); } @@ -152,6 +162,47 @@ private Path createScratchDir() throws IOException { return sparkDir; } + private HiveException getHiveException(Throwable e) { + Throwable oe = e; + while (e != null) { + if (e instanceof TimeoutException) { + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT); + } else if (e instanceof RuntimeException) { + String sts = Throwables.getStackTraceAsString(e); + if (matches(sts,".*ApplicationMaster for attempt.*timed out.*")) { + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT); + } else if (matches(sts, "submitted by user.*to unknown queue:.*\n")) { + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE, matchedString); + } else if (matches(sts, "Queue.*is STOPPED")) { + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE, matchedString); + } else if (matches(sts, "Queue.*already has.*applications")) { + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_QUEUE_FULL, matchedString); + } else if (matches(sts, "Required executor memory.*is above the max threshold")) { + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST, matchedString); + } else if (matches(sts, "initial executor number.*must between min executor.*and max executor number.*\n")) { + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST, matchedString); + } 
else { + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId); + } + } else if (e instanceof InterruptedException) { + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INTERRUPTED, sessionId); + } + e = e.getCause(); + } + + return new HiveException(oe, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId); + } + + private boolean matches(String input, String regex) { + Pattern p = Pattern.compile(regex); + Matcher m = p.matcher(input); + boolean result = m.find(); + if (result) { + this.matchedString = m.group(); + } + return result; + } + private void cleanScratchDir() throws IOException { if (scratchDir != null) { FileSystem fs = scratchDir.getFileSystem(conf); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index 37b8363..f94ad0c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -23,9 +23,11 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus; import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hive.spark.client.JobHandle; import org.apache.spark.JobExecutionStatus; @@ -70,11 +72,11 @@ public int startMonitor() { case QUEUED: long timeCount = (System.currentTimeMillis() - startTime) / 1000; if ((timeCount > monitorTimeoutInterval)) { - console.printError("Job hasn't been submitted after " + timeCount + "s." 
+ " Aborting it.\nPossible reasons include network issues, " + - "errors in remote driver or the cluster has no available resources, etc.\n" + - "Please check YARN or Spark driver's logs for further information."); + HiveException he = new HiveException(ErrorMsg.SPARK_JOB_MONITOR_TIMEOUT, + Long.toString(timeCount)); + console.printError(he.getMessage()); console.printError("Status: " + state); + sparkJobStatus.setError(he); running = false; done = true; rc = 2; @@ -181,6 +183,10 @@ public int startMonitor() { Thread.sleep(checkInterval); } } catch (Exception e) { + Exception finalException = e; + if (e instanceof InterruptedException) { + finalException = new HiveException(e, ErrorMsg.SPARK_JOB_INTERRUPTED); + } String msg = " with exception '" + Utilities.getNameMessage(e) + "'"; msg = "Failed to monitor Job[" + sparkJobStatus.getJobId() + "]" + msg; @@ -190,7 +196,7 @@ public int startMonitor() { console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); rc = 1; done = true; - sparkJobStatus.setError(e); + sparkJobStatus.setError(finalException); } finally { if (done) { break; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index 67db303..7c449d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -20,6 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -174,7 +175,8 @@ private SparkJobInfo 
getSparkJobInfo() throws HiveException { return getJobInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS); } catch (Exception e) { LOG.warn("Failed to get job info.", e); - throw new HiveException(e); + throw (e instanceof java.util.concurrent.TimeoutException) ? new HiveException(e, ErrorMsg.SPARK_GET_JOB_INFO_TIMEOUT, + Long.toString(sparkClientTimeoutInSeconds)) : new HiveException(e); } }