diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 4de0389..8435066 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -171,31 +171,43 @@ // metastore calls timing information private final ConcurrentHashMap metaCallTimeMap = new ConcurrentHashMap<>(); + // Counter for number of threads referring to this Hive object via thread local hiveDB + private final AtomicInteger referenceThreadCount = new AtomicInteger(0); + // Static class to store thread local Hive object and allowClose flag. private static class ThreadLocalHive extends ThreadLocal { - private ThreadLocal allowClose = ThreadLocal.withInitial(() -> true); - @Override protected Hive initialValue() { return null; } @Override - public synchronized void remove() { - if (allowClose() && (this.get() != null)) { - this.get().close(); - } - super.remove(); - this.allowClose.set(true); - } + public synchronized void set(Hive hiveObj) { + Hive currentHive = super.get(); + if ((currentHive == null) || (currentHive != hiveObj)) { + // Remove current thread-local Hive from reference before overwriting with new Hive object. + remove(); + + super.set(hiveObj); - public synchronized void set(Hive hiveObj, boolean allowClose) { - super.set(hiveObj); - this.allowClose.set(allowClose); + // Add current thread to the reference count of Hive object. + hiveObj.addReferenceThread(); + } } - boolean allowClose() { - return this.allowClose.get(); + @Override + public synchronized void remove() { + Hive currentHive = super.get(); + if (currentHive != null) { + // Current threads doesn't refer to currentHive anymore. So, remove it from reference count. + int refCount = currentHive.removeReferenceThread(); + if (refCount <= 0) { + // If no other thread is referring to currentHive object, then it should be closed or else + // just remove from thread local HiveDb + currentHive.close(); + } + super.remove(); + } } } @@ -317,7 +329,11 @@ private static Hive getInternal(HiveConf c, boolean needsRefresh, boolean isFast Hive db = hiveDB.get(); if (db == null || !db.isCurrentUserOwner() || needsRefresh || (c != null && !isCompatible(db, c, isFastCheck))) { - db = create(c, false, db, doRegisterAllFns); + if (db != null) { + LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh + + ", db.isCurrentUserOwner = " + db.isCurrentUserOwner()); + } + db = create(c, doRegisterAllFns); } if (c != null) { db.conf = c; @@ -325,22 +341,14 @@ private static Hive getInternal(HiveConf c, boolean needsRefresh, boolean isFast return db; } - private static Hive create(HiveConf c, boolean needsRefresh, Hive db, boolean doRegisterAllFns) - throws HiveException { - if (db != null) { - LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh + - ", db.isCurrentUserOwner = " + db.isCurrentUserOwner()); - if (hiveDB.allowClose()) { - db.close(); - } - } + private static Hive create(HiveConf c, boolean doRegisterAllFns) throws HiveException { closeCurrent(); if (c == null) { c = createHiveConf(); } c.set("fs.scheme.class", "dfs"); Hive newdb = new Hive(c, doRegisterAllFns); - hiveDB.set(newdb, true); + hiveDB.set(newdb); return newdb; } @@ -383,21 +391,13 @@ public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException { } public static void set(Hive hive) { - hiveDB.set(hive, true); - } - - public static void set(Hive hive, boolean allowClose) { - hiveDB.set(hive, allowClose); + hiveDB.set(hive); } public static void closeCurrent() { hiveDB.remove(); } - public static Hive getThreadLocal() { - return hiveDB.get(); - } - /** * Hive * @@ -411,7 +411,6 @@ private Hive(HiveConf c, boolean doRegisterAllFns) throws HiveException { } } - private boolean isCurrentUserOwner() throws HiveException { try { return owner == null || owner.equals(UserGroupInformation.getCurrentUser()); @@ -420,7 +419,21 @@ private boolean isCurrentUserOwner() throws HiveException { } } + /** + * Current thread refers to Hive object. Add it to reference count. + * @return Number of threads referring to the Hive object after adding. + */ + private int addReferenceThread() { + return this.referenceThreadCount.incrementAndGet(); + } + /** + * Current thread removes reference to Hive object. Remove it from reference count. + * @return Number of threads referring to the Hive object. + */ + private int removeReferenceThread() { + return this.referenceThreadCount.decrementAndGet(); + } /** * closes the connection to metastore for the calling thread @@ -436,6 +449,7 @@ private void close() { if (owner != null) { owner = null; } + this.referenceThreadCount.set(0); } /** diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 3d24884..e31a255 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -310,7 +310,7 @@ public void run() { PrivilegedExceptionAction doAsAction = new PrivilegedExceptionAction() { @Override public Object run() throws HiveSQLException { - Hive.set(parentHive, false); + Hive.set(parentHive); // TODO: can this result in cross-thread reuse of session state? SessionState.setCurrentSessionState(parentSessionState); PerfLogger.setPerfLogger(SessionState.getPerfLogger()); @@ -328,13 +328,11 @@ public Object run() throws HiveSQLException { LOG.error("Error running hive query: ", e); } finally { LogUtils.unregisterLoggingContext(); - Hive hiveDb = Hive.getThreadLocal(); - if (hiveDb != null && hiveDb != parentHive) { - // If new hive object is created by the child thread, then we need to close it as it might - // have created a hms connection. Call Hive.closeCurrent() that closes the HMS connection, causes - // HMS connection leaks otherwise. - Hive.closeCurrent(); - } + + // If new hive object is created by the child thread, then we need to close it as it might + // have created a hms connection. Call Hive.closeCurrent() that closes the HMS connection, causes + // HMS connection leaks otherwise. + Hive.closeCurrent(); } return null; }