diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7f4afd9..8650a06 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -54,7 +54,6 @@ * Hive Configuration. */ public class HiveConf extends Configuration { - protected String hiveJar; protected Properties origProp; protected String auxJars; @@ -1476,8 +1475,11 @@ "Minimum number of worker threads when in HTTP mode."), HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS("hive.server2.thrift.http.max.worker.threads", 500, "Maximum number of worker threads when in HTTP mode."), - HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME("hive.server2.thrift.http.max.idle.time", 1800000, + HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME("hive.server2.thrift.http.max.idle.time", 1800000, "Maximum idle time in milliseconds for a connection on the server when in HTTP mode."), + HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME("hive.server2.thrift.http.worker.keepalive.time", 60, + "Keepalive time (in seconds) for an idle http worker thread. When number of workers > min workers, " + + "excess threads are killed after this time interval."), // binary transport settings HIVE_SERVER2_THRIFT_PORT("hive.server2.thrift.port", 10000, @@ -1500,7 +1502,9 @@ "Minimum number of Thrift worker threads"), HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS("hive.server2.thrift.max.worker.threads", 500, "Maximum number of Thrift worker threads"), - + HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME("hive.server2.thrift.worker.keepalive.time", 60, + "Keepalive time (in seconds) for an idle worker thread. When number of workers > min workers, " + + "excess threads are killed after this time interval."), // Configuration for async thread pool in SessionManager HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 100, "Number of threads in the async thread pool for HiveServer2"), diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 06d7595..9e3481a 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -250,7 +250,7 @@ public TTransport getTransport(TTransport trans) { private static String currentUrl; private Warehouse wh; // hdfs warehouse - private final ThreadLocal threadLocalMS = + private static final ThreadLocal threadLocalMS = new ThreadLocal() { @Override protected synchronized RawStore initialValue() { @@ -265,6 +265,14 @@ protected synchronized TxnHandler initialValue() { } }; + public static RawStore getRawStore() { + return threadLocalMS.get(); + } + + public static void removeRawStore() { + threadLocalMS.remove(); + } + // Thread local configuration is needed as many threads could make changes // to the conf using the connection hook private final ThreadLocal threadLocalConf = @@ -384,6 +392,7 @@ public HiveConf getHiveConf() { } } + @Override public void init() throws MetaException { rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL); initListeners = MetaStoreUtils.getMetaStoreListeners( diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 0693039..e8f16a5 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -252,6 +252,8 @@ private void initialize(Properties dsProps) { expressionProxy = createExpressionProxy(hiveConf); directSql = new MetaStoreDirectSql(pm); } + LOG.debug("RawStore: " + this + ", with PersistenceManager: " + pm + + " created in the thread with id: " + Thread.currentThread().getId()); } /** @@ -343,6 +345,8 @@ public PersistenceManager getPersistenceManager() { @Override public void shutdown() { if (pm != null) { + LOG.debug("RawStore: " + this + ", with PersistenceManager: " + pm + + " will be shutdown"); pm.close(); } } diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java index 08ed2e7..53a33dd 100644 --- a/service/src/java/org/apache/hive/service/cli/CLIService.java +++ b/service/src/java/org/apache/hive/service/cli/CLIService.java @@ -66,7 +66,6 @@ private HiveConf hiveConf; private SessionManager sessionManager; - private IMetaStoreClient metastoreClient; private UserGroupInformation serviceUGI; private UserGroupInformation httpUGI; @@ -128,21 +127,23 @@ public synchronized void start() { } catch (IOException eIO) { throw new ServiceException("Error setting stage directories", eIO); } - + // Initialize and test a connection to the metastore + IMetaStoreClient metastoreClient = null; try { - // Initialize and test a connection to the metastore metastoreClient = new HiveMetaStoreClient(hiveConf); metastoreClient.getDatabases("default"); } catch (Exception e) { throw new ServiceException("Unable to connect to MetaStore!", e); } + finally { + if (metastoreClient != null) { + metastoreClient.close(); + } + } } @Override public synchronized void stop() { - if (metastoreClient != null) { - metastoreClient.close(); - } super.stop(); } diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index de54ca1..bb6de8a 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -60,6 +60,7 @@ import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.server.ThreadWithGarbageCleanup; /** * SQLOperation. @@ -171,27 +172,23 @@ public void run() throws HiveSQLException { if (!shouldRunAsync()) { runInternal(opConfig); } else { + // We'll pass ThreadLocals in the background thread from the foreground (handler) thread final SessionState parentSessionState = SessionState.get(); - // current Hive object needs to be set in aysnc thread in case of remote metastore. - // The metastore client in Hive is associated with right user - final Hive sessionHive = getCurrentHive(); - // current UGI will get used by metastore when metsatore is in embedded mode - // so this needs to get passed to the new async thread + // ThreadLocal Hive object needs to be set in background thread. + // The metastore client in Hive is associated with right user. + final Hive parentHive = getSessionHive(); + // Current UGI will get used by metastore when metsatore is in embedded mode + // So this needs to get passed to the new background thread final UserGroupInformation currentUGI = getCurrentUGI(opConfig); - // Runnable impl to call runInternal asynchronously, // from a different thread Runnable backgroundOperation = new Runnable() { - @Override public void run() { PrivilegedExceptionAction doAsAction = new PrivilegedExceptionAction() { @Override public Object run() throws HiveSQLException { - - // Storing the current Hive object necessary when doAs is enabled - // User information is part of the metastore client member in Hive - Hive.set(sessionHive); + Hive.set(parentHive); SessionState.setCurrentSessionState(parentSessionState); try { runInternal(opConfig); @@ -202,12 +199,25 @@ public Object run() throws HiveSQLException { return null; } }; + try { ShimLoader.getHadoopShims().doAs(currentUGI, doAsAction); } catch (Exception e) { setOperationException(new HiveSQLException(e)); LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e); } + finally { + /** + * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup + * when this thread is garbage collected later. + * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize() + */ + if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) { + ThreadWithGarbageCleanup currentThread = + (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread(); + currentThread.cacheThreadLocalRawStore(); + } + } } }; try { @@ -223,6 +233,12 @@ public Object run() throws HiveSQLException { } } + /** + * Returns the current UGI on the stack + * @param opConfig + * @return UserGroupInformation + * @throws HiveSQLException + */ private UserGroupInformation getCurrentUGI(HiveConf opConfig) throws HiveSQLException { try { return ShimLoader.getHadoopShims().getUGIForConf(opConfig); @@ -231,11 +247,16 @@ private UserGroupInformation getCurrentUGI(HiveConf opConfig) throws HiveSQLExce } } - private Hive getCurrentHive() throws HiveSQLException { + /** + * Returns the ThreadLocal Hive for the current thread + * @return Hive + * @throws HiveSQLException + */ + private Hive getSessionHive() throws HiveSQLException { try { return Hive.get(); } catch (HiveException e) { - throw new HiveSQLException("Failed to get current Hive object", e); + throw new HiveSQLException("Failed to get ThreadLocal Hive object", e); } } diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index bc0a02c..a5f5f5e 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -62,6 +62,7 @@ import org.apache.hive.service.cli.operation.MetadataOperation; import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; +import org.apache.hive.service.server.ThreadWithGarbageCleanup; /** * HiveSession @@ -95,14 +96,19 @@ public HiveSessionImpl(TProtocolVersion protocol, String username, String passwo this.hiveConf = new HiveConf(serverhiveConf); this.ipAddress = ipAddress; - // set an explicit session name to control the download directory name + // Set an explicit session name to control the download directory name hiveConf.set(ConfVars.HIVESESSIONID.varname, sessionHandle.getHandleIdentifier().toString()); - // use thrift transportable formatter + // Use thrift transportable formatter hiveConf.set(ListSinkOperator.OUTPUT_FORMATTER, FetchFormatter.ThriftFormatter.class.getName()); hiveConf.setInt(ListSinkOperator.OUTPUT_PROTOCOL, protocol.getValue()); + /** + * Create a new SessionState object that will be associated with this HiveServer2 session. + * When the server executes multiple queries in the same session, + * this SessionState object is reused across multiple queries. + */ sessionState = new SessionState(hiveConf, username); sessionState.setUserIpAddress(ipAddress); sessionState.setIsHiveServerQuery(true); @@ -111,11 +117,9 @@ public HiveSessionImpl(TProtocolVersion protocol, String username, String passwo @Override public void initialize(Map sessionConfMap) throws Exception { - //process global init file: .hiverc + // Process global init file: .hiverc processGlobalInitFile(); - SessionState.setCurrentSessionState(sessionState); - - //set conf properties specified by user from client side + // Set conf properties specified by user from client side if (sessionConfMap != null) { configureSession(sessionConfMap); } @@ -169,6 +173,7 @@ private void processGlobalInitFile() { } private void configureSession(Map sessionConfMap) throws Exception { + SessionState.setCurrentSessionState(sessionState); for (Map.Entry entry : sessionConfMap.entrySet()) { String key = entry.getKey(); if (key.startsWith("set:")) { @@ -211,14 +216,26 @@ public void open() { } protected synchronized void acquire() throws HiveSQLException { - // need to make sure that the this connections session state is - // stored in the thread local for sessions. + // Need to make sure that the this HiveServer2's session's session state is + // stored in the thread local for the handler thread. SessionState.setCurrentSessionState(sessionState); } + /** + * 1. We'll remove the ThreadLocal SessionState as this thread might now serve + * other requests. + * 2. We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup + * when this thread is garbage collected later. + * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize() + */ protected synchronized void release() { assert sessionState != null; SessionState.detachSession(); + if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) { + ThreadWithGarbageCleanup currentThread = + (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread(); + currentThread.cacheThreadLocalRawStore(); + } } @Override @@ -468,7 +485,7 @@ public void close() throws HiveSQLException { try { acquire(); /** - * For metadata operations like getTables(), getColumns() etc, + * For metadata operations like getTables(), getColumns() etc, * the session allocates a private metastore handler which should be * closed at the end of the session */ diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java index d573592..954fb19 100644 --- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -38,6 +38,7 @@ import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; +import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; /** * SessionManager. @@ -64,22 +65,27 @@ public synchronized void init(HiveConf hiveConf) { } catch (HiveException e) { throw new RuntimeException("Error applying authorization policy on hive configuration", e); } - this.hiveConf = hiveConf; + createBackgroundOperationPool(); + addService(operationManager); + super.init(hiveConf); + } + + private void createBackgroundOperationPool() { int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); - LOG.info("HiveServer2: Async execution thread pool size: " + backgroundPoolSize); + LOG.info("HiveServer2: Background operation thread pool size: " + backgroundPoolSize); int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE); - LOG.info("HiveServer2: Async execution wait queue size: " + backgroundPoolQueueSize); + LOG.info("HiveServer2: Background operation thread wait queue size: " + backgroundPoolQueueSize); int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME); - LOG.info("HiveServer2: Async execution thread keepalive time: " + keepAliveTime); + LOG.info("HiveServer2: Background operation thread keepalive time: " + keepAliveTime); // Create a thread pool with #backgroundPoolSize threads // Threads terminate when they are idle for more than the keepAliveTime - // An bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize + // A bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize + String threadPoolName = "HiveServer2-Background-Pool"; backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize, - keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(backgroundPoolQueueSize)); + keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(backgroundPoolQueueSize), + new ThreadFactoryWithGarbageCleanup(threadPoolName)); backgroundOperationPool.allowCoreThreadTimeOut(true); - addService(operationManager); - super.init(hiveConf); } private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveException { diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index 37b05fc..e5ce72f 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -19,12 +19,17 @@ package org.apache.hive.service.cli.thrift; import java.net.InetSocketAddress; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TThreadPoolServer; @@ -65,6 +70,11 @@ public void run() { minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS); maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS); + workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME); + String threadPoolName = "HiveServer2-Handler-Pool"; + ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads, + workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), + new ThreadFactoryWithGarbageCleanup(threadPoolName)); TServerSocket serverSocket = null; if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) { @@ -84,8 +94,7 @@ public void run() { .processorFactory(processorFactory) .transportFactory(transportFactory) .protocolFactory(new TBinaryProtocol.Factory()) - .minWorkerThreads(minWorkerThreads) - .maxWorkerThreads(maxWorkerThreads); + .executorService(executorService); server = new TThreadPoolServer(sargs); diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 027931e..5a77bd7 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -70,6 +70,7 @@ protected int minWorkerThreads; protected int maxWorkerThreads; + protected int workerKeepAliveTime; protected static HiveAuthFactory hiveAuthFactory; diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index c380b69..21d1563 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -18,6 +18,11 @@ package org.apache.hive.service.cli.thrift; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.ShimLoader; @@ -26,6 +31,7 @@ import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.HiveAuthFactory.AuthTypes; import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; import org.apache.thrift.TProcessor; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TBinaryProtocol; @@ -36,7 +42,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.ExecutorThreadPool; public class ThriftHttpCLIService extends ThriftCLIService { @@ -63,13 +69,17 @@ public void run() { minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS); maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS); + workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME); String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH)); httpServer = new org.eclipse.jetty.server.Server(); - QueuedThreadPool threadPool = new QueuedThreadPool(); - threadPool.setMinThreads(minWorkerThreads); - threadPool.setMaxThreads(maxWorkerThreads); + String threadPoolName = "HiveServer2-HttpHandler-Pool"; + ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads, + workerKeepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(), + new ThreadFactoryWithGarbageCleanup(threadPoolName)); + + ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService); httpServer.setThreadPool(threadPool); SelectChannelConnector connector = new SelectChannelConnector();; diff --git a/service/src/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java b/service/src/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java new file mode 100644 index 0000000..ec19abc --- /dev/null +++ b/service/src/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hive.service.server; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadFactory; + +import org.apache.hadoop.hive.metastore.RawStore; + +/** + * A ThreadFactory for constructing new HiveServer2 threads that lets you plug + * in custom cleanup code to be called before this thread is GC-ed. + * Currently cleans up the following: + * 1. ThreadLocal RawStore object: + * In case of an embedded metastore, HiveServer2 threads (foreground & background) + * end up caching a ThreadLocal RawStore object. The ThreadLocal RawStore object has + * an instance of PersistenceManagerFactory & PersistenceManager. + * The PersistenceManagerFactory keeps a cache of PersistenceManager objects, + * which are only removed when PersistenceManager#close method is called. + * HiveServer2 uses ExecutorService for managing thread pools for foreground & background threads. + * ExecutorService unfortunately does not provide any hooks to be called, + * when a thread from the pool is terminated. + * As a solution, we're using this ThreadFactory to keep a cache of RawStore objects per thread. + * And we are doing clean shutdown in the finalizer for each thread. + */ +public class ThreadFactoryWithGarbageCleanup implements ThreadFactory { + + private static Map threadRawStoreMap = new HashMap(); + + private final String namePrefix; + + public ThreadFactoryWithGarbageCleanup(String threadPoolName) { + namePrefix = threadPoolName; + } + + @Override + public Thread newThread(Runnable runnable) { + Thread newThread = new ThreadWithGarbageCleanup(runnable); + newThread.setName(namePrefix + ": Thread-" + newThread.getId()); + return newThread; + } + + public static Map getThreadRawStoreMap() { + return threadRawStoreMap; + } +} diff --git a/service/src/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java b/service/src/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java new file mode 100644 index 0000000..8ee9810 --- /dev/null +++ b/service/src/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hive.service.server; + +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.RawStore; + +/** + * A HiveServer2 thread used to construct new server threads. + * In particular, this thread ensures an orderly cleanup, + * when killed by its corresponding ExecutorService. + */ +public class ThreadWithGarbageCleanup extends Thread { + private static final Log LOG = LogFactory.getLog(ThreadWithGarbageCleanup.class); + + Map threadRawStoreMap = + ThreadFactoryWithGarbageCleanup.getThreadRawStoreMap(); + + public ThreadWithGarbageCleanup(Runnable runnable) { + super(runnable); + } + + /** + * Add any Thread specific garbage cleanup code here. + * Currently, it shuts down the RawStore object for this thread if it is not null. + */ + @Override + public void finalize() throws Throwable { + cleanRawStore(); + super.finalize(); + } + + private void cleanRawStore() { + Long threadId = this.getId(); + RawStore threadLocalRawStore = threadRawStoreMap.get(threadId); + if (threadLocalRawStore != null) { + LOG.debug("RawStore: " + threadLocalRawStore + ", for the thread: " + + this.getName() + " will be closed now."); + threadLocalRawStore.shutdown(); + threadRawStoreMap.remove(threadId); + } + } + + /** + * Cache the ThreadLocal RawStore object. Called from the corresponding thread. + */ + public void cacheThreadLocalRawStore() { + Long threadId = this.getId(); + RawStore threadLocalRawStore = HiveMetaStore.HMSHandler.getRawStore(); + if (threadLocalRawStore != null && !threadRawStoreMap.containsKey(threadId)) { + LOG.debug("Adding RawStore: " + threadLocalRawStore + ", for the thread: " + + this.getName() + " to threadRawStoreMap for future cleanup."); + threadRawStoreMap.put(threadId, threadLocalRawStore); + } + } +}