diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java index 85a9565..04c46db 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java @@ -61,6 +61,10 @@ // hcatalog specific configurations, that can be put in hive-site.xml public static final String HCAT_HIVE_CLIENT_EXPIRY_TIME = "hcatalog.hive.client.cache.expiry.time"; + // config parameter that suggests to hcat that metastore clients not be cached - default is false + // this parameter allows highly-parallel hcat use cases to not gobble up too many connections that + // sit in the cache. + public static final String HCAT_HIVE_CLIENT_DISABLE_CACHE = "hcatalog.hive.client.cache.disabled"; private HCatConstants() { // restrict instantiation } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java index 18e3556..1036a2f 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java @@ -80,7 +80,6 @@ private static final Logger LOG = LoggerFactory.getLogger(HCatUtil.class); private static volatile HiveClientCache hiveClientCache; - private final static int DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS = 2 * 60; public static boolean checkJobContextIfRunningFromBackend(JobContext j) { if (j.getConfiguration().get("mapred.task.id", "").equals("") && @@ -551,6 +550,11 @@ public static void copyConf(Configuration src, Configuration dest) { public static HiveMetaStoreClient getHiveClient(HiveConf hiveConf) throws MetaException, IOException { + if (hiveConf.getBoolean(HCatConstants.HCAT_HIVE_CLIENT_DISABLE_CACHE, false)){ + // If cache is disabled, don't use it. 
+ return getNonCachedHiveClient(hiveConf); + } + // Singleton behaviour: create the cache instance if required. The cache needs to be created lazily and // using the expiry time available in hiveConf. @@ -558,7 +562,7 @@ public static HiveMetaStoreClient getHiveClient(HiveConf hiveConf) synchronized (HiveMetaStoreClient.class) { if (hiveClientCache == null) { hiveClientCache = new HiveClientCache(hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME, - DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS)); + HiveClientCache.DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS)); } } } @@ -569,6 +573,10 @@ public static HiveMetaStoreClient getHiveClient(HiveConf hiveConf) } } + private static HiveMetaStoreClient getNonCachedHiveClient(HiveConf hiveConf) throws MetaException{ + return new HiveMetaStoreClient(hiveConf); + } + public static void closeHiveClientQuietly(HiveMetaStoreClient client) { try { if (client != null) diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java index 68f5e8f..0712a54 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java @@ -18,10 +18,17 @@ */ package org.apache.hive.hcatalog.common; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.security.auth.login.LoginException; + import org.apache.commons.lang.builder.EqualsBuilder; import 
org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.hive.conf.HiveConf; @@ -33,18 +40,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.auth.login.LoginException; -import java.io.IOException; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; /** * A thread safe time expired cache for HiveMetaStoreClient */ class HiveClientCache { + public final static int DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS = 2 * 60; + final private Cache hiveCache; private static final Logger LOG = LoggerFactory.getLogger(HiveClientCache.class); private final int timeout; @@ -53,6 +59,8 @@ private static final AtomicInteger nextId = new AtomicInteger(0); + ScheduledFuture cleanupHandle; // used to cleanup cache + // Since HiveMetaStoreClient is not threadsafe, hive clients are not shared across threads. // Thread local variable containing each thread's unique ID, is used as one of the keys for the cache // causing each thread to get a different client even if the hiveConf is same. @@ -90,6 +98,22 @@ public void onRemoval(RemovalNotification cleanupInterval){ + cleanupInterval = timeout; + } + + cleanupHandle = Executors.newScheduledThreadPool(1).scheduleWithFixedDelay( + cleanupThread, + cleanupInterval, cleanupInterval, TimeUnit.SECONDS); + + // Add a shutdown hook for cleanup, if there are elements remaining in the cache which were not cleaned up. // This is the best effort approach. Ignore any error while doing so. 
Notice that most of the clients // would get cleaned up via either the removalListener or the close() call, only the active clients @@ -100,6 +124,7 @@ public void onRemoval(RemovalNotification