diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 917268f..4d5a6db 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -285,8 +285,9 @@ private TezSessionState getSession(HiveConf conf, boolean doOpen, */ if (forceCreate || nonDefaultUser || !hasInitialSessions || ((queueName != null) && !queueName.isEmpty())) { - LOG.info("QueueName: " + queueName + " nonDefaultUser: " + nonDefaultUser + - " defaultQueuePool: " + defaultQueuePool + " hasInitialSessions: " + hasInitialSessions); + LOG.info("QueueName: {} nonDefaultUser: {} defaultQueuePool: {} hasInitialSessions: {}" + + " forceCreate: {}", queueName, nonDefaultUser, defaultQueuePool, hasInitialSessions, + forceCreate); return getNewSessionState(conf, queueName, doOpen); } @@ -463,7 +464,11 @@ public void reopenSession(TezSessionState sessionState, HiveConf conf, String[] additionalFiles, boolean keepTmpDir) throws Exception { HiveConf sessionConf = sessionState.getConf(); if (sessionConf != null && sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) != null) { + // user has explicitly specified queue name conf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME)); + } else { + // default queue name when the initial session was created + conf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName()); } // TODO: close basically resets the object to a bunch of nulls. // We should ideally not reuse the object because it's pointless and error-prone. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 2607db1..38250f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -395,6 +395,15 @@ private TezClient startSessionAndContainers(TezClient session, HiveConf conf, //ignore } isSuccessful = true; + // sessionState.getQueueName() comes from cluster wide configured queue names. + // sessionState.getConf().get("tez.queue.name") is explicitly set by user in a session. + // TezSessionPoolManager sets tez.queue.name if user has specified one or use the one from + // cluster wide queue names. + // There is no way to differentiate how this was set (user vs system). + // Unset this after opening the session so that reopening of session uses the correct queue + // names i.e, if client has not died and if the user has explicitly set a queue name + // then reopened session will use user specified queue name else default cluster queue names. + conf.unset(TezConfiguration.TEZ_QUEUE_NAME); return session; } finally { if (isOnThread && !isSuccessful) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index ec90801..2bddae9 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -20,10 +20,12 @@ import static org.junit.Assert.*; +import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.Random; +import org.apache.hadoop.fs.Path; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -144,6 +146,61 @@ public void testSessionPoolThreads() { } @Test + public void testSessionReopen() { + try { + conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default,tezq1"); + conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 1); + + poolManager = new TestTezSessionPoolManager(); + TezSessionState session = Mockito.mock(TezSessionState.class); + Mockito.when(session.getQueueName()).thenReturn("default"); + Mockito.when(session.isDefault()).thenReturn(false); + Mockito.when(session.getConf()).thenReturn(conf); + + poolManager.reopenSession(session, conf, null, false); + + Mockito.verify(session).close(false); + String[] files = null; + Mockito.verify(session).open(conf, files); + + // mocked session starts with default queue + assertEquals("default", session.getQueueName()); + + // user explicitly specified queue name + conf.set("tez.queue.name", "tezq1"); + poolManager.reopenSession(session, conf, null, false); + assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName()); + + // user unsets queue name, will fallback to default session queue + conf.unset("tez.queue.name"); + poolManager.reopenSession(session, conf, null, false); + assertEquals("default", poolManager.getSession(null, conf, false, false).getQueueName()); + + // session.open will unset the queue name from conf but Mockito intercepts the open call + // and does not call the real method, so explicitly unset the queue name here + conf.unset("tez.queue.name"); + // change session's default queue to tezq1 and rerun test sequence + Mockito.when(session.getQueueName()).thenReturn("tezq1"); + poolManager.reopenSession(session, conf, null, false); + assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName()); + + // user sets default queue now + conf.set("tez.queue.name", "default"); + poolManager.reopenSession(session, conf, null, false); + assertEquals("default", poolManager.getSession(null, conf, false, false).getQueueName()); + + // user does not specify queue so use session default + conf.unset("tez.queue.name"); + poolManager.reopenSession(session, conf, null, false); + assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName()); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + @Test public void testLlapSessionQueuing() { try { random = new Random(1000); @@ -236,6 +293,7 @@ public void testReturn() { public void testCloseAndOpenDefault() throws Exception { poolManager = new TestTezSessionPoolManager(); TezSessionState session = Mockito.mock(TezSessionState.class); + Mockito.when(session.getQueueName()).thenReturn("default"); Mockito.when(session.isDefault()).thenReturn(false); poolManager.reopenSession(session, conf, null, false); @@ -259,6 +317,7 @@ public void testCloseAndOpenWithResources() throws Exception { poolManager = new TestTezSessionPoolManager(); TezSessionState session = Mockito.mock(TezSessionState.class); Mockito.when(session.isDefault()).thenReturn(false); + Mockito.when(session.getQueueName()).thenReturn("default"); String[] extraResources = new String[] { "file:///tmp/foo.jar" }; poolManager.reopenSession(session, conf, extraResources, false);