diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index e320dbf..6dc520f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -74,10 +74,12 @@ import org.apache.hadoop.hive.metastore.HiveMetaException; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.HiveMetaHookLoader; +import org.apache.hadoop.hive.metastore.HiveMetaStore; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.PartitionDropOptions; +import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; @@ -1805,6 +1807,7 @@ private void constructOneLBLocationMap(FileStatus fSta, try { // for each dynamically created DP directory, construct a full partition spec // and load the partition based on that + final Map rawStoreMap = new HashMap(); for(final Path partPath : validPartitions) { // generate a full partition specification final LinkedHashMap fullPartSpec = Maps.newLinkedHashMap(partSpec); @@ -1831,6 +1834,12 @@ public Void call() throws Exception { + partsToLoad + " partitions."); } } + // Add embedded rawstore, so we can cleanup later to avoid memory leak + if (metaStoreClient.isLocalMetaStore()) { + if (!rawStoreMap.containsKey(Thread.currentThread().getId())) { + rawStoreMap.put(Thread.currentThread().getId(), HiveMetaStore.HMSHandler.getRawStore()); + } + } return null; } catch (Exception t) { LOG.error("Exception when loading partition with parameters " @@ -1852,6 +1861,10 @@ public Void call() throws Exception { for (Future future : futures) { future.get(); } + + for (RawStore rs : rawStoreMap.values()) { + rs.shutdown(); + } } catch (InterruptedException | ExecutionException e) { LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks"); //cancel other futures