diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 4d71eb4f4d..30312aa887 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1229,10 +1229,8 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi
     } else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) {
       String deltaPrefix = fn.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
       ParsedDelta delta = parseDelta(child, deltaPrefix, fs);
-      // Handle aborted deltas. Currently this can only happen for MM tables.
-      if (tblproperties != null && isTransactionalTable(tblproperties) &&
-          ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(
-              delta.minWriteId, delta.maxWriteId)) {
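+      // Record deltas whose entire write id range was aborted; this check now applies to every
+      // transactional table rather than only MM (insert-only) tables.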
+      if (ValidWriteIdList.RangeResponse.ALL ==
+          writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) {
         aborted.add(child);
       }
       if (writeIdList.isWriteIdRangeValid(
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 3565616171..f7eefde965 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -18,7 +18,12 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -32,9 +37,6 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -48,13 +50,7 @@
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -67,9 +63,6 @@
   private long cleanerCheckInterval = 0;
 
   private ReplChangeManager replChangeManager;
-  // List of compactions to clean.
-  private Map<Long, Set<Long>> compactId2LockMap = new HashMap<>();
-  private Map<Long, CompactionInfo> compactId2CompactInfoMap = new HashMap<>();
 
   @Override
   public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
@@ -96,95 +89,9 @@ public void run() {
       try {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
-        // First look for all the compactions that are waiting to be cleaned. If we have not
-        // seen an entry before, look for all the locks held on that table or partition and
-        // record them. We will then only clean the partition once all of those locks have been
-        // released. This way we avoid removing the files while they are in use,
-        // while at the same time avoiding starving the cleaner as new readers come along.
-        // This works because we know that any reader who comes along after the worker thread has
-        // done the compaction will read the more up to date version of the data (either in a
-        // newer delta or in a newer base).
-        List<CompactionInfo> toClean = txnHandler.findReadyToClean();
-        {
-          /**
-           * Since there may be more than 1 instance of Cleaner running we may have state info
-           * for items which were cleaned by instances. Here we remove them.
-           *
-           * In the long run if we add end_time to compaction_queue, then we can check that
-           * hive_locks.acquired_at > compaction_queue.end_time + safety_buffer in which case
-           * we know the lock owner is reading files created by this compaction or later.
-           * The advantage is that we don't have to store the locks.
-           */
-          Set<Long> currentToCleanSet = new HashSet<>();
-          for (CompactionInfo ci : toClean) {
-            currentToCleanSet.add(ci.id);
-          }
-          Set<Long> cleanPerformedByOthers = new HashSet<>();
-          for (long id : compactId2CompactInfoMap.keySet()) {
-            if (!currentToCleanSet.contains(id)) {
-              cleanPerformedByOthers.add(id);
-            }
-          }
-          for (long id : cleanPerformedByOthers) {
-            compactId2CompactInfoMap.remove(id);
-            compactId2LockMap.remove(id);
-          }
-        }
-        if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
-          ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());
-          if(LOG.isDebugEnabled()) {
-            dumpLockState(locksResponse);
-          }
-          for (CompactionInfo ci : toClean) {
-            // Check to see if we have seen this request before. If so, ignore it. If not,
-            // add it to our queue.
-            if (!compactId2LockMap.containsKey(ci.id)) {
-              compactId2LockMap.put(ci.id, findRelatedLocks(ci, locksResponse));
-              compactId2CompactInfoMap.put(ci.id, ci);
-            }
-          }
-
-          // Now, for each entry in the queue, see if all of the associated locks are clear so we
-          // can clean
-          Set<Long> currentLocks = buildCurrentLockSet(locksResponse);
-          List<Long> expiredLocks = new ArrayList<>();
-          List<Long> compactionsCleaned = new ArrayList<>();
-          try {
-            for (Map.Entry<Long, Set<Long>> queueEntry : compactId2LockMap.entrySet()) {
-              boolean sawLock = false;
-              for (Long lockId : queueEntry.getValue()) {
-                if (currentLocks.contains(lockId)) {
-                  sawLock = true;
-                  break;
-                } else {
-                  expiredLocks.add(lockId);
-                }
-              }
-
-              if (!sawLock) {
-                // Remember to remove this when we're out of the loop,
-                // we can't do it in the loop or we'll get a concurrent modification exception.
-                compactionsCleaned.add(queueEntry.getKey());
-                //Future thought: this may be expensive so consider having a thread pool run in parallel
-                clean(compactId2CompactInfoMap.get(queueEntry.getKey()));
-              } else {
-                // Remove the locks we didn't see so we don't look for them again next time
-                for (Long lockId : expiredLocks) {
-                  queueEntry.getValue().remove(lockId);
-                }
-                LOG.info("Skipping cleaning of " +
-                    idWatermark(compactId2CompactInfoMap.get(queueEntry.getKey())) +
-                    " due to reader present: " + queueEntry.getValue());
-              }
-            }
-          } finally {
-            if (compactionsCleaned.size() > 0) {
-              for (Long compactId : compactionsCleaned) {
-                compactId2LockMap.remove(compactId);
-                compactId2CompactInfoMap.remove(compactId);
-              }
-            }
-          }
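+        // Cap what the Cleaner may delete at the smallest txnid that any currently open
+        // transaction still sees as open (MIN_HISTORY_LEVEL); clean() turns this into a
+        // write id list so obsolete-directory detection cannot outrun an active reader.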
+        Long minOpenTxn = txnHandler.findMinHistoryLevel();
+        for (CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+          clean(compactionInfo, minOpenTxn);
         }
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
@@ -212,41 +119,7 @@ public void run() {
     } while (!stop.get());
   }
 
-  private Set<Long> findRelatedLocks(CompactionInfo ci, ShowLocksResponse locksResponse) {
-    Set<Long> relatedLocks = new HashSet<>();
-    for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
-      /**
-       * Hive QL is not case sensitive wrt db/table/column names
-       * Partition names get
-       * normalized (as far as I can tell) by lower casing column name but not partition value.
-       * {@link org.apache.hadoop.hive.metastore.Warehouse#makePartName(List, List, String)}
-       * {@link org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer#getPartSpec(ASTNode)}
-       * Since user input may start out in any case, compare here case-insensitive for db/table
-       * but leave partition name as is.
-       */
-      if (ci.dbname.equalsIgnoreCase(lock.getDbname())) {
-        if ((ci.tableName == null && lock.getTablename() == null) ||
-            (ci.tableName != null && ci.tableName.equalsIgnoreCase(lock.getTablename()))) {
-          if ((ci.partName == null && lock.getPartname() == null) ||
-              (ci.partName != null && ci.partName.equals(lock.getPartname()))) {
-            relatedLocks.add(lock.getLockid());
-          }
-        }
-      }
-    }
-
-    return relatedLocks;
-  }
-
-  private Set<Long> buildCurrentLockSet(ShowLocksResponse locksResponse) {
-    Set<Long> currentLocks = new HashSet<>(locksResponse.getLocks().size());
-    for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
-      currentLocks.add(lock.getLockid());
-    }
-    return currentLocks;
-  }
-
-  private void clean(CompactionInfo ci) throws MetaException {
+  private void clean(CompactionInfo ci, Long minOpenTxn) throws MetaException {
     LOG.info("Starting cleaning for " + ci);
     try {
       Table t = resolveTable(ci);
@@ -271,6 +144,32 @@ private void clean(CompactionInfo ci) throws MetaException {
       StorageDescriptor sd = resolveStorageDescriptor(t, p);
       final String location = sd.getLocation();
 
+      ValidReaderWriteIdList minOpenValidWriteIdList = null;
+      //todo: should this figure out minHistory for each 'ci'? if not, the same ValidTxnList can be shared
+      if (minOpenTxn != null) {
+        /**
+         * if minOpenTxn==null => we have no open txns and we'll use CompactionInfo.highestWriteId.
+         *
+         * createValidReadTxnList() below uses minOpenTxn as HWM and so the list of exceptions,
+         * if any, will only contain aborted txns. It cannot have any open txns since minOpenTxn
+         * is by definition the system side minimum open.
+         */
+        // Pass (minOpenTxn - 1) so the high water mark stays strictly below the smallest
+        // txn that could still be open; the exceptions list can then only contain aborted txns.
+        ValidTxnList validTxnList =
+            TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), minOpenTxn - 1);
+        List<String> tblNames = Collections.singletonList(
+            TableName.getDbTable(t.getDbName(), t.getTableName()));
+        GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tblNames);
+        rqst.setValidTxnList(validTxnList.writeToString());
+        GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(rqst);
+        // we could have no write IDs for a table if it was never written to but
+        // since we are in the Cleaner phase of compactions, there must have
+        // been some delta/base dirs
+        assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
+        // Creating a 'reader' list since we are interested in the set of 'obsolete' files
+        minOpenValidWriteIdList = TxnCommonUtils.createValidReaderWriteIdList(
+            rsp.getTblValidWriteIds().get(0));
+      }
 
       /**
        * Each Compaction only compacts as far as the highest txn id such that all txns below it
       * are resolved (i.e. not opened). This is what "highestWriteId" tracks. This is only tracked
@@ -279,18 +178,21 @@ private void clean(CompactionInfo ci) throws MetaException {
        * We only want to clean up to the highestWriteId - otherwise we risk deleting deltas from
        * under an active reader.
        *
+       * Suppose Cleaner checks and finds no minOpenTxn ID.
        * Suppose we have deltas D2 D3 for table T, i.e. the last compaction created D3 so now there is a
        * clean request for D2.
-       * Cleaner checks existing locks and finds none.
        * Between that check and removeFiles() a query starts (it will be reading D3) and another compaction
        * completes which creates D4.
        * Now removeFiles() (more specifically AcidUtils.getAcidState()) will declare D3 to be obsolete
        * unless ValidWriteIdList is "capped" at highestWriteId.
        */
-      final ValidWriteIdList validWriteIdList = (ci.highestWriteId > 0)
-          ? new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], new BitSet(),
-              ci.highestWriteId)
-          : new ValidReaderWriteIdList();
+      final ValidWriteIdList validWriteIdList =
+          (minOpenValidWriteIdList != null) ?
+              minOpenValidWriteIdList :
+              ((ci.highestWriteId > 0) ?
+                  new ValidReaderWriteIdList(ci.getFullTableName(), new long[0],
+                      new BitSet(), ci.highestWriteId) :
+                  new ValidReaderWriteIdList());
 
       if (runJobAsSelf(ci.runAs)) {
         removeFiles(location, validWriteIdList, ci);
@@ -327,7 +229,7 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti
     Path locPath = new Path(location);
     AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList);
     List<FileStatus> obsoleteDirs = dir.getObsolete();
-    List<Path> filesToDelete = new ArrayList<Path>(obsoleteDirs.size());
+    List<Path> filesToDelete = new ArrayList<>(obsoleteDirs.size());
     StringBuilder extraDebugInfo = new StringBuilder("[");
     for (FileStatus stat : obsoleteDirs) {
       filesToDelete.add(stat.getPath());
@@ -337,9 +239,6 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti
       }
     }
     extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']');
-    List<Long> compactIds = new ArrayList<>(compactId2CompactInfoMap.keySet());
-    Collections.sort(compactIds);
-    extraDebugInfo.append("compactId2CompactInfoMap.keySet(").append(compactIds).append(")");
" + extraDebugInfo.toString()); if (filesToDelete.size() < 1) { @@ -359,63 +258,4 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti fs.delete(dead, true); } } - private static class LockComparator implements Comparator { - //sort ascending by resource, nulls first - @Override - public int compare(ShowLocksResponseElement o1, ShowLocksResponseElement o2) { - if(o1 == o2) { - return 0; - } - if(o1 == null) { - return -1; - } - if(o2 == null) { - return 1; - } - int v = o1.getDbname().compareToIgnoreCase(o2.getDbname()); - if(v != 0) { - return v; - } - if(o1.getTablename() == null) { - return -1; - } - if(o2.getTablename() == null) { - return 1; - } - v = o1.getTablename().compareToIgnoreCase(o2.getTablename()); - if(v != 0) { - return v; - } - if(o1.getPartname() == null) { - return -1; - } - if(o2.getPartname() == null) { - return 1; - } - v = o1.getPartname().compareToIgnoreCase(o2.getPartname()); - if(v != 0) { - return v; - } - //if still equal, compare by lock ids - v = Long.compare(o1.getLockid(), o2.getLockid()); - if(v != 0) { - return v; - } - return Long.compare(o1.getLockIdInternal(), o2.getLockIdInternal()); - - } - } - private void dumpLockState(ShowLocksResponse slr) { - Iterator l = slr.getLocksIterator(); - List sortedList = new ArrayList<>(); - while(l.hasNext()) { - sortedList.add(l.next()); - } - //sort for readability - sortedList.sort(new LockComparator()); - LOG.info("dumping locks"); - for(ShowLocksResponseElement lock : sortedList) { - LOG.info(lock.toString()); - } - } } diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 77fe73687a..640ab7bcb9 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.txn.AcidWriteSetService; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.TestTxnCommands2; import org.junit.After; import org.junit.Assert; import org.apache.hadoop.hive.common.FileUtils; @@ -87,7 +88,7 @@ private static HiveConf conf = new HiveConf(Driver.class); private HiveTxnManager txnMgr; private Context ctx; - private Driver driver; + private Driver driver, driver2; private TxnStore txnHandler; public TestDbTxnManager2() throws Exception { @@ -103,6 +104,7 @@ public void setUp() throws Exception { SessionState.start(conf); ctx = new Context(conf); driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build(), null); + driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build(), null); TxnDbUtil.cleanDb(conf); TxnDbUtil.prepDb(conf); SessionState ss = SessionState.get(); @@ -115,6 +117,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { driver.close(); + driver2.close(); if (txnMgr != null) { txnMgr.closeTxnManager(); } @@ -548,10 +551,10 @@ public void testMetastoreTablesCleanup() throws Exception { checkCmdOnDriver(cpr); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='i'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); + TestTxnCommands2.runWorker(conf); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' 
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='r' and CQ_TYPE='i'");
     Assert.assertEquals(1, count);
-    org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf);
+    TestTxnCommands2.runCleaner(conf);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'");
     Assert.assertEquals(0, count);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='s' and CC_TYPE='i'");
@@ -561,10 +564,10 @@ public void testMetastoreTablesCleanup() throws Exception {
     checkCmdOnDriver(cpr);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='i'");
     Assert.assertEquals(1, count);
-    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf);
+    TestTxnCommands2.runWorker(conf);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='r' and CQ_TYPE='i'");
     Assert.assertEquals(1, count);
-    org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf);
+    TestTxnCommands2.runCleaner(conf);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p'");
     Assert.assertEquals(0, count);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='s' and CC_TYPE='i'");
@@ -576,7 +579,7 @@ public void testMetastoreTablesCleanup() throws Exception {
     checkCmdOnDriver(cpr);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
     Assert.assertEquals(1, count);
-    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail
+    TestTxnCommands2.runWorker(conf); // will fail
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
     Assert.assertEquals(0, count);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='f' and CC_TYPE='a'");
@@ -586,7 +589,7 @@ public void testMetastoreTablesCleanup() throws Exception {
     checkCmdOnDriver(cpr);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'");
     Assert.assertEquals(1, count);
-    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail
+    TestTxnCommands2.runWorker(conf); // will fail
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'");
     Assert.assertEquals(0, count);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='f' and CC_TYPE='a'");
@@ -2556,4 +2559,32 @@ public void testTruncate() throws Exception {
     Assert.assertEquals("Unexpected lock count", 1, locks.size());
     checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T", null, locks);
   }
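+  /**
+   * Runs a minor compaction followed by the Cleaner while a second transaction (driver2/txnMgr2)
+   * is still open, exercising the Cleaner path that consults TxnStore#findMinHistoryLevel().
+   */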
driver.run("create table T (a int, b int) stored as" + + " orc tblproperties('transactional'='true')"); + checkCmdOnDriver(cpr); + checkCmdOnDriver(driver.run("insert into T values(0,2),(1,4)")); + checkCmdOnDriver(driver.run("start transaction")); + checkCmdOnDriver(driver.run("insert into T values(0,2),(1,4)")); + + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + checkCmdOnDriver(driver2.run("start transaction")); + checkCmdOnDriver(driver2.run("select * from T")); + + swapTxnManager(txnMgr); + checkCmdOnDriver(driver.run("commit")); + checkCmdOnDriver(driver.run("alter table T compact 'minor'")); + TestTxnCommands2.runWorker(conf); + + //so now we should have d1,d2,d1_2 + TestTxnCommands2.runCleaner(conf); + //so now we should have d2,d1_2 + System.currentTimeMillis(); + //todo: the state is correct - add checks that the right files are there + + //alternatively, make the 2nd txn be a delete and verify that the reader doesn't see it in which case you'd have to do a major compaction + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index cbb76d51d6..6217dfe77c 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -324,6 +324,38 @@ public void markCompacted(CompactionInfo info) throws MetaException { return findReadyToClean(); } } + @Override + public Long findMinHistoryLevel() throws MetaException { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select MIN(MHL_MIN_OPEN_TXNID) from MIN_HISTORY_LEVEL"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + long minOpenTxnId = rs.getLong(1); + if(rs.wasNull()) { + return null;//we have no active txns in the system.... + } + return minOpenTxnId; + } catch (SQLException e) { + LOG.error("Unable to select MIN(MHL_MIN_OPEN_TXNID) for cleaning, " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "findMinHistoryLevel"); + throw new MetaException("Unable to execute findMinHistoryLevel() " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + return findMinHistoryLevel(); + } + + } /** * This will remove an entry from the queue after diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 080cc5284b..1e0190f6eb 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -352,6 +352,14 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old @RetrySemantics.ReadOnly List findReadyToClean() throws MetaException; + /** + * Returns the smallest txnid that is seen in open state across all active + * transactions in the system. 
+        String s = "select MIN(MHL_MIN_OPEN_TXNID) from MIN_HISTORY_LEVEL";
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        if (!rs.next()) {
+          return null;
+        }
+        long minOpenTxnId = rs.getLong(1);
+        if (rs.wasNull()) {
+          return null; // we have no active txns in the system
+        }
+        return minOpenTxnId;
+      } catch (SQLException e) {
+        LOG.error("Unable to select MIN(MHL_MIN_OPEN_TXNID) for cleaning, " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "findMinHistoryLevel");
+        throw new MetaException("Unable to execute findMinHistoryLevel() " +
+            StringUtils.stringifyException(e));
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      return findMinHistoryLevel();
+    }
+  }
 
   /**
    * This will remove an entry from the queue after
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 080cc5284b..1e0190f6eb 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -352,6 +352,14 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old
   @RetrySemantics.ReadOnly
   List<CompactionInfo> findReadyToClean() throws MetaException;
 
+  /**
+   * Returns the smallest txnid that is seen in open state across all active
+   * transactions in the system.
+   * @return the minimum open txnid, or {@code null} if there are no active transactions
+   */
+  @RetrySemantics.ReadOnly
+  Long findMinHistoryLevel() throws MetaException;
+
   /**
    * This will remove an entry from the queue after
    * it has been compacted.
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 3bb1f0c62c..b11d1c43d6 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -83,17 +83,6 @@ public static ValidCompactorWriteIdList createValidCompactWriteIdList(TableValid
     }
   }
 
-  public static ValidReaderWriteIdList updateForCompactionQuery(ValidReaderWriteIdList ids) {
-    // This is based on the existing valid write ID list that was built for a select query;
-    // therefore we assume all the aborted txns, etc. were already accounted for.
-    // All we do is adjust the high watermark to only include contiguous txns.
-    Long minOpenWriteId = ids.getMinOpenWriteId();
-    if (minOpenWriteId != null && minOpenWriteId != Long.MAX_VALUE) {
-      return ids.updateHighWatermark(ids.getMinOpenWriteId() - 1);
-    }
-    return ids;
-  }
-
   /**
    * Get an instance of the TxnStore that is appropriate for this store
    * @param conf configuration