diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index ab7da68..0c51713 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -691,7 +691,7 @@ private void checkForDeletion(List deleteSet, CompactionInfo ci, Retention } /** - * For any given compactable entity (partition, table if not partitioned) the history of compactions + * For any given compactable entity (partition; table if not partitioned) the history of compactions * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the * history such that a configurable number of each type of state is present. Any other entries * can be purged. This scheme has advantage of always retaining the last failure/success even if @@ -793,7 +793,7 @@ public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException { "CC_DATABASE = " + quoteString(ci.dbname) + " and " + "CC_TABLE = " + quoteString(ci.tableName) + (ci.partName != null ? "and CC_PARTITION = " + quoteString(ci.partName) : "") + - " order by CC_ID desc"); + " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc"); int numFailed = 0; int numTotal = 0; int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); @@ -825,7 +825,7 @@ public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException { * If there is an entry in compaction_queue with ci.id, remove it * Make entry in completed_compactions with status 'f'. * - * but what abount markCleaned() which is called when table is had been deleted... + * but what about markCleaned() which is called when table has been deleted. 
*/ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw //todo: this should take "comment" as parameter to set in CC_META_INFO to provide some context for the failure @@ -845,12 +845,27 @@ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this sho int updCnt = stmt.executeUpdate(s); } else { - throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE"); + if(ci.id > 0) { + //the record with valid CQ_ID has disappeared - this is a sign of something wrong + throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE"); + } + } + if(ci.id == 0) { + //The failure occurred before we even made an entry in COMPACTION_QUEUE + //generate ID so that we can make an entry in COMPLETED_COMPACTIONS + ci.id = generateCompactionQueueId(stmt); + //mostly this indicates that the Initiator is paying attention to some table even though + //compactions are not happening. + ci.state = ATTEMPTED_STATE; + //this is not strictly accurate, but 'type' cannot be null. 
+ ci.type = CompactionType.MINOR; + } + else { + ci.state = FAILED_STATE; } close(rs, stmt, null); pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)"); - ci.state = FAILED_STATE; CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn)); int updCount = pStmt.executeUpdate(); LOG.debug("Going to commit"); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index f061767..bcc11a6 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -18,12 +18,14 @@ package org.apache.hadoop.hive.metastore.txn; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Service; import com.jolbox.bonecp.BoneCPConfig; import com.jolbox.bonecp.BoneCPDataSource; import org.apache.commons.dbcp.ConnectionFactory; import org.apache.commons.dbcp.DriverManagerConnectionFactory; import org.apache.commons.dbcp.PoolableConnectionFactory; import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.fs.Stat; import org.apache.hadoop.hive.common.ServerUtils; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; @@ -1252,6 +1254,21 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst } } + long generateCompactionQueueId(Statement stmt) throws SQLException, MetaException { + // Get the id for the next entry in the queue + String s = addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID"); + LOG.debug("going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if 
(!rs.next()) { + throw new IllegalStateException("Transaction tables not properly initiated, " + + "no record found in next_compaction_queue_id"); + } + long id = rs.getLong(1); + s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + return id; + } public long compact(CompactionRequest rqst) throws MetaException { // Put a compaction request in the queue. try { @@ -1261,21 +1278,8 @@ public long compact(CompactionRequest rqst) throws MetaException { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - - // Get the id for the next entry in the queue - String s = addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID"); - LOG.debug("going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new MetaException("Transaction tables not properly initiated, " + - "no record found in next_compaction_queue_id"); - } - long id = rs.getLong(1); - s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); + + long id = generateCompactionQueueId(stmt); StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " + "cq_table, "); @@ -1315,7 +1319,7 @@ public long compact(CompactionRequest rqst) throws MetaException { buf.append(rqst.getRunas()); } buf.append("')"); - s = buf.toString(); + String s = buf.toString(); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); LOG.debug("Going to commit"); @@ -1366,6 +1370,7 @@ public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaExcep case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break; case FAILED_STATE: e.setState(FAILED_RESPONSE); break; case SUCCEEDED_STATE: e.setState(SUCCEEDED_RESPONSE); break; + case 
ATTEMPTED_STATE: e.setState(ATTEMPTED_RESPONSE); break; default: //do nothing to handle RU/D if we add another status } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 949cbd5..a55fa1c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -126,8 +126,9 @@ public void run() { continue; } if(txnHandler.checkFailedCompactions(ci)) { - //todo: make 'a' state entry in completed_compactions - LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last 3 attempts to compact it failed."); + LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last " + + HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " attempts to compact it failed."); + txnHandler.markFailed(ci); continue; } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 903337d..553003e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -525,30 +525,40 @@ public void testInitiatorWithMultipleFailedCompactions() throws Exception { init.setHiveConf(hiveConf); init.init(stop, new AtomicBoolean()); init.run(); + int numAttemptedCompactions = 1; CompactionsByState cbs = countCompacts(txnHandler); Assert.assertEquals("Unexpected number of failed compactions", numFailedCompactions, cbs.failed); - Assert.assertEquals("Unexpected total number of compactions", numFailedCompactions, cbs.total); + Assert.assertEquals("Unexpected number of Attempted compactions", numAttemptedCompactions, cbs.attempted); + Assert.assertEquals("Unexpected total number of compactions", numFailedCompactions + numAttemptedCompactions, cbs.total); hiveConf.setTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, 
10, TimeUnit.MILLISECONDS); AcidCompactionHistoryService compactionHistoryService = new AcidCompactionHistoryService(); runHouseKeeperService(compactionHistoryService, hiveConf);//should not remove anything from history cbs = countCompacts(txnHandler); Assert.assertEquals("Number of failed compactions after History clean", numFailedCompactions, cbs.failed); - Assert.assertEquals("Total number of compactions after History clean", numFailedCompactions, cbs.total); + Assert.assertEquals("Unexpected number of Attempted compactions", numAttemptedCompactions, cbs.attempted); + Assert.assertEquals("Total number of compactions after History clean", numFailedCompactions + numAttemptedCompactions, cbs.total); txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MAJOR)); runWorker(hiveConf);//will fail txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR)); runWorker(hiveConf);//will fail + init.run(); + numAttemptedCompactions++; + init.run(); + numAttemptedCompactions++; cbs = countCompacts(txnHandler); Assert.assertEquals("Unexpected num failed1", numFailedCompactions + 2, cbs.failed); - Assert.assertEquals("Unexpected num total1", numFailedCompactions + 2, cbs.total); + Assert.assertEquals("Unexpected number of Attempted compactions", numAttemptedCompactions, cbs.attempted); + Assert.assertEquals("Unexpected num total1", numFailedCompactions + 2 + numAttemptedCompactions, cbs.total); runHouseKeeperService(compactionHistoryService, hiveConf);//should remove history so that we have //COMPACTOR_HISTORY_RETENTION_FAILED failed compacts left (and no other since we only have failed ones here) cbs = countCompacts(txnHandler); Assert.assertEquals("Unexpected num failed2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed); - Assert.assertEquals("Unexpected num total2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.total); + Assert.assertEquals("Unexpected num 
attempted2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED), cbs.attempted); + Assert.assertEquals("Unexpected num total2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED), cbs.total); hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false); @@ -557,20 +567,30 @@ public void testInitiatorWithMultipleFailedCompactions() throws Exception { cbs = countCompacts(txnHandler); Assert.assertEquals("Unexpected num failed3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed); Assert.assertEquals("Unexpected num initiated", 1, cbs.initiated); - Assert.assertEquals("Unexpected num total3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total); + Assert.assertEquals("working", 0, cbs.working); + Assert.assertEquals("succeeded", 0, cbs.succeeded); + Assert.assertEquals("Unexpected num attempted", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED), cbs.attempted); + Assert.assertEquals("Unexpected num total3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1, cbs.total); runWorker(hiveConf);//will succeed and transition to Initiated->Working->Ready for Cleaning cbs = countCompacts(txnHandler); Assert.assertEquals("Unexpected num failed4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed); Assert.assertEquals("Unexpected num ready to clean", 1, cbs.readyToClean); - Assert.assertEquals("Unexpected num total4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total); + Assert.assertEquals("working", 0, cbs.working); + Assert.assertEquals("Unexpected num attempted", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED), cbs.attempted); + 
Assert.assertEquals("Unexpected num total4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED) + 1, cbs.total); runCleaner(hiveConf); // transition to Success state runHouseKeeperService(compactionHistoryService, hiveConf);//should not purge anything as all items within retention sizes cbs = countCompacts(txnHandler); Assert.assertEquals("Unexpected num failed5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed); + Assert.assertEquals("Unexpected num attempted", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED), cbs.attempted); + Assert.assertEquals("working", 0, cbs.working); Assert.assertEquals("Unexpected num succeeded", 1, cbs.succeeded); - Assert.assertEquals("Unexpected num total5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total); + Assert.assertEquals("Unexpected num total5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1, cbs.total); } /**