diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 6b949d2..476c261 100644
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.ASTNodeOrigin;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
@@ -464,7 +465,9 @@
   HIVE_GROUPING_FUNCTION_EXPR_NOT_IN_GROUPBY(10409, "Expression in GROUPING function not present in GROUP BY"),
   ALTER_TABLE_NON_PARTITIONED_TABLE_CASCADE_NOT_SUPPORTED(10410, "Alter table with non-partitioned table does not support cascade"),
+
   //========================== 20000 range starts here ========================//
+
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. " +
       "It may have crashed with an error."),
 
@@ -492,10 +495,15 @@
   FILE_NOT_FOUND(20012, "File not found: {0}", "64000", true),
   WRONG_FILE_FORMAT(20013, "Wrong file format. Please check the file's format.", "64000", true),
+  SPARK_CREATE_CLIENT_INVALID_QUEUE(20014, "Spark job is submitted to an invalid queue: {0}." +
+      " Please fix and try again.", true),
+  SPARK_RUNTIME_OOM(20015, "Spark job failed because it ran out of memory."),
+
   // An exception from runtime that will show the full stack to client
   UNRESOLVED_RT_EXCEPTION(29999, "Runtime Error: {0}", "58004", true),
 
   //========================== 30000 range starts here ========================//
+
   STATSPUBLISHER_NOT_OBTAINED(30000, "StatsPublisher cannot be obtained. " +
       "There was a error to retrieve the StatsPublisher, and retrying " +
       "might help. If you dont want the query to fail because accurate statistics " +
@@ -535,7 +543,6 @@
     "to fail because of this, set hive.stats.atomic=false", true),
   STATS_SKIPPING_BY_ERROR(30017, "Skipping stats aggregation by error {0}", true),
-
   INVALID_FILE_FORMAT_IN_LOAD(30019, "The file that you are trying to load does not match the" +
       " file format of the destination table."),
@@ -555,7 +562,37 @@
   CONCATENATE_UNSUPPORTED_PARTITION_ARCHIVED(30032, "Concatenate/Merge can not be performed on archived partitions"),
   CONCATENATE_UNSUPPORTED_TABLE_NON_NATIVE(30033, "Concatenate/Merge can not be performed on non-native tables"),
   CONCATENATE_UNSUPPORTED_TABLE_NOT_MANAGED(30034, "Concatenate/Merge can only be performed on managed tables"),
-  CONCATENATE_UNSUPPORTED_TABLE_TRANSACTIONAL(30035, "Concatenate/Merge can not be performed on transactional tables")
+  CONCATENATE_UNSUPPORTED_TABLE_TRANSACTIONAL(30035,
+      "Concatenate/Merge can not be performed on transactional tables"),
+
+  SPARK_GET_JOB_INFO_TIMEOUT(30036,
+      "Spark job timed out after {0} seconds while getting job info", true),
+  SPARK_JOB_MONITOR_TIMEOUT(30037, "Job hasn't been submitted after {0}s." +
+      " Aborting it.\nPossible reasons include network issues, " +
+      "errors in remote driver or the cluster has no available resources, etc.\n" +
+      "Please check YARN or Spark driver's logs for further information.\n" +
+      "The timeout is controlled by " + HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT + ".", true),
+
+  // Various errors when creating Spark client
+  SPARK_CREATE_CLIENT_TIMEOUT(30038,
+      "Timed out while creating Spark client for session {0}.", true),
+  SPARK_CREATE_CLIENT_QUEUE_FULL(30039,
+      "Failed to create Spark client because job queue is full: {0}.", true),
+  SPARK_CREATE_CLIENT_INTERRUPTED(30040,
+      "Interrupted while creating Spark client for session {0}", true),
+  SPARK_CREATE_CLIENT_ERROR(30041,
+      "Failed to create Spark client for Spark session {0}", true),
+  SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST(30042,
+      "Failed to create Spark client due to invalid resource request: {0}", true),
+  SPARK_CREATE_CLIENT_CLOSED_SESSION(30043,
+      "Cannot create Spark client on a closed session {0}", true),
+
+  SPARK_JOB_INTERRUPTED(30044, "Spark job was interrupted while executing"),
+
+  //========================== 40000 range starts here ========================//
+
+  SPARK_JOB_RUNTIME_ERROR(40001,
+      "Spark job failed during runtime. Please check the stack trace for the root cause.")
   ;
   private int errorCode;
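Note for reviewers: the new entries follow ErrorMsg's existing convention, a unique code plus a MessageFormat-style template whose {0} is filled from the arguments passed to HiveException. Below is a minimal stand-alone sketch of that substitution; the demo class is hypothetical and only assumes java.text.MessageFormat, with the template copied from the new 20014 entry.

    import java.text.MessageFormat;

    public class ErrorMsgFormatDemo {
      public static void main(String[] args) {
        // Template from SPARK_CREATE_CLIENT_INVALID_QUEUE; {0} is the queue name.
        String template =
            "Spark job is submitted to an invalid queue: {0}. Please fix and try again.";
        // ErrorMsg substitutes the varargs given to HiveException, e.g.
        // new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE, queueName).
        System.out.println("ERROR 20014: " + MessageFormat.format(template, "root.adhoc"));
      }
    }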
+ + " Aborting it.\nPossible reasons include network issues, " + + "errors in remote driver or the cluster has no available resources, etc.\n" + + "Please check YARN or Spark driver's logs for further information.\n" + + "The timeout is controlled by " + HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT + ".", true), + + // Various errors when creating Spark client + SPARK_CREATE_CLIENT_TIMEOUT(30038, + "Timed out while creating Spark client for session {0}.", true), + SPARK_CREATE_CLIENT_QUEUE_FULL(30039, + "Failed to create Spark client because job queue is full: {0}.", true), + SPARK_CREATE_CLIENT_INTERRUPTED(30040, + "Interrupted while creating Spark client for session {0}", true), + SPARK_CREATE_CLIENT_ERROR(30041, + "Failed to create Spark client for Spark session {0}", true), + SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST(30042, + "Failed to create Spark client due to invalid resource request: {0}", true), + SPARK_CREATE_CLIENT_CLOSED_SESSION(30043, + "Cannot create Spark client on a closed session {0}", true), + + SPARK_JOB_INTERRUPTED(30044, "Spark job was interrupted while executing"), + + //========================== 40000 range starts here ========================// + + SPARK_JOB_RUNTIME_ERROR(40001, + "Spark job failed during runtime. Please check stacktrace for the root cause.") ; private int errorCode; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 0f5f708..6915cf1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; +import com.google.common.base.Throwables; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.slf4j.Logger; @@ -34,6 +35,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; @@ -67,6 +69,7 @@ import org.apache.hadoop.util.StringUtils; import com.google.common.collect.Lists; +import org.apache.spark.SparkException; public class SparkTask extends Task { private static final String CLASS_NAME = SparkTask.class.getName(); @@ -155,7 +158,12 @@ public int execute(DriverContext driverContext) { console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); LOG.error(msg, e); setException(e); - rc = 1; + if (e instanceof HiveException) { + HiveException he = (HiveException) e; + rc = he.getCanonicalErrorMsg().getErrorCode(); + } else { + rc = 1; + } } finally { startTime = perfLogger.getEndTime(PerfLogger.SPARK_SUBMIT_TO_RUNNING); // The startTime may not be set if the sparkTask finished too fast, @@ -417,11 +425,30 @@ private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) { error.getCause() instanceof InterruptedException)) { killJob(); } - setException(error); + HiveException he; + if (isOOMError(error)) { + he = new HiveException(error, ErrorMsg.SPARK_RUNTIME_OOM); + } else { + he = new HiveException(error, ErrorMsg.SPARK_JOB_RUNTIME_ERROR); + } + setException(he); } } } catch (Exception e) { LOG.error("Failed to get Spark job information", e); } } + + private boolean isOOMError(Throwable error) { + while (error != 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index ba61868..0c61566 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -18,12 +18,20 @@
 package org.apache.hadoop.hive.ql.exec.spark.session;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,15 +52,30 @@
   private static final Logger LOG = LoggerFactory.getLogger(SparkSession.class);
   private static final String SPARK_DIR = "_spark_session_dir";
 
+  /** Regexes for the different Spark session error messages. */
+  private static final String AM_TIMEOUT_ERR = ".*ApplicationMaster for attempt.*timed out.*";
+  private static final String UNKNOWN_QUEUE_ERR = "(submitted by user.*to unknown queue:.*)\n";
+  private static final String STOPPED_QUEUE_ERR = "(Queue.*is STOPPED)";
+  private static final String FULL_QUEUE_ERR = "(Queue.*already has.*applications)";
+  private static final String INVALID_MEM_ERR =
+      "(Required executor memory.*is above the max threshold.*) of this";
+  private static final String INVALID_CORE_ERR =
+      "(initial executor number.*must between min executor.*and max executor number.*)\n";
+
+  /** Pre-compiled error patterns. Shared among all Spark sessions. */
+  private static Map<String, Pattern> errorPatterns;
+
   private HiveConf conf;
   private boolean isOpen;
   private final String sessionId;
   private HiveSparkClient hiveSparkClient;
   private Path scratchDir;
   private final Object dirLock = new Object();
+  private String matchedString = null;
 
   public SparkSessionImpl() {
     sessionId = makeSessionId();
+    initErrorPatterns();
   }
 
   @Override
@@ -64,9 +87,13 @@ public void open(HiveConf conf) throws HiveException {
       hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId);
     } catch (Throwable e) {
       // It's possible that user session is closed while creating Spark client.
-      String msg = isOpen ? "Failed to create Spark client for Spark session " + sessionId :
-          "Spark Session " + sessionId + " is closed before Spark client is created";
-      throw new HiveException(msg, e);
+      HiveException he;
+      if (isOpen) {
+        he = getHiveException(e);
+      } else {
+        he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId);
+      }
+      throw he;
     }
     LOG.info("Spark session {} is successfully opened", sessionId);
   }
@@ -152,6 +179,67 @@ private Path createScratchDir() throws IOException {
     return sparkDir;
   }
 
+  private static void initErrorPatterns() {
+    errorPatterns = Maps.newHashMap(
+        new ImmutableMap.Builder<String, Pattern>()
+            .put(AM_TIMEOUT_ERR, Pattern.compile(AM_TIMEOUT_ERR))
+            .put(UNKNOWN_QUEUE_ERR, Pattern.compile(UNKNOWN_QUEUE_ERR))
+            .put(STOPPED_QUEUE_ERR, Pattern.compile(STOPPED_QUEUE_ERR))
+            .put(FULL_QUEUE_ERR, Pattern.compile(FULL_QUEUE_ERR))
+            .put(INVALID_MEM_ERR, Pattern.compile(INVALID_MEM_ERR))
+            .put(INVALID_CORE_ERR, Pattern.compile(INVALID_CORE_ERR))
+            .build()
+    );
+  }
+
+  @VisibleForTesting
+  HiveException getHiveException(Throwable e) {
+    Throwable oe = e;
+    while (e != null) {
+      if (e instanceof TimeoutException) {
+        return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT, sessionId);
+      } else if (e instanceof InterruptedException) {
+        return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INTERRUPTED, sessionId);
+      } else if (e instanceof RuntimeException) {
+        String sts = Throwables.getStackTraceAsString(e);
+        if (matches(sts, AM_TIMEOUT_ERR)) {
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT, sessionId);
+        } else if (matches(sts, UNKNOWN_QUEUE_ERR) || matches(sts, STOPPED_QUEUE_ERR)) {
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE, matchedString);
+        } else if (matches(sts, FULL_QUEUE_ERR)) {
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_QUEUE_FULL, matchedString);
+        } else if (matches(sts, INVALID_MEM_ERR) || matches(sts, INVALID_CORE_ERR)) {
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST,
+              matchedString);
+        } else {
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId);
+        }
+      }
+      e = e.getCause();
+    }
+
+    return new HiveException(oe, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId);
+  }
+
+  @VisibleForTesting
+  String getMatchedString() {
+    return matchedString;
+  }
+
+  private boolean matches(String input, String regex) {
+    if (!errorPatterns.containsKey(regex)) {
+      LOG.warn("No error pattern found for regex: {}", regex);
+      return false;
+    }
+    Pattern p = errorPatterns.get(regex);
+    Matcher m = p.matcher(input);
+    boolean result = m.find();
+    if (result && m.groupCount() == 1) {
+      this.matchedString = m.group(1);
+    }
+    return result;
+  }
+
   private void cleanScratchDir() throws IOException {
     if (scratchDir != null) {
       FileSystem fs = scratchDir.getFileSystem(conf);
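Because the capture groups decide what the user ultimately sees, here is a self-contained illustration of the contract matches() relies on: find() locates the diagnostic inside a larger stack-trace string, and group(1) becomes matchedString. Only java.util.regex is used, and the sample diagnostic text is invented.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class QueuePatternDemo {
      public static void main(String[] args) {
        // Same pattern as UNKNOWN_QUEUE_ERR above; the trailing \n is supplied
        // by the stack-trace formatting, not by the YARN message itself.
        Pattern unknownQueue =
            Pattern.compile("(submitted by user.*to unknown queue:.*)\n");
        String diagnostics = "java.lang.RuntimeException: application_123 "
            + "submitted by user hive to unknown queue: foo\n"
            + "\tat some.Frame(Frame.java:1)";
        Matcher m = unknownQueue.matcher(diagnostics);
        if (m.find() && m.groupCount() == 1) {
          System.out.println(m.group(1));
          // -> submitted by user hive to unknown queue: foo
        }
      }
    }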
"Failed to create Spark client for Spark session " + sessionId : - "Spark Session " + sessionId + " is closed before Spark client is created"; - throw new HiveException(msg, e); + HiveException he; + if (isOpen) { + he = getHiveException(e); + } else { + he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId); + } + throw he; } LOG.info("Spark session {} is successfully opened", sessionId); } @@ -152,6 +179,67 @@ private Path createScratchDir() throws IOException { return sparkDir; } + private static void initErrorPatterns() { + errorPatterns = Maps.newHashMap( + new ImmutableMap.Builder() + .put(AM_TIMEOUT_ERR, Pattern.compile(AM_TIMEOUT_ERR)) + .put(UNKNOWN_QUEUE_ERR, Pattern.compile(UNKNOWN_QUEUE_ERR)) + .put(STOPPED_QUEUE_ERR, Pattern.compile(STOPPED_QUEUE_ERR)) + .put(FULL_QUEUE_ERR, Pattern.compile(FULL_QUEUE_ERR)) + .put(INVALILD_MEM_ERR, Pattern.compile(INVALILD_MEM_ERR)) + .put(INVALID_CORE_ERR, Pattern.compile(INVALID_CORE_ERR)) + .build() + ); + } + + @VisibleForTesting + HiveException getHiveException(Throwable e) { + Throwable oe = e; + while (e != null) { + if (e instanceof TimeoutException) { + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT); + } else if (e instanceof InterruptedException) { + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INTERRUPTED, sessionId); + } else if (e instanceof RuntimeException) { + String sts = Throwables.getStackTraceAsString(e); + if (matches(sts, AM_TIMEOUT_ERR)) { + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT); + } else if (matches(sts, UNKNOWN_QUEUE_ERR) || matches(sts, STOPPED_QUEUE_ERR)) { + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE, matchedString); + } else if (matches(sts, FULL_QUEUE_ERR)) { + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_QUEUE_FULL, matchedString); + } else if (matches(sts, INVALILD_MEM_ERR) || matches(sts, INVALID_CORE_ERR)) { + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST, + matchedString); + } else { + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId); + } + } + e = e.getCause(); + } + + return new HiveException(oe, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId); + } + + @VisibleForTesting + String getMatchedString() { + return matchedString; + } + + private boolean matches(String input, String regex) { + if (!errorPatterns.containsKey(regex)) { + LOG.warn("No error pattern found for regex: {}", regex); + return false; + } + Pattern p = errorPatterns.get(regex); + Matcher m = p.matcher(input); + boolean result = m.find(); + if (result && m.groupCount() == 1) { + this.matchedString = m.group(1); + } + return result; + } + private void cleanScratchDir() throws IOException { if (scratchDir != null) { FileSystem fs = scratchDir.getFileSystem(conf); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index 37b8363..f94ad0c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -23,9 +23,11 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus; import org.apache.hadoop.hive.ql.log.PerfLogger; 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index 67db303..d93bd8f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -20,6 +20,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -174,7 +175,8 @@ private SparkJobInfo getSparkJobInfo() throws HiveException {
       return getJobInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
     } catch (Exception e) {
       LOG.warn("Failed to get job info.", e);
-      throw new HiveException(e);
+      throw new HiveException(e, ErrorMsg.SPARK_GET_JOB_INFO_TIMEOUT,
+          Long.toString(sparkClientTimeoutInSeconds));
     }
   }
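For context, the call being wrapped here is a bounded Future.get(); the sketch below reproduces that pattern without Hive dependencies so the TimeoutException path is easy to see. All names are illustrative.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class BoundedRpcDemo {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> getJobInfo = pool.submit(() -> {
          Thread.sleep(5000); // stand-in for a slow remote Spark driver
          return "job info";
        });
        try {
          System.out.println(getJobInfo.get(1, TimeUnit.SECONDS));
        } catch (TimeoutException e) {
          // RemoteSparkJobStatus now rethrows this as a HiveException carrying
          // ErrorMsg.SPARK_GET_JOB_INFO_TIMEOUT and the limit that was used.
          System.err.println("getting job info timed out after 1 second");
        } finally {
          pool.shutdownNow();
        }
      }
    }

One design note: the Hive catch block is broader than this sketch, so any failure of the RPC, not only a timeout, is reported with the timeout message.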
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
index 47d2437..291ed85 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.session;
 
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -26,6 +27,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
 import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
@@ -129,6 +131,75 @@ public void testForceConfCloning() throws Exception {
     checkSparkConf(conf, sparkCloneConfiguration, "true");
   }
 
+  @Test
+  public void testGetHiveException() throws Exception {
+    HiveConf conf = new HiveConf();
+    conf.set("spark.master", "local");
+    SparkSessionManager ssm = SparkSessionManagerImpl.getInstance();
+    SparkSessionImpl ss = (SparkSessionImpl) ssm.getSession(
+        null, conf, true);
+
+    Throwable e;
+
+    e = new TimeoutException();
+    checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT);
+
+    e = new InterruptedException();
+    checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_INTERRUPTED);
+
+    e = new RuntimeException("\t diagnostics: Application application_1508358311878_3322732 " +
+        "failed 1 times due to ApplicationMaster for attempt " +
+        "appattempt_1508358311878_3322732_000001 timed out. Failing the application.");
+    checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT);
+
+    e = new RuntimeException("\t diagnostics: Application application_1508358311878_3330000 " +
+        "submitted by user hive to unknown queue: foo");
+    checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE,
+        "submitted by user hive to unknown queue: foo");
+
+    e = new RuntimeException("\t diagnostics: org.apache.hadoop.security.AccessControlException: " +
+        "Queue root.foo is STOPPED. Cannot accept submission of application: " +
+        "application_1508358311878_3369187");
+    checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE,
+        "Queue root.foo is STOPPED");
+
+    e = new RuntimeException("\t diagnostics: org.apache.hadoop.security.AccessControlException: " +
+        "Queue root.foo already has 10 applications, cannot accept submission of application: " +
+        "application_1508358311878_3384544");
+    checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_QUEUE_FULL,
+        "Queue root.foo already has 10 applications");
+
+    e = new RuntimeException("Exception in thread \"main\" java.lang.IllegalArgumentException: " +
+        "Required executor memory (7168+10240 MB) is above the max threshold (16384 MB) of this " +
+        "cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " +
+        "'yarn.nodemanager.resource.memory-mb'.");
+    checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST,
+        "Required executor memory (7168+10240 MB) is above the max threshold (16384 MB)");
+
+    e = new RuntimeException("Exception in thread \"main\" java.lang.IllegalArgumentException: " +
+        "requirement failed: initial executor number 5 must between min executor number10 " +
+        "and max executor number 50");
+    checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST,
+        "initial executor number 5 must between min executor number10 and max executor number 50");
+
+    // Any other exception defaults to SPARK_CREATE_CLIENT_ERROR
+    e = new Exception();
+    checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR);
+  }
+
+  private void checkHiveException(SparkSessionImpl ss, Throwable e, ErrorMsg expectedErrMsg) {
+    checkHiveException(ss, e, expectedErrMsg, null);
+  }
+
+  private void checkHiveException(SparkSessionImpl ss, Throwable e,
+      ErrorMsg expectedErrMsg, String expectedMatchedStr) {
+    HiveException he = ss.getHiveException(e);
+    assertEquals(expectedErrMsg, he.getCanonicalErrorMsg());
+    if (expectedMatchedStr != null) {
+      assertEquals(expectedMatchedStr, ss.getMatchedString());
+    }
+  }
+
   /**
    * Force a Spark config to be generated and check that a config value has the expected value
   * @param conf the Hive config to use as a base
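Taken together with the SparkTask change, a canonical error now travels all the way to the task's return code. A small sketch of that end-to-end mapping, using only constructors and accessors that already appear in the diffs above:

    import org.apache.hadoop.hive.ql.ErrorMsg;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class ReturnCodeDemo {
      public static void main(String[] args) {
        HiveException he =
            new HiveException(new OutOfMemoryError(), ErrorMsg.SPARK_RUNTIME_OOM);
        // SparkTask.execute() now returns this code (20015) instead of a bare 1.
        System.out.println(he.getCanonicalErrorMsg().getErrorCode());
      }
    }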