diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3fe67b2..a918948 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1296,6 +1296,9 @@ "Number of aborted transactions involving a particular table or partition before major\n" + "compaction is initiated."), + HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms", + new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"), + // For HBase storage handler HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true, "Whether writes to HBase should be forced to the write-ahead log. \n" + diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 928fd61..bede378 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -348,7 +348,7 @@ public static Directory getAcidState(Path directory, long bestBaseTxn = 0; final List deltas = new ArrayList(); List working = new ArrayList(); - final List original = new ArrayList(); + List originalDirectories = new ArrayList(); final List obsolete = new ArrayList(); List children = SHIMS.listLocatedStatus(fs, directory, hiddenFileFilter); @@ -375,16 +375,26 @@ public static Directory getAcidState(Path directory, working.add(delta); } } else { - findOriginals(fs, child, original); + // This is just the directory. We need to recurse and find the actual files. But don't + // do this until we have determined there is no base. This saves time. Plus, + // it is possible that the cleaner is running and removing these original files, + // in which case recursing through them could cause us to get an error. + originalDirectories.add(child); } } + final List original = new ArrayList(); // if we have a base, the original files are obsolete. if (bestBase != null) { - obsolete.addAll(original); // remove the entries so we don't get confused later and think we should // use them. original.clear(); + } else { + // Okay, we're going to need these originals. Recurse through them and figure out what we + // really need. 
+ for (FileStatus origDir : originalDirectories) { + findOriginals(fs, origDir, original); + } } Collections.sort(working); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 18bb2c0..f11111a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -24,15 +24,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnListImpl; -import org.apache.hadoop.hive.metastore.api.LockComponent; -import org.apache.hadoop.hive.metastore.api.LockLevel; -import org.apache.hadoop.hive.metastore.api.LockRequest; -import org.apache.hadoop.hive.metastore.api.LockResponse; -import org.apache.hadoop.hive.metastore.api.LockState; -import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -41,7 +38,12 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; /** * A class to clean directories after compactions. This will run in a separate thread. @@ -50,35 +52,81 @@ static final private String CLASS_NAME = Cleaner.class.getName(); static final private Log LOG = LogFactory.getLog(CLASS_NAME); - private long cleanerCheckInterval = 5000; + private long cleanerCheckInterval = 0; + + // List of compactions to clean. + private Map> compactId2LockMap = new HashMap>(); + private Map compactId2CompactInfoMap = new HashMap(); @Override public void run() { + if (cleanerCheckInterval == 0) { + cleanerCheckInterval = conf.getTimeVar( + HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS); + } + // Make sure nothing escapes this run method and kills the metastore at large, // so wrap it in a big catch Throwable statement. do { try { long startedAt = System.currentTimeMillis(); - // Now look for new entries ready to be cleaned. + // First look for all the compactions that are waiting to be cleaned. If we have not + // seen an entry before, look for all the locks held on that table or partition and + // record them. We will then only clean the partition once all of those locks have been + // released. This way we avoid removing the files while they are in use, + // while at the same time avoiding starving the cleaner as new readers come along. + // This works because we know that any reader who comes along after the worker thread has + // done the compaction will read the more up to date version of the data (either in a + // newer delta or in a newer base). 
List toClean = txnHandler.findReadyToClean(); - for (CompactionInfo ci : toClean) { - LockComponent comp = null; - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, ci.dbname); - comp.setTablename(ci.tableName); - if (ci.partName != null) comp.setPartitionname(ci.partName); - List components = new ArrayList(1); - components.add(comp); - LockRequest rqst = new LockRequest(components, System.getProperty("user.name"), - Worker.hostname()); - LockResponse rsp = txnHandler.lockNoWait(rqst); + if (toClean.size() > 0 || compactId2LockMap.size() > 0) { + ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest()); + + for (CompactionInfo ci : toClean) { + // Check to see if we have seen this request before. If so, ignore it. If not, + // add it to our queue. + if (!compactId2LockMap.containsKey(ci.id)) { + compactId2LockMap.put(ci.id, findRelatedLocks(ci, locksResponse)); + compactId2CompactInfoMap.put(ci.id, ci); + } + } + + // Now, for each entry in the queue, see if all of the associated locks are clear so we + // can clean + Set currentLocks = buildCurrentLockSet(locksResponse); + List expiredLocks = new ArrayList(); + List compactionsCleaned = new ArrayList(); try { - if (rsp.getState() == LockState.ACQUIRED) { - clean(ci); + for (Map.Entry> queueEntry : compactId2LockMap.entrySet()) { + boolean sawLock = false; + for (Long lockId : queueEntry.getValue()) { + if (currentLocks.contains(lockId)) { + sawLock = true; + break; + } else { + expiredLocks.add(lockId); + } + } + + if (!sawLock) { + // Remember to remove this when we're out of the loop, + // we can't do it in the loop or we'll get a concurrent modification exception. + compactionsCleaned.add(queueEntry.getKey()); + clean(compactId2CompactInfoMap.get(queueEntry.getKey())); + } else { + // Remove the locks we didn't see so we don't look for them again next time + for (Long lockId : expiredLocks) { + queueEntry.getValue().remove(lockId); + } + } } } finally { - if (rsp.getState() == LockState.ACQUIRED) { - txnHandler.unlock(new UnlockRequest(rsp.getLockid())); + if (compactionsCleaned.size() > 0) { + for (Long compactId : compactionsCleaned) { + compactId2LockMap.remove(compactId); + compactId2CompactInfoMap.remove(compactId); + } } } } @@ -94,6 +142,31 @@ public void run() { } while (!stop.boolVal); } + private Set findRelatedLocks(CompactionInfo ci, ShowLocksResponse locksResponse) { + Set relatedLocks = new HashSet(); + for (ShowLocksResponseElement lock : locksResponse.getLocks()) { + if (ci.dbname.equals(lock.getDbname())) { + if ((ci.tableName == null && lock.getTablename() == null) || + (ci.tableName != null && ci.tableName.equals(lock.getTablename()))) { + if ((ci.partName == null && lock.getPartname() == null) || + (ci.partName != null && ci.partName.equals(lock.getPartname()))) { + relatedLocks.add(lock.getLockid()); + } + } + } + } + + return relatedLocks; + } + + private Set buildCurrentLockSet(ShowLocksResponse locksResponse) { + Set currentLocks = new HashSet(locksResponse.getLocks().size()); + for (ShowLocksResponseElement lock : locksResponse.getLocks()) { + currentLocks.add(lock.getLockid()); + } + return currentLocks; + } + private void clean(CompactionInfo ci) throws MetaException { LOG.info("Starting cleaning for " + ci.getFullPartitionName()); try { diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index b174496..c0bdb00 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ 
ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -217,11 +217,11 @@ public void testObsoleteOriginals() throws Exception { Path part = new MockPath(fs, "/tbl/part1"); AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("150:")); + // The two original buckets won't be in the obsolete list because we don't look at those + // until we have determined there is no base. List obsolete = dir.getObsolete(); - assertEquals(3, obsolete.size()); + assertEquals(1, obsolete.size()); assertEquals("mock:/tbl/part1/base_5", obsolete.get(0).getPath().toString()); - assertEquals("mock:/tbl/part1/000000_0", obsolete.get(1).getPath().toString()); - assertEquals("mock:/tbl/part1/000001_1", obsolete.get(2).getPath().toString()); assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString()); } diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index ec1379d..fa44a16 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -63,12 +63,13 @@ protected CompactionTxnHandler txnHandler; protected IMetaStoreClient ms; protected long sleepTime = 1000; + protected HiveConf conf; private final MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer(); private final File tmpdir; protected CompactorTest() throws Exception { - HiveConf conf = new HiveConf(); + conf = new HiveConf(); TxnDbUtil.setConfValues(conf); TxnDbUtil.cleanDb(); ms = new HiveMetaStoreClient(conf); @@ -79,16 +80,16 @@ protected CompactorTest() throws Exception { tmpdir.deleteOnExit(); } - protected void startInitiator(HiveConf conf) throws Exception { - startThread('i', conf); + protected void startInitiator() throws Exception { + startThread('i', true); } - protected void startWorker(HiveConf conf) throws Exception { - startThread('w', conf); + protected void startWorker() throws Exception { + startThread('w', true); } - protected void startCleaner(HiveConf conf) throws Exception { - startThread('c', conf); + protected void startCleaner(boolean stopAfterOne) throws Exception { + startThread('c', stopAfterOne); } protected Table newTable(String dbName, String tableName, boolean partitioned) throws TException { @@ -117,6 +118,9 @@ protected Table newTable(String dbName, String tableName, boolean partitioned, table.setParameters(parameters); + // drop the table first, in case some previous test created it + ms.dropTable(dbName, tableName); + ms.createTable(table); return table; } @@ -142,37 +146,27 @@ protected long openTxn() throws MetaException { return txns.get(0); } - protected void addDeltaFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn, - int numRecords) throws Exception{ - addFile(conf, t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true); + protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords) + throws Exception { + addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true); } - protected void addBaseFile(HiveConf conf, Table t, Partition p, long maxTxn, - int numRecords) throws Exception{ - addFile(conf, t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true); + protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords) throws Exception { + addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true); } - protected void addLegacyFile(HiveConf conf, Table t, Partition p, - int numRecords) 
throws Exception { - addFile(conf, t, p, 0, 0, numRecords, FileType.LEGACY, 2, true); + protected void addLegacyFile(Table t, Partition p, int numRecords) throws Exception { + addFile(t, p, 0, 0, numRecords, FileType.LEGACY, 2, true); } - protected void addDeltaFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn, - int numRecords, int numBuckets, boolean allBucketsPresent) - throws Exception { - addFile(conf, t, p, minTxn, maxTxn, numRecords, FileType.DELTA, numBuckets, allBucketsPresent); + protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords, + int numBuckets, boolean allBucketsPresent) throws Exception { + addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, numBuckets, allBucketsPresent); } - protected void addBaseFile(HiveConf conf, Table t, Partition p, long maxTxn, - int numRecords, int numBuckets, boolean allBucketsPresent) - throws Exception { - addFile(conf, t, p, 0, maxTxn, numRecords, FileType.BASE, numBuckets, allBucketsPresent); - } - - protected void addLegacyFile(HiveConf conf, Table t, Partition p, - int numRecords, int numBuckets, boolean allBucketsPresent) - throws Exception { - addFile(conf, t, p, 0, 0, numRecords, FileType.LEGACY, numBuckets, allBucketsPresent); + protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords, int numBuckets, + boolean allBucketsPresent) throws Exception { + addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, numBuckets, allBucketsPresent); } protected List getDirectories(HiveConf conf, Table t, Partition p) throws Exception { @@ -191,6 +185,10 @@ protected void burnThroughTransactions(int num) throws MetaException, NoSuchTxnE for (long tid : rsp.getTxn_ids()) txnHandler.commitTxn(new CommitTxnRequest(tid)); } + protected void stopThread() { + stop.boolVal = true; + } + private StorageDescriptor newStorageDescriptor(String location, List sortCols) { StorageDescriptor sd = new StorageDescriptor(); List cols = new ArrayList(2); @@ -214,9 +212,8 @@ private StorageDescriptor newStorageDescriptor(String location, List sort return sd; } - // I can't do this with @Before because I want to be able to control the config file provided - // to each test. - private void startThread(char type, HiveConf conf) throws Exception { + // I can't do this with @Before because I want to be able to control when the thead starts + private void startThread(char type, boolean stopAfterOne) throws Exception { TxnDbUtil.setConfValues(conf); CompactorThread t = null; switch (type) { @@ -227,9 +224,10 @@ private void startThread(char type, HiveConf conf) throws Exception { } t.setThreadId((int) t.getId()); t.setHiveConf(conf); - stop.boolVal = true; + stop.boolVal = stopAfterOne; t.init(stop); - t.run(); + if (stopAfterOne) t.run(); + else t.start(); } private String getLocation(String tableName, String partValue) { @@ -243,7 +241,7 @@ private String getLocation(String tableName, String partValue) { private enum FileType {BASE, DELTA, LEGACY}; - private void addFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn, + private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords, FileType type, int numBuckets, boolean allBucketsPresent) throws Exception { String partValue = (p == null) ? 
null : p.getValues().get(0); diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index edc7f32..7dbbe24 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -22,12 +22,11 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; -import org.junit.Before; import org.junit.Test; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; /** * Tests for the compactor Cleaner thread @@ -41,19 +40,17 @@ public TestCleaner() throws Exception { public void nothing() throws Exception { // Test that the whole things works when there's nothing in the queue. This is just a // survival test. - startCleaner(new HiveConf()); + startCleaner(true); } @Test public void cleanupAfterMajorTableCompaction() throws Exception { Table t = newTable("default", "camtc", false); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, null, 20L, 20); - addDeltaFile(conf, t, null, 21L, 22L, 2); - addDeltaFile(conf, t, null, 23L, 24L, 2); - addBaseFile(conf, t, null, 25L, 25); + addBaseFile(t, null, 20L, 20); + addDeltaFile(t, null, 21L, 22L, 2); + addDeltaFile(t, null, 23L, 24L, 2); + addBaseFile(t, null, 25L, 25); burnThroughTransactions(25); @@ -63,7 +60,7 @@ public void cleanupAfterMajorTableCompaction() throws Exception { txnHandler.markCompacted(ci); txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - startCleaner(conf); + startCleaner(true); // Check there are no compactions requests left. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); @@ -80,12 +77,10 @@ public void cleanupAfterMajorPartitionCompaction() throws Exception { Table t = newTable("default", "campc", true); Partition p = newPartition(t, "today"); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, p, 20L, 20); - addDeltaFile(conf, t, p, 21L, 22L, 2); - addDeltaFile(conf, t, p, 23L, 24L, 2); - addBaseFile(conf, t, p, 25L, 25); + addBaseFile(t, p, 20L, 20); + addDeltaFile(t, p, 21L, 22L, 2); + addDeltaFile(t, p, 23L, 24L, 2); + addBaseFile(t, p, 25L, 25); burnThroughTransactions(25); @@ -96,7 +91,7 @@ public void cleanupAfterMajorPartitionCompaction() throws Exception { txnHandler.markCompacted(ci); txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - startCleaner(conf); + startCleaner(true); // Check there are no compactions requests left. 
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); @@ -112,12 +107,10 @@ public void cleanupAfterMajorPartitionCompaction() throws Exception { public void cleanupAfterMinorTableCompaction() throws Exception { Table t = newTable("default", "camitc", false); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, null, 20L, 20); - addDeltaFile(conf, t, null, 21L, 22L, 2); - addDeltaFile(conf, t, null, 23L, 24L, 2); - addDeltaFile(conf, t, null, 21L, 24L, 4); + addBaseFile(t, null, 20L, 20); + addDeltaFile(t, null, 21L, 22L, 2); + addDeltaFile(t, null, 23L, 24L, 2); + addDeltaFile(t, null, 21L, 24L, 4); burnThroughTransactions(25); @@ -127,7 +120,7 @@ public void cleanupAfterMinorTableCompaction() throws Exception { txnHandler.markCompacted(ci); txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - startCleaner(conf); + startCleaner(true); // Check there are no compactions requests left. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); @@ -151,12 +144,10 @@ public void cleanupAfterMinorPartitionCompaction() throws Exception { Table t = newTable("default", "camipc", true); Partition p = newPartition(t, "today"); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, p, 20L, 20); - addDeltaFile(conf, t, p, 21L, 22L, 2); - addDeltaFile(conf, t, p, 23L, 24L, 2); - addDeltaFile(conf, t, p, 21L, 24L, 4); + addBaseFile(t, p, 20L, 20); + addDeltaFile(t, p, 21L, 22L, 2); + addDeltaFile(t, p, 23L, 24L, 2); + addDeltaFile(t, p, 21L, 24L, 4); burnThroughTransactions(25); @@ -167,7 +158,7 @@ public void cleanupAfterMinorPartitionCompaction() throws Exception { txnHandler.markCompacted(ci); txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - startCleaner(conf); + startCleaner(true); // Check there are no compactions requests left. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); @@ -190,12 +181,10 @@ public void cleanupAfterMinorPartitionCompaction() throws Exception { public void blockedByLockTable() throws Exception { Table t = newTable("default", "bblt", false); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, null, 20L, 20); - addDeltaFile(conf, t, null, 21L, 22L, 2); - addDeltaFile(conf, t, null, 23L, 24L, 2); - addDeltaFile(conf, t, null, 21L, 24L, 4); + addBaseFile(t, null, 20L, 20); + addDeltaFile(t, null, 21L, 22L, 2); + addDeltaFile(t, null, 23L, 24L, 2); + addDeltaFile(t, null, 21L, 24L, 4); burnThroughTransactions(25); @@ -212,7 +201,7 @@ public void blockedByLockTable() throws Exception { LockRequest req = new LockRequest(components, "me", "localhost"); LockResponse res = txnHandler.lock(req); - startCleaner(conf); + startCleaner(true); // Check there are no compactions requests left. 
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); @@ -228,12 +217,10 @@ public void blockedByLockPartition() throws Exception { Table t = newTable("default", "bblp", true); Partition p = newPartition(t, "today"); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, p, 20L, 20); - addDeltaFile(conf, t, p, 21L, 22L, 2); - addDeltaFile(conf, t, p, 23L, 24L, 2); - addDeltaFile(conf, t, p, 21L, 24L, 4); + addBaseFile(t, p, 20L, 20); + addDeltaFile(t, p, 21L, 22L, 2); + addDeltaFile(t, p, 23L, 24L, 2); + addDeltaFile(t, p, 21L, 24L, 4); burnThroughTransactions(25); @@ -244,7 +231,7 @@ public void blockedByLockPartition() throws Exception { txnHandler.markCompacted(ci); txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("bblp"); comp.setPartitionname("ds=today"); List components = new ArrayList(1); @@ -252,7 +239,7 @@ public void blockedByLockPartition() throws Exception { LockRequest req = new LockRequest(components, "me", "localhost"); LockResponse res = txnHandler.lock(req); - startCleaner(conf); + startCleaner(true); // Check there are no compactions requests left. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); @@ -265,15 +252,141 @@ public void blockedByLockPartition() throws Exception { } @Test + public void notBlockedBySubsequentLock() throws Exception { + Table t = newTable("default", "bblt", false); + + // Set the run frequency low on this test so it doesn't take long + conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, 100, + TimeUnit.MILLISECONDS); + + addBaseFile(t, null, 20L, 20); + addDeltaFile(t, null, 21L, 22L, 2); + addDeltaFile(t, null, 23L, 24L, 2); + addDeltaFile(t, null, 21L, 24L, 4); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); + comp.setTablename("bblt"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + + startCleaner(false); + + // Make sure the compactor has a chance to run once + Thread.currentThread().sleep(100); + + // There should still be one request, as the locks still held. + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + + + // obtain a second lock. This shouldn't block cleaner as it was acquired after the initial + // clean request + LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); + comp2.setTablename("bblt"); + List components2 = new ArrayList(1); + components2.add(comp2); + LockRequest req2 = new LockRequest(components, "me", "localhost"); + LockResponse res2 = txnHandler.lock(req2); + + // Unlock the previous lock + txnHandler.unlock(new UnlockRequest(res.getLockid())); + + Thread.currentThread().sleep(200); + stopThread(); + Thread.currentThread().sleep(100); + + + // Check there are no compactions requests left. 
+ rsp = txnHandler.showCompact(new ShowCompactRequest()); + compacts = rsp.getCompacts(); + Assert.assertEquals(0, compacts.size()); + } + + @Test + public void partitionNotBlockedBySubsequentLock() throws Exception { + Table t = newTable("default", "bblt", true); + Partition p = newPartition(t, "today"); + + // Set the run frequency low on this test so it doesn't take long + conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, 100, + TimeUnit.MILLISECONDS); + + addBaseFile(t, p, 20L, 20); + addDeltaFile(t, p, 21L, 22L, 2); + addDeltaFile(t, p, 23L, 24L, 2); + addDeltaFile(t, p, 21L, 24L, 4); + + burnThroughTransactions(25); + + CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + txnHandler.markCompacted(ci); + txnHandler.setRunAs(ci.id, System.getProperty("user.name")); + + LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default"); + comp.setTablename("bblt"); + comp.setPartitionname("ds=today"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + + startCleaner(false); + + // Make sure the compactor has a chance to run once + Thread.currentThread().sleep(100); + + // There should still be one request, as the locks still held. + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + + + // obtain a second lock. This shouldn't block cleaner as it was acquired after the initial + // clean request + LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default"); + comp2.setTablename("bblt"); + comp2.setPartitionname("ds=today"); + List components2 = new ArrayList(1); + components2.add(comp2); + LockRequest req2 = new LockRequest(components, "me", "localhost"); + LockResponse res2 = txnHandler.lock(req2); + + // Unlock the previous lock + txnHandler.unlock(new UnlockRequest(res.getLockid())); + + Thread.currentThread().sleep(200); + stopThread(); + Thread.currentThread().sleep(100); + + + // Check there are no compactions requests left. + rsp = txnHandler.showCompact(new ShowCompactRequest()); + compacts = rsp.getCompacts(); + Assert.assertEquals(0, compacts.size()); + } + + @Test public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception { Table t = newTable("default", "campcnb", true); Partition p = newPartition(t, "today"); - HiveConf conf = new HiveConf(); - - addDeltaFile(conf, t, p, 1L, 22L, 22); - addDeltaFile(conf, t, p, 23L, 24L, 2); - addBaseFile(conf, t, p, 25L, 25); + addDeltaFile(t, p, 1L, 22L, 22); + addDeltaFile(t, p, 23L, 24L, 2); + addBaseFile(t, p, 25L, 25); burnThroughTransactions(25); @@ -284,7 +397,7 @@ public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception { txnHandler.markCompacted(ci); txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - startCleaner(conf); + startCleaner(true); // Check there are no compactions requests left. 
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); @@ -295,9 +408,4 @@ public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception { Assert.assertEquals(1, paths.size()); Assert.assertEquals("base_25", paths.get(0).getName()); } - - @Before - public void setUpTxnDb() throws Exception { - TxnDbUtil.setConfValues(new HiveConf()); - } } diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index 128fef1..7d88c1a 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -47,7 +47,7 @@ public TestInitiator() throws Exception { public void nothing() throws Exception { // Test that the whole things works when there's nothing in the queue. This is just a // survival test. - startInitiator(new HiveConf()); + startInitiator(); } @Test @@ -63,7 +63,7 @@ public void recoverFailedLocalWorkers() throws Exception { txnHandler.findNextToCompact(Worker.hostname() + "-193892"); txnHandler.findNextToCompact("nosuchhost-193892"); - startInitiator(new HiveConf()); + startInitiator(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -89,10 +89,9 @@ public void recoverFailedRemoteWorkers() throws Exception { txnHandler.findNextToCompact("nosuchhost-193892"); - HiveConf conf = new HiveConf(); conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L, TimeUnit.MILLISECONDS); - startInitiator(conf); + startInitiator(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -104,7 +103,6 @@ public void recoverFailedRemoteWorkers() throws Exception { public void majorCompactOnTableTooManyAborts() throws Exception { Table t = newTable("default", "mcottma", false); - HiveConf conf = new HiveConf(); HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10); for (int i = 0; i < 11; i++) { @@ -119,7 +117,7 @@ public void majorCompactOnTableTooManyAborts() throws Exception { txnHandler.abortTxn(new AbortTxnRequest(txnid)); } - startInitiator(conf); + startInitiator(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -134,7 +132,6 @@ public void majorCompactOnPartitionTooManyAborts() throws Exception { Table t = newTable("default", "mcoptma", true); Partition p = newPartition(t, "today"); - HiveConf conf = new HiveConf(); HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10); for (int i = 0; i < 11; i++) { @@ -150,7 +147,7 @@ public void majorCompactOnPartitionTooManyAborts() throws Exception { txnHandler.abortTxn(new AbortTxnRequest(txnid)); } - startInitiator(conf); + startInitiator(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -168,7 +165,6 @@ public void noCompactOnManyDifferentPartitionAborts() throws Exception { Partition p = newPartition(t, "day-" + i); } - HiveConf conf = new HiveConf(); HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10); for (int i = 0; i < 11; i++) { @@ -184,7 +180,7 @@ public void noCompactOnManyDifferentPartitionAborts() throws Exception { txnHandler.abortTxn(new AbortTxnRequest(txnid)); } - startInitiator(conf); + startInitiator(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); 
Assert.assertEquals(0, rsp.getCompactsSize()); @@ -197,8 +193,6 @@ public void cleanEmptyAbortedTxns() throws Exception { // accidently clean it too. Table t = newTable("default", "ceat", false); - HiveConf conf = new HiveConf(); - long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("ceat"); @@ -216,7 +210,7 @@ public void cleanEmptyAbortedTxns() throws Exception { GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); Assert.assertEquals(101, openTxns.getOpen_txnsSize()); - startInitiator(conf); + startInitiator(); openTxns = txnHandler.getOpenTxns(); Assert.assertEquals(1, openTxns.getOpen_txnsSize()); @@ -228,7 +222,6 @@ public void noCompactWhenNoCompactSet() throws Exception { parameters.put("NO_AUTO_COMPACTION", "true"); Table t = newTable("default", "ncwncs", false, parameters); - HiveConf conf = new HiveConf(); HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10); for (int i = 0; i < 11; i++) { @@ -243,7 +236,7 @@ public void noCompactWhenNoCompactSet() throws Exception { txnHandler.abortTxn(new AbortTxnRequest(txnid)); } - startInitiator(conf); + startInitiator(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(0, rsp.getCompactsSize()); @@ -253,7 +246,6 @@ public void noCompactWhenNoCompactSet() throws Exception { public void noCompactWhenCompactAlreadyScheduled() throws Exception { Table t = newTable("default", "ncwcas", false); - HiveConf conf = new HiveConf(); HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10); for (int i = 0; i < 11; i++) { @@ -277,7 +269,7 @@ public void noCompactWhenCompactAlreadyScheduled() throws Exception { Assert.assertEquals("initiated", compacts.get(0).getState()); Assert.assertEquals("ncwcas", compacts.get(0).getTablename()); - startInitiator(conf); + startInitiator(); rsp = txnHandler.showCompact(new ShowCompactRequest()); compacts = rsp.getCompacts(); @@ -291,11 +283,9 @@ public void noCompactWhenCompactAlreadyScheduled() throws Exception { public void compactTableHighDeltaPct() throws Exception { Table t = newTable("default", "cthdp", false); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, null, 20L, 20); - addDeltaFile(conf, t, null, 21L, 22L, 2); - addDeltaFile(conf, t, null, 23L, 24L, 2); + addBaseFile(t, null, 20L, 20); + addDeltaFile(t, null, 21L, 22L, 2); + addDeltaFile(t, null, 23L, 24L, 2); burnThroughTransactions(23); @@ -309,7 +299,7 @@ public void compactTableHighDeltaPct() throws Exception { LockResponse res = txnHandler.lock(req); txnHandler.commitTxn(new CommitTxnRequest(txnid)); - startInitiator(conf); + startInitiator(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -324,11 +314,9 @@ public void compactPartitionHighDeltaPct() throws Exception { Table t = newTable("default", "cphdp", true); Partition p = newPartition(t, "today"); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, p, 20L, 20); - addDeltaFile(conf, t, p, 21L, 22L, 2); - addDeltaFile(conf, t, p, 23L, 24L, 2); + addBaseFile(t, p, 20L, 20); + addDeltaFile(t, p, 21L, 22L, 2); + addDeltaFile(t, p, 23L, 24L, 2); burnThroughTransactions(23); @@ -343,7 +331,7 @@ public void compactPartitionHighDeltaPct() throws Exception { LockResponse res = txnHandler.lock(req); txnHandler.commitTxn(new CommitTxnRequest(txnid)); - startInitiator(conf); + startInitiator(); ShowCompactResponse rsp = 
txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -358,11 +346,9 @@ public void compactPartitionHighDeltaPct() throws Exception { public void noCompactTableDeltaPctNotHighEnough() throws Exception { Table t = newTable("default", "nctdpnhe", false); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, null, 50L, 50); - addDeltaFile(conf, t, null, 21L, 22L, 2); - addDeltaFile(conf, t, null, 23L, 24L, 2); + addBaseFile(t, null, 50L, 50); + addDeltaFile(t, null, 21L, 22L, 2); + addDeltaFile(t, null, 23L, 24L, 2); burnThroughTransactions(53); @@ -376,7 +362,7 @@ public void noCompactTableDeltaPctNotHighEnough() throws Exception { LockResponse res = txnHandler.lock(req); txnHandler.commitTxn(new CommitTxnRequest(txnid)); - startInitiator(conf); + startInitiator(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(0, rsp.getCompactsSize()); @@ -386,20 +372,18 @@ public void noCompactTableDeltaPctNotHighEnough() throws Exception { public void compactTableTooManyDeltas() throws Exception { Table t = newTable("default", "cttmd", false); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, null, 200L, 200); - addDeltaFile(conf, t, null, 201L, 201L, 1); - addDeltaFile(conf, t, null, 202L, 202L, 1); - addDeltaFile(conf, t, null, 203L, 203L, 1); - addDeltaFile(conf, t, null, 204L, 204L, 1); - addDeltaFile(conf, t, null, 205L, 205L, 1); - addDeltaFile(conf, t, null, 206L, 206L, 1); - addDeltaFile(conf, t, null, 207L, 207L, 1); - addDeltaFile(conf, t, null, 208L, 208L, 1); - addDeltaFile(conf, t, null, 209L, 209L, 1); - addDeltaFile(conf, t, null, 210L, 210L, 1); - addDeltaFile(conf, t, null, 211L, 211L, 1); + addBaseFile(t, null, 200L, 200); + addDeltaFile(t, null, 201L, 201L, 1); + addDeltaFile(t, null, 202L, 202L, 1); + addDeltaFile(t, null, 203L, 203L, 1); + addDeltaFile(t, null, 204L, 204L, 1); + addDeltaFile(t, null, 205L, 205L, 1); + addDeltaFile(t, null, 206L, 206L, 1); + addDeltaFile(t, null, 207L, 207L, 1); + addDeltaFile(t, null, 208L, 208L, 1); + addDeltaFile(t, null, 209L, 209L, 1); + addDeltaFile(t, null, 210L, 210L, 1); + addDeltaFile(t, null, 211L, 211L, 1); burnThroughTransactions(210); @@ -413,7 +397,7 @@ public void compactTableTooManyDeltas() throws Exception { LockResponse res = txnHandler.lock(req); txnHandler.commitTxn(new CommitTxnRequest(txnid)); - startInitiator(conf); + startInitiator(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -428,20 +412,18 @@ public void compactPartitionTooManyDeltas() throws Exception { Table t = newTable("default", "cptmd", true); Partition p = newPartition(t, "today"); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, p, 200L, 200); - addDeltaFile(conf, t, p, 201L, 201L, 1); - addDeltaFile(conf, t, p, 202L, 202L, 1); - addDeltaFile(conf, t, p, 203L, 203L, 1); - addDeltaFile(conf, t, p, 204L, 204L, 1); - addDeltaFile(conf, t, p, 205L, 205L, 1); - addDeltaFile(conf, t, p, 206L, 206L, 1); - addDeltaFile(conf, t, p, 207L, 207L, 1); - addDeltaFile(conf, t, p, 208L, 208L, 1); - addDeltaFile(conf, t, p, 209L, 209L, 1); - addDeltaFile(conf, t, p, 210L, 210L, 1); - addDeltaFile(conf, t, p, 211L, 211L, 1); + addBaseFile(t, p, 200L, 200); + addDeltaFile(t, p, 201L, 201L, 1); + addDeltaFile(t, p, 202L, 202L, 1); + addDeltaFile(t, p, 203L, 203L, 1); + addDeltaFile(t, p, 204L, 204L, 1); + addDeltaFile(t, p, 205L, 205L, 1); + addDeltaFile(t, p, 206L, 206L, 1); + addDeltaFile(t, p, 207L, 207L, 
1); + addDeltaFile(t, p, 208L, 208L, 1); + addDeltaFile(t, p, 209L, 209L, 1); + addDeltaFile(t, p, 210L, 210L, 1); + addDeltaFile(t, p, 211L, 211L, 1); burnThroughTransactions(210); @@ -456,7 +438,7 @@ public void compactPartitionTooManyDeltas() throws Exception { LockResponse res = txnHandler.lock(req); txnHandler.commitTxn(new CommitTxnRequest(txnid)); - startInitiator(conf); + startInitiator(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -471,11 +453,9 @@ public void compactPartitionTooManyDeltas() throws Exception { public void noCompactTableNotEnoughDeltas() throws Exception { Table t = newTable("default", "nctned", false); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, null, 200L, 200); - addDeltaFile(conf, t, null, 201L, 205L, 5); - addDeltaFile(conf, t, null, 206L, 211L, 6); + addBaseFile(t, null, 200L, 200); + addDeltaFile(t, null, 201L, 205L, 5); + addDeltaFile(t, null, 206L, 211L, 6); burnThroughTransactions(210); @@ -489,7 +469,7 @@ public void noCompactTableNotEnoughDeltas() throws Exception { LockResponse res = txnHandler.lock(req); txnHandler.commitTxn(new CommitTxnRequest(txnid)); - startInitiator(conf); + startInitiator(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(0, rsp.getCompactsSize()); @@ -499,20 +479,18 @@ public void noCompactTableNotEnoughDeltas() throws Exception { public void chooseMajorOverMinorWhenBothValid() throws Exception { Table t = newTable("default", "cmomwbv", false); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, null, 200L, 200); - addDeltaFile(conf, t, null, 201L, 211L, 11); - addDeltaFile(conf, t, null, 212L, 222L, 11); - addDeltaFile(conf, t, null, 223L, 233L, 11); - addDeltaFile(conf, t, null, 234L, 244L, 11); - addDeltaFile(conf, t, null, 245L, 255L, 11); - addDeltaFile(conf, t, null, 256L, 266L, 11); - addDeltaFile(conf, t, null, 267L, 277L, 11); - addDeltaFile(conf, t, null, 278L, 288L, 11); - addDeltaFile(conf, t, null, 289L, 299L, 11); - addDeltaFile(conf, t, null, 300L, 310L, 11); - addDeltaFile(conf, t, null, 311L, 321L, 11); + addBaseFile(t, null, 200L, 200); + addDeltaFile(t, null, 201L, 211L, 11); + addDeltaFile(t, null, 212L, 222L, 11); + addDeltaFile(t, null, 223L, 233L, 11); + addDeltaFile(t, null, 234L, 244L, 11); + addDeltaFile(t, null, 245L, 255L, 11); + addDeltaFile(t, null, 256L, 266L, 11); + addDeltaFile(t, null, 267L, 277L, 11); + addDeltaFile(t, null, 278L, 288L, 11); + addDeltaFile(t, null, 289L, 299L, 11); + addDeltaFile(t, null, 300L, 310L, 11); + addDeltaFile(t, null, 311L, 321L, 11); burnThroughTransactions(320); @@ -526,7 +504,7 @@ public void chooseMajorOverMinorWhenBothValid() throws Exception { LockResponse res = txnHandler.lock(req); txnHandler.commitTxn(new CommitTxnRequest(txnid)); - startInitiator(conf); + startInitiator(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -541,19 +519,17 @@ public void enoughDeltasNoBase() throws Exception { Table t = newTable("default", "ednb", true); Partition p = newPartition(t, "today"); - HiveConf conf = new HiveConf(); - - addDeltaFile(conf, t, p, 1L, 201L, 200); - addDeltaFile(conf, t, p, 202L, 202L, 1); - addDeltaFile(conf, t, p, 203L, 203L, 1); - addDeltaFile(conf, t, p, 204L, 204L, 1); - addDeltaFile(conf, t, p, 205L, 205L, 1); - addDeltaFile(conf, t, p, 206L, 206L, 1); - addDeltaFile(conf, t, p, 207L, 207L, 1); - addDeltaFile(conf, t, p, 208L, 208L, 1); - 
addDeltaFile(conf, t, p, 209L, 209L, 1); - addDeltaFile(conf, t, p, 210L, 210L, 1); - addDeltaFile(conf, t, p, 211L, 211L, 1); + addDeltaFile(t, p, 1L, 201L, 200); + addDeltaFile(t, p, 202L, 202L, 1); + addDeltaFile(t, p, 203L, 203L, 1); + addDeltaFile(t, p, 204L, 204L, 1); + addDeltaFile(t, p, 205L, 205L, 1); + addDeltaFile(t, p, 206L, 206L, 1); + addDeltaFile(t, p, 207L, 207L, 1); + addDeltaFile(t, p, 208L, 208L, 1); + addDeltaFile(t, p, 209L, 209L, 1); + addDeltaFile(t, p, 210L, 210L, 1); + addDeltaFile(t, p, 211L, 211L, 1); burnThroughTransactions(210); @@ -568,7 +544,7 @@ public void enoughDeltasNoBase() throws Exception { LockResponse res = txnHandler.lock(req); txnHandler.commitTxn(new CommitTxnRequest(txnid)); - startInitiator(conf); + startInitiator(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -584,11 +560,9 @@ public void twoTxnsOnSamePartitionGenerateOneCompactionRequest() throws Exceptio Table t = newTable("default", "ttospgocr", true); Partition p = newPartition(t, "today"); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, p, 20L, 20); - addDeltaFile(conf, t, p, 21L, 22L, 2); - addDeltaFile(conf, t, p, 23L, 24L, 2); + addBaseFile(t, p, 20L, 20); + addDeltaFile(t, p, 21L, 22L, 2); + addDeltaFile(t, p, 23L, 24L, 2); burnThroughTransactions(23); @@ -614,7 +588,7 @@ public void twoTxnsOnSamePartitionGenerateOneCompactionRequest() throws Exceptio res = txnHandler.lock(req); txnHandler.commitTxn(new CommitTxnRequest(txnid)); - startInitiator(conf); + startInitiator(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -626,9 +600,4 @@ public void twoTxnsOnSamePartitionGenerateOneCompactionRequest() throws Exceptio } // TODO test compactions with legacy file types - - @Before - public void setUpTxnDb() throws Exception { - TxnDbUtil.setConfValues(new HiveConf()); - } } diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index 90a722b..17bbe24 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -48,7 +48,7 @@ public TestWorker() throws Exception { public void nothing() throws Exception { // Test that the whole things works when there's nothing in the queue. This is just a // survival test. - startWorker(new HiveConf()); + startWorker(); } @Test @@ -205,19 +205,17 @@ public void sortedTable() throws Exception { Table t = newTable("default", "st", false, new HashMap(), sortCols); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, null, 20L, 20); - addDeltaFile(conf, t, null, 21L, 22L, 2); - addDeltaFile(conf, t, null, 23L, 24L, 2); - addDeltaFile(conf, t, null, 21L, 24L, 4); + addBaseFile(t, null, 20L, 20); + addDeltaFile(t, null, 21L, 22L, 2); + addDeltaFile(t, null, 23L, 24L, 2); + addDeltaFile(t, null, 21L, 24L, 4); burnThroughTransactions(25); CompactionRequest rqst = new CompactionRequest("default", "st", CompactionType.MINOR); txnHandler.compact(rqst); - startWorker(new HiveConf()); + startWorker(); // There should still be four directories in the location. 
FileSystem fs = FileSystem.get(conf); @@ -232,12 +230,11 @@ public void sortedPartition() throws Exception { Table t = newTable("default", "sp", true, new HashMap(), sortCols); Partition p = newPartition(t, "today", sortCols); - HiveConf conf = new HiveConf(); - addBaseFile(conf, t, p, 20L, 20); - addDeltaFile(conf, t, p, 21L, 22L, 2); - addDeltaFile(conf, t, p, 23L, 24L, 2); - addDeltaFile(conf, t, p, 21L, 24L, 4); + addBaseFile(t, p, 20L, 20); + addDeltaFile(t, p, 21L, 22L, 2); + addDeltaFile(t, p, 23L, 24L, 2); + addDeltaFile(t, p, 21L, 24L, 4); burnThroughTransactions(25); @@ -245,7 +242,7 @@ public void sortedPartition() throws Exception { rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); - startWorker(new HiveConf()); + startWorker(); // There should still be four directories in the location. FileSystem fs = FileSystem.get(conf); @@ -258,18 +255,16 @@ public void minorTableWithBase() throws Exception { LOG.debug("Starting minorTableWithBase"); Table t = newTable("default", "mtwb", false); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, null, 20L, 20); - addDeltaFile(conf, t, null, 21L, 22L, 2); - addDeltaFile(conf, t, null, 23L, 24L, 2); + addBaseFile(t, null, 20L, 20); + addDeltaFile(t, null, 21L, 22L, 2); + addDeltaFile(t, null, 23L, 24L, 2); burnThroughTransactions(25); CompactionRequest rqst = new CompactionRequest("default", "mtwb", CompactionType.MINOR); txnHandler.compact(rqst); - startWorker(conf); + startWorker(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -304,11 +299,10 @@ public void minorTableWithBase() throws Exception { public void minorPartitionWithBase() throws Exception { Table t = newTable("default", "mpwb", true); Partition p = newPartition(t, "today"); - HiveConf conf = new HiveConf(); - addBaseFile(conf, t, p, 20L, 20); - addDeltaFile(conf, t, p, 21L, 22L, 2); - addDeltaFile(conf, t, p, 23L, 24L, 2); + addBaseFile(t, p, 20L, 20); + addDeltaFile(t, p, 21L, 22L, 2); + addDeltaFile(t, p, 23L, 24L, 2); burnThroughTransactions(25); @@ -316,7 +310,7 @@ public void minorPartitionWithBase() throws Exception { rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); - startWorker(new HiveConf()); + startWorker(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -351,17 +345,15 @@ public void minorTableNoBase() throws Exception { LOG.debug("Starting minorTableWithBase"); Table t = newTable("default", "mtnb", false); - HiveConf conf = new HiveConf(); - - addDeltaFile(conf, t, null, 1L, 2L, 2); - addDeltaFile(conf, t, null, 3L, 4L, 2); + addDeltaFile(t, null, 1L, 2L, 2); + addDeltaFile(t, null, 3L, 4L, 2); burnThroughTransactions(5); CompactionRequest rqst = new CompactionRequest("default", "mtnb", CompactionType.MINOR); txnHandler.compact(rqst); - startWorker(new HiveConf()); + startWorker(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -396,18 +388,16 @@ public void majorTableWithBase() throws Exception { LOG.debug("Starting majorTableWithBase"); Table t = newTable("default", "matwb", false); - HiveConf conf = new HiveConf(); - - addBaseFile(conf, t, null, 20L, 20); - addDeltaFile(conf, t, null, 21L, 22L, 2); - addDeltaFile(conf, t, null, 23L, 24L, 2); + addBaseFile(t, null, 20L, 20); + addDeltaFile(t, null, 21L, 22L, 2); + addDeltaFile(t, null, 23L, 24L, 2); burnThroughTransactions(25); CompactionRequest rqst = new CompactionRequest("default", 
"matwb", CompactionType.MAJOR); txnHandler.compact(rqst); - startWorker(new HiveConf()); + startWorker(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -442,11 +432,10 @@ public void majorPartitionWithBase() throws Exception { LOG.debug("Starting majorPartitionWithBase"); Table t = newTable("default", "mapwb", true); Partition p = newPartition(t, "today"); - HiveConf conf = new HiveConf(); - addBaseFile(conf, t, p, 20L, 20); - addDeltaFile(conf, t, p, 21L, 22L, 2); - addDeltaFile(conf, t, p, 23L, 24L, 2); + addBaseFile(t, p, 20L, 20); + addDeltaFile(t, p, 21L, 22L, 2); + addDeltaFile(t, p, 23L, 24L, 2); burnThroughTransactions(25); @@ -454,7 +443,7 @@ public void majorPartitionWithBase() throws Exception { rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); - startWorker(new HiveConf()); + startWorker(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -489,17 +478,15 @@ public void majorTableNoBase() throws Exception { LOG.debug("Starting majorTableNoBase"); Table t = newTable("default", "matnb", false); - HiveConf conf = new HiveConf(); - - addDeltaFile(conf, t, null, 1L, 2L, 2); - addDeltaFile(conf, t, null, 3L, 4L, 2); + addDeltaFile(t, null, 1L, 2L, 2); + addDeltaFile(t, null, 3L, 4L, 2); burnThroughTransactions(5); CompactionRequest rqst = new CompactionRequest("default", "matnb", CompactionType.MAJOR); txnHandler.compact(rqst); - startWorker(new HiveConf()); + startWorker(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -534,18 +521,16 @@ public void majorTableLegacy() throws Exception { LOG.debug("Starting majorTableLegacy"); Table t = newTable("default", "matl", false); - HiveConf conf = new HiveConf(); - - addLegacyFile(conf, t, null, 20); - addDeltaFile(conf, t, null, 21L, 22L, 2); - addDeltaFile(conf, t, null, 23L, 24L, 2); + addLegacyFile(t, null, 20); + addDeltaFile(t, null, 21L, 22L, 2); + addDeltaFile(t, null, 23L, 24L, 2); burnThroughTransactions(25); CompactionRequest rqst = new CompactionRequest("default", "matl", CompactionType.MAJOR); txnHandler.compact(rqst); - startWorker(new HiveConf()); + startWorker(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -580,18 +565,16 @@ public void minorTableLegacy() throws Exception { LOG.debug("Starting minorTableLegacy"); Table t = newTable("default", "mtl", false); - HiveConf conf = new HiveConf(); - - addLegacyFile(conf, t, null, 20); - addDeltaFile(conf, t, null, 21L, 22L, 2); - addDeltaFile(conf, t, null, 23L, 24L, 2); + addLegacyFile(t, null, 20); + addDeltaFile(t, null, 21L, 22L, 2); + addDeltaFile(t, null, 23L, 24L, 2); burnThroughTransactions(25); CompactionRequest rqst = new CompactionRequest("default", "mtl", CompactionType.MINOR); txnHandler.compact(rqst); - startWorker(new HiveConf()); + startWorker(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -622,11 +605,10 @@ public void minorTableLegacy() throws Exception { public void majorPartitionWithBaseMissingBuckets() throws Exception { Table t = newTable("default", "mapwbmb", true); Partition p = newPartition(t, "today"); - HiveConf conf = new HiveConf(); - addBaseFile(conf, t, p, 20L, 20, 2, false); - addDeltaFile(conf, t, p, 21L, 22L, 2, 2, false); - addDeltaFile(conf, t, p, 23L, 24L, 2); + addBaseFile(t, p, 20L, 20, 2, false); + 
addDeltaFile(t, p, 21L, 22L, 2, 2, false); + addDeltaFile(t, p, 23L, 24L, 2); burnThroughTransactions(25); @@ -634,7 +616,7 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception { rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); - startWorker(new HiveConf()); + startWorker(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -669,9 +651,4 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception { } Assert.assertTrue(sawNewBase); } - - @Before - public void setUpTxnDb() throws Exception { - TxnDbUtil.setConfValues(new HiveConf()); - } }
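
Note on the new knob: hive.compactor.cleaner.run.interval is registered with a TimeValidator, so it is read and overridden through the time-unit-aware accessors the patch itself uses (getTimeVar in Cleaner.run(), setTimeVar in the new tests). The snippet below just pulls those two calls out of context; it assumes a Hive build that already contains this patch, and the class and method names wrapping them are illustrative only.

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;

public class CleanerIntervalExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();

    // Cleaner.run(): how long to sleep between passes (defaults to 5000ms).
    long intervalMs = conf.getTimeVar(
        HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
    System.out.println("cleaner interval = " + intervalMs + "ms");

    // The new tests shorten it so the background cleaner thread cycles quickly.
    conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL,
        100, TimeUnit.MILLISECONDS);
  }
}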
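
Note on the AcidUtils change: getAcidState() now defers recursing into pre-ACID ("original") directories until it knows there is no base. With a base present the originals are obsolete anyway, and the Cleaner may be deleting them concurrently, so eager recursion both wastes time and risks listing errors. The following is a rough, self-contained illustration of that ordering only, not the real method: java.io.File and a plain recursive walk stand in for FileSystem/FileStatus and AcidUtils.findOriginals(), and the class and method names are made up for the sketch.

import java.io.File;
import java.util.ArrayList;
import java.util.List;

/** Illustration of "collect original directories first, recurse only if there is no base". */
public class OriginalsSketch {

  public static List<File> relevantOriginals(File partitionDir, boolean haveBase) {
    List<File> originalDirectories = new ArrayList<File>();
    File[] children = partitionDir.listFiles();
    if (children == null) {
      return new ArrayList<File>();
    }

    for (File child : children) {
      String name = child.getName();
      // Anything that is not a base_N or delta_X_Y entry is treated as a pre-ACID original.
      if (!name.startsWith("base_") && !name.startsWith("delta_")) {
        originalDirectories.add(child);   // remember it, but do not recurse yet
      }
    }

    List<File> original = new ArrayList<File>();
    if (!haveBase) {
      // Only now pay for the recursive listing (findOriginals() in the real code).
      for (File dir : originalDirectories) {
        collect(dir, original);
      }
    }
    return original;
  }

  private static void collect(File f, List<File> out) {
    if (f.isDirectory()) {
      File[] children = f.listFiles();
      if (children != null) {
        for (File child : children) {
          collect(child, out);
        }
      }
    } else {
      out.add(f);
    }
  }
}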
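
Note on the Cleaner change: the old exclusive table lock is replaced with lock-snapshot bookkeeping. When a compaction first shows up as ready to clean, the Cleaner records the locks currently held on that table or partition; it cleans only once every one of those remembered locks has been released, and locks taken afterwards never block it (those readers already see the compacted data). The sketch below models only that bookkeeping, under simplifying assumptions: compaction and lock ids are plain longs, the metastore calls (findReadyToClean, ShowLocks, clean) are left to the caller, the per-table/partition filtering done by findRelatedLocks() is omitted, and the class and method names are invented for the example.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch of the Cleaner's "wait for the initial readers to finish" bookkeeping. */
public class CleanerQueueSketch {

  // Compaction id -> lock ids that were open when the compaction first became ready to clean.
  private final Map<Long, Set<Long>> compactId2LockMap = new HashMap<Long, Set<Long>>();

  /**
   * One pass of the cleaner loop.
   *
   * @param readyToClean compactions currently marked ready to clean
   * @param currentLocks lock ids currently held (from ShowLocks in the real code)
   * @return compaction ids whose remembered locks are all gone and can be cleaned now
   */
  public List<Long> runOnePass(List<Long> readyToClean, Set<Long> currentLocks) {
    // Snapshot the open locks the first time we see each compaction; later locks are ignored.
    for (Long compactId : readyToClean) {
      if (!compactId2LockMap.containsKey(compactId)) {
        compactId2LockMap.put(compactId, new HashSet<Long>(currentLocks));
      }
    }

    List<Long> readyNow = new ArrayList<Long>();
    for (Map.Entry<Long, Set<Long>> entry : compactId2LockMap.entrySet()) {
      // Forget remembered locks that have since been released so we don't re-check them.
      entry.getValue().retainAll(currentLocks);
      if (entry.getValue().isEmpty()) {
        readyNow.add(entry.getKey());
      }
    }
    // Remove outside the iteration to avoid a ConcurrentModificationException.
    for (Long compactId : readyNow) {
      compactId2LockMap.remove(compactId);
    }
    return readyNow;   // the real Cleaner calls clean(ci) for each of these
  }
}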
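
Note on the test-harness change: CompactorTest now starts compactor threads in two modes. With stopAfterOne=true the shared stop flag is set before run(), so the thread body's do/while loop executes exactly one pass synchronously; with stopAfterOne=false a real background thread keeps cycling until stopThread() flips the flag. A stripped-down model of that control pattern follows; a plain mutable boolean holder stands in for MetaStoreThread.BooleanPointer, and all names are illustrative.

/** Stripped-down model of the run-once vs. run-in-background control used by CompactorTest. */
public class StopFlagSketch {

  /** Stand-in for MetaStoreThread.BooleanPointer: a mutable boolean shared with the worker. */
  static class BooleanPointer {
    volatile boolean boolVal = false;
  }

  static class LoopingWorker extends Thread {
    private final BooleanPointer stop;

    LoopingWorker(BooleanPointer stop) {
      this.stop = stop;
    }

    @Override
    public void run() {
      do {
        // One cleaner/initiator/worker pass would go here.
        try {
          Thread.sleep(100);   // stands in for the configured run interval
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      } while (!stop.boolVal);   // flag is checked after each pass, as in the compactor threads
    }
  }

  public static void main(String[] args) throws InterruptedException {
    BooleanPointer stop = new BooleanPointer();
    LoopingWorker worker = new LoopingWorker(stop);

    boolean stopAfterOne = false;
    stop.boolVal = stopAfterOne;
    if (stopAfterOne) {
      worker.run();     // single pass, synchronously on the caller's thread
    } else {
      worker.start();   // keep cycling in the background
      Thread.sleep(300);
      stop.boolVal = true;   // what stopThread() does in the tests
      worker.join();
    }
  }
}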