diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 8f45947..b4d8ffa 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -76,6 +76,7 @@ private static final Logger LOG = LoggerFactory.getLogger(TezSessionPoolManager.class); private static final Random rdm = new Random(); + private volatile SessionState initSessionState; private BlockingQueue defaultQueuePool; /** Priority queue sorted by expiration time of live sessions that could be expired. */ @@ -136,6 +137,8 @@ private void startInitialSession(TezSessionPoolSession sessionState) throws Exce public void startPool() throws Exception { if (initialSessions.isEmpty()) return; + // Hive SessionState available at this point. + initSessionState = SessionState.get(); int threadCount = Math.min(initialSessions.size(), HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS)); Preconditions.checkArgument(threadCount > 0); @@ -259,13 +262,27 @@ public int compare(TezSessionPoolSession o1, TezSessionPoolSession o2) { expirationThread = new Thread(new Runnable() { @Override public void run() { - runExpirationThread(); + try { + SessionState.setCurrentSessionState(initSessionState); + runExpirationThread(); + } catch (Exception e) { + LOG.warn("Exception in TezSessionPool-expiration thread. Thread will shut down", e); + } finally { + LOG.info("TezSessionPool-expiration thread exiting"); + } } }, "TezSessionPool-expiration"); restartThread = new Thread(new Runnable() { @Override public void run() { - runRestartThread(); + try { + SessionState.setCurrentSessionState(initSessionState); + runRestartThread(); + } catch (Exception e) { + LOG.warn("Exception in TezSessionPool-cleanup thread. Thread will shut down", e); + } finally { + LOG.info("TezSessionPool-cleanup thread exiting"); + } } }, "TezSessionPool-cleanup"); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index ed1ba9c..036e918 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -345,6 +345,7 @@ public TezClient call() throws Exception { String user, final Configuration conf) throws IOException { // TODO: parts of this should be moved out of TezSession to reuse the clients, but there's // no good place for that right now (HIVE-13698). + // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool. SessionState session = SessionState.get(); boolean isInHs2 = session != null && session.isHiveServerQuery(); Token token = null; @@ -438,6 +439,7 @@ public void endOpen() throws InterruptedException, CancellationException { private void setupSessionAcls(Configuration tezConf, HiveConf hiveConf) throws IOException { + // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool. String user = SessionState.getUserFromAuthenticator(); UserGroupInformation loginUserUgi = UserGroupInformation.getLoginUser(); String loginUser = @@ -451,6 +453,7 @@ private void setupSessionAcls(Configuration tezConf, HiveConf hiveConf) throws TezConfiguration.TEZ_AM_MODIFY_ACLS, addHs2User, user, loginUser); if (LOG.isDebugEnabled()) { + // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool. LOG.debug( "Setting Tez Session access for sessionId={} with viewAclString={}, modifyStr={}", SessionState.get().getSessionId(), viewStr, modifyStr); @@ -592,6 +595,7 @@ public LocalResource getAppJarLr() { */ private Path createTezDir(String sessionId) throws IOException { // tez needs its own scratch dir (per session) + // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool. Path tezDir = new Path(SessionState.get().getHdfsScratchDirURIString(), TEZ_DIR); tezDir = new Path(tezDir, sessionId); FileSystem fs = tezDir.getFileSystem(conf);