commit b201e45d8409d93ce9ea5c2cc50f9a713077770d
Author: Sahil Takiar
Date:   Mon Feb 19 13:14:59 2018 +0000

    HIVE-18093: Improve logging when HoS application is killed

diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
index 7ca89ed275..4dbc4908bf 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
@@ -49,12 +49,8 @@
     final String cause;

-    Error(Throwable cause) {
-      if (cause == null) {
-        this.cause = "";
-      } else {
-        this.cause = Throwables.getStackTraceAsString(cause);
-      }
+    Error(String cause) {
+      this.cause = cause;
     }

     Error() {
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index e584cbb0a7..753d800f00 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -96,6 +96,8 @@ private RemoteDriver(String[] args) throws Exception {
     this.shutdownLock = new Object();
     localTmpDir = Files.createTempDir();

+    addShutdownHook();
+
     SparkConf conf = new SparkConf();
     String serverAddress = null;
     int serverPort = -1;
@@ -178,6 +180,17 @@ public void rpcClosed(Rpc rpc) {
     }
   }

+  private void addShutdownHook() {
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      if (running) {
+        LOG.info("Received signal SIGTERM, attempting safe shutdown of Remote Spark Context");
+        protocol.sendErrorMessage("Remote Spark Context was shutdown because it received a SIGTERM " +
+            "signal. Most likely due to a kill request via YARN.");
+        shutdown(null);
+      }
+    }));
+  }
+
   private void run() throws InterruptedException {
     synchronized (shutdownLock) {
       while (running) {
@@ -249,7 +262,12 @@ private String getArg(String[] args, int keyIdx) {

   void sendError(Throwable error) {
     LOG.debug("Send error to Client: {}", Throwables.getStackTraceAsString(error));
-    clientRpc.call(new Error(error));
+    clientRpc.call(new Error(Throwables.getStackTraceAsString(error)));
+  }
+
+  void sendErrorMessage(String cause) {
+    LOG.debug("Send error to Client: {}", cause);
+    clientRpc.call(new Error(cause));
   }

   void jobFinished(String jobId, T result,
@@ -390,7 +408,7 @@ public void call(JavaFutureAction future,
           // Catch throwables in a best-effort to report job status back to the client. It's
           // re-thrown so that the executor can destroy the affected thread (or the JVM can
           // die or whatever would happen if the throwable bubbled up).
-          LOG.info("Failed to run job " + req.id, t);
+          LOG.error("Failed to run job " + req.id, t);
           protocol.jobFinished(req.id, null, t,
               sparkCounters != null ? sparkCounters.snapshot() : null);
           throw new ExecutionException(t);
@@ -515,8 +533,18 @@ private String getClientId(Integer jobId) {
   }

   public static void main(String[] args) throws Exception {
-    new RemoteDriver(args).run();
+    RemoteDriver rd = new RemoteDriver(args);
+    try {
+      rd.run();
+    } catch (Exception e) {
+      // If the main thread throws an exception for some reason, propagate the exception to the
+      // client and initiate a safe shutdown
+      if (rd.running) {
+        rd.protocol.sendError(e);
+        rd.shutdown(null);
+      }
+      throw e;
+    }
   }
-
 }
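
For reference, a minimal standalone sketch of the shutdown-hook pattern this patch relies on (Runtime.getRuntime().addShutdownHook is the standard JDK API; the class and field names below are illustrative, not the actual RemoteDriver members):

    // Illustrative sketch only: a JVM shutdown hook that reports an externally
    // triggered shutdown (e.g. SIGTERM from a YARN kill) before the process exits.
    public class ShutdownHookSketch {

      private static volatile boolean running = true;

      public static void main(String[] args) throws InterruptedException {
        // The hook runs when the JVM is terminated externally, giving the driver a
        // last chance to say why it is going away.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
          if (running) {
            System.err.println("Received SIGTERM, attempting safe shutdown");
            // In the actual patch this is where protocol.sendErrorMessage(...) and
            // shutdown(null) notify the client and release resources.
            running = false;
          }
        }));

        // Stand-in for the driver's main loop; send SIGTERM (kill <pid>) to trigger the hook.
        while (running) {
          Thread.sleep(1000);
        }
      }
    }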