diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index b4d8ffa..dabca3f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -23,12 +23,13 @@ import java.io.IOException; import java.net.URISyntaxException; - import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; @@ -77,7 +78,7 @@ private static final Random rdm = new Random(); private volatile SessionState initSessionState; - private BlockingQueue defaultQueuePool; + private BlockingDeque defaultQueuePool; /** Priority queue sorted by expiration time of live sessions that could be expired. */ private PriorityBlockingQueue expirationQueue; @@ -204,7 +205,7 @@ public void setupPool(HiveConf conf) throws InterruptedException { int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE); int numSessionsTotal = numSessions * (defaultQueueList.length - emptyNames); if (numSessionsTotal > 0) { - defaultQueuePool = new ArrayBlockingQueue(numSessionsTotal); + defaultQueuePool = new LinkedBlockingDeque(numSessionsTotal); } numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); @@ -426,7 +427,7 @@ public void returnSession(TezSessionState tezSessionState, boolean llap) TezSessionPoolSession poolSession = (TezSessionPoolSession) tezSessionState; if (poolSession.returnAfterUse()) { - defaultQueuePool.put(poolSession); + defaultQueuePool.putFirst(poolSession); } } // non default session nothing changes. The user can continue to use the existing