diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 45c5807..e71535e 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -992,6 +992,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     HIVETESTMODEDUMMYSTATPUB("hive.test.dummystats.publisher", "", "internal variable for test", false),
     HIVETESTCURRENTTIMESTAMP("hive.test.currenttimestamp", null, "current timestamp for test", false),
     HIVETESTMODEROLLBACKTXN("hive.test.rollbacktxn", false, "For testing only.  Will mark every ACID transaction aborted", false),
+    HIVETESTMODEFAILCOMPACTION("hive.test.fail.compaction", false, "For testing only.  Will cause CompactorMR to fail.", false),
 
     HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
         "Merge small files at the end of a map-only job"),
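The new entry follows the existing HIVETESTMODE* pattern, so tests flip it through the standard HiveConf accessors. A minimal sketch, using only stock HiveConf API:

    HiveConf conf = new HiveConf();
    //only honored by CompactorMR when hive.in.test is also set (see the guard later in this patch)
    conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
    boolean shouldFail = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION);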
diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index da367ca..5d8ab69 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -791,12 +791,56 @@ public void minorCompactAfterAbort() throws Exception {
   }
 
   /**
+   * HIVE-12353
+   * @throws Exception
+   */
+  @Test
+  public void testInitiatorWithMultipleFailedCompactions() throws Exception {
+    String tblName = "hive12353";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+      " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+      " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 4);
+    for(int i = 0; i < 5; i++) {
+      //generate enough delta files so that the Initiator will schedule a compaction
+      executeStatementOnDriver("insert into " + tblName + " values(" + (i + 1) + ", 'foo'),(" + (i + 2) + ", 'bar'),(" + (i + 3) + ", 'baz')", driver);
+    }
+    conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
+
+    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+
+    AtomicBoolean stop = new AtomicBoolean(true);
+    for(int i = 0; i < 3; i++) {
+      //each of these 3 compactions should fail
+      txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
+      Worker t = new Worker();
+      t.setThreadId((int) t.getId());
+      t.setHiveConf(conf);
+      AtomicBoolean looped = new AtomicBoolean();
+      t.init(stop, looped);
+      t.run();
+    }
+    //this should not schedule a new compaction due to prior failures
+    Initiator i = new Initiator();
+    i.setThreadId((int)i.getId());
+    i.setHiveConf(conf);
+    i.init(stop, new AtomicBoolean());
+    i.run();
+
+    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Unexpected number of compactions", 3, resp.getCompactsSize());
+    Assert.assertEquals("Unexpected state for compact 0", CompactionTxnHandler.FAILED_RESPONSE, resp.getCompacts().get(0).getState());
+    Assert.assertEquals("Unexpected state for compact 1", CompactionTxnHandler.FAILED_RESPONSE, resp.getCompacts().get(1).getState());
+    Assert.assertEquals("Unexpected state for compact 2", CompactionTxnHandler.FAILED_RESPONSE, resp.getCompacts().get(2).getState());
+  }
+  /**
    * HIVE-12352 has details
    * @throws Exception
    */
   @Test
   public void writeBetweenWorkerAndCleaner() throws Exception {
-    String tblName = "HIVE12352";
+    String tblName = "hive12352";
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
       " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index d3cb7d5..18afd9d 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -27,13 +27,18 @@
   public String dbname;
   public String tableName;
   public String partName;
+  char state;
   public CompactionType type;
+  String workerId;
+  long start;
   public String runAs;
   public boolean tooManyAborts = false;
   /**
    * {@code null} means it wasn't set (e.g. in case of upgrades)
    */
   public Long highestTxnId;
+  byte[] metaInfo;
+  String hadoopJobId;
 
   private String fullPartitionName = null;
   private String fullTableName = null;
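With the state char now carried on CompactionInfo, the full lifecycle alphabet is 'i' (initiated), 'w' (working), 'r' (ready for cleaning), plus the two this patch adds: 'f' (failed) and 's' (succeeded). A small helper distilling the mapping that TxnHandler.showCompact() applies further down; the method name here is illustrative, not part of the patch:

    static String stateToResponse(char state) {
      switch (state) {
        case 'i': return "initiated";
        case 'w': return "working";
        case 'r': return "ready for cleaning";
        case 'f': return "failed";    //new in this patch
        case 's': return "succeeded"; //new in this patch
        default:  return "unknown(" + state + ")";
      }
    }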
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 9130322..c28b42c 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -174,16 +174,7 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException {
         info.dbname = rs.getString(2);
         info.tableName = rs.getString(3);
         info.partName = rs.getString(4);
-        switch (rs.getString(5).charAt(0)) {
-          case MAJOR_TYPE:
-            info.type = CompactionType.MAJOR;
-            break;
-          case MINOR_TYPE:
-            info.type = CompactionType.MINOR;
-            break;
-          default:
-            throw new MetaException("Unexpected compaction type " + rs.getString(5));
-        }
+        info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
         // Now, update this record as being worked on by this worker.
         long now = getDbTime(dbConn);
         s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
@@ -314,6 +305,148 @@ public void markCompacted(CompactionInfo info) throws MetaException {
   }
 
   /**
+   * 1. need a config var to control the retention period in COMPLETED_COMPACTIONS.  Should this
+   * be system wide or a table prop?  Best is to have both.  Some tables may have
+   * frequent compactions (every 20min), others once a week.  Thus a single 24h retention
+   * period won't really work.  Ideally, we should have a general mechanism where every (where
+   * reasonable) property can be specified system wide and overridden per table.
+   *
+   * Perhaps the retention period should instead be a number of runs' worth of history?
+   *
+   * If the last 3 consecutive runs failed, don't start a new compaction.
+   *
+   * Do we allow compacting a whole table (when it's partitioned)?  No, though perhaps we should.
+   * That would be a meta operation, i.e. first find all partitions for this table (which have
+   * txn info) and schedule each compaction separately.  This avoids complications in this logic.
+   * todo: should this throw anything?
+   *
+   * todo: background process to clean "history"
+   */
+  public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException {
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        rs = stmt.executeQuery("select CC_STATE from COMPLETED_COMPACTIONS where " +
+          "CC_DATABASE = " + quoteString(ci.dbname) + " and " +
+          "CC_TABLE = " + quoteString(ci.tableName) +
+          (ci.partName != null ? " and CC_PARTITION = " + quoteString(ci.partName) : "") +
+          " order by CC_ID desc");
+        int numFailed = 0;
+        int numTotal = 0;
+        //look at the 3 most recent runs for this entity; all of them must be failures
+        while(rs.next() && ++numTotal <= 3) {
+          if(rs.getString(1).charAt(0) == FAILED_STATE) {
+            numFailed++;
+          }
+          else {
+            numFailed--;
+          }
+        }
+        return numFailed == 3;
+      }
+      catch (SQLException e) {
+        LOG.error("Unable to check for failed compactions " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")");
+        LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
+        return false;//if the check itself fails, err on the side of scheduling the compaction
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      return checkFailedCompactions(ci);
+    }
+  }
+  /**
+   * If there is an entry in COMPACTION_QUEUE with ci.id, remove it.
+   * Make an entry in COMPLETED_COMPACTIONS with status 'f'.
+   *
+   * TODO: the normal markCleaned() has to copy from the queue to COMPLETED_COMPACTIONS just like
+   * this method, but what about markCleaned() when it is called because the table has been
+   * dropped...
+   * todo: make sure Initiator/Worker check COMPLETED_COMPACTIONS to see if they should bail.
+   * Actually only the Initiator should check.  The Worker should just go.
+   */
+  public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      PreparedStatement pStmt = null;
+      ResultSet rs = null;
+      try {
+        //do we need serializable?  Once we have the HWM as above, no.  Before that
+        //it's debatable, but the problem described above applies either way.
+        //Thus we can drop to read committed.
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        rs = stmt.executeQuery("select CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id);
+        if(rs.next()) {
+          CompactionInfo fullCi = new CompactionInfo();
+          fullCi.id = ci.id;
+          fullCi.dbname = rs.getString(1);
+          fullCi.tableName = rs.getString(2);
+          fullCi.partName = rs.getString(3);
+          fullCi.state = rs.getString(4).charAt(0);//cq_state
+          fullCi.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
+          fullCi.workerId = rs.getString(6);
+          fullCi.start = rs.getLong(7);
+          fullCi.runAs = rs.getString(8);
+          fullCi.highestTxnId = rs.getLong(9);
+          fullCi.metaInfo = rs.getBytes(10);
+          fullCi.hadoopJobId = rs.getString(11);
+          close(rs);
+          ci = fullCi;
+          String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id;
+          LOG.debug("Going to execute update <" + s + ">");
+          stmt.executeUpdate(s);
+        }
+
+        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)");
+        pStmt.setLong(1, ci.id);
+        pStmt.setString(2, ci.dbname);
+        pStmt.setString(3, ci.tableName);
+        pStmt.setString(4, ci.partName);
+        pStmt.setString(5, Character.toString(FAILED_STATE));
+        pStmt.setString(6, Character.toString(thriftCompactionType2DbType(ci.type)));
+        pStmt.setString(7, ci.workerId);
+        pStmt.setLong(8, ci.start);
+        pStmt.setLong(9, getDbTime(dbConn));
+        pStmt.setString(10, ci.runAs);
+        if(ci.highestTxnId != null) {
+          pStmt.setLong(11, ci.highestTxnId);
+        }
+        else {
+          //highestTxnId may be unset, e.g. for entries created before an upgrade
+          pStmt.setNull(11, java.sql.Types.BIGINT);
+        }
+        pStmt.setBytes(12, ci.metaInfo);
+        pStmt.setString(13, ci.hadoopJobId);
+        pStmt.execute();
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.error("Unable to mark compaction " + ci + " as failed: " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        try {
+          checkRetryable(dbConn, e, "markFailed(" + ci + ")");
+        }
+        catch(MetaException ex) {
+          LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
+        }
+        LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
+      } finally {
+        closeStmt(pStmt);
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      markFailed(ci);
+    }
+  }
+
+  /**
    * This will remove an entry from the queue after
    * it has been compacted.
   *
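The gate in checkFailedCompactions() is easy to misread, so here is a standalone restatement of it under the same rules: given the compaction history for one table/partition newest first, block a new automatic compaction only if the MAX most recent runs all failed. The method and parameter names are illustrative only:

    import java.util.List;

    static boolean blockNewCompaction(List<Character> newestFirstStates) {
      final int MAX = 3;  //hardcoded in this patch; a config knob is a noted TODO
      int failed = 0, seen = 0;
      for (char state : newestFirstStates) {
        if (++seen > MAX) {
          break;
        }
        failed += (state == 'f') ? 1 : -1;  //'f' == FAILED_STATE
      }
      return failed == MAX;  //true only when all MAX most recent runs failed
    }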
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 2015526..2a7545c 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -118,10 +118,27 @@ public static void prepDb() throws Exception {
         " CQ_WORKER_ID varchar(128)," +
         " CQ_START bigint," +
         " CQ_RUN_AS varchar(128)," +
-        " CQ_HIGHEST_TXN_ID bigint)");
+        " CQ_HIGHEST_TXN_ID bigint," +
+        " CQ_META_INFO varchar(2048) for bit data," +
+        " CQ_HADOOP_JOB_ID varchar(32))");
 
     stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
     stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
+
+    stmt.execute("CREATE TABLE COMPLETED_COMPACTIONS (" +
+        " CC_ID bigint PRIMARY KEY," +
+        " CC_DATABASE varchar(128) NOT NULL," +
+        " CC_TABLE varchar(128) NOT NULL," +
+        " CC_PARTITION varchar(767)," +
+        " CC_STATE char(1) NOT NULL," +
+        " CC_TYPE char(1) NOT NULL," +
+        " CC_WORKER_ID varchar(128)," +
+        " CC_START bigint," +
+        " CC_END bigint," +
+        " CC_RUN_AS varchar(128)," +
+        " CC_HIGHEST_TXN_ID bigint," +
+        " CC_META_INFO varchar(2048) for bit data," +
+        " CC_HADOOP_JOB_ID varchar(32))");
 
     conn.commit();
   } catch (SQLException e) {
@@ -161,7 +178,7 @@ public static void cleanDb() throws Exception {
     dropTable(stmt, "NEXT_LOCK_ID");
     dropTable(stmt, "COMPACTION_QUEUE");
     dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID");
-
+    dropTable(stmt, "COMPLETED_COMPACTIONS");
     conn.commit();
   } finally {
     closeResources(conn, stmt, null);
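The javadoc above flags a background "history" cleaner as a TODO. A hypothetical retention sweep might look like the sketch below; the cutoff policy and the assumption that CC_END holds the same epoch-millis clock that getDbTime() writes are mine, not the patch's:

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Statement;

    static void purgeCompactionHistory(Connection conn, long retentionMillis) throws SQLException {
      long cutoff = System.currentTimeMillis() - retentionMillis;
      try (Statement s = conn.createStatement()) {
        //drop completed/failed history rows older than the retention window
        s.executeUpdate("delete from COMPLETED_COMPACTIONS where CC_END < " + cutoff);
      }
    }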
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 50d8892..9c26733 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -68,10 +68,14 @@
   static final public String INITIATED_RESPONSE = "initiated";
   static final public String WORKING_RESPONSE = "working";
   static final public String CLEANING_RESPONSE = "ready for cleaning";
+  static final public String FAILED_RESPONSE = "failed";
+  static final public String SUCCESS_RESPONSE = "succeeded";
 
   static final protected char INITIATED_STATE = 'i';
   static final protected char WORKING_STATE = 'w';
   static final protected char READY_FOR_CLEANING = 'r';
+  static final char FAILED_STATE = 'f';
+  static final char SUCCESS_STATE = 's';
 
   // Compactor types
   static final protected char MAJOR_TYPE = 'a';
@@ -759,7 +763,7 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst
     }
   }
 
-  public void compact(CompactionRequest rqst) throws MetaException {
+  public long compact(CompactionRequest rqst) throws MetaException {
     // Put a compaction request in the queue.
     try {
       Connection dbConn = null;
@@ -826,6 +830,7 @@ public void compact(CompactionRequest rqst) throws MetaException {
         stmt.executeUpdate(s);
         LOG.debug("Going to commit");
         dbConn.commit();
+        return id;
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
@@ -837,7 +842,7 @@ public void compact(CompactionRequest rqst) throws MetaException {
         closeDbConn(dbConn);
       }
     } catch (RetryException e) {
-      compact(rqst);
+      return compact(rqst);
     }
   }
@@ -850,7 +855,11 @@ public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaExcep
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
       stmt = dbConn.createStatement();
       String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
-        "cq_start, cq_run_as from COMPACTION_QUEUE";
+        "cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id from COMPACTION_QUEUE union all " +
+        "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " +
+        "cc_start, cc_end, cc_run_as, cc_hadoop_job_id from COMPLETED_COMPACTIONS order by cc_end desc";
+      //sort so that currently running compactions (cc_end = -1) land at the end of the
+      //list, i.e. at the bottom of the screen
       LOG.debug("Going to execute query <" + s + ">");
       ResultSet rs = stmt.executeQuery(s);
       while (rs.next()) {
@@ -862,6 +871,8 @@
           case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break;
           case WORKING_STATE: e.setState(WORKING_RESPONSE); break;
           case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break;
+          case FAILED_STATE: e.setState(FAILED_RESPONSE); break;
+          case SUCCESS_STATE: e.setState(SUCCESS_RESPONSE); break;
           default: throw new MetaException("Unexpected compaction state " + rs.getString(4));
         }
         switch (rs.getString(5).charAt(0)) {
@@ -871,7 +882,12 @@
         }
         e.setWorkerid(rs.getString(6));
         e.setStart(rs.getLong(7));
-        e.setRunAs(rs.getString(8));
+        long endTime = rs.getLong(8);
+        if(endTime != -1) {
+          e.setEndTime(endTime);
+        }
+        e.setRunAs(rs.getString(9));
+        e.setHadoopJobId(rs.getString(10));
         response.addToCompacts(e);
       }
       LOG.debug("Going to rollback");
@@ -2374,41 +2390,32 @@
       throw new MetaException(msg);
     }
   }
-  /**
-   * the caller is expected to retry if this fails
-   *
-   * @return
-   * @throws SQLException
-   * @throws MetaException
-   */
-  private long generateNewExtLockId() throws SQLException, MetaException {
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      dbConn = getDbConn(getRequiredIsolationLevel());
-      stmt = dbConn.createStatement();
-
-      // Get the next lock id.
-      String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID");
-      LOG.debug("Going to execute query <" + s + ">");
-      rs = stmt.executeQuery(s);
-      if (!rs.next()) {
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-        throw new MetaException("Transaction tables not properly " +
-          "initialized, no record found in next_lock_id");
-      }
-      long extLockId = rs.getLong(1);
-      s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
-      LOG.debug("Going to execute update <" + s + ">");
-      stmt.executeUpdate(s);
-      LOG.debug("Going to commit.");
-      dbConn.commit();
-      return extLockId;
-    }
-    finally {
-      close(rs, stmt, dbConn);
-    }
-  }
+  static String quoteString(String input) {
+    return "'" + input + "'";
+  }
+  static String quoteString(char input) {
+    return "'" + input + "'";
+  }
+  CompactionType dbCompactionType2ThriftType(char dbValue) {
+    switch (dbValue) {
+      case MAJOR_TYPE:
+        return CompactionType.MAJOR;
+      case MINOR_TYPE:
+        return CompactionType.MINOR;
+      default:
+        LOG.warn("Unexpected compaction type " + dbValue);
+        return null;//todo: returning null forces every caller to handle it; consider throwing
+    }
+  }
+  Character thriftCompactionType2DbType(CompactionType ct) {
+    switch (ct) {
+      case MAJOR:
+        return MAJOR_TYPE;
+      case MINOR:
+        return MINOR_TYPE;
+      default:
+        LOG.warn("Unexpected compaction type " + ct);
+        return null;//todo: returning null forces every caller to handle it; consider throwing
+    }
+  }
 }
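Since compact() now returns the id of the COMPACTION_QUEUE row it creates, callers can correlate a request with its queue entry; this is what lets the Initiator stamp ci.id below. Usage in the style of the test above:

    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
    long id = txnHandler.compact(new CompactionRequest("default", "hive12353", CompactionType.MINOR));
    //id identifies the new COMPACTION_QUEUE row and, after markFailed/markCleaned,
    //the corresponding COMPLETED_COMPACTIONS row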
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index b847202..fbf5481 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -189,6 +189,7 @@ private void clean(CompactionInfo ci) throws MetaException {
       if (t == null) {
         // The table was dropped before we got around to cleaning it.
         LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped");
+        txnHandler.markCleaned(ci);
         return;
       }
       Partition p = null;
@@ -198,6 +199,7 @@
           // The partition was dropped before we got around to cleaning it.
           LOG.info("Unable to find partition " + ci.getFullPartitionName() +
             ", assuming it was dropped");
+          txnHandler.markCleaned(ci);
           return;
         }
       }
@@ -223,13 +225,11 @@ public Object run() throws Exception {
           }
         });
       }
-
+      txnHandler.markCleaned(ci);
     } catch (Exception e) {
-      LOG.error("Caught exception when cleaning, unable to complete cleaning " +
+      LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " +
         StringUtils.stringifyException(e));
-    } finally {
-      // We need to clean this out one way or another.
-      txnHandler.markCleaned(ci);
+      txnHandler.markFailed(ci);
     }
   }
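The key behavioral change above: markCleaned() used to run in a finally block, so failures were silently marked clean; now success and failure diverge. A sketch of the resulting pattern (doCleaning is a placeholder, not a real Cleaner method):

    void cleanWithOutcome(CompactionInfo ci, CompactionTxnHandler txnHandler) throws MetaException {
      try {
        doCleaning(ci);              //placeholder for the real file removal
        txnHandler.markCleaned(ci);  //success: queue entry is fully done
      } catch (Exception e) {
        txnHandler.markFailed(ci);   //failure: recorded in COMPLETED_COMPACTIONS as 'f'
      }
    }

This is also what makes checkFailedCompactions() meaningful: a compaction that keeps failing now leaves a visible trail instead of being wiped from the queue each time.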
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 7d0f46a..07ac0c2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -137,6 +137,10 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag
    */
   void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, ValidTxnList txns,
            CompactionInfo ci, Worker.StatsUpdater su) throws IOException {
+
+    if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
+      throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
+    }
     JobConf job = createBaseJobConf(conf, jobName, t, sd, txns);
 
     // Figure out and encode what files we need to read.  We do this here (rather than in
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index a8fe57d..721d5b7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -119,6 +119,10 @@ public void run() {
                   ci.getFullPartitionName() + " so we will not initiate another compaction");
               continue;
             }
+            if(txnHandler.checkFailedCompactions(ci)) {
+              LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() +
+                " since the last 3 attempts to compact it failed.");
+              continue;
+            }
 
             // Figure out who we should run the file operations as
             Partition p = resolvePartition(ci);
@@ -134,9 +138,9 @@ public void run() {
               if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
             } catch (Throwable t) {
               LOG.error("Caught exception while trying to determine if we should compact " +
-                  ci.getFullPartitionName() + ".  Marking clean to avoid repeated failures, " +
+                  ci + ".  Marking failed to avoid repeated failures, " +
                  "" + StringUtils.stringifyException(t));
-              txnHandler.markCleaned(ci);
+              txnHandler.markFailed(ci);
             }
           }
@@ -300,7 +304,7 @@ private void requestCompaction(CompactionInfo ci, String runAs, CompactionType t
     if (ci.partName != null) rqst.setPartitionname(ci.partName);
     rqst.setRunas(runAs);
     LOG.info("Requesting compaction: " + rqst);
-    txnHandler.compact(rqst);
+    ci.id = txnHandler.compact(rqst);
   }
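Note that the gate only suppresses automatic scheduling: an explicit request still goes straight to TxnHandler.compact(), so a user can always force a retry after three failures, e.g. via the test harness helper used above (table name illustrative):

    //user-initiated compaction bypasses the Initiator and its failed-history check
    executeStatementOnDriver("alter table hive12353 compact 'minor'", driver);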
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 045ce63..7b0fb20 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -70,7 +69,7 @@ public static String hostname() {
       throw new RuntimeException(e);
     }
   }
-
+  //todo: this doesn't check if a compaction is already running (the Initiator does, but we
+  //don't go through the Initiator for user-initiated compactions)
   @Override
   public void run() {
     do {
@@ -174,9 +173,9 @@ public Object run() throws Exception {
         }
         txnHandler.markCompacted(ci);
       } catch (Exception e) {
-        LOG.error("Caught exception while trying to compact " + ci.getFullPartitionName() +
-            ".  Marking clean to avoid repeated failures, " + StringUtils.stringifyException(e));
-        txnHandler.markCleaned(ci);
+        LOG.error("Caught exception while trying to compact " + ci +
+            ".  Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e));
+        txnHandler.markFailed(ci);
       }
     } catch (Throwable t) {
       LOG.error("Caught an exception in the main loop of compactor worker " + name + ", " +
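On the todo above: the Worker has no guard against compacting an entity that is already queued or running. One possible shape for such a check, purely illustrative and not part of this patch (partition matching omitted for brevity):

    static boolean alreadyQueuedOrRunning(ShowCompactResponse resp, CompactionInfo ci) {
      for (ShowCompactResponseElement e : resp.getCompacts()) {
        if (e.getDbname().equals(ci.dbname)
            && e.getTablename().equals(ci.tableName)
            && (TxnHandler.INITIATED_RESPONSE.equals(e.getState())
                || TxnHandler.WORKING_RESPONSE.equals(e.getState()))) {
          return true;  //an equivalent compaction is already pending or in progress
        }
      }
      return false;
    }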