diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index b4dac4346e..d89ee4db7e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -99,8 +99,8 @@ Licensed to the Apache Software Foundation (ASF) under one private volatile DbLockManager lockMgr = null; /** - * The Metastore NEXT_TXN_ID.NTXN_NEXT is initialized to 1; it contains the next available - * transaction id. Thus is 1 is first transaction id. + * The Metastore TXNS sequence is initialized to 1. + * Thus is 1 is first transaction id. */ private volatile long txnId = 0; diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 37a5862791..23512e252e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -151,7 +151,7 @@ public void run() { recoverFailedCompactions(true); // Clean anything from the txns table that has no components left in txn_components. - txnHandler.cleanEmptyAbortedTxns(); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); // Clean TXN_TO_WRITE_ID table for entries under min_uncommitted_txn referred by any open txns. txnHandler.cleanTxnToWriteIdTable(); diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index 7c8903fbae..7069dae393 100644 --- ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -512,9 +512,14 @@ public void testMarkCleanedCleansTxnsAndTxnComponents() // Check that we are cleaning up the empty aborted transactions GetOpenTxnsResponse txnList = txnHandler.getOpenTxns(); assertEquals(3, txnList.getOpen_txnsSize()); - txnHandler.cleanEmptyAbortedTxns(); + // Create one aborted for low water mark + txnid = openTxn(); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + txnHandler.setOpenTxnTimeOutMillis(1); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnList = txnHandler.getOpenTxns(); - assertEquals(2, txnList.getOpen_txnsSize()); + assertEquals(3, txnList.getOpen_txnsSize()); + txnHandler.setOpenTxnTimeOutMillis(1000); rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR); rqst.setPartitionname("bar"); @@ -529,9 +534,13 @@ public void testMarkCleanedCleansTxnsAndTxnComponents() txnHandler.markCleaned(ci); txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")); - txnHandler.cleanEmptyAbortedTxns(); + // The open txn will became the low water mark + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); + txnHandler.setOpenTxnTimeOutMillis(1); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnList = txnHandler.getOpenTxns(); assertEquals(3, txnList.getOpen_txnsSize()); + txnHandler.setOpenTxnTimeOutMillis(1000); } @Test diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 3916e88a9d..868da0c7a0 100644 --- ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -193,30 +193,40 @@ public void testAbortTxn() throws Exception { boolean gotException = false; try { txnHandler.abortTxn(new 
AbortTxnRequest(2)); - } - catch(NoSuchTxnException ex) { + } catch(NoSuchTxnException ex) { gotException = true; - //if this wasn't an empty txn, we'd get a better msg - Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(2), ex.getMessage()); + // this is the last committed, so it is still in the txns table + Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(2) + " is already committed.", ex.getMessage()); } Assert.assertTrue(gotException); gotException = false; txnHandler.commitTxn(new CommitTxnRequest(3)); try { txnHandler.abortTxn(new AbortTxnRequest(3)); - } - catch(NoSuchTxnException ex) { + } catch(NoSuchTxnException ex) { gotException = true; //txn 3 is not empty txn, so we get a better msg Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(3) + " is already committed.", ex.getMessage()); } Assert.assertTrue(gotException); + txnHandler.setOpenTxnTimeOutMillis(1); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); + txnHandler.setOpenTxnTimeOutMillis(1000); gotException = false; try { - txnHandler.abortTxn(new AbortTxnRequest(4)); + txnHandler.abortTxn(new AbortTxnRequest(2)); + } catch(NoSuchTxnException ex) { + gotException = true; + // now the second transaction is cleared and since it was empty, we do not recognize it anymore + Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(2), ex.getMessage()); } - catch(NoSuchTxnException ex) { + Assert.assertTrue(gotException); + + gotException = false; + try { + txnHandler.abortTxn(new AbortTxnRequest(4)); + } catch(NoSuchTxnException ex) { gotException = true; Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(4), ex.getMessage()); } @@ -1672,9 +1682,11 @@ private void checkReplTxnForTest(Long startTxnId, Long endTxnId, String replPoli @Test public void testReplOpenTxn() throws Exception { int numTxn = 50000; - String[] output = TxnDbUtil.queryToString(conf, "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"").split("\n"); + String[] output = TxnDbUtil.queryToString(conf, "SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\"").split("\n"); long startTxnId = Long.parseLong(output[1].trim()); + txnHandler.setOpenTxnTimeOutMillis(30000); List txnList = replOpenTxnForTest(startTxnId, numTxn, "default.*"); + txnHandler.setOpenTxnTimeOutMillis(1000); assert(txnList.size() == numTxn); txnHandler.abortTxns(new AbortTxnsRequest(txnList)); } @@ -1682,7 +1694,7 @@ public void testReplOpenTxn() throws Exception { @Test public void testReplAllocWriteId() throws Exception { int numTxn = 2; - String[] output = TxnDbUtil.queryToString(conf, "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"").split("\n"); + String[] output = TxnDbUtil.queryToString(conf, "SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\"").split("\n"); long startTxnId = Long.parseLong(output[1].trim()); List srcTxnIdList = LongStream.rangeClosed(startTxnId, numTxn+startTxnId-1) .boxed().collect(Collectors.toList()); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 2c13e8dd03..48bf8529fa 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -89,6 +89,7 @@ protected HiveConf hiveConf; protected Driver d; + private TxnStore txnHandler; protected enum Table { ACIDTBL("acidTbl"), ACIDTBLPART("acidTblPart", "p"), @@ -151,6 +152,7 @@ void setUpWithTableProperties(String tableProperties) throws Exception { TxnDbUtil.setConfValues(hiveConf); TxnDbUtil.prepDb(hiveConf); + txnHandler = 
TxnUtils.getTxnStore(hiveConf); File f = new File(TEST_WAREHOUSE_DIR); if (f.exists()) { FileUtil.fullyDelete(f); @@ -322,7 +324,6 @@ public void testOriginalFileReaderWhenNonAcidConvertedToAcid() throws Exception // 3. Perform a major compaction. runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); runWorker(hiveConf); - TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); @@ -930,7 +931,6 @@ public void updateDeletePartitioned() throws Exception { int[][] tableData = {{1,2},{3,4},{5,6}}; runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=1) (a,b) " + makeValuesClause(tableData)); runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=2) (a,b) " + makeValuesClause(tableData)); - TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.name(), CompactionType.MAJOR)); runWorker(hiveConf); runCleaner(hiveConf); @@ -953,7 +953,6 @@ public void testEmptyInTblproperties() throws Exception { runStatementOnDriver("update t1" + " set b = -2 where a = 1"); runStatementOnDriver("alter table t1 " + " compact 'MAJOR'"); runWorker(hiveConf); - TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); @@ -1029,7 +1028,6 @@ void testInitiatorWithMultipleFailedCompactionsForVariousTblProperties(String tb hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true); int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); - TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); AtomicBoolean stop = new AtomicBoolean(true); //create failed compactions for(int i = 0; i < numFailedCompactions; i++) { @@ -1217,7 +1215,6 @@ private void writeBetweenWorkerAndCleanerForVariousTblProperties(String tblPrope runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3"); //run Worker to execute compaction - TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR)); runWorker(hiveConf); @@ -1277,7 +1274,6 @@ public void testFailHeartbeater() throws Exception { public void testOpenTxnsCounter() throws Exception { hiveConf.setIntVar(HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS, 3); hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_COUNT_OPEN_TXNS_INTERVAL, 10, TimeUnit.MILLISECONDS); - TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); OpenTxnsResponse openTxnsResponse = txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost")); AcidOpenTxnsCounterService openTxnsCounterService = new AcidOpenTxnsCounterService(); @@ -1319,7 +1315,6 @@ public void testCompactWithDelete() throws Exception { runStatementOnDriver("update " + Table.ACIDTBL + " set b = -2 where b = 2"); runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MINOR'"); runWorker(hiveConf); - TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); ShowCompactResponse resp = txnHandler.showCompact(new 
ShowCompactRequest()); Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize()); Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); @@ -1380,6 +1375,8 @@ protected void testACIDwithSchemaEvolutionForVariousTblProperties(String tblProp // now compact and see if compaction still preserves the data correctness runStatementOnDriver("alter table "+ tblName + " compact 'MAJOR'"); runWorker(hiveConf); + // create a low water mark aborted transaction and clean the older ones + createAbortLowWaterMark(); runCleaner(hiveConf); // Cleaner would remove the obsolete files. // Verify that there is now only 1 new directory: base_xxxxxxx and the rest have have been cleaned. @@ -1403,6 +1400,14 @@ protected void testACIDwithSchemaEvolutionForVariousTblProperties(String tblProp Assert.assertEquals(Arrays.asList(expectedResult), rs); } + protected void createAbortLowWaterMark() throws Exception{ + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); + runStatementOnDriver("select * from " + Table.ACIDTBL); + // wait for metastore.txn.opentxn.timeout + Thread.sleep(1000); + runInitiator(hiveConf); + } + @Test public void testETLSplitStrategyForACID() throws Exception { hiveConf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, "ETL"); @@ -2007,7 +2012,6 @@ public void testSchemaEvolutionCompaction() throws Exception { Assert.assertEquals("2\t2000\taa", res.get(1)); // Compact - TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); CompactionRequest compactionRequest = new CompactionRequest("default", tblName, CompactionType.MAJOR); compactionRequest.setPartitionname("part=aa"); @@ -2054,7 +2058,6 @@ public void testCleanerForTxnToWriteId() throws Exception { Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause), 2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblPartWhereClause)); - TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR)); runWorker(hiveConf); runCleaner(hiveConf); @@ -2096,7 +2099,7 @@ public void testCleanerForTxnToWriteId() throws Exception { // The entry relevant to aborted txns shouldn't be removed from TXN_TO_WRITE_ID as // aborted txn would be removed from TXNS only after the compaction. Also, committed txn > open txn is retained. // As open txn doesn't allocate writeid, the 2 entries for aborted and committed should be retained. 
- txnHandler.cleanEmptyAbortedTxns(); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnHandler.cleanTxnToWriteIdTable(); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause), 3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause)); @@ -2109,7 +2112,7 @@ public void testCleanerForTxnToWriteId() throws Exception { txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR)); runWorker(hiveConf); runCleaner(hiveConf); - txnHandler.cleanEmptyAbortedTxns(); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnHandler.cleanTxnToWriteIdTable(); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"), 2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID")); @@ -2234,7 +2237,6 @@ public void testDeleteEventsCompaction() throws Exception { runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2)); runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData3)); - TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MINOR)); runWorker(hiveConf); runCleaner(hiveConf); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java index 51b0fa336f..5b8c6701e1 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java @@ -479,11 +479,15 @@ public void testCompactionAbort() throws Exception { runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'"); runWorker(hiveConf); - assertTableIsEmpty("TXNS"); + // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain + hiveConf.set("metastore.txn.opentxn.timeout", "1"); + runInitiator(hiveConf); + hiveConf.set("metastore.txn.opentxn.timeout", "1000"); + assertOneTxn(); assertTableIsEmpty("TXN_COMPONENTS"); runCleaner(hiveConf); - assertTableIsEmpty("TXNS"); + assertOneTxn(); assertTableIsEmpty("TXN_COMPONENTS"); } @@ -500,11 +504,14 @@ public void testCompactionAbort() throws Exception { runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'"); runWorker(hiveConf); - assertTableIsEmpty("TXNS"); + // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain + hiveConf.set("metastore.txn.opentxn.timeout", "1"); + runInitiator(hiveConf); + hiveConf.set("metastore.txn.opentxn.timeout", "1000"); assertTableIsEmpty("TXN_COMPONENTS"); runCleaner(hiveConf); - assertTableIsEmpty("TXNS"); + assertOneTxn(); assertTableIsEmpty("TXN_COMPONENTS"); } @@ -512,4 +519,8 @@ private void assertTableIsEmpty(String table) throws Exception { Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from " + table), 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from " + table)); } + private void assertOneTxn() throws Exception { + Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), 1, + TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java index 6525ffc00a..eac2c6307f 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java +++ 
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java @@ -465,8 +465,11 @@ public void testOperationsOnCompletedTxnComponentsForMmTable() throws Exception Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS")); + // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain + Thread.sleep(1000); + runInitiator(hiveConf); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), - 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); + 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); // Initiate a major compaction request on the table. runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MAJOR'"); @@ -475,13 +478,16 @@ public void testOperationsOnCompletedTxnComponentsForMmTable() throws Exception runWorker(hiveConf); verifyDirAndResult(2, true); + // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain + Thread.sleep(1000); + runInitiator(hiveConf); // Run Cleaner. runCleaner(hiveConf); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS")); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), - 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); + 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); verifyDirAndResult(0, true); } diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 1435269ed3..3ff68a3c7e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -34,6 +34,8 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.hadoop.hive.ql.session.SessionState; @@ -57,6 +59,8 @@ public TestName testName = new TestName(); protected HiveConf hiveConf; Driver d; + private TxnStore txnHandler; + public enum Table { ACIDTBL("acidTbl"), ACIDTBLPART("acidTblPart"), @@ -75,6 +79,10 @@ public String toString() { } } + public TxnStore getTxnStore() { + return txnHandler; + } + @Before public void setUp() throws Exception { setUpInternal(); @@ -106,6 +114,7 @@ void setUpInternal() throws Exception { hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); hiveConf.setBoolean("mapred.input.dir.recursive", true); TxnDbUtil.setConfValues(hiveConf); + txnHandler = TxnUtils.getTxnStore(hiveConf); TxnDbUtil.prepDb(hiveConf); File f = new File(getWarehouseDir()); if (f.exists()) { diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java new file mode 100644 index 0000000000..b435e79c3c --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.lockmgr; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; + +/** + * Base class for "end-to-end" tests for DbTxnManager and simulate concurrent queries. + */ +public abstract class DbTxnManagerEndToEndTestBase { + + protected static HiveConf conf = new HiveConf(Driver.class); + protected HiveTxnManager txnMgr; + protected Context ctx; + protected Driver driver, driver2; + protected TxnStore txnHandler; + + public DbTxnManagerEndToEndTestBase() { + conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); + TxnDbUtil.setConfValues(conf); + } + @BeforeClass + public static void setUpDB() throws Exception{ + TxnDbUtil.prepDb(conf); + } + + @Before + public void setUp() throws Exception { + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false); + SessionState.start(conf); + ctx = new Context(conf); + driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build()); + driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build()); + TxnDbUtil.cleanDb(conf); + SessionState ss = SessionState.get(); + ss.initTxnMgr(conf); + txnMgr = ss.getTxnMgr(); + Assert.assertTrue(txnMgr instanceof DbTxnManager); + txnHandler = TxnUtils.getTxnStore(conf); + + } + @After + public void tearDown() throws Exception { + driver.close(); + driver2.close(); + if (txnMgr != null) { + txnMgr.closeTxnManager(); + } + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/ITestDbTxnManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/ITestDbTxnManager.java index a085e9ff6f..12cbc2a417 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/ITestDbTxnManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/ITestDbTxnManager.java @@ -62,8 +62,7 @@ public static void setupDb() throws Exception { MetastoreConf.getVar(conf, MetastoreConf.ConfVars.CONNECT_URL_KEY)); // Start the docker container and create the hive user rule.before(); - rule.createUser(); - // We do not run the install script, it will be called anyway before every test in prepDb + rule.install(); } @AfterClass diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java 
ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index f90396b2a3..ed37d9943c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -32,22 +32,14 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.AcidWriteSetService; -import org.apache.hadoop.hive.metastore.txn.TxnStore; -import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.TestTxnCommands2; -import org.junit.After; import org.junit.Assert; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; -import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.hadoop.hive.ql.session.SessionState; -import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ComparisonFailure; import org.junit.Rule; import org.junit.Test; @@ -82,47 +74,7 @@ * using {@link #swapTxnManager(HiveTxnManager)} since in the SessionState the TM is associated with * each thread. */ -public class TestDbTxnManager2 { - protected static HiveConf conf = new HiveConf(Driver.class); - - private HiveTxnManager txnMgr; - private Context ctx; - private Driver driver; - private TxnStore txnHandler; - - public TestDbTxnManager2() { - conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, - "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); - conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); - TxnDbUtil.setConfValues(conf); - } - - @BeforeClass - public static void setUpDB() throws Exception{ - TxnDbUtil.prepDb(conf); - } - - @Before - public void setUp() throws Exception { - conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false); - SessionState.start(conf); - ctx = new Context(conf); - driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build()); - TxnDbUtil.cleanDb(conf); - SessionState ss = SessionState.get(); - ss.initTxnMgr(conf); - txnMgr = ss.getTxnMgr(); - Assert.assertTrue(txnMgr instanceof DbTxnManager); - txnHandler = TxnUtils.getTxnStore(conf); - } - - @After - public void tearDown() throws Exception { - driver.close(); - if (txnMgr != null) { - txnMgr.closeTxnManager(); - } - } +public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ /** * HIVE-16688 @@ -1237,6 +1189,13 @@ public void testWriteSetTracking4() throws Exception { locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 0, locks.size()); + /** + * The last transaction will always remain in the transaction table, so we will open an other one, + * wait for the timeout period to exceed, then start the initiator that will clean + */ + txnMgr.openTxn(ctx, "Long Running"); + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); + // Now we can clean the write_set houseKeeper.run(); Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from \"WRITE_SET\"")); } @@ -1311,6 +1270,13 @@ public void testWriteSetTracking6() throws Exception { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks); 
txnMgr.commitTxn(); + /* + * The last transaction will always remain in the transaction table, so we will open an other one, + * wait for the timeout period to exceed, then start the initiator that will clean + */ + txnMgr.openTxn(ctx, "Long Running"); + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); + // Now we can clean the write_set MetastoreTaskThread writeSetService = new AcidWriteSetService(); writeSetService.setConf(conf); writeSetService.run(); diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManagerIsolationProperties.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManagerIsolationProperties.java new file mode 100644 index 0000000000..6c300695af --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManagerIsolationProperties.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.lockmgr; + +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider; +import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory; +import org.apache.hadoop.hive.ql.exec.FetchTask; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.junit.Assert; +import org.junit.Test; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +/** + * Tests to check the ACID properties and isolation level requirements. 
+ */ +public class TestDbTxnManagerIsolationProperties extends DbTxnManagerEndToEndTestBase { + + @Test + public void basicOpenTxnsNoDirtyRead() throws Exception { + driver.run(("drop table if exists gap")); + driver.run("create table gap (a int, b int) " + "stored as orc TBLPROPERTIES ('transactional'='true')"); + // Create one TXN to read and do not run it + driver.compileAndRespond("select * from gap"); + long first = txnMgr.getCurrentTxnId(); + + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + driver2.compileAndRespond("insert into gap values(1,2)"); + long second = txnMgr2.getCurrentTxnId(); + Assert.assertTrue("Sequence number goes onward", second > first); + driver2.run(); + + // Now we run our read query it should not see the write results of the insert + swapTxnManager(txnMgr); + driver.run(); + + FetchTask fetchTask = driver.getFetchTask(); + List res = new ArrayList(); + fetchTask.fetch(res); + Assert.assertEquals("No dirty read", 0, res.size()); + + } + @Test + public void gapOpenTxnsNoDirtyRead() throws Exception { + driver.run(("drop table if exists gap")); + driver.run("create table gap (a int, b int) " + "stored as orc TBLPROPERTIES ('transactional'='true')"); + // Create one TXN to delete later + driver.compileAndRespond("select * from gap"); + long first = txnMgr.getCurrentTxnId(); + driver.run(); + // The second one we use for Low water mark + driver.run("select * from gap"); + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + // Make sure, that the time window is great enough to consider the gap open + txnHandler.setOpenTxnTimeOutMillis(30000); + // Create a gap + deleteTransactionId(first); + CommandProcessorResponse resp = driver2.compileAndRespond("select * from gap"); + long third = txnMgr2.getCurrentTxnId(); + Assert.assertTrue("Sequence number goes onward", third > first); + ValidTxnList validTxns = txnMgr2.getValidTxns(); + Assert.assertEquals("Expect to see the gap as open", first, (long) validTxns.getMinOpenTxn()); + txnHandler.setOpenTxnTimeOutMillis(1000); + + // Now we cheat and create a transaction with the first sequenceId again imitating a very slow openTxns call + setBackSequence(first); + swapTxnManager(txnMgr); + driver.compileAndRespond("insert into gap values(1,2)"); + long forth = txnMgr.getCurrentTxnId(); + Assert.assertEquals(first, forth); + driver.run(); + + // Now we run our read query it should not see the write results of the insert + swapTxnManager(txnMgr2); + driver2.run(); + + FetchTask fetchTask = driver2.getFetchTask(); + List res = new ArrayList(); + fetchTask.fetch(res); + Assert.assertEquals("No dirty read", 0, res.size()); + + } + + + + @Test + public void multipleGapOpenTxnsNoDirtyRead() throws Exception { + driver.run(("drop table if exists gap")); + driver.run("create table gap (a int, b int) " + "stored as orc TBLPROPERTIES ('transactional'='true')"); + // Create some TXN to delete later + OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(10, "user", "local")); + openTxns.getTxn_ids().stream().forEach(txnId -> { + silentCommitTxn(new CommitTxnRequest(txnId)); + }); + + long first = openTxns.getTxn_ids().get(0); + long last = openTxns.getTxn_ids().get(9); + // The next one we use for Low water mark + driver.run("select * from gap"); + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + 
swapTxnManager(txnMgr2); + // Make sure, that the time window is great enough to consider the gap open + txnHandler.setOpenTxnTimeOutMillis(30000); + // Create a gap + deleteTransactionId(first, last); + CommandProcessorResponse resp = driver2.compileAndRespond("select * from gap"); + long next = txnMgr2.getCurrentTxnId(); + Assert.assertTrue("Sequence number goes onward", next > last); + ValidTxnList validTxns = txnMgr2.getValidTxns(); + Assert.assertEquals("Expect to see the gap as open", first, (long) validTxns.getMinOpenTxn()); + txnHandler.setOpenTxnTimeOutMillis(1000); + + // Now we cheat and create a transaction with the first sequenceId again imitating a very slow openTxns call + setBackSequence(first); + swapTxnManager(txnMgr); + driver.compileAndRespond("insert into gap values(1,2)"); + next = txnMgr.getCurrentTxnId(); + Assert.assertEquals(first, next); + driver.run(); + + // Now we run our read query it should not see the write results of the insert + swapTxnManager(txnMgr2); + driver2.run(); + + FetchTask fetchTask = driver2.getFetchTask(); + List res = new ArrayList(); + fetchTask.fetch(res); + Assert.assertEquals("No dirty read", 0, res.size()); + + } + + @Test + public void gapOpenTxnsDirtyRead() throws Exception { + driver.run(("drop table if exists gap")); + driver.run("create table gap (a int, b int) " + "stored as orc TBLPROPERTIES ('transactional'='true')"); + // Create one TXN to delete later + driver.compileAndRespond("select * from gap"); + long first = txnMgr.getCurrentTxnId(); + driver.run(); + //The second one we use for Low water mark + driver.run("select * from gap"); + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + // Now we wait for the time window to move forward + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); + // Create a gap + deleteTransactionId(first); + CommandProcessorResponse resp = driver2.compileAndRespond("select * from gap"); + long third = txnMgr2.getCurrentTxnId(); + Assert.assertTrue("Sequence number goes onward", third > first); + ValidTxnList validTxns = txnMgr2.getValidTxns(); + Assert.assertNull("Expect to see no gap", validTxns.getMinOpenTxn()); + + // Now we cheat and create a transaction with the first sequenceId again imitating a very slow openTxns call + // This should never happen + setBackSequence(first); + swapTxnManager(txnMgr); + driver.compileAndRespond("insert into gap values(1,2)"); + long forth = txnMgr.getCurrentTxnId(); + Assert.assertEquals(first, forth); + driver.run(); + + // Now we run our read query it should unfortunately see the results of the insert + swapTxnManager(txnMgr2); + driver2.run(); + + FetchTask fetchTask = driver2.getFetchTask(); + List res = new ArrayList(); + fetchTask.fetch(res); + Assert.assertEquals("Dirty read!", 1, res.size()); + + } + private void silentCommitTxn(CommitTxnRequest commitTxnRequest) { + try { + txnHandler.commitTxn(commitTxnRequest); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + private void deleteTransactionId(long txnId) throws SQLException { + deleteTransactionId(txnId, txnId); + } + + private void deleteTransactionId(long minTxnId, long maxTxnId) throws SQLException { + DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf); + DataSource ds = dsp.create(conf); + Connection dbConn = ds.getConnection(); + Statement stmt = dbConn.createStatement(); + stmt.executeUpdate("DELETE FROM TXNS WHERE TXN_ID >=" + minTxnId + " AND TXN_ID <=" + 
maxTxnId); + dbConn.commit(); + stmt.close(); + dbConn.close(); + } + + private void setBackSequence(long txnId) throws SQLException { + DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf); + DataSource ds = dsp.create(conf); + Connection dbConn = ds.getConnection(); + Statement stmt = dbConn.createStatement(); + stmt.executeUpdate("ALTER TABLE TXNS ALTER TXN_ID RESTART WITH " + txnId); + dbConn.commit(); + stmt.close(); + dbConn.close(); + } + + public static HiveTxnManager swapTxnManager(HiveTxnManager txnMgr) { + return SessionState.get().setTxnMgr(txnMgr); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index 1151466f8c..e4ff14a140 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -218,18 +218,19 @@ public void cleanEmptyAbortedTxns() throws Exception { req.setTxnid(txnid); LockResponse res = txnHandler.lock(req); txnHandler.abortTxn(new AbortTxnRequest(txnid)); - + txnHandler.setOpenTxnTimeOutMillis(30000); conf.setIntVar(HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50); OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest( TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50, "user", "hostname")); txnHandler.abortTxns(new AbortTxnsRequest(resp.getTxn_ids())); GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize()); - + txnHandler.setOpenTxnTimeOutMillis(1); startInitiator(); openTxns = txnHandler.getOpenTxns(); - Assert.assertEquals(1, openTxns.getOpen_txnsSize()); + // txnid:1 has txn_components, txnid:TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1 is the last + Assert.assertEquals(2, openTxns.getOpen_txnsSize()); } @Test diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index a874121e12..38874697db 100644 --- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -1199,6 +1199,8 @@ public static ConfVars getMetaConf(String name) { "class is used to store and retrieve transactions and locks"), TXN_TIMEOUT("metastore.txn.timeout", "hive.txn.timeout", 300, TimeUnit.SECONDS, "time after which transactions are declared aborted if the client has not sent a heartbeat."), + TXN_OPENTXN_TIMEOUT("metastore.txn.opentxn.timeout", "hive.txn.opentxn.timeout", 1000, TimeUnit.MILLISECONDS, + "Time before an open transaction operation should persist, otherwise it is considered invalid and rolled back"), URI_RESOLVER("metastore.uri.resolver", "hive.metastore.uri.resolver", "", "If set, fully qualified class name of resolver for hive metastore uri's"), USERS_IN_ADMIN_ROLE("metastore.users.in.admin.role", "hive.users.in.admin.role", "", false, diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java index 49b737ecf9..08ef5e912e 100644 --- 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java @@ -99,7 +99,7 @@ public SQLGenerator(DatabaseProduct dbProduct, Configuration conf) { } /** - * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB + * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB. * * @param tblColumns e.g. "T(a,b,c)" * @param rows e.g. list of Strings like 3,4,'d' @@ -110,7 +110,7 @@ public SQLGenerator(DatabaseProduct dbProduct, Configuration conf) { } /** - * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB + * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB. * * @param tblColumns e.g. "T(a,b,c)" * @param rows e.g. list of Strings like 3,4,'d' @@ -263,7 +263,7 @@ public String addLimitClause(int numRows, String noSelectsqlQuery) throws MetaEx * @throws SQLException */ public PreparedStatement prepareStmtWithParameters(Connection dbConn, String sql, List parameters) - throws SQLException { + throws SQLException { PreparedStatement pst = dbConn.prepareStatement(addEscapeCharacters(sql)); if ((parameters == null) || parameters.isEmpty()) { return pst; @@ -292,4 +292,36 @@ public String addEscapeCharacters(String s) { return s; } + + /** + * Creates a lock statement for open/commit transaction based on the dbProduct in shared read / exclusive mode. + * @param shared shared or exclusive lock + * @return sql statement to execute + * @throws MetaException if the dbProduct is unknown + */ + public String createTxnLockStatement(boolean shared) throws MetaException{ + String txnLockTable = "TXN_LOCK_TBL"; + switch (dbProduct) { + case MYSQL: + // For Mysql we do not use lock table statement for two reasons + // It is not released automatically on commit/rollback + // It requires to lock every table that will be used by the statement + // https://dev.mysql.com/doc/refman/8.0/en/lock-tables.html + return "SELECT \"TXN_LOCK\" FROM \"" + txnLockTable + "\" " + (shared ? "LOCK IN SHARE MODE" : "FOR UPDATE"); + case POSTGRES: + // https://www.postgresql.org/docs/9.4/sql-lock.html + case DERBY: + // https://db.apache.org/derby/docs/10.4/ref/rrefsqlj40506.html + case ORACLE: + // https://docs.oracle.com/cd/B19306_01/server.102/b14200/statements_9015.htm + return "LOCK TABLE \"" + txnLockTable + "\" IN " + (shared ? "SHARE" : "EXCLUSIVE") + " MODE"; + case SQLSERVER: + // https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-table?view=sql-server-ver15 + return "SELECT * FROM \"" + txnLockTable + "\" WITH (" + (shared ? 
"TABLOCK" : "TABLOCKX") + ", HOLDLOCK)"; + default: + String msg = "Unrecognized database product name <" + dbProduct + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 2344c2d5f6..a1bc10955a 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -223,7 +223,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { stmt = dbConn.createStatement(); String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', " + "\"CQ_WORKER_ID\" = NULL, \"CQ_NEXT_TXN_ID\" = " - + "(SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\")" + + "(SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\")" + " WHERE \"CQ_ID\" = " + info.id; LOG.debug("Going to execute update <" + s + ">"); int updCnt = stmt.executeUpdate(s); @@ -474,7 +474,7 @@ public void markCleaned(CompactionInfo info) throws MetaException { } /** * Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by - * min(NEXT_TXN_ID.ntxn_next, min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)). + * min(max(TXNS.txn_id), min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)). */ @Override @RetrySemantics.SafeToRetry @@ -492,9 +492,9 @@ public void cleanTxnToWriteIdTable() throws MetaException { // First need to find the min_uncommitted_txnid which is currently seen by any open transactions. // If there are no txns which are currently open or aborted in the system, then current value of - // NEXT_TXN_ID.ntxn_next could be min_uncommitted_txnid. + // max(TXNS.txn_id) could be min_uncommitted_txnid. String s = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" + - "SELECT MIN(\"NTXN_NEXT\") AS \"ID\" FROM \"NEXT_TXN_ID\" " + + "SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\" " + "UNION " + "SELECT MIN(\"WS_COMMIT_ID\") AS \"ID\" FROM \"WRITE_SET\" " + "UNION " + @@ -504,7 +504,7 @@ public void cleanTxnToWriteIdTable() throws MetaException { LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { - throw new MetaException("Transaction tables not properly initialized, no record found in NEXT_TXN_ID"); + throw new MetaException("Transaction tables not properly initialized, no record found in TXNS"); } long minUncommitedTxnid = rs.getLong(1); @@ -533,25 +533,36 @@ public void cleanTxnToWriteIdTable() throws MetaException { } /** - * Clean up aborted transactions from txns that have no components in txn_components. The reason such - * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and - * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called. + * Clean up aborted / committed transactions from txns that have no components in txn_components. + * The committed txns are left there for TXN_OPENTXN_TIMEOUT window period intentionally. + * The reason such aborted txns exist can be that now work was done in this txn + * (e.g. Streaming opened TransactionBatch and abandoned it w/o doing any work) + * or due to {@link #markCleaned(CompactionInfo)} being called. 
*/ @Override @RetrySemantics.SafeToRetry - public void cleanEmptyAbortedTxns() throws MetaException { + public void cleanEmptyAbortedAndCommittedTxns() throws MetaException { + LOG.info("Start to clean empty aborted or committed TXNS"); try { Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { - //Aborted is a terminal state, so nothing about the txn can change + //Aborted and committed are terminal states, so nothing about the txn can change //after that, so READ COMMITTED is sufficient. dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); + /** + * Only delete aborted / committed transaction in a way that guarantees two things: + * 1. never deletes anything that is inside the TXN_OPENTXN_TIMEOUT window + * 2. never deletes the maximum txnId even if it is before the TXN_OPENTXN_TIMEOUT window + */ + long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn); + String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE " + "\"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") AND " + - "\"TXN_STATE\" = '" + TXN_ABORTED + "'"; + " (\"TXN_STATE\" = '" + TXN_ABORTED + "' OR \"TXN_STATE\" = '" + TXN_COMMITTED + "') AND " + + " \"TXN_ID\" < " + lowWaterMark; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); List txnids = new ArrayList<>(); @@ -568,16 +579,15 @@ public void cleanEmptyAbortedTxns() throws MetaException { // Delete from TXNS. prefix.append("DELETE FROM \"TXNS\" WHERE "); - suffix.append(""); TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", false, false); for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); int rc = stmt.executeUpdate(query); - LOG.info("Removed " + rc + " empty Aborted transactions from TXNS"); + LOG.debug("Removed " + rc + " empty Aborted and Committed transactions from TXNS"); } - LOG.info("Aborted transactions removed from TXNS: " + txnids); + LOG.info("Aborted and committed transactions removed from TXNS: " + txnids); LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { @@ -591,7 +601,7 @@ public void cleanEmptyAbortedTxns() throws MetaException { close(rs, stmt, dbConn); } } catch (RetryException e) { - cleanEmptyAbortedTxns(); + cleanEmptyAbortedAndCommittedTxns(); } } @@ -1147,12 +1157,12 @@ public long findMinOpenTxnIdForCleaner() throws MetaException{ + quoteChar(READY_FOR_CLEANING) + ") \"RES\""; } else { - query = "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""; + query = "SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\""; } LOG.debug("Going to execute query <" + query + ">"); rs = stmt.executeQuery(query); if (!rs.next()) { - throw new MetaException("Transaction tables not properly initialized, no record found in NEXT_TXN_ID"); + throw new MetaException("Transaction tables not properly initialized, no record found in TXNS"); } return rs.getLong(1); } catch (SQLException e) { diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 97a083399a..7a90316f3c 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -218,63 +218,107 @@ private static String ensurePathEndsInSlash(String path) { public static void cleanDb(Configuration conf) throws Exception { 
LOG.info("Cleaning transactional tables"); - int retryCount = 0; - while(++retryCount <= 3) { - boolean success = true; - Connection conn = null; - Statement stmt = null; + + boolean success = true; + Connection conn = null; + Statement stmt = null; + try { + conn = getConnection(conf); + stmt = conn.createStatement(); + if (!checkDbPrepared(stmt)){ + // Nothing to clean + return; + } + + // We want to try these, whether they succeed or fail. + success &= truncateTable(conn, stmt, "TXN_COMPONENTS"); + success &= truncateTable(conn, stmt, "COMPLETED_TXN_COMPONENTS"); + success &= truncateTable(conn, stmt, "TXNS"); + success &= truncateTable(conn, stmt, "TXN_TO_WRITE_ID"); + success &= truncateTable(conn, stmt, "NEXT_WRITE_ID"); + success &= truncateTable(conn, stmt, "HIVE_LOCKS"); + success &= truncateTable(conn, stmt, "NEXT_LOCK_ID"); + success &= truncateTable(conn, stmt, "COMPACTION_QUEUE"); + success &= truncateTable(conn, stmt, "NEXT_COMPACTION_QUEUE_ID"); + success &= truncateTable(conn, stmt, "COMPLETED_COMPACTIONS"); + success &= truncateTable(conn, stmt, "AUX_TABLE"); + success &= truncateTable(conn, stmt, "WRITE_SET"); + success &= truncateTable(conn, stmt, "REPL_TXN_MAP"); + success &= truncateTable(conn, stmt, "MATERIALIZATION_REBUILD_LOCKS"); try { - conn = getConnection(conf); - stmt = conn.createStatement(); - - // We want to try these, whether they succeed or fail. - success &= truncateTable(conn, stmt, "TXN_COMPONENTS"); - success &= truncateTable(conn, stmt, "COMPLETED_TXN_COMPONENTS"); - success &= truncateTable(conn, stmt, "TXNS"); - success &= truncateTable(conn, stmt, "NEXT_TXN_ID"); - success &= truncateTable(conn, stmt, "TXN_TO_WRITE_ID"); - success &= truncateTable(conn, stmt, "NEXT_WRITE_ID"); - success &= truncateTable(conn, stmt, "HIVE_LOCKS"); - success &= truncateTable(conn, stmt, "NEXT_LOCK_ID"); - success &= truncateTable(conn, stmt, "COMPACTION_QUEUE"); - success &= truncateTable(conn, stmt, "NEXT_COMPACTION_QUEUE_ID"); - success &= truncateTable(conn, stmt, "COMPLETED_COMPACTIONS"); - success &= truncateTable(conn, stmt, "AUX_TABLE"); - success &= truncateTable(conn, stmt, "WRITE_SET"); - success &= truncateTable(conn, stmt, "REPL_TXN_MAP"); - success &= truncateTable(conn, stmt, "MATERIALIZATION_REBUILD_LOCKS"); - try { - stmt.executeUpdate("INSERT INTO \"NEXT_TXN_ID\" VALUES(1)"); - stmt.executeUpdate("INSERT INTO \"NEXT_LOCK_ID\" VALUES(1)"); - stmt.executeUpdate("INSERT INTO \"NEXT_COMPACTION_QUEUE_ID\" VALUES(1)"); - } catch (SQLException e) { - if (!getTableNotExistsErrorCodes().contains(e.getSQLState())) { - LOG.error("Error initializing NEXT_TXN_ID"); - success = false; - } + resetTxnSequence(conn, stmt); + stmt.executeUpdate("INSERT INTO \"NEXT_LOCK_ID\" VALUES(1)"); + stmt.executeUpdate("INSERT INTO \"NEXT_COMPACTION_QUEUE_ID\" VALUES(1)"); + } catch (SQLException e) { + if (!getTableNotExistsErrorCodes().contains(e.getSQLState())) { + LOG.error("Error initializing sequence values", e); + success = false; } - /* - * Don't drop NOTIFICATION_LOG, SEQUENCE_TABLE and NOTIFICATION_SEQUENCE as its used by other - * table which are not txn related to generate primary key. So if these tables are dropped - * and other tables are not dropped, then it will create key duplicate error while inserting - * to other table. 
- */ - } finally { - closeResources(conn, stmt, null); - } - if(success) { - return; } + /* + * Don't drop NOTIFICATION_LOG, SEQUENCE_TABLE and NOTIFICATION_SEQUENCE as its used by other + * table which are not txn related to generate primary key. So if these tables are dropped + * and other tables are not dropped, then it will create key duplicate error while inserting + * to other table. + */ + } finally { + closeResources(conn, stmt, null); + } + if(success) { + return; } throw new RuntimeException("Failed to clean up txn tables"); } + private static void resetTxnSequence(Connection conn, Statement stmt) throws SQLException, MetaException{ + String dbProduct = conn.getMetaData().getDatabaseProductName(); + DatabaseProduct databaseProduct = determineDatabaseProduct(dbProduct); + switch (databaseProduct) { + + case DERBY: + stmt.execute("ALTER TABLE \"TXNS\" ALTER \"TXN_ID\" RESTART WITH 1"); + stmt.execute("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\"," + + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")" + + " VALUES(0, 'c', 0, 0, '', '')"); + break; + case MYSQL: + stmt.execute("ALTER TABLE \"TXNS\" AUTO_INCREMENT=1"); + stmt.execute("SET SQL_MODE='NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES'"); + stmt.execute("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\"," + + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")" + + " VALUES(0, 'c', 0, 0, '', '')"); + break; + case POSTGRES: + stmt.execute("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\"," + + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")" + + " VALUES(0, 'c', 0, 0, '', '')"); + stmt.execute("ALTER SEQUENCE \"TXNS_TXN_ID_seq\" RESTART"); + break; + case ORACLE: + stmt.execute("ALTER TABLE \"TXNS\" MODIFY \"TXN_ID\" GENERATED BY DEFAULT AS IDENTITY (START WITH 1)"); + stmt.execute("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\"," + + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")" + + " VALUES(0, 'c', 0, 0, '_', '_')"); + break; + case SQLSERVER: + stmt.execute("DBCC CHECKIDENT ('txns', RESEED, 0)"); + stmt.execute("SET IDENTITY_INSERT TXNS ON"); + stmt.execute("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\"," + + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")" + + " VALUES(0, 'c', 0, 0, '', '')"); + break; + case OTHER: + default: + break; + } + } + private static boolean truncateTable(Connection conn, Statement stmt, String name) { try { // We can not use actual truncate due to some foreign keys, but we don't expect much data during tests String dbProduct = conn.getMetaData().getDatabaseProductName(); DatabaseProduct databaseProduct = determineDatabaseProduct(dbProduct); - if (databaseProduct == POSTGRES) { + if (databaseProduct == POSTGRES || databaseProduct == MYSQL) { stmt.execute("DELETE FROM \"" + name + "\""); } else { stmt.execute("DELETE FROM " + name); @@ -503,4 +547,28 @@ static void executeQueriesInBatchNoCount(DatabaseProduct dbProduct, Statement st } return affectedRowsByQuery; } + + /** + + * Checks if the dbms supports the getGeneratedKeys for multiline insert statements. 
+ + * @param dbProduct DBMS type + + * @return true if supports + + * @throws MetaException + + */ + public static boolean supportsGetGeneratedKeys(DatabaseProduct dbProduct) throws MetaException { + switch (dbProduct) { + case DERBY: + case SQLSERVER: + // The getGeneratedKeys is not supported for multi line insert + return false; + case ORACLE: + case MYSQL: + case POSTGRES: + return true; + case OTHER: + default: + String msg = "Unknown database product: " + dbProduct.toString(); + LOG.error(msg); + throw new MetaException(msg); + } + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index fe39b0b36e..815c405b36 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -55,6 +55,7 @@ import javax.sql.DataSource; +import org.apache.commons.lang3.time.StopWatch; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.tuple.Pair; @@ -73,7 +74,59 @@ import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier; import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener; import org.apache.hadoop.hive.metastore.LockTypeComparator; -import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; +import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse; +import org.apache.hadoop.hive.metastore.api.CheckLockRequest; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.CompactionResponse; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.CreationMetadata; +import org.apache.hadoop.hive.metastore.api.DataOperationType; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse; +import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.HiveObjectType; +import org.apache.hadoop.hive.metastore.api.InitializeTableWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.Materialization; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import 
org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.ReplLastIdInfo; +import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.metastore.api.TxnOpenException; +import org.apache.hadoop.hive.metastore.api.TxnState; +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hadoop.hive.metastore.api.TxnType; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; +import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider; @@ -174,8 +227,12 @@ static final protected char MINOR_TYPE = 'i'; // Transaction states - static final protected char TXN_ABORTED = 'a'; - static final protected char TXN_OPEN = 'o'; + protected static final char TXN_ABORTED = 'a'; + protected static final char TXN_OPEN = 'o'; + protected static final char TXN_COMMITTED = 'c'; + + private static final char TXN_TMP = '_'; + //todo: make these like OperationType and remove above char constants enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN} @@ -189,6 +246,8 @@ private static DataSource connPool; private static DataSource connPoolMutex; + private static final String MANUAL_RETRY = "ManualRetry"; + // Query definitions private static final String HIVE_LOCKS_INSERT_QRY = "INSERT INTO \"HIVE_LOCKS\" ( " + "\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", " + @@ -204,11 +263,14 @@ private static final String COMPL_TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" " + "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")" + " VALUES (%s, ?, ?, ?, ?, %s)"; + private static final String TXNS_INSERT_QRY = "INSERT INTO \"TXNS\" " + + "(\"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\") " + + "VALUES(?,%s,%s,?,?,?)"; private static final String SELECT_LOCKS_FOR_LOCK_ID_QUERY = "SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", " + - "\"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" " + - "FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = ?"; + "\"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" " + + "FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = ?"; private static final String SELECT_TIMED_OUT_LOCKS_QUERY = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" " + - "WHERE \"HL_LAST_HEARTBEAT\" < %s - ? 
AND \"HL_TXNID\" = 0"; + "WHERE \"HL_LAST_HEARTBEAT\" < %s - ? AND \"HL_TXNID\" = 0"; private List transactionalListeners; @@ -273,9 +335,11 @@ char getSqlConst() { protected Configuration conf; private static DatabaseProduct dbProduct; private static SQLGenerator sqlGenerator; + private static long openTxnTimeOutMillis; // (End user) Transaction timeout, in milliseconds. private long timeout; + // Timeout for opening a transaction private int maxBatchSize; private String identifierQuoteString; // quotes to use for quoting tables, where necessary @@ -357,6 +421,8 @@ public void setConf(Configuration conf){ maxOpenTxns = MetastoreConf.getIntVar(conf, ConfVars.MAX_OPEN_TXNS); maxBatchSize = MetastoreConf.getIntVar(conf, ConfVars.JDBC_MAX_BATCH_SIZE); + openTxnTimeOutMillis = MetastoreConf.getTimeVar(conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS); + try { transactionalListeners = MetaStoreServerUtils.getMetaStoreListeners( TransactionalMetaStoreEventListener.class, @@ -377,59 +443,74 @@ public Configuration getConf() { @RetrySemantics.ReadOnly public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { try { - // We need to figure out the current transaction number and the list of - // open transactions. To avoid needing a transaction on the underlying - // database we'll look at the current transaction number first. If it - // subsequently shows up in the open list that's ok. + // We need to figure out the HighWaterMark and the list of open transactions. Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { - /** - * This method can run at READ_COMMITTED as long as long as - * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic. - * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with - * adding corresponding entries into TXNS. The reason is that any txnid below HWM - * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed. + /* + * This method need guarantees from + * {@link #openTxns(OpenTxnRequest)} and {@link #commitTxn(CommitTxnRequest)}. + * It will look at the TXNS table and find each transaction between the max(txn_id) as HighWaterMark + * and the max(txn_id) before the TXN_OPENTXN_TIMEOUT period as LowWaterMark. + * Every transaction that is not found between these will be considered as open, since it may appear later. + * openTxns must ensure, that no new transaction will be opened with txn_id below LWM and + * commitTxn must ensure, that no committed transaction will be removed before the time period expires. 
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- String s = "SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\"";
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
- if (!rs.next()) {
- throw new MetaException("Transaction tables not properly " +
- "initialized, no record found in next_txn_id");
- }
- long hwm = rs.getLong(1);
- if (rs.wasNull()) {
- throw new MetaException("Transaction tables not properly " +
- "initialized, null record found in next_txn_id");
- }
- close(rs);
List txnInfos = new ArrayList<>();
- //need the WHERE clause below to ensure consistent results with READ_COMMITTED
- s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\" FROM " +
- "\"TXNS\" WHERE \"TXN_ID\" <= " + hwm;
+
+ String s =
+ "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " +
+ "(" + TxnDbUtil.getEpochFn(dbProduct) + " - \"TXN_STARTED\")" +
+ " FROM \"TXNS\" ORDER BY \"TXN_ID\"";
LOG.debug("Going to execute query <" + s + ">");
rs = stmt.executeQuery(s);
+ /*
+ * We can use the maximum txn_id from the TXNS table as the high water mark, since commitTxn and the Initiator
+ * guarantee that the transaction with the highest txn_id will never be removed from the TXNS table.
+ * If there is a pending openTxns that has already acquired its sequenceId but not yet committed the insert
+ * into the TXNS table, it will either have a lower txn_id than the HWM and be listed in the open txn list,
+ * or have a higher txn_id and not affect this getOpenTxns() call.
+ */
+ long hwm = 0;
+ long openTxnLowBoundary = 0;
+ while (rs.next()) {
+ long txnId = rs.getLong(1);
+ long age = rs.getLong(7);
+ hwm = txnId;
+ if (age < getOpenTxnTimeOutMillis()) {
+ // We will consider every gap as an open transaction from the previous txnId
+ openTxnLowBoundary++;
+ while (txnId > openTxnLowBoundary) {
+ // Add an empty open transaction for every missing value
+ txnInfos.add(new TxnInfo(openTxnLowBoundary, TxnState.OPEN, null, null));
+ openTxnLowBoundary++;
+ }
+ } else {
+ openTxnLowBoundary = txnId;
}
char c = rs.getString(2).charAt(0);
TxnState state;
switch (c) {
- case TXN_ABORTED:
- state = TxnState.ABORTED;
- break;
+ case TXN_COMMITTED:
+ // This is only here to avoid adding this txnId as a possible gap
+ continue;
- case TXN_OPEN:
- state = TxnState.OPEN;
- break;
+ case TXN_ABORTED:
+ state = TxnState.ABORTED;
+ break;
- default:
- throw new MetaException("Unexpected transaction state " + c +
- " found in txns table");
+ case TXN_OPEN:
+ state = TxnState.OPEN;
+ break;
+
+ default:
+ throw new MetaException("Unexpected transaction state " + c + " found in txns table");
}
- TxnInfo txnInfo = new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4));
+ TxnInfo txnInfo = new TxnInfo(txnId, state, rs.getString(3), rs.getString(4));
txnInfo.setStartedTime(rs.getLong(5));
txnInfo.setLastHeartbeatTime(rs.getLong(6));
txnInfos.add(txnInfo);
@@ -441,8 +522,8 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(dbConn, e, "getOpenTxnsInfo");
- throw new MetaException("Unable to select from transaction database: " + getMessage(e)
- + StringUtils.stringifyException(e));
+ throw new MetaException(
+ "Unable to select from transaction database: " + getMessage(e) + StringUtils.stringifyException(e));
} finally {
close(rs, stmt, dbConn);
}
@@ -455,42
+536,47 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { @RetrySemantics.ReadOnly public GetOpenTxnsResponse getOpenTxns() throws MetaException { try { - // We need to figure out the current transaction number and the list of - // open transactions. To avoid needing a transaction on the underlying - // database we'll look at the current transaction number first. If it - // subsequently shows up in the open list that's ok. + // We need to figure out the current transaction number and the list of open transactions. Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { - /** + /* * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_txn_id"); - } - long hwm = rs.getLong(1); - if (rs.wasNull()) { - throw new MetaException("Transaction tables not properly " + - "initialized, null record found in next_txn_id"); - } - close(rs); + List openList = new ArrayList<>(); - //need the WHERE clause below to ensure consistent results with READ_COMMITTED - s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\" FROM \"TXNS\" WHERE \"TXN_ID\" <= " + hwm + " ORDER BY \"TXN_ID\""; + String s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\", " + + "(" + TxnDbUtil.getEpochFn(dbProduct) + " - \"TXN_STARTED\")" + + " FROM \"TXNS\" ORDER BY \"TXN_ID\""; LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); + long hwm = 0; + long openTxnLowBoundary = 0; long minOpenTxn = Long.MAX_VALUE; BitSet abortedBits = new BitSet(); while (rs.next()) { long txnId = rs.getLong(1); + long age = rs.getLong(4); + hwm = txnId; + if (age < getOpenTxnTimeOutMillis()) { + // We will consider every gap as an open transaction from the previous txnId + openTxnLowBoundary++; + while (txnId > openTxnLowBoundary) { + // Add an empty open transaction for every missing value + openList.add(openTxnLowBoundary); + minOpenTxn = Math.min(minOpenTxn, openTxnLowBoundary); + openTxnLowBoundary++; + } + } else { + openTxnLowBoundary = txnId; + } char txnState = rs.getString(2).charAt(0); + if (txnState == TXN_COMMITTED) { + continue; + } if (txnState == TXN_OPEN) { minOpenTxn = Math.min(minOpenTxn, txnId); } @@ -506,7 +592,7 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { dbConn.rollback(); ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray()); GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer); - if(minOpenTxn < Long.MAX_VALUE) { + if (minOpenTxn < Long.MAX_VALUE) { otr.setMin_open_txn(minOpenTxn); } return otr; @@ -554,13 +640,20 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { Connection dbConn = null; Statement stmt = null; try { - lockInternal(); - /** + /* * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure - * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic. - * Also, advancing the counter must work when multiple metastores are running. - * SELECT ... FOR UPDATE is used to prevent - * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID. 
+ * that, looking at the TXNS table, every open transaction can be identified below a given High Water Mark.
+ * One way to do this would be to serialize the openTxns calls with an S4U lock, but that would cause
+ * performance degradation with high transaction load.
+ * To enable parallel openTxn calls, we define a time period (TXN_OPENTXN_TIMEOUT) and consider every
+ * transaction missing from the TXNS table in that period open, and prevent opening transactions outside
+ * that period.
+ * Example: At t[0] there is one open transaction in the TXNS table, T[1].
+ * T[2] acquires the next sequence at t[1] but only commits into the TXNS table at t[10].
+ * T[3] acquires its sequence at t[2], and commits into the TXNS table at t[3].
+ * Then T[3] calculates its snapshot at t[4] and puts T[1] and also T[2] in the snapshot's
+ * open transaction list. T[1] because it is present as open in TXNS,
+ * T[2] because it is a missing sequence.
*
* In the current design, there can be several metastore instances running in a given Warehouse.
* This makes ideas like reserving a range of IDs to save trips to DB impossible. For example,
@@ -573,35 +666,67 @@
* set could support a write-through cache for added performance.
*/
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- // Make sure the user has not requested an insane amount of txns.
- int maxTxns = MetastoreConf.getIntVar(conf, ConfVars.TXN_MAX_OPEN_BATCH);
- if (numTxns > maxTxns) numTxns = maxTxns;
- stmt = dbConn.createStatement();
- List txnIds = openTxns(dbConn, stmt, rqst);
+ /*
+ * openTxn and commitTxn must be mutexed when committing a non-read-only transaction.
+ * This is achieved by requesting a shared table lock here, and an exclusive one at commit.
+ * Since table locks work in Derby, we don't need the lockInternal call here.
+ * Example: Suppose we have two transactions with an update like x = x + 1.
+ * We have T[3,3] that was using a value from a snapshot with T[2,2]. If we allowed committing T[3,3]
+ * and opening T[4] in parallel, it would be possible that T[4] also uses the value from a snapshot
+ * with T[2,2], and we would have a lost update problem.
+ */
+ acquireTxnLock(stmt, true);
+ // Measure the time from acquiring the sequence value until committing into the TXNS table
+ StopWatch generateTransactionWatch = new StopWatch();
+ generateTransactionWatch.start();
+
+ List txnIds = openTxns(dbConn, rqst);
LOG.debug("Going to commit");
dbConn.commit();
+ generateTransactionWatch.stop();
+ long elapsedMillis = generateTransactionWatch.getTime(TimeUnit.MILLISECONDS);
+ TxnType txnType = rqst.isSetTxn_type() ? rqst.getTxn_type() : TxnType.DEFAULT;
+ if (txnType != TxnType.READ_ONLY && elapsedMillis >= openTxnTimeOutMillis) {
+ /*
+ * The commit was too slow; we cannot allow it to continue (unless it is read only,
+ * since that cannot cause dirty reads).
+ * When calculating the snapshot for a given transaction, we look back for possible open transactions
+ * (that are not yet committed in the TXNS table) for the TXN_OPENTXN_TIMEOUT period.
+ * We cannot allow a write transaction that was slower than TXN_OPENTXN_TIMEOUT to continue,
+ * because there can be other transactions running that didn't consider this transactionId open;
+ * that could cause dirty reads.
+ */
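Editor's note: the shared/exclusive mutex described in the comment above behaves like a reader-writer lock: many openTxns calls may run in parallel, while a write-transaction commit excludes them all. The following is a sketch of the semantics only — the patch actually takes a database-side table lock generated by sqlGenerator.createTxnLockStatement, not a JVM lock, so it also mutexes other metastore instances.

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class TxnLockAnalogy {
  private final ReentrantReadWriteLock txnLock = new ReentrantReadWriteLock();

  void openTxns(Runnable insertIntoTxns) {
    txnLock.readLock().lock(); // shared: parallel openTxns calls don't block each other
    try {
      insertIntoTxns.run();
    } finally {
      txnLock.readLock().unlock();
    }
  }

  void commitWriteTxn(Runnable markCommittedInTxns) {
    txnLock.writeLock().lock(); // exclusive: no txn can open while a write txn commits
    try {
      markCommittedInTxns.run();
    } finally {
      txnLock.writeLock().unlock();
    }
  }

  public static void main(String[] args) {
    TxnLockAnalogy mutex = new TxnLockAnalogy();
    mutex.openTxns(() -> System.out.println("T[4] opened"));
    mutex.commitWriteTxn(() -> System.out.println("T[3] committed"));
  }
}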
+ LOG.error("OpenTxnTimeOut exceeded commit duration {}, deleting transactionIds: {}", elapsedMillis, txnIds);
+ deleteInvalidOpenTransactions(dbConn, txnIds);
+ /*
+ * We do not throw RetryException directly, so as not to circumvent the max retry limit
+ */
+ throw new SQLException("OpenTxnTimeOut exceeded", MANUAL_RETRY);
+ }
return new OpenTxnsResponse(txnIds);
} catch (SQLException e) {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(dbConn, e, "openTxns(" + rqst + ")");
- throw new MetaException("Unable to select from transaction database "
- + StringUtils.stringifyException(e));
+ throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e));
} finally {
close(null, stmt, dbConn);
- unlockInternal();
}
} catch (RetryException e) {
return openTxns(rqst);
}
}
- private List openTxns(Connection dbConn, Statement stmt, OpenTxnRequest rqst)
+ private List openTxns(Connection dbConn, OpenTxnRequest rqst)
throws SQLException, MetaException {
int numTxns = rqst.getNum_txns();
- ResultSet rs = null;
+ // Make sure the user has not requested an insane amount of txns.
+ int maxTxns = MetastoreConf.getIntVar(conf, ConfVars.TXN_MAX_OPEN_BATCH);
+ if (numTxns > maxTxns) {
+ numTxns = maxTxns;
+ }
List insertPreparedStmts = null;
TxnType txnType = rqst.isSetTxn_type() ? rqst.getTxn_type() : TxnType.DEFAULT;
try {
@@ -620,51 +745,54 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
txnType = TxnType.REPL_CREATED;
}
- String s = sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"");
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
- if (!rs.next()) {
- throw new MetaException("Transaction database not properly " +
- "configured, can't find next transaction id.");
- }
- long first = rs.getLong(1);
- s = "UPDATE \"NEXT_TXN_ID\" SET \"NTXN_NEXT\" = " + (first + numTxns);
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
- List txnIds = new ArrayList<>(numTxns);
-
- List rows = new ArrayList<>();
- List params = new ArrayList<>();
- params.add(rqst.getUser());
- params.add(rqst.getHostname());
- List> paramsList = new ArrayList<>(numTxns);
-
- for (long i = first; i < first + numTxns; i++) {
- txnIds.add(i);
- rows.add(i + "," + quoteChar(TXN_OPEN) + "," + TxnDbUtil.getEpochFn(dbProduct) + ","
- + TxnDbUtil.getEpochFn(dbProduct) + ",?,?," + txnType.getValue());
- paramsList.add(params);
- }
- insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
- "\"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", "
- + "\"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\")",
- rows, paramsList);
- for (PreparedStatement pst : insertPreparedStmts) {
- pst.execute();
+ /*
+ * getGeneratedKeys is not supported by every DBMS after executing a multi-line insert,
+ * but it is supported by every DBMS we use for a single-line insert, even if the metadata says otherwise.
+ * If getGeneratedKeys is not supported, we first insert the rows with a temporary state (TXN_TMP),
+ * then select the generated keys back by that state and flip it to open.
+ */
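Editor's note: for readers unfamiliar with the JDBC idiom the new insert path relies on, here is a minimal sketch of fetching database-generated keys after an insert. The DEMO_TXNS table and its columns are hypothetical; the patch applies the same idiom to TXNS and, on Derby and SQL Server where getGeneratedKeys does not work for batched inserts, falls back to the temporary-state trick described in the comment above.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class GeneratedKeysSketch {
  /** Inserts one row and returns the key(s) the database generated for it. */
  static List<Long> insertAndFetchIds(Connection conn, String user) throws SQLException {
    List<Long> ids = new ArrayList<>();
    // Ask the driver to hand back the auto-generated TXN_ID column of the new row.
    try (PreparedStatement ps = conn.prepareStatement(
        "INSERT INTO DEMO_TXNS (TXN_USER) VALUES (?)", new String[] {"TXN_ID"})) {
      ps.setString(1, user);
      ps.execute();
      try (ResultSet keys = ps.getGeneratedKeys()) {
        while (keys.next()) {
          ids.add(keys.getLong(1));
        }
      }
    }
    return ids;
  }
}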
+ boolean genKeySupport = TxnDbUtil.supportsGetGeneratedKeys(dbProduct);
+ genKeySupport = genKeySupport || (numTxns == 1);
+
+ String insertQuery = String.format(TXNS_INSERT_QRY, TxnDbUtil.getEpochFn(dbProduct),
+ TxnDbUtil.getEpochFn(dbProduct));
+ LOG.debug("Going to execute insert <" + insertQuery + ">");
+ try (PreparedStatement ps = dbConn.prepareStatement(insertQuery, new String[] {"TXN_ID"})) {
+ String state = genKeySupport ? Character.toString(TXN_OPEN) : Character.toString(TXN_TMP);
+ if (numTxns == 1) {
+ ps.setString(1, state);
+ ps.setString(2, rqst.getUser());
+ ps.setString(3, rqst.getHostname());
+ ps.setInt(4, txnType.getValue());
+ txnIds.addAll(executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, genKeySupport, ps, false));
+ } else {
+ for (int i = 0; i < numTxns; ++i) {
+ ps.setString(1, state);
+ ps.setString(2, rqst.getUser());
+ ps.setString(3, rqst.getHostname());
+ ps.setInt(4, txnType.getValue());
+ ps.addBatch();
+
+ if ((i + 1) % maxBatchSize == 0) {
+ txnIds.addAll(executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, genKeySupport, ps, true));
+ }
+ }
+ if (numTxns % maxBatchSize != 0) {
+ txnIds.addAll(executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, genKeySupport, ps, true));
+ }
+ }
}
+ assert txnIds.size() == numTxns;
+
if (rqst.isSetReplPolicy()) {
- List rowsRepl = new ArrayList<>();
- for (PreparedStatement pst : insertPreparedStmts) {
- pst.close();
- }
- insertPreparedStmts.clear();
- params.clear();
- paramsList.clear();
+ List rowsRepl = new ArrayList<>(numTxns);
+ List params = new ArrayList<>(1);
+ List> paramsList = new ArrayList<>(numTxns);
params.add(rqst.getReplPolicy());
for (int i = 0; i < numTxns; i++) {
- rowsRepl.add( "?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
+ rowsRepl.add("?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
paramsList.add(params);
}
@@ -687,10 +815,125 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
pst.close();
}
}
- close(rs);
}
}
+ private List executeTxnInsertBatchAndExtractGeneratedKeys(Connection dbConn, boolean genKeySupport,
+ PreparedStatement ps, boolean batch) throws SQLException {
+ List txnIds = new ArrayList<>();
+ if (batch) {
+ ps.executeBatch();
+ } else {
+ // For a slight performance advantage we do not use executeBatch when we only have one row
+ ps.execute();
+ }
+ if (genKeySupport) {
+ try (ResultSet generatedKeys = ps.getGeneratedKeys()) {
+ while (generatedKeys.next()) {
+ txnIds.add(generatedKeys.getLong(1));
+ }
+ }
+ } else {
+ try (PreparedStatement pstmt =
+ dbConn.prepareStatement("SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = ?")) {
+ pstmt.setString(1, Character.toString(TXN_TMP));
+ try (ResultSet rs = pstmt.executeQuery()) {
+ while (rs.next()) {
+ txnIds.add(rs.getLong(1));
+ }
+ }
+ }
+ try (PreparedStatement pstmt = dbConn
+ .prepareStatement("UPDATE \"TXNS\" SET \"TXN_STATE\" = ?
WHERE \"TXN_STATE\" = ?")) { + pstmt.setString(1, Character.toString(TXN_OPEN)); + pstmt.setString(2, Character.toString(TXN_TMP)); + pstmt.executeUpdate(); + } + } + return txnIds; + } + + private void deleteInvalidOpenTransactions(Connection dbConn, List txnIds) throws MetaException { + if (txnIds.size() == 0) { + return; + } + try { + Statement stmt = null; + try { + stmt = dbConn.createStatement(); + + List queries = new ArrayList<>(); + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + prefix.append("DELETE FROM \"TXNS\" WHERE "); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnIds, "\"TXN_ID\"", false, false); + for (String s : queries) { + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + } + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "deleteInvalidOpenTransactions(" + txnIds + ")"); + throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); + } finally { + closeStmt(stmt); + } + } catch (RetryException ex) { + deleteInvalidOpenTransactions(dbConn, txnIds); + } + } + + @Override + public long getOpenTxnTimeOutMillis() { + return openTxnTimeOutMillis; + } + + @Override + public void setOpenTxnTimeOutMillis(long openTxnTimeOutMillis) { + this.openTxnTimeOutMillis = openTxnTimeOutMillis; + } + + protected long getOpenTxnTimeoutLowBoundaryTxnId(Connection dbConn) throws MetaException, SQLException { + long maxTxnId; + String s = + "SELECT MAX(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STARTED\" < (" + TxnDbUtil.getEpochFn(dbProduct) + " - " + + openTxnTimeOutMillis + ")"; + try (Statement stmt = dbConn.createStatement()) { + LOG.debug("Going to execute query <" + s + ">"); + try (ResultSet maxTxnIdRs = stmt.executeQuery(s)) { + maxTxnIdRs.next(); + maxTxnId = maxTxnIdRs.getLong(1); + if (maxTxnIdRs.wasNull()) { + /* + * TXNS always contains at least one transaction, + * the row where txnid = (select max(txnid) where txn_started < epoch - TXN_OPENTXN_TIMEOUT) is never deleted + */ + throw new MetaException("Transaction tables not properly " + "initialized, null record found in MAX(TXN_ID)"); + } + } + } + return maxTxnId; + } + + private long getHighWaterMark(Statement stmt) throws SQLException, MetaException { + String s = "SELECT MAX(\"TXN_ID\") FROM \"TXNS\""; + LOG.debug("Going to execute query <" + s + ">"); + long maxOpenTxnId; + try (ResultSet maxOpenTxnIdRs = stmt.executeQuery(s)) { + maxOpenTxnIdRs.next(); + maxOpenTxnId = maxOpenTxnIdRs.getLong(1); + if (maxOpenTxnIdRs.wasNull()) { + /* + * TXNS always contains at least one transaction, + * the row where txnid = (select max(txnid) where txn_started < epoch - TXN_OPENTXN_TIMEOUT) is never deleted + */ + throw new MetaException("Transaction tables not properly " + "initialized, null record found in MAX(TXN_ID)"); + } + } + return maxOpenTxnId; + } + private List getTargetTxnIdList(String replPolicy, List sourceTxnIdList, Connection dbConn) throws SQLException { PreparedStatement pst = null; @@ -716,7 +959,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { } LOG.debug("targetTxnid for srcTxnId " + sourceTxnIdList.toString() + " is " + targetTxnIdList.toString()); return targetTxnIdList; - } catch (SQLException e) { + } catch (SQLException e) { LOG.warn("failed to get target txn ids " + e.getMessage()); throw e; } finally { @@ -1159,7 +1402,7 @@ public void commitTxn(CommitTxnRequest rqst) * even 
if it includes all of its columns
*
* First insert into write_set using a temporary commitID, which will be updated in a separate call,
- * see: {@link #updateCommitIdAndCleanUpMetadata(Statement, long, TxnType, Long, long)}}.
+ * see: {@link #updateWSCommitIdAndCleanUpMetadata(Statement, long, TxnType, Long, long)}.
* This should decrease the scope of the S4U lock on the next_txn_id table.
*/
Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint();
@@ -1173,13 +1416,8 @@ public void commitTxn(CommitTxnRequest rqst)
* at the same time and no new txns start until all 3 commit.
* We could've incremented the sequence for commitId as well but it doesn't add anything functionally.
*/
- try (ResultSet commitIdRs = stmt.executeQuery(sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""))) {
- if (!commitIdRs.next()) {
- throw new IllegalStateException("No rows found in NEXT_TXN_ID");
- }
- commitId = commitIdRs.getLong(1);
- }
-
+ acquireTxnLock(stmt, false);
+ commitId = getHighWaterMark(stmt);
/**
* see if there are any overlapping txns that wrote the same element, i.e. have a conflict
* Since entire commit operation is mutexed wrt other start/commit ops,
@@ -1211,9 +1449,8 @@ public void commitTxn(CommitTxnRequest rqst)
throw new TxnAbortedException(msg);
}
}
- }
- else {
- /**
+ } else {
+ /*
* current txn didn't update/delete anything (may have inserted), so just proceed with commit
*
* We only care about commit id for write txns, so for RO (when supported) txns we don't
@@ -1223,6 +1460,7 @@ public void commitTxn(CommitTxnRequest rqst)
* If RO < W, then there is no reads-from relationship.
* In replication flow we don't expect any write write conflict as it should have been handled at source.
*/
}
if (txnRecord.type != TxnType.READ_ONLY && !rqst.isSetReplPolicy()) {
@@ -1253,7 +1491,7 @@ public void commitTxn(CommitTxnRequest rqst)
}
deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
}
- updateCommitIdAndCleanUpMetadata(stmt, txnid, txnRecord.type, commitId, tempCommitId);
+ updateWSCommitIdAndCleanUpMetadata(stmt, txnid, txnRecord.type, commitId, tempCommitId);
if (rqst.isSetKeyValue()) {
updateKeyValueAssociatedWithTxn(rqst, stmt);
}
@@ -1272,8 +1510,7 @@
throw new MetaException("Unable to update transaction database "
+ StringUtils.stringifyException(e));
} finally {
- closeStmt(stmt);
- closeDbConn(dbConn);
+ close(null, stmt, dbConn);
unlockInternal();
}
} catch (RetryException e) {
@@ -1338,7 +1575,8 @@ private void moveTxnComponentsToCompleted(Statement stmt, long txnid, char isUpd
}
}
- private void updateCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, Long commitId, long tempId) throws SQLException {
+ private void updateWSCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType,
+ Long commitId, long tempId) throws SQLException {
List queryBatch = new ArrayList<>(5);
// update write_set with real commitId
if (commitId != null) {
@@ -1350,7 +1588,8 @@ private void updateCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnTyp
queryBatch.add("DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid);
}
queryBatch.add("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid);
- queryBatch.add("DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid);
+ // DO NOT remove the transaction from the TXN table, the cleaner will remove it when appropriate
+ queryBatch.add("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_COMMITTED) + "
WHERE \"TXN_ID\" = " + txnid); queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid); // execute all in one batch @@ -1431,7 +1670,9 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx if (numAbortedWrites > 0) { // Allocate/Map one txn per aborted writeId and abort the txn to mark writeid as aborted. - List txnIds = openTxns(dbConn, stmt, + // We don't use the txnLock, all of these transactions will be aborted in this one rdbm transaction + // So they will not effect the commitTxn in any way + List txnIds = openTxns(dbConn, new OpenTxnRequest(numAbortedWrites, rqst.getUser(), rqst.getHostName())); assert(numAbortedWrites == txnIds.size()); @@ -1491,7 +1732,7 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx } closeStmt(pStmt); close(rs, stmt, dbConn); - if(handle != null) { + if (handle != null) { handle.releaseLocks(); } unlockInternal(); @@ -1533,7 +1774,9 @@ private ValidTxnList getValidTxnList(Connection dbConn, String fullTableName, Lo String[] names = TxnUtils.getDbTableName(fullTableName); assert names.length == 2; List params = Arrays.asList(names[0], names[1]); - String s = "SELECT \"T2W_TXNID\" FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_WRITEID\" = " + writeId; + String s = + "SELECT \"T2W_TXNID\" FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_DATABASE\" = ? AND " + + "\"T2W_TABLE\" = ? AND \"T2W_WRITEID\" = "+ writeId; pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", quoteString(names[0]), quoteString(names[1])); @@ -2001,46 +2244,37 @@ public void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) @Override @RetrySemantics.SafeToRetry - public void performWriteSetGC() { + public void performWriteSetGC() throws MetaException { Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - rs = stmt.executeQuery("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""); - if(!rs.next()) { - throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted"); - } - long highestAllocatedTxnId = rs.getLong(1); - close(rs); + + long minOpenTxn; rs = stmt.executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + quoteChar(TXN_OPEN)); - if(!rs.next()) { + if (!rs.next()) { throw new IllegalStateException("Scalar query returned no rows?!?!!"); } - long commitHighWaterMark;//all currently open txns (if any) have txnid >= than commitHighWaterMark - long lowestOpenTxnId = rs.getLong(1); - if(rs.wasNull()) { - //if here then there are no Open txns and highestAllocatedTxnId must be - //resolved (i.e. committed or aborted), either way - //there are no open txns with id <= highestAllocatedTxnId - //the +1 is there because "delete ..." below has < (which is correct for the case when - //there is an open txn - //Concurrency: even if new txn starts (or starts + commits) it is still true that - //there are no currently open txns that overlap with any committed txn with - //commitId <= commitHighWaterMark (as set on next line). So plain READ_COMMITTED is enough. 
- commitHighWaterMark = highestAllocatedTxnId + 1;
- }
- else {
- commitHighWaterMark = lowestOpenTxnId;
+ minOpenTxn = rs.getLong(1);
+ if (rs.wasNull()) {
+ minOpenTxn = Long.MAX_VALUE;
}
+ long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn);
+ /**
+ * We try to find the highest transactionId below which everything was committed or aborted.
+ * For that we look at the lowest open transaction in TXNS and at the TXN_OPENTXN_TIMEOUT
+ * low water mark boundary, because it is guaranteed there won't be open transactions below that.
+ */
+ long commitHighWaterMark = Long.min(minOpenTxn, lowWaterMark + 1);
+ LOG.debug("Perform WriteSet GC with minOpenTxn {}, lowWaterMark {}", minOpenTxn, lowWaterMark);
int delCnt = stmt.executeUpdate("DELETE FROM \"WRITE_SET\" WHERE \"WS_COMMIT_ID\" < " + commitHighWaterMark);
LOG.info("Deleted {} obsolete rows from WRITE_SET", delCnt);
dbConn.commit();
} catch (SQLException ex) {
LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex);
- }
- finally {
+ } finally {
close(rs, stmt, dbConn);
}
}
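Editor's note: the new WRITE_SET GC boundary above reduces to a small pure function; this is a sketch with invented names, where a null minimum models "no open transactions found" (the rs.wasNull() branch).

public class WriteSetGcBoundary {
  // minOpenTxnId: MIN(TXN_ID) of open rows in TXNS, or null if there are none.
  // lowWaterMark: highest TXN_ID already outside the TXN_OPENTXN_TIMEOUT window.
  static long commitHighWaterMark(Long minOpenTxnId, long lowWaterMark) {
    long minOpen = (minOpenTxnId == null) ? Long.MAX_VALUE : minOpenTxnId;
    // Everything strictly below the returned id is resolved (committed or aborted),
    // so WRITE_SET rows with WS_COMMIT_ID below it can never conflict again.
    return Long.min(minOpen, lowWaterMark + 1);
  }

  public static void main(String[] args) {
    System.out.println(commitHighWaterMark(null, 41L)); // 42: no open txns at all
    System.out.println(commitHighWaterMark(17L, 41L));  // 17: an open txn caps the GC
  }
}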
@@ -4153,7 +4387,7 @@ public int compare(LockInfo info1, LockInfo info2) {
private void checkQFileTestHack(){
boolean hackOn = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) ||
- MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST);
+ MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST);
if (hackOn) {
LOG.info("Hacking in canned values for transaction manager");
// Set up the transaction/locking db in the derby metastore
@@ -4505,7 +4739,7 @@ private void heartbeatTxn(Connection dbConn, long txnid)
}
/**
- * Returns the state of the transaction iff it's able to determine it. Some cases where it cannot:
+ * Returns the state of the transaction if it's able to determine it. Some cases where it cannot:
* 1. txnid was Aborted/Committed and then GC'd (compacted)
* 2. txnid was committed but it didn't modify anything (nothing in COMPLETED_TXN_COMPONENTS)
*/
@@ -4530,6 +4764,9 @@ private TxnStatus findTxnState(long txnid, Statement stmt) throws SQLException,
if (txnState == TXN_ABORTED) {
return TxnStatus.ABORTED;
}
+ if (txnState == TXN_COMMITTED) {
+ return TxnStatus.COMMITTED;
+ }
assert txnState == TXN_OPEN : "we found it in TXNS but it's not ABORTED, so must be OPEN";
}
return TxnStatus.OPEN;
}
@@ -4909,11 +5146,15 @@ private static synchronized DataSource setupJdbcConnectionPool(Configuration con
static boolean isRetryable(Configuration conf, Exception ex) {
if(ex instanceof SQLException) {
SQLException sqlException = (SQLException)ex;
- if("08S01".equalsIgnoreCase(sqlException.getSQLState())) {
+ if (MANUAL_RETRY.equalsIgnoreCase(sqlException.getSQLState())) {
+ // Manual retry exception was thrown
+ return true;
+ }
+ if ("08S01".equalsIgnoreCase(sqlException.getSQLState())) {
//in MSSQL this means Communication Link Failure
return true;
}
- if("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) ||
+ if ("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) ||
sqlException.getMessage().contains("consistent read failure; rollback data not available")) {
return true;
}
@@ -5092,10 +5333,25 @@ public LockHandle acquireLock(String key) throws MetaException {
return acquireLock(key);
}
}
+
+ @Override
public void acquireLock(String key, LockHandle handle) {
//the idea is that this will use LockHandle.dbConn
throw new NotImplementedException("acquireLock(String, LockHandle) is not implemented");
}
+
+ /**
+ * Acquire the global txn lock, used to mutex the openTxn and commitTxn.
+ * @param stmt Statement to execute the lock on
+ * @param shared true for a shared (SHARED_READ) lock, false for an exclusive one
+ * @throws SQLException
+ * @throws MetaException
+ */
+ private void acquireTxnLock(Statement stmt, boolean shared) throws SQLException, MetaException {
+ String sqlStmt = sqlGenerator.createTxnLockStatement(shared);
+ stmt.execute(sqlStmt);
+ LOG.debug("TXN lock locked by {} in mode {}", quoteString(TxnHandler.hostname), shared);
+ }
+
private static final class LockHandleImpl implements LockHandle {
private final Connection dbConn;
private final Statement stmt;
@@ -5132,6 +5388,7 @@ public void releaseLocks() {
}
}
+
private static class NoPoolConnectionPool implements DataSource {
// Note that this depends on the fact that no-one in this class calls anything but
// getConnection. If you want to use any of the Logger or wrap calls you'll have to
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 87130a519d..e8ac71ae55 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -44,7 +44,7 @@
/**
* Prefix for key when committing with a key/value.
*/
- public static final String TXN_KEY_START = "_meta";
+ String TXN_KEY_START = "_meta";
enum MUTEX_KEY {
Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock,
@@ -380,18 +380,19 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old
/**
* Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by
- * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)).
+ * min(max(TXNS.txn_id), min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)).
*/
@RetrySemantics.SafeToRetry
void cleanTxnToWriteIdTable() throws MetaException;
/**
- * Clean up aborted transactions from txns that have no components in txn_components. The reson such
- * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and
- * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
+ * Clean up aborted or committed transactions from txns that have no components in txn_components. The reason such
+ * txns exist can be that no work was done in this txn (e.g. Streaming opened TransactionBatch and
+ * abandoned it w/o doing any work), or due to {@link #markCleaned(CompactionInfo)} being called,
+ * or because the delete from txns was delayed by the TXN_OPENTXN_TIMEOUT window.
*/
@RetrySemantics.SafeToRetry
- void cleanEmptyAbortedTxns() throws MetaException;
+ void cleanEmptyAbortedAndCommittedTxns() throws MetaException;
/**
* This will take all entries assigned to workers
@@ -442,7 +443,7 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old
* transaction metadata once it becomes unnecessary.
*/ @RetrySemantics.SafeToRetry - void performWriteSetGC(); + void performWriteSetGC() throws MetaException; /** * Determine if there are enough consecutive failures compacting a table or partition that no @@ -461,6 +462,12 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old @VisibleForTesting long setTimeout(long milliseconds); + @VisibleForTesting + long getOpenTxnTimeOutMillis(); + + @VisibleForTesting + void setOpenTxnTimeOutMillis(long openTxnTimeOutMillis); + @RetrySemantics.Idempotent MutexAPI getMutexAPI(); @@ -473,7 +480,7 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old */ interface MutexAPI { /** - * The {@code key} is name of the lock. Will acquire and exclusive lock or block. It retuns + * The {@code key} is name of the lock. Will acquire an exclusive lock or block. It returns * a handle which must be used to release the lock. Each invocation returns a new handle. */ LockHandle acquireLock(String key) throws MetaException; diff --git standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql index 366b6f02c1..727abce951 100644 --- standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql +++ standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql @@ -234,9 +234,8 @@ CREATE TABLE "APP"."CTLGS" ( "CREATE_TIME" INTEGER); -- Insert a default value. The location is TBD. Hive will fix this when it starts -INSERT INTO "APP"."CTLGS" - ("CTLG_ID", "NAME", "DESC", "LOCATION_URI", "CREATE_TIME") - VALUES (1, 'hive', 'Default catalog for Hive', 'TBD', NULL); +INSERT INTO "APP"."CTLGS" ("CTLG_ID", "NAME", "DESC", "LOCATION_URI", "CREATE_TIME") +VALUES (1, 'hive', 'Default catalog for Hive', 'TBD', NULL); -- ---------------------------------------------- -- DML Statements @@ -524,7 +523,7 @@ ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SQL110318025505550" CHECK (IS_COMPRESSED -- Transaction and Lock Tables -- ---------------------------- CREATE TABLE TXNS ( - TXN_ID bigint PRIMARY KEY, + TXN_ID bigint PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, TXN_STATE char(1) NOT NULL, TXN_STARTED bigint NOT NULL, TXN_LAST_HEARTBEAT bigint NOT NULL, @@ -536,6 +535,9 @@ CREATE TABLE TXNS ( TXN_TYPE integer ); +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + VALUES(0, 'c', 0, 0, '', ''); + CREATE TABLE TXN_COMPONENTS ( TC_TXNID bigint NOT NULL REFERENCES TXNS (TXN_ID), TC_DATABASE varchar(128) NOT NULL, @@ -559,10 +561,10 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS ( CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION); -CREATE TABLE NEXT_TXN_ID ( - NTXN_NEXT bigint NOT NULL +CREATE TABLE TXN_LOCK_TBL ( + TXN_LOCK bigint NOT NULL ); -INSERT INTO NEXT_TXN_ID VALUES(1); +INSERT INTO TXN_LOCK_TBL VALUES(1); CREATE TABLE HIVE_LOCKS ( HL_LOCK_EXT_ID bigint NOT NULL, diff --git standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql index 8a3cd56658..db2f43df1c 100644 --- standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql +++ standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql @@ -68,5 +68,19 @@ ALTER TABLE "APP"."DBS" ADD COLUMN "DB_MANAGED_LOCATION_URI" VARCHAR(4000); ALTER TABLE 
COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint;
DROP TABLE MIN_HISTORY_LEVEL;
+-- HIVE-23048
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+ SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '_', '_' FROM COMPLETED_TXN_COMPONENTS;
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+ VALUES (1000000000, 'c', 0, 0, '_', '_');
+ALTER TABLE TXNS ADD COLUMN TXN_ID_TMP bigint;
+UPDATE TXNS SET TXN_ID_TMP=TXN_ID;
+ALTER TABLE TXNS DROP COLUMN TXN_ID;
+ALTER TABLE TXNS ADD COLUMN TXN_ID BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY (START WITH 1000000001, INCREMENT BY 1);
+UPDATE TXNS SET TXN_ID=TXN_ID_TMP;
+ALTER TABLE TXNS DROP COLUMN TXN_ID_TMP;
+
+RENAME TABLE NEXT_TXN_ID TO TXN_LOCK_TBL;
+
-- This needs to be the last thing done. Insert any changes above this line.
UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
diff --git standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index 2e0177723d..6906bdf6b9 100644
--- standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++ standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -1096,28 +1096,31 @@ CREATE TABLE NEXT_LOCK_ID(
INSERT INTO NEXT_LOCK_ID VALUES(1);
-CREATE TABLE NEXT_TXN_ID(
- NTXN_NEXT bigint NOT NULL
+CREATE TABLE TXN_LOCK_TBL(
+ TXN_LOCK bigint NOT NULL
);
-INSERT INTO NEXT_TXN_ID VALUES(1);
+INSERT INTO TXN_LOCK_TBL VALUES(1);
CREATE TABLE TXNS(
- TXN_ID bigint NOT NULL,
- TXN_STATE char(1) NOT NULL,
- TXN_STARTED bigint NOT NULL,
- TXN_LAST_HEARTBEAT bigint NOT NULL,
- TXN_USER nvarchar(128) NOT NULL,
- TXN_HOST nvarchar(128) NOT NULL,
+ TXN_ID bigint NOT NULL IDENTITY(1,1),
+ TXN_STATE char(1) NOT NULL,
+ TXN_STARTED bigint NOT NULL,
+ TXN_LAST_HEARTBEAT bigint NOT NULL,
+ TXN_USER nvarchar(128) NOT NULL,
+ TXN_HOST nvarchar(128) NOT NULL,
TXN_AGENT_INFO nvarchar(128) NULL,
TXN_META_INFO nvarchar(128) NULL,
TXN_HEARTBEAT_COUNT int NULL,
TXN_TYPE int NULL,
PRIMARY KEY CLUSTERED
(
- TXN_ID ASC
+ TXN_ID ASC
)
);
+SET IDENTITY_INSERT TXNS ON;
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+ VALUES(0, 'c', 0, 0, '', '');
+SET IDENTITY_INSERT TXNS OFF;
CREATE TABLE TXN_COMPONENTS(
TC_TXNID bigint NOT NULL,
diff --git standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
index 9f3951575b..7583c5cf66 100644
--- standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
+++ standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
@@ -68,9 +68,56 @@ INSERT INTO NOTIFICATION_SEQUENCE (NNI_ID, NEXT_EVENT_ID) SELECT 1,1 WHERE NOT E
ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI nvarchar(4000);
-- HIVE-23107
-ALTER TABLE COMPACTION_QUEUE bigint CQ_NEXT_TXN_ID NOT NULL;
+ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint NOT NULL;
DROP TABLE MIN_HISTORY_LEVEL;
+-- HIVE-23048
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+ SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '', '' FROM COMPLETED_TXN_COMPONENTS;
+
+CREATE TABLE TMP_TXNS(
+ TXN_ID bigint NOT NULL IDENTITY(1,1),
+ TXN_STATE char(1) NOT NULL,
+ TXN_STARTED bigint NOT NULL,
+ TXN_LAST_HEARTBEAT bigint NOT NULL,
+ TXN_USER
nvarchar(128) NOT NULL,
+ TXN_HOST nvarchar(128) NOT NULL,
+ TXN_AGENT_INFO nvarchar(128) NULL,
+ TXN_META_INFO nvarchar(128) NULL,
+ TXN_HEARTBEAT_COUNT int NULL,
+ TXN_TYPE int NULL,
+PRIMARY KEY CLUSTERED
+(
+ TXN_ID ASC
+)
+);
+
+SET IDENTITY_INSERT TMP_TXNS ON;
+INSERT INTO TMP_TXNS (TXN_ID,TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST, TXN_AGENT_INFO, TXN_META_INFO, TXN_HEARTBEAT_COUNT, TXN_TYPE)
+SELECT TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST, TXN_AGENT_INFO, TXN_META_INFO, TXN_HEARTBEAT_COUNT, TXN_TYPE FROM TXNS WITH (TABLOCKX);
+
+SET IDENTITY_INSERT TMP_TXNS OFF;
+
+CREATE TABLE TMP_TXN_COMPONENTS(
+ TC_TXNID bigint NOT NULL,
+ TC_DATABASE nvarchar(128) NOT NULL,
+ TC_TABLE nvarchar(128) NULL,
+ TC_PARTITION nvarchar(767) NULL,
+ TC_OPERATION_TYPE char(1) NOT NULL,
+ TC_WRITEID bigint
+);
+INSERT INTO TMP_TXN_COMPONENTS SELECT * FROM TXN_COMPONENTS;
+
+DROP TABLE TXN_COMPONENTS;
+DROP TABLE TXNS;
+
+Exec sp_rename 'TMP_TXNS', 'TXNS';
+Exec sp_rename 'TMP_TXN_COMPONENTS', 'TXN_COMPONENTS';
+Exec sp_rename 'NEXT_TXN_ID', 'TXN_LOCK_TBL';
+
+ALTER TABLE TXN_COMPONENTS WITH CHECK ADD FOREIGN KEY(TC_TXNID) REFERENCES TXNS (TXN_ID);
+CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS (TC_TXNID);
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index 0512a45cad..b7f423c326 100644
--- standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++ standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -989,7 +989,7 @@ CREATE TABLE IF NOT EXISTS WM_MAPPING
-- Transaction and Lock Tables
-- ----------------------------
CREATE TABLE TXNS (
- TXN_ID bigint PRIMARY KEY,
+ TXN_ID bigint PRIMARY KEY AUTO_INCREMENT,
TXN_STATE char(1) NOT NULL,
TXN_STARTED bigint NOT NULL,
TXN_LAST_HEARTBEAT bigint NOT NULL,
@@ -1001,6 +1001,9 @@ CREATE TABLE TXNS (
TXN_TYPE int
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+ VALUES(0, 'c', 0, 0, '', '');
+
CREATE TABLE TXN_COMPONENTS (
TC_TXNID bigint NOT NULL,
TC_DATABASE varchar(128) NOT NULL,
@@ -1025,10 +1028,10 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION) USING BTREE;
-CREATE TABLE NEXT_TXN_ID (
- NTXN_NEXT bigint NOT NULL
+CREATE TABLE TXN_LOCK_TBL (
+ TXN_LOCK bigint NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
-INSERT INTO NEXT_TXN_ID VALUES(1);
+INSERT INTO TXN_LOCK_TBL VALUES(1);
CREATE TABLE HIVE_LOCKS (
HL_LOCK_EXT_ID bigint NOT NULL,
diff --git standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
index 4b82e36ab4..78a841a499 100644
--- standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
+++ standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
@@ -72,6 +72,23 @@ ALTER TABLE DBS ADD COLUMN DB_MANAGED_LOCATION_URI VARCHAR(4000) CHARACTER SET l
ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint;
DROP
TABLE MIN_HISTORY_LEVEL; +-- HIVE-23048 +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + SELECT IFNULL(MAX(CTC_TXNID),0), 'c', 0, 0, '', '' FROM COMPLETED_TXN_COMPONENTS; +ALTER TABLE TXNS ADD COLUMN TXN_ID_TMP BIGINT; +UPDATE TXNS SET TXN_ID_TMP=TXN_ID; +SET FOREIGN_KEY_CHECKS = 0; +ALTER TABLE TXNS MODIFY TXN_ID BIGINT AUTO_INCREMENT; +SET FOREIGN_KEY_CHECKS = 1; +UPDATE TXNS SET TXN_ID=TXN_ID_TMP; +ALTER TABLE TXNS DROP COLUMN TXN_ID_TMP; +SELECT MAX(TXN_ID) + 1 INTO @AutoInc FROM TXNS; +SET @s:=CONCAT('ALTER TABLE TXNS AUTO_INCREMENT=', @AutoInc); +PREPARE stmt FROM @s; +EXECUTE stmt; +DEALLOCATE PREPARE stmt; +RENAME TABLE NEXT_TXN_ID TO TXN_LOCK_TBL; + -- These lines need to be last. Insert any changes above. UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE; diff --git standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql index db398e5f66..0082dcd06c 100644 --- standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql +++ standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql @@ -972,7 +972,7 @@ ALTER TABLE MV_TABLES_USED ADD CONSTRAINT MV_TABLES_USED_FK2 FOREIGN KEY (TBL_ID -- Transaction and lock tables ------------------------------ CREATE TABLE TXNS ( - TXN_ID NUMBER(19) PRIMARY KEY, + TXN_ID NUMBER(19) GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, TXN_STATE char(1) NOT NULL, TXN_STARTED NUMBER(19) NOT NULL, TXN_LAST_HEARTBEAT NUMBER(19) NOT NULL, @@ -984,6 +984,9 @@ CREATE TABLE TXNS ( TXN_TYPE number(10) ) ROWDEPENDENCIES; +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + VALUES(0, 'c', 0, 0, '_', '_'); + CREATE TABLE TXN_COMPONENTS ( TC_TXNID NUMBER(19) NOT NULL REFERENCES TXNS (TXN_ID), TC_DATABASE VARCHAR2(128) NOT NULL, @@ -1007,10 +1010,10 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS ( CREATE INDEX COMPLETED_TXN_COMPONENTS_INDEX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION); -CREATE TABLE NEXT_TXN_ID ( - NTXN_NEXT NUMBER(19) NOT NULL +CREATE TABLE TXN_LOCK_TBL ( + TXN_LOCK NUMBER(19) NOT NULL ); -INSERT INTO NEXT_TXN_ID VALUES(1); +INSERT INTO TXN_LOCK_TBL VALUES(1); CREATE TABLE HIVE_LOCKS ( HL_LOCK_EXT_ID NUMBER(19) NOT NULL, diff --git standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql index 1be83fc4a9..51f52a44b9 100644 --- standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql +++ standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql @@ -72,6 +72,21 @@ ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI VARCHAR2(4000) NULL; ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID NUMBER(19); DROP TABLE MIN_HISTORY_LEVEL; +-- HIVE-23048 +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '_', '_' FROM COMPLETED_TXN_COMPONENTS; +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + VALUES (1000000000, 'c', 0, 0, '_', '_'); +-- DECLARE max_txn NUMBER; +-- BEGIN +-- SELECT MAX(TXN_ID) + 1 INTO max_txn FROM TXNS; +-- EXECUTE IMMEDIATE 'CREATE SEQUENCE 
TXNS_TXN_ID_SEQ INCREMENT BY 1 START WITH ' || max_txn || ' CACHE 20';
-- END;
+CREATE SEQUENCE TXNS_TXN_ID_SEQ INCREMENT BY 1 START WITH 1000000001 CACHE 20;
+ALTER TABLE TXNS MODIFY TXN_ID default TXNS_TXN_ID_SEQ.nextval;
+
+RENAME NEXT_TXN_ID TO TXN_LOCK_TBL;
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status from dual;
diff --git standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index e6e30160af..717e707407 100644
--- standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++ standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -1658,7 +1658,7 @@ GRANT ALL ON SCHEMA public TO PUBLIC;
-- Transaction and lock tables
------------------------------
CREATE TABLE "TXNS" (
- "TXN_ID" bigint PRIMARY KEY,
+ "TXN_ID" bigserial PRIMARY KEY,
"TXN_STATE" char(1) NOT NULL,
"TXN_STARTED" bigint NOT NULL,
"TXN_LAST_HEARTBEAT" bigint NOT NULL,
@@ -1669,6 +1669,8 @@ CREATE TABLE "TXNS" (
"TXN_HEARTBEAT_COUNT" integer,
"TXN_TYPE" integer
);
+INSERT INTO "TXNS" ("TXN_ID", "TXN_STATE", "TXN_STARTED", "TXN_LAST_HEARTBEAT", "TXN_USER", "TXN_HOST")
+ VALUES(0, 'c', 0, 0, '', '');
CREATE TABLE "TXN_COMPONENTS" (
"TC_TXNID" bigint NOT NULL REFERENCES "TXNS" ("TXN_ID"),
@@ -1693,10 +1695,10 @@ CREATE TABLE "COMPLETED_TXN_COMPONENTS" (
CREATE INDEX COMPLETED_TXN_COMPONENTS_INDEX ON "COMPLETED_TXN_COMPONENTS" USING btree ("CTC_DATABASE", "CTC_TABLE", "CTC_PARTITION");
-CREATE TABLE "NEXT_TXN_ID" (
- "NTXN_NEXT" bigint NOT NULL
+CREATE TABLE "TXN_LOCK_TBL" (
+ "TXN_LOCK" bigint NOT NULL
);
-INSERT INTO "NEXT_TXN_ID" VALUES(1);
+INSERT INTO "TXN_LOCK_TBL" VALUES(1);
CREATE TABLE "HIVE_LOCKS" (
"HL_LOCK_EXT_ID" bigint NOT NULL,
diff --git standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
index b90cecb173..63a5c4422c 100644
--- standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
+++ standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
@@ -203,6 +203,15 @@ ALTER TABLE "DBS" ADD "DB_MANAGED_LOCATION_URI" character varying(4000);
ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_NEXT_TXN_ID" bigint;
DROP TABLE "MIN_HISTORY_LEVEL";
+-- HIVE-23048
+INSERT INTO "TXNS" ("TXN_ID", "TXN_STATE", "TXN_STARTED", "TXN_LAST_HEARTBEAT", "TXN_USER", "TXN_HOST")
+ SELECT COALESCE(MAX("CTC_TXNID"),0), 'c', 0, 0, '', '' FROM "COMPLETED_TXN_COMPONENTS";
+CREATE SEQUENCE "TXNS_TXN_ID_SEQ" MINVALUE 0 OWNED BY "TXNS"."TXN_ID";
+select setval('"TXNS_TXN_ID_SEQ"', (SELECT MAX("TXN_ID") FROM "TXNS"));
+ALTER TABLE "TXNS" ALTER "TXN_ID" SET DEFAULT nextval('"TXNS_TXN_ID_SEQ"');
+
+ALTER TABLE "NEXT_TXN_ID" RENAME TO "TXN_LOCK_TBL";
+
-- These lines need to be last. Insert any changes above.
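Editor's note: a quick sanity check for the Postgres part of this migration — the sequence must continue above the migrated MAX(TXN_ID). This is a hedged sketch for a scratch test metastore only: the connection settings are placeholders, and calling nextval consumes one id.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class VerifyTxnsSequence {
  public static void main(String[] args) throws Exception {
    try (Connection c = DriverManager.getConnection(
             "jdbc:postgresql://localhost:5432/metastore", "hive", "hive");
         Statement s = c.createStatement()) {
      long max;
      try (ResultSet rs = s.executeQuery("SELECT MAX(\"TXN_ID\") FROM \"TXNS\"")) {
        rs.next();
        max = rs.getLong(1);
      }
      try (ResultSet rs = s.executeQuery("SELECT nextval('\"TXNS_TXN_ID_SEQ\"')")) {
        rs.next();
        long next = rs.getLong(1);
        // setval pinned the sequence to MAX(TXN_ID), so nextval must exceed it.
        System.out.println(next > max ? "sequence OK" : "sequence BROKEN");
      }
    }
  }
}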
UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0'; diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java index c1a1629548..2ebba393bc 100644 --- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java @@ -36,6 +36,7 @@ public void upgrade() throws HiveMetaException { Assert.assertEquals(0, getRule().createUser()); Assert.assertEquals(0, getRule().installAVersion(FIRST_VERSION)); Assert.assertEquals(0, getRule().upgradeToLatest()); + Assert.assertEquals(0, getRule().validateSchema()); } protected abstract DatabaseRule getRule(); diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java index a6d22d19ef..e06011f6a5 100644 --- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java @@ -68,11 +68,11 @@ public DatabaseRule() { public DatabaseRule setVerbose(boolean verbose) { this.verbose = verbose; return this; - }; + } public String getDb() { return HIVE_DB; - }; + } /** * URL to use when connecting as root rather than Hive @@ -147,7 +147,7 @@ public void after() { // stopAndRmDockerContainer protected String getDockerContainerName(){ return String.format("metastore-test-%s-install", getDbType()); - }; + } private ProcessResults runCmd(String[] cmd, long secondsToWait) throws IOException, InterruptedException { @@ -275,7 +275,7 @@ public int upgradeToLatest() { "-dbType", getDbType(), "-userName", - HIVE_USER, + getHiveUser(), "-passWord", getHivePassword(), "-url", @@ -289,4 +289,20 @@ public void install() { createUser(); installLatest(); } + + public int validateSchema() { + return new MetastoreSchemaTool().setVerbose(verbose).run(buildArray( + "-validate", + "-dbType", + getDbType(), + "-userName", + getHiveUser(), + "-passWord", + getHivePassword(), + "-url", + getJdbcUrl(), + "-driver", + getJdbcDriver() + )); + } } diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java index 0b070e19ac..21c5de1fb9 100644 --- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java @@ -24,7 +24,7 @@ @Override public String getDockerImageName() { - return "orangehrm/oracle-xe-11g"; + return "pvargacl/oracle-xe-18.4.0"; } @Override @@ -32,10 +32,6 @@ public String getDockerImageName() { return buildArray( "-p", "1521:1521", - "-e", - "DEFAULT_SYS_PASS=" + getDbRootPassword(), - "-e", - "ORACLE_ALLOW_REMOTE=true", "-d" ); } @@ -72,11 +68,16 @@ public String getInitialJdbcUrl() { @Override public boolean 
diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java
index c1a1629548..2ebba393bc 100644
--- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java
+++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java
@@ -36,6 +36,7 @@ public void upgrade() throws HiveMetaException {
     Assert.assertEquals(0, getRule().createUser());
     Assert.assertEquals(0, getRule().installAVersion(FIRST_VERSION));
     Assert.assertEquals(0, getRule().upgradeToLatest());
+    Assert.assertEquals(0, getRule().validateSchema());
   }
 
   protected abstract DatabaseRule getRule();
diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java
index a6d22d19ef..e06011f6a5 100644
--- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java
+++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java
@@ -68,11 +68,11 @@ public DatabaseRule() {
   public DatabaseRule setVerbose(boolean verbose) {
     this.verbose = verbose;
     return this;
-  };
+  }
 
   public String getDb() {
     return HIVE_DB;
-  };
+  }
 
   /**
    * URL to use when connecting as root rather than Hive
@@ -147,7 +147,7 @@ public void after() { // stopAndRmDockerContainer
 
   protected String getDockerContainerName(){
     return String.format("metastore-test-%s-install", getDbType());
-  };
+  }
 
   private ProcessResults runCmd(String[] cmd, long secondsToWait)
       throws IOException, InterruptedException {
@@ -275,7 +275,7 @@ public int upgradeToLatest() {
         "-dbType",
         getDbType(),
         "-userName",
-        HIVE_USER,
+        getHiveUser(),
         "-passWord",
         getHivePassword(),
         "-url",
@@ -289,4 +289,20 @@ public void install() {
     createUser();
     installLatest();
   }
+
+  public int validateSchema() {
+    return new MetastoreSchemaTool().setVerbose(verbose).run(buildArray(
+        "-validate",
+        "-dbType",
+        getDbType(),
+        "-userName",
+        getHiveUser(),
+        "-passWord",
+        getHivePassword(),
+        "-url",
+        getJdbcUrl(),
+        "-driver",
+        getJdbcDriver()
+    ));
+  }
 }
diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java
index 0b070e19ac..21c5de1fb9 100644
--- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java
+++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java
@@ -24,7 +24,7 @@
 
   @Override
   public String getDockerImageName() {
-    return "orangehrm/oracle-xe-11g";
+    return "pvargacl/oracle-xe-18.4.0";
   }
 
   @Override
@@ -32,10 +32,6 @@ public String getDockerImageName() {
     return buildArray(
         "-p",
         "1521:1521",
-        "-e",
-        "DEFAULT_SYS_PASS=" + getDbRootPassword(),
-        "-e",
-        "ORACLE_ALLOW_REMOTE=true",
         "-d"
     );
   }
@@ -72,11 +68,16 @@
 
   @Override
   public boolean isContainerReady(String logOutput) {
-    return logOutput.contains("Oracle started successfully!");
+    return logOutput.contains("DATABASE IS READY TO USE!");
   }
 
   @Override
   public String getHivePassword() {
     return HIVE_PASSWORD;
   }
+
+  @Override
+  public String getHiveUser() {
+    return "c##" + super.getHiveUser();
+  }
 }
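
The Oracle test image moves from XE 11g to XE 18.4, which runs as a container database: in a CDB root, ordinary users must be created as common users carrying the C## prefix, which is what the getHiveUser() override supplies, and the readiness banner in the container log changes accordingly. A sketch of what creating the Hive user looks like against a CDB root (user name and password are hypothetical):

    -- as SYS in the root container of the 18c XE CDB
    CREATE USER c##hiveuser IDENTIFIED BY hivepw;
    GRANT CONNECT, RESOURCE, UNLIMITED TABLESPACE TO c##hiveuser;
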
diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java
new file mode 100644
index 0000000000..8e1794a8a6
--- /dev/null
+++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
+import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/**
+ * Tests for the openTxns and getOpenTxns calls on TxnStore, in particular the
+ * handling of gaps in the transaction id sequence.
+ */
+public class TestOpenTxn {
+
+  private Configuration conf = MetastoreConf.newMetastoreConf();
+  private TxnStore txnHandler;
+
+  @Before
+  public void setUp() throws Exception {
+    // This will init the metastore db
+    txnHandler = TxnUtils.getTxnStore(conf);
+    TxnDbUtil.prepDb(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TxnDbUtil.cleanDb(conf);
+  }
+
+  @Test
+  public void testSingleOpen() throws MetaException {
+    OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost");
+    long txnId = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+    Assert.assertEquals(1, txnId);
+  }
+
+  @Test
+  public void testGap() throws Exception {
+    OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost");
+    txnHandler.openTxns(openTxnRequest);
+    long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+    // Simulate a transaction lost between id allocation and the TXNS insert
+    deleteTransaction(second);
+    txnHandler.openTxns(openTxnRequest);
+    GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+    // The deleted id is a gap and must still be counted as open
+    Assert.assertEquals(3, openTxns.getOpen_txnsSize());
+  }
+
+  @Test
+  public void testGapWithOldOpen() throws Exception {
+    OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost");
+    txnHandler.openTxns(openTxnRequest);
+    // Age the first transaction so the gap is older than a freshly opened txn
+    Thread.sleep(1000);
+    long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+    deleteTransaction(second);
+    txnHandler.openTxns(openTxnRequest);
+    GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+    Assert.assertEquals(3, openTxns.getOpen_txnsSize());
+  }
+
+  @Test
+  public void testGapWithOldCommit() throws Exception {
+    OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost");
+    long first = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+    txnHandler.commitTxn(new CommitTxnRequest(first));
+    long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+    deleteTransaction(second);
+    txnHandler.openTxns(openTxnRequest);
+    GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+    Assert.assertEquals(2, openTxns.getOpen_txnsSize());
+  }
+
+  @Test
+  public void testMultiGapWithOldCommit() throws Exception {
+    OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost");
+    long first = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+    txnHandler.commitTxn(new CommitTxnRequest(first));
+    // Open ten transactions at once and delete all of them
+    long second = txnHandler.openTxns(new OpenTxnRequest(10, "me", "localhost")).getTxn_ids().get(0);
+    deleteTransaction(second, second + 9);
+    txnHandler.openTxns(openTxnRequest);
+    GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+    Assert.assertEquals(11, openTxns.getOpen_txnsSize());
+  }
+
+  private void deleteTransaction(long txnId) throws SQLException {
+    deleteTransaction(txnId, txnId);
+  }
+
+  private void deleteTransaction(long minTxnId, long maxTxnId) throws SQLException {
+    DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
+    DataSource ds = dsp.create(conf);
+    try (Connection dbConn = ds.getConnection();
+         Statement stmt = dbConn.createStatement()) {
+      stmt.executeUpdate("DELETE FROM TXNS WHERE TXN_ID >= " + minTxnId + " AND TXN_ID <= " + maxTxnId);
+      dbConn.commit();
+    }
+  }
+}
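
With ids handed out by the database rather than by the old NEXT_TXN_ID row, a client that dies between id allocation and committing its TXNS insert leaves a hole in the id sequence; TestOpenTxn manufactures such holes by deleting rows directly. A reader of the open-transaction list cannot distinguish a hole from an openTxns() call whose insert has simply not landed yet, so a hole must keep being reported as open until the open txn timeout has passed it, which is what the assertion counts above verify. A hedged sketch of hole detection in plain SQL, not necessarily the statement TxnHandler itself runs:

    -- start of each run of ids missing from TXNS below the current maximum: either
    -- an in-flight openTxns() whose insert has not landed yet, or a lost transaction
    SELECT T1.TXN_ID + 1 AS GAP_START
    FROM TXNS T1
    LEFT JOIN TXNS T2 ON T2.TXN_ID = T1.TXN_ID + 1
    WHERE T2.TXN_ID IS NULL
      AND T1.TXN_ID < (SELECT MAX(TXN_ID) FROM TXNS);
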