diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index fae6393..d7148bb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -322,23 +322,39 @@ private TezSessionState getNewSessionState(HiveConf conf, public void returnSession(TezSessionState tezSessionState, boolean llap) throws Exception { - if (llap && (this.numConcurrentLlapQueries > 0)) { - llapQueue.release(); - } - if (tezSessionState.isDefault() && tezSessionState instanceof TezSessionPoolSession) { - LOG.info("The session " + tezSessionState.getSessionId() - + " belongs to the pool. Put it back in"); - SessionState sessionState = SessionState.get(); - if (sessionState != null) { - sessionState.setTezSession(null); + // Ignore the interrupt status while returning the session, but set it back + // on the thread in case anything else needs to deal with it. + boolean isInterrupted = Thread.interrupted(); + + try { + if (isInterrupted) { + LOG.info("returnSession invoked with interrupt status set"); + } + if (llap && (this.numConcurrentLlapQueries > 0)) { + llapQueue.release(); } - TezSessionPoolSession poolSession = (TezSessionPoolSession)tezSessionState; - if (poolSession.returnAfterUse()) { - defaultQueuePool.put(poolSession); + if (tezSessionState.isDefault() && + tezSessionState instanceof TezSessionPoolSession) { + LOG.info("The session " + tezSessionState.getSessionId() + + " belongs to the pool. Put it back in"); + SessionState sessionState = SessionState.get(); + if (sessionState != null) { + sessionState.setTezSession(null); + } + TezSessionPoolSession poolSession = + (TezSessionPoolSession) tezSessionState; + if (poolSession.returnAfterUse()) { + defaultQueuePool.put(poolSession); + } + } + // non default session nothing changes. The user can continue to use the existing + // session in the SessionState + } finally { + // Reset the interrupt status. + if (isInterrupted) { + Thread.currentThread().interrupt(); } } - // non default session nothing changes. The user can continue to use the existing - // session in the SessionState } public static void closeIfNotDefault( diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 0efca68..7479b85 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -190,12 +190,18 @@ public int execute(DriverContext driverContext) { counters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters(); } catch (Exception err) { // Don't fail execution due to counters - just don't print summary info - LOG.error("Failed to get counters: " + err, err); + LOG.warn("Failed to get counters. Ignoring, summary info will be incomplete. " + err, err); counters = null; } } finally { // We return this to the pool even if it's unusable; reopen is supposed to handle this. - TezSessionPoolManager.getInstance().returnSession(session, getWork().getLlapMode()); + try { + TezSessionPoolManager.getInstance() + .returnSession(session, getWork().getLlapMode()); + } catch (Exception e) { + LOG.error("Failed to return session: {} to pool", session, e); + throw e; + } } if (LOG.isInfoEnabled() && counters != null