diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 75cd68a..5a88550 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -389,7 +389,7 @@ private Task getReplLoadRootTask(String replicadb, boolean isIncrementalDump, Tu Task replLoadTask = TaskFactory.get(replLoadWork, confTemp); replLoadTask.initialize(null, null, new DriverContext(driver.getContext()), null); replLoadTask.executeTask(null); - Hive.getThreadLocal().closeCurrent(); + Hive.closeCurrent(); return replLoadWork.getRootTask(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 180b41e..e185bf4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -167,35 +167,36 @@ private IMetaStoreClient metaStoreClient; private SynchronizedMetaStoreClient syncMetaStoreClient; private UserGroupInformation owner; + private boolean isAllowClose = true; // metastore calls timing information private final ConcurrentHashMap metaCallTimeMap = new ConcurrentHashMap<>(); - // Static class to store thread local Hive object and allowClose flag. + // Static class to store thread local Hive object. 
private static class ThreadLocalHive extends ThreadLocal { - private ThreadLocal allowClose = ThreadLocal.withInitial(() -> true); - @Override protected Hive initialValue() { return null; } @Override - public synchronized void remove() { - if (allowClose() && (this.get() != null)) { - this.get().close(); + public synchronized void set(Hive hiveObj) { + Hive currentHive = this.get(); + if (currentHive != hiveObj) { + // Remove/close current thread-local Hive object before overwriting with new Hive object. + remove(); + super.set(hiveObj); } - super.remove(); - this.allowClose.set(true); - } - - public synchronized void set(Hive hiveObj, boolean allowClose) { - super.set(hiveObj); - this.allowClose.set(allowClose); } - boolean allowClose() { - return this.allowClose.get(); + @Override + public synchronized void remove() { + Hive currentHive = this.get(); + if (currentHive != null) { + // Close the metastore connections before removing it from thread local hiveDB. + currentHive.close(false); + super.remove(); + } } } @@ -317,7 +318,12 @@ private static Hive getInternal(HiveConf c, boolean needsRefresh, boolean isFast Hive db = hiveDB.get(); if (db == null || !db.isCurrentUserOwner() || needsRefresh || (c != null && !isCompatible(db, c, isFastCheck))) { - db = create(c, false, db, doRegisterAllFns); + if (db != null) { + LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh + + ", db.isCurrentUserOwner = " + db.isCurrentUserOwner()); + closeCurrent(); + } + db = create(c, doRegisterAllFns); } if (c != null) { db.conf = c; @@ -325,26 +331,16 @@ private static Hive getInternal(HiveConf c, boolean needsRefresh, boolean isFast return db; } - private static Hive create(HiveConf c, boolean needsRefresh, Hive db, boolean doRegisterAllFns) - throws HiveException { - if (db != null) { - LOG.debug("Creating new db. 
db = " + db + ", needsRefresh = " + needsRefresh + - ", db.isCurrentUserOwner = " + db.isCurrentUserOwner()); - if (hiveDB.allowClose()) { - db.close(); - } - } - closeCurrent(); + private static Hive create(HiveConf c, boolean doRegisterAllFns) throws HiveException { if (c == null) { c = createHiveConf(); } c.set("fs.scheme.class", "dfs"); Hive newdb = new Hive(c, doRegisterAllFns); - hiveDB.set(newdb, true); + hiveDB.set(newdb); return newdb; } - private static HiveConf createHiveConf() { SessionState session = SessionState.get(); return (session == null) ? new HiveConf(Hive.class) : session.getConf(); @@ -360,6 +356,18 @@ private static boolean isCompatible(Hive db, HiveConf c, boolean isFastCheck) { } } + private boolean isCurrentUserOwner() throws HiveException { + try { + return owner == null || owner.equals(UserGroupInformation.getCurrentUser()); + } catch(IOException e) { + throw new HiveException("Error getting current user: " + e.getMessage(), e); + } + } + + public static Hive getThreadLocal() { + return hiveDB.get(); + } + public static Hive get() throws HiveException { return get(true); } @@ -383,21 +391,13 @@ public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException { } public static void set(Hive hive) { - hiveDB.set(hive, true); - } - - public static void set(Hive hive, boolean allowClose) { - hiveDB.set(hive, allowClose); + hiveDB.set(hive); } public static void closeCurrent() { hiveDB.remove(); } - public static Hive getThreadLocal() { - return hiveDB.get(); - } - /** * Hive * @@ -411,30 +411,49 @@ private Hive(HiveConf c, boolean doRegisterAllFns) throws HiveException { } } - - private boolean isCurrentUserOwner() throws HiveException { - try { - return owner == null || owner.equals(UserGroupInformation.getCurrentUser()); - } catch(IOException e) { - throw new HiveException("Error getting current user: " + e.getMessage(), e); - } + /** + * GC is attempting to destroy the object. 
+ * No one references this Hive anymore, so HMS connection from this Hive object can be closed. + * @throws Throwable if close(true) or super.finalize() throws + */ + @Override + protected void finalize() throws Throwable { + close(true); + super.finalize(); } + /** + * Marks if the given Hive object is allowed to close metastore connections. + * @param allowClose false to make close(false) a no-op; true to let it close the connections + */ + public void setAllowClose(boolean allowClose) { + isAllowClose = allowClose; + } + /** + * Gets the allowClose flag which determines if it is allowed to close metastore connections. + * @return allowClose flag + */ + public boolean allowClose() { + return isAllowClose; + } /** - * closes the connection to metastore for the calling thread + * Closes the connection to metastore for the calling thread if allowed to close. + * @param forceClose - Override the isAllowClose flag to forcefully close the MS connections. */ - private void close() { - LOG.debug("Closing current thread's connection to Hive Metastore."); - if (metaStoreClient != null) { - metaStoreClient.close(); - metaStoreClient = null; - } - // syncMetaStoreClient is wrapped on metaStoreClient. So, it is enough to close it once. - syncMetaStoreClient = null; - if (owner != null) { - owner = null; + public void close(boolean forceClose) { + if (allowClose() || forceClose) { + LOG.debug("Closing current thread's connection to Hive Metastore."); + if (metaStoreClient != null) { + metaStoreClient.close(); + metaStoreClient = null; + } + // syncMetaStoreClient is wrapped on metaStoreClient. So, it is enough to close it once. 
+ syncMetaStoreClient = null; + if (owner != null) { + owner = null; + } } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 3d24884..f975199 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -299,6 +299,11 @@ public void runInternal() throws HiveSQLException { private BackgroundWork(UserGroupInformation currentUGI, Hive parentHive, SessionState parentSessionState, boolean asyncPrepare) { + // Note: parentHive can be shared by multiple threads and so it should be protected from any + // thread closing metastore connections when some other thread is still accessing it. So, it is + // expected that allowClose flag in parentHive is set to false by caller and it will be caller's + // responsibility to close it explicitly with forceClose flag as true. + // Refer to sessionHive in HiveSessionImpl.java for the usage. this.currentUGI = currentUGI; this.parentHive = parentHive; this.parentSessionState = parentSessionState; @@ -310,7 +315,8 @@ public void run() { PrivilegedExceptionAction doAsAction = new PrivilegedExceptionAction() { @Override public Object run() throws HiveSQLException { - Hive.set(parentHive, false); + assert (!parentHive.allowClose()); + Hive.set(parentHive); // TODO: can this result in cross-thread reuse of session state? SessionState.setCurrentSessionState(parentSessionState); PerfLogger.setPerfLogger(SessionState.getPerfLogger()); @@ -328,13 +334,11 @@ public Object run() throws HiveSQLException { LOG.error("Error running hive query: ", e); } finally { LogUtils.unregisterLoggingContext(); - Hive hiveDb = Hive.getThreadLocal(); - if (hiveDb != null && hiveDb != parentHive) { - // If new hive object is created by the child thread, then we need to close it as it might - // have created a hms connection. 
Call Hive.closeCurrent() that closes the HMS connection, causes - // HMS connection leaks otherwise. - Hive.closeCurrent(); - } + + // If new hive object is created by the child thread, then we need to close it as it might + // have created a hms connection. Call Hive.closeCurrent() to close that HMS connection; + // otherwise the HMS connection leaks. + Hive.closeCurrent(); } return null; } diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 0018f68..a8bf876 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -185,11 +185,10 @@ public void open(Map sessionConfMap) throws HiveSQLException { LOG.error(msg, e); throw new HiveSQLException(msg, e); } - try { - sessionHive = Hive.get(getHiveConf()); - } catch (HiveException e) { - throw new HiveSQLException("Failed to get metastore connection", e); - } + + // Set sessionHive object created based on sessionConf. + setSessionHive(); + // Process global init file: .hiverc processGlobalInitFile(); // Set fetch size in session conf map @@ -237,6 +236,28 @@ protected int processCmd(String cmd) { } } + /** + * Sets sessionHive object created based on sessionConf. + * @throws HiveSQLException if Hive.get() fails with a HiveException + */ + private void setSessionHive() throws HiveSQLException { + Hive newSessionHive; + try { + newSessionHive = Hive.get(getHiveConf()); + + // HMS connections from sessionHive shouldn't be closed by any query execution thread when it + // recreates the Hive object. It is allowed to be closed only when session is closed/released. + newSessionHive.setAllowClose(false); + } catch (HiveException e) { + throw new HiveSQLException("Failed to get metastore connection", e); + } + + // The previous sessionHive object might still be referred to by any async query execution thread. 
+ // So, it shouldn't be closed here explicitly. Anyway, the Hive object will auto-close HMS connection + // when it is garbage collected. So, it is safe to just overwrite sessionHive here. + sessionHive = newSessionHive; + } + private void processGlobalInitFile() { IHiveFileProcessor processor = new GlobalHivercFileProcessor(); @@ -402,7 +423,20 @@ private synchronized void acquireAfterOpLock(boolean userAccess) { } // set the thread name with the logging prefix. sessionState.updateThreadName(); - Hive.set(sessionHive); + + // If the thread local Hive is different from sessionHive, it means, the previous query execution in + // master thread has re-created Hive object due to changes in MS related configurations in sessionConf. + // So, it is necessary to reset sessionHive object based on new sessionConf. Here, we cannot + // directly set sessionHive with thread local Hive because if the previous command was REPL LOAD, then + // the config changes live only within command execution, not at session level. + // So, the safer option is to invoke Hive.get() which decides whether to reuse the thread-local Hive or re-create it. + if (Hive.getThreadLocal() != sessionHive) { + try { + setSessionHive(); + } catch (HiveSQLException e) { + throw new RuntimeException(e); + } + } } /** @@ -777,12 +811,20 @@ public void close() throws HiveSQLException { } if (sessionHive != null) { try { - Hive.closeCurrent(); + sessionHive.close(true); } catch (Throwable t) { LOG.warn("Error closing sessionHive", t); } sessionHive = null; } + try { + // The thread local Hive in master thread can be different from sessionHive if any query + // execution from master thread resets it to new Hive object due to changes in sessionConf. + // So, need to close it as well. If it is same as sessionHive, then it is just no-op. 
+ Hive.closeCurrent(); + } catch (Throwable t) { + LOG.warn("Error closing thread local Hive", t); + } release(true, false); } } diff --git a/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java b/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java index 5655458..be8d70b 100644 --- a/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java +++ b/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java @@ -280,6 +280,7 @@ public void testActiveSessionMetrics() throws Exception { @Override public void run() { try { + Hive.set(session.getSessionHive()); OperationHandle handle = session.getTables("catalog", "schema", "table", null); session.closeOperation(handle); } catch (Exception e) { @@ -334,6 +335,7 @@ public void testActiveSessionTimeMetrics() throws Exception { @Override public void run() { try { + Hive.set(session.getSessionHive()); OperationHandle handle = session.getTables("catalog", "schema", "table", null); session.closeOperation(handle); } catch (Exception e) {