diff --git common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
index 4f9ddf1..87e7e30 100644
--- common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
+++ common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
@@ -70,8 +70,8 @@ public void readFromString(String src);

   /**
-   * Get the largest committed transaction id.
-   * @return largest committed transaction id
+   * Get the largest transaction id used.
+   * @return largest transaction id used
    */
   public long getHighWatermark();
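The doc change above narrows what getHighWatermark() promises: it is the largest transaction id allocated, not the largest committed one, so callers must still consult the exception list for open/aborted txns below it. A minimal illustration of that reading, assuming the two-argument ValidReadTxnList(exceptions, highWatermark) constructor available in this codebase:

    import org.apache.hadoop.hive.common.ValidReadTxnList;
    import org.apache.hadoop.hive.common.ValidTxnList;

    public class HighWatermarkSemanticsDemo {
      public static void main(String[] args) {
        // Txns 7 and 9 are open or aborted; 10 is the largest txn id handed out so far.
        ValidTxnList txns = new ValidReadTxnList(new long[]{7, 9}, 10);
        System.out.println(txns.getHighWatermark()); // 10 -- largest txn id used, not largest committed
        System.out.println(txns.isTxnValid(8));      // true  -- below the high water mark and not excluded
        System.out.println(txns.isTxnValid(9));      // false -- excluded even though it is <= the high water mark
      }
    }
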
driver, "writeBetweenWorkerAndCleaner()")); + + //run Cleaner + Cleaner c = new Cleaner(); + c.setThreadId((int)c.getId()); + c.setHiveConf(conf); + c.init(stop, new AtomicBoolean()); + c.run(); + + //this seems odd, but we wan to make sure that to run CompactionTxnHandler.cleanEmptyAbortedTxns() + Initiator i = new Initiator(); + i.setThreadId((int)i.getId()); + i.setHiveConf(conf); + i.init(stop, new AtomicBoolean()); + i.run(); + + //check that aborted operation didn't become committed + Assert.assertEquals("", expected, + execSelectAndDumpData("select a,b from " + tblName + " order by a", driver, "writeBetweenWorkerAndCleaner()")); + } @Test public void majorCompactAfterAbort() throws Exception { String dbName = "default"; @@ -934,7 +991,7 @@ public long getHighWatermark() { /** * convenience method to execute a select stmt and dump results to log file */ - private static void execSelectAndDumpData(String selectStmt, Driver driver, String msg) + private static List execSelectAndDumpData(String selectStmt, Driver driver, String msg) throws Exception { executeStatementOnDriver(selectStmt, driver); ArrayList valuesReadFromHiveDriver = new ArrayList(); @@ -944,6 +1001,7 @@ private static void execSelectAndDumpData(String selectStmt, Driver driver, Stri for(String row : valuesReadFromHiveDriver) { LOG.debug(" rowIdx=" + rowIdx++ + ":" + row); } + return valuesReadFromHiveDriver; } /** * Execute Hive CLI statement diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java index 1dae7b9..d3cb7d5 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java @@ -30,6 +30,10 @@ public CompactionType type; public String runAs; public boolean tooManyAborts = false; + /** + * {@code null} means it wasn't set (e.g. in case of upgrades) + */ + public Long highestTxnId; private String fullPartitionName = null; private String fullTableName = null; @@ -80,6 +84,7 @@ public String toString() { "partName:" + partName + "," + "type:" + type + "," + "runAs:" + runAs + "," + - "tooManyAborts:" + tooManyAborts; + "tooManyAborts:" + tooManyAborts + "," + + "highestTxnId:" + highestTxnId; } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 3e0e656..9130322 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -276,7 +276,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "select cq_id, cq_database, cq_table, cq_partition, " + - "cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'"; + "cq_type, cq_run_as, cq_highest_txn_id from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); while (rs.next()) { @@ -291,6 +291,8 @@ public void markCompacted(CompactionInfo info) throws MetaException { default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); } info.runAs = rs.getString(6); + long highestTxnId = rs.getLong(7); + info.highestTxnId = rs.wasNull() ? 
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 3e0e656..9130322 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -276,7 +276,7 @@ public void markCompacted(CompactionInfo info) throws MetaException {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         String s = "select cq_id, cq_database, cq_table, cq_partition, " +
-          "cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'";
+          "cq_type, cq_run_as, cq_highest_txn_id from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'";
         LOG.debug("Going to execute query <" + s + ">");
         rs = stmt.executeQuery(s);
         while (rs.next()) {
@@ -291,6 +291,8 @@ public void markCompacted(CompactionInfo info) throws MetaException {
             default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
           }
           info.runAs = rs.getString(6);
+          long highestTxnId = rs.getLong(7);
+          info.highestTxnId = rs.wasNull() ? null : highestTxnId;
           rc.add(info);
         }
         LOG.debug("Going to rollback");
@@ -315,17 +317,6 @@ public void markCompacted(CompactionInfo info) throws MetaException {
    * This will remove an entry from the queue after
    * it has been compacted.
    *
-   * todo: Worker will start with DB in state X (wrt this partition).
-   * while it's working more txns will happen, against partition it's compacting.
-   * then this will delete state up to X and since then.  There may be new delta files created
-   * between compaction starting and cleaning.  These will not be compacted until more
-   * transactions happen.  So this ideally should only delete
-   * up to TXN_ID that was compacted (i.e. HWM in Worker?)  Then this can also run
-   * at READ_COMMITTED.  So this means we'd want to store HWM in COMPACTION_QUEUE when
-   * Worker picks up the job.
-   *
-   * Also, by using this method when Worker fails, we prevent future compactions from
-   * running until more data is written to table or compaction is invoked explicitly
    * @param info info on the compaction entry to remove
    */
   public void markCleaned(CompactionInfo info) throws MetaException {
@@ -349,12 +340,16 @@ public void markCleaned(CompactionInfo info) throws MetaException {
       }

       // Remove entries from completed_txn_components as well, so we don't start looking there
-      // again.
+      // again, but only up to the highest txn id included in this compaction job.
+      // highestTxnId will be NULL in upgrade scenarios.
       s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + info.dbname + "' and " +
           "ctc_table = '" + info.tableName + "'";
       if (info.partName != null) {
         s += " and ctc_partition = '" + info.partName + "'";
       }
+      if (info.highestTxnId != null) {
+        s += " and ctc_txnid <= " + info.highestTxnId;
+      }
       LOG.debug("Going to execute update <" + s + ">");
       if (stmt.executeUpdate(s) < 1) {
         LOG.error("Expected to remove at least one row from completed_txn_components when " +
@@ -363,7 +358,7 @@
       s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
           TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
-          info.tableName + "'";
+          info.tableName + "'" + (info.highestTxnId == null ? "" : " and txn_id <= " + info.highestTxnId);
       if (info.partName != null) s += " and tc_partition = '" + info.partName + "'";
       LOG.debug("Going to execute update <" + s + ">");
       rs = stmt.executeQuery(s);
@@ -419,7 +414,9 @@
   }

   /**
-   * Clean up aborted transactions from txns that have no components in txn_components.
+   * Clean up aborted transactions from txns that have no components in txn_components.  The reason such
+   * txns exist can be that no work was done in this txn (e.g. Streaming opened a TransactionBatch and
+   * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
    */
   public void cleanEmptyAbortedTxns() throws MetaException {
     try {
@@ -625,7 +622,9 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException {
   /**
    * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
    * {@link org.apache.hadoop.hive.common.ValidTxnList}.  This assumes that the caller intends to
-   * compact the files, and thus treats only open transactions as invalid.
+   * compact the files, and thus treats only open transactions as invalid.  Additionally, any
+   * txnId at or above the lowest open txnId is also treated as invalid.  This is to avoid creating
+   * something like delta_17_120 where txnId 80, for example, is still open.
    * @param txns txn list from the metastore
    * @return a valid txn list.
    */
@@ -638,7 +637,36 @@ public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
       if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
       exceptions[i++] = txn.getId();
     }
-    return new ValidCompactorTxnList(exceptions, minOpenTxn, highWater);
+    highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
+    return new ValidCompactorTxnList(exceptions, -1, highWater);
+  }
+  /**
+   * Record the highest txn id that the {@code ci} compaction job will pay attention to.
+   */
+  public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException {
+    Connection dbConn = null;
+    Statement stmt = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        int updCount = stmt.executeUpdate("UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_TXN_ID = " + highestTxnId +
+          " WHERE CQ_ID = " + ci.id);
+        if(updCount != 1) {
+          throw new IllegalStateException("Could not find record in COMPACTION_QUEUE for " + ci);
+        }
+        dbConn.commit();
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "setCompactionHighestTxnId(" + ci + "," + highestTxnId + ")");
+        throw new MetaException("Unable to connect to transaction database " +
+          StringUtils.stringifyException(e));
+      } finally {
+        close(null, stmt, dbConn);
+      }
+    } catch (RetryException ex) {
+      setCompactionHighestTxnId(ci, highestTxnId);
+    }
   }
 }
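The key change above is that the compactor's high water mark is now capped just below the lowest open transaction, instead of handing the minimum open txn to ValidCompactorTxnList separately. A small, self-contained illustration of that computation (plain Java, independent of the Hive classes):

    public class CompactorHighWaterMarkDemo {
      /**
       * Mirrors the logic in createValidCompactTxnList: if any txn is still open,
       * the compactor must not look past (lowest open txn id - 1), otherwise it
       * could produce a delta like delta_17_120 while txn 80 is still open.
       */
      static long compactorHighWaterMark(long highWaterMark, long minOpenTxn) {
        return minOpenTxn == Long.MAX_VALUE ? highWaterMark : minOpenTxn - 1;
      }

      public static void main(String[] args) {
        // No open txns: compact everything up to the global high water mark.
        System.out.println(compactorHighWaterMark(120, Long.MAX_VALUE)); // 120
        // Txn 80 still open: cap the compaction range at 79 (delta_17_79 at most).
        System.out.println(compactorHighWaterMark(120, 80));             // 79
      }
    }
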
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 91abb80..2015526 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -117,7 +117,8 @@ public static void prepDb() throws Exception {
         " CQ_TYPE char(1) NOT NULL," +
         " CQ_WORKER_ID varchar(128)," +
         " CQ_START bigint," +
-        " CQ_RUN_AS varchar(128))");
+        " CQ_RUN_AS varchar(128)," +
+        " CQ_HIGHEST_TXN_ID bigint)");

     stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
     stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
index 67631ba..648fd49 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
@@ -34,6 +34,8 @@
  * records when compacting.
  */
 public class ValidCompactorTxnList extends ValidReadTxnList {
+  //TODO: refactor this - minOpenTxn is not needed if we set
+  // highWatermark = Math.min(highWaterMark, minOpenTxn) (assuming there are open txns)
   // The minimum open transaction id
   private long minOpenTxn;
diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 29e6315..9bff08f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1012,7 +1012,12 @@ private void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnMgr)
     // releasing the locks.
     if (txnMgr.isTxnOpen()) {
       if (commit) {
-        txnMgr.commitTxn();//both commit & rollback clear ALL locks for this tx
+        if(conf.getBoolVar(ConfVars.HIVE_IN_TEST) && conf.getBoolVar(ConfVars.HIVETESTMODEROLLBACKTXN)) {
+          txnMgr.rollbackTxn();
+        }
+        else {
+          txnMgr.commitTxn();//both commit & rollback clear ALL locks for this tx
+        }
       } else {
         txnMgr.rollbackTxn();
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 2d051fd..a8fe57d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -179,7 +179,7 @@ private boolean lookForCurrentCompactions(ShowCompactResponse compactions,
                                             CompactionInfo ci) {
     if (compactions.getCompacts() != null) {
       for (ShowCompactResponseElement e : compactions.getCompacts()) {
-        if (!e.getState().equals(TxnHandler.CLEANING_RESPONSE) &&
+        if ((e.getState().equals(TxnHandler.WORKING_RESPONSE) || e.getState().equals(TxnHandler.INITIATED_RESPONSE)) &&
             e.getDbname().equals(ci.dbname) &&
             e.getTablename().equals(ci.tableName) &&
             (e.getPartitionname() == null && ci.partName == null ||
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 002464f..045ce63 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;

+import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -136,6 +137,7 @@ public void run() {
         final ValidTxnList txns = CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
         LOG.debug("ValidCompactTxnList: " + txns.writeToString());
+        txnHandler.setCompactionHighestTxnId(ci, txns.getHighWatermark());
         final StringBuilder jobName = new StringBuilder(name);
         jobName.append("-compactor-");
         jobName.append(ci.getFullPartitionName());
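Taken together, the Worker-side sequence is now: build the compactor's txn list, persist its high water mark on the queue entry, then run the compaction; the Cleaner later uses that recorded id in markCleaned() to bound what it deletes from COMPLETED_TXN_COMPONENTS and the aborted TXNS rows. A condensed sketch of that flow, using the calls shown in this patch (findNextToCompact is the existing queue-claiming call; job setup, locking and error handling omitted, so this is illustrative only, not a drop-in replacement for Worker.run()):

    import org.apache.hadoop.hive.common.ValidTxnList;
    import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
    import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;

    public class CompactionFlowSketch {
      static void compactOneEntry(CompactionTxnHandler txnHandler, String workerName) throws Exception {
        CompactionInfo ci = txnHandler.findNextToCompact(workerName);   // claim a queued compaction
        if (ci == null) {
          return;                                                       // nothing to do
        }
        ValidTxnList txns =
            CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
        // Record the highest txn id this job will consider, so markCleaned() can later
        // trim COMPLETED_TXN_COMPONENTS and aborted TXNS rows only up to this point.
        txnHandler.setCompactionHighestTxnId(ci, txns.getHighWatermark());
        // ... run the actual compaction job against 'txns' here ...
        txnHandler.markCompacted(ci);                                   // hand the entry to the Cleaner
      }
    }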