diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java index fd4527e653..d9b2a5786e 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java @@ -81,6 +81,8 @@ private static final String dbName = "hive2038"; private static final String tblName = "tmptbl"; private static final String renamed = "tmptbl2"; + private static final String metaConfKey = "hive.metastore.partition.name.whitelist.pattern"; + private static final String metaConfVal = ""; @Override protected void setUp() throws Exception { @@ -93,9 +95,11 @@ protected void setUp() throws Exception { DummyPreListener.class.getName()); int port = MetaStoreUtils.findFreePort(); - MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge()); - hiveConf = new HiveConf(this.getClass()); + + hiveConf.setVar(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN, metaConfVal); + MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge(), hiveConf); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port); hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); @@ -455,4 +459,61 @@ public void testListener() throws Exception { assertEquals("true", event.getOldValue()); assertEquals("false", event.getNewValue()); } + + public void testMetaConfNotifyListenersClosingClient() throws Exception { + HiveMetaStoreClient closingClient = new HiveMetaStoreClient(hiveConf, null); + closingClient.setMetaConf(metaConfKey, "[test pattern modified]"); + ConfigChangeEvent event = (ConfigChangeEvent) DummyListener.getLastEvent(); + assertEquals(event.getOldValue(), ""); + assertEquals(event.getNewValue(), "[test pattern modified]"); + closingClient.close(); + + Thread.sleep(5 * 1000); + + event = (ConfigChangeEvent) DummyListener.getLastEvent(); + assertEquals(event.getOldValue(), "[test pattern modified]"); + assertEquals(event.getNewValue(), ""); + } + + public void testMetaConfNotifyListenersNonClosingClient() throws Exception { + HiveMetaStoreClient nonClosingClient = new HiveMetaStoreClient(hiveConf, null); + nonClosingClient.setMetaConf(metaConfKey, "[test pattern modified]"); + ConfigChangeEvent event = (ConfigChangeEvent) DummyListener.getLastEvent(); + assertEquals(event.getOldValue(), metaConfVal); + assertEquals(event.getNewValue(), "[test pattern modified]"); + // This should also trigger meta listener notification via TServerEventHandler#deleteContext + nonClosingClient.getTTransport().close(); + + Thread.sleep(5 * 1000); + + event = (ConfigChangeEvent) DummyListener.getLastEvent(); + assertEquals(event.getOldValue(), "[test pattern modified]"); + assertEquals(event.getNewValue(), metaConfVal); + } + + public void testMetaConfDuplicateNotification() throws Exception { + HiveMetaStoreClient closingClient = new HiveMetaStoreClient(hiveConf, null); + closingClient.setMetaConf(metaConfKey, metaConfVal); + int beforeCloseNotificationEventCounts = DummyListener.notifyList.size(); + closingClient.close(); + + Thread.sleep(5 * 1000); + + int afterCloseNotificationEventCounts = DummyListener.notifyList.size(); + // Setting key to same value, should not trigger configChange event during shutdown + assertEquals(beforeCloseNotificationEventCounts, afterCloseNotificationEventCounts); + } + + public void testMetaConfSameHandler() throws Exception { + HiveMetaStoreClient closingClient = new HiveMetaStoreClient(hiveConf, null); + closingClient.setMetaConf(metaConfKey, metaConfVal); + ConfigChangeEvent event = (ConfigChangeEvent) DummyListener.getLastEvent(); + HiveMetaStore.HMSHandler beforeHandler = event.getHandler(); + closingClient.close(); + + Thread.sleep(5 * 1000); + HiveMetaStore.HMSHandler afterHandler = event.getHandler(); + // Both the handlers should be same + assertEquals(beforeHandler, afterHandler); + } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 58b9044930..832086374f 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -40,6 +40,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.Timer; @@ -275,6 +277,28 @@ protected Configuration initialValue() { } }; + /** + * Thread local HMSHandler used during shutdown to notify meta listeners + */ + private static final ThreadLocal threadLocalHMSHandler = + new ThreadLocal() { + @Override + protected HMSHandler initialValue() { + return null; + } + }; + + /** + * Thread local Map to keep track of modified meta conf keys + */ + private static final ThreadLocal threadLocalModifiedMetaConfKeys = + new ThreadLocal() { + @Override + protected Map initialValue() { + return new HashMap(); + } + }; + private static ExecutorService threadPool; public static final String AUDIT_FORMAT = @@ -344,6 +368,62 @@ protected String initialValue() { } }; + /** + * Internal function to notify listeners for meta config change events + */ + private void notifyMetaListeners(String key, String oldValue, String newValue) throws MetaException { + for (MetaStoreEventListener listener : listeners) { + listener.onConfigChange(new ConfigChangeEvent(this, key, oldValue, newValue)); + } + + if (transactionalListeners.size() > 0) { + // All the fields of this event are final, so no reason to create a new one for each + // listener + ConfigChangeEvent cce = new ConfigChangeEvent(this, key, oldValue, newValue); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onConfigChange(cce); + } + } + } + + /** + * Internal function to notify listeners to revert back to old values of keys + * that were modified during setMetaConf. This would get called from HiveMetaStore#cleanupRawStore + */ + private void notifyMetaListenersOnShutDown() { + try { + Configuration conf = threadLocalConf.get(); + Map m = threadLocalModifiedMetaConfKeys.get(); + + // Notify listeners of the changed value + for (Entry entry : m.entrySet()) { + String key = entry.getKey(); + // curr value becomes old and vice-versa + String currVal = entry.getValue(); + String oldVal = conf.get(key); + + if (!Objects.equals(oldVal, currVal)) { + notifyMetaListeners(key, oldVal, currVal); + } + } + } catch (MetaException e) { + LOG.error("Failed to notify meta listeners on shutdown: ", e); + } finally { + threadLocalModifiedMetaConfKeys.remove(); + logInfo("Meta listeners shutdown notification completed."); + } + } + + /** + * Save prev value of modified meta-conf key on threadLocalMap + */ + private void saveKey(String key, String val) { + Map m = threadLocalModifiedMetaConfKeys.get(); + if (!m.containsKey(key)) { + m.put(key, val); + } + } + public static void setThreadLocalIpAddress(String ipAddress) { threadLocalIpAddress.set(ipAddress); } @@ -513,6 +593,14 @@ private static String addPrefix(String s) { return threadLocalId.get() + ": " + s; } + /** + * Set copy of invoking HMSHandler on thread local + */ + private static void setHMSHandler(HMSHandler handler) { + if (threadLocalHMSHandler.get() == null) { + threadLocalHMSHandler.set(handler); + } + } @Override public void setConf(Configuration conf) { threadLocalConf.set(conf); @@ -549,20 +637,13 @@ public void setMetaConf(String key, String value) throws MetaException { } Configuration configuration = getConf(); String oldValue = configuration.get(key); + // Save prev val of the key on threadLocal + saveKey(key, oldValue); + // Set invoking HMSHandler on threadLocal, this will be used later to notify + // metaListeners in HiveMetaStore#cleanupRawStore + setHMSHandler(this); configuration.set(key, value); - - for (MetaStoreEventListener listener : listeners) { - listener.onConfigChange(new ConfigChangeEvent(this, key, oldValue, value)); - } - - if (transactionalListeners.size() > 0) { - // All the fields of this event are final, so no reason to create a new one for each - // listener - ConfigChangeEvent cce = new ConfigChangeEvent(this, key, oldValue, value); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onConfigChange(cce); - } - } + notifyMetaListeners(key, oldValue, value); } @Override @@ -7509,15 +7590,20 @@ public void processContext(ServerContext serverContext, TTransport tTransport, T } private static void cleanupRawStore() { - RawStore rs = HMSHandler.getRawStore(); - if (rs != null) { - HMSHandler.logInfo("Cleaning up thread local RawStore..."); - try { + try { + RawStore rs = HMSHandler.getRawStore(); + if (rs != null) { + HMSHandler.logInfo("Cleaning up thread local RawStore..."); rs.shutdown(); - } finally { - HMSHandler.threadLocalConf.remove(); - HMSHandler.removeRawStore(); } + } finally { + HMSHandler handler = HMSHandler.threadLocalHMSHandler.get(); + if (handler != null) { + handler.notifyMetaListenersOnShutDown(); + HMSHandler.threadLocalHMSHandler.remove(); + } + HMSHandler.threadLocalConf.remove(); + HMSHandler.removeRawStore(); HMSHandler.logInfo("Done cleaning up thread local RawStore"); } }