// Patch summary (annotation only: these lines sit ahead of the first "diff --git" header and
// belong to no hunk).
//
// ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
//   * The anonymous ThreadLocal assigned to hiveDB becomes a named static nested class,
//     ThreadLocalHive, which pairs the per-thread Hive object with a per-thread allowClose flag
//     whose initial value is true.
//   * ThreadLocalHive.remove() calls close() on the stored Hive only when allowClose() is true and
//     the stored value is non-null; it then calls super.remove() and sets the flag back to true.
//   * ThreadLocalHive.set(Hive, boolean) stores the Hive object and the flag together.
//   * create(...) skips db.close() when hiveDB.allowClose() is false, and registers the Hive it
//     constructs with allowClose = true.
//   * Hive.set(Hive) keeps its signature and now stores with allowClose = true; a new overload
//     Hive.set(Hive, boolean) lets the caller pick the flag.
//   * In the code that submits one partition-load task per entry of validPartitions:
//       - rawStoreMap is declared before the outer try instead of inside it;
//       - each task's call() gains a finally block that, when getMSC().isLocalMetaStore() is true,
//         puts the thread's current RawStore into rawStoreMap (overwriting any previous entry for
//         that thread id) or removes the entry when getRawStore() returns null; this replaces the
//         old containsKey/put that ran only at the end of the try body;
//       - the rawStoreMap.forEach(... rs.shutdown()) loop moves from after the future.get() loop
//         into a finally block, so it also runs when the catch handler throws HiveException.
//
// service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
//   * The PrivilegedExceptionAction now calls Hive.set(parentHive, false) instead of
//     Hive.set(parentHive).
//
// NOTE(review): the new per-task finally block calls getMSC(); if that call can throw, its
//   exception would replace the one rethrown by "throw t" -- confirm this is acceptable.
// NOTE(review): create() consults hiveDB.allowClose(), i.e. the current thread's flag, to decide
//   whether to close its "db" argument; presumably db is always the current thread's Hive --
//   verify against callers.
// NOTE(review): the shutdown loop now also runs on the cancel path; whether cancelled tasks can
//   still be using their RawStore at that point is not visible here -- TODO confirm.
// NOTE(review): generic type arguments look stripped in this copy (e.g. "final List> futures")
//   and line breaks are collapsed, so it presumably cannot be applied as-is; regenerate from the
//   original commit before use.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 76541de..b717d21 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -170,7 +170,10 @@ // metastore calls timing information private final ConcurrentHashMap metaCallTimeMap = new ConcurrentHashMap<>(); - private static ThreadLocal hiveDB = new ThreadLocal() { + // Static class to store thread local Hive object and allowClose flag. + private static class ThreadLocalHive extends ThreadLocal { + private ThreadLocal allowClose = ThreadLocal.withInitial(() -> true); + @Override protected Hive initialValue() { return null; @@ -178,12 +181,24 @@ protected Hive initialValue() { @Override public synchronized void remove() { - if (this.get() != null) { + if (allowClose() && (this.get() != null)) { this.get().close(); } super.remove(); + this.allowClose.set(true); + } + + public synchronized void set(Hive hiveObj, boolean allowClose) { + super.set(hiveObj); + this.allowClose.set(allowClose); } - }; + + boolean allowClose() { + return this.allowClose.get(); + } + } + + private static ThreadLocalHive hiveDB = new ThreadLocalHive(); // Note that while this is an improvement over static initialization, it is still not, // technically, valid, cause nothing prevents us from connecting to several metastores in @@ -314,7 +329,9 @@ private static Hive create(HiveConf c, boolean needsRefresh, Hive db, boolean do if (db != null) { LOG.debug("Creating new db. 
db = " + db + ", needsRefresh = " + needsRefresh + ", db.isCurrentUserOwner = " + db.isCurrentUserOwner()); - db.close(); + if (hiveDB.allowClose()) { + db.close(); + } } closeCurrent(); if (c == null) { @@ -322,7 +339,7 @@ private static Hive create(HiveConf c, boolean needsRefresh, Hive db, boolean do } c.set("fs.scheme.class", "dfs"); Hive newdb = new Hive(c, doRegisterAllFns); - hiveDB.set(newdb); + hiveDB.set(newdb, true); return newdb; } @@ -365,7 +382,11 @@ public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException { } public static void set(Hive hive) { - hiveDB.set(hive); + hiveDB.set(hive, true); + } + + public static void set(Hive hive, boolean allowClose) { + hiveDB.set(hive, allowClose); } public static void closeCurrent() { @@ -2376,10 +2397,10 @@ private void constructOneLBLocationMap(FileStatus fSta, final SessionState parentSession = SessionState.get(); final List> futures = Lists.newLinkedList(); + // for each dynamically created DP directory, construct a full partition spec + // and load the partition based on that + final Map rawStoreMap = new ConcurrentHashMap<>(); try { - // for each dynamically created DP directory, construct a full partition spec - // and load the partition based on that - final Map rawStoreMap = new ConcurrentHashMap<>(); for(final Path partPath : validPartitions) { // generate a full partition specification final LinkedHashMap fullPartSpec = Maps.newLinkedHashMap(partSpec); @@ -2410,12 +2431,6 @@ public Void call() throws Exception { + partsToLoad + " partitions."); } } - // Add embedded rawstore, so we can cleanup later to avoid memory leak - if (getMSC().isLocalMetaStore()) { - if (!rawStoreMap.containsKey(Thread.currentThread().getId())) { - rawStoreMap.put(Thread.currentThread().getId(), HiveMetaStore.HMSHandler.getRawStore()); - } - } return null; } catch (Exception t) { LOG.error("Exception when loading partition with parameters " @@ -2427,6 +2442,20 @@ public Void call() throws Exception { + " 
isAcid=" + isAcid + ", " + " hasFollowingStatsTask=" + hasFollowingStatsTask, t); throw t; + } finally { + // Add embedded rawstore, so we can cleanup later to avoid memory leak + if (getMSC().isLocalMetaStore()) { + Long threadId = Thread.currentThread().getId(); + RawStore threadLocalRawStore = HiveMetaStore.HMSHandler.getRawStore(); + if (threadLocalRawStore == null) { + // If the thread local rawStore is already cleaned by current thread, then remove from rawStoreMap. + rawStoreMap.remove(threadId); + } else { + // If same thread is re-used, then need to cleanup the latest thread local instance of rawStore. + // So, overwrite the old one if exists in rawStoreMap. + rawStoreMap.put(threadId, threadLocalRawStore); + } + } } } })); @@ -2437,8 +2466,6 @@ public Void call() throws Exception { for (Future future : futures) { future.get(); } - - rawStoreMap.forEach((k, rs) -> rs.shutdown()); } catch (InterruptedException | ExecutionException e) { LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks"); //cancel other futures @@ -2448,6 +2475,8 @@ public Void call() throws Exception { throw new HiveException("Exception when loading " + partsToLoad + " in table " + tbl.getTableName() + " with loadPath=" + loadPath, e); + } finally { + rawStoreMap.forEach((k, rs) -> rs.shutdown()); } try { diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 6df9eda..0e6bd4d 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -306,7 +306,7 @@ public void run() { PrivilegedExceptionAction doAsAction = new PrivilegedExceptionAction() { @Override public Object run() throws HiveSQLException { - Hive.set(parentHive); + Hive.set(parentHive, false); // TODO: can this result in cross-thread reuse of session state? 
SessionState.setCurrentSessionState(parentSessionState); PerfLogger.setPerfLogger(SessionState.getPerfLogger());