diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 4d71eb4f4d..30312aa887 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -1229,10 +1229,8 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi } else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) { String deltaPrefix = fn.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX; ParsedDelta delta = parseDelta(child, deltaPrefix, fs); - // Handle aborted deltas. Currently this can only happen for MM tables. - if (tblproperties != null && isTransactionalTable(tblproperties) && - ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted( - delta.minWriteId, delta.maxWriteId)) { + if(ValidWriteIdList.RangeResponse.ALL == + writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) { aborted.add(child); } if (writeIdList.isWriteIdRangeValid( diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 71e130b608..6c59aded90 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -499,6 +499,7 @@ public HiveTxnManager getTxnMgr() { * it's not coupled to the executing thread. Since tests run against Derby which often wedges * under concurrent access, tests must use a single thead and simulate concurrent access. * For example, {@code TestDbTxnManager2} + * @return previous {@link HiveTxnManager} or null */ @VisibleForTesting public HiveTxnManager setTxnMgr(HiveTxnManager mgr) { diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 3565616171..f7eefde965 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -18,7 +18,12 @@ package org.apache.hadoop.hive.ql.txn.compactor; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse; +import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -32,9 +37,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; -import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; -import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.Database; @@ -48,13 +50,7 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Set; import 
java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -67,9 +63,6 @@ private long cleanerCheckInterval = 0; private ReplChangeManager replChangeManager; - // List of compactions to clean. - private Map> compactId2LockMap = new HashMap<>(); - private Map compactId2CompactInfoMap = new HashMap<>(); @Override public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException { @@ -96,95 +89,9 @@ public void run() { try { handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name()); startedAt = System.currentTimeMillis(); - // First look for all the compactions that are waiting to be cleaned. If we have not - // seen an entry before, look for all the locks held on that table or partition and - // record them. We will then only clean the partition once all of those locks have been - // released. This way we avoid removing the files while they are in use, - // while at the same time avoiding starving the cleaner as new readers come along. - // This works because we know that any reader who comes along after the worker thread has - // done the compaction will read the more up to date version of the data (either in a - // newer delta or in a newer base). - List toClean = txnHandler.findReadyToClean(); - { - /** - * Since there may be more than 1 instance of Cleaner running we may have state info - * for items which were cleaned by instances. Here we remove them. - * - * In the long run if we add end_time to compaction_queue, then we can check that - * hive_locks.acquired_at > compaction_queue.end_time + safety_buffer in which case - * we know the lock owner is reading files created by this compaction or later. - * The advantage is that we don't have to store the locks. - */ - Set currentToCleanSet = new HashSet<>(); - for (CompactionInfo ci : toClean) { - currentToCleanSet.add(ci.id); - } - Set cleanPerformedByOthers = new HashSet<>(); - for (long id : compactId2CompactInfoMap.keySet()) { - if (!currentToCleanSet.contains(id)) { - cleanPerformedByOthers.add(id); - } - } - for (long id : cleanPerformedByOthers) { - compactId2CompactInfoMap.remove(id); - compactId2LockMap.remove(id); - } - } - if (toClean.size() > 0 || compactId2LockMap.size() > 0) { - ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest()); - if(LOG.isDebugEnabled()) { - dumpLockState(locksResponse); - } - for (CompactionInfo ci : toClean) { - // Check to see if we have seen this request before. If so, ignore it. If not, - // add it to our queue. - if (!compactId2LockMap.containsKey(ci.id)) { - compactId2LockMap.put(ci.id, findRelatedLocks(ci, locksResponse)); - compactId2CompactInfoMap.put(ci.id, ci); - } - } - - // Now, for each entry in the queue, see if all of the associated locks are clear so we - // can clean - Set currentLocks = buildCurrentLockSet(locksResponse); - List expiredLocks = new ArrayList(); - List compactionsCleaned = new ArrayList(); - try { - for (Map.Entry> queueEntry : compactId2LockMap.entrySet()) { - boolean sawLock = false; - for (Long lockId : queueEntry.getValue()) { - if (currentLocks.contains(lockId)) { - sawLock = true; - break; - } else { - expiredLocks.add(lockId); - } - } - - if (!sawLock) { - // Remember to remove this when we're out of the loop, - // we can't do it in the loop or we'll get a concurrent modification exception. 
- compactionsCleaned.add(queueEntry.getKey()); - //Future thought: this may be expensive so consider having a thread pool run in parallel - clean(compactId2CompactInfoMap.get(queueEntry.getKey())); - } else { - // Remove the locks we didn't see so we don't look for them again next time - for (Long lockId : expiredLocks) { - queueEntry.getValue().remove(lockId); - } - LOG.info("Skipping cleaning of " + - idWatermark(compactId2CompactInfoMap.get(queueEntry.getKey())) + - " due to reader present: " + queueEntry.getValue()); - } - } - } finally { - if (compactionsCleaned.size() > 0) { - for (Long compactId : compactionsCleaned) { - compactId2LockMap.remove(compactId); - compactId2CompactInfoMap.remove(compactId); - } - } - } + Long minOpenTxn = txnHandler.findMinHistoryLevel(); + for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { + clean(compactionInfo, minOpenTxn); } } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor cleaner, " + @@ -212,41 +119,7 @@ public void run() { } while (!stop.get()); } - private Set findRelatedLocks(CompactionInfo ci, ShowLocksResponse locksResponse) { - Set relatedLocks = new HashSet(); - for (ShowLocksResponseElement lock : locksResponse.getLocks()) { - /** - * Hive QL is not case sensitive wrt db/table/column names - * Partition names get - * normalized (as far as I can tell) by lower casing column name but not partition value. - * {@link org.apache.hadoop.hive.metastore.Warehouse#makePartName(List, List, String)} - * {@link org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer#getPartSpec(ASTNode)} - * Since user input may start out in any case, compare here case-insensitive for db/table - * but leave partition name as is. - */ - if (ci.dbname.equalsIgnoreCase(lock.getDbname())) { - if ((ci.tableName == null && lock.getTablename() == null) || - (ci.tableName != null && ci.tableName.equalsIgnoreCase(lock.getTablename()))) { - if ((ci.partName == null && lock.getPartname() == null) || - (ci.partName != null && ci.partName.equals(lock.getPartname()))) { - relatedLocks.add(lock.getLockid()); - } - } - } - } - - return relatedLocks; - } - - private Set buildCurrentLockSet(ShowLocksResponse locksResponse) { - Set currentLocks = new HashSet(locksResponse.getLocks().size()); - for (ShowLocksResponseElement lock : locksResponse.getLocks()) { - currentLocks.add(lock.getLockid()); - } - return currentLocks; - } - - private void clean(CompactionInfo ci) throws MetaException { + private void clean(CompactionInfo ci, Long minOpenTxn) throws MetaException { LOG.info("Starting cleaning for " + ci); try { Table t = resolveTable(ci); @@ -271,6 +144,32 @@ private void clean(CompactionInfo ci) throws MetaException { StorageDescriptor sd = resolveStorageDescriptor(t, p); final String location = sd.getLocation(); + ValidReaderWriteIdList minOpenValidWriteIdList = null; + //todo: should this figure out minHistory for each 'ci'? if not, the same ValidTxnList can be shared + if(minOpenTxn != null) { + /** + * if minOpenTxn==null => we have no open txns and we'll use CompactionInfo.highestWriteId. + * + * createValidReadTxnList() below uses minOpenTxn as HWM and so the list of exceptions, + * if any, will only contain aborted txns. It cannot have any open txns since minOpenTxn + * is by definition the system side minimum open. + */ + ValidTxnList validTxnList = TxnCommonUtils. 
+ //todo: explain the -1 here + createValidReadTxnList(txnHandler.getOpenTxns(), minOpenTxn - 1); + List tblNames = Collections.singletonList( + TableName.getDbTable(t.getDbName(), t.getTableName())); + GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tblNames); + rqst.setValidTxnList(validTxnList.writeToString()); + GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(rqst); + //we could have no write IDs for a table if it was never written to but + // since we are in the Cleaner phase of compactions, there must have + // been some delta/base dirs + assert rsp != null && rsp.getTblValidWriteIdsSize() == 1; + //Creating 'reader' list since we are interested in the set of 'obsolete' files + minOpenValidWriteIdList = TxnCommonUtils.createValidReaderWriteIdList( + rsp.getTblValidWriteIds().get(0)); + } /** * Each Compaction only compacts as far as the highest txn id such that all txns below it * are resolved (i.e. not opened). This is what "highestWriteId" tracks. This is only tracked @@ -279,18 +178,21 @@ private void clean(CompactionInfo ci) throws MetaException { * We only want to clean up to the highestWriteId - otherwise we risk deleting deltas from * under an active reader. * + * Suppose Cleaner checks and finds no minOpenTxn ID. * Suppose we have deltas D2 D3 for table T, i.e. the last compaction created D3 so now there is a * clean request for D2. - * Cleaner checks existing locks and finds none. * Between that check and removeFiles() a query starts (it will be reading D3) and another compaction * completes which creates D4. * Now removeFiles() (more specifically AcidUtils.getAcidState()) will declare D3 to be obsolete * unless ValidWriteIdList is "capped" at highestWriteId. */ - final ValidWriteIdList validWriteIdList = (ci.highestWriteId > 0) - ? new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], new BitSet(), - ci.highestWriteId) - : new ValidReaderWriteIdList(); + final ValidWriteIdList validWriteIdList = + (minOpenValidWriteIdList != null) ? + minOpenValidWriteIdList : + ((ci.highestWriteId > 0) ? + new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], + new BitSet(), ci.highestWriteId) : + new ValidReaderWriteIdList()); if (runJobAsSelf(ci.runAs)) { removeFiles(location, validWriteIdList, ci); @@ -327,7 +229,7 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti Path locPath = new Path(location); AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList); List obsoleteDirs = dir.getObsolete(); - List filesToDelete = new ArrayList(obsoleteDirs.size()); + List filesToDelete = new ArrayList<>(obsoleteDirs.size()); StringBuilder extraDebugInfo = new StringBuilder("["); for (FileStatus stat : obsoleteDirs) { filesToDelete.add(stat.getPath()); @@ -337,9 +239,6 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti } } extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']'); - List compactIds = new ArrayList<>(compactId2CompactInfoMap.keySet()); - Collections.sort(compactIds); - extraDebugInfo.append("compactId2CompactInfoMap.keySet(").append(compactIds).append(")"); LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() + " obsolete directories from " + location + ". 
" + extraDebugInfo.toString()); if (filesToDelete.size() < 1) { @@ -359,63 +258,4 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti fs.delete(dead, true); } } - private static class LockComparator implements Comparator { - //sort ascending by resource, nulls first - @Override - public int compare(ShowLocksResponseElement o1, ShowLocksResponseElement o2) { - if(o1 == o2) { - return 0; - } - if(o1 == null) { - return -1; - } - if(o2 == null) { - return 1; - } - int v = o1.getDbname().compareToIgnoreCase(o2.getDbname()); - if(v != 0) { - return v; - } - if(o1.getTablename() == null) { - return -1; - } - if(o2.getTablename() == null) { - return 1; - } - v = o1.getTablename().compareToIgnoreCase(o2.getTablename()); - if(v != 0) { - return v; - } - if(o1.getPartname() == null) { - return -1; - } - if(o2.getPartname() == null) { - return 1; - } - v = o1.getPartname().compareToIgnoreCase(o2.getPartname()); - if(v != 0) { - return v; - } - //if still equal, compare by lock ids - v = Long.compare(o1.getLockid(), o2.getLockid()); - if(v != 0) { - return v; - } - return Long.compare(o1.getLockIdInternal(), o2.getLockIdInternal()); - - } - } - private void dumpLockState(ShowLocksResponse slr) { - Iterator l = slr.getLocksIterator(); - List sortedList = new ArrayList<>(); - while(l.hasNext()) { - sortedList.add(l.next()); - } - //sort for readability - sortedList.sort(new LockComparator()); - LOG.info("dumping locks"); - for(ShowLocksResponseElement lock : sortedList) { - LOG.info(lock.toString()); - } - } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java new file mode 100644 index 0000000000..947d98cb7c --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java @@ -0,0 +1,270 @@ +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; +import org.apache.orc.impl.OrcAcidUtils; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2.swapTxnManager; + +public class TestTxnCommands3 extends TxnCommandsBaseForTests { + static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands3.class); + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestTxnCommands3.class.getCanonicalName() + + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + @Override + protected String getTestDataDir() { + return TEST_DATA_DIR; + } + + @Test + @Ignore("HIVE-20327") + public void testRenameTable() throws Exception { + MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); + runStatementOnDriver("drop database if exists mydb1 cascade"); + runStatementOnDriver("drop database if exists mydb2 cascade"); + runStatementOnDriver("create database mydb1"); + runStatementOnDriver("create 
database mydb2"); + runStatementOnDriver("create table mydb1.T(a int, b int) stored as orc"); + runStatementOnDriver("insert into mydb1.T values(1,2),(4,5)"); + //put something in WRITE_SET + runStatementOnDriver("update mydb1.T set b = 6 where b = 5"); + runStatementOnDriver("alter table mydb1.T compact 'minor'"); + + runStatementOnDriver("alter table mydb1.T RENAME TO mydb1.S"); + + String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from mydb1.S"; + String[][] expected = new String[][] { + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", + "s/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6", + "s/delta_0000002_0000002_0000/bucket_00000"}}; + checkResult(expected, testQuery, false, "check data", LOG); + + + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPACTION_QUEUE where CQ_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from WRITE_SET where WS_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from NEXT_WRITE_ID where NWI_TABLE='t'")); + + Assert.assertEquals( + TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2, + TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='s'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPACTION_QUEUE where CQ_TABLE='s'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from WRITE_SET where WS_TABLE='s'")); + Assert.assertEquals(3, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='s'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from NEXT_WRITE_ID where NWI_TABLE='s'")); + + runStatementOnDriver("alter table mydb1.S RENAME TO mydb2.bar"); + + Assert.assertEquals( + TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2, + TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='bar'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPACTION_QUEUE where CQ_TABLE='bar'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from WRITE_SET where WS_TABLE='bar'")); + Assert.assertEquals(4, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='bar'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from NEXT_WRITE_ID where NWI_TABLE='bar'")); + } + //todo: re-compile w/o the fix and make sure this fails + @Test + public void testCompactionEmptyFile() throws Exception { + MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); + runStatementOnDriver("create table T(a int, b int) stored as orc"); + runStatementOnDriver("insert into T values(1,2),(4,5)"); + //create delta_2 (so that writeId=2 is allocated) + runStatementOnDriver("insert into T values(1,2),(4,5)"); + FileSystem fs = FileSystem.get(hiveConf); + Path emptyFile = new Path(getWarehouseDir() + "/t/delta_0000002_0000002_0000/bucket_00000"); + fs.delete(emptyFile); + //now re-create 
it as empty + FSDataOutputStream os = fs.create(emptyFile); + os.close(); + os = fs.create(OrcAcidUtils.getSideFile(emptyFile)); + os.writeLong(0); + os.close(); + + Assert.assertEquals(0, fs.getFileStatus(emptyFile).getLen()); + String[][] expected = new String[][]{ + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", + "t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t4\t5", + "t/delta_0000001_0000001_0000/bucket_00000"} + }; + checkResult(expected, "select ROW__ID, a, b, INPUT__FILE__NAME from T order by a", false, + "before compaction", LOG); + + runStatementOnDriver("alter table T compact 'MINOR'"); + runWorker(hiveConf); + + String[][] expected2 = new String[][]{ + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", + "t/delta_0000001_0000002/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t4\t5", + "t/delta_0000001_0000002/bucket_00000"} + }; + checkResult(expected2, "select ROW__ID, a, b, INPUT__FILE__NAME from T order by a", false, + "after compaction", LOG); + + Path emptyBaseFile = new Path(getWarehouseDir() + "/t/base_0000002/bucket_00000"); + os = fs.create(emptyBaseFile); + os.close(); + + runStatementOnDriver("insert into T values(1,2),(4,5)");//create committed writeId=3 + Path p = new Path(getWarehouseDir() + "/t/delta_0000003_0000003_0000/bucket_00000"); + fs.delete(p); + os = fs.create(p);//make the file empty + os.close(); + + runStatementOnDriver("alter table T compact 'MAJOR'"); + runWorker(hiveConf); + //none of this fails. OrcRawRecordMerger already has len > 0 checks for base files and + // + } + + @Test + public void testCleaner2() throws Exception { + MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); + dropTable(new String[] {"T"}); + runStatementOnDriver("create table T (a int, b int) stored as orc"); + runStatementOnDriver("insert into T values(0,2)");//makes delta_1_1 in T1 + runStatementOnDriver("insert into T values(1,4)");//makes delta_2_2 in T2 + + Driver driver2 = new Driver(new QueryState.Builder().withHiveConf(hiveConf).build(), null); + driver2.setMaxRows(10000); + + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf); + HiveTxnManager txnMgr1 = swapTxnManager(txnMgr2); + Driver driver1 = swapDrivers(driver2); + runStatementOnDriver("start transaction");//T3 + /* this select sees + target/warehouse/t/ + ├── delta_0000001_0000001_0000 + │   ├── _orc_acid_version + │   └── bucket_00000 + └── delta_0000002_0000002_0000 + ├── _orc_acid_version + └── bucket_00000*/ + String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from T"; + String[][] expected = new String[][] { + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", + "t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t4", + "t/delta_0000002_0000002_0000/bucket_00000"}}; + checkResult(expected, testQuery, false, "check data", LOG); + + + txnMgr2 = swapTxnManager(txnMgr1); + driver2 = swapDrivers(driver1); + runStatementOnDriver("alter table T compact 'minor'");//T4 + TestTxnCommands2.runWorker(hiveConf);//makes delta_1_2 & delete_delta_1_2 + /* Now we should have + target/warehouse/t/ + ├── delete_delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + ├── delta_0000001_0000001_0000 + │   ├── _orc_acid_version + │   └── bucket_00000 + ├── delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + └── delta_0000002_0000002_0000 + ├── 
_orc_acid_version + └── bucket_00000*/ + FileSystem fs = FileSystem.get(hiveConf); + Path warehousePath = new Path(getWarehouseDir()); + FileStatus[] actualList = fs.listStatus(new Path(warehousePath + "/t"), + FileUtils.HIDDEN_FILES_PATH_FILTER); + + String[] expectedList = new String[] { + "/t/delete_delta_0000001_0000002", + "/t/delta_0000001_0000002", + "/t/delta_0000001_0000001_0000", + "/t/delta_0000002_0000002_0000", + }; + checkExpectedFiles(actualList, expectedList, warehousePath.toString()); + /* Now the state should be + target/warehouse/t/ + ├── delete_delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + ├── delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + └── delta_0000002_0000002_0000 + ├── _orc_acid_version + └── bucket_00000 + The cleaner should not delete delta_2_2 since the txn running 'select * from T' may still be + reading this file (assuming this select statement is still running)*/ + + TestTxnCommands2.runCleaner(hiveConf); + expectedList = new String[] { + "/t/delete_delta_0000001_0000002", + "/t/delta_0000001_0000002", + "/t/delta_0000002_0000002_0000", + }; + actualList = fs.listStatus(new Path(warehousePath + "/t"), + FileUtils.HIDDEN_FILES_PATH_FILTER); + checkExpectedFiles(actualList, expectedList, warehousePath.toString()); + //ok, so this doesn't work - d2 is removed - this actually makes sense since if delta_1_2 is available the assumption is that + //it should be used in favor of delta_1_1 + delta_2_2 + //relying on state of LM won't help here either: + /*1. start txn do a select * from T as above + * now the txn does the same select * again and while it's running compactor makes d_1_2 and cleaner runs while the 2nd select * is reading d2_2 + * The new locks will tell the cleaner that it's ok to clean.... 
+ * */ + + + + + //alternatively, make the 2nd txn be a delete and verify that the reader doesn't see it in which case you'd have to do a major compaction + } + private static void checkExpectedFiles(FileStatus[] actualList, String[] expectedList, String filePrefix) throws Exception { + Set expectedSet = new HashSet<>(); + Set unexpectedSet = new HashSet<>(); + for(String f : expectedList) { + expectedSet.add(f); + } + for(FileStatus fs : actualList) { + String endOfPath = fs.getPath().toString().substring(fs.getPath().toString().indexOf(filePrefix) + filePrefix.length()); + if(!expectedSet.remove(endOfPath)) { + unexpectedSet.add(endOfPath); + } + } + Assert.assertTrue("not found set: " + expectedSet + " unexpected set: " + unexpectedSet, expectedSet.isEmpty() && unexpectedSet.isEmpty()); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java index a5bd1cbd67..e882c94cfe 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java @@ -176,67 +176,4 @@ public void testConcatenateMM() throws Exception { "t/base_0000002/000000_0"}}; checkResult(expected2, testQuery, false, "check data after concatenate", LOG); } - @Test - public void testRenameTable() throws Exception { - MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); - runStatementOnDriver("drop database if exists mydb1 cascade"); - runStatementOnDriver("drop database if exists mydb2 cascade"); - runStatementOnDriver("create database mydb1"); - runStatementOnDriver("create database mydb2"); - runStatementOnDriver("create table mydb1.T(a int, b int) stored as orc"); - runStatementOnDriver("insert into mydb1.T values(1,2),(4,5)"); - //put something in WRITE_SET - runStatementOnDriver("update mydb1.T set b = 6 where b = 5"); - runStatementOnDriver("alter table mydb1.T compact 'minor'"); - - runStatementOnDriver("alter table mydb1.T RENAME TO mydb1.S"); - - String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from mydb1.S"; - String[][] expected = new String[][] { - {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", - "s/delta_0000001_0000001_0000/bucket_00000"}, - {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6", - "s/delta_0000002_0000002_0000/bucket_00000"}}; - checkResult(expected, testQuery, false, "check data", LOG); - - - Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='t'")); - Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPACTION_QUEUE where CQ_TABLE='t'")); - Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from WRITE_SET where WS_TABLE='t'")); - Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='t'")); - Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from NEXT_WRITE_ID where NWI_TABLE='t'")); - - Assert.assertEquals( - TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2, - TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='s'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPACTION_QUEUE where CQ_TABLE='s'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from WRITE_SET where WS_TABLE='s'")); - 
Assert.assertEquals(3, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='s'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from NEXT_WRITE_ID where NWI_TABLE='s'")); - - runStatementOnDriver("alter table mydb1.S RENAME TO mydb2.bar"); - - Assert.assertEquals( - TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2, - TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='bar'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPACTION_QUEUE where CQ_TABLE='bar'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from WRITE_SET where WS_TABLE='bar'")); - Assert.assertEquals(4, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='bar'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from NEXT_WRITE_ID where NWI_TABLE='bar'")); - } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 05ce3e214d..c52de89f12 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -249,4 +249,14 @@ protected void checkResult(String[][] expectedResult, String query, boolean isVe checkExpected(rs, expectedResult, msg + (isVectorized ? " vect" : ""), LOG, !isVectorized); assertVectorized(isVectorized, query); } + void dropTable(String[] tabs) throws Exception { + for(String tab : tabs) { + d.run("drop table if exists " + tab); + } + } + Driver swapDrivers(Driver otherDriver) { + Driver tmp = d; + d = otherDriver; + return tmp; + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 77fe73687a..1990899128 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.txn.AcidWriteSetService; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.TestTxnCommands2; import org.junit.After; import org.junit.Assert; import org.apache.hadoop.hive.common.FileUtils; @@ -87,7 +88,7 @@ private static HiveConf conf = new HiveConf(Driver.class); private HiveTxnManager txnMgr; private Context ctx; - private Driver driver; + private Driver driver, driver2; private TxnStore txnHandler; public TestDbTxnManager2() throws Exception { @@ -103,6 +104,7 @@ public void setUp() throws Exception { SessionState.start(conf); ctx = new Context(conf); driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build(), null); + driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build(), null); TxnDbUtil.cleanDb(conf); TxnDbUtil.prepDb(conf); SessionState ss = SessionState.get(); @@ -115,6 +117,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { driver.close(); + driver2.close(); if (txnMgr != null) { txnMgr.closeTxnManager(); } @@ -548,10 +551,10 @@ public void testMetastoreTablesCleanup() throws Exception { checkCmdOnDriver(cpr); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' 
and CQ_STATE='i' and CQ_TYPE='i'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); + TestTxnCommands2.runWorker(conf); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='r' and CQ_TYPE='i'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf); + TestTxnCommands2.runCleaner(conf); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'"); Assert.assertEquals(0, count); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='s' and CC_TYPE='i'"); @@ -561,10 +564,10 @@ public void testMetastoreTablesCleanup() throws Exception { checkCmdOnDriver(cpr); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='i'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); + TestTxnCommands2.runWorker(conf); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='r' and CQ_TYPE='i'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf); + TestTxnCommands2.runCleaner(conf); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p'"); Assert.assertEquals(0, count); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='s' and CC_TYPE='i'"); @@ -576,7 +579,7 @@ public void testMetastoreTablesCleanup() throws Exception { checkCmdOnDriver(cpr); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail + TestTxnCommands2.runWorker(conf); // will fail count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(0, count); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='f' and CC_TYPE='a'"); @@ -586,7 +589,7 @@ public void testMetastoreTablesCleanup() throws Exception { checkCmdOnDriver(cpr); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail + TestTxnCommands2.runWorker(conf); // will fail count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(0, count); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='f' and CC_TYPE='a'"); @@ -824,7 +827,7 @@ public static ShowLocksResponseElement 
checkLock(LockType expectedType, LockStat * the TxnManager instance in the session (hacky but nothing is actually threading so it allows us * to write good tests) */ - private static HiveTxnManager swapTxnManager(HiveTxnManager txnMgr) { + public static HiveTxnManager swapTxnManager(HiveTxnManager txnMgr) { return SessionState.get().setTxnMgr(txnMgr); } @Test @@ -2556,4 +2559,122 @@ public void testTruncate() throws Exception { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T", null, locks); } + @Test + public void testCleaner() throws Exception { + dropTable(new String[] {"T"}); + CommandProcessorResponse cpr = driver.run("create table T (a int, b int) stored as" + + " orc tblproperties('transactional'='true')"); + checkCmdOnDriver(cpr); + checkCmdOnDriver(driver.run("insert into T values(0,2),(1,4)")); + checkCmdOnDriver(driver.run("start transaction")); + checkCmdOnDriver(driver.run("insert into T values(0,2),(1,4)")); + + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + checkCmdOnDriver(driver2.run("start transaction")); + checkCmdOnDriver(driver2.run("select * from T")); + + swapTxnManager(txnMgr); + checkCmdOnDriver(driver.run("commit")); + checkCmdOnDriver(driver.run("alter table T compact 'minor'")); + TestTxnCommands2.runWorker(conf); + + //so now we should have d1,d2,d1_2 + /* Now we should have d1_2 from minor compaction and d1 from 1st txn by 'driver' and + d2 from 2nd txn by 'driver' - both committed at this point. + At this point, txn doing 'select * from T' from 'driver2' is still open and it sees the txn + that created d2 as open, so cleaner should not remove d2 + [ + target/warehouse/t/ + ├── delete_delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + ├── delta_0000001_0000001_0000 + │   ├── _orc_acid_version + │   └── bucket_00000 + ├── delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + └── delta_0000002_0000002_0000 + ├── _orc_acid_version + └── bucket_00000 + */ + TestTxnCommands2.runCleaner(conf); +/* +target/warehouse/t/ +├── delete_delta_0000001_0000002 +│   ├── _orc_acid_version +│   └── bucket_00000 +├── delta_0000001_0000002 +│   ├── _orc_acid_version +│   └── bucket_00000 +└── delta_0000002_0000002_0000 + ├── _orc_acid_version + └── bucket_00000 + */ + System.currentTimeMillis(); + //todo: the state is correct - add checks that the right files are there + + //alternatively, make the 2nd txn be a delete and verify that the reader doesn't see it in which case you'd have to do a major compaction + } + @Test + public void testCleaner2() throws Exception { + dropTable(new String[] {"T"}); + CommandProcessorResponse cpr = driver.run("create table T (a int, b int) stored as" + + " orc tblproperties('transactional'='true')"); + checkCmdOnDriver(cpr); + checkCmdOnDriver(driver.run("insert into T values(0,2)"));//makes delta_1_1 in T1 + checkCmdOnDriver(driver.run("insert into T values(1,4)"));//makes delta_2_2 in T2 + + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + checkCmdOnDriver(driver2.run("start transaction"));//T3 + checkCmdOnDriver(driver2.run("select INPUT__FILE__NAME from T")); + List rs = new ArrayList<>(); + driver2.getResults(rs); + /* this select sees + target/warehouse/t/ + ├── delta_0000001_0000001_0000 + │   ├── _orc_acid_version + │   └── bucket_00000 + └── delta_0000002_0000002_0000 + ├── 
_orc_acid_version + └── bucket_00000*/ + swapTxnManager(txnMgr); + checkCmdOnDriver(driver.run("alter table T compact 'minor'"));//T4 + TestTxnCommands2.runWorker(conf);//makes delta_1_2 & delete_delta_1_2 + /* Now we should have + target/warehouse/t/ + ├── delete_delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + ├── delta_0000001_0000001_0000 + │   ├── _orc_acid_version + │   └── bucket_00000 + ├── delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + └── delta_0000002_0000002_0000 + ├── _orc_acid_version + └── bucket_00000 + */ + TestTxnCommands2.runCleaner(conf); + /* Now the state should be + target/warehouse/t/ + ├── delete_delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + ├── delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + └── delta_0000002_0000002_0000 + ├── _orc_acid_version + └── bucket_00000 + The cleaner should not delete delta_2_2 since the txn running 'select * from T' may still be + reading this file (assuming this select statement is still running)*/ + System.currentTimeMillis(); + //todo: the state is correct - add checks that the right files are there + + //alternatively, make the 2nd txn be a delete and verify that the reader doesn't see it in which case you'd have to do a major compaction + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index ce574b4ac4..01a96cd7a2 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.junit.After; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import java.util.ArrayList; @@ -197,231 +198,6 @@ public void cleanupAfterMinorPartitionCompaction() throws Exception { Assert.assertTrue(sawDelta); } - @Test - public void blockedByLockTable() throws Exception { - Table t = newTable("default", "bblt", false); - - addBaseFile(t, null, 20L, 20); - addDeltaFile(t, null, 21L, 22L, 2); - addDeltaFile(t, null, 23L, 24L, 2); - addDeltaFile(t, null, 21L, 24L, 4); - - burnThroughTransactions("default", "bblt", 25); - - CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - txnHandler.markCompacted(ci); - txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - - LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); - comp.setTablename("bblt"); - comp.setOperationType(DataOperationType.SELECT); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - - startCleaner(); - - // Check there are no compactions requests left. 
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); - Assert.assertEquals("bblt", compacts.get(0).getTablename()); - Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType()); - } - - @Test - public void blockedByLockPartition() throws Exception { - Table t = newTable("default", "bblp", true); - Partition p = newPartition(t, "today"); - - addBaseFile(t, p, 20L, 20); - addDeltaFile(t, p, 21L, 22L, 2); - addDeltaFile(t, p, 23L, 24L, 2); - addDeltaFile(t, p, 21L, 24L, 4); - - burnThroughTransactions("default", "bblp", 25); - - CompactionRequest rqst = new CompactionRequest("default", "bblp", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - txnHandler.markCompacted(ci); - txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); - comp.setTablename("bblp"); - comp.setPartitionname("ds=today"); - comp.setOperationType(DataOperationType.DELETE); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(1, "Dracula", "Transylvania")); - req.setTxnid(resp.getTxn_ids().get(0)); - LockResponse res = txnHandler.lock(req); - - startCleaner(); - - // Check there are no compactions requests left. - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); - Assert.assertEquals("bblp", compacts.get(0).getTablename()); - Assert.assertEquals("ds=today", compacts.get(0).getPartitionname()); - Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType()); - } - - @Test - public void notBlockedBySubsequentLock() throws Exception { - Table t = newTable("default", "bblt", false); - - // Set the run frequency low on this test so it doesn't take long - conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, 100, - TimeUnit.MILLISECONDS); - - addBaseFile(t, null, 20L, 20); - addDeltaFile(t, null, 21L, 22L, 2); - addDeltaFile(t, null, 23L, 24L, 2); - addDeltaFile(t, null, 21L, 24L, 4); - - burnThroughTransactions("default", "bblt", 25); - - CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - txnHandler.markCompacted(ci); - txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - - LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); - comp.setTablename("bblt"); - comp.setOperationType(DataOperationType.INSERT); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - - AtomicBoolean looped = new AtomicBoolean(); - looped.set(false); - startCleaner(looped); - - // Make sure the compactor has a chance to run once - while (!looped.get()) { - Thread.currentThread().sleep(100); - } - - // There should still be one request, as the locks still held. 
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - - // obtain a second lock. This shouldn't block cleaner as it was acquired after the initial - // clean request - LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); - comp2.setTablename("bblt"); - comp.setOperationType(DataOperationType.SELECT); - List components2 = new ArrayList(1); - components2.add(comp2); - LockRequest req2 = new LockRequest(components, "me", "localhost"); - LockResponse res2 = txnHandler.lock(req2); - - // Unlock the previous lock - txnHandler.unlock(new UnlockRequest(res.getLockid())); - looped.set(false); - - while (!looped.get()) { - Thread.currentThread().sleep(100); - } - stopThread(); - Thread.currentThread().sleep(200); - - - // Check there are no compactions requests left. - rsp = txnHandler.showCompact(new ShowCompactRequest()); - compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); - } - - @Test - public void partitionNotBlockedBySubsequentLock() throws Exception { - Table t = newTable("default", "bblt", true); - Partition p = newPartition(t, "today"); - - // Set the run frequency low on this test so it doesn't take long - conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, 100, - TimeUnit.MILLISECONDS); - - addBaseFile(t, p, 20L, 20); - addDeltaFile(t, p, 21L, 22L, 2); - addDeltaFile(t, p, 23L, 24L, 2); - addDeltaFile(t, p, 21L, 24L, 4); - - burnThroughTransactions("default", "bblt", 25); - - CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - txnHandler.markCompacted(ci); - txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - - LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default"); - comp.setTablename("bblt"); - comp.setPartitionname("ds=today"); - comp.setOperationType(DataOperationType.INSERT); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - - AtomicBoolean looped = new AtomicBoolean(); - looped.set(false); - startCleaner(looped); - - // Make sure the compactor has a chance to run once - while (!looped.get()) { - Thread.currentThread().sleep(100); - } - - // There should still be one request, as the locks still held. - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - - - // obtain a second lock. 
This shouldn't block cleaner as it was acquired after the initial - // clean request - LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default"); - comp2.setTablename("bblt"); - comp2.setPartitionname("ds=today"); - comp.setOperationType(DataOperationType.SELECT); - List components2 = new ArrayList(1); - components2.add(comp2); - LockRequest req2 = new LockRequest(components, "me", "localhost"); - LockResponse res2 = txnHandler.lock(req2); - - // Unlock the previous lock - txnHandler.unlock(new UnlockRequest(res.getLockid())); - looped.set(false); - - while (!looped.get()) { - Thread.currentThread().sleep(100); - } - stopThread(); - Thread.currentThread().sleep(200); - - - // Check there are no compactions requests left. - rsp = txnHandler.showCompact(new ShowCompactRequest()); - compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); - } - @Test public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception { Table t = newTable("default", "campcnb", true); diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index cbb76d51d6..cdd3724fbe 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -324,6 +324,39 @@ public void markCompacted(CompactionInfo info) throws MetaException { return findReadyToClean(); } } + @Override + public Long findMinHistoryLevel() throws MetaException { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select MIN(MHL_MIN_OPEN_TXNID) from MIN_HISTORY_LEVEL"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + rs.next(); + long minOpenTxnId = rs.getLong(1); + if(rs.wasNull()) { + return null;//we have no active txns in the system.... 
+ } + return minOpenTxnId; + } catch (SQLException e) { + LOG.error("Unable to select MIN(MHL_MIN_OPEN_TXNID) for cleaning, " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "findMinHistoryLevel"); + throw new MetaException("Unable to execute findMinHistoryLevel() " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + return findMinHistoryLevel(); + } + + } /** * This will remove an entry from the queue after diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 080cc5284b..1e0190f6eb 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -352,6 +352,14 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old @RetrySemantics.ReadOnly List findReadyToClean() throws MetaException; + /** + * Returns the smallest txnid that is seen in open state across all active + * transactions in the system. + * @return {@code null} if there are no active transactions + */ + @RetrySemantics.ReadOnly + Long findMinHistoryLevel() throws MetaException; + /** * This will remove an entry from the queue after * it has been compacted. diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 3bb1f0c62c..b11d1c43d6 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -83,17 +83,6 @@ public static ValidCompactorWriteIdList createValidCompactWriteIdList(TableValid } } - public static ValidReaderWriteIdList updateForCompactionQuery(ValidReaderWriteIdList ids) { - // This is based on the existing valid write ID list that was built for a select query; - // therefore we assume all the aborted txns, etc. were already accounted for. - // All we do is adjust the high watermark to only include contiguous txns. 
- Long minOpenWriteId = ids.getMinOpenWriteId(); - if (minOpenWriteId != null && minOpenWriteId != Long.MAX_VALUE) { - return ids.updateHighWatermark(ids.getMinOpenWriteId() - 1); - } - return ids; - } - /** * Get an instance of the TxnStore that is appropriate for this store * @param conf configuration diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java index db4dd9ec42..0bca451fbe 100644 --- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java @@ -83,63 +83,6 @@ public void testOpenTxnNotExcluded() throws Exception { Assert.assertFalse(validTxns.isTxnValid(4)); } - @Test - public void testTxnRange() throws Exception { - ValidTxnList validTxns = client.getValidTxns(); - Assert.assertEquals(ValidTxnList.RangeResponse.NONE, - validTxns.isTxnRangeValid(1L, 3L)); - List tids = client.openTxns("me", 5).getTxn_ids(); - - HeartbeatTxnRangeResponse rsp = client.heartbeatTxnRange(1, 5); - Assert.assertEquals(0, rsp.getNosuch().size()); - Assert.assertEquals(0, rsp.getAborted().size()); - - client.rollbackTxn(1L); - client.commitTxn(2L); - client.commitTxn(3L); - client.commitTxn(4L); - validTxns = client.getValidTxns(); - System.out.println("validTxns = " + validTxns); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(2L, 2L)); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(2L, 3L)); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(2L, 4L)); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(3L, 4L)); - - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(1L, 4L)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(2L, 5L)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(1L, 2L)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(4L, 5L)); - - Assert.assertEquals(ValidTxnList.RangeResponse.NONE, - validTxns.isTxnRangeValid(1L, 1L)); - Assert.assertEquals(ValidTxnList.RangeResponse.NONE, - validTxns.isTxnRangeValid(5L, 10L)); - - validTxns = new ValidReadTxnList("10:5:4,5,6:"); - Assert.assertEquals(ValidTxnList.RangeResponse.NONE, - validTxns.isTxnRangeValid(4,6)); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(7, 10)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(7, 11)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(3, 6)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(4, 7)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(1, 12)); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(1, 3)); - } - @Test public void testLocks() throws Exception { LockRequestBuilder rqstBuilder = new LockRequestBuilder(); diff --git storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java index b8ff03f9c4..856cc3256f 100644 --- storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java +++ 
storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java @@ -26,7 +26,7 @@ * This class will view a transaction as valid only if it is committed. Both open and aborted * transactions will be seen as invalid. */ -public class ValidReadTxnList implements ValidTxnList { +public final class ValidReadTxnList implements ValidTxnList { protected long[] exceptions; protected BitSet abortedBits; // BitSet for flagging aborted transactions. Bit is true if aborted, false if open @@ -62,33 +62,6 @@ public boolean isTxnValid(long txnid) { return Arrays.binarySearch(exceptions, txnid) < 0; } - @Override - public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) { - // check the easy cases first - if (minTxnId > highWatermark) { - return RangeResponse.NONE; - } else if (exceptions.length > 0 && exceptions[0] > maxTxnId) { - return RangeResponse.ALL; - } - - // since the exceptions and the range in question overlap, count the - // exceptions in the range - long count = Math.max(0, maxTxnId - highWatermark); - for(long txn: exceptions) { - if (minTxnId <= txn && txn <= maxTxnId) { - count += 1; - } - } - - if (count == 0) { - return RangeResponse.ALL; - } else if (count == (maxTxnId - minTxnId + 1)) { - return RangeResponse.NONE; - } else { - return RangeResponse.SOME; - } - } - @Override public String toString() { return writeToString(); @@ -191,34 +164,5 @@ public boolean isTxnAborted(long txnid) { int index = Arrays.binarySearch(exceptions, txnid); return index >= 0 && abortedBits.get(index); } - - @Override - public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId) { - // check the easy cases first - if (highWatermark < minTxnId) { - return RangeResponse.NONE; - } - - int count = 0; // number of aborted txns found in exceptions - - // traverse the aborted txns list, starting at first aborted txn index - for (int i = abortedBits.nextSetBit(0); i >= 0; i = abortedBits.nextSetBit(i + 1)) { - long abortedTxnId = exceptions[i]; - if (abortedTxnId > maxTxnId) { // we've already gone beyond the specified range - break; - } - if (abortedTxnId >= minTxnId && abortedTxnId <= maxTxnId) { - count++; - } - } - - if (count == 0) { - return RangeResponse.NONE; - } else if (count == (maxTxnId - minTxnId + 1)) { - return RangeResponse.ALL; - } else { - return RangeResponse.SOME; - } - } } diff --git storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java index d4c3b09730..919a75ede0 100644 --- storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java +++ storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java @@ -46,16 +46,6 @@ */ public boolean isTxnValid(long txnid); - /** - * Find out if a range of transaction ids are valid. Note that valid may have different meanings - * for different implementations, as some will only want to see committed transactions and some - * both committed and aborted. - * @param minTxnId minimum txnid to look for, inclusive - * @param maxTxnId maximum txnid to look for, inclusive - * @return Indicate whether none, some, or all of these transactions are valid. - */ - public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId); - /** * Write this validTxnList into a string. This should produce a string that * can be used by {@link #readFromString(String)} to populate a validTxnsList. @@ -90,14 +80,6 @@ */ public boolean isTxnAborted(long txnid); - /** - * Find out if a range of transaction ids are aborted. 
- * @param minTxnId minimum txnid to look for, inclusive - * @param maxTxnId maximum txnid to look for, inclusive - * @return Indicate whether none, some, or all of these transactions are aborted. - */ - public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId); - /** * Returns smallest Open transaction in this set, {@code null} if there is none. */
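
For reference, a minimal, self-contained sketch (not part of the patch) of the write-id capping that Cleaner.clean() relies on when it falls back to CompactionInfo.highestWriteId. It only uses the ValidReaderWriteIdList constructors and isWriteIdValid() that already appear in the patch; the class name CleanerCappingSketch, the table name "default.t" and the write ids 2 and 3 are illustrative values, not anything from the patch itself.

import java.util.BitSet;

import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;

public class CleanerCappingSketch {
  public static void main(String[] args) {
    // Capped list: the compaction that queued this clean request only covered write ids <= 2,
    // mirroring the (ci.highestWriteId > 0) branch in Cleaner.clean().
    ValidWriteIdList capped =
        new ValidReaderWriteIdList("default.t", new long[0], new BitSet(), 2);
    // Uncapped fallback branch: the no-arg list treats every write id as valid.
    ValidWriteIdList uncapped = new ValidReaderWriteIdList();

    // A delta such as delta_0000003_0000003_0000 would have been written after that compaction
    // and may still be read by a concurrent query. Because the capped list rejects write id 3,
    // AcidUtils.getAcidState() cannot use anything written after the compaction to declare a
    // directory that such a reader depends on obsolete.
    System.out.println("capped   sees write id 3: " + capped.isWriteIdValid(3));   // expected: false
    System.out.println("uncapped sees write id 3: " + uncapped.isWriteIdValid(3)); // expected: true
  }
}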