diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 61db90c437..4e0f75da1d 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2927,25 +2927,25 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal new RangeValidator(0, 100), "Determines how many attempted compaction records will be " + "retained in compaction history for a given table/partition."), /** - * @deprecated Use MetastoreConf.COMPACTOR_HISTORY_REAPER_INTERVAL + * @deprecated Use MetastoreConf.ACID_HOUSEKEEPER_SERVICE_INTERVAL */ @Deprecated COMPACTOR_HISTORY_REAPER_INTERVAL("hive.compactor.history.reaper.interval", "2m", new TimeValidator(TimeUnit.MILLISECONDS), "Determines how often compaction history reaper runs"), /** - * @deprecated Use MetastoreConf.TIMEDOUT_TXN_REAPER_START + * @deprecated Use MetastoreConf.ACID_HOUSEKEEPER_SERVICE_START */ @Deprecated HIVE_TIMEDOUT_TXN_REAPER_START("hive.timedout.txn.reaper.start", "100s", new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"), /** - * @deprecated Use MetastoreConf.TIMEDOUT_TXN_REAPER_INTERVAL + * @deprecated Use MetastoreConf.ACID_HOUSEKEEPER_SERVICE_INTERVAL */ @Deprecated HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s", new TimeValidator(TimeUnit.MILLISECONDS), "Time interval describing how often the reaper runs"), /** - * @deprecated Use MetastoreConf.WRITE_SET_REAPER_INTERVAL + * @deprecated Use MetastoreConf.ACID_HOUSEKEEPER_SERVICE_INTERVAL */ @Deprecated WRITE_SET_REAPER_INTERVAL("hive.writeset.reaper.interval", "60s", diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 255780994b..fa2ede3738 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -114,7 +114,9 @@ public void run() { Set potentials = txnHandler.findPotentialCompactions(abortedThreshold, abortedTimeThreshold, compactionInterval) - .stream().filter(ci -> checkCompactionElig(ci, currentCompactions)).collect(Collectors.toSet()); + .stream() + .filter(ci -> isEligibleForCompaction(ci, currentCompactions)) + .collect(Collectors.toSet()); LOG.debug("Found " + potentials.size() + " potential compactions, " + "checking to see if we should compact any of them"); @@ -153,12 +155,6 @@ public void run() { // Check for timed out remote workers. recoverFailedCompactions(true); - - // Clean anything from the txns table that has no components left in txn_components. - txnHandler.cleanEmptyAbortedAndCommittedTxns(); - - // Clean TXN_TO_WRITE_ID table for entries under min_uncommitted_txn referred by any open txns. - txnHandler.cleanTxnToWriteIdTable(); } catch (Throwable t) { LOG.error("Initiator loop caught unexpected exception this time through the loop: " + StringUtils.stringifyException(t)); @@ -437,7 +433,7 @@ private static boolean checkDynPartitioning(Table t, CompactionInfo ci){ return false; } - private boolean checkCompactionElig(CompactionInfo ci, ShowCompactResponse currentCompactions) { + private boolean isEligibleForCompaction(CompactionInfo ci, ShowCompactResponse currentCompactions) { LOG.info("Checking to see if we should compact " + ci.getFullPartitionName()); // Check if we already have initiated or are working on a compaction for this partition diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 48bf8529fa..366282a30f 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; @@ -47,7 +48,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; -import org.apache.hadoop.hive.metastore.txn.AcidCompactionHistoryService; +import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -1026,6 +1027,7 @@ void testInitiatorWithMultipleFailedCompactionsForVariousTblProperties(String tb runStatementOnDriver("insert into " + tblName + " values(" + (i + 1) + ", 'foo'),(" + (i + 2) + ", 'bar'),(" + (i + 3) + ", 'baz')"); } hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON, true); int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); AtomicBoolean stop = new AtomicBoolean(true); @@ -1045,9 +1047,9 @@ void testInitiatorWithMultipleFailedCompactionsForVariousTblProperties(String tb checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler)); hiveConf.setTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, 10, TimeUnit.MILLISECONDS); - AcidCompactionHistoryService compactionHistoryService = new AcidCompactionHistoryService(); - compactionHistoryService.setConf(hiveConf); - compactionHistoryService.run(); + MetastoreTaskThread houseKeeper = new AcidHouseKeeperService(); + houseKeeper.setConf(hiveConf); + houseKeeper.run(); checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler)); txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MAJOR)); @@ -1060,7 +1062,7 @@ void testInitiatorWithMultipleFailedCompactionsForVariousTblProperties(String tb numAttemptedCompactions++; checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions + 2,0,0,0,0,numFailedCompactions + 2 + numAttemptedCompactions), countCompacts(txnHandler)); - compactionHistoryService.run(); + houseKeeper.run(); //COMPACTOR_HISTORY_RETENTION_FAILED failed compacts left (and no other since we only have failed ones here) checkCompactionState(new CompactionsByState( hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED), @@ -1084,7 +1086,7 @@ void testInitiatorWithMultipleFailedCompactionsForVariousTblProperties(String tb hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler)); runCleaner(hiveConf); // transition to Success state - compactionHistoryService.run(); + houseKeeper.run(); checkCompactionState(new CompactionsByState( hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED), hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,0,1,0, diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java index 5b8c6701e1..e1f669ac3c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java @@ -22,7 +22,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; @@ -430,10 +429,6 @@ public void testCompactionAbort() throws Exception { Assert.assertTrue(openResp.toString(), BitSet.valueOf(openResp.getAbortedBits()).get(0)); runCleaner(hiveConf); - runInitiator(hiveConf);//to make sure any (which is not in this case) - // 'empty aborted' TXNS metadata is removed - openResp = txnHandler.getOpenTxns(); - Assert.assertEquals(openResp.toString(), 1, openResp.getOpen_txnsSize()); //we still have 1 aborted (compactor) txn Assert.assertTrue(openResp.toString(), BitSet.valueOf(openResp.getAbortedBits()).get(0)); Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, @@ -464,10 +459,6 @@ public void testCompactionAbort() throws Exception { //delete metadata about aborted txn from txn_components and files (if any) runCleaner(hiveConf); - runInitiator(hiveConf);//to clean 'empty aborted' - openResp = txnHandler.getOpenTxns(); - //now the aborted compactor txn is gone - Assert.assertEquals(openResp.toString(), 0, openResp.getOpen_txnsSize()); } /** @@ -479,15 +470,9 @@ public void testCompactionAbort() throws Exception { runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'"); runWorker(hiveConf); - // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain - hiveConf.set("metastore.txn.opentxn.timeout", "1"); - runInitiator(hiveConf); - hiveConf.set("metastore.txn.opentxn.timeout", "1000"); - assertOneTxn(); assertTableIsEmpty("TXN_COMPONENTS"); runCleaner(hiveConf); - assertOneTxn(); assertTableIsEmpty("TXN_COMPONENTS"); } @@ -504,14 +489,9 @@ public void testCompactionAbort() throws Exception { runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'"); runWorker(hiveConf); - // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain - hiveConf.set("metastore.txn.opentxn.timeout", "1"); - runInitiator(hiveConf); - hiveConf.set("metastore.txn.opentxn.timeout", "1000"); assertTableIsEmpty("TXN_COMPONENTS"); runCleaner(hiveConf); - assertOneTxn(); assertTableIsEmpty("TXN_COMPONENTS"); } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java index eac2c6307f..535bf113c4 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java @@ -465,11 +465,6 @@ public void testOperationsOnCompletedTxnComponentsForMmTable() throws Exception Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS")); - // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain - Thread.sleep(1000); - runInitiator(hiveConf); - Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), - 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); // Initiate a major compaction request on the table. runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MAJOR'"); @@ -478,16 +473,11 @@ public void testOperationsOnCompletedTxnComponentsForMmTable() throws Exception runWorker(hiveConf); verifyDirAndResult(2, true); - // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain - Thread.sleep(1000); - runInitiator(hiveConf); // Run Cleaner. runCleaner(hiveConf); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS")); - Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), - 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); verifyDirAndResult(0, true); } diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 1687425bcb..2adabe7058 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.txn.AcidWriteSetService; +import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService; import org.apache.hadoop.hive.ql.TestTxnCommands2; import org.junit.Assert; import org.apache.hadoop.hive.common.FileUtils; @@ -1167,7 +1167,7 @@ public void testWriteSetTracking4() throws Exception { Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from \"WRITE_SET\""), 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from \"WRITE_SET\"")); - AcidWriteSetService houseKeeper = new AcidWriteSetService(); + MetastoreTaskThread houseKeeper = new AcidHouseKeeperService(); houseKeeper.setConf(conf); houseKeeper.run(); //since T3 overlaps with Long Running (still open) GC does nothing @@ -1277,7 +1277,7 @@ public void testWriteSetTracking6() throws Exception { txnMgr.openTxn(ctx, "Long Running"); Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); // Now we can clean the write_set - MetastoreTaskThread writeSetService = new AcidWriteSetService(); + MetastoreTaskThread writeSetService = new AcidHouseKeeperService(); writeSetService.setConf(conf); writeSetService.run(); Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from \"WRITE_SET\"")); diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index 058430fc5a..279de19744 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -19,26 +19,21 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; -import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.DataOperationType; -import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.LockComponent; import org.apache.hadoop.hive.metastore.api.LockLevel; import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.LockRequest; import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.LockType; -import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; -import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -202,38 +197,6 @@ public void noCompactOnManyDifferentPartitionAborts() throws Exception { Assert.assertEquals(0, rsp.getCompactsSize()); } - @Test - public void cleanEmptyAbortedTxns() throws Exception { - // Test that we are cleaning aborted transactions with no components left in txn_components. - // Put one aborted transaction with an entry in txn_components to make sure we don't - // accidently clean it too. - Table t = newTable("default", "ceat", false); - - long txnid = openTxn(); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); - comp.setTablename("ceat"); - comp.setOperationType(DataOperationType.UPDATE); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - LockResponse res = txnHandler.lock(req); - txnHandler.abortTxn(new AbortTxnRequest(txnid)); - txnHandler.setOpenTxnTimeOutMillis(30000); - conf.setIntVar(HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50); - OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest( - TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50, "user", "hostname")); - txnHandler.abortTxns(new AbortTxnsRequest(resp.getTxn_ids())); - GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); - Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize()); - txnHandler.setOpenTxnTimeOutMillis(1); - startInitiator(); - - openTxns = txnHandler.getOpenTxns(); - // txnid:1 has txn_components, txnid:TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1 is the last - Assert.assertEquals(2, openTxns.getOpen_txnsSize()); - } - /** * Test that HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD triggers compaction. * diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 842b7fe53d..f1e99b5c84 100644 --- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -86,17 +86,14 @@ static final String METASTORE_DELEGATION_MANAGER_CLASS = "org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager"; @VisibleForTesting - static final String ACID_COMPACTION_HISTORY_SERVICE_CLASS = - "org.apache.hadoop.hive.metastore.txn.AcidCompactionHistoryService"; - @VisibleForTesting static final String ACID_HOUSE_KEEPER_SERVICE_CLASS = "org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService"; @VisibleForTesting + static final String ACID_TXN_CLEANER_SERVICE_CLASS = + "org.apache.hadoop.hive.metastore.txn.AcidTxnCleanerService"; + @VisibleForTesting static final String ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS = "org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService"; - @VisibleForTesting - static final String ACID_WRITE_SET_SERVICE_CLASS = - "org.apache.hadoop.hive.metastore.txn.AcidWriteSetService"; public static final String METASTORE_AUTHENTICATION_LDAP_USERMEMBERSHIPKEY_NAME = "metastore.authentication.ldap.userMembershipKey"; @@ -271,6 +268,15 @@ public static ConfVars getMetaConf(String name) { public enum ConfVars { // alpha order, PLEASE! + ACID_HOUSEKEEPER_SERVICE_INTERVAL("metastore.acid.housekeeper.interval", + "hive.metastore.acid.housekeeper.interval", 60, TimeUnit.SECONDS, + "Time interval describing how often the acid housekeeper runs."), + ACID_HOUSEKEEPER_SERVICE_START("metastore.acid.housekeeper.start", + "hive.metastore.acid.housekeeper.start", 60, TimeUnit.SECONDS, + "Time delay of 1st acid housekeeper run after metastore has started."), + ACID_TXN_CLEANER_INTERVAL("metastore.acid.txn.cleaner.interval", + "hive.metastore.acid.txn.cleaner.interval", 10, TimeUnit.SECONDS, + "Time interval describing how often aborted and committed txns are cleaned."), ADDED_JARS("metastore.added.jars.path", "hive.added.jars.path", "", "This an internal parameter."), AGGREGATE_STATS_CACHE_CLEAN_UNTIL("metastore.aggregate.stats.cache.clean.until", @@ -382,9 +388,6 @@ public static ConfVars getMetaConf(String name) { "has an infinite lifetime."), CLIENT_SOCKET_TIMEOUT("metastore.client.socket.timeout", "hive.metastore.client.socket.timeout", 600, TimeUnit.SECONDS, "MetaStore Client socket timeout in seconds"), - COMPACTOR_HISTORY_REAPER_INTERVAL("metastore.compactor.history.reaper.interval", - "hive.compactor.history.reaper.interval", 2, TimeUnit.MINUTES, - "Determines how often compaction history reaper runs"), COMPACTOR_HISTORY_RETENTION_ATTEMPTED("metastore.compactor.history.retention.attempted", "hive.compactor.history.retention.attempted", 2, new RangeValidator(0, 100), "Determines how many attempted compaction records will be " + @@ -1059,12 +1062,11 @@ public static ConfVars getMetaConf(String name) { "or in server mode. They must implement " + METASTORE_TASK_THREAD_CLASS), TASK_THREADS_REMOTE_ONLY("metastore.task.threads.remote", "metastore.task.threads.remote", ACID_HOUSE_KEEPER_SERVICE_CLASS + "," + + ACID_TXN_CLEANER_SERVICE_CLASS + "," + ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS + "," + - ACID_COMPACTION_HISTORY_SERVICE_CLASS + "," + - ACID_WRITE_SET_SERVICE_CLASS + "," + MATERIALZIATIONS_REBUILD_LOCK_CLEANER_TASK_CLASS + "," + PARTITION_MANAGEMENT_TASK_CLASS, - "Command separated list of tasks that will be started in separate threads. These will be" + + "Comma-separated list of tasks that will be started in separate threads. These will be" + " started only when the metastore is running as a separate service. They must " + "implement " + METASTORE_TASK_THREAD_CLASS), TCP_KEEP_ALIVE("metastore.server.tcp.keepalive", @@ -1150,12 +1152,6 @@ public static ConfVars getMetaConf(String name) { "metastore. SEQUENTIAL implies that the first valid metastore from the URIs specified " + "through hive.metastore.uris will be picked. RANDOM implies that the metastore " + "will be picked randomly"), - TIMEDOUT_TXN_REAPER_START("metastore.timedout.txn.reaper.start", - "hive.timedout.txn.reaper.start", 100, TimeUnit.SECONDS, - "Time delay of 1st reaper run after metastore start"), - TIMEDOUT_TXN_REAPER_INTERVAL("metastore.timedout.txn.reaper.interval", - "hive.timedout.txn.reaper.interval", 180, TimeUnit.SECONDS, - "Time interval describing how often the reaper runs"), TOKEN_SIGNATURE("metastore.token.signature", "hive.metastore.token.signature", "", "The delegation token service name to match when selecting a token from the current user's tokens."), METASTORE_CACHE_CAN_USE_EVENT("metastore.cache.can.use.event", "hive.metastore.cache.can.use.event", false, @@ -1252,9 +1248,6 @@ public static ConfVars getMetaConf(String name) { "hive.metastore.warehouse.external.dir", "", "Default location for external tables created in the warehouse. " + "If not set or null, then the normal warehouse location will be used as the default location."), - WRITE_SET_REAPER_INTERVAL("metastore.writeset.reaper.interval", - "hive.writeset.reaper.interval", 60, TimeUnit.SECONDS, - "Frequency of WriteSet reaper runs"), WM_DEFAULT_POOL_SIZE("metastore.wm.default.pool.size", "hive.metastore.wm.default.pool.size", 4, "The size of a default pool to create when creating an empty resource plan;\n" + diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 7bba8d6ee6..17732b0e8e 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -10293,8 +10293,6 @@ private static void logCompactionParameters(Configuration conf) { MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_WORKER_THREADS)); HMSHandler.LOG .info("hive.metastore.runworker.in = {}", MetastoreConf.getVar(conf, ConfVars.HIVE_METASTORE_RUNWORKER_IN)); - HMSHandler.LOG.info("metastore.compactor.history.reaper.interval = {}", - MetastoreConf.getTimeVar(conf, ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, TimeUnit.MINUTES)); HMSHandler.LOG.info("metastore.compactor.history.retention.attempted = {}", MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)); HMSHandler.LOG.info("metastore.compactor.history.retention.failed = {}", diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java index d35c9602a6..4c2d5e31b3 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java @@ -71,10 +71,10 @@ public void run() { LOG.debug("Number of materialization locks deleted: " + removedCnt); } } - } catch(Throwable t) { - LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t); + } catch (Throwable t) { + LOG.error("Unexpected error in thread: {}, message: {}", Thread.currentThread().getName(), t.getMessage(), t); } finally { - if(handle != null) { + if (handle != null) { handle.releaseLocks(); } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidHouseKeeperService.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidHouseKeeperService.java index c4a488bac0..177abb7d61 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidHouseKeeperService.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidHouseKeeperService.java @@ -17,27 +17,34 @@ */ package org.apache.hadoop.hive.metastore.txn; +import org.apache.commons.lang3.Functions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; +import static org.apache.commons.lang3.Functions.FailableRunnable; + /** * Performs background tasks for Transaction management in Hive. * Runs inside Hive Metastore Service. */ public class AcidHouseKeeperService implements MetastoreTaskThread { + private static final Logger LOG = LoggerFactory.getLogger(AcidHouseKeeperService.class); private Configuration conf; + private boolean isCompactorEnabled; private TxnStore txnHandler; @Override public void setConf(Configuration configuration) { - this.conf = configuration; + conf = configuration; + isCompactorEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON); txnHandler = TxnUtils.getTxnStore(conf); } @@ -48,7 +55,7 @@ public Configuration getConf() { @Override public long runFrequency(TimeUnit unit) { - return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TIMEDOUT_TXN_REAPER_INTERVAL, unit); + return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.ACID_HOUSEKEEPER_SERVICE_INTERVAL, unit); } @Override @@ -56,16 +63,35 @@ public void run() { TxnStore.MutexAPI.LockHandle handle = null; try { handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name()); - long startTime = System.currentTimeMillis(); - txnHandler.performTimeOuts(); - LOG.debug("timeout reaper ran for " + (System.currentTimeMillis() - startTime)/1000 + - "seconds."); - } catch(Throwable t) { - LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t); + LOG.info("Starting to run AcidHouseKeeperService."); + long start = System.currentTimeMillis(); + cleanTheHouse(); + LOG.debug("Total time AcidHouseKeeperService took: {} seconds.", elapsedSince(start)); + } catch (Throwable t) { + LOG.error("Unexpected error in thread: {}, message: {}", Thread.currentThread().getName(), t.getMessage(), t); } finally { - if(handle != null) { + if (handle != null) { handle.releaseLocks(); } } } + + private void cleanTheHouse() { + performTask(txnHandler::performTimeOuts, "Cleaning timed out txns and locks"); + performTask(txnHandler::performWriteSetGC, "Cleaning obsolete write set entries"); + performTask(txnHandler::cleanTxnToWriteIdTable, "Cleaning obsolete TXN_TO_WRITE_ID entries"); + if (isCompactorEnabled) { + performTask(txnHandler::purgeCompactionHistory, "Cleaning obsolete compaction history entries"); + } + } + + private void performTask(FailableRunnable task, String description) { + long start = System.currentTimeMillis(); + Functions.run(task); + LOG.debug("{} took {} seconds.", description, elapsedSince(start)); + } + + private long elapsedSince(long start) { + return (System.currentTimeMillis() - start) / 1000; + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidOpenTxnsCounterService.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidOpenTxnsCounterService.java index 2ad5a89f03..fe78800439 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidOpenTxnsCounterService.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidOpenTxnsCounterService.java @@ -29,7 +29,9 @@ * Runs inside Hive Metastore Service. */ public class AcidOpenTxnsCounterService implements MetastoreTaskThread { + private static final Logger LOG = LoggerFactory.getLogger(AcidOpenTxnsCounterService.class); + private static final int LOG_INTERVAL_MS = 60 * 1000; private Configuration conf; private int isAliveCounter = 0; @@ -44,18 +46,17 @@ public long runFrequency(TimeUnit unit) { @Override public void run() { try { - long startTime = System.currentTimeMillis(); + long start = System.currentTimeMillis(); isAliveCounter++; txnHandler.countOpenTxns(); - if (System.currentTimeMillis() - lastLogTime > 60 * 1000) { - LOG.info("AcidOpenTxnsCounterService ran for " + - ((System.currentTimeMillis() - startTime) / 1000) + - " seconds. isAliveCounter = " + isAliveCounter); - lastLogTime = System.currentTimeMillis(); + long now = System.currentTimeMillis(); + if (now - lastLogTime > LOG_INTERVAL_MS) { + LOG.info("Open txn counter ran for {} seconds. isAliveCounter: {}", (now - start) / 1000, isAliveCounter); + lastLogTime = now; } } - catch(Throwable t) { - LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t); + catch (Throwable t) { + LOG.error("Unexpected error in thread: {}, message: {}", Thread.currentThread().getName(), t.getMessage(), t); } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidCompactionHistoryService.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidTxnCleanerService.java similarity index 70% rename from standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidCompactionHistoryService.java rename to standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidTxnCleanerService.java index e96a7ba289..d2800bd008 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidCompactionHistoryService.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidTxnCleanerService.java @@ -26,17 +26,19 @@ import java.util.concurrent.TimeUnit; /** - * Purges obsolete items from compaction history data + * Periodically cleans out empty aborted and committed txns from the TXNS table. + * Runs inside Hive Metastore Service. */ -public class AcidCompactionHistoryService implements MetastoreTaskThread { - private static final Logger LOG = LoggerFactory.getLogger(AcidCompactionHistoryService.class); +public class AcidTxnCleanerService implements MetastoreTaskThread { + + private static final Logger LOG = LoggerFactory.getLogger(AcidTxnCleanerService.class); private Configuration conf; private TxnStore txnHandler; @Override public void setConf(Configuration configuration) { - this.conf = configuration; + conf = configuration; txnHandler = TxnUtils.getTxnStore(conf); } @@ -47,25 +49,27 @@ public Configuration getConf() { @Override public long runFrequency(TimeUnit unit) { - return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, - unit); + return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.ACID_TXN_CLEANER_INTERVAL, unit); } @Override public void run() { TxnStore.MutexAPI.LockHandle handle = null; try { - handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.CompactionHistory.name()); - long startTime = System.currentTimeMillis(); - txnHandler.purgeCompactionHistory(); - LOG.debug("History reaper reaper ran for " + (System.currentTimeMillis() - startTime)/1000 + - "seconds."); - } catch(Throwable t) { - LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t); + handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.TxnCleaner.name()); + long start = System.currentTimeMillis(); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); + LOG.debug("Txn cleaner service took: {} seconds.", elapsedSince(start)); + } catch (Throwable t) { + LOG.error("Unexpected error in thread: {}, message: {}", Thread.currentThread().getName(), t.getMessage(), t); } finally { - if(handle != null) { + if (handle != null) { handle.releaseLocks(); } } } + + private long elapsedSince(long start) { + return (System.currentTimeMillis() - start) / 1000; + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidWriteSetService.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidWriteSetService.java deleted file mode 100644 index 5ec513dfd4..0000000000 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidWriteSetService.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.txn; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.MetastoreTaskThread; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; - -/** - * Periodically cleans WriteSet tracking information used in Transaction management - */ -public class AcidWriteSetService implements MetastoreTaskThread { - private static final Logger LOG = LoggerFactory.getLogger(AcidWriteSetService.class); - - private Configuration conf; - private TxnStore txnHandler; - - @Override - public void setConf(Configuration configuration) { - this.conf = configuration; - txnHandler = TxnUtils.getTxnStore(conf); - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public long runFrequency(TimeUnit unit) { - return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.WRITE_SET_REAPER_INTERVAL, unit); - } - - @Override - public void run() { - TxnStore.MutexAPI.LockHandle handle = null; - try { - handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.WriteSetCleaner.name()); - long startTime = System.currentTimeMillis(); - txnHandler.performWriteSetGC(); - LOG.debug("cleaner ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds."); - } catch(Throwable t) { - LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t); - } finally { - if(handle != null) { - handle.releaseLocks(); - } - } - } -} diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 8fded608d0..a945857a5d 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -3592,15 +3592,14 @@ public void addDynamicPartitions(AddDynamicPartitions rqst) * Retry-by-caller note: this is only idempotent assuming it's only called by dropTable/Db/etc * operations. * - * HIVE_LOCKS is (presumably) expected to be removed by AcidHouseKeeperServices - * WS_SET is (presumably) expected to be removed by AcidWriteSetService + * HIVE_LOCKS and WS_SET are cleaned up by {@link AcidHouseKeeperService}, if turned on */ @Override @RetrySemantics.Idempotent public void cleanupRecords(HiveObjectType type, Database db, Table table, Iterator partitionIterator) throws MetaException { - // cleanup should be done only for objecdts belonging to default catalog + // cleanup should be done only for objects belonging to default catalog final String defaultCatalog = getDefaultCatalog(conf); try { diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 28f22e6371..1e177f4a7b 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -45,8 +45,8 @@ String TXN_KEY_START = "_meta"; enum MUTEX_KEY { - Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock, - WriteSetCleaner, CompactionScheduler, WriteIdAllocator, MaterializationRebuild + Initiator, Cleaner, HouseKeeper, CheckLock, TxnCleaner, + CompactionScheduler, WriteIdAllocator, MaterializationRebuild } // Compactor states (Should really be enum) String INITIATED_RESPONSE = "initiated"; diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java index 9905a14983..c73de779a3 100644 --- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java @@ -20,6 +20,7 @@ import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.txn.AcidTxnCleanerService; import org.hamcrest.CoreMatchers; import org.hamcrest.core.StringContains; import org.hamcrest.core.StringEndsWith; @@ -47,10 +48,8 @@ import org.apache.hadoop.hive.metastore.RuntimeStatsCleanerTask; import org.apache.hadoop.hive.metastore.events.EventCleanerTask; import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager; -import org.apache.hadoop.hive.metastore.txn.AcidCompactionHistoryService; import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService; import org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService; -import org.apache.hadoop.hive.metastore.txn.AcidWriteSetService; @Category(MetastoreUnitTest.class) public class TestMetastoreConf { @@ -465,13 +464,11 @@ public void testClassNames() { EventCleanerTask.class.getName()); Assert.assertEquals(MetastoreConf.METASTORE_DELEGATION_MANAGER_CLASS, MetastoreDelegationTokenManager.class.getName()); - Assert.assertEquals(MetastoreConf.ACID_COMPACTION_HISTORY_SERVICE_CLASS, - AcidCompactionHistoryService.class.getName()); Assert.assertEquals(MetastoreConf.ACID_HOUSE_KEEPER_SERVICE_CLASS, AcidHouseKeeperService.class.getName()); + Assert.assertEquals(MetastoreConf.ACID_TXN_CLEANER_SERVICE_CLASS, + AcidTxnCleanerService.class.getName()); Assert.assertEquals(MetastoreConf.ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS, AcidOpenTxnsCounterService.class.getName()); - Assert.assertEquals(MetastoreConf.ACID_WRITE_SET_SERVICE_CLASS, - AcidWriteSetService.class.getName()); } } diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestAcidTxnCleanerService.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestAcidTxnCleanerService.java new file mode 100644 index 0000000000..ba8ba734a5 --- /dev/null +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestAcidTxnCleanerService.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.LockComponentBuilder; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.DataOperationType; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +import static java.util.Collections.singletonList; + +/** + * Testing whether AcidTxnCleanerService removes the correct records + * from the TXNS table (via TxnStore). + */ +public class TestAcidTxnCleanerService { + + private AcidTxnCleanerService underTest; + private TxnStore txnHandler; + private Configuration conf; + + @Before + public void setUp() throws Exception { + conf = MetastoreConf.newMetastoreConf(); + underTest = new AcidTxnCleanerService(); + underTest.setConf(conf); + txnHandler = TxnUtils.getTxnStore(conf); + txnHandler.setOpenTxnTimeOutMillis(100); + TxnDbUtil.prepDb(conf); + } + + @After + public void tearDown() throws Exception { + TxnDbUtil.cleanDb(conf); + } + + @Test + public void cleansEmptyAbortedTxns() throws Exception { + for (int i = 0; i < 5; ++i) { + long txnid = openTxn(); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + } + // +1 represents the initial TXNS record (txnid=0) + Assert.assertEquals(5 + 1, getTxnCount()); + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis() * 2); + + underTest.run(); + + // always leaves the MAX(TXN_ID) in the TXNS table + Assert.assertEquals(1, getTxnCount()); + Assert.assertEquals(5, getMaxTxnId()); + } + + @Test + public void doesNotCleanAbortedTxnsThatAreNonEmpty() throws Exception { + for (int i = 0; i < 5; ++i) { + openNonEmptyThenAbort(); + } + Assert.assertEquals(5 + 1, getTxnCount()); + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis() * 2); + + underTest.run(); + + // deletes only the initial (committed) TXNS record + Assert.assertEquals(5, getTxnCount()); + Assert.assertEquals(5, getMaxTxnId()); + } + + @Test + public void cleansAllCommittedTxns() throws Exception { + for (int i = 0; i < 5; ++i) { + long txnid = openTxn(); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + } + Assert.assertEquals(5 + 1, getTxnCount()); + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis() * 2); + + underTest.run(); + + // always leaves the MAX(TXN_ID) in the TXNS table + Assert.assertEquals(1, getTxnCount()); + Assert.assertEquals(5, getMaxTxnId()); + } + + @Test + public void cleansCommittedAndEmptyAbortedOnly() throws Exception { + for (int i = 0; i < 5; ++i) { + // commit one + long txnid = openTxn(); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + // abort one empty + txnid = openTxn(); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + // abort one non-empty + openNonEmptyThenAbort(); + } + Assert.assertEquals(15 + 1, getTxnCount()); + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis() * 2); + + underTest.run(); + + // kept only the 5 non-empty aborted ones + Assert.assertEquals(5, getTxnCount()); + Assert.assertEquals(15, getMaxTxnId()); + } + + @Test + public void cleansEmptyAbortedBatchTxns() throws Exception { + // add one non-empty aborted txn + openNonEmptyThenAbort(); + // add a batch of empty, aborted txns + txnHandler.setOpenTxnTimeOutMillis(30000); + MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.TXN_MAX_OPEN_BATCH, + TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50); + OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest( + TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50, "user", "hostname")); + txnHandler.abortTxns(new AbortTxnsRequest(resp.getTxn_ids())); + GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); + Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize()); + txnHandler.setOpenTxnTimeOutMillis(1); + + underTest.run(); + + openTxns = txnHandler.getOpenTxns(); + Assert.assertEquals(2, openTxns.getOpen_txnsSize()); + Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, getMaxTxnId()); + } + + private void openNonEmptyThenAbort() throws MetaException, NoSuchTxnException, TxnAbortedException { + long txnid = openTxn(); + LockRequest req = getLockRequest(); + req.setTxnid(txnid); + txnHandler.lock(req); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + } + + private long openTxn() throws MetaException { + return txnHandler + .openTxns(new OpenTxnRequest(1, "me", "localhost")) + .getTxn_ids() + .get(0); + } + + private LockRequest getLockRequest() { + LockComponent comp = new LockComponentBuilder() + .setDbName("default") + .setTableName("ceat") + .setOperationType(DataOperationType.UPDATE) + .setSharedWrite() + .build(); + return new LockRequest(singletonList(comp), "me", "localhost"); + } + + private long getTxnCount() throws Exception { + return TxnDbUtil.countQueryAgent(conf, "SELECT COUNT(*) FROM \"TXNS\""); + } + + private long getMaxTxnId() throws Exception { + return TxnDbUtil.countQueryAgent(conf, "SELECT MAX(\"TXN_ID\") FROM \"TXNS\""); + } +}