diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java index fd4527e653..f3849914a7 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java @@ -81,6 +81,8 @@ private static final String dbName = "hive2038"; private static final String tblName = "tmptbl"; private static final String renamed = "tmptbl2"; + private static final String metaConfKey = "hive.metastore.partition.name.whitelist.pattern"; + private static final String metaConfVal = ""; @Override protected void setUp() throws Exception { @@ -93,9 +95,11 @@ protected void setUp() throws Exception { DummyPreListener.class.getName()); int port = MetaStoreUtils.findFreePort(); - MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge()); - hiveConf = new HiveConf(this.getClass()); + + hiveConf.setVar(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN, metaConfVal); + MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge(), hiveConf); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port); hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); @@ -455,4 +459,66 @@ public void testListener() throws Exception { assertEquals("true", event.getOldValue()); assertEquals("false", event.getNewValue()); } + + public void testMetaConfNotifyListenersClosingClient() throws Exception { + HiveMetaStoreClient closingClient = new HiveMetaStoreClient(hiveConf, null); + closingClient.setMetaConf(metaConfKey, "[test pattern modified]"); + ConfigChangeEvent event = (ConfigChangeEvent) DummyListener.getLastEvent(); + assertEquals(event.getOldValue(), metaConfVal); + assertEquals(event.getNewValue(), "[test pattern modified]"); + closingClient.close(); + + Thread.sleep(5 * 1000); + + event = (ConfigChangeEvent) DummyListener.getLastEvent(); + assertEquals(event.getOldValue(), "[test pattern modified]"); + assertEquals(event.getNewValue(), metaConfVal); + } + + public void testMetaConfNotifyListenersNonClosingClient() throws Exception { + HiveMetaStoreClient nonClosingClient = new HiveMetaStoreClient(hiveConf, null); + nonClosingClient.setMetaConf(metaConfKey, "[test pattern modified]"); + ConfigChangeEvent event = (ConfigChangeEvent) DummyListener.getLastEvent(); + assertEquals(event.getOldValue(), metaConfVal); + assertEquals(event.getNewValue(), "[test pattern modified]"); + // This should also trigger meta listener notification via TServerEventHandler#deleteContext + nonClosingClient.getTTransport().close(); + + Thread.sleep(5 * 1000); + + event = (ConfigChangeEvent) DummyListener.getLastEvent(); + assertEquals(event.getOldValue(), "[test pattern modified]"); + assertEquals(event.getNewValue(), metaConfVal); + } + + public void testMetaConfDuplicateNotification() throws Exception { + HiveMetaStoreClient closingClient = new HiveMetaStoreClient(hiveConf, null); + closingClient.setMetaConf(metaConfKey, metaConfVal); + int beforeCloseNotificationEventCounts = DummyListener.notifyList.size(); + closingClient.close(); + + Thread.sleep(5 * 1000); + + int afterCloseNotificationEventCounts = DummyListener.notifyList.size(); + // Setting key to same value, should not trigger configChange event during shutdown + assertEquals(beforeCloseNotificationEventCounts, afterCloseNotificationEventCounts); + } + + public void testMetaConfSameHandler() throws Exception { + HiveMetaStoreClient closingClient = new HiveMetaStoreClient(hiveConf, null); + closingClient.setMetaConf(metaConfKey, "[test pattern modified]"); + ConfigChangeEvent event = (ConfigChangeEvent) DummyListener.getLastEvent(); + int beforeCloseNotificationEventCounts = DummyListener.notifyList.size(); + HiveMetaStore.HMSHandler beforeHandler = event.getHandler(); + closingClient.close(); + + Thread.sleep(5 * 1000); + event = (ConfigChangeEvent) DummyListener.getLastEvent(); + int afterCloseNotificationEventCounts = DummyListener.notifyList.size(); + HiveMetaStore.HMSHandler afterHandler = event.getHandler(); + // Meta-conf cleanup should trigger an event to listener + assertNotSame(beforeCloseNotificationEventCounts, afterCloseNotificationEventCounts); + // Both the handlers should be same + assertEquals(beforeHandler, afterHandler); + } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 58b9044930..bbbcebef05 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -40,6 +40,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.Timer; @@ -275,6 +277,17 @@ protected Configuration initialValue() { } }; + /** + * Thread local HMSHandler used during shutdown to notify meta listeners + */ + private static final ThreadLocal threadLocalHMSHandler = new ThreadLocal<>(); + + /** + * Thread local Map to keep track of modified meta conf keys + */ + private static final ThreadLocal> threadLocalModifiedConfig = + new ThreadLocal<>(); + private static ExecutorService threadPool; public static final String AUDIT_FORMAT = @@ -344,6 +357,55 @@ protected String initialValue() { } }; + /** + * Internal function to notify listeners for meta config change events + */ + private void notifyMetaListeners(String key, String oldValue, String newValue) throws MetaException { + for (MetaStoreEventListener listener : listeners) { + listener.onConfigChange(new ConfigChangeEvent(this, key, oldValue, newValue)); + } + + if (transactionalListeners.size() > 0) { + // All the fields of this event are final, so no reason to create a new one for each + // listener + ConfigChangeEvent cce = new ConfigChangeEvent(this, key, oldValue, newValue); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onConfigChange(cce); + } + } + } + + /** + * Internal function to notify listeners to revert back to old values of keys + * that were modified during setMetaConf. This would get called from HiveMetaStore#cleanupRawStore + */ + private void notifyMetaListenersOnShutDown() { + Map modifiedConf = threadLocalModifiedConfig.get(); + if (modifiedConf == null) { + // Nothing got modified + return; + } + try { + Configuration conf = threadLocalConf.get(); + if (conf == null) { + throw new MetaException("Unexpected: modifiedConf is non-null but conf is null"); + } + // Notify listeners of the changed value + for (Entry entry : modifiedConf.entrySet()) { + String key = entry.getKey(); + // curr value becomes old and vice-versa + String currVal = entry.getValue(); + String oldVal = conf.get(key); + if (!Objects.equals(oldVal, currVal)) { + notifyMetaListeners(key, oldVal, currVal); + } + } + logInfo("Meta listeners shutdown notification completed."); + } catch (MetaException e) { + LOG.error("Failed to notify meta listeners on shutdown: ", e); + } + } + public static void setThreadLocalIpAddress(String ipAddress) { threadLocalIpAddress.set(ipAddress); } @@ -513,6 +575,14 @@ private static String addPrefix(String s) { return threadLocalId.get() + ": " + s; } + /** + * Set copy of invoking HMSHandler on thread local + */ + private static void setHMSHandler(HMSHandler handler) { + if (threadLocalHMSHandler.get() == null) { + threadLocalHMSHandler.set(handler); + } + } @Override public void setConf(Configuration conf) { threadLocalConf.set(conf); @@ -532,6 +602,15 @@ public Configuration getConf() { return conf; } + private Map getModifiedConf() { + Map modifiedConf = threadLocalModifiedConfig.get(); + if (modifiedConf == null) { + modifiedConf = new HashMap(); + threadLocalModifiedConfig.set(modifiedConf); + } + return modifiedConf; + } + public Warehouse getWh() { return wh; } @@ -549,20 +628,16 @@ public void setMetaConf(String key, String value) throws MetaException { } Configuration configuration = getConf(); String oldValue = configuration.get(key); + // Save prev val of the key on threadLocal + Map modifiedConf = getModifiedConf(); + if (!modifiedConf.containsKey(key)) { + modifiedConf.put(key, oldValue); + } + // Set invoking HMSHandler on threadLocal, this will be used later to notify + // metaListeners in HiveMetaStore#cleanupRawStore + setHMSHandler(this); configuration.set(key, value); - - for (MetaStoreEventListener listener : listeners) { - listener.onConfigChange(new ConfigChangeEvent(this, key, oldValue, value)); - } - - if (transactionalListeners.size() > 0) { - // All the fields of this event are final, so no reason to create a new one for each - // listener - ConfigChangeEvent cce = new ConfigChangeEvent(this, key, oldValue, value); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onConfigChange(cce); - } - } + notifyMetaListeners(key, oldValue, value); } @Override @@ -7509,15 +7584,21 @@ public void processContext(ServerContext serverContext, TTransport tTransport, T } private static void cleanupRawStore() { - RawStore rs = HMSHandler.getRawStore(); - if (rs != null) { - HMSHandler.logInfo("Cleaning up thread local RawStore..."); - try { + try { + RawStore rs = HMSHandler.getRawStore(); + if (rs != null) { + HMSHandler.logInfo("Cleaning up thread local RawStore..."); rs.shutdown(); - } finally { - HMSHandler.threadLocalConf.remove(); - HMSHandler.removeRawStore(); } + } finally { + HMSHandler handler = HMSHandler.threadLocalHMSHandler.get(); + if (handler != null) { + handler.notifyMetaListenersOnShutDown(); + } + HMSHandler.threadLocalHMSHandler.remove(); + HMSHandler.threadLocalConf.remove(); + HMSHandler.threadLocalModifiedConfig.remove(); + HMSHandler.removeRawStore(); HMSHandler.logInfo("Done cleaning up thread local RawStore"); } }