diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 4df2758..8a7fc6d 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -529,4 +529,13 @@ private String encodeFileUri(String fileUriStr, String fileChecksum) { return fileUriStr; } } + + @Override + public void shutdown() throws MetaException { + if (rs != null) { + rs.shutdown(); + rs = null; + } + } + } diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 640b567..1bac42c 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -31,11 +31,13 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.ObjectStore; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FireEventRequest; @@ -73,6 +75,8 @@ import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; +import org.datanucleus.api.jdo.JDOPersistenceManager; +import org.datanucleus.api.jdo.JDOPersistenceManagerFactory; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -1232,6 +1236,49 @@ public void sqlInsertPartition() throws Exception { assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); } + /** + * Tests that closing the HMS client cleans up cached JDOPersistanceManager + * @throws Exception + */ + @Test + public void testJDOPMFCacheCleanup() throws Exception { + HiveConf conf = new HiveConf(); + conf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, + DbNotificationListener.class.getName()); + int jdoPmCacheSizeInit = getJDOPersistanceManagerCacheSize(); + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + int jdoPmCacheSizeIncreased = getJDOPersistanceManagerCacheSize(); + assertTrue(jdoPmCacheSizeIncreased > jdoPmCacheSizeInit); + msClient.close(); + int jdoPmCacheSizeAfterCleanup = getJDOPersistanceManagerCacheSize(); + assertTrue(jdoPmCacheSizeAfterCleanup < jdoPmCacheSizeIncreased); + } + + private int getJDOPersistanceManagerCacheSize() { + JDOPersistenceManagerFactory jdoPmf; + Set pmCacheObj; + Field pmCache; + Field pmf; + try { + pmf = ObjectStore.class.getDeclaredField("pmf"); + if (pmf != null) { + pmf.setAccessible(true); + jdoPmf = (JDOPersistenceManagerFactory) pmf.get(null); + pmCache = JDOPersistenceManagerFactory.class.getDeclaredField("pmCache"); + if (pmCache != null) { + pmCache.setAccessible(true); + pmCacheObj = (Set) pmCache.get(jdoPmf); + if (pmCacheObj != null) { + return pmCacheObj.size(); + } + } + } + } catch (Exception ex) { + System.out.println(ex); + } + return -1; + } + private void verifyInsert(NotificationEvent event, String dbName, String tblName) throws Exception { // Parse the message field InsertMessage insertMsg = md.getInsertMessage(event.getMessage()); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 13d0aab..c2a8b4d 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -835,8 +835,26 @@ public fb_status getStatus() { @Override public void shutdown() { cleanupRawStore(); + cleanupEventListeners(); } + /* + * Perform cleanup related to configured MetaStoreEventListener + */ + private void cleanupEventListeners() { + try { + for (MetaStoreEventListener listener : listeners) { + listener.shutdown(); + } + for (MetaStoreEventListener listener : transactionalListeners) { + listener.shutdown(); + } + } catch (MetaException e) { + LOG.info("Unable to shutdown MetaStoreEventListener", e); + } + } + + @Override public AbstractMap getCounters() { AbstractMap counters = super.getCounters(); @@ -7167,7 +7185,6 @@ private static void cleanupRawStore() { HMSHandler.logInfo("Done cleaning up thread local RawStore"); } } - private static void signalOtherThreadsToStart(final TServer server, final Lock startLock, final Condition startCondition, final AtomicBoolean startedServing) { diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java index b0defb5..8fe8221 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java @@ -171,6 +171,13 @@ public void onInsert(InsertEvent insertEvent) throws MetaException { } + /** + * Perform cleanup related to this instance of the MetaStoreEventListener + */ + public void shutdown() throws MetaException { + + } + @Override public Configuration getConf() { return this.conf;