commit 16239e1beca33fd95426b89d9c1e07b0d7472cf3
Author: Sahil Takiar
Date:   Thu Mar 29 17:10:36 2018 -0700

    HIVE-18916: SparkClientImpl doesn't error out if spark-submit fails

diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index a3ddbda27f..14e2f0a3af 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1671,7 +1671,9 @@ spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\
 spark.only.query.negative.files=spark_job_max_tasks.q,\
   spark_stage_max_tasks.q,\
-  spark_task_failure.q
+  spark_task_failure.q,\
+  spark_submit_negative_executor_cores.q,\
+  spark_submit_negative_executor_memory.q
 
 spark.perf.disabled.query.files=query14.q,\
   query64.q
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java
index 359f02759f..1e4cddd4ec 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java
@@ -273,6 +273,8 @@ public LineProcessingResult processLine(String line, boolean maskStats, boolean
     ppm.add(new PatternReplacementPair(Pattern.compile("attempt_[0-9_]+"), "attempt_#ID#"));
     ppm.add(new PatternReplacementPair(Pattern.compile("vertex_[0-9_]+"), "vertex_#ID#"));
     ppm.add(new PatternReplacementPair(Pattern.compile("task_[0-9_]+"), "task_#ID#"));
+    ppm.add(new PatternReplacementPair(Pattern.compile("for Spark session.*?:"),
+        "#SPARK_SESSION_ID#:"));
     partialPlanMask = ppm.toArray(new PatternReplacementPair[ppm.size()]);
   }
   /* This list may be modified by specific cli drivers to mask strings that change on every test */
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 02613f2ca3..ad5049a3e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -180,7 +180,7 @@ public int execute(DriverContext driverContext) {
         killJob();
       } else if (rc == 4) {
         LOG.info("The Spark job or one stage of it has too many tasks" +
-            ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID );
+            ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID);
Cancelling Spark job " + sparkJobID + " with application ID " + jobID); killJob(); } @@ -189,12 +189,7 @@ public int execute(DriverContext driverContext) { } sparkJobStatus.cleanup(); } catch (Exception e) { - String msg = "Failed to execute Spark task " + getId() + ", with exception '" + Utilities.getNameMessage(e) + "'"; - - // Has to use full name to make sure it does not conflict with - // org.apache.commons.lang.StringUtils - console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); - LOG.error(msg, e); + LOG.error("Failed to execute Spark task \"" + getId() + "\"", e); setException(e); if (e instanceof HiveException) { HiveException he = (HiveException) e; @@ -609,7 +604,7 @@ void setSparkException(SparkJobStatus sparkJobStatus, int rc) { private boolean isTaskFailure(Throwable error) { Pattern taskFailedPattern = Pattern.compile("Task.*in stage.*failed.*times"); while (error != null) { - if (taskFailedPattern.matcher(error.getMessage()).find()) { + if (error.getMessage() != null && taskFailedPattern.matcher(error.getMessage()).find()) { return true; } error = error.getCause(); @@ -621,8 +616,8 @@ private boolean isOOMError(Throwable error) { while (error != null) { if (error instanceof OutOfMemoryError) { return true; - } else if (error.getMessage().contains("Container killed by YARN for exceeding memory " + - "limits")) { + } else if (error.getMessage() != null && error.getMessage().contains("Container killed by " + + "YARN for exceeding memory limits")) { return true; } error = error.getCause(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java index c8cb1ac08c..6e37d93f67 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java @@ -214,13 +214,19 @@ HiveException getHiveException(Throwable e) { return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST, sessionId, matchedString.toString()); } else { - return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, Throwables.getRootCause(e).getMessage()); + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, + getRootCause(oe)); } } e = e.getCause(); } - return new HiveException(oe, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, Throwables.getRootCause(oe).getMessage()); + return new HiveException(oe, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, getRootCause(oe)); + } + + private String getRootCause(Throwable e) { + Throwable rootCause = Throwables.getRootCause(e); + return rootCause.getClass().getName() + ": " + rootCause.getMessage(); } private boolean matches(String input, String regex, StringBuilder matchedString) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index 004b50ba95..560fb58670 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -76,7 +76,6 @@ public int startMonitor() { if ((timeCount > monitorTimeoutInterval)) { HiveException he = new HiveException(ErrorMsg.SPARK_JOB_MONITOR_TIMEOUT, Long.toString(timeCount)); - console.printError(he.getMessage()); sparkJobStatus.setMonitorError(he); running = false; done = true; diff 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkInvalidFileFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkInvalidFileFormat.java
new file mode 100644
index 0000000000..bcc0924a08
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkInvalidFileFormat.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+public class TestSparkInvalidFileFormat {
+
+  @Test
+  public void readTextFileAsParquet() throws IOException {
+    HiveConf conf = new HiveConf();
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+        SQLStdHiveAuthorizerFactory.class.getName());
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
+    conf.set("spark.master", "local");
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path tmpDir = new Path("TestSparkInvalidFileFormat-tmp");
+
+    File testFile = new File(conf.get("test.data.files"), "kv1.txt");
+
+    SessionState.start(conf);
+
+    IDriver driver = null;
+
+    try {
+      driver = DriverFactory.newDriver(conf);
+      Assert.assertEquals(0,
+          driver.run("CREATE TABLE test_table (key STRING, value STRING)").getResponseCode());
+      Assert.assertEquals(0, driver.run(
+          "LOAD DATA LOCAL INPATH '" + testFile + "' INTO TABLE test_table").getResponseCode());
+      Assert.assertEquals(0,
+          driver.run("ALTER TABLE test_table SET FILEFORMAT parquet").getResponseCode());
+      Throwable exception = driver.run(
+          "SELECT * FROM test_table ORDER BY key LIMIT 10").getException();
+      Assert.assertTrue(exception instanceof HiveException);
+      Assert.assertTrue(exception.getMessage().contains("Spark job failed due to task failures"));
+      Assert.assertTrue(exception.getMessage().contains("kv1.txt is not a Parquet file. expected " +
+          "magic number at tail [80, 65, 82, 49] but found [95, 57, 55, 10]"));
expected " + + "magic number at tail [80, 65, 82, 49] but found [95, 57, 55, 10]")); + } finally { + if (driver != null) { + Assert.assertEquals(0, driver.run("DROP TABLE IF EXISTS test_table").getResponseCode()); + driver.destroy(); + } + if (fs.exists(tmpDir)) { + fs.delete(tmpDir, true); + } + } + } +} diff --git a/ql/src/test/queries/clientnegative/spark_submit_negative_executor_cores.q b/ql/src/test/queries/clientnegative/spark_submit_negative_executor_cores.q new file mode 100644 index 0000000000..5a92390007 --- /dev/null +++ b/ql/src/test/queries/clientnegative/spark_submit_negative_executor_cores.q @@ -0,0 +1,5 @@ +--! qt:dataset:src + +set spark.executor.cores=-1; + +select * from src order by key limit 10; diff --git a/ql/src/test/queries/clientnegative/spark_submit_negative_executor_memory.q b/ql/src/test/queries/clientnegative/spark_submit_negative_executor_memory.q new file mode 100644 index 0000000000..55bc4b8ae2 --- /dev/null +++ b/ql/src/test/queries/clientnegative/spark_submit_negative_executor_memory.q @@ -0,0 +1,5 @@ +--! qt:dataset:src + +set spark.executor.memory=-1; + +select * from src order by key limit 10; diff --git a/ql/src/test/results/clientnegative/spark/spark_submit_invalid_executor_instances.q.out b/ql/src/test/results/clientnegative/spark/spark_submit_invalid_executor_instances.q.out new file mode 100644 index 0000000000..12d6738efb --- /dev/null +++ b/ql/src/test/results/clientnegative/spark/spark_submit_invalid_executor_instances.q.out @@ -0,0 +1,5 @@ +PREHOOK: query: select * from src order by key limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client for Spark session 39fdd5cc-59e0-4dbe-b61b-c708b384905a: java.lang.NumberFormatException: For input string: "hello" diff --git a/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_cores.q.out b/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_cores.q.out new file mode 100644 index 0000000000..47ac8b29fb --- /dev/null +++ b/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_cores.q.out @@ -0,0 +1,5 @@ +PREHOOK: query: select * from src order by key limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client #SPARK_SESSION_ID#: java.lang.RuntimeException: spark-submit process failed with exit code 1 and error "Error: Executor cores must be a positive number" diff --git a/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_memory.q.out b/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_memory.q.out new file mode 100644 index 0000000000..4aa1337f17 --- /dev/null +++ b/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_memory.q.out @@ -0,0 +1,5 @@ +PREHOOK: query: select * from src order by key limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. 
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
index ed9222cfec..b2b5201c98 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
@@ -24,7 +24,6 @@
 import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 import com.google.common.io.Resources;
 
@@ -41,8 +40,6 @@
 import java.io.Writer;
 import java.net.URI;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -125,17 +122,19 @@ protected AbstractSparkClient(RpcServer rpcServer, Map<String, String> conf, Hiv
       } else {
         errorMsg = "Error while waiting for Remote Spark Driver to connect back to HiveServer2.";
       }
-      LOG.error(errorMsg, e);
-      driverFuture.cancel(true);
-      try {
-        driverFuture.get();
-      } catch (InterruptedException ie) {
-        // Give up.
-        LOG.warn("Interrupted before driver thread was finished.", ie);
-      } catch (ExecutionException ee) {
-        LOG.error("Driver thread failed", ee);
+      if (driverFuture.isDone()) {
+        try {
+          driverFuture.get();
+        } catch (InterruptedException ie) {
+          // Give up.
+          LOG.warn("Interrupted before driver thread was finished.", ie);
+        } catch (ExecutionException ee) {
+          LOG.error("Driver thread failed", ee);
+        }
+      } else {
+        driverFuture.cancel(true);
       }
-      throw Throwables.propagate(e);
+      throw new RuntimeException(errorMsg, e);
     }
 
     LOG.info("Successfully connected to Remote Spark Driver at: " + this.driverRpc.getRemoteAddress());
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
index 31e89b8fa0..7a6e77bdc6 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
@@ -32,6 +32,8 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 
+import org.apache.commons.lang3.StringUtils;
+
 import org.apache.hadoop.hive.common.log.LogRedirector;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -200,19 +202,19 @@ private String getSparkJobCredentialProviderPassword() {
     try {
       int exitCode = child.waitFor();
       if (exitCode != 0) {
-        StringBuilder errStr = new StringBuilder();
-        synchronized(childErrorLog) {
-          for (Object aChildErrorLog : childErrorLog) {
-            errStr.append(aChildErrorLog);
-            errStr.append('\n');
+        List<String> errorMessages = new ArrayList<>();
+        synchronized (childErrorLog) {
+          for (String line : childErrorLog) {
+            if (StringUtils.containsIgnoreCase(line, "Error")) {
+              errorMessages.add("\"" + line + "\"");
+            }
           }
         }
-        LOG.warn("Child process exited with code {}", exitCode);
-        rpcServer.cancelClient(clientId,
-            "Child process (spark-submit) exited before connecting back with error log " + errStr.toString());
-      } else {
-        LOG.info("Child process (spark-submit) exited successfully.");
+        String errStr = errorMessages.isEmpty() ? "?" : Joiner.on(',').join(errorMessages);
+
+        rpcServer.cancelClient(clientId, new RuntimeException("spark-submit process failed " +
+            "with exit code " + exitCode + " and error " + errStr));
       }
     } catch (InterruptedException ie) {
       LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process.");
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index babcb54b6f..0c67ffd813 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -202,11 +202,12 @@ public void operationComplete(Promise p) {
   }
 
   /**
-   * Tells the RPC server to cancel the connection from an existing pending client
+   * Tells the RPC server to cancel the connection from an existing pending client.
+   *
    * @param clientId The identifier for the client
-   * @param msg The error message about why the connection should be canceled
+   * @param failure The error about why the connection should be canceled
    */
-  public void cancelClient(final String clientId, final String msg) {
+  public void cancelClient(final String clientId, final Throwable failure) {
     final ClientInfo cinfo = pendingClients.remove(clientId);
     if (cinfo == null) {
       // Nothing to be done here.
@@ -214,11 +215,21 @@ public void cancelClient(final String clientId, final String msg) {
     }
     cinfo.timeoutFuture.cancel(true);
     if (!cinfo.promise.isDone()) {
-      cinfo.promise.setFailure(new RuntimeException(
-          String.format("Cancelling Remote Spark Driver client connection '%s' with error: " + msg, clientId)));
+      cinfo.promise.setFailure(failure);
     }
   }
 
+  /**
+   * Tells the RPC server to cancel the connection from an existing pending client.
+   *
+   * @param clientId The identifier for the client
+   * @param msg The error message about why the connection should be canceled
+   */
+  public void cancelClient(final String clientId, final String msg) {
+    cancelClient(clientId, new RuntimeException(String.format(
+        "Cancelling Remote Spark Driver client connection '%s' with error: " + msg, clientId)));
+  }
+
   /**
    * Creates a secret for identifying a client connection.
    */