diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e540d023bd..74033da37c 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3024,6 +3024,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_SERVER2_TEZ_QUEUE_ACCESS_CHECK("hive.server2.tez.queue.access.check", false, "Whether to check user access to explicitly specified YARN queues. " + "yarn.resourcemanager.webapp.address must be configured to use this."), + HIVE_SERVER2_TEZ_PARALLEL_DEFAULT_SESSIONS_REUSE("hive.server2.tez.parallel.default.sessions.reuse", + false, + "Whether to allow parallel default sessions reused. If the session is still being used\n" + + "(e.g. hasn't been returned to the pool), this session will be skipped and an unused\n" + + "session will be returned from the pool."), HIVE_SERVER2_TEZ_SESSION_LIFETIME("hive.server2.tez.session.lifetime", "162h", new TimeValidator(TimeUnit.HOURS), "The lifetime of the Tez sessions launched by HS2 when default sessions are enabled.\n" + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 2633390861..0ccccff351 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -447,8 +447,14 @@ private static boolean canWorkWithSameSession(TezSessionState session, HiveConf LOG.info("Current queue name is " + queueName + " incoming queue name is " + confQueueName); return (queueName == null) ? confQueueName == null : queueName.equals(confQueueName); } else { - // this session should never be a default session unless something has messed up. - throw new HiveException("The pool session " + session + " should have been returned to the pool"); + boolean parallelDefaultSessionsReuse = conf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_PARALLEL_DEFAULT_SESSIONS_REUSE); + if (parallelDefaultSessionsReuse) { + LOG.info("Skipping a currently used session from user " + session.getUser()); + return false; + } else { + // this session should never be a default session unless parallel default sessions reuse is turned on + throw new HiveException("The pool session " + session + " should have been returned to the pool"); + } } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index d5b683f788..faa834ef61 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -260,6 +260,43 @@ public void testLlapSessionQueuing() { } } + @Test + public void testLlapDefaultSessionReuse() { + conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default,llap,llap0,llap1"); + conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 1); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES, 2); + conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_PARALLEL_DEFAULT_SESSIONS_REUSE, true); + poolManager = new TestTezSessionPoolManager(); + try { + poolManager.setupPool(conf); + poolManager.startPool(); + + } catch (Exception e) { + LOG.error("Initialization error", e); + fail(); + } + + TezSessionState session0 = null; + try { + // Given: a default session being used + session0 = poolManager.getSession(null, conf, true, true); + assertEquals("Session0 should be default", true, session0.isDefault()); + + // When: the same session is reused but hasn't been returned to the pool + TezSessionState session1 = poolManager.getSession(session0, conf, true, true); + assertEquals("Session1 should be default", true, session1.isDefault()); + + // Then: a new unused session should be returned from the pool + assertNotEquals("The new session should be a different session", + session1.getSessionId(), session0.getSessionId()); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + public class SessionThread implements Runnable { private boolean llap = false;