commit 9f5ae117a64aa089028d5c603cc047a13a1fb6e0
Author: Sahil Takiar
Date:   Thu Mar 29 17:10:36 2018 -0700

    HIVE-18916: SparkClientImpl doesn't error out if spark-submit fails

diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index cf6d19a593..1d67909065 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1672,7 +1672,9 @@ spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\
 
 spark.only.query.negative.files=spark_job_max_tasks.q,\
   spark_stage_max_tasks.q,\
-  spark_task_failure.q
+  spark_task_failure.q,\
+  spark_submit_negative_executor_cores.q,\
+  spark_submit_negative_executor_memory.q
 
 spark.perf.disabled.query.files=query14.q,\
   query64.q
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java
index 52d9668e28..86c0475139 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java
@@ -239,6 +239,8 @@ public LineProcessingResult processLine(String line) {
     ppm.add(new PatternReplacementPair(Pattern.compile("attempt_[0-9_]+"), "attempt_#ID#"));
     ppm.add(new PatternReplacementPair(Pattern.compile("vertex_[0-9_]+"), "vertex_#ID#"));
     ppm.add(new PatternReplacementPair(Pattern.compile("task_[0-9_]+"), "task_#ID#"));
+    ppm.add(new PatternReplacementPair(Pattern.compile("for Spark session.*?:"),
+        "#SPARK_SESSION_ID#:"));
     partialPlanMask = ppm.toArray(new PatternReplacementPair[ppm.size()]);
   }
   /* This list may be modified by specific cli drivers to mask strings that change on every test */
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 803877162d..1c137bde84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -186,12 +186,7 @@ public int execute(DriverContext driverContext) {
       }
       sparkJobStatus.cleanup();
     } catch (Exception e) {
-      String msg = "Failed to execute Spark task " + getId() + ", with exception '" + Utilities.getNameMessage(e) + "'";
-
-      // Has to use full name to make sure it does not conflict with
-      // org.apache.commons.lang.StringUtils
-      console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
-      LOG.error(msg, e);
+      LOG.error("Failed to execute Spark task \"" + getId() + "\"", e);
       setException(e);
       if (e instanceof HiveException) {
         HiveException he = (HiveException) e;
@@ -533,7 +528,7 @@ void setSparkException(SparkJobStatus sparkJobStatus, int rc) {
   private boolean isTaskFailure(Throwable error) {
     Pattern taskFailedPattern = Pattern.compile("Task.*in stage.*failed.*times");
     while (error != null) {
-      if (taskFailedPattern.matcher(error.getMessage()).find()) {
+      if (error.getMessage() != null && taskFailedPattern.matcher(error.getMessage()).find()) {
         return true;
       }
       error = error.getCause();
@@ -545,8 +540,8 @@ private boolean isOOMError(Throwable error) {
     while (error != null) {
       if (error instanceof OutOfMemoryError) {
         return true;
-      } else if (error.getMessage().contains("Container killed by YARN for exceeding memory " +
-          "limits")) {
+      } else if (error.getMessage() != null && error.getMessage().contains("Container killed by " +
+          "YARN for exceeding memory limits")) {
         return true;
       }
       error = error.getCause();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index c8cb1ac08c..0f24938a45 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -214,13 +214,18 @@ HiveException getHiveException(Throwable e) {
           return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST,
               sessionId, matchedString.toString());
         } else {
-          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, Throwables.getRootCause(e).getMessage());
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, getRootCause(oe));
         }
       }
       e = e.getCause();
     }
 
-    return new HiveException(oe, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, Throwables.getRootCause(oe).getMessage());
+    return new HiveException(oe, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, getRootCause(oe));
+  }
+
+  private String getRootCause(Throwable e) {
+    Throwable rootCause = Throwables.getRootCause(e);
+    return rootCause.getClass().getName() + ": " + rootCause.getMessage();
   }
 
   private boolean matches(String input, String regex, StringBuilder matchedString) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 98c228b268..0d8e29601e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -76,7 +76,6 @@ public int startMonitor() {
           if ((timeCount > monitorTimeoutInterval)) {
             HiveException he = new HiveException(ErrorMsg.SPARK_JOB_MONITOR_TIMEOUT,
                 Long.toString(timeCount));
-            console.printError(he.getMessage());
             sparkJobStatus.setMonitorError(he);
             running = false;
             done = true;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkInvalidFileFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkInvalidFileFormat.java
new file mode 100644
index 0000000000..d518cedfe1
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkInvalidFileFormat.java
@@ -0,0 +1,63 @@
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+public class TestSparkInvalidFileFormat {
+
+  @Test
+  public void readTextFileAsParquet() throws IOException {
+    HiveConf conf = new HiveConf();
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+        SQLStdHiveAuthorizerFactory.class.getName());
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
+    conf.set("spark.master", "local");
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path tmpDir = new Path("TestSparkInvalidFileFormat-tmp");
+
+    File testFile = new File(conf.get("test.data.files"), "kv1.txt");
+
+    SessionState.start(conf);
+
+    IDriver driver = null;
+
+    try {
+      driver = DriverFactory.newDriver(conf);
+      Assert.assertEquals(0,
+          driver.run("CREATE TABLE test_table (key STRING, value STRING)").getResponseCode());
+      Assert.assertEquals(0, driver.run(
+          "LOAD DATA LOCAL INPATH '" + testFile + "' INTO TABLE test_table").getResponseCode());
+      Assert.assertEquals(0,
+          driver.run("ALTER TABLE test_table SET FILEFORMAT parquet").getResponseCode());
+      Throwable exception = driver.run(
+          "SELECT * FROM test_table ORDER BY key LIMIT 10").getException();
+      Assert.assertTrue(exception instanceof HiveException);
+      Assert.assertTrue(exception.getMessage().contains("Spark job failed due to task failures"));
+      Assert.assertTrue(exception.getMessage().contains("kv1.txt is not a Parquet file. expected " +
+          "magic number at tail [80, 65, 82, 49] but found [95, 57, 55, 10]"));
+    } finally {
+      if (driver != null) {
+        Assert.assertEquals(0, driver.run("DROP TABLE IF EXISTS test_table").getResponseCode());
+        driver.destroy();
+      }
+      if (fs.exists(tmpDir)) {
+        fs.delete(tmpDir, true);
+      }
+    }
+  }
+}
diff --git a/ql/src/test/queries/clientnegative/spark_submit_negative_executor_cores.q b/ql/src/test/queries/clientnegative/spark_submit_negative_executor_cores.q
new file mode 100644
index 0000000000..5a92390007
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/spark_submit_negative_executor_cores.q
@@ -0,0 +1,5 @@
+--! qt:dataset:src
+
+set spark.executor.cores=-1;
+
+select * from src order by key limit 10;
diff --git a/ql/src/test/queries/clientnegative/spark_submit_negative_executor_memory.q b/ql/src/test/queries/clientnegative/spark_submit_negative_executor_memory.q
new file mode 100644
index 0000000000..55bc4b8ae2
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/spark_submit_negative_executor_memory.q
@@ -0,0 +1,5 @@
+--! qt:dataset:src
+
+set spark.executor.memory=-1;
+
+select * from src order by key limit 10;
diff --git a/ql/src/test/results/clientnegative/spark/spark_submit_invalid_executor_instances.q.out b/ql/src/test/results/clientnegative/spark/spark_submit_invalid_executor_instances.q.out
new file mode 100644
index 0000000000..12d6738efb
--- /dev/null
+++ b/ql/src/test/results/clientnegative/spark/spark_submit_invalid_executor_instances.q.out
@@ -0,0 +1,5 @@
+PREHOOK: query: select * from src order by key limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client for Spark session 39fdd5cc-59e0-4dbe-b61b-c708b384905a: java.lang.NumberFormatException: For input string: "hello"
diff --git a/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_cores.q.out b/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_cores.q.out
new file mode 100644
index 0000000000..47ac8b29fb
--- /dev/null
+++ b/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_cores.q.out
@@ -0,0 +1,5 @@
+PREHOOK: query: select * from src order by key limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client #SPARK_SESSION_ID#: java.lang.RuntimeException: spark-submit process failed with exit code 1 and error "Error: Executor cores must be a positive number"
diff --git a/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_memory.q.out b/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_memory.q.out
new file mode 100644
index 0000000000..4aa1337f17
--- /dev/null
+++ b/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_memory.q.out
@@ -0,0 +1,5 @@
+PREHOOK: query: select * from src order by key limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client #SPARK_SESSION_ID#: java.lang.RuntimeException: spark-submit process failed with exit code 1 and error "Error: Executor Memory cores must be a positive number"
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index f8b5d19e7a..b1b74ab275 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -115,7 +115,7 @@
       } else {
         errorMsg = "Error while waiting for Remote Spark Driver to connect back to HiveServer2.";
       }
-      LOG.error(errorMsg, e);
+
       driverThread.interrupt();
       try {
         driverThread.join();
@@ -123,7 +123,7 @@
         // Give up.
         LOG.warn("Interrupted before driver thread was finished.", ie);
       }
-      throw Throwables.propagate(e);
+      throw new RuntimeException(errorMsg, e);
     }
 
     LOG.info("Successfully connected to Remote Spark Driver at: " + this.driverRpc.getRemoteAddress());
@@ -484,18 +484,21 @@ public void run() {
         try {
           int exitCode = child.waitFor();
           if (exitCode != 0) {
-            StringBuilder errStr = new StringBuilder();
-            synchronized(childErrorLog) {
-              Iterator<String> iter = childErrorLog.iterator();
-              while(iter.hasNext()){
-                errStr.append(iter.next());
-                errStr.append('\n');
+            List<String> errorMessages = new ArrayList<>();
+            synchronized (childErrorLog) {
+              Iterator<String> iter = childErrorLog.iterator();
+              while (iter.hasNext()) {
+                String line = iter.next();
+                if (StringUtils.containsIgnoreCase(line, "Error")) {
+                  errorMessages.add("\"" + line + "\"");
+                }
               }
             }
 
-            LOG.warn("Child process exited with code {}", exitCode);
-            rpcServer.cancelClient(clientId,
-                "Child process (spark-submit) exited before connecting back with error log " + errStr.toString());
+            String errStr = errorMessages.isEmpty() ? "?" : Joiner.on(',').join(errorMessages);
+
+            rpcServer.cancelClient(clientId, new RuntimeException("spark-submit process failed " +
+                "with exit code " + exitCode + " and error " + errStr));
          }
        } catch (InterruptedException ie) {
          LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process.");
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index f1383d6f18..1b60e33d8c 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -203,9 +203,9 @@ public void operationComplete(Promise<Void> p) {
   /**
    * Tells the RPC server to cancel the connection from an existing pending client
    * @param clientId The identifier for the client
-   * @param msg The error message about why the connection should be canceled
+   * @param failure The error about why the connection should be canceled
    */
-  public void cancelClient(final String clientId, final String msg) {
+  public void cancelClient(final String clientId, final Throwable failure) {
     final ClientInfo cinfo = pendingClients.remove(clientId);
     if (cinfo == null) {
       // Nothing to be done here.
@@ -213,11 +213,20 @@ public void cancelClient(final String clientId, final String msg) {
     }
     cinfo.timeoutFuture.cancel(true);
     if (!cinfo.promise.isDone()) {
-      cinfo.promise.setFailure(new RuntimeException(
-          String.format("Cancelling Remote Spark Driver client connection '%s' with error: " + msg, clientId)));
+      cinfo.promise.setFailure(failure);
     }
   }
 
+  /**
+   * Tells the RPC server to cancel the connection from an existing pending client
+   * @param clientId The identifier for the client
+   * @param msg The error message about why the connection should be canceled
+   */
+  public void cancelClient(final String clientId, final String msg) {
+    cancelClient(clientId, new RuntimeException(String.format(
+        "Cancelling Remote Spark Driver client connection '%s' with error: " + msg, clientId)));
+  }
+
   /**
    * Creates a secret for identifying a client connection.
   */