diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index dc333ca369..a57ec1e4a7 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -511,7 +511,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         "Turn on ChangeManager, so delete files will go to cmrootdir."),
     REPLCMDIR("hive.repl.cmrootdir","/user/${system:user.name}/cmroot/",
         "Root dir for ChangeManager, used for deleted files."),
-    REPLCMRETIAN("hive.repl.cm.retain","7d",
+    REPLCMRETIAN("hive.repl.cm.retain","10d",
         new TimeValidator(TimeUnit.DAYS),
         "Time to retain removed files in cmrootdir."),
     REPLCMENCRYPTEDDIR("hive.repl.cm.encryptionzone.rootdir", ".cmroot",
@@ -1310,7 +1310,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
     @Deprecated
     METASTORE_EVENT_DB_LISTENER_TTL("hive.metastore.event.db.listener.timetolive", "86400s",
         new TimeValidator(TimeUnit.SECONDS),
-        "time after which events will be removed from the database listener queue"),
+        "time after which events will be removed from the database listener queue when repl.cm.enabled \n" +
+            "is set to false. When repl.cm.enabled is set to true, repl.event.db.listener.timetolive is used instead"),
 
     /**
      * @deprecated Use MetastoreConf.EVENT_DB_NOTIFICATION_API_AUTH
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index d7757e6e37..cd3f6dc42b 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -30,11 +30,13 @@
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
 import org.apache.hadoop.hive.metastore.MetaStoreEventListenerConstants;
 import org.apache.hadoop.hive.metastore.RawStore;
@@ -161,6 +163,21 @@ private static synchronized void init(Configuration conf) throws MetaException {
     }
   }
 
+  /**
+   * Test hook: stops the current cleaner thread (if any), drops the reference and re-runs
+   * init(conf). Does nothing unless HiveConf.ConfVars.HIVE_IN_TEST_REPL is true.
+   */
+  @VisibleForTesting
+  public static synchronized void resetCleaner(HiveConf conf) throws Exception {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) {
+      if (cleaner != null) {
+        cleaner.stopRun();
+      }
+      cleaner = null;
+      init(conf);
+    }
+  }
+
   public DbNotificationListener(Configuration config) throws MetaException {
     super(config);
     conf = config;
@@ -184,8 +201,19 @@ public void onConfigChange(ConfigChangeEvent tableEvent) throws MetaException {
           TimeUnit.SECONDS);
       MetastoreConf.setTimeVar(getConf(), MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, time,
           TimeUnit.SECONDS);
-      cleaner.setTimeToLive(MetastoreConf.getTimeVar(getConf(),
-          MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, TimeUnit.SECONDS));
+      boolean isReplEnabled = MetastoreConf.getBoolVar(getConf(), ConfVars.REPLCMENABLED);
+      if (!isReplEnabled) {
+        cleaner.setTimeToLive(MetastoreConf.getTimeVar(getConf(), ConfVars.EVENT_DB_LISTENER_TTL,
+            TimeUnit.SECONDS));
+      }
+    } else if (key.equals(ConfVars.REPL_EVENT_DB_LISTENER_TTL.toString()) ||
+        key.equals(ConfVars.REPL_EVENT_DB_LISTENER_TTL.getHiveName())) {
+      long time = MetastoreConf.convertTimeStr(tableEvent.getNewValue(), TimeUnit.SECONDS,
+          TimeUnit.SECONDS);
+      boolean isReplEnabled = MetastoreConf.getBoolVar(getConf(), ConfVars.REPLCMENABLED);
+      if (isReplEnabled) {
+        cleaner.setTimeToLive(time);
+      }
     }
 
     if (key.equals(ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL.toString()) ||
@@ -1190,13 +1218,21 @@ private void process(NotificationEvent event, ListenerEvent listenerEvent) throw
   private static class CleanerThread extends Thread {
     private RawStore rs;
     private int ttl;
+    // Written by stopRun() from another thread and polled by run(); volatile makes the write visible.
+    private volatile boolean shouldRun = true;
     private long sleepTime;
 
     CleanerThread(Configuration conf, RawStore rs) {
       super("DB-Notification-Cleaner");
       this.rs = rs;
-      setTimeToLive(MetastoreConf.getTimeVar(conf, ConfVars.EVENT_DB_LISTENER_TTL,
-          TimeUnit.SECONDS));
+      boolean isReplEnabled = MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED);
+      if (isReplEnabled) {
+        setTimeToLive(MetastoreConf.getTimeVar(conf, ConfVars.REPL_EVENT_DB_LISTENER_TTL,
+            TimeUnit.SECONDS));
+      } else {
+        setTimeToLive(MetastoreConf.getTimeVar(conf, ConfVars.EVENT_DB_LISTENER_TTL,
+            TimeUnit.SECONDS));
+      }
       setCleanupInterval(MetastoreConf.getTimeVar(conf, ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL,
           TimeUnit.MILLISECONDS));
       setDaemon(true);
@@ -1204,7 +1240,7 @@ private void process(NotificationEvent event, ListenerEvent listenerEvent) throw
 
     @Override
     public void run() {
-      while (true) {
+      while (shouldRun) {
         try {
           rs.cleanNotificationEvents(ttl);
           rs.cleanWriteNotificationEvents(ttl);
@@ -1235,5 +1271,12 @@ public void setCleanupInterval(long configInterval) {
       sleepTime = configInterval;
     }
 
+    /**
+     * Clears the flag polled by {@link #run()} so the cleanup loop exits at its next check.
+     */
+    @VisibleForTesting
+    private synchronized void stopRun() {
+      shouldRun = false;
+    }
   }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
index 4d94975945..19a56defd2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
@@ -25,6 +25,8 @@
 import org.junit.rules.TestRule;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -32,8 +34,10 @@
 public class TestReplWithJsonMessageFormat extends TestReplicationScenarios {
   @Rule
   public TestRule replV1BackwardCompatibleRule =
       new ReplicationV1CompatRule(metaStoreClient, hconf,
-          new ArrayList<>(Collections.singletonList("testEventFilters")));
+          new ArrayList<>(Arrays.asList(
+              "testEventFilters",
+              "testReplConfiguredCleanupOfNotificationEvents")));
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index bef0a95071..ff6c13277c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.hcatalog.listener.DbNotificationListener;
 import org.apache.hadoop.hive.metastore.*;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
@@ -102,6 +103,7 @@
 import java.util.List;
 import java.util.Map;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
 import java.util.Base64;
 
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
@@ -2162,6 +2164,133 @@ public void testConfiguredDeleteOfPrevDumpDir() throws IOException {
     verifySetupSteps = verifySetupOriginal;
   }
 
+  @Test
+  public void testReplConfiguredCleanupOfNotificationEvents() throws Exception {
+
+    boolean verifySetupOriginal = verifySetupSteps;
+    verifySetupSteps = true;
+    final int CLEANER_TTL_SECONDS = 1;
+    final int CLEANER_INTERVAL_SECONDS = 1;
+    String nameOfTest = testName.getMethodName();
+    String dbName = createDB(nameOfTest, driver);
+    String replDbName = dbName + "_dupe";
+
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+
+    //bootstrap
+    bootstrapLoadAndVerify(dbName, replDbName);
+
+    String[] unptnData = new String[] {"eleven", "twelve"};
+    String[] ptnData1 = new String[] {"thirteen", "fourteen", "fifteen"};
+    String[] ptnData2 = new String[] {"fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[] {};
+
+    String unptnLocn = new Path(TEST_PATH, nameOfTest + "_unptn").toUri().getPath();
+    String ptnLocn1 = new Path(TEST_PATH, nameOfTest + "_ptn1").toUri().getPath();
+    String ptnLocn2 = new Path(TEST_PATH, nameOfTest + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptnLocn, unptnData);
+    createTestDataFile(ptnLocn1, ptnData1);
+    createTestDataFile(ptnLocn2, ptnData2);
+
+    run("LOAD DATA LOCAL INPATH '" + unptnLocn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver);
+    verifySetup("SELECT * from " + dbName + ".unptned", unptnData, driver);
+    run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned", driver);
+    run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver);
+    verifySetup("SELECT * from " + dbName + ".unptned_late", unptnData, driver);
+
+    // CM was enabled during setup, REPL_EVENT_DB_LISTENER_TTL should be used, set the other one to a low value
+    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, CLEANER_TTL_SECONDS, TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, CLEANER_INTERVAL_SECONDS, TimeUnit.SECONDS);
+    DbNotificationListener.resetCleaner(hconf);
+
+    //sleep to ensure correct conf(REPL_EVENT_DB_LISTENER_TTL) is used
+    Thread.sleep(CLEANER_INTERVAL_SECONDS * 1000 * 10);
+
+    //verify events get replicated
+    Tuple incrDump = replDumpDb(dbName);
+    loadAndVerify(replDbName, dbName, incrDump.lastReplId);
+    verifyRun("SELECT * from " + replDbName + ".unptned", unptnData, driverMirror);
+    verifyRun("SELECT * from " + replDbName + ".unptned_late", unptnData, driverMirror);
+
+
+    // For next run, CM is enabled, set REPL_EVENT_DB_LISTENER_TTL to low value for events to get deleted
+    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, CLEANER_TTL_SECONDS * 60 * 60, TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, CLEANER_TTL_SECONDS , TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, CLEANER_INTERVAL_SECONDS, TimeUnit.SECONDS);
+    DbNotificationListener.resetCleaner(hconf);
+
+    run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver);
+    run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO TABLE " + dbName
+        + ".ptned PARTITION(b=1)", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptnData1, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' OVERWRITE INTO TABLE " + dbName
+        + ".ptned PARTITION(b=2)", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptnData2, driver);
+
+    Thread.sleep(CLEANER_INTERVAL_SECONDS * 1000 * 10);
+
+    incrDump = replDumpDb(dbName);
+
+    // expected empty data because REPL_EVENT_DB_LISTENER_TTL should have been exceeded before dump
+    loadAndVerify(replDbName, dbName, incrDump.lastReplId);
+    verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", empty, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", empty, driverMirror);
+
+    // With CM disabled, EVENT_DB_LISTENER_TTL should be used.
+    // First check with high ttl
+    MetastoreConf.setBoolVar(hconf, MetastoreConf.ConfVars.REPLCMENABLED, false);
+    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, CLEANER_TTL_SECONDS * 60 * 60, TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, CLEANER_TTL_SECONDS, TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, CLEANER_INTERVAL_SECONDS, TimeUnit.SECONDS);
+    DbNotificationListener.resetCleaner(hconf);
+
+    run("CREATE TABLE " + dbName
+        + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver);
+    run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName
+        + ".ptned WHERE b=1", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptnData1, driver);
+
+
+    //sleep to ensure correct conf(EVENT_DB_LISTENER_TTL) is used
+    Thread.sleep(CLEANER_INTERVAL_SECONDS * 1000 * 10);
+
+    //check replication success
+    incrDump = replDumpDb(dbName);
+    loadAndVerify(replDbName, dbName, incrDump.lastReplId);
+    verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptnData1, driverMirror);
+
+
+    //With CM disabled, set a low ttl for events to get deleted
+    MetastoreConf.setBoolVar(hconf, MetastoreConf.ConfVars.REPLCMENABLED, false);
+    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, CLEANER_TTL_SECONDS, TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, CLEANER_TTL_SECONDS * 60 * 60, TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, CLEANER_INTERVAL_SECONDS, TimeUnit.SECONDS);
+    DbNotificationListener.resetCleaner(hconf);
+
+    run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName
+        + ".ptned WHERE b=2", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2, driver);
+
+
+    Thread.sleep(CLEANER_INTERVAL_SECONDS * 1000 * 10);
+
+    //events should be deleted before dump
+    incrDump = replDumpDb(dbName);
+    loadAndVerify(replDbName, dbName, incrDump.lastReplId);
+    verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", empty, driverMirror);
+
+
+    //restore original values
+    MetastoreConf.setBoolVar(hconf, MetastoreConf.ConfVars.REPLCMENABLED, true);
+    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, 86400, TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, 864000, TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, 7200, TimeUnit.SECONDS);
+    DbNotificationListener.resetCleaner(hconf);
+    verifySetupSteps = verifySetupOriginal;
+  }
+
   @Test
   public void testIncrementalInsertToPartition() throws IOException {
     String testName = "incrementalInsertToPartition";
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index cdbe919972..acf5e25549 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -600,8 +600,9 @@ public static ConfVars getMetaConf(String name) {
             + "present in HMS Notification. Any key-value pair whose key is matched with any regex will"
             +" be removed from Parameters map during Serialization of Table/Partition object."),
     EVENT_DB_LISTENER_TTL("metastore.event.db.listener.timetolive",
-        "hive.metastore.event.db.listener.timetolive", 7, TimeUnit.DAYS,
-        "time after which events will be removed from the database listener queue"),
+        "hive.metastore.event.db.listener.timetolive", 1, TimeUnit.DAYS,
+        "time after which events will be removed from the database listener queue when repl.cm.enabled \n" +
+            "is set to false. When set to true, the conf repl.event.db.listener.timetolive is used instead."),
     EVENT_CLEAN_MAX_EVENTS("metastore.event.db.clean.maxevents",
         "hive.metastore.event.db.clean.maxevents", 10000,
         "Limit on number events to be cleaned at a time in metastore cleanNotificationEvents " +
@@ -975,7 +976,7 @@ public static ConfVars getMetaConf(String name) {
     REPLCMFALLBACKNONENCRYPTEDDIR("metastore.repl.cm.nonencryptionzone.rootdir",
         "hive.repl.cm.nonencryptionzone.rootdir", "",
         "Root dir for ChangeManager for non encrypted paths if hive.repl.cmrootdir is encrypted."),
-    REPLCMRETIAN("metastore.repl.cm.retain", "hive.repl.cm.retain", 24, TimeUnit.HOURS,
+    REPLCMRETIAN("metastore.repl.cm.retain", "hive.repl.cm.retain", 24 * 10, TimeUnit.HOURS,
         "Time to retain removed files in cmrootdir."),
     REPLCMINTERVAL("metastore.repl.cm.interval", "hive.repl.cm.interval", 3600, TimeUnit.SECONDS,
         "Inteval for cmroot cleanup thread."),
@@ -991,6 +992,10 @@ public static ConfVars getMetaConf(String name) {
         "hive.exec.copyfile.maxsize", 32L * 1024 * 1024 /*32M*/,
         "Maximum file size (in bytes) that Hive uses to do single HDFS copies between directories." +
             "Distributed copies (distcp) will be used instead for bigger files so that copies can be done faster."),
+    REPL_EVENT_DB_LISTENER_TTL("metastore.repl.event.db.listener.timetolive",
+        "hive.repl.event.db.listener.timetolive", 10, TimeUnit.DAYS,
+        "time after which events will be removed from the database listener queue when repl.cm.enabled \n" +
+            "is set to true. When set to false, the conf event.db.listener.timetolive is used instead."),
     REPL_METRICS_CACHE_MAXSIZE("metastore.repl.metrics.cache.maxsize",
         "hive.repl.metrics.cache.maxsize", 10000 /*10000 rows */,
         "Maximum in memory cache size to collect replication metrics. The metrics will be pushed to persistent"