diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 32049eb..8cb4a19 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -993,6 +993,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { HIVETESTMODEDUMMYSTATPUB("hive.test.dummystats.publisher", "", "internal variable for test", false), HIVETESTCURRENTTIMESTAMP("hive.test.currenttimestamp", null, "current timestamp for test", false), HIVETESTMODEROLLBACKTXN("hive.test.rollbacktxn", false, "For testing only. Will mark every ACID transaction aborted", false), + HIVETESTMODEFAILCOMPACTION("hive.test.fail.compaction", false, "For testing only. Will cause CompactorMR to fail.", false), HIVEMERGEMAPFILES("hive.merge.mapfiles", true, "Merge small files at the end of a map-only job"), @@ -1571,11 +1572,32 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD("hive.compactor.abortedtxn.threshold", 1000, "Number of aborted transactions involving a given table or partition that will trigger\n" + "a major compaction."), - + + COMPACTOR_INITIATOR_FAILED_THRESHOLD("hive.compactor.initiator.failed.compacts.threshold", 2, + new RangeValidator(1, 20), "Number of consecutive compaction failures (per table/partition) " + + "after which automatic compactions will not be scheduled any more. Note that this must be less " + + "than hive.compactor.history.retention.failed."), + HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms", new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"), COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" + "Compaction jobs will be submitted. 
Set to empty string to let Hadoop choose the queue."), + + COMPACTOR_HISTORY_RETENTION_SUCCEEDED("hive.compactor.history.retention.succeeded", 3, + new RangeValidator(0, 100), "Determines how many successful compaction records will be " + + "retained in compaction history for a given table/partition."), + + COMPACTOR_HISTORY_RETENTION_FAILED("hive.compactor.history.retention.failed", 3, + new RangeValidator(0, 100), "Determines how many failed compaction records will be " + + "retained in compaction history for a given table/partition."), + + COMPACTOR_HISTORY_RETENTION_ATTEMPTED("hive.compactor.history.retention.attempted", 2, + new RangeValidator(0, 100), "Determines how many attempted compaction records will be " + + "retained in compaction history for a given table/partition."), + + COMPACTOR_HISTORY_REAPER_INTERVAL("hive.compactor.history.reaper.interval", "2m", + new TimeValidator(TimeUnit.MILLISECONDS), "Determines how often compaction history reaper runs"), + HIVE_TIMEDOUT_TXN_REAPER_START("hive.timedout.txn.reaper.start", "100s", new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"), HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s", diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index da367ca..226a1fa 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -790,63 +790,6 @@ public void minorCompactAfterAbort() throws Exception { } } - /** - * HIVE-12352 has details - * @throws Exception - */ - @Test - public void writeBetweenWorkerAndCleaner() throws Exception { - String tblName = "HIVE12352"; - executeStatementOnDriver("drop table if exists " + tblName, driver); - executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + - " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed - " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); - - //create some data - executeStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')", driver); - executeStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3", driver); - - //run Worker to execute compaction - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); - txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR)); - Worker t = new Worker(); - t.setThreadId((int) t.getId()); - t.setHiveConf(conf); - AtomicBoolean stop = new AtomicBoolean(true); - AtomicBoolean looped = new AtomicBoolean(); - t.init(stop, looped); - t.run(); - - //delete something, but make sure txn is rolled back - conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); - executeStatementOnDriver("delete from " + tblName + " where a = 1", driver); - conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false); - - List expected = new ArrayList<>(); - expected.add("1\tfoo"); - expected.add("2\tbar"); - expected.add("3\tblah"); - Assert.assertEquals("", expected, - execSelectAndDumpData("select a,b from " + tblName + " order by a", driver, "writeBetweenWorkerAndCleaner()")); - - //run Cleaner - Cleaner c = new Cleaner(); - c.setThreadId((int)c.getId()); - c.setHiveConf(conf); - c.init(stop, new AtomicBoolean()); - c.run(); - - //this seems odd, but we wan to make sure that 
to run CompactionTxnHandler.cleanEmptyAbortedTxns() - Initiator i = new Initiator(); - i.setThreadId((int)i.getId()); - i.setHiveConf(conf); - i.init(stop, new AtomicBoolean()); - i.run(); - - //check that aborted operation didn't become committed - Assert.assertEquals("", expected, - execSelectAndDumpData("select a,b from " + tblName + " order by a", driver, "writeBetweenWorkerAndCleaner()")); - } @Test public void majorCompactAfterAbort() throws Exception { String dbName = "default"; diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index ace644b..7830f17 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -6406,7 +6406,10 @@ private static void startHouseKeeperService(HiveConf conf) throws Exception { if(!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) { return; } - Class c = Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService"); + startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService")); + startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService")); + } + private static void startHouseKeeperService(HiveConf conf, Class c) throws Exception { //todo: when metastore adds orderly-shutdown logic, houseKeeper.stop() //should be called form it HouseKeeperService houseKeeper = (HouseKeeperService)c.newInstance(); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java index eb4ea93..539ace0 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java @@ -36,4 +36,10 @@ * Returns short description of services this module provides. */ public String getServiceDescription(); + + /** + * This is incremented each time the service is performed. Can be useful to + * check if serivce is still alive. + */ + public int getIsAliveCounter(); } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java index d3cb7d5..73255d2 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java @@ -19,6 +19,10 @@ import org.apache.hadoop.hive.metastore.api.CompactionType; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + /** * Information on a possible or running compaction. */ @@ -27,13 +31,18 @@ public String dbname; public String tableName; public String partName; + char state; public CompactionType type; + String workerId; + long start; public String runAs; public boolean tooManyAborts = false; /** - * {@code null} means it wasn't set (e.g. in case of upgrades) + * {@code 0} means it wasn't set (e.g. 
in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL) */ - public Long highestTxnId; + public long highestTxnId; + byte[] metaInfo; + String hadoopJobId; private String fullPartitionName = null; private String fullTableName = null; @@ -44,6 +53,11 @@ public CompactionInfo(String dbname, String tableName, String partName, Compacti this.partName = partName; this.type = type; } + CompactionInfo(long id, String dbname, String tableName, String partName, char state) { + this(dbname, tableName, partName, null); + this.id = id; + this.state = state; + } CompactionInfo() {} public String getFullPartitionName() { @@ -82,9 +96,47 @@ public String toString() { "dbname:" + dbname + "," + "tableName:" + tableName + "," + "partName:" + partName + "," + + "state:" + state + "," + "type:" + type + "," + "runAs:" + runAs + "," + "tooManyAborts:" + tooManyAborts + "," + "highestTxnId:" + highestTxnId; } + + /** + * loads object from a row in Select * from COMPACTION_QUEUE + * @param rs ResultSet after call to rs.next() + * @throws SQLException + */ + static CompactionInfo loadFullFromCompactionQueue(ResultSet rs) throws SQLException { + CompactionInfo fullCi = new CompactionInfo(); + fullCi.id = rs.getLong(1); + fullCi.dbname = rs.getString(2); + fullCi.tableName = rs.getString(3); + fullCi.partName = rs.getString(4); + fullCi.state = rs.getString(5).charAt(0);//cq_state + fullCi.type = TxnHandler.dbCompactionType2ThriftType(rs.getString(6).charAt(0)); + fullCi.workerId = rs.getString(7); + fullCi.start = rs.getLong(8); + fullCi.runAs = rs.getString(9); + fullCi.highestTxnId = rs.getLong(10); + fullCi.metaInfo = rs.getBytes(11); + fullCi.hadoopJobId = rs.getString(12); + return fullCi; + } + static void insertIntoCompletedCompactions(PreparedStatement pStmt, CompactionInfo ci, long endTime) throws SQLException { + pStmt.setLong(1, ci.id); + pStmt.setString(2, ci.dbname); + pStmt.setString(3, ci.tableName); + pStmt.setString(4, ci.partName); + pStmt.setString(5, Character.toString(ci.state)); + pStmt.setString(6, Character.toString(TxnHandler.thriftCompactionType2DbType(ci.type))); + pStmt.setString(7, ci.workerId); + pStmt.setLong(8, ci.start); + pStmt.setLong(9, endTime); + pStmt.setString(10, ci.runAs); + pStmt.setLong(11, ci.highestTxnId); + pStmt.setBytes(12, ci.metaInfo); + pStmt.setString(13, ci.hadoopJobId); + } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 9130322..18b288d 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -174,16 +174,7 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { info.dbname = rs.getString(2); info.tableName = rs.getString(3); info.partName = rs.getString(4); - switch (rs.getString(5).charAt(0)) { - case MAJOR_TYPE: - info.type = CompactionType.MAJOR; - break; - case MINOR_TYPE: - info.type = CompactionType.MINOR; - break; - default: - throw new MetaException("Unexpected compaction type " + rs.getString(5)); - } + info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0)); // Now, update this record as being worked on by this worker. 
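/* The two helpers added to CompactionInfo above exist so that markCleaned() and markFailed() can share one
 * mapping between a COMPACTION_QUEUE row and the corresponding COMPLETED_COMPACTIONS row. A minimal
 * standalone sketch of that queue-to-history move, assuming a plain JDBC Connection and the column layout
 * created by TxnDbUtil in this patch; the class and method names below are illustrative, not part of the patch. */
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class CompactionHistoryMoveSketch {
  /** Moves one compaction-queue row into the history table, stamping it with a terminal state. */
  static void moveToHistory(Connection dbConn, long cqId, char terminalState, long endTime)
      throws SQLException {
    String select = "select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, " +
        "CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID " +
        "from COMPACTION_QUEUE where CQ_ID = ?";
    String insert = "insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, " +
        "CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, " +
        "CC_META_INFO, CC_HADOOP_JOB_ID) values(?,?,?,?,?,?,?,?,?,?,?,?,?)";
    try (PreparedStatement sel = dbConn.prepareStatement(select)) {
      sel.setLong(1, cqId);
      try (ResultSet rs = sel.executeQuery()) {
        if (!rs.next()) {
          throw new IllegalStateException("No record with CQ_ID=" + cqId + " found in COMPACTION_QUEUE");
        }
        try (PreparedStatement ins = dbConn.prepareStatement(insert)) {
          ins.setLong(1, rs.getLong(1));                        // CC_ID keeps the original CQ_ID
          ins.setString(2, rs.getString(2));                    // database
          ins.setString(3, rs.getString(3));                    // table
          ins.setString(4, rs.getString(4));                    // partition (may be null)
          ins.setString(5, Character.toString(terminalState));  // 's' on success, 'f' on failure
          ins.setString(6, rs.getString(6));                    // compaction type char
          ins.setString(7, rs.getString(7));                    // worker id
          ins.setLong(8, rs.getLong(8));                        // start time
          ins.setLong(9, endTime);                              // end time is stamped now
          ins.setString(10, rs.getString(9));                   // run-as user
          ins.setLong(11, rs.getLong(10));                      // highest txn id (0 if never set)
          ins.setBytes(12, rs.getBytes(11));                    // meta info blob
          ins.setString(13, rs.getString(12));                  // hadoop job id
          ins.executeUpdate();
        }
      }
    }
    try (Statement stmt = dbConn.createStatement()) {
      stmt.executeUpdate("delete from COMPACTION_QUEUE where CQ_ID = " + cqId);
    }
    dbConn.commit(); // one transaction: history row inserted and queue row removed together
  }
}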
long now = getDbTime(dbConn); s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + @@ -291,8 +282,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); } info.runAs = rs.getString(6); - long highestTxnId = rs.getLong(7); - info.highestTxnId = rs.wasNull() ? null : highestTxnId; + info.highestTxnId = rs.getLong(7); rc.add(info); } LOG.debug("Going to rollback"); @@ -323,13 +313,19 @@ public void markCleaned(CompactionInfo info) throws MetaException { try { Connection dbConn = null; Statement stmt = null; + PreparedStatement pStmt = null; ResultSet rs = null; try { - //do we need serializable? Once we have the HWM as above, no. Before that - //it's debatable, but problem described above applies either way - //Thus can drop to RC - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); + rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id); + if(rs.next()) { + info = CompactionInfo.loadFullFromCompactionQueue(rs); + } + else { + throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE"); + } + close(rs); String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id; LOG.debug("Going to execute update <" + s + ">"); int updCount = stmt.executeUpdate(s); @@ -338,6 +334,10 @@ public void markCleaned(CompactionInfo info) throws MetaException { LOG.debug("Going to rollback"); dbConn.rollback(); } + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)"); + info.state = SUCCEEDED_STATE; + CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn)); + updCount = pStmt.executeUpdate(); // Remove entries from completed_txn_components as well, so we don't start looking there // again but only up to the highest txn ID include in this compaction job. @@ -347,7 +347,7 @@ public void markCleaned(CompactionInfo info) throws MetaException { if (info.partName != null) { s += " and ctc_partition = '" + info.partName + "'"; } - if(info.highestTxnId != null) { + if(info.highestTxnId != 0) { s += " and ctc_txnid <= " + info.highestTxnId; } LOG.debug("Going to execute update <" + s + ">"); @@ -358,7 +358,7 @@ public void markCleaned(CompactionInfo info) throws MetaException { s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" + - info.tableName + "'" + (info.highestTxnId == null ? "" : " and txn_id <= " + info.highestTxnId); + info.tableName + "'" + (info.highestTxnId == 0 ? 
"" : " and txn_id <= " + info.highestTxnId); if (info.partName != null) s += " and tc_partition = '" + info.partName + "'"; LOG.debug("Going to execute update <" + s + ">"); rs = stmt.executeQuery(s); @@ -406,6 +406,7 @@ public void markCleaned(CompactionInfo info) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { + closeStmt(pStmt); close(rs, stmt, dbConn); } } catch (RetryException e) { @@ -668,6 +669,225 @@ public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) thro setCompactionHighestTxnId(ci, highestTxnId); } } + private static class RetentionCounters { + int attemptedRetention = 0; + int failedRetention = 0; + int succeededRetention = 0; + RetentionCounters(int attemptedRetention, int failedRetention, int succeededRetention) { + this.attemptedRetention = attemptedRetention; + this.failedRetention = failedRetention; + this.succeededRetention = succeededRetention; + } + } + private void checkForDeletion(List deleteSet, CompactionInfo ci, RetentionCounters rc) { + switch (ci.state) { + case ATTEMPTED_STATE: + if(--rc.attemptedRetention < 0) { + deleteSet.add(ci.id); + } + break; + case FAILED_STATE: + if(--rc.failedRetention < 0) { + deleteSet.add(ci.id); + } + break; + case SUCCEEDED_STATE: + if(--rc.succeededRetention < 0) { + deleteSet.add(ci.id); + } + break; + default: + //do nothing to hanlde future RU/D where we may want to add new state types + } + } + + /** + * For any given compactable entity (partition, table if not partitioned) the history of compactions + * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the + * history such that a configurable number of each type of state is present. Any other entries + * can be purged. This scheme has advantage of always retaining the last failure/success even if + * it's not recent. + * @throws MetaException + */ + public void purgeCompactionHistory() throws MetaException { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + List deleteSet = new ArrayList<>(); + RetentionCounters rc = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + /*cc_id is monotonically increasing so for any entity sorts in order of compaction history, + thus this query groups by entity and withing group sorts most recent first*/ + rs = stmt.executeQuery("select cc_id, cc_database, cc_table, cc_partition, cc_state from " + + "COMPLETED_COMPACTIONS order by cc_database, cc_table, cc_partition, cc_id desc"); + String lastCompactedEntity = null; + /*In each group, walk from most recent and count occurences of each state type. 
Once you + * have counted enough (for each state) to satisfy retention policy, delete all other + * instances of this status.*/ + while(rs.next()) { + CompactionInfo ci = new CompactionInfo(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getString(4), rs.getString(5).charAt(0)); + if(!ci.getFullPartitionName().equals(lastCompactedEntity)) { + lastCompactedEntity = ci.getFullPartitionName(); + rc = new RetentionCounters(conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED), + getFailedCompactionRetention(), + conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED)); + } + checkForDeletion(deleteSet, ci, rc); + } + close(rs); + + String baseDeleteSql = "delete from COMPLETED_COMPACTIONS where cc_id IN("; + StringBuilder queryStr = new StringBuilder(baseDeleteSql); + for(int i = 0; i < deleteSet.size(); i++) { + if(i > 0 && i % TIMED_OUT_TXN_ABORT_BATCH_SIZE == 0) { + queryStr.setCharAt(queryStr.length() - 1, ')'); + stmt.executeUpdate(queryStr.toString()); + dbConn.commit(); + queryStr = new StringBuilder(baseDeleteSql); + } + queryStr.append(deleteSet.get(i)).append(','); + } + if(queryStr.length() > baseDeleteSql.length()) { + queryStr.setCharAt(queryStr.length() - 1, ')'); + int updCnt = stmt.executeUpdate(queryStr.toString()); + dbConn.commit(); + } + dbConn.commit(); + } catch (SQLException e) { + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "purgeCompactionHistory()"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException ex) { + purgeCompactionHistory(); + } + } + /** + * this ensures that the number of failed compaction entries retained is > than number of failed + * compaction threshold which prevents new compactions from being scheduled. + */ + public int getFailedCompactionRetention() { + int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); + int failedRetention = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED); + if(failedRetention < failedThreshold) { + LOG.warn("Invalid configuration " + HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.varname + + "=" + failedRetention + " < " + HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED + "=" + + failedRetention + ". Will use " + HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.varname + + "=" + failedRetention); + failedRetention = failedThreshold; + } + return failedRetention; + } + /** + * Returns {@code true} if there already exists sufficient number of consecutive failures for + * this table/partition so that no new automatic compactions will be scheduled. + * User initiated compactions don't do this check. + * + * Do we allow compacting whole table (when it's partitioned)? No, though perhaps we should. + * That would be a meta operations, i.e. first find all partitions for this table (which have + * txn info) and schedule each compaction separately. This avoids complications in this logic. + */ + public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + rs = stmt.executeQuery("select CC_STATE from COMPLETED_COMPACTIONS where " + + "CC_DATABASE = " + quoteString(ci.dbname) + " and " + + "CC_TABLE = " + quoteString(ci.tableName) + + (ci.partName != null ? 
"and CC_PARTITION = " + quoteString(ci.partName) : "") + + " order by CC_ID desc"); + int numFailed = 0; + int numTotal = 0; + int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); + while(rs.next() && ++numTotal <= failedThreshold) { + if(rs.getString(1).charAt(0) == FAILED_STATE) { + numFailed++; + } + else { + numFailed--; + } + } + return numFailed == failedThreshold; + } + catch (SQLException e) { + LOG.error("Unable to delete from compaction queue " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")"); + LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e)); + return false;//weren't able to check + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + return checkFailedCompactions(ci); + } + } + /** + * If there is an entry in compaction_queue with ci.id, remove it + * Make entry in completed_compactions with status 'f'. + * + * but what abount markCleaned() which is called when table is had been deleted... + */ + public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw + //todo: this shoudl take "comment" as parameter to set in CC_META_INFO to provide some context for the failure + try { + Connection dbConn = null; + Statement stmt = null; + PreparedStatement pStmt = null; + ResultSet rs = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id); + if(rs.next()) { + ci = CompactionInfo.loadFullFromCompactionQueue(rs); + String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id; + LOG.debug("Going to execute update <" + s + ">"); + int updCnt = stmt.executeUpdate(s); + } + else { + throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE"); + } + close(rs, stmt, null); + + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)"); + ci.state = FAILED_STATE; + CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn)); + int updCount = pStmt.executeUpdate(); + LOG.debug("Going to commit"); + closeStmt(pStmt); + dbConn.commit(); + } catch (SQLException e) { + LOG.error("Unable to delete from compaction queue " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + try { + checkRetryable(dbConn, e, "markFailed(" + ci + ")"); + } + catch(MetaException ex) { + LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex)); + } + LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, null); + close(null, pStmt, dbConn); + } + } catch (RetryException e) { + markFailed(ci); + } + } + } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 2015526..2a7545c 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ 
metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -118,10 +118,27 @@ public static void prepDb() throws Exception { " CQ_WORKER_ID varchar(128)," + " CQ_START bigint," + " CQ_RUN_AS varchar(128)," + - " CQ_HIGHEST_TXN_ID bigint)"); + " CQ_HIGHEST_TXN_ID bigint," + + " CQ_META_INFO varchar(2048) for bit data," + + " CQ_HADOOP_JOB_ID varchar(32))"); stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)"); stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)"); + + stmt.execute("CREATE TABLE COMPLETED_COMPACTIONS (" + + " CC_ID bigint PRIMARY KEY," + + " CC_DATABASE varchar(128) NOT NULL," + + " CC_TABLE varchar(128) NOT NULL," + + " CC_PARTITION varchar(767)," + + " CC_STATE char(1) NOT NULL," + + " CC_TYPE char(1) NOT NULL," + + " CC_WORKER_ID varchar(128)," + + " CC_START bigint," + + " CC_END bigint," + + " CC_RUN_AS varchar(128)," + + " CC_HIGHEST_TXN_ID bigint," + + " CC_META_INFO varchar(2048) for bit data," + + " CC_HADOOP_JOB_ID varchar(32))"); conn.commit(); } catch (SQLException e) { @@ -161,7 +178,7 @@ public static void cleanDb() throws Exception { dropTable(stmt, "NEXT_LOCK_ID"); dropTable(stmt, "COMPACTION_QUEUE"); dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID"); - + dropTable(stmt, "COMPLETED_COMPACTIONS"); conn.commit(); } finally { closeResources(conn, stmt, null); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 50d8892..c836f80 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -64,14 +64,20 @@ @InterfaceAudience.Private @InterfaceStability.Evolving public class TxnHandler { - // Compactor states + // Compactor states (Should really be enum) static final public String INITIATED_RESPONSE = "initiated"; static final public String WORKING_RESPONSE = "working"; static final public String CLEANING_RESPONSE = "ready for cleaning"; + static final public String FAILED_RESPONSE = "failed"; + static final public String SUCCEEDED_RESPONSE = "succeeded"; + static final public String ATTEMPTED_RESPONSE = "attempted"; static final protected char INITIATED_STATE = 'i'; static final protected char WORKING_STATE = 'w'; static final protected char READY_FOR_CLEANING = 'r'; + static final char FAILED_STATE = 'f'; + static final char SUCCEEDED_STATE = 's'; + static final char ATTEMPTED_STATE = 'a'; // Compactor types static final protected char MAJOR_TYPE = 'a'; @@ -759,7 +765,7 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst } } - public void compact(CompactionRequest rqst) throws MetaException { + public long compact(CompactionRequest rqst) throws MetaException { // Put a compaction request in the queue. 
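/* purgeCompactionHistory(), added to CompactionTxnHandler earlier in this patch, walks the history
 * newest-first within each table/partition and keeps only a configured number of records per terminal
 * state. A self-contained sketch of that retention walk over simple value objects instead of the JDBC
 * result set; names here are illustrative, and the real code reads COMPLETED_COMPACTIONS and issues
 * batched deletes for the ids returned. */
import java.util.ArrayList;
import java.util.List;

public class RetentionWalkSketch {
  static final char ATTEMPTED = 'a', FAILED = 'f', SUCCEEDED = 's';

  /** One row of COMPLETED_COMPACTIONS; input is ordered newest-first within each entity. */
  static class HistoryEntry {
    final long id;
    final String entity;   // "db/table" or "db/table/partition"
    final char state;
    HistoryEntry(long id, String entity, char state) {
      this.id = id; this.entity = entity; this.state = state;
    }
  }

  /** Returns ids to delete so each entity keeps at most the configured count per terminal state. */
  static List<Long> selectForPurge(List<HistoryEntry> newestFirstPerEntity,
                                   int keepAttempted, int keepFailed, int keepSucceeded) {
    List<Long> deleteSet = new ArrayList<>();
    String lastEntity = null;
    int attempted = 0, failed = 0, succeeded = 0;
    for (HistoryEntry e : newestFirstPerEntity) {
      if (!e.entity.equals(lastEntity)) {
        // new table/partition: reset the per-state retention budgets from configuration
        lastEntity = e.entity;
        attempted = keepAttempted; failed = keepFailed; succeeded = keepSucceeded;
      }
      switch (e.state) {
        case ATTEMPTED: if (--attempted < 0) deleteSet.add(e.id); break;
        case FAILED:    if (--failed    < 0) deleteSet.add(e.id); break;
        case SUCCEEDED: if (--succeeded < 0) deleteSet.add(e.id); break;
        default: // unknown state: keep it, as the patch does
      }
    }
    return deleteSet;
  }
}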
try { Connection dbConn = null; @@ -826,6 +832,7 @@ public void compact(CompactionRequest rqst) throws MetaException { stmt.executeUpdate(s); LOG.debug("Going to commit"); dbConn.commit(); + return id; } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); @@ -837,7 +844,7 @@ public void compact(CompactionRequest rqst) throws MetaException { closeDbConn(dbConn); } } catch (RetryException e) { - compact(rqst); + return compact(rqst); } } @@ -850,7 +857,13 @@ public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaExcep dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " + - "cq_start, cq_run_as from COMPACTION_QUEUE"; + "cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " + + "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " + + "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS"; + //what I want is order by cc_end desc, cc_start asc (but derby has a bug https://issues.apache.org/jira/browse/DERBY-6013) + //to sort so that currently running jobs are at the end of the list (bottom of screen) + //and currently running ones are in sorted by start time + //w/o order by likely currently running compactions will be first (LHS of Union) LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); while (rs.next()) { @@ -862,16 +875,26 @@ public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaExcep case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break; case WORKING_STATE: e.setState(WORKING_RESPONSE); break; case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break; - default: throw new MetaException("Unexpected compaction state " + rs.getString(4)); + case FAILED_STATE: e.setState(FAILED_RESPONSE); break; + case SUCCEEDED_STATE: e.setState(SUCCEEDED_RESPONSE); break; + default: + //do nothing to handle RU/D if we add another status } switch (rs.getString(5).charAt(0)) { case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break; case MINOR_TYPE: e.setType(CompactionType.MINOR); break; - default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); + default: + //do nothing to handle RU/D if we add another status } e.setWorkerid(rs.getString(6)); e.setStart(rs.getLong(7)); - e.setRunAs(rs.getString(8)); + long endTime = rs.getLong(8); + if(endTime != -1) { + e.setEndTime(endTime); + } + e.setRunAs(rs.getString(9)); + e.setHadoopJobId(rs.getString(10)); + long id = rs.getLong(11);//for debugging response.addToCompacts(e); } LOG.debug("Going to rollback"); @@ -2374,41 +2397,29 @@ private String addForUpdateClause(Connection dbConn, String selectStatement) thr throw new MetaException(msg); } } - /** - * the caller is expected to retry if this fails - * - * @return - * @throws SQLException - * @throws MetaException - */ - private long generateNewExtLockId() throws SQLException, MetaException { - Connection dbConn = null; - Statement stmt = null; - ResultSet rs = null; - try { - dbConn = getDbConn(getRequiredIsolationLevel()); - stmt = dbConn.createStatement(); - - // Get the next lock id. 
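/* showCompact() above now unions the live queue with the history table and pads the queue side with a
 * -1 CC_END so both halves share one column list. A minimal sketch of consuming that unioned result,
 * assuming the same column order as the patch's query; the class below and its println output format
 * are illustrative only. */
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class ShowCompactionsSketch {
  /** Prints one line per compaction, merging rows still in the queue with completed ones. */
  static void show(Connection dbConn) throws SQLException {
    String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
        "cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " +
        "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " +
        "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS";
    try (Statement stmt = dbConn.createStatement(); ResultSet rs = stmt.executeQuery(s)) {
      while (rs.next()) {
        long endTime = rs.getLong(8);
        // -1 is the padding value for rows still in the queue, so only report real end times
        String end = endTime == -1 ? "-" : String.valueOf(endTime);
        System.out.println(rs.getString(1) + "." + rs.getString(2) +
            (rs.getString(3) == null ? "" : "/" + rs.getString(3)) +
            " state=" + rs.getString(4) + " start=" + rs.getLong(7) + " end=" + end +
            " jobId=" + rs.getString(10) + " id=" + rs.getLong(11));
      }
    }
  }
}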
- String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID"); - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_lock_id"); - } - long extLockId = rs.getLong(1); - s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - LOG.debug("Going to commit."); - dbConn.commit(); - return extLockId; + static String quoteString(String input) { + return "'" + input + "'"; + } + static CompactionType dbCompactionType2ThriftType(char dbValue) { + switch (dbValue) { + case MAJOR_TYPE: + return CompactionType.MAJOR; + case MINOR_TYPE: + return CompactionType.MINOR; + default: + LOG.warn("Unexpected compaction type " + dbValue); + return null; } - finally { - close(rs, stmt, dbConn); + } + static Character thriftCompactionType2DbType(CompactionType ct) { + switch (ct) { + case MAJOR: + return MAJOR_TYPE; + case MINOR: + return MINOR_TYPE; + default: + LOG.warn("Unexpected compaction type " + ct); + return null; } } } diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index 06e0932..ff2c2c1 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -219,7 +219,8 @@ public void testMarkCleaned() throws Exception { assertEquals(0, txnHandler.findReadyToClean().size()); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - assertEquals(0, rsp.getCompactsSize()); + assertEquals(1, rsp.getCompactsSize()); + assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); } @Test diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java new file mode 100644 index 0000000..a91ca5c --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HouseKeeperService; +import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Purges obsolete items from compaction history data + */ +public class AcidCompactionHistoryService extends HouseKeeperServiceBase { + private static final Logger LOG = LoggerFactory.getLogger(AcidCompactionHistoryService.class); + + @Override + protected long getStartDelayMs() { + return 0; + } + @Override + protected long getIntervalMs() { + return hiveConf.getTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, TimeUnit.MILLISECONDS); + } + @Override + protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) { + return new ObsoleteEntryReaper(hiveConf, isAliveCounter); + } + + @Override + public String getServiceDescription() { + return "Removes obsolete entries from Compaction History"; + } + + private static final class ObsoleteEntryReaper implements Runnable { + private final CompactionTxnHandler txnHandler; + private final AtomicInteger isAliveCounter; + private ObsoleteEntryReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) { + txnHandler = new CompactionTxnHandler(hiveConf); + this.isAliveCounter = isAliveCounter; + } + + @Override + public void run() { + try { + long startTime = System.currentTimeMillis(); + txnHandler.purgeCompactionHistory(); + int count = isAliveCounter.incrementAndGet(); + LOG.info("History reaper reaper ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count); + } + catch(Throwable t) { + LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t); + } + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java index dee7601..96e4d40 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java @@ -17,17 +17,12 @@ */ package org.apache.hadoop.hive.ql.txn; +import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HouseKeeperService; import org.apache.hadoop.hive.metastore.txn.TxnHandler; -import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; -import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -35,58 +30,40 @@ * Performs background tasks for Transaction management in Hive. * Runs inside Hive Metastore Service. 
*/ -public class AcidHouseKeeperService implements HouseKeeperService { +public class AcidHouseKeeperService extends HouseKeeperServiceBase { private static final Logger LOG = LoggerFactory.getLogger(AcidHouseKeeperService.class); - private ScheduledExecutorService pool = null; - private final AtomicInteger isAliveCounter = new AtomicInteger(Integer.MIN_VALUE); + @Override - public void start(HiveConf hiveConf) throws Exception { - HiveTxnManager mgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf); - if(!mgr.supportsAcid()) { - LOG.info(AcidHouseKeeperService.class.getName() + " not started since " + - mgr.getClass().getName() + " does not support Acid."); - return;//there are no transactions in this case - } - pool = Executors.newScheduledThreadPool(1, new ThreadFactory() { - private final AtomicInteger threadCounter = new AtomicInteger(); - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "DeadTxnReaper-" + threadCounter.getAndIncrement()); - } - }); - TimeUnit tu = TimeUnit.MILLISECONDS; - pool.scheduleAtFixedRate(new TimedoutTxnReaper(hiveConf, this), - hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, tu), - hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, tu), - TimeUnit.MILLISECONDS); - LOG.info("Started " + this.getClass().getName() + " with delay/interval = " + - hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, tu) + "/" + - hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, tu) + " " + tu); + protected long getStartDelayMs() { + return hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, TimeUnit.MILLISECONDS); } @Override - public void stop() { - if(pool != null && !pool.isShutdown()) { - pool.shutdown(); - } - pool = null; + protected long getIntervalMs() { + return hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, TimeUnit.MILLISECONDS); + } + @Override + protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) { + return new TimedoutTxnReaper(hiveConf, isAliveCounter); } + @Override public String getServiceDescription() { return "Abort expired transactions"; } + private static final class TimedoutTxnReaper implements Runnable { private final TxnHandler txnHandler; - private final AcidHouseKeeperService owner; - private TimedoutTxnReaper(HiveConf hiveConf, AcidHouseKeeperService owner) { + private final AtomicInteger isAliveCounter; + private TimedoutTxnReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) { txnHandler = new TxnHandler(hiveConf); - this.owner = owner; + this.isAliveCounter = isAliveCounter; } @Override public void run() { try { long startTime = System.currentTimeMillis(); txnHandler.performTimeOuts(); - int count = owner.isAliveCounter.incrementAndGet(); + int count = isAliveCounter.incrementAndGet(); LOG.info("timeout reaper ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count); } catch(Throwable t) { @@ -94,12 +71,4 @@ public void run() { } } } - - /** - * This is used for testing only. Each time the housekeeper runs, counter is incremented by 1. 
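/* AcidHouseKeeperService above (and AcidCompactionHistoryService before it) is rebased onto the new
 * HouseKeeperServiceBase, which appears later in this diff and owns the scheduler pool, the ACID check
 * and the isAlive counter. A minimal sketch of what a new housekeeping task has to supply, keeping the
 * base class's method names exactly as spelled in the patch (including getScheduedAction); the class
 * name, interval value and lambda body here are illustrative. */
package org.apache.hadoop.hive.ql.txn;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;

import java.util.concurrent.atomic.AtomicInteger;

public class ExampleHouseKeeperService extends HouseKeeperServiceBase {
  @Override
  protected long getStartDelayMs() {
    return 0; // run immediately after metastore startup
  }
  @Override
  protected long getIntervalMs() {
    return 60 * 1000L; // hypothetical fixed interval; the real services read a HiveConf TimeVar
  }
  @Override
  protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) {
    return () -> {
      // ... periodic work goes here ...
      isAliveCounter.incrementAndGet(); // bump on every run so tests can observe the service is alive
    };
  }
  @Override
  public String getServiceDescription() {
    return "Example housekeeping task";
  }
}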
- * Starts with {@link java.lang.Integer#MIN_VALUE} - */ - public int getIsAliveCounter() { - return isAliveCounter.get(); - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index b847202..fbf5481 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -189,6 +189,7 @@ private void clean(CompactionInfo ci) throws MetaException { if (t == null) { // The table was dropped before we got around to cleaning it. LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped"); + txnHandler.markCleaned(ci); return; } Partition p = null; @@ -198,6 +199,7 @@ private void clean(CompactionInfo ci) throws MetaException { // The partition was dropped before we got around to cleaning it. LOG.info("Unable to find partition " + ci.getFullPartitionName() + ", assuming it was dropped"); + txnHandler.markCleaned(ci); return; } } @@ -223,13 +225,11 @@ public Object run() throws Exception { } }); } - + txnHandler.markCleaned(ci); } catch (Exception e) { - LOG.error("Caught exception when cleaning, unable to complete cleaning " + + LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " + StringUtils.stringifyException(e)); - } finally { - // We need to clean this out one way or another. - txnHandler.markCleaned(ci); + txnHandler.markFailed(ci); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 7d0f46a..07ac0c2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -137,6 +137,10 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag */ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su) throws IOException { + + if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) { + throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true"); + } JobConf job = createBaseJobConf(conf, jobName, t, sd, txns); // Figure out and encode what files we need to read. We do this here (rather than in diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java new file mode 100644 index 0000000..947f17c --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HouseKeeperService; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public abstract class HouseKeeperServiceBase implements HouseKeeperService { + private static final Logger LOG = LoggerFactory.getLogger(HouseKeeperServiceBase.class); + private ScheduledExecutorService pool = null; + protected final AtomicInteger isAliveCounter = new AtomicInteger(Integer.MIN_VALUE); + protected HiveConf hiveConf; + + @Override + public void start(HiveConf hiveConf) throws Exception { + this.hiveConf = hiveConf; + HiveTxnManager mgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf); + if(!mgr.supportsAcid()) { + LOG.info(this.getClass().getName() + " not started since " + + mgr.getClass().getName() + " does not support Acid."); + return;//there are no transactions in this case + } + pool = Executors.newScheduledThreadPool(1, new ThreadFactory() { + private final AtomicInteger threadCounter = new AtomicInteger(); + @Override + public Thread newThread(Runnable r) { + return new Thread(r, this.getClass().getName() + "-" + threadCounter.getAndIncrement()); + } + }); + + TimeUnit tu = TimeUnit.MILLISECONDS; + pool.scheduleAtFixedRate(getScheduedAction(hiveConf, isAliveCounter), getStartDelayMs(), + getIntervalMs(), tu); + LOG.info("Started " + this.getClass().getName() + " with delay/interval = " + getStartDelayMs() + "/" + + getIntervalMs() + " " + tu); + } + + @Override + public void stop() { + if(pool != null && !pool.isShutdown()) { + pool.shutdown(); + } + pool = null; + } + + /** + * This is used for testing only. Each time the housekeeper runs, counter is incremented by 1. + * Starts with {@link java.lang.Integer#MIN_VALUE} + */ + @Override + public int getIsAliveCounter() { + return isAliveCounter.get(); + } + + /** + * Delay in millis before first run of the task of this service. + */ + protected abstract long getStartDelayMs(); + /** + * Determines how fequently the service is running its task. + */ + protected abstract long getIntervalMs(); + + /** + * The actual task implementation. Must increment the counter on each iteration. + */ + protected abstract Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter); +} diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index a8fe57d..2ef06de 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -78,7 +78,7 @@ public void run() { // Wrap the inner parts of the loop in a catch throwable so that any errors in the loop // don't doom the entire thread. - try { + try {//todo: add method to only get current i.e. 
skip history - more efficient ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); ValidTxnList txns = CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); @@ -119,6 +119,11 @@ public void run() { ci.getFullPartitionName() + " so we will not initiate another compaction"); continue; } + if(txnHandler.checkFailedCompactions(ci)) { + //todo: make 'a' state entry in completed_compactions + LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last 3 attempts to compact it failed."); + continue; + } // Figure out who we should run the file operations as Partition p = resolvePartition(ci); @@ -134,9 +139,9 @@ public void run() { if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded); } catch (Throwable t) { LOG.error("Caught exception while trying to determine if we should compact " + - ci.getFullPartitionName() + ". Marking clean to avoid repeated failures, " + + ci + ". Marking clean to avoid repeated failures, " + "" + StringUtils.stringifyException(t)); - txnHandler.markCleaned(ci); + txnHandler.markFailed(ci); } } @@ -300,7 +305,7 @@ private void requestCompaction(CompactionInfo ci, String runAs, CompactionType t if (ci.partName != null) rqst.setPartitionname(ci.partName); rqst.setRunas(runAs); LOG.info("Requesting compaction: " + rqst); - txnHandler.compact(rqst); + ci.id = txnHandler.compact(rqst); } // Because TABLE_NO_AUTO_COMPACT was originally assumed to be NO_AUTO_COMPACT and then was moved diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 045ce63..ce03c8e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.ValidTxnList; @@ -70,7 +69,8 @@ public static String hostname() { throw new RuntimeException(e); } } - +//todo: this doesn;t check if compaction is already running (even though Initiator does but we +// don't go through Initiator for user initiated compactions) @Override public void run() { do { @@ -174,9 +174,9 @@ public Object run() throws Exception { } txnHandler.markCompacted(ci); } catch (Exception e) { - LOG.error("Caught exception while trying to compact " + ci.getFullPartitionName() + + LOG.error("Caught exception while trying to compact " + ci + ". 
Marking clean to avoid repeated failures, " + StringUtils.stringifyException(e)); - txnHandler.markCleaned(ci); + txnHandler.markFailed(ci); } } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor worker " + name + ", " + diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index b784585..7a1a3d2 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -24,10 +24,21 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HouseKeeperService; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService; import org.apache.hadoop.hive.ql.txn.compactor.Cleaner; +import org.apache.hadoop.hive.ql.txn.compactor.Initiator; import org.apache.hadoop.hive.ql.txn.compactor.Worker; import org.junit.After; import org.junit.Assert; @@ -42,6 +53,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -454,6 +466,211 @@ public void testInsertOverwriteWithSelfJoin() throws Exception { //insert overwrite not supported for ACID tables } /** + * HIVE-12353 + * @throws Exception + */ + @Test + public void testInitiatorWithMultipleFailedCompactions() throws Exception { + String tblName = "hive12353"; + runStatementOnDriver("drop table if exists " + tblName); + runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true')"); + hiveConf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 4); + for(int i = 0; i < 5; i++) { + //generate enough delta files so that Initiator can trigger auto compaction + runStatementOnDriver("insert into " + tblName + " values(" + (i + 1) + ", 'foo'),(" + (i + 2) + ", 'bar'),(" + (i + 3) + ", 'baz')"); + } + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true); + + int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); + CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf); + AtomicBoolean stop = new AtomicBoolean(true); + //create failed compactions + for(int i = 0; i < numFailedCompactions; i++) { + //each of these should fail + txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR)); + runWorker(hiveConf); + } + //this should not schedule a new compaction due to prior failures + Initiator init = new Initiator(); + init.setThreadId((int)init.getId()); + init.setHiveConf(hiveConf); + init.init(stop, new 
AtomicBoolean()); + init.run(); + + CompactionsByState cbs = countCompacts(txnHandler); + Assert.assertEquals("Unexpected number of failed compactions", numFailedCompactions, cbs.failed); + Assert.assertEquals("Unexpected total number of compactions", numFailedCompactions, cbs.total); + + hiveConf.setTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, 10, TimeUnit.MILLISECONDS); + AcidCompactionHistoryService compactionHistoryService = new AcidCompactionHistoryService(); + runHouseKeeperService(compactionHistoryService, hiveConf);//should not remove anything from history + cbs = countCompacts(txnHandler); + Assert.assertEquals("Number of failed compactions after History clean", numFailedCompactions, cbs.failed); + Assert.assertEquals("Total number of compactions after History clean", numFailedCompactions, cbs.total); + + txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MAJOR)); + runWorker(hiveConf);//will fail + txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR)); + runWorker(hiveConf);//will fail + cbs = countCompacts(txnHandler); + Assert.assertEquals("Unexpected num failed1", numFailedCompactions + 2, cbs.failed); + Assert.assertEquals("Unexpected num total1", numFailedCompactions + 2, cbs.total); + runHouseKeeperService(compactionHistoryService, hiveConf);//should remove history so that we have + //COMPACTOR_HISTORY_RETENTION_FAILED failed compacts left (and no other since we only have failed ones here) + cbs = countCompacts(txnHandler); + Assert.assertEquals("Unexpected num failed2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed); + Assert.assertEquals("Unexpected num total2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.total); + + + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false); + txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR)); + //at this point "show compactions" should have (COMPACTOR_HISTORY_RETENTION_FAILED) failed + 1 initiated + cbs = countCompacts(txnHandler); + Assert.assertEquals("Unexpected num failed3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed); + Assert.assertEquals("Unexpected num initiated", 1, cbs.initiated); + Assert.assertEquals("Unexpected num total3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total); + + runWorker(hiveConf);//will succeed and transition to Initiated->Working->Ready for Cleaning + cbs = countCompacts(txnHandler); + Assert.assertEquals("Unexpected num failed4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed); + Assert.assertEquals("Unexpected num ready to clean", 1, cbs.readyToClean); + Assert.assertEquals("Unexpected num total4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total); + + runCleaner(hiveConf); // transition to Success state + runHouseKeeperService(compactionHistoryService, hiveConf);//should not purge anything as all items within retention sizes + cbs = countCompacts(txnHandler); + Assert.assertEquals("Unexpected num failed5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed); + Assert.assertEquals("Unexpected num succeeded", 1, cbs.succeeded); + Assert.assertEquals("Unexpected num total5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total); + } + private static class CompactionsByState { + private int 
+  private static class CompactionsByState {
+    private int attempted;
+    private int failed;
+    private int initiated;
+    private int readyToClean;
+    private int succeeded;
+    private int working;
+    private int total;
+  }
+  private static CompactionsByState countCompacts(TxnHandler txnHandler) throws MetaException {
+    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+    CompactionsByState compactionsByState = new CompactionsByState();
+    compactionsByState.total = resp.getCompactsSize();
+    for(ShowCompactResponseElement compact : resp.getCompacts()) {
+      if(TxnHandler.FAILED_RESPONSE.equals(compact.getState())) {
+        compactionsByState.failed++;
+      }
+      else if(TxnHandler.CLEANING_RESPONSE.equals(compact.getState())) {
+        compactionsByState.readyToClean++;
+      }
+      else if(TxnHandler.INITIATED_RESPONSE.equals(compact.getState())) {
+        compactionsByState.initiated++;
+      }
+      else if(TxnHandler.SUCCEEDED_RESPONSE.equals(compact.getState())) {
+        compactionsByState.succeeded++;
+      }
+      else if(TxnHandler.WORKING_RESPONSE.equals(compact.getState())) {
+        compactionsByState.working++;
+      }
+      else if(TxnHandler.ATTEMPTED_RESPONSE.equals(compact.getState())) {
+        compactionsByState.attempted++;
+      }
+    }
+    return compactionsByState;
+  }
+  private static void runWorker(HiveConf hiveConf) throws MetaException {
+    AtomicBoolean stop = new AtomicBoolean(true);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(hiveConf);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+  }
+  private static void runCleaner(HiveConf hiveConf) throws MetaException {
+    AtomicBoolean stop = new AtomicBoolean(true);
+    Cleaner t = new Cleaner();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(hiveConf);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+  }
+
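+  /**
+   * Starts the given HouseKeeperService, waits until its isAlive counter shows it has
+   * completed at least one run, then stops it.
+   */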
+  private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
+    int lastCount = houseKeeperService.getIsAliveCounter();
+    houseKeeperService.start(conf);
+    while(houseKeeperService.getIsAliveCounter() <= lastCount) {
+      try {
+        Thread.sleep(100);//make sure it has run at least once
+      }
+      catch(InterruptedException ex) {
+        //...
+      }
+    }
+    houseKeeperService.stop();
+  }
+
+  /**
+   * HIVE-12352 has details
+   * @throws Exception
+   */
+  @Test
+  public void writeBetweenWorkerAndCleaner() throws Exception {
+    String tblName = "hive12352";
+    runStatementOnDriver("drop table if exists " + tblName);
+    runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+      " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+      " STORED AS ORC TBLPROPERTIES ('transactional'='true')");
+
+    //create some data
+    runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')");
+    runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");
+
+    //run Worker to execute compaction
+    CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
+    txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(hiveConf);
+    AtomicBoolean stop = new AtomicBoolean(true);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+
+    //delete something, but make sure txn is rolled back
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    runStatementOnDriver("delete from " + tblName + " where a = 1");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+
+    List<String> expected = new ArrayList<>();
+    expected.add("1\tfoo");
+    expected.add("2\tbar");
+    expected.add("3\tblah");
+    Assert.assertEquals("", expected,
+      runStatementOnDriver("select a,b from " + tblName + " order by a"));
+
+    //run Cleaner
+    Cleaner c = new Cleaner();
+    c.setThreadId((int)c.getId());
+    c.setHiveConf(hiveConf);
+    c.init(stop, new AtomicBoolean());
+    c.run();
+
+    //this seems odd, but we want to make sure that CompactionTxnHandler.cleanEmptyAbortedTxns() gets run
+    Initiator i = new Initiator();
+    i.setThreadId((int)i.getId());
+    i.setHiveConf(hiveConf);
+    i.init(stop, new AtomicBoolean());
+    i.run();
+
+    //check that aborted operation didn't become committed
+    Assert.assertEquals("", expected,
+      runStatementOnDriver("select a,b from " + tblName + " order by a"));
+  }
+  /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order
    */
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index bca5002..899f5a1 100644
--- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;

+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,7 +72,8 @@ public void cleanupAfterMajorTableCompaction() throws Exception {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(0, rsp.getCompactsSize());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, null);
@@ -102,7 +104,8 @@ public void cleanupAfterMajorPartitionCompaction() throws Exception {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(0, rsp.getCompactsSize());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
@@ -131,7 +134,8 @@ public void cleanupAfterMinorTableCompaction() throws Exception {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(0, rsp.getCompactsSize());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, null);
@@ -169,7 +173,8 @@ public void cleanupAfterMinorPartitionCompaction() throws Exception {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(0, rsp.getCompactsSize());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
@@ -323,7 +328,8 @@ public void notBlockedBySubsequentLock() throws Exception {
     // Check there are no compactions requests left.
     rsp = txnHandler.showCompact(new ShowCompactRequest());
     compacts = rsp.getCompacts();
-    Assert.assertEquals(0, compacts.size());
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
   }
   @Test
@@ -396,7 +402,8 @@ public void partitionNotBlockedBySubsequentLock() throws Exception {
     // Check there are no compactions requests left.
     rsp = txnHandler.showCompact(new ShowCompactRequest());
     compacts = rsp.getCompacts();
-    Assert.assertEquals(0, compacts.size());
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
   }
   @Test
@@ -421,7 +428,8 @@ public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(0, rsp.getCompactsSize());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
@@ -451,7 +459,8 @@ public void droppedTable() throws Exception {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(0, rsp.getCompactsSize());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
   }
   @Test
@@ -478,7 +487,8 @@ public void droppedPartition() throws Exception {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(0, rsp.getCompactsSize());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
   }
   @Override
   boolean useHive130DeltaDirName() {
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index fe1d0d3..d0db406 100644
--- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;

+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.*;
@@ -933,7 +934,8 @@ public void droppedTable() throws Exception {
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(0, compacts.size());
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(compacts.get(0).getState()));
   }
   @Test
@@ -957,6 +959,7 @@ public void droppedPartition() throws Exception {
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(0, compacts.size());
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
   }
 }