diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index 99da86f910..7611aa7432 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -27,9 +27,12 @@ import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.TException; @@ -230,4 +233,21 @@ protected boolean replIsCompactionDisabledForTable(Table tbl) { } return isCompactDisabled; } + + // Figure out if there are any currently running compactions on the same table or partition. + protected boolean lookForCurrentCompactions(ShowCompactResponse compactions, + CompactionInfo ci) { + if (compactions.getCompacts() != null) { + for (ShowCompactResponseElement e : compactions.getCompacts()) { + if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) && + e.getDbname().equals(ci.dbname) && + e.getTablename().equals(ci.tableName) && + (e.getPartitionname() == null && ci.partName == null || + e.getPartitionname().equals(ci.partName))) { + return true; + } + } + } + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 7a0e32463d..691056401a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; @@ -217,23 +216,6 @@ private void recoverFailedCompactions(boolean remoteOnly) throws MetaException { HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS)); } - // Figure out if there are any currently running compactions on the same table or partition. - private boolean lookForCurrentCompactions(ShowCompactResponse compactions, - CompactionInfo ci) { - if (compactions.getCompacts() != null) { - for (ShowCompactResponseElement e : compactions.getCompacts()) { - if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) && - e.getDbname().equals(ci.dbname) && - e.getTablename().equals(ci.tableName) && - (e.getPartitionname() == null && ci.partName == null || - e.getPartitionname().equals(ci.partName))) { - return true; - } - } - } - return false; - } - private CompactionType checkForCompaction(final CompactionInfo ci, final ValidWriteIdList writeIds, final StorageDescriptor sd, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 5aff71e0e9..d210405343 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -84,8 +84,7 @@ public static String hostname() { throw new RuntimeException(e); } } -//todo: this doesn;t check if compaction is already running (even though Initiator does but we -// don't go through Initiator for user initiated compactions) + @Override public void run() { LOG.info("Starting Worker thread"); @@ -100,6 +99,7 @@ public void run() { } final CompactionInfo ci = CompactionInfo.optionalCompactionInfoStructToInfo( msc.findNextCompact(workerName)); + LOG.debug("Processing compaction request " + ci); if (ci == null && !stop.get()) { @@ -112,6 +112,12 @@ public void run() { } } + if (lookForCurrentCompactions(msc.showCompactions(), ci)) { + LOG.debug("Found currently initiated or working compaction for " + + ci.getFullPartitionName() + " so we will not start another compaction. "); + continue; + } + // Find the table we will be working with. Table t1 = null; try { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index 70ae85c458..7e775db671 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -1041,6 +1041,34 @@ public void oneDeltaWithAbortedTxn() throws Exception { verifyTxn1IsAborted(1, t, CompactionType.MINOR); } + @Test + public void noCompactWhenCompactionAlreadyRunning() throws Exception { + String dbName = "default"; + String tableName = "ncwcar"; + newTable(dbName, tableName, false); + CompactionRequest request = new CompactionRequest(dbName, tableName, CompactionType.MAJOR); + txnHandler.compact(request); + + ShowCompactResponse response = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = response.getCompacts(); + Assert.assertEquals(1, compacts.size()); + ShowCompactResponseElement compactResponseElement = compacts.get(0); + Assert.assertEquals("initiated", compactResponseElement.getState()); + Assert.assertEquals(dbName, compactResponseElement.getDbname()); + Assert.assertEquals(tableName, compactResponseElement.getTablename()); + Assert.assertEquals(CompactionType.MAJOR, compactResponseElement.getType()); + + startWorker(); + response = txnHandler.showCompact(new ShowCompactRequest()); + compacts = response.getCompacts(); + Assert.assertEquals(1, compacts.size()); + compactResponseElement = compacts.get(0); + Assert.assertEquals("working", compactResponseElement.getState()); + Assert.assertEquals(dbName, compactResponseElement.getDbname()); + Assert.assertEquals(tableName, compactResponseElement.getTablename()); + Assert.assertEquals(CompactionType.MAJOR, compactResponseElement.getType()); + } + private void verifyTxn1IsAborted(int compactionNum, Table t, CompactionType type) throws Exception { CompactionRequest rqst = new CompactionRequest("default", t.getTableName(), type);