diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 5aff71e0e9..24d074a978 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -67,6 +67,7 @@ static final private String CLASS_NAME = Worker.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); static final private long SLEEP_TIME = 10000; + private static final int NOT_SET = -1; private String workerName; private JobConf mrJob; // the MR job for compaction @@ -94,6 +95,7 @@ public void run() { // Make sure nothing escapes this run method and kills the metastore at large, // so wrap it in a big catch Throwable statement. CompactionHeartbeater heartbeater = null; + long compactorTxnId = NOT_SET; try { if (msc == null) { msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf); @@ -164,7 +166,7 @@ public void run() { * multiple statements in it (for query based compactor) which is not supported (and since * this case some of the statements are DDL, even in the future will not be allowed in a * multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */ - long compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION); + compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION); heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf); heartbeater.start(); @@ -227,7 +229,6 @@ public Object run() throws Exception { } heartbeater.cancel(); msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)); - msc.commitTxn(compactorTxnId); if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { mrJob = mr.getMrJob(); } @@ -253,7 +254,16 @@ public Object run() throws Exception { LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + StringUtils.stringifyException(t)); } finally { - if(heartbeater != null) { + if (compactorTxnId != NOT_SET) { + try { + msc.commitTxn(compactorTxnId); + } catch (TException e) { + LOG.error( + "Caught an exception while committing compaction in worker " + workerName + ", " + + StringUtils.stringifyException(e)); + } + } + if (heartbeater != null) { heartbeater.cancel(); } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java index 908ceb43fc..ee04671637 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java @@ -469,4 +469,47 @@ public void testCompactionAbort() throws Exception { //now the aborted compactor txn is gone Assert.assertEquals(openResp.toString(), 0, openResp.getOpen_txnsSize()); } + + /** + * Not enough deltas to compact, no need to clean: there is absolutely nothing to do. + */ + @Test public void testNotEnoughToCompact() throws Exception { + int[][] tableData = {{1, 2}, {3, 4}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); + runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'"); + + runWorker(hiveConf); + assertTableIsEmpty("TXNS"); + assertTableIsEmpty("TXN_COMPONENTS"); + + runCleaner(hiveConf); + assertTableIsEmpty("TXNS"); + assertTableIsEmpty("TXN_COMPONENTS"); + } + + /** + * There aren't enough deltas to compact, but cleaning is needed because an insert overwrite + * was executed. + */ + @Test public void testNotEnoughToCompactNeedsCleaning() throws Exception { + int[][] tableData = {{1, 2}, {3, 4}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); + runStatementOnDriver( + "insert overwrite table " + Table.ACIDTBL + " " + makeValuesClause(tableData)); + + runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'"); + + runWorker(hiveConf); + assertTableIsEmpty("TXNS"); + assertTableIsEmpty("TXN_COMPONENTS"); + + runCleaner(hiveConf); + assertTableIsEmpty("TXNS"); + assertTableIsEmpty("TXN_COMPONENTS"); + } + + private void assertTableIsEmpty(String table) throws Exception { + Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from " + table), 0, + TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from " + table)); + } }