diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index b287d43..4313e12 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -148,6 +148,19 @@ public void onConfigChange(ConfigChangeEvent tableEvent) throws MetaException { cleaner.setTimeToLive(MetastoreConf.getTimeVar(getConf(), MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, TimeUnit.SECONDS)); } + + if (key.equals(ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL.toString()) || + key.equals(ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL.getHiveName())) { + // This weirdness of setting it in our conf and then reading back does two things. + // One, it handles the conversion of the TimeUnit. Two, it keeps the value around for + // later in case we need it again. + long time = MetastoreConf.convertTimeStr(tableEvent.getNewValue(), TimeUnit.SECONDS, + TimeUnit.SECONDS); + MetastoreConf.setTimeVar(getConf(), MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, time, + TimeUnit.SECONDS); + cleaner.setCleanupInterval(MetastoreConf.getTimeVar(getConf(), + MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, TimeUnit.MILLISECONDS)); + } } /** @@ -913,13 +926,15 @@ private void process(NotificationEvent event, ListenerEvent listenerEvent) throw private static class CleanerThread extends Thread { private RawStore rs; private int ttl; - static private long sleepTime = 60000; + private long sleepTime; CleanerThread(Configuration conf, RawStore rs) { super("DB-Notification-Cleaner"); this.rs = rs; setTimeToLive(MetastoreConf.getTimeVar(conf, ConfVars.EVENT_DB_LISTENER_TTL, TimeUnit.SECONDS)); + setCleanupInterval(MetastoreConf.getTimeVar(conf, ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, + TimeUnit.MILLISECONDS)); setDaemon(true); } @@ -955,5 +970,9 @@ public void setTimeToLive(long configTtl) { } } + public void setCleanupInterval(long configInterval) { + sleepTime = configInterval; + } + } } diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index b05d975..a00ea17 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.concurrent.TimeUnit; import java.lang.reflect.Field; import java.nio.file.Paths; import java.util.ArrayList; @@ -60,6 +61,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterTableEvent; @@ -259,16 +261,7 @@ public static void connectToMetastore() throws Exception { conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); conf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, DummyRawStoreFailEvent.class.getName()); - Class dbNotificationListener = - Class.forName("org.apache.hive.hcatalog.listener.DbNotificationListener"); - Class[] classes = dbNotificationListener.getDeclaredClasses(); - for (Class c : classes) { - if (c.getName().endsWith("CleanerThread")) { - Field sleepTimeField = c.getDeclaredField("sleepTime"); - sleepTimeField.setAccessible(true); - sleepTimeField.set(null, CLEANUP_SLEEP_TIME * 1000); - } - } + MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, CLEANUP_SLEEP_TIME, TimeUnit.SECONDS); conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); SessionState.start(new CliSessionState(conf)); diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index f180a15..1795ef7 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -525,6 +525,9 @@ public static ConfVars getMetaConf(String name) { EVENT_DB_LISTENER_TTL("metastore.event.db.listener.timetolive", "hive.metastore.event.db.listener.timetolive", 86400, TimeUnit.SECONDS, "time after which events will be removed from the database listener queue"), + EVENT_DB_LISTENER_CLEAN_INTERVAL("metastore.event.db.listener.clean.interval", + "hive.metastore.event.db.listener.clean.interval", 7200, TimeUnit.SECONDS, + "sleep interval between each run for cleanup of events from the database listener queue"), EVENT_DB_NOTIFICATION_API_AUTH("metastore.metastore.event.db.notification.api.auth", "hive.metastore.event.db.notification.api.auth", true, "Should metastore do authorization against database notification related APIs such as get_next_notification.\n" +