diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java index ddfaacfbc565ad63cad0e93f94e388823de06461..d90085b20d43da3f0cdfb2503e5defc44ec1ad11 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.metastore; +import java.lang.reflect.Field; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -33,6 +34,8 @@ import junit.framework.TestCase; +import org.datanucleus.api.jdo.JDOPersistenceManager; +import org.datanucleus.api.jdo.JDOPersistenceManagerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; @@ -3211,6 +3214,54 @@ public void testRetriableClientWithConnLifetime() throws Exception { client.close(); } + public void testJDOPersistanceManagerCleanup() throws Exception { + if (isThriftClient == false) { + return; + } + + int numObjectsBeforeClose = getJDOPersistanceManagerCacheSize(); + HiveMetaStoreClient closingClient = new HiveMetaStoreClient(hiveConf); + closingClient.getAllDatabases(); + closingClient.close(); + Thread.sleep(5 * 1000); // give HMS time to handle close request + int numObjectsAfterClose = getJDOPersistanceManagerCacheSize(); + Assert.assertTrue(numObjectsBeforeClose == numObjectsAfterClose); + + HiveMetaStoreClient nonClosingClient = new HiveMetaStoreClient(hiveConf); + nonClosingClient.getAllDatabases(); + // Drop connection without calling close. HMS thread deleteContext + // will trigger cleanup + nonClosingClient.getTTransport().close(); + Thread.sleep(5 * 1000); + int numObjectsAfterDroppedConnection = getJDOPersistanceManagerCacheSize(); + Assert.assertTrue(numObjectsAfterClose == numObjectsAfterDroppedConnection); + } + + private static int getJDOPersistanceManagerCacheSize() { + JDOPersistenceManagerFactory jdoPmf; + Set pmCacheObj; + Field pmCache; + Field pmf; + try { + pmf = ObjectStore.class.getDeclaredField("pmf"); + if (pmf != null) { + pmf.setAccessible(true); + jdoPmf = (JDOPersistenceManagerFactory) pmf.get(null); + pmCache = JDOPersistenceManagerFactory.class.getDeclaredField("pmCache"); + if (pmCache != null) { + pmCache.setAccessible(true); + pmCacheObj = (Set) pmCache.get(jdoPmf); + if (pmCacheObj != null) { + return pmCacheObj.size(); + } + } + } + } catch (Exception ex) { + System.out.println(ex); + } + return -1; + } + private HiveMetaHookLoader getHookLoader() { HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() { @Override diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java index 6da516558f6cf9a643d33f55cce31def9b7abc91..ef02968e22363d537f58b6054266bf9bc87033ae 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java @@ -25,6 +25,7 @@ public class TestRemoteHiveMetaStore extends TestHiveMetaStore { private static boolean isServerStarted = false; + private static int port; public TestRemoteHiveMetaStore() { super(); @@ -37,21 +38,22 @@ protected void setUp() throws Exception { if (isServerStarted) { assertNotNull("Unable to connect to the MetaStore server", client); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port); return; } - int port = MetaStoreUtils.findFreePort(); + port = MetaStoreUtils.findFreePort(); System.out.println("Starting MetaStore Server on port " + port); MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge(), hiveConf); isServerStarted = true; // This is default case with setugi off for both client and server - createClient(false, port); + createClient(false); } - protected void createClient(boolean setugi, int port) throws Exception { + protected void createClient(boolean setugi) throws Exception { hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port); hiveConf.setBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI,setugi); client = new HiveMetaStoreClient(hiveConf); } -} +} \ No newline at end of file diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestSetUGIOnOnlyClient.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestSetUGIOnOnlyClient.java index 2c6d56782219fba26a878497860e2b5e047beafa..29768c1d660aac937c0cd1fa15fb70b571007d14 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestSetUGIOnOnlyClient.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestSetUGIOnOnlyClient.java @@ -21,8 +21,8 @@ public class TestSetUGIOnOnlyClient extends TestRemoteHiveMetaStore{ @Override - protected void createClient(boolean setugi, int port) throws Exception { + protected void createClient(boolean setugi) throws Exception { // turn it on for client. - super.createClient(true, port); + super.createClient(true); } } diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestSetUGIOnOnlyServer.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestSetUGIOnOnlyServer.java index 6c3fbf62c927644087478c4c7cce26cc144da501..4a46f7537f3ceb16c45010b88786907109fd1090 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestSetUGIOnOnlyServer.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestSetUGIOnOnlyServer.java @@ -21,8 +21,8 @@ public class TestSetUGIOnOnlyServer extends TestSetUGIOnBothClientServer { @Override - protected void createClient(boolean setugi, int port) throws Exception { + protected void createClient(boolean setugi) throws Exception { // It is turned on for both client and server because of super class. Turn it off for client. - super.createClient(false, port); + super.createClient(false); } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index c6c1e11429e7a4b4389d3d9ec12e02dadb986049..62ad476648128397fd1c595e6a2eb224fb24713e 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -284,7 +284,7 @@ protected Formatter initialValue() { } }; - private final void logAuditEvent(String cmd) { + private static final void logAuditEvent(String cmd) { if (cmd == null) { return; } @@ -307,7 +307,7 @@ private final void logAuditEvent(String cmd) { address, cmd).toString()); } - String getIPAddress() { + private static String getIPAddress() { if (useSasl) { if (saslServer != null && saslServer.getRemoteAddress() != null) { return saslServer.getRemoteAddress().getHostAddress(); @@ -750,7 +750,7 @@ private void addAdminUsers_core() throws MetaException { } } - private void logInfo(String m) { + private static void logInfo(String m) { LOG.info(threadLocalId.get().toString() + ": " + m); logAuditEvent(m); } @@ -823,17 +823,7 @@ public fb_status getStatus() { @Override public void shutdown() { - logInfo("Metastore shutdown started..."); - RawStore ms = threadLocalMS.get(); - if (ms != null) { - try { - ms.shutdown(); - } finally { - threadLocalConf.remove(); - threadLocalMS.remove(); - } - } - logInfo("Metastore shutdown complete."); + cleanupRawStore(); } @Override @@ -6833,6 +6823,9 @@ public void deleteContext(ServerContext serverContext, TProtocol tProtocol, TPro } catch (Exception e) { LOG.warn("Error Reporting Metastore close connection to Metrics system", e); } + // If the IMetaStoreClient#close was called, HMSHandler#shutdown would have already + // cleaned up thread local RawStore. Otherwise, do it now. + cleanupRawStore(); } @Override @@ -6860,6 +6853,20 @@ public void processContext(ServerContext serverContext, TTransport tTransport, T } } + private static void cleanupRawStore() { + RawStore rs = HMSHandler.getRawStore(); + if (rs != null) { + HMSHandler.logInfo("Cleaning up thread local RawStore..."); + try { + rs.shutdown(); + } finally { + HMSHandler.removeRawStore(); + threadLocalConf.remove(); + } + HMSHandler.logInfo("Done cleaning up thread local RawStore"); + } + } + private static void signalOtherThreadsToStart(final TServer server, final Lock startLock, final Condition startCondition, final AtomicBoolean startedServing) { diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 44d73d482263bc7f3c210503ba9440a6cd793a72..909d8ebdf7431aa9e6c7acfbf967400d95ef7651 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.metastore; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.classification.InterfaceAudience; @@ -326,6 +327,11 @@ private void promoteRandomMetaStoreURI() { metastoreUris[index] = tmp; } + @VisibleForTesting + public TTransport getTTransport() { + return transport; + } + @Override public boolean isLocalMetaStore() { return localMetaStore;