diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index b13f73b9ea..9fdcc85702 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -180,7 +180,11 @@ public boolean isOpening() { return false; } try { - session = sessionFuture.get(0, TimeUnit.NANOSECONDS); + TezClient session = sessionFuture.get(0, TimeUnit.NANOSECONDS); + if (session == null) { + return false; + } + this.session = session; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; @@ -202,7 +206,11 @@ public boolean isOpen() { return false; } try { - session = sessionFuture.get(0, TimeUnit.NANOSECONDS); + TezClient session = sessionFuture.get(0, TimeUnit.NANOSECONDS); + if (session == null) { + return false; + } + this.session = session; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; @@ -363,12 +371,23 @@ protected void openInternal(String[] additionalFilesNotFromConf, FutureTask sessionFuture = new FutureTask<>(new Callable() { @Override public TezClient call() throws Exception { + TezClient result = null; try { - return startSessionAndContainers(session, conf, commonLocalResources, tezConfig, true); + result = startSessionAndContainers( + session, conf, commonLocalResources, tezConfig, true); } catch (Throwable t) { + // The caller has already stopped the session. LOG.error("Failed to start Tez session", t); throw (t instanceof Exception) ? (Exception)t : new Exception(t); } + // Check interrupt at the last moment in case we get cancelled quickly. + // This is not bulletproof but should allow us to close session in most cases. + if (Thread.interrupted()) { + LOG.info("Interrupted while starting Tez session"); + closeAndIgnoreExceptions(result); + return null; + } + return result; } }); new Thread(sessionFuture, "Tez session start thread").start(); @@ -471,7 +490,11 @@ public void endOpen() throws InterruptedException, CancellationException { return; } try { - this.session = this.sessionFuture.get(); + TezClient session = this.sessionFuture.get(); + if (session == null) { + throw new RuntimeException("Initialization was interrupted"); + } + this.session = session; } catch (ExecutionException e) { throw new RuntimeException(e); } @@ -643,7 +666,7 @@ void close(boolean keepDagFilesDir) throws Exception { appJarLr = null; try { - if (getSession() != null) { + if (session != null) { LOG.info("Closing Tez Session"); closeClient(session); session = null;