diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
index 14d389464f..d95606795b 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
@@ -113,8 +113,9 @@ public void cleanupAfterMajorTableCompaction() throws Exception {
     CompactionRequest rqst = new CompactionRequest(dbName, "camtc", CompactionType.MAJOR);
     txnHandler.compact(rqst);
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    ci.runAs = System.getProperty("user.name");
+    txnHandler.updateCompactorState(ci, openTxn());
     txnHandler.markCompacted(ci);
-    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
 
     assertCleanerActions(6);
   }
@@ -135,8 +136,9 @@ public void cleanupAfterMajorPartitionCompaction() throws Exception {
     rqst.setPartitionname("ds=today");
     txnHandler.compact(rqst);
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    ci.runAs = System.getProperty("user.name");
+    txnHandler.updateCompactorState(ci, openTxn());
     txnHandler.markCompacted(ci);
-    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
 
     assertCleanerActions(6);
   }
@@ -155,8 +157,9 @@ public void cleanupAfterMinorTableCompaction() throws Exception {
     CompactionRequest rqst = new CompactionRequest(dbName, "camitc", CompactionType.MINOR);
     txnHandler.compact(rqst);
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    ci.runAs = System.getProperty("user.name");
+    txnHandler.updateCompactorState(ci, openTxn());
     txnHandler.markCompacted(ci);
-    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
 
     assertCleanerActions(4);
   }
@@ -177,8 +180,9 @@ public void cleanupAfterMinorPartitionCompaction() throws Exception {
     rqst.setPartitionname("ds=today");
     txnHandler.compact(rqst);
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    ci.runAs = System.getProperty("user.name");
+    txnHandler.updateCompactorState(ci, openTxn());
     txnHandler.markCompacted(ci);
-    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
 
     assertCleanerActions(4);
   }
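Note on the test-side pattern above (repeated in TestCleaner below): `setRunAs()` is gone, so tests now put the user on the `CompactionInfo` directly and persist it with `updateCompactorState()`, which also takes a txnid so the compactor txn gets tied to the queue entry. A minimal sketch, assuming a `TxnStore` handle and passing in the result of the `openTxn()` helper these tests inherit from `CompactorTest`; nothing here goes beyond what the edited tests already do:

```java
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;

/** Illustrative only; mirrors the edited tests rather than adding anything new. */
class SimulatedCompactionSketch {
  // txnId stands in for openTxn(): a freshly opened txn id playing the compactor's txn.
  static void simulateCompactedTable(TxnStore txnHandler, long txnId) throws Exception {
    CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
    txnHandler.compact(rqst);                                 // enqueue the request
    CompactionInfo ci = txnHandler.findNextToCompact("fred"); // worker "fred" claims it
    ci.runAs = System.getProperty("user.name");               // worker decides who to run as
    txnHandler.updateCompactorState(ci, txnId);               // persists cq_run_as + TXN_COMPONENTS row
    txnHandler.markCompacted(ci);                             // ready for the Cleaner
  }
}
```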
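For review context, the Worker's new sequence, condensed (illustrative sketch, not the full method; heartbeating, stats and job launch elided, and all names below come from the patch itself): open the compactor txn as `ci.runAs`, resolve the write-id high-water mark under it, then persist everything in a single metastore call so that an abort of `compactorTxnId` still leaves a `TXN_COMPONENTS` row behind:

```java
// Condensed Worker flow under the patch (sketch):
if (ci.runAs == null) {
  ci.runAs = findUserToRunAs(sd.getLocation(), t);  // resolved once, kept on CompactionInfo
}
OpenTxnRequest otReq = new OpenTxnRequest(1, ci.runAs, hostname());
otReq.setAgentInfo(getName());
long compactorTxnId = txnHandler.openTxns(otReq).getTxn_ids().get(0);
// ... start the heartbeater, compute tblValidWriteIds for fullTableName ...
ci.highestWriteId = tblValidWriteIds.getHighWatermark();
// One RDBMS call replaces the old setRunAs() + setCompactionHighestWriteId() pair and
// additionally writes the TXN_COMPONENTS row that keeps an aborted compactor txn's
// TXNS entry alive until the Cleaner has removed its files.
txnHandler.updateCompactorState(ci, compactorTxnId);
```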
diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index f27a5b0ceb..b28b57779b 100644
--- ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -88,8 +88,8 @@ public void testFindNextToCompact() throws Exception {
     assertEquals(CompactionType.MINOR, ci.type);
     assertNull(ci.runAs);
     assertNull(txnHandler.findNextToCompact("fred"));
-
-    txnHandler.setRunAs(ci.id, "bob");
+    ci.runAs = "bob";
+    txnHandler.updateCompactorState(ci, openTxn());
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index dc39f5ef61..adee6c885d 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -1190,39 +1190,14 @@ else if(TxnStore.ATTEMPTED_RESPONSE.equals(compact.getState())) {
     return compactionsByState;
   }
   public static void runWorker(HiveConf hiveConf) throws MetaException {
-    runCompactorThread(hiveConf, CompactorThreadType.WORKER);
+    TxnCommandsBaseForTests.runWorker(hiveConf);
   }
   public static void runCleaner(HiveConf hiveConf) throws MetaException {
-    runCompactorThread(hiveConf, CompactorThreadType.CLEANER);
+    TxnCommandsBaseForTests.runCleaner(hiveConf);
   }
   public static void runInitiator(HiveConf hiveConf) throws MetaException {
-    runCompactorThread(hiveConf, CompactorThreadType.INITIATOR);
+    TxnCommandsBaseForTests.runInitiator(hiveConf);
   }
-  private enum CompactorThreadType {INITIATOR, WORKER, CLEANER}
-  private static void runCompactorThread(HiveConf hiveConf, CompactorThreadType type)
-      throws MetaException {
-    AtomicBoolean stop = new AtomicBoolean(true);
-    CompactorThread t = null;
-    switch (type) {
-      case INITIATOR:
-        t = new Initiator();
-        break;
-      case WORKER:
-        t = new Worker();
-        break;
-      case CLEANER:
-        t = new Cleaner();
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown type: " + type);
-    }
-    t.setThreadId((int) t.getId());
-    t.setConf(hiveConf);
-    AtomicBoolean looped = new AtomicBoolean();
-    t.init(stop, looped);
-    t.run();
-  }
-
   /**
    * HIVE-12352 has details
    * @throws Exception
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index 7535f84a5e..a8fabdb40d 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -22,9 +22,12 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -37,6 +40,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.util.BitSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -377,12 +381,12 @@ public void testCleaner2() throws Exception {
     runStatementOnDriver("insert into T values(2,5)");//makes delta_3_3 in T5
 
     runStatementOnDriver("alter table T compact 'minor'");
-    TestTxnCommands2.runWorker(hiveConf);
+    runWorker(hiveConf);
     /*  at this point delete|delta_0000001_0000003_v0000022 are visible to everyone
     so cleaner removes all files shadowed by them (which is everything in this case)
     */
-    TestTxnCommands2.runCleaner(hiveConf);
+    runCleaner(hiveConf);
 
     expectedList = new String[] {
         "/t/delete_delta_0000001_0000003_v0000022",
@@ -406,4 +410,69 @@ private static void checkExpectedFiles(FileStatus[] actualList, String[] expecte
     }
     Assert.assertTrue("not found set: " + expectedSet + " unexpected set: " + unexpectedSet, expectedSet.isEmpty() && unexpectedSet.isEmpty());
   }
+  @Test
+  public void testCompactionAbort() throws Exception {
+    MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true);
+    dropTable(new String[] {"T"});
+    //note: transaction names T1, T2, etc below are logical; the actual txnid will be different
+    runStatementOnDriver("create table T (a int, b int) stored as orc");
+    runStatementOnDriver("insert into T values(0,2)");//makes delta_1_1 in T1
+    runStatementOnDriver("insert into T values(1,4)");//makes delta_2_2 in T2
+
+    //create failed compaction attempt so that compactor txn is aborted
+    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
+    runStatementOnDriver("alter table T compact 'minor'");
+    runWorker(hiveConf);
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Unexpected number of compactions in history",
+        1, resp.getCompactsSize());
+    Assert.assertEquals("Unexpected 0th compaction state",
+        TxnStore.FAILED_RESPONSE, resp.getCompacts().get(0).getState());
+    GetOpenTxnsResponse openResp = txnHandler.getOpenTxns();
+    Assert.assertEquals(openResp.toString(), 1, openResp.getOpen_txnsSize());
+    //check that the compactor txn is aborted
+    Assert.assertTrue(openResp.toString(), BitSet.valueOf(openResp.getAbortedBits()).get(0));
+
+    runCleaner(hiveConf);
+    runInitiator(hiveConf);//to make sure any 'empty aborted' TXNS metadata (none in this case) is removed
+    openResp = txnHandler.getOpenTxns();
+    Assert.assertEquals(openResp.toString(), 1, openResp.getOpen_txnsSize());
+    //we still have 1 aborted (compactor) txn
+    Assert.assertTrue(openResp.toString(), BitSet.valueOf(openResp.getAbortedBits()).get(0));
+    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from TXN_COMPONENTS"));
+    //this returns 1 row since we only have 1 compaction executed
+    int highestCompactWriteId = TxnDbUtil.countQueryAgent(hiveConf,
+        "select CC_HIGHEST_WRITE_ID from COMPLETED_COMPACTIONS");
+    /**
+     * See {@link org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler#updateCompactorState(CompactionInfo, long)}
+     * for notes on why CC_HIGHEST_WRITE_ID=TC_WRITEID
+     */
+    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from TXN_COMPONENTS where TC_WRITEID=" + highestCompactWriteId));
+    //now make a successful compactor run so that next Cleaner run actually cleans
+    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false);
+    runStatementOnDriver("alter table T compact 'minor'");
+    runWorker(hiveConf);
+
+    resp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Unexpected number of compactions in history",
+        2, resp.getCompactsSize());
+    //check both combinations - don't know what order the db returns them in
+    Assert.assertTrue("Unexpected compaction state",
+        (TxnStore.FAILED_RESPONSE.equalsIgnoreCase(resp.getCompacts().get(0).getState())
+            && TxnStore.CLEANING_RESPONSE.equalsIgnoreCase(resp.getCompacts().get(1).getState())) ||
+        (TxnStore.CLEANING_RESPONSE.equalsIgnoreCase(resp.getCompacts().get(0).getState()) &&
+            TxnStore.FAILED_RESPONSE.equalsIgnoreCase(resp.getCompacts().get(1).getState())));
+
+    //delete metadata about aborted txn from txn_components and files (if any)
+    runCleaner(hiveConf);
+    runInitiator(hiveConf);//to clean 'empty aborted'
+    openResp = txnHandler.getOpenTxns();
+    //now the aborted compactor txn is gone
+    Assert.assertEquals(openResp.toString(), 0, openResp.getOpen_txnsSize());
+  }
 }
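A note on the `BitSet` checks in `testCompactionAbort` above: `getOpenTxns()` returns every non-committed txn in `open_txns` plus a parallel bitmask where a set bit i means entry i is aborted rather than open. A small sketch of how the two fit together (illustrative; `txnHandler` assumed to be obtained as in the test):

```java
import java.util.BitSet;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
import org.apache.hadoop.hive.metastore.txn.TxnStore;

class AbortedBitsSketch {
  static void printTxnStates(TxnStore txnHandler) throws Exception {
    GetOpenTxnsResponse openResp = txnHandler.getOpenTxns();
    BitSet aborted = BitSet.valueOf(openResp.getAbortedBits());
    for (int i = 0; i < openResp.getOpen_txnsSize(); i++) {
      // entry i of open_txns is aborted iff bit i is set
      System.out.println("txn " + openResp.getOpen_txns().get(i)
          + (aborted.get(i) ? " aborted" : " open"));
    }
  }
}
```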
diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 52453a2ec4..287aeaecb0 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -29,6 +29,10 @@
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorThread;
+import org.apache.hadoop.hive.ql.txn.compactor.Initiator;
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -42,6 +46,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public abstract class TxnCommandsBaseForTests {
   private static final Logger LOG = LoggerFactory.getLogger(TxnCommandsBaseForTests.class);
@@ -152,13 +157,38 @@ protected String getWarehouseDir() {
   protected String makeValuesClause(int[][] rows) {
     return TestTxnCommands2.makeValuesClause(rows);
   }
-
-  void runWorker(HiveConf hiveConf) throws MetaException {
-    TestTxnCommands2.runWorker(hiveConf);
+  public static void runWorker(HiveConf hiveConf) throws MetaException {
+    runCompactorThread(hiveConf, CompactorThreadType.WORKER);
   }
-
-  void runCleaner(HiveConf hiveConf) throws MetaException {
-    TestTxnCommands2.runCleaner(hiveConf);
+  public static void runCleaner(HiveConf hiveConf) throws MetaException {
+    runCompactorThread(hiveConf, CompactorThreadType.CLEANER);
+  }
+  public static void runInitiator(HiveConf hiveConf) throws MetaException {
+    runCompactorThread(hiveConf, CompactorThreadType.INITIATOR);
+  }
+  private enum CompactorThreadType {INITIATOR, WORKER, CLEANER}
+  private static void runCompactorThread(HiveConf hiveConf, CompactorThreadType type)
+      throws MetaException {
+    AtomicBoolean stop = new AtomicBoolean(true);
+    CompactorThread t = null;
+    switch (type) {
+      case INITIATOR:
+        t = new Initiator();
+        break;
+      case WORKER:
+        t = new Worker();
+        break;
+      case CLEANER:
+        t = new Cleaner();
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown type: " + type);
+    }
+    t.setThreadId((int) t.getId());
+    t.setConf(hiveConf);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
   }
 
   protected List<String> runStatementOnDriver(String stmt) throws Exception {
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 467851a590..46c79d680e 100644
--- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -59,8 +59,9 @@ public void cleanupAfterMajorTableCompaction() throws Exception {
     CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
     txnHandler.compact(rqst);
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    ci.runAs = System.getProperty("user.name");
+    txnHandler.updateCompactorState(ci, openTxn());
     txnHandler.markCompacted(ci);
-    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
 
     startCleaner();
@@ -91,8 +92,9 @@ public void cleanupAfterMajorPartitionCompaction() throws Exception {
     rqst.setPartitionname("ds=today");
     txnHandler.compact(rqst);
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    ci.runAs = System.getProperty("user.name");
+    txnHandler.updateCompactorState(ci, openTxn());
     txnHandler.markCompacted(ci);
-    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
 
     startCleaner();
@@ -121,8 +123,9 @@ public void cleanupAfterMinorTableCompaction() throws Exception {
     CompactionRequest rqst = new CompactionRequest("default", "camitc", CompactionType.MINOR);
     txnHandler.compact(rqst);
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    ci.runAs = System.getProperty("user.name");
+    txnHandler.updateCompactorState(ci, openTxn());
     txnHandler.markCompacted(ci);
-    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
 
     startCleaner();
@@ -160,8 +163,9 @@ public void cleanupAfterMinorPartitionCompaction() throws Exception {
     rqst.setPartitionname("ds=today");
     txnHandler.compact(rqst);
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    ci.runAs = System.getProperty("user.name");
+    txnHandler.updateCompactorState(ci, openTxn());
     txnHandler.markCompacted(ci);
-    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
 
     startCleaner();
@@ -199,7 +203,8 @@ public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception {
     txnHandler.compact(rqst);
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
     txnHandler.markCompacted(ci);
-    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+    ci.runAs = System.getProperty("user.name");
+    txnHandler.updateCompactorState(ci, openTxn());
 
     startCleaner();
@@ -227,8 +232,9 @@ public void droppedTable() throws Exception {
     CompactionRequest rqst = new CompactionRequest("default", "dt", CompactionType.MINOR);
     txnHandler.compact(rqst);
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    ci.runAs = System.getProperty("user.name");
+    txnHandler.updateCompactorState(ci, openTxn());
     txnHandler.markCompacted(ci);
-    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
 
     // Drop table will clean the table entry from the compaction queue and hence cleaner have no effect
     ms.dropTable("default", "dt");
@@ -255,8 +261,9 @@ public void droppedPartition() throws Exception {
     rqst.setPartitionname("ds=today");
     txnHandler.compact(rqst);
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    ci.runAs = System.getProperty("user.name");
+    txnHandler.updateCompactorState(ci, openTxn());
     txnHandler.markCompacted(ci);
-    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
 
     // Drop partition will clean the partition entry from the compaction queue and hence cleaner have no effect
     ms.dropPartition("default", "dp", Collections.singletonList("today"), true);
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 7202cc6416..f3f24eb161 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -115,46 +115,6 @@ public CompactionTxnHandler() {
       return findPotentialCompactions(maxAborted);
     }
   }
-
-  /**
-   * Sets the user to run as.  This is for the case
-   * where the request was generated by the user and so the worker must set this value later.
-   * @param cq_id id of this entry in the queue
-   * @param user user to run the jobs as
-   */
-  @Override
-  @RetrySemantics.Idempotent
-  public void setRunAs(long cq_id, String user) throws MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
-        LOG.debug("Going to execute update <" + s + ">");
-        int updCnt = stmt.executeUpdate(s);
-        if (updCnt != 1) {
-          LOG.error("Unable to set cq_run_as=" + user + " for compaction record with cq_id=" + cq_id + ". updCnt=" + updCnt);
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        }
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.error("Unable to update compaction queue, " + e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "setRunAs(cq_id:" + cq_id + ",user:" + user +")");
-      } finally {
-        closeDbConn(dbConn);
-        closeStmt(stmt);
-      }
-    } catch (RetryException e) {
-      setRunAs(cq_id, user);
-    }
-  }
-
   /**
    * This will grab the next compaction request off of
    * the queue, and assign it to the worker.
@@ -450,7 +410,7 @@ public void markCleaned(CompactionInfo info) throws MetaException {
          */
         s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
             TXN_ABORTED + "' and tc_database = ? and tc_table = ?";
-        if (info.highestWriteId != 0) s += " and tc_writeid <= ?";
+        if (info.highestWriteId != 0) s += " and tc_writeid <= ?";//todo: if tc_writeid is NULL, the last conjunct can never be true, so such entries are never cleaned here
         if (info.partName != null) s += " and tc_partition = ?";
 
         pStmt = dbConn.prepareStatement(s);
@@ -809,36 +769,65 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException {
       return findColumnsWithStats(ci);
     }
   }
-
-  /**
-   * Record the highest txn id that the {@code ci} compaction job will pay attention to.
-   * This is the highest resolved txn id, i.e. such that there are no open txns with lower ids.
-   */
   @Override
-  @RetrySemantics.Idempotent
-  public void setCompactionHighestWriteId(CompactionInfo ci, long highestWriteId) throws MetaException {
+  public void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws MetaException {
     Connection dbConn = null;
     Statement stmt = null;
     try {
      try {
        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
        stmt = dbConn.createStatement();
-        int updCount = stmt.executeUpdate("UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_WRITE_ID = " + highestWriteId +
-          " WHERE CQ_ID = " + ci.id);
+        String sqlText = "UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_WRITE_ID = " +
+            ci.highestWriteId + ", cq_run_as = " + quoteString(ci.runAs) +
+            " WHERE CQ_ID = " + ci.id;
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("About to execute: " + sqlText);
+        }
+        int updCount = stmt.executeUpdate(sqlText);
+        if(updCount != 1) {
+          throw new IllegalStateException("Could not find record in COMPACTION_QUEUE for " + ci);
+        }
+        /* We make an entry in TXN_COMPONENTS for the partition/table that the compactor is
+         * working on in case this txn aborts, so that its TXNS entry is not removed until the
+         * Cleaner has removed any files this txn may have written, i.e. it works the same way
+         * as any other write.  TC_WRITEID is set to the highest WriteId that this compactor run
+         * considered, since the compactor doesn't allocate a new write id (so as not to
+         * invalidate result set caches/materialized views), but we need to set it to something
+         * so that markCleaned() only cleans TXN_COMPONENTS up to the level to which aborted
+         * files/data have been cleaned. */
+        sqlText = "insert into TXN_COMPONENTS(" +
+            "TC_TXNID, " +
+            "TC_DATABASE, " +
+            "TC_TABLE, " +
+            (ci.partName == null ? "" : "TC_PARTITION, ") +
+            "TC_WRITEID, " +
+            "TC_OPERATION_TYPE)" +
+            " VALUES(" +
+            compactionTxnId + "," +
+            quoteString(ci.dbname) + "," +
+            quoteString(ci.tableName) + "," +
+            (ci.partName == null ? "" : quoteString(ci.partName) + ",") +
+            /* TC_WRITEID = ci.highestWriteId; see the comment above */
+            ci.highestWriteId + ", " +
+            quoteChar(OperationType.COMPACT.getSqlConst()) + ")";
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("About to execute: " + sqlText);
+        }
+        updCount = stmt.executeUpdate(sqlText);
        if(updCount != 1) {
          throw new IllegalStateException("Could not find record in COMPACTION_QUEUE for " + ci);
        }
        dbConn.commit();
      } catch (SQLException e) {
        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "setCompactionHighestWriteId(" + ci + "," + highestWriteId + ")");
+        checkRetryable(dbConn, e, "updateCompactorState(" + ci + "," + compactionTxnId +")");
        throw new MetaException("Unable to connect to transaction database " +
-          StringUtils.stringifyException(e));
+            StringUtils.stringifyException(e));
      } finally {
        close(null, stmt, dbConn);
      }
    } catch (RetryException ex) {
-      setCompactionHighestWriteId(ci, highestWriteId);
+      updateCompactorState(ci, compactionTxnId);
    }
  }
 
  private static class RetentionCounters {
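To make the new method concrete: for a partitioned table it issues two statements inside one metastore transaction. A sketch with assumed example values (cq_id=42, runAs="hive", highestWriteId=17, compactorTxnId=123 are illustrative, not from the patch):

```java
long cqId = 42, highestWriteId = 17, compactorTxnId = 123; // assumed example values
String update = "UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_WRITE_ID = " + highestWriteId
    + ", cq_run_as = 'hive' WHERE CQ_ID = " + cqId;
String insert = "insert into TXN_COMPONENTS(TC_TXNID, TC_DATABASE, TC_TABLE, TC_PARTITION,"
    + " TC_WRITEID, TC_OPERATION_TYPE) VALUES(" + compactorTxnId
    + ", 'default', 't', 'ds=today', " + highestWriteId + ", 'c')";
// 'c' is OperationType.COMPACT. TC_WRITEID deliberately repeats CQ_HIGHEST_WRITE_ID:
// the compactor allocates no write id of its own, and markCleaned() uses this value
// as the upper bound when clearing aborted-txn metadata.
```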
"" : quoteString(ci.partName) + ",") + + /*todo: this really needs an explanation*/ + ci.highestWriteId + ", " + + quoteChar(OperationType.COMPACT.getSqlConst()) + ")"; + if(LOG.isDebugEnabled()) { + LOG.debug("About to execute: " + sqlText); + } + updCount = stmt.executeUpdate(sqlText); if(updCount != 1) { throw new IllegalStateException("Could not find record in COMPACTION_QUEUE for " + ci); } dbConn.commit(); } catch (SQLException e) { rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "setCompactionHighestWriteId(" + ci + "," + highestWriteId + ")"); + checkRetryable(dbConn, e, "updateCompactorState(" + ci + "," + compactionTxnId +")"); throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { close(null, stmt, dbConn); } } catch (RetryException ex) { - setCompactionHighestWriteId(ci, highestWriteId); + updateCompactorState(ci, compactionTxnId); } } private static class RetentionCounters { diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 3928d69fec..2a6290315a 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -27,7 +27,6 @@ import java.sql.SQLFeatureNotSupportedException; import java.sql.Savepoint; import java.sql.Statement; -import java.sql.PreparedStatement; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; @@ -164,7 +163,7 @@ // Transaction states static final protected char TXN_ABORTED = 'a'; static final protected char TXN_OPEN = 'o'; - //todo: make these like OperationType and remove above char constatns + //todo: make these like OperationType and remove above char constants enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN} public enum TxnType { @@ -197,17 +196,20 @@ public int getValue() { static private boolean doRetryOnConnPool = false; private List transactionalListeners; - - private enum OpertaionType { - SELECT('s'), INSERT('i'), UPDATE('u'), DELETE('d'); + + /** + * These are the valid values for TXN_COMPONENTS.TC_OPERATION_TYPE + */ + enum OperationType { + SELECT('s'), INSERT('i'), UPDATE('u'), DELETE('d'), COMPACT('c'); private final char sqlConst; - OpertaionType(char sqlConst) { + OperationType(char sqlConst) { this.sqlConst = sqlConst; } public String toString() { return Character.toString(sqlConst); } - public static OpertaionType fromString(char sqlConst) { + public static OperationType fromString(char sqlConst) { switch (sqlConst) { case 's': return SELECT; @@ -217,24 +219,29 @@ public static OpertaionType fromString(char sqlConst) { return UPDATE; case 'd': return DELETE; + case 'c': + return COMPACT; default: throw new IllegalArgumentException(quoteChar(sqlConst)); } } - public static OpertaionType fromDataOperationType(DataOperationType dop) { + public static OperationType fromDataOperationType(DataOperationType dop) { switch (dop) { case SELECT: - return OpertaionType.SELECT; + return OperationType.SELECT; case INSERT: - return OpertaionType.INSERT; + return OperationType.INSERT; case UPDATE: - return OpertaionType.UPDATE; + return OperationType.UPDATE; case DELETE: - return OpertaionType.DELETE; + return OperationType.DELETE; default: throw new IllegalArgumentException("Unexpected 
value: " + dop); } } + char getSqlConst() { + return sqlConst; + } } // Maximum number of open transactions that's allowed @@ -1124,7 +1131,7 @@ public void commitTxn(CommitTxnRequest rqst) rs = null; } else { conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" + - quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")"; + quoteChar(OperationType.UPDATE.sqlConst) + "," + quoteChar(OperationType.DELETE.sqlConst) + ")"; rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, "tc_operation_type " + conflictSQLSuffix)); } @@ -1181,8 +1188,8 @@ public void commitTxn(CommitTxnRequest rqst) // part of this commitTxn() op " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns //U+U and U+D is a conflict but D+D is not and we don't currently track I in WRITE_SET at all - " and (committed.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + - " OR cur.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + ")")); + " and (committed.ws_operation_type=" + quoteChar(OperationType.UPDATE.sqlConst) + + " OR cur.ws_operation_type=" + quoteChar(OperationType.UPDATE.sqlConst) + ")")); if (rs.next()) { //found a conflict String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]"; @@ -1226,8 +1233,12 @@ public void commitTxn(CommitTxnRequest rqst) // Move the record from txn_components into completed_txn_components so that the compactor // knows where to look to compact. s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " + - "ctc_table, ctc_partition, ctc_writeid, ctc_update_delete) select tc_txnid, tc_database, tc_table, " + - "tc_partition, tc_writeid, '" + isUpdateDelete + "' from TXN_COMPONENTS where tc_txnid = " + txnid; + "ctc_table, ctc_partition, ctc_writeid, ctc_update_delete) select tc_txnid," + + " tc_database, tc_table, tc_partition, tc_writeid, '" + isUpdateDelete + + "' from TXN_COMPONENTS where tc_txnid = " + txnid + + //we only track compactor activity in TXN_COMPONENTS to handle the case where the + //compactor txn aborts - so don't bother copying it to COMPLETED_TXN_COMPONENTS + " AND tc_operation_type <> " + quoteChar(OperationType.COMPACT.sqlConst); LOG.debug("Going to execute insert <" + s + ">"); if ((stmt.executeUpdate(s)) < 1) { @@ -2408,7 +2419,7 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc rows.add(txnid + ", ?, " + (tblName == null ? "null" : "?") + ", " + (partName == null ? "null" : "?")+ "," + - quoteString(OpertaionType.fromDataOperationType(lc.getOperationType()).toString())+ "," + + quoteString(OperationType.fromDataOperationType(lc.getOperationType()).toString())+ "," + (writeId == null ? "null" : writeId)); List params = new ArrayList<>(); params.add(dbName); @@ -3117,7 +3128,7 @@ public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaExcep //-1 because 'null' literal doesn't work for all DBs... "cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " + "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " + - "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS"; + "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS"; //todo: sort by cq_id? 
      //what I want is order by cc_end desc, cc_start asc (but derby has a bug https://issues.apache.org/jira/browse/DERBY-6013)
      //to sort so that currently running jobs are at the end of the list (bottom of screen)
      //and currently running ones are in sorted by start time
@@ -3201,9 +3212,9 @@ public void addDynamicPartitions(AddDynamicPartitions rqst)
         shouldNeverHappen(rqst.getTxnid());
       }
       //for RU this may be null so we should default it to 'u' which is most restrictive
-      OpertaionType ot = OpertaionType.UPDATE;
+      OperationType ot = OperationType.UPDATE;
       if(rqst.isSetOperationType()) {
-        ot = OpertaionType.fromDataOperationType(rqst.getOperationType());
+        ot = OperationType.fromDataOperationType(rqst.getOperationType());
       }
       Long writeId = rqst.getWriteid();
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index ca47a44fe3..e840758c9d 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -323,13 +323,14 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old
   Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException;
 
   /**
-   * Sets the user to run as.  This is for the case
-   * where the request was generated by the user and so the worker must set this value later.
-   * @param cq_id id of this entry in the queue
-   * @param user user to run the jobs as
+   * Updates the COMPACTION_QUEUE entry for {@code ci}.  Sets the runAs user for the case where
+   * the request was generated by the user, so the worker must fill in this value later.  Sets
+   * highestWriteId so that the cleaner doesn't clean above what the compactor has processed.
+   * Also updates TXN_COMPONENTS so that we know where {@code compactionTxnId} was writing,
+   * in case it aborts.
+   * @param compactionTxnId - txnid in which the Compactor is running
    */
   @RetrySemantics.Idempotent
-  void setRunAs(long cq_id, String user) throws MetaException;
+  void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws MetaException;
 
   /**
    * This will grab the next compaction request off of
@@ -433,12 +434,6 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old
   @RetrySemantics.ReadOnly
   List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException;
 
-  /**
-   * Record the highest write id that the {@code ci} compaction job will pay attention to.
-   */
-  @RetrySemantics.Idempotent
-  void setCompactionHighestWriteId(CompactionInfo ci, long highestWriteId) throws MetaException;
-
   /**
    * For any given compactable entity (partition, table if not partitioned) the history of compactions
    * may look like "sssfffaaasffss", for example.  The idea is to retain the tail (most recent) of the
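Caller-visible contract of the new interface method, mirroring the TestCompactionTxnHandler change above (illustrative; a single queued compaction is assumed, and `openTxn()` behaves as in that test):

```java
ci.runAs = "bob";
txnHandler.updateCompactorState(ci, openTxn());
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
assertEquals("bob", rsp.getCompacts().get(0).getRunAs());  // cq_run_as was persisted
// and TXN_COMPONENTS now carries a row for the compactor's txnid, so aborting that
// txn leaves recoverable metadata rather than silently disappearing from TXNS.
```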