diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index b4d8ffa..dabca3f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -23,12 +23,13 @@ import java.io.IOException; import java.net.URISyntaxException; - import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; @@ -77,7 +78,7 @@ private static final Random rdm = new Random(); private volatile SessionState initSessionState; - private BlockingQueue defaultQueuePool; + private BlockingDeque defaultQueuePool; /** Priority queue sorted by expiration time of live sessions that could be expired. */ private PriorityBlockingQueue expirationQueue; @@ -204,7 +205,7 @@ public void setupPool(HiveConf conf) throws InterruptedException { int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE); int numSessionsTotal = numSessions * (defaultQueueList.length - emptyNames); if (numSessionsTotal > 0) { - defaultQueuePool = new ArrayBlockingQueue(numSessionsTotal); + defaultQueuePool = new LinkedBlockingDeque(numSessionsTotal); } numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); @@ -426,7 +427,7 @@ public void returnSession(TezSessionState tezSessionState, boolean llap) TezSessionPoolSession poolSession = (TezSessionPoolSession) tezSessionState; if (poolSession.returnAfterUse()) { - defaultQueuePool.put(poolSession); + defaultQueuePool.putFirst(poolSession); } } // non default session nothing changes. The user can continue to use the existing diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index cbd2bfe..5a3eba3 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -88,24 +88,49 @@ public void testSessionPoolGetInOrder() { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); poolManager.startPool(); + // this is now a LIFO operation + + // draw 1 and replace TezSessionState sessionState = poolManager.getSession(null, conf, true, false); assertEquals("a", sessionState.getQueueName()); poolManager.returnSession(sessionState, false); sessionState = poolManager.getSession(null, conf, true, false); - assertEquals("b", sessionState.getQueueName()); + assertEquals("a", sessionState.getQueueName()); poolManager.returnSession(sessionState, false); - sessionState = poolManager.getSession(null, conf, true, false); - assertEquals("c", sessionState.getQueueName()); - poolManager.returnSession(sessionState, false); + // [a,b,c,a,b,c] - sessionState = poolManager.getSession(null, conf, true, false); - if (sessionState.getQueueName().compareTo("a") != 0) { - fail(); - } + // draw 2 and return in order - further run should return last returned + TezSessionState first = poolManager.getSession(null, conf, true, false); + TezSessionState second = poolManager.getSession(null, conf, true, false); + assertEquals("a", first.getQueueName()); + assertEquals("b", second.getQueueName()); + poolManager.returnSession(first, false); + poolManager.returnSession(second, false); + TezSessionState third = poolManager.getSession(null, conf, true, false); + assertEquals("b", third.getQueueName()); + poolManager.returnSession(third, false); - poolManager.returnSession(sessionState, false); + // [b,a,c,a,b,c] + + first = poolManager.getSession(null, conf, true, false); + second = poolManager.getSession(null, conf, true, false); + third = poolManager.getSession(null, conf, true, false); + + assertEquals("b", first.getQueueName()); + assertEquals("a", second.getQueueName()); + assertEquals("c", third.getQueueName()); + + poolManager.returnSession(first, false); + poolManager.returnSession(second, false); + poolManager.returnSession(third, false); + + // [c,a,b,a,b,c] + + first = poolManager.getSession(null, conf, true, false); + assertEquals("c", third.getQueueName()); + poolManager.returnSession(first, false); } catch (Exception e) { e.printStackTrace();