diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java index 85a9565..2ee50b3 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java @@ -61,6 +61,10 @@ // hcatalog specific configurations, that can be put in hive-site.xml public static final String HCAT_HIVE_CLIENT_EXPIRY_TIME = "hcatalog.hive.client.cache.expiry.time"; + // config parameter that suggests to hcat that metastore clients not be cached - default is false + // this parameter allows highly-parallel hcat usescases to not gobble up too many connections that + // sit in the cache, while not in use. + public static final String HCAT_HIVE_CLIENT_DISABLE_CACHE = "hcatalog.hive.client.cache.disabled"; private HCatConstants() { // restrict instantiation } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java index 18e3556..3fe7fb9 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java @@ -80,7 +80,6 @@ private static final Logger LOG = LoggerFactory.getLogger(HCatUtil.class); private static volatile HiveClientCache hiveClientCache; - private final static int DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS = 2 * 60; public static boolean checkJobContextIfRunningFromBackend(JobContext j) { if (j.getConfiguration().get("mapred.task.id", "").equals("") && @@ -551,14 +550,16 @@ public static void copyConf(Configuration src, Configuration dest) { public static HiveMetaStoreClient getHiveClient(HiveConf hiveConf) throws MetaException, IOException { - // Singleton behaviour: create the cache instance if required. The cache needs to be created lazily and - // using the expiry time available in hiveConf. + if (hiveConf.getBoolean(HCatConstants.HCAT_HIVE_CLIENT_DISABLE_CACHE, false)){ + // If cache is disabled, don't use it. + return HiveClientCache.getNonCachedHiveClient(hiveConf); + } + // Singleton behaviour: create the cache instance if required. if (hiveClientCache == null) { synchronized (HiveMetaStoreClient.class) { if (hiveClientCache == null) { - hiveClientCache = new HiveClientCache(hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME, - DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS)); + hiveClientCache = new HiveClientCache(hiveConf); } } } @@ -569,6 +570,10 @@ public static HiveMetaStoreClient getHiveClient(HiveConf hiveConf) } } + private static HiveMetaStoreClient getNonCachedHiveClient(HiveConf hiveConf) throws MetaException{ + return new HiveMetaStoreClient(hiveConf); + } + public static void closeHiveClientQuietly(HiveMetaStoreClient client) { try { if (client != null) diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java index 68f5e8f..730b6ef 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java @@ -18,10 +18,17 @@ */ package org.apache.hive.hcatalog.common; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.security.auth.login.LoginException; + import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.hive.conf.HiveConf; @@ -33,18 +40,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.auth.login.LoginException; -import java.io.IOException; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; /** * A thread safe time expired cache for HiveMetaStoreClient */ class HiveClientCache { + public final static int DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS = 2 * 60; + final private Cache hiveCache; private static final Logger LOG = LoggerFactory.getLogger(HiveClientCache.class); private final int timeout; @@ -53,6 +59,8 @@ private static final AtomicInteger nextId = new AtomicInteger(0); + private ScheduledFuture cleanupHandle; // used to cleanup cache + // Since HiveMetaStoreClient is not threadsafe, hive clients are not shared across threads. // Thread local variable containing each thread's unique ID, is used as one of the keys for the cache // causing each thread to get a different client even if the hiveConf is same. @@ -68,6 +76,14 @@ private int getThreadId() { return threadId.get(); } + public static HiveMetaStoreClient getNonCachedHiveClient(HiveConf hiveConf) throws MetaException { + return new HiveMetaStoreClient(hiveConf); + } + + public HiveClientCache(HiveConf hiveConf) { + this(hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME, + DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS)); + } /** * @param timeout the length of time in seconds after a client is created that it should be automatically removed */ @@ -90,6 +106,39 @@ public void onRemoval(RemovalNotification cleanupInterval){ + cleanupInterval = timeout; + } + + /** + * Create the cleanup handle. In addition to cleaning up every cleanupInterval, we add + * a slight offset, so that the very first time it runs, it runs with a slight delay, so + * as to catch any other connections that were closed when the first timeout happened. + * As a result, the time we can expect an unused connection to be reaped is + * 5 seconds after the first timeout, and then after that, it'll check for whether or not + * it can be cleaned every max(DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS,timeout) seconds + */ + cleanupHandle = Executors.newScheduledThreadPool(1).scheduleWithFixedDelay( + cleanupThread, + timeout + 5, cleanupInterval, TimeUnit.SECONDS); + + // Add a shutdown hook for cleanup, if there are elements remaining in the cache which were not cleaned up. // This is the best effort approach. Ignore any error while doing so. Notice that most of the clients // would get cleaned up via either the removalListener or the close() call, only the active clients @@ -100,6 +149,7 @@ public void onRemoval(RemovalNotification