commit 70bd58a68d1885b3b5850fb61816c4ccfab954cd
Author: Sahil Takiar
Date:   Thu Mar 29 17:10:36 2018 -0700

    HIVE-18916: SparkClientImpl doesn't error out if spark-submit fails

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 7d33fa3892..516b0047d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -576,7 +576,7 @@
   SPARK_CREATE_CLIENT_INTERRUPTED(30040,
     "Interrupted while creating Spark client for session {0}", true),
   SPARK_CREATE_CLIENT_ERROR(30041,
-    "Failed to create Spark client for Spark session {0}", true),
+    "Failed to create Spark client for Spark session {0} with error {1}", true),
   SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST(30042,
     "Failed to create Spark client due to invalid resource request: {0}", true),
   SPARK_CREATE_CLIENT_CLOSED_SESSION(30043,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index bfa2da68e1..8dc6334dfa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -186,12 +186,7 @@ public int execute(DriverContext driverContext) {
       }
       sparkJobStatus.cleanup();
     } catch (Exception e) {
-      String msg = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + "'";
-
-      // Has to use full name to make sure it does not conflict with
-      // org.apache.commons.lang.StringUtils
-      console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
-      LOG.error(msg, e);
+      LOG.error("Failed to execute spark task", e);
       setException(e);
       if (e instanceof HiveException) {
         HiveException he = (HiveException) e;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 2d5d03ee71..88ccbe5971 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -212,13 +212,13 @@ HiveException getHiveException(Throwable e) {
           return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST,
               matchedString);
         } else {
-          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId);
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, e.getMessage());
         }
       }
       e = e.getCause();
     }

-    return new HiveException(oe, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId);
+    return new HiveException(oe, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, oe.getMessage());
   }

   @VisibleForTesting
diff --git a/ql/src/test/queries/clientnegative/spark_submit_fail.q b/ql/src/test/queries/clientnegative/spark_submit_fail.q
new file mode 100644
index 0000000000..63193026b8
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/spark_submit_fail.q
@@ -0,0 +1,3 @@
+set spark.executor.cores=-1;
+
+select * from src order by key;
diff --git a/ql/src/test/results/clientnegative/spark/spark_submit_fail.q.out b/ql/src/test/results/clientnegative/spark/spark_submit_fail.q.out
new file mode 100644
index 0000000000..fcba34ba6e
--- /dev/null
+++ b/ql/src/test/results/clientnegative/spark/spark_submit_fail.q.out
@@ -0,0 +1,5 @@
+PREHOOK: query: select * from src order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client for Spark session 64c50cf6-e87f-4a5e-a170-849ed312bdcf with error java.util.concurrent.ExecutionException: java.lang.RuntimeException: spark-submit process failed with exit code 1 and error "Error: Executor cores must be a positive number"
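Note for reviewers: the q.out above shows the end-to-end effect of threading the cause message through. The diff does not show how ErrorMsg expands its template, but the {0}/{1} placeholders follow java.text.MessageFormat conventions, so the following standalone sketch illustrates why the added {1} argument matters. The class name and sample values are illustrative only (the session id and message are copied from the q.out above):

import java.text.MessageFormat;

// Illustration only: mimics how the patched SPARK_CREATE_CLIENT_ERROR
// template surfaces the root-cause message via the new {1} placeholder.
public class ErrorMsgFormatSketch {
  // Template as changed by this patch (previously ended at "session {0}").
  private static final String TEMPLATE =
      "Failed to create Spark client for Spark session {0} with error {1}";

  public static void main(String[] args) {
    String sessionId = "64c50cf6-e87f-4a5e-a170-849ed312bdcf";
    Throwable cause = new RuntimeException(
        "spark-submit process failed with exit code 1 and error "
            + "\"Error: Executor cores must be a positive number\"");

    // Before the patch only {0} existed, so the user saw the session id
    // but not why spark-submit failed; now the cause message rides along.
    System.out.println(MessageFormat.format(TEMPLATE, sessionId, cause.getMessage()));
  }
}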
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index e4f72a3531..c19cc8a8ef 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -112,10 +112,8 @@
         errorMsg = "Interruption occurred while waiting for client to connect.\nPossibly the Spark session is closed "
             + "such as in case of query cancellation."
             + "\nPlease refer to HiveServer2 logs for further information.";
-      } else {
-        errorMsg = "Error while waiting for client to connect.";
       }
-      LOG.error(errorMsg, e);
+
       driverThread.interrupt();
       try {
         driverThread.join();
@@ -123,6 +121,9 @@
         // Give up.
         LOG.warn("Interrupted before driver thread was finished.", ie);
       }
+      if (errorMsg != null) {
+        throw new RuntimeException(errorMsg, e);
+      }
       throw Throwables.propagate(e);
     }
@@ -476,18 +477,21 @@ public void run() {
         try {
           int exitCode = child.waitFor();
           if (exitCode != 0) {
-            StringBuilder errStr = new StringBuilder();
-            synchronized(childErrorLog) {
-              Iterator iter = childErrorLog.iterator();
-              while(iter.hasNext()){
-                errStr.append(iter.next());
-                errStr.append('\n');
+            List<String> errorMessages = new ArrayList<>();
+            synchronized (childErrorLog) {
+              Iterator<String> iter = childErrorLog.iterator();
+              while (iter.hasNext()) {
+                String line = iter.next();
+                if (StringUtils.containsIgnoreCase(line, "Error")) {
+                  errorMessages.add("\"" + line + "\"");
+                }
               }
             }
-            LOG.warn("Child process exited with code {}", exitCode);
-            rpcServer.cancelClient(clientId,
-                "Child process (spark-submit) exited before connecting back with error log " + errStr.toString());
+
+            String errStr = errorMessages.isEmpty() ? "?" : Joiner.on(',').join(errorMessages);
+
+            rpcServer.cancelClient(clientId, new RuntimeException("spark-submit process failed "
+                + "with exit code " + exitCode + " and error " + errStr));
           }
         } catch (InterruptedException ie) {
           LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process.");
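For clarity, here is a dependency-free sketch of the redirector-thread change above: only stderr lines mentioning "Error" (case-insensitively) are kept, quoted, and joined into the exception message, with "?" as the fallback when nothing matched. The patch itself uses Commons Lang's StringUtils.containsIgnoreCase and Guava's Joiner; this sketch substitutes plain-JDK equivalents, and the class and method names are illustrative, not Hive's:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SparkSubmitErrorScrapeSketch {

  // Mirrors the patched loop: filter, quote, and join error lines
  // instead of dumping the entire child error log.
  static String summarize(List<String> childErrorLog, int exitCode) {
    List<String> errorMessages = new ArrayList<>();
    for (String line : childErrorLog) {
      if (line.toLowerCase().contains("error")) {  // ~ StringUtils.containsIgnoreCase
        errorMessages.add("\"" + line + "\"");
      }
    }
    // "?" mirrors the patch's fallback when no stderr line matched.
    String errStr = errorMessages.isEmpty() ? "?" : String.join(",", errorMessages);
    return "spark-submit process failed with exit code " + exitCode
        + " and error " + errStr;
  }

  public static void main(String[] args) {
    List<String> log = Arrays.asList(
        "Warning: Ignoring non-Spark config property: hive.spark.client.connect.timeout",
        "Error: Executor cores must be a positive number",
        "Run with --help for usage help or --verbose for debug output");
    System.out.println(summarize(log, 1));
    // -> spark-submit process failed with exit code 1 and error
    //    "Error: Executor cores must be a positive number"
  }
}

This is the message that shows up verbatim in the spark_submit_fail.q.out above.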
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index 6c6ab74ce7..f761b22f8d 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -197,9 +197,9 @@ public void operationComplete(Promise p) {
   /**
    * Tells the RPC server to cancel the connection from an existing pending client
    * @param clientId The identifier for the client
-   * @param msg The error message about why the connection should be canceled
+   * @param failure The error about why the connection should be canceled
    */
-  public void cancelClient(final String clientId, final String msg) {
+  public void cancelClient(final String clientId, final Throwable failure) {
     final ClientInfo cinfo = pendingClients.remove(clientId);
     if (cinfo == null) {
       // Nothing to be done here.
@@ -207,11 +207,20 @@ public void cancelClient(final String clientId, final String msg) {
     }
     cinfo.timeoutFuture.cancel(true);
     if (!cinfo.promise.isDone()) {
-      cinfo.promise.setFailure(new RuntimeException(
-          String.format("Cancel client '%s'. Error: " + msg, clientId)));
+      cinfo.promise.setFailure(failure);
     }
   }

+  /**
+   * Tells the RPC server to cancel the connection from an existing pending client
+   * @param clientId The identifier for the client
+   * @param msg The error message about why the connection should be canceled
+   */
+  public void cancelClient(final String clientId, final String msg) {
+    cancelClient(clientId,
+        new RuntimeException(String.format("Cancel client '%s'. Error: " + msg, clientId)));
+  }
+
   /**
    * Creates a secret for identifying a client connection.
    */
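A minimal sketch of the overload pattern introduced here, assuming only what the diff shows: the new Throwable variant fails the pending promise with the caller's exception unchanged, while the retained String variant wraps its message the old way and delegates. java.util.concurrent.CompletableFuture stands in for Netty's Promise, and the class and field names are illustrative, not Hive's:

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class CancelClientSketch {
  private final Map<String, CompletableFuture<Void>> pendingClients =
      new ConcurrentHashMap<>();

  // New primary entry point: keeps the real cause intact, so the
  // "spark-submit process failed ..." RuntimeException built in
  // SparkClientImpl reaches the session layer without being re-wrapped.
  public void cancelClient(String clientId, Throwable failure) {
    CompletableFuture<Void> promise = pendingClients.remove(clientId);
    if (promise != null && !promise.isDone()) {
      promise.completeExceptionally(failure);
    }
  }

  // Retained String overload: wraps the message exactly as the old code
  // did, then delegates, so existing callers keep working.
  public void cancelClient(String clientId, String msg) {
    cancelClient(clientId,
        new RuntimeException(String.format("Cancel client '%s'. Error: " + msg, clientId)));
  }
}

Passing the Throwable through unwrapped is what lets SparkSessionImpl.getHiveException() extract the spark-submit message shown in the q.out above, rather than a generic "Cancel client" string.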