Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1539825) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -759,9 +759,15 @@ // Configuration for async thread pool in SessionManager // Number of async threads - HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 50), + HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 100), // Number of seconds HiveServer2 shutdown will wait for async threads to terminate HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10L), + // Size of the wait queue for async thread pool in HiveServer2. + // After hitting this limit, the async thread pool will reject new requests. + HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE("hive.server2.async.exec.wait.queue.size", 100), + // Number of seconds that an idle HiveServer2 async thread (from the thread pool) + // will wait for a new task to arrive before terminating + HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME("hive.server2.async.exec.keepalive.time", 10), // HiveServer2 auth configuration Index: conf/hive-default.xml.template =================================================================== --- conf/hive-default.xml.template (revision 1539825) +++ conf/hive-default.xml.template (working copy) @@ -1897,7 +1897,7 @@ hive.server2.async.exec.threads - 50 + 100 Number of threads in the async thread pool for HiveServer2 @@ -1904,11 +1904,25 @@ hive.server2.async.exec.shutdown.timeout 10 - Time (in seconds) for which HiveServer2 shutdown will wait for async + Time (in seconds) for which HiveServer2 shutdown will wait for async threads to terminate + hive.server2.async.exec.keepalive.time + 10 + Time (in seconds) that an idle HiveServer2 async thread (from the thread pool) will wait + for a new task to arrive before terminating + + + + hive.server2.async.exec.wait.queue.size + 100 + Size of the wait queue for async thread pool in HiveServer2. + After hitting this limit, the async thread pool will reject new requests. + + + hive.server2.thrift.port 10000 Port number of HiveServer2 Thrift interface. Index: service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java =================================================================== --- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (revision 1539825) +++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (working copy) @@ -165,7 +165,8 @@ getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation); } catch (RejectedExecutionException rejected) { setState(OperationState.ERROR); - throw new HiveSQLException(rejected); + throw new HiveSQLException("All the asynchronous threads are currently busy, " + + "please retry the operation", rejected); } } } Index: service/src/java/org/apache/hive/service/cli/session/SessionManager.java =================================================================== --- service/src/java/org/apache/hive/service/cli/session/SessionManager.java (revision 1539825) +++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java (working copy) @@ -21,16 +21,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.hooks.HookUtils; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hive.service.CompositeService; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.SessionHandle; @@ -46,7 +46,7 @@ private final Map handleToSession = new HashMap(); private OperationManager operationManager = new OperationManager(); private static final Object sessionMapLock = new Object(); - private ExecutorService backgroundOperationPool; + private ThreadPoolExecutor backgroundOperationPool; public SessionManager() { super("SessionManager"); @@ -57,8 +57,17 @@ this.hiveConf = hiveConf; operationManager = new OperationManager(); int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); - LOG.info("HiveServer2: Async execution pool size" + backgroundPoolSize); - backgroundOperationPool = Executors.newFixedThreadPool(backgroundPoolSize); + LOG.info("HiveServer2: Async execution thread pool size: " + backgroundPoolSize); + int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE); + LOG.info("HiveServer2: Async execution wait queue size: " + backgroundPoolQueueSize); + int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME); + LOG.info("HiveServer2: Async execution thread keepalive time: " + keepAliveTime); + // Create a thread pool with #backgroundPoolSize threads + // Threads terminate when they are idle for more than the keepAliveTime + // An bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize + backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize, + keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(backgroundPoolQueueSize)); + backgroundOperationPool.allowCoreThreadTimeOut(true); addService(operationManager); super.init(hiveConf); } @@ -66,26 +75,23 @@ @Override public synchronized void start() { super.start(); - // TODO } @Override public synchronized void stop() { - // TODO super.stop(); if (backgroundOperationPool != null) { backgroundOperationPool.shutdown(); - long timeout = hiveConf.getLongVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT); + int timeout = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT); try { backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS); - } catch (InterruptedException exc) { + } catch (InterruptedException e) { LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout + - " seconds has been exceeded. RUNNING background operations will be shut down", exc); + " seconds has been exceeded. RUNNING background operations will be shut down", e); } } } - public SessionHandle openSession(String username, String password, Map sessionConf) throws HiveSQLException { return openSession(username, password, sessionConf, false, null); @@ -129,7 +135,6 @@ session.close(); } - public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLException { HiveSession session; synchronized(sessionMapLock) {