diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index fb3570b..90b983b 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -760,6 +760,9 @@ HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 50), // Number of seconds HiveServer2 shutdown will wait for async threads to terminate HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10), + // Number of seconds that an idle HiveServer2 async thread (from the thread pool) + // will wait for a new task to arrive before terminating + HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME("hive.server2.async.exec.keepalive.time", 10), // HiveServer2 auth configuration diff --git a/conf/hive-default.xml.template b/conf/hive-default.xml.template index b7234b4..f447a55 100644 --- a/conf/hive-default.xml.template +++ b/conf/hive-default.xml.template @@ -1909,6 +1909,13 @@ + hive.server2.async.exec.keepalive.time + 10 + Time (in seconds) that an idle HiveServer2 async thread (from the thread pool) will wait + for a new task to arrive before terminating + + + hive.server2.thrift.port 10000 Port number of HiveServer2 Thrift interface. diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java index f392d62..bf77b1b 100644 --- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -21,16 +21,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.hooks.HookUtils; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.hooks.HookUtils; import org.apache.hive.service.CompositeService; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.SessionHandle; @@ -46,7 +46,7 @@ private final Map handleToSession = new HashMap(); private OperationManager operationManager = new OperationManager(); private static final Object sessionMapLock = new Object(); - private ExecutorService backgroundOperationPool; + private ThreadPoolExecutor backgroundOperationPool; public SessionManager() { super("SessionManager"); @@ -57,8 +57,15 @@ public synchronized void init(HiveConf hiveConf) { this.hiveConf = hiveConf; operationManager = new OperationManager(); int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); - LOG.info("HiveServer2: Async execution pool size" + backgroundPoolSize); - backgroundOperationPool = Executors.newFixedThreadPool(backgroundPoolSize); + LOG.info("HiveServer2: Async execution thread pool size: " + backgroundPoolSize); + int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME); + LOG.info("HiveServer2: Async execution thread keepalive time: " + keepAliveTime); + // Create a thread pool with #backgroundPoolSize threads + // Threads terminate when they are idle for more than the keepAliveTime + // An unbounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize + backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize, + keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue()); + backgroundOperationPool.allowCoreThreadTimeOut(true); addService(operationManager); super.init(hiveConf); } @@ -66,26 +73,23 @@ public synchronized void init(HiveConf hiveConf) { @Override public synchronized void start() { super.start(); - // TODO } @Override public synchronized void stop() { - // TODO super.stop(); if (backgroundOperationPool != null) { backgroundOperationPool.shutdown(); - long timeout = hiveConf.getLongVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT); + int timeout = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT); try { backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS); - } catch (InterruptedException exc) { + } catch (InterruptedException e) { LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout + - " seconds has been exceeded. RUNNING background operations will be shut down", exc); + " seconds has been exceeded. RUNNING background operations will be shut down", e); } } } - public SessionHandle openSession(String username, String password, Map sessionConf) throws HiveSQLException { return openSession(username, password, sessionConf, false, null); @@ -129,7 +133,6 @@ public void closeSession(SessionHandle sessionHandle) throws HiveSQLException { session.close(); } - public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLException { HiveSession session; synchronized(sessionMapLock) {