diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 37a5862791..23512e252e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -151,7 +151,7 @@ public void run() { recoverFailedCompactions(true); // Clean anything from the txns table that has no components left in txn_components. - txnHandler.cleanEmptyAbortedTxns(); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); // Clean TXN_TO_WRITE_ID table for entries under min_uncommitted_txn referred by any open txns. txnHandler.cleanTxnToWriteIdTable(); diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index 15fcfc0e35..53e26299f6 100644 --- ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -512,9 +512,14 @@ public void testMarkCleanedCleansTxnsAndTxnComponents() // Check that we are cleaning up the empty aborted transactions GetOpenTxnsResponse txnList = txnHandler.getOpenTxns(); assertEquals(3, txnList.getOpen_txnsSize()); - txnHandler.cleanEmptyAbortedTxns(); + // Create one aborted txn to serve as the low water mark + txnid = openTxn(); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + txnHandler.setOpenTxnTimeOutMillis(1); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnList = txnHandler.getOpenTxns(); - assertEquals(2, txnList.getOpen_txnsSize()); + assertEquals(3, txnList.getOpen_txnsSize()); + txnHandler.setOpenTxnTimeOutMillis(1000); rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR); rqst.setPartitionname("bar"); @@ -529,9 +534,13 @@ public void testMarkCleanedCleansTxnsAndTxnComponents() txnHandler.markCleaned(ci); txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")); - txnHandler.cleanEmptyAbortedTxns(); + // The open txn will become the low water mark + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); + txnHandler.setOpenTxnTimeOutMillis(1); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnList = txnHandler.getOpenTxns(); assertEquals(3, txnList.getOpen_txnsSize()); + txnHandler.setOpenTxnTimeOutMillis(1000); } @Test diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 1d211857bf..d96bc6d397 100644 --- ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -193,30 +193,40 @@ public void testAbortTxn() throws Exception { boolean gotException = false; try { txnHandler.abortTxn(new AbortTxnRequest(2)); - } - catch(NoSuchTxnException ex) { + } catch(NoSuchTxnException ex) { gotException = true; - //if this wasn't an empty txn, we'd get a better msg - Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(2), ex.getMessage()); + // this is the last committed txn, so it is still in the txns table + Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(2) + " is already committed.", ex.getMessage()); } Assert.assertTrue(gotException); gotException = false; txnHandler.commitTxn(new CommitTxnRequest(3)); try { txnHandler.abortTxn(new AbortTxnRequest(3)); - } - catch(NoSuchTxnException ex) { + } catch(NoSuchTxnException ex) { gotException = true; //txn 3 is not empty txn, so we get a better
msg Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(3) + " is already committed.", ex.getMessage()); } Assert.assertTrue(gotException); + txnHandler.setOpenTxnTimeOutMillis(1); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); + txnHandler.setOpenTxnTimeOutMillis(1000); gotException = false; try { - txnHandler.abortTxn(new AbortTxnRequest(4)); + txnHandler.abortTxn(new AbortTxnRequest(2)); + } catch(NoSuchTxnException ex) { + gotException = true; + // now the second transaction is cleared, and since it was empty we do not recognize it anymore + Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(2), ex.getMessage()); } - catch(NoSuchTxnException ex) { + Assert.assertTrue(gotException); + + gotException = false; + try { + txnHandler.abortTxn(new AbortTxnRequest(4)); + } catch(NoSuchTxnException ex) { gotException = true; Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(4), ex.getMessage()); } @@ -1634,9 +1644,11 @@ private void checkReplTxnForTest(Long startTxnId, Long endTxnId, String replPoli @Test public void testReplOpenTxn() throws Exception { int numTxn = 50000; - String[] output = TxnDbUtil.queryToString(conf, "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"").split("\n"); + String[] output = TxnDbUtil.queryToString(conf, "SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\"").split("\n"); long startTxnId = Long.parseLong(output[1].trim()); + txnHandler.setOpenTxnTimeOutMillis(30000); List<Long> txnList = replOpenTxnForTest(startTxnId, numTxn, "default.*"); + txnHandler.setOpenTxnTimeOutMillis(1000); assert(txnList.size() == numTxn); txnHandler.abortTxns(new AbortTxnsRequest(txnList)); } @@ -1644,7 +1656,7 @@ public void testReplOpenTxn() throws Exception { @Test public void testReplAllocWriteId() throws Exception { int numTxn = 2; - String[] output = TxnDbUtil.queryToString(conf, "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"").split("\n"); + String[] output = TxnDbUtil.queryToString(conf, "SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\"").split("\n"); long startTxnId = Long.parseLong(output[1].trim()); List<Long> srcTxnIdList = LongStream.rangeClosed(startTxnId, numTxn+startTxnId-1) .boxed().collect(Collectors.toList()); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 2c13e8dd03..f9eaa27ff9 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -89,6 +89,7 @@ protected HiveConf hiveConf; protected Driver d; + private TxnStore txnHandler; protected enum Table { ACIDTBL("acidTbl"), ACIDTBLPART("acidTblPart", "p"), @@ -151,6 +152,7 @@ void setUpWithTableProperties(String tableProperties) throws Exception { TxnDbUtil.setConfValues(hiveConf); TxnDbUtil.prepDb(hiveConf); + txnHandler = TxnUtils.getTxnStore(hiveConf); File f = new File(TEST_WAREHOUSE_DIR); if (f.exists()) { FileUtil.fullyDelete(f); @@ -1380,6 +1382,8 @@ protected void testACIDwithSchemaEvolutionForVariousTblProperties(String tblProp // now compact and see if compaction still preserves the data correctness runStatementOnDriver("alter table "+ tblName + " compact 'MAJOR'"); runWorker(hiveConf); + // create a low-water-mark aborted transaction and clean the older ones + createAbortLowWaterMark(); runCleaner(hiveConf); // Cleaner would remove the obsolete files. // Verify that there is now only 1 new directory: base_xxxxxxx and the rest have been cleaned.
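A minimal sketch of the low-water-mark pattern these tests depend on, assuming the default metastore.txn.opentxn.timeout of 1000 ms and the helpers (openTxn, txnHandler) already used in this patch:

// Sketch: advance the open-txn low water mark so that older empty txns become cleanable.
// Assumes the default metastore.txn.opentxn.timeout of 1000 ms introduced by this patch.
long txnid = openTxn();                              // newest txn; future low-water-mark candidate
txnHandler.abortTxn(new AbortTxnRequest(txnid));     // leave it behind as an empty aborted txn
Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());  // let the TXN_OPENTXN_TIMEOUT window pass
txnHandler.cleanEmptyAbortedAndCommittedTxns();      // everything below the low water mark (except the max txn id) can go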
@@ -1403,6 +1407,14 @@ protected void testACIDwithSchemaEvolutionForVariousTblProperties(String tblProp Assert.assertEquals(Arrays.asList(expectedResult), rs); } + protected void createAbortLowWaterMark() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); + runStatementOnDriver("select * from " + Table.ACIDTBL); + // wait for metastore.txn.opentxn.timeout + Thread.sleep(1000); + runInitiator(hiveConf); + } + @Test public void testETLSplitStrategyForACID() throws Exception { hiveConf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, "ETL"); @@ -2096,7 +2108,7 @@ public void testCleanerForTxnToWriteId() throws Exception { // The entry relevant to aborted txns shouldn't be removed from TXN_TO_WRITE_ID as // aborted txn would be removed from TXNS only after the compaction. Also, committed txn > open txn is retained. // As open txn doesn't allocate writeid, the 2 entries for aborted and committed should be retained. - txnHandler.cleanEmptyAbortedTxns(); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnHandler.cleanTxnToWriteIdTable(); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause), 3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause)); @@ -2109,7 +2121,7 @@ public void testCleanerForTxnToWriteId() throws Exception { txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR)); runWorker(hiveConf); runCleaner(hiveConf); - txnHandler.cleanEmptyAbortedTxns(); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnHandler.cleanTxnToWriteIdTable(); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"), 2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID")); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java index 51b0fa336f..5b8c6701e1 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java @@ -479,11 +479,15 @@ public void testCompactionAbort() throws Exception { runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'"); runWorker(hiveConf); - assertTableIsEmpty("TXNS"); + // Committed txns are cleaned only after TXN_OPENTXN_TIMEOUT; one transaction should always remain + hiveConf.set("metastore.txn.opentxn.timeout", "1"); + runInitiator(hiveConf); + hiveConf.set("metastore.txn.opentxn.timeout", "1000"); + assertOneTxn(); assertTableIsEmpty("TXN_COMPONENTS"); runCleaner(hiveConf); - assertTableIsEmpty("TXNS"); + assertOneTxn(); assertTableIsEmpty("TXN_COMPONENTS"); } @@ -500,11 +504,14 @@ public void testCompactionAbort() throws Exception { runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'"); runWorker(hiveConf); - assertTableIsEmpty("TXNS"); + // Committed txns are cleaned only after TXN_OPENTXN_TIMEOUT; one transaction should always remain + hiveConf.set("metastore.txn.opentxn.timeout", "1"); + runInitiator(hiveConf); + hiveConf.set("metastore.txn.opentxn.timeout", "1000"); assertTableIsEmpty("TXN_COMPONENTS"); runCleaner(hiveConf); - assertTableIsEmpty("TXNS"); + assertOneTxn(); assertTableIsEmpty("TXN_COMPONENTS"); } @@ -512,4 +519,8 @@ private void assertTableIsEmpty(String table) throws Exception { Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from " + table), 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from " +
table)); } + private void assertOneTxn() throws Exception { + Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), 1, + TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java index 6525ffc00a..e88ef92c86 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java @@ -465,8 +465,12 @@ public void testOperationsOnCompletedTxnComponentsForMmTable() throws Exception Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS")); + // Committed txns are cleaned only after TXN_OPENTXN_TIMEOUT; one transaction should always remain + hiveConf.set("metastore.txn.opentxn.timeout", "1"); + runInitiator(hiveConf); + hiveConf.set("metastore.txn.opentxn.timeout", "1000"); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), - 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); + 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); // Initiate a major compaction request on the table. runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MAJOR'"); @@ -475,13 +479,17 @@ public void testOperationsOnCompletedTxnComponentsForMmTable() throws Exception runWorker(hiveConf); verifyDirAndResult(2, true); + // Committed txns are cleaned only after TXN_OPENTXN_TIMEOUT; one transaction should always remain + hiveConf.set("metastore.txn.opentxn.timeout", "1"); + runInitiator(hiveConf); + hiveConf.set("metastore.txn.opentxn.timeout", "1000"); // Run Cleaner.
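// Note: the 1-row assertions around this cleaner run follow from the cleanup rules added
// in CompactionTxnHandler below: the cleaner never deletes the maximum txn id, because
// MAX(TXN_ID) + 1 now plays the high-water-mark role that NEXT_TXN_ID.NTXN_NEXT used to play.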
runCleaner(hiveConf); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS")); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), - 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); + 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); verifyDirAndResult(0, true); } diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 1435269ed3..3ff68a3c7e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -34,6 +34,8 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.hadoop.hive.ql.session.SessionState; @@ -57,6 +59,8 @@ public TestName testName = new TestName(); protected HiveConf hiveConf; Driver d; + private TxnStore txnHandler; + public enum Table { ACIDTBL("acidTbl"), ACIDTBLPART("acidTblPart"), @@ -75,6 +79,10 @@ public String toString() { } } + public TxnStore getTxnStore() { + return txnHandler; + } + @Before public void setUp() throws Exception { setUpInternal(); @@ -106,6 +114,7 @@ void setUpInternal() throws Exception { hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); hiveConf.setBoolean("mapred.input.dir.recursive", true); TxnDbUtil.setConfValues(hiveConf); + txnHandler = TxnUtils.getTxnStore(hiveConf); TxnDbUtil.prepDb(hiveConf); File f = new File(getWarehouseDir()); if (f.exists()) { diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java new file mode 100644 index 0000000000..067c149132 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.lockmgr; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +/** + * Base class for "end-to-end" tests of DbTxnManager that simulate concurrent queries. + */ +public abstract class DbTxnManagerEndToEndTestBase { + + protected static HiveConf conf = new HiveConf(Driver.class); + protected HiveTxnManager txnMgr; + protected Context ctx; + protected Driver driver, driver2; + protected TxnStore txnHandler; + + public DbTxnManagerEndToEndTestBase() { + conf + .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); + TxnDbUtil.setConfValues(conf); + } + @Before + public void setUp() throws Exception { + SessionState.start(conf); + ctx = new Context(conf); + driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build()); + driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build()); + TxnDbUtil.cleanDb(conf); + TxnDbUtil.prepDb(conf); + SessionState ss = SessionState.get(); + ss.initTxnMgr(conf); + txnMgr = ss.getTxnMgr(); + Assert.assertTrue(txnMgr instanceof DbTxnManager); + txnHandler = TxnUtils.getTxnStore(conf); + + } + @After + public void tearDown() throws Exception { + driver.close(); + driver2.close(); + if (txnMgr != null) { + txnMgr.closeTxnManager(); + } + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 73d3b91585..5e5d4841dd 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -32,21 +32,14 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.AcidWriteSetService; -import org.apache.hadoop.hive.metastore.txn.TxnStore; -import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.TestTxnCommands2; -import org.junit.After; import org.junit.Assert; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; -import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.hadoop.hive.ql.session.SessionState; -import org.junit.Before; import org.junit.ComparisonFailure; import org.junit.Rule; import org.junit.Test; @@ -82,43 +75,7 @@ * using {@link #swapTxnManager(HiveTxnManager)} since in the SessionState the TM is associated with * each thread.
*/ -public class TestDbTxnManager2 { - private static HiveConf conf = new HiveConf(Driver.class); - private HiveTxnManager txnMgr; - private Context ctx; - private Driver driver, driver2; - private TxnStore txnHandler; - - public TestDbTxnManager2() throws Exception { - conf - .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, - "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); - conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); - TxnDbUtil.setConfValues(conf); - } - @Before - public void setUp() throws Exception { - SessionState.start(conf); - ctx = new Context(conf); - driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build()); - driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build()); - TxnDbUtil.cleanDb(conf); - TxnDbUtil.prepDb(conf); - SessionState ss = SessionState.get(); - ss.initTxnMgr(conf); - txnMgr = ss.getTxnMgr(); - Assert.assertTrue(txnMgr instanceof DbTxnManager); - txnHandler = TxnUtils.getTxnStore(conf); - - } - @After - public void tearDown() throws Exception { - driver.close(); - driver2.close(); - if (txnMgr != null) { - txnMgr.closeTxnManager(); - } - } +public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase { /** * HIVE-16688 @@ -1100,11 +1057,19 @@ public void testWriteSetTracking4() throws Exception { adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn(); - locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 0, locks.size()); + /** + * The last transaction will always remain in the transaction table, so we open another one, + * wait for the timeout period to elapse, then trigger the cleanup + */ + txnMgr.openTxn(ctx, "Long Running"); + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); + // Now we can clean the write_set houseKeeper.run(); Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET")); + txnMgr.rollbackTxn(); } /** * overlapping txns updating the same resource but 1st one rolls back; 2nd commits @@ -1177,10 +1142,19 @@ public void testWriteSetTracking6() throws Exception { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks); txnMgr.commitTxn(); + /** + * The last transaction will always remain in the transaction table, so we open another one, + * wait for the timeout period to elapse, then trigger the cleanup + */ + txnMgr.openTxn(ctx, "Long Running"); + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); + // Now we can clean the write_set MetastoreTaskThread writeSetService = new AcidWriteSetService(); writeSetService.setConf(conf); writeSetService.run(); Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET")); + txnMgr.rollbackTxn(); } /** diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManagerIsolationProperties.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManagerIsolationProperties.java new file mode 100644 index 0000000000..6583ef7bae --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManagerIsolationProperties.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.lockmgr; + +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider; +import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory; +import org.apache.hadoop.hive.ql.exec.FetchTask; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.junit.Assert; +import org.junit.Test; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +/** + * Tests to check the ACID properties and isolation level requirements. + */ +public class TestDbTxnManagerIsolationProperties extends DbTxnManagerEndToEndTestBase { + + @Test + public void basicOpenTxnsNoDirtyRead() throws Exception { + driver.run(("drop table if exists gap")); + driver.run("create table gap (a int, b int) " + "stored as orc TBLPROPERTIES ('transactional'='true')"); + // Create one TXN to read and do not run it + driver.compileAndRespond("select * from gap"); + long first = txnMgr.getCurrentTxnId(); + + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + driver2.compileAndRespond("insert into gap values(1,2)"); + long second = txnMgr2.getCurrentTxnId(); + Assert.assertTrue("Sequence number goes onward", second > first); + driver2.run(); + + // Now we run our read query; it should not see the write results of the insert + swapTxnManager(txnMgr); + driver.run(); + + FetchTask fetchTask = driver.getFetchTask(); + List<String> res = new ArrayList<>(); + fetchTask.fetch(res); + Assert.assertEquals("No dirty read", 0, res.size()); + + } + @Test + public void gapOpenTxnsNoDirtyRead() throws Exception { + driver.run(("drop table if exists gap")); + driver.run("create table gap (a int, b int) " + "stored as orc TBLPROPERTIES ('transactional'='true')"); + // Create one TXN to delete later + driver.compileAndRespond("select * from gap"); + long first = txnMgr.getCurrentTxnId(); + driver.run(); + // The second one we use for the low water mark + driver.run("select * from gap"); + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + // Make sure that the time window is large enough to consider the gap open + txnHandler.setOpenTxnTimeOutMillis(30000); + // Create a gap + deleteTransactionId(first); + CommandProcessorResponse resp = driver2.compileAndRespond("select * from gap"); + long third = txnMgr2.getCurrentTxnId(); +
Assert.assertTrue("Sequence number goes onward", third > first); + ValidTxnList validTxns = txnMgr2.getValidTxns(); + Assert.assertEquals("Expect to see the gap as open", first, (long) validTxns.getMinOpenTxn()); + txnHandler.setOpenTxnTimeOutMillis(1000); + + // Now we cheat and create a transaction with the first sequenceId again imitating a very slow openTxns call + setBackSequence(first); + swapTxnManager(txnMgr); + driver.compileAndRespond("insert into gap values(1,2)"); + long forth = txnMgr.getCurrentTxnId(); + Assert.assertEquals(first, forth); + driver.run(); + + // Now we run our read query it should not see the write results of the insert + swapTxnManager(txnMgr2); + driver2.run(); + + FetchTask fetchTask = driver2.getFetchTask(); + List res = new ArrayList(); + fetchTask.fetch(res); + Assert.assertEquals("No dirty read", 0, res.size()); + + } + + + + @Test + public void multipleGapOpenTxnsNoDirtyRead() throws Exception { + driver.run(("drop table if exists gap")); + driver.run("create table gap (a int, b int) " + "stored as orc TBLPROPERTIES ('transactional'='true')"); + // Create some TXN to delete later + OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(10, "user", "local")); + openTxns.getTxn_ids().stream().forEach( txnId -> { + silentCommitTxn(new CommitTxnRequest(txnId)); + }); + + long first = openTxns.getTxn_ids().get(0); + long last = openTxns.getTxn_ids().get(9); + // The next one we use for Low water mark + driver.run("select * from gap"); + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + // Make sure, that the time window is great enough to consider the gap open + txnHandler.setOpenTxnTimeOutMillis(30000); + // Create a gap + deleteTransactionId(first, last); + CommandProcessorResponse resp = driver2.compileAndRespond("select * from gap"); + long next = txnMgr2.getCurrentTxnId(); + Assert.assertTrue("Sequence number goes onward", next > last); + ValidTxnList validTxns = txnMgr2.getValidTxns(); + Assert.assertEquals("Expect to see the gap as open", first, (long) validTxns.getMinOpenTxn()); + txnHandler.setOpenTxnTimeOutMillis(1000); + + // Now we cheat and create a transaction with the first sequenceId again imitating a very slow openTxns call + setBackSequence(first); + swapTxnManager(txnMgr); + driver.compileAndRespond("insert into gap values(1,2)"); + next = txnMgr.getCurrentTxnId(); + Assert.assertEquals(first, next); + driver.run(); + + // Now we run our read query it should not see the write results of the insert + swapTxnManager(txnMgr2); + driver2.run(); + + FetchTask fetchTask = driver2.getFetchTask(); + List res = new ArrayList(); + fetchTask.fetch(res); + Assert.assertEquals("No dirty read", 0, res.size()); + + } + + @Test + public void gapOpenTxnsDirtyRead() throws Exception { + driver.run(("drop table if exists gap")); + driver.run("create table gap (a int, b int) " + "stored as orc TBLPROPERTIES ('transactional'='true')"); + // Create one TXN to delete later + driver.compileAndRespond("select * from gap"); + long first = txnMgr.getCurrentTxnId(); + driver.run(); + //The second one we use for Low water mark + driver.run("select * from gap"); + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + // Now we wait for the time window to move forward + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); + // Create a gap + deleteTransactionId(first); + CommandProcessorResponse resp = 
driver2.compileAndRespond("select * from gap"); + long third = txnMgr2.getCurrentTxnId(); + Assert.assertTrue("Sequence number goes onward", third > first); + ValidTxnList validTxns = txnMgr2.getValidTxns(); + Assert.assertNull("Expect to see no gap", validTxns.getMinOpenTxn()); + + // Now we cheat and create a transaction with the first sequenceId again, imitating a very slow openTxns call + // This should never happen + setBackSequence(first); + swapTxnManager(txnMgr); + driver.compileAndRespond("insert into gap values(1,2)"); + long forth = txnMgr.getCurrentTxnId(); + Assert.assertEquals(first, forth); + driver.run(); + + // Now we run our read query; it should unfortunately see the results of the insert + swapTxnManager(txnMgr2); + driver2.run(); + + FetchTask fetchTask = driver2.getFetchTask(); + List<String> res = new ArrayList<>(); + fetchTask.fetch(res); + Assert.assertEquals("Dirty read!", 1, res.size()); + + } + private void silentCommitTxn(CommitTxnRequest commitTxnRequest) { + try { + txnHandler.commitTxn(commitTxnRequest); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + private void deleteTransactionId(long txnId) throws SQLException { + deleteTransactionId(txnId, txnId); + } + + private void deleteTransactionId(long minTxnId, long maxTxnId) throws SQLException { + DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf); + DataSource ds = dsp.create(conf); + Connection dbConn = ds.getConnection(); + Statement stmt = dbConn.createStatement(); + stmt.executeUpdate("DELETE FROM TXNS WHERE TXN_ID >=" + minTxnId + " AND TXN_ID <=" + maxTxnId); + dbConn.commit(); + stmt.close(); + dbConn.close(); + } + + private void setBackSequence(long txnId) throws SQLException { + DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf); + DataSource ds = dsp.create(conf); + Connection dbConn = ds.getConnection(); + Statement stmt = dbConn.createStatement(); + stmt.executeUpdate("ALTER TABLE TXNS ALTER TXN_ID RESTART WITH " + txnId); + dbConn.commit(); + stmt.close(); + dbConn.close(); + } + + public static HiveTxnManager swapTxnManager(HiveTxnManager txnMgr) { + return SessionState.get().setTxnMgr(txnMgr); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index 1151466f8c..e4ff14a140 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -218,18 +218,19 @@ public void cleanEmptyAbortedTxns() throws Exception { req.setTxnid(txnid); LockResponse res = txnHandler.lock(req); txnHandler.abortTxn(new AbortTxnRequest(txnid)); - + txnHandler.setOpenTxnTimeOutMillis(30000); conf.setIntVar(HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50); OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest( TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50, "user", "hostname")); txnHandler.abortTxns(new AbortTxnsRequest(resp.getTxn_ids())); GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize()); - + txnHandler.setOpenTxnTimeOutMillis(1); startInitiator(); openTxns = txnHandler.getOpenTxns(); - Assert.assertEquals(1, openTxns.getOpen_txnsSize()); + // txnid:1 has txn_components, txnid:TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1 is the last + Assert.assertEquals(2, openTxns.getOpen_txnsSize()); }
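The setOpenTxnTimeOutMillis toggling above is the recurring pattern in these tests; a minimal sketch, assuming the getter/setter pair this patch exposes on the TxnStore and the startInitiator helper used in this test class:

// Sketch: temporarily shrink the TXN_OPENTXN_TIMEOUT window so cleanup can act
// immediately, then restore it so later assertions see normal open-txn semantics.
long saved = txnHandler.getOpenTxnTimeOutMillis();
txnHandler.setOpenTxnTimeOutMillis(1);
try {
  startInitiator(); // runs cleanEmptyAbortedAndCommittedTxns() among its other duties
} finally {
  txnHandler.setOpenTxnTimeOutMillis(saved);
}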
@Test diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index a874121e12..38874697db 100644 --- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -1199,6 +1199,8 @@ public static ConfVars getMetaConf(String name) { "class is used to store and retrieve transactions and locks"), TXN_TIMEOUT("metastore.txn.timeout", "hive.txn.timeout", 300, TimeUnit.SECONDS, "time after which transactions are declared aborted if the client has not sent a heartbeat."), + TXN_OPENTXN_TIMEOUT("metastore.txn.opentxn.timeout", "hive.txn.opentxn.timeout", 1000, TimeUnit.MILLISECONDS, + "Time window in which an open transaction operation must persist its record; otherwise it is considered invalid and rolled back"), URI_RESOLVER("metastore.uri.resolver", "hive.metastore.uri.resolver", "", "If set, fully qualified class name of resolver for hive metastore uri's"), USERS_IN_ADMIN_ROLE("metastore.users.in.admin.role", "hive.users.in.admin.role", "", false, diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java index 49b737ecf9..43e5f7fc5f 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; /** * Helper class that generates SQL queries with syntax specific to target DB @@ -57,9 +58,24 @@ public SQLGenerator(DatabaseProduct dbProduct, Configuration conf) { * @param paramsList List of parameters which in turn is list of Strings to be set in PreparedStatement object * @return List of PreparedStatement objects for fully formed INSERT INTO ... statements */ + public List<PreparedStatement> createInsertValuesPreparedStmt(Connection dbConn, + String tblColumns, List<String> rows, + List<List<String>> paramsList) throws SQLException { + return createInsertValuesPreparedStmt(dbConn, tblColumns, rows, paramsList, null); + } + + /** + * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB + * + * @param tblColumns e.g. "T(a,b,c)" + * @param rows e.g. list of Strings like 3,4,'d' + * @param paramsList List of parameters which in turn is list of Strings to be set in PreparedStatement object + * @param generatedKeys list of fields to return for generatedKeys + * @return List of PreparedStatement objects for fully formed INSERT INTO ...
statements + */ public List<PreparedStatement> createInsertValuesPreparedStmt(Connection dbConn, String tblColumns, List<String> rows, - List<List<String>> paramsList) + List<List<String>> paramsList, String[] generatedKeys) throws SQLException { if (rows == null || rows.size() == 0) { return Collections.emptyList(); @@ -75,7 +91,7 @@ public SQLGenerator(DatabaseProduct dbProduct, Configuration conf) { try { for (int stmtIdx = 0; stmtIdx < insertStmts.size(); stmtIdx++) { String sql = insertStmts.get(stmtIdx); - PreparedStatement pStmt = prepareStmtWithParameters(dbConn, sql, null); + PreparedStatement pStmt = prepareStmtWithParameters(dbConn, sql, null, generatedKeys); if (paramsList != null) { int paramIdx = 1; int paramsListToIdx = paramsListFromIdx + rowsCountInStmts.get(stmtIdx); @@ -99,7 +115,7 @@ public SQLGenerator(DatabaseProduct dbProduct, Configuration conf) { } /** - * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB + * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB. * * @param tblColumns e.g. "T(a,b,c)" * @param rows e.g. list of Strings like 3,4,'d' @@ -110,7 +126,7 @@ public SQLGenerator(DatabaseProduct dbProduct, Configuration conf) { } /** - * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB + * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB. * * @param tblColumns e.g. "T(a,b,c)" * @param rows e.g. list of Strings like 3,4,'d' @@ -263,8 +279,30 @@ public String addLimitClause(int numRows, String noSelectsqlQuery) throws MetaEx * @throws SQLException */ public PreparedStatement prepareStmtWithParameters(Connection dbConn, String sql, List<String> parameters) - throws SQLException { - PreparedStatement pst = dbConn.prepareStatement(addEscapeCharacters(sql)); + throws SQLException { + return prepareStmtWithParameters(dbConn, sql, parameters, null); + } + + /** + * Make PreparedStatement object with list of String type parameters to be set. + * It is assumed the input sql string has the number of "?" equal to number of parameters + * passed as input. + * @param dbConn - Connection object + * @param sql - SQL statement with "?" for input parameters. + * @param parameters - List of String type parameters to be set in PreparedStatement object + * @param generatedKeys - list of fields to return via getGeneratedKeys + * @return PreparedStatement type object + * @throws SQLException + */ + public PreparedStatement prepareStmtWithParameters(Connection dbConn, String sql, List<String> parameters, + String[] generatedKeys) throws SQLException { + PreparedStatement pst; + if (generatedKeys != null && generatedKeys.length > 0) { + pst = dbConn.prepareStatement(addEscapeCharacters(sql), generatedKeys); + } else { + pst = dbConn.prepareStatement(addEscapeCharacters(sql)); + } + if ((parameters == null) || parameters.isEmpty()) { return pst; } @@ -292,4 +330,60 @@ public String addEscapeCharacters(String s) { return s; } + /** + * Creates an unlock statement for table locks. + * Most DBMSs do not have this feature; the lock will be released at the end of the transaction. + * @return the unlock statement to run, or empty if none is needed + * @throws MetaException if the dbProduct is unknown + */ + public Optional<String> createUnLockTableStatement() throws MetaException { + switch (dbProduct) { + case MYSQL: + // https://dev.mysql.com/doc/refman/8.0/en/lock-tables.html + return Optional.of("UNLOCK TABLES"); + case POSTGRES: + // https://www.postgresql.org/docs/9.4/sql-lock.html + case DERBY: + // https://db.apache.org/derby/docs/10.4/ref/rrefsqlj40506.html + case ORACLE: + // https://docs.oracle.com/cd/B19306_01/server.102/b14200/statements_9015.htm + case SQLSERVER: + // https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-table?view=sql-server-ver15 + return Optional.empty(); + default: + String msg = "Unrecognized database product name <" + dbProduct + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + } + + /** + * Creates a lock table statement based on the dbProduct in shared read / exclusive mode. + * @param txnLockTable table to lock + * @param shared shared or exclusive lock + * @return the lock statement for the given dbProduct + * @throws MetaException if the dbProduct is unknown + */ + public String createLockTableStatement(String txnLockTable, boolean shared) throws MetaException { + + switch (dbProduct) { + case MYSQL: + // https://dev.mysql.com/doc/refman/8.0/en/lock-tables.html + return "LOCK TABLES \"" + txnLockTable + "\" " + (shared ? "READ" : "WRITE"); + case POSTGRES: + // https://www.postgresql.org/docs/9.4/sql-lock.html + case DERBY: + // https://db.apache.org/derby/docs/10.4/ref/rrefsqlj40506.html + case ORACLE: + // https://docs.oracle.com/cd/B19306_01/server.102/b14200/statements_9015.htm + return "LOCK TABLE \"" + txnLockTable + "\" IN " + (shared ? "SHARE" : "EXCLUSIVE") + " MODE"; + case SQLSERVER: + // https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-table?view=sql-server-ver15 + return "SELECT * FROM \"" + txnLockTable + "\" WITH (" + (shared ? "TABLOCK" : "TABLOCKX") + ", HOLDLOCK)"; + default: + String msg = "Unrecognized database product name <" + dbProduct + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 2344c2d5f6..17ae9bb2e3 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -223,7 +223,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { stmt = dbConn.createStatement(); String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', " + "\"CQ_WORKER_ID\" = NULL, \"CQ_NEXT_TXN_ID\" = " - + "(SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\")" + + "(SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\")" + " WHERE \"CQ_ID\" = " + info.id; LOG.debug("Going to execute update <" + s + ">"); int updCnt = stmt.executeUpdate(s); @@ -494,7 +494,7 @@ public void cleanTxnToWriteIdTable() throws MetaException { // If there are no txns which are currently open or aborted in the system, then current value of // NEXT_TXN_ID.ntxn_next could be min_uncommitted_txnid.
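// (In the query below, the first UNION branch now derives the presumed next txn id as
// MAX(TXN_ID) + 1 from TXNS instead of reading NEXT_TXN_ID.NTXN_NEXT; the outer MIN
// still selects the smallest id that could belong to an uncommitted txn.)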
String s = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" + - "SELECT MIN(\"NTXN_NEXT\") AS \"ID\" FROM \"NEXT_TXN_ID\" " + + "SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\" " + "UNION " + "SELECT MIN(\"WS_COMMIT_ID\") AS \"ID\" FROM \"WRITE_SET\" " + "UNION " + @@ -504,7 +504,7 @@ public void cleanTxnToWriteIdTable() throws MetaException { LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { - throw new MetaException("Transaction tables not properly initialized, no record found in NEXT_TXN_ID"); + throw new MetaException("Transaction tables not properly initialized, no record found in TXNS"); } long minUncommitedTxnid = rs.getLong(1); @@ -533,25 +533,36 @@ public void cleanTxnToWriteIdTable() throws MetaException { } /** - * Clean up aborted transactions from txns that have no components in txn_components. The reason such - * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and - * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called. + * Clean up aborted / committed transactions from txns that have no components in txn_components. + * The committed txns are left there for TXN_OPENTXN_TIMEOUT window period intentionally. + * The reason such aborted txns exist can be that now work was done in this txn + * (e.g. Streaming opened TransactionBatch and abandoned it w/o doing any work) + * or due to {@link #markCleaned(CompactionInfo)} being called. */ @Override @RetrySemantics.SafeToRetry - public void cleanEmptyAbortedTxns() throws MetaException { + public void cleanEmptyAbortedAndCommittedTxns() throws MetaException { + LOG.info("Start to clean empty aborted or committed TXNS"); try { Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { - //Aborted is a terminal state, so nothing about the txn can change + //Aborted and committed are terminal states, so nothing about the txn can change //after that, so READ COMMITTED is sufficient. dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); + /** + * Only delete aborted / committed transaction in a way that guarantees two things: + * 1. never deletes anything that is inside the TXN_OPENTXN_TIMEOUT window + * 2. never deletes the maximum txnId even if it is before the TXN_OPENTXN_TIMEOUT window + */ + long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn); + String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE " + "\"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") AND " + - "\"TXN_STATE\" = '" + TXN_ABORTED + "'"; + " (\"TXN_STATE\" = '" + TXN_ABORTED + "' OR \"TXN_STATE\" = '" + TXN_COMMITTED + "') AND " + + " \"TXN_ID\" < " + lowWaterMark; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); List txnids = new ArrayList<>(); @@ -568,16 +579,15 @@ public void cleanEmptyAbortedTxns() throws MetaException { // Delete from TXNS. 
prefix.append("DELETE FROM \"TXNS\" WHERE "); - suffix.append(""); TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", false, false); for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); int rc = stmt.executeUpdate(query); - LOG.info("Removed " + rc + " empty Aborted transactions from TXNS"); + LOG.info("Removed " + rc + " empty Aborted and Committed transactions from TXNS"); } - LOG.info("Aborted transactions removed from TXNS: " + txnids); + LOG.info("Aborted and committed transactions removed from TXNS: " + txnids); LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { @@ -591,7 +601,7 @@ public void cleanEmptyAbortedTxns() throws MetaException { close(rs, stmt, dbConn); } } catch (RetryException e) { - cleanEmptyAbortedTxns(); + cleanEmptyAbortedAndCommittedTxns(); } } @@ -1147,12 +1157,12 @@ public long findMinOpenTxnIdForCleaner() throws MetaException{ + quoteChar(READY_FOR_CLEANING) + ") \"RES\""; } else { - query = "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""; + query = "SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\""; } LOG.debug("Going to execute query <" + query + ">"); rs = stmt.executeQuery(query); if (!rs.next()) { - throw new MetaException("Transaction tables not properly initialized, no record found in NEXT_TXN_ID"); + throw new MetaException("Transaction tables not properly initialized, no record found in TXNS"); } return rs.getLong(1); } catch (SQLException e) { diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index bb29410e7d..f98136021b 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -95,13 +95,19 @@ public static synchronized void prepDb(Configuration conf) throws Exception { conn = getConnection(conf); stmt = conn.createStatement(); stmt.execute("CREATE TABLE TXNS (" + - " TXN_ID bigint PRIMARY KEY," + + " TXN_ID bigint PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY," + " TXN_STATE char(1) NOT NULL," + " TXN_STARTED bigint NOT NULL," + " TXN_LAST_HEARTBEAT bigint NOT NULL," + " TXN_USER varchar(128) NOT NULL," + " TXN_HOST varchar(128) NOT NULL," + + " TXN_AGENT_INFO varchar(128)," + + " TXN_META_INFO varchar(128)," + + " TXN_HEARTBEAT_COUNT integer," + " TXN_TYPE integer)"); + // We always need one row in TXNS to work as low water mark for opentransaction calculation + stmt.execute("INSERT INTO TXNS " + "(TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) " + + "VALUES(0, 'c', 0, 0, '', '')"); stmt.execute("CREATE TABLE TXN_COMPONENTS (" + " TC_TXNID bigint NOT NULL REFERENCES TXNS (TXN_ID)," + @@ -661,4 +667,28 @@ static String getEpochFn(DatabaseProduct dbProduct) throws MetaException { } return affectedRowsByQuery; } + + /** + + * Checks if the dbms supports the getGeneratedKeys for multiline insert statements. 
+ * @param dbProduct DBMS type + * @return true if supported + * @throws MetaException if the database product is unknown + */ + public static boolean supportsGetGeneratedKeys(DatabaseProduct dbProduct) throws MetaException { + switch (dbProduct) { + case DERBY: + case SQLSERVER: + // The getGeneratedKeys is not supported for multi line insert + return false; + case ORACLE: + case MYSQL: + case POSTGRES: + return true; + case OTHER: + default: + String msg = "Unknown database product: " + dbProduct.toString(); + LOG.error(msg); + throw new MetaException(msg); + } + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index e7910c1c5d..41e00f1f21 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -52,6 +52,7 @@ import javax.sql.DataSource; +import org.apache.commons.lang3.time.StopWatch; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.tuple.Pair; @@ -69,7 +70,59 @@ import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier; import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener; -import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; +import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse; +import org.apache.hadoop.hive.metastore.api.CheckLockRequest; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.CompactionResponse; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.CreationMetadata; +import org.apache.hadoop.hive.metastore.api.DataOperationType; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse; +import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.HiveObjectType; +import org.apache.hadoop.hive.metastore.api.InitializeTableWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.Materialization; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import
org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.ReplLastIdInfo; +import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.metastore.api.TxnOpenException; +import org.apache.hadoop.hive.metastore.api.TxnState; +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hadoop.hive.metastore.api.TxnType; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; +import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider; @@ -169,8 +222,12 @@ static final protected char MINOR_TYPE = 'i'; // Transaction states - static final protected char TXN_ABORTED = 'a'; - static final protected char TXN_OPEN = 'o'; + protected static final char TXN_ABORTED = 'a'; + protected static final char TXN_OPEN = 'o'; + protected static final char TXN_COMMITTED = 'c'; + + private static final char TXN_TMP = '_'; + //todo: make these like OperationType and remove above char constants enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN} @@ -186,6 +243,9 @@ private static DataSource connPool; private static DataSource connPoolMutex; + private static final String MANUAL_RETRY = "ManualRetry"; + private static final String TXN_LOCK_TABLE = "NEXT_TXN_ID"; + // Query definitions private static final String HIVE_LOCKS_INSERT_QRY = "INSERT INTO \"HIVE_LOCKS\" ( " + "\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", " + @@ -201,6 +261,9 @@ private static final String COMPL_TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" " + "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")" + " VALUES (%s, ?, ?, ?, ?, %s)"; + private static final String TXNS_INSERT_QRY = "INSERT INTO \"TXNS\" " + + "(\"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\") " + + "VALUES(?,%s,%s,?,?,?)"; private List transactionalListeners; @@ -264,9 +327,11 @@ char getSqlConst() { protected Configuration conf; private static DatabaseProduct dbProduct; private static SQLGenerator sqlGenerator; + private static long openTxnTimeOutMillis; // (End user) Transaction timeout, in milliseconds. 
private long timeout; + // Timeout for opening a transaction private int maxBatchSize; private String identifierQuoteString; // quotes to use for quoting tables, where necessary @@ -348,6 +413,8 @@ public void setConf(Configuration conf) { maxOpenTxns = MetastoreConf.getIntVar(conf, ConfVars.MAX_OPEN_TXNS); maxBatchSize = MetastoreConf.getIntVar(conf, ConfVars.JDBC_MAX_BATCH_SIZE); + openTxnTimeOutMillis = MetastoreConf.getTimeVar(conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS); + try { transactionalListeners = MetaStoreServerUtils.getMetaStoreListeners( TransactionalMetaStoreEventListener.class, @@ -358,6 +425,9 @@ public void setConf(Configuration conf) { throw new RuntimeException(e); } } + protected static DatabaseProduct getDbProduct() { + return dbProduct; + } @Override public Configuration getConf() { @@ -368,59 +438,74 @@ public Configuration getConf() { @RetrySemantics.ReadOnly public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { try { - // We need to figure out the current transaction number and the list of - // open transactions. To avoid needing a transaction on the underlying - // database we'll look at the current transaction number first. If it - // subsequently shows up in the open list that's ok. + // We need to figure out the HighWaterMark and the list of open transactions. Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { - /** - * This method can run at READ_COMMITTED as long as long as - * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic. - * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with - * adding corresponding entries into TXNS. The reason is that any txnid below HWM - * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed. + /* + * This method needs guarantees from + * {@link #openTxns(OpenTxnRequest)} and {@link #commitTxn(CommitTxnRequest)}. + * It will look at the TXNS table and find each transaction between the max(txn_id) as HighWaterMark + * and the max(txn_id) before the TXN_OPENTXN_TIMEOUT period as LowWaterMark. + * Every transaction that is not found between these will be considered open, since it may appear later. + * openTxns must ensure that no new transaction will be opened with txn_id below LWM and + * commitTxn must ensure that no committed transaction will be removed before the time period expires.
*/ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_txn_id"); - } - long hwm = rs.getLong(1); - if (rs.wasNull()) { - throw new MetaException("Transaction tables not properly " + - "initialized, null record found in next_txn_id"); - } - close(rs); List<TxnInfo> txnInfos = new ArrayList<>(); - //need the WHERE clause below to ensure consistent results with READ_COMMITTED - s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\" FROM " + - "\"TXNS\" WHERE \"TXN_ID\" <= " + hwm; + + String s = + "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " + + "(" + TxnDbUtil.getEpochFn(dbProduct) + " - \"TXN_STARTED\")" + + "FROM \"TXNS\" ORDER BY \"TXN_ID\""; LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); + /* + * We can use the maximum txn_id from the TXNS table as the high water mark, since commitTxn and the Initiator + * guarantee that the transaction with the highest txn_id is never removed from the TXNS table. + * A pending openTxns call that has already acquired its sequenceId but has not yet committed the insert + * into the TXNS table will either have a txn_id lower than the HWM and be listed in the open transaction list, + * or have a higher txn_id and not affect this getOpenTxns() call. + */ + long hwm = 0; + long openTxnLowBoundary = 0; + while (rs.next()) { + long txnId = rs.getLong(1); + long age = rs.getLong(7); + hwm = txnId; + if (age < getOpenTxnTimeOutMillis()) { + // We will consider every gap as an open transaction from the previous txnId + openTxnLowBoundary++; + while (txnId > openTxnLowBoundary) { + // Add an empty open transaction for every missing value + txnInfos.add(new TxnInfo(openTxnLowBoundary, TxnState.OPEN, null, null)); + openTxnLowBoundary++; + } + } else { + openTxnLowBoundary = txnId; + } char c = rs.getString(2).charAt(0); TxnState state; switch (c) { - case TXN_ABORTED: - state = TxnState.ABORTED; - break; + case TXN_COMMITTED: + // This is only here to avoid adding this txnId as a possible gap + continue; + + case TXN_ABORTED: + state = TxnState.ABORTED; + break; - case TXN_OPEN: - state = TxnState.OPEN; - break; + case TXN_OPEN: + state = TxnState.OPEN; + break; - default: - throw new MetaException("Unexpected transaction state " + c + - " found in txns table"); + default: + throw new MetaException("Unexpected transaction state " + c + " found in txns table"); } - TxnInfo txnInfo = new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4)); + TxnInfo txnInfo = new TxnInfo(txnId, state, rs.getString(3), rs.getString(4)); txnInfo.setStartedTime(rs.getLong(5)); txnInfo.setLastHeartbeatTime(rs.getLong(6)); txnInfos.add(txnInfo); @@ -432,8 +517,8 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "getOpenTxnsInfo"); - throw new MetaException("Unable to select from transaction database: " + getMessage(e) - + StringUtils.stringifyException(e)); + throw new MetaException( + "Unable to select from transaction database: " + getMessage(e) + StringUtils.stringifyException(e)); } finally { close(rs, stmt, dbConn); }
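The gap rule above is the core of the new snapshot logic, so a minimal, self-contained sketch may help. Rows younger than TXN_OPENTXN_TIMEOUT turn every txn_id missing since the previous row into an implicit open transaction, while older rows only advance the low boundary. The class below and its array-based input are hypothetical stand-ins for the JDBC ResultSet that the real code scans:

import java.util.ArrayList;
import java.util.List;

class OpenTxnGapSketch {
  static final char OPEN = 'o', ABORTED = 'a', COMMITTED = 'c';

  /** rows: {txnId, ageMillis, state}, ordered by txnId ascending. */
  static List<Long> openTxnList(long[][] rows, long openTxnTimeoutMillis) {
    List<Long> open = new ArrayList<>();
    long lowBoundary = 0;
    for (long[] row : rows) {
      long txnId = row[0];
      long age = row[1];
      char state = (char) row[2];
      if (age < openTxnTimeoutMillis) {
        // Young row: a missing id before it may be an openTxns() call that has
        // acquired its sequence value but not yet committed its insert, so
        // every gap value is counted as open.
        lowBoundary++;
        while (txnId > lowBoundary) {
          open.add(lowBoundary++);
        }
      } else {
        // Old row: openTxns() guarantees nothing can still appear below it.
        lowBoundary = txnId;
      }
      if (state != COMMITTED) {
        open.add(txnId); // open and aborted ids both end up in the open list
      }
    }
    return open;
  }

  public static void main(String[] args) {
    // id 2 is a fresh gap, id 3 committed, id 4 open: expect [2, 4]
    long[][] rows = {{1, 60_000, COMMITTED}, {3, 100, COMMITTED}, {4, 50, OPEN}};
    System.out.println(openTxnList(rows, 1_000));
  }
}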
@@ -446,42 +531,47 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { @RetrySemantics.ReadOnly public GetOpenTxnsResponse getOpenTxns() throws MetaException { try { - // We need to figure out the current transaction number and the list of - // open transactions. To avoid needing a transaction on the underlying - // database we'll look at the current transaction number first. If it - // subsequently shows up in the open list that's ok. + // We need to figure out the current transaction number and the list of open transactions. Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { - /** + /* * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_txn_id"); - } - long hwm = rs.getLong(1); - if (rs.wasNull()) { - throw new MetaException("Transaction tables not properly " + - "initialized, null record found in next_txn_id"); - } - close(rs); + List<Long> openList = new ArrayList<>(); - //need the WHERE clause below to ensure consistent results with READ_COMMITTED - s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\" FROM \"TXNS\" WHERE \"TXN_ID\" <= " + hwm + " ORDER BY \"TXN_ID\""; + String s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\", " + + "(" + TxnDbUtil.getEpochFn(dbProduct) + " - \"TXN_STARTED\")" + + " FROM \"TXNS\" ORDER BY \"TXN_ID\""; LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); + long hwm = 0; + long openTxnLowBoundary = 0; long minOpenTxn = Long.MAX_VALUE; BitSet abortedBits = new BitSet(); while (rs.next()) { long txnId = rs.getLong(1); + long age = rs.getLong(4); + hwm = txnId; + if (age < getOpenTxnTimeOutMillis()) { + // We will consider every gap as an open transaction from the previous txnId + openTxnLowBoundary++; + while (txnId > openTxnLowBoundary) { + // Add an empty open transaction for every missing value + openList.add(openTxnLowBoundary); + minOpenTxn = Math.min(minOpenTxn, openTxnLowBoundary); + openTxnLowBoundary++; + } + } else { + openTxnLowBoundary = txnId; + } char txnState = rs.getString(2).charAt(0); + if (txnState == TXN_COMMITTED) { + continue; + } if (txnState == TXN_OPEN) { minOpenTxn = Math.min(minOpenTxn, txnId); } @@ -497,7 +587,7 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { dbConn.rollback(); ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray()); GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer); - if(minOpenTxn < Long.MAX_VALUE) { + if (minOpenTxn < Long.MAX_VALUE) { otr.setMin_open_txn(minOpenTxn); } return otr; @@ -544,14 +634,22 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { try { Connection dbConn = null; Statement stmt = null; + TxnLockHandle txnLock = null; try { - lockInternal(); - /** + /* * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure - * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic. - * Also, advancing the counter must work when multiple metastores are running. - * SELECT ... FOR UPDATE is used to prevent - * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID.
+ * that every open transaction below a given High Water Mark can be identified by looking at the TXNS table. + * One way to do this would be to serialize the openTxns call with an S4U lock, but that would cause + * performance degradation with high transaction load. + * To enable parallel openTxn calls, we define a time period (TXN_OPENTXN_TIMEOUT) and consider every + * transaction missing from the TXNS table in that period open, and prevent transactions from being opened + * outside the period. + * Example: At t[0] there is one open transaction in the TXNS table, T[1]. + * T[2] acquires the next sequence at t[1] but only commits into the TXNS table at t[10]. + * T[3] acquires its sequence at t[2], and commits into the TXNS table at t[3]. + * Then T[3] calculates its snapshot at t[4] and puts both T[1] and T[2] in the snapshot's + * open transaction list. T[1] because it is presented as open in TXNS, + * T[2] because it is a missing sequence. * * In the current design, there can be several metastore instances running in a given Warehouse. * This makes ideas like reserving a range of IDs to save trips to DB impossible. For example, @@ -564,35 +662,70 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { * set could support a write-through cache for added performance. */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - // Make sure the user has not requested an insane amount of txns. - int maxTxns = MetastoreConf.getIntVar(conf, ConfVars.TXN_MAX_OPEN_BATCH); - if (numTxns > maxTxns) numTxns = maxTxns; - stmt = dbConn.createStatement(); - List<Long> txnIds = openTxns(dbConn, stmt, rqst); + /* + * openTxn and commitTxn must be mutexed when committing a non-read-only transaction. + * This is achieved by requesting a shared table lock here, and an exclusive one at commit. + * Since table locks work in Derby, we don't need the lockInternal call here. + * Example: Suppose we have two transactions with an update like x = x+1. + * We have T[3,3] that was using a value from a snapshot with T[2,2]. If we allow committing T[3,3] + * and opening T[4] in parallel, it is possible that T[4] will use the value from a snapshot with T[2,2], + * and we will have a lost update problem + */ + txnLock = acquireTxnLock(true); + // Measure the time from acquiring the sequence value until committing into the TXNS table + StopWatch generateTransactionWatch = new StopWatch(); + generateTransactionWatch.start(); + + List<Long> txnIds = openTxns(dbConn, rqst); LOG.debug("Going to commit"); dbConn.commit(); + generateTransactionWatch.stop(); + long elapsedMillis = generateTransactionWatch.getTime(TimeUnit.MILLISECONDS); + TxnType txnType = rqst.isSetTxn_type() ? rqst.getTxn_type() : TxnType.DEFAULT; + if (txnType != TxnType.READ_ONLY && elapsedMillis >= openTxnTimeOutMillis) { + /* + * The commit was too slow, we cannot allow this to continue (except if it is read only, + * since that cannot cause dirty reads). + * When calculating the snapshot for a given transaction, we look back for possible open transactions + * (that are not yet committed in the TXNS table), for the TXN_OPENTXN_TIMEOUT period. + * We cannot allow a write transaction that was slower than TXN_OPENTXN_TIMEOUT to continue, + * because there can be other transactions running that did not consider this transactionId open; + * this could cause dirty reads. + */ + LOG.info("OpenTxnTimeOut exceeded commit duration {}, deleting transactionIds: {}", elapsedMillis, txnIds); + deleteInvalidOpenTransactions(dbConn, txnIds); + /* + * We do not throw RetryException directly, so as not to circumvent the max retry limit + */ + throw new SQLException("OpenTxnTimeOut exceeded", MANUAL_RETRY); + } return new OpenTxnsResponse(txnIds); } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "openTxns(" + rqst + ")"); - throw new MetaException("Unable to select from transaction database " - + StringUtils.stringifyException(e)); + throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { close(null, stmt, dbConn); - unlockInternal(); + if (txnLock != null) { + txnLock.releaseLocks(); + } } } catch (RetryException e) { return openTxns(rqst); } }
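The guard just added can be summarized in a compact sketch (illustrative only: the TxnSink interface is invented for this example, and the real patch signals a retryable SQLException with the MANUAL_RETRY SQL state instead of looping inline):

import java.util.List;
import java.util.concurrent.TimeUnit;

class OpenTxnTimeoutGuardSketch {
  interface TxnSink {
    List<Long> insertTxns() throws Exception;          // acquire ids and commit rows into TXNS
    void deleteTxns(List<Long> ids) throws Exception;  // undo ids that became visible too late
  }

  static List<Long> openWithGuard(TxnSink sink, long timeoutMillis, boolean readOnly) throws Exception {
    while (true) {
      long start = System.nanoTime();
      List<Long> ids = sink.insertTxns();
      long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
      if (readOnly || elapsedMillis < timeoutMillis) {
        return ids; // fast enough, or read-only (cannot cause dirty reads)
      }
      // Too slow: another transaction may have computed a snapshot that no
      // longer treats these ids as possibly open, so they must never be
      // handed out as write transactions.
      sink.deleteTxns(ids);
    }
  }
}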
- private List<Long> openTxns(Connection dbConn, Statement stmt, OpenTxnRequest rqst) + private List<Long> openTxns(Connection dbConn, OpenTxnRequest rqst) throws SQLException, MetaException { int numTxns = rqst.getNum_txns(); - ResultSet rs = null; + // Make sure the user has not requested an insane amount of txns. + int maxTxns = MetastoreConf.getIntVar(conf, ConfVars.TXN_MAX_OPEN_BATCH); + if (numTxns > maxTxns) { + numTxns = maxTxns; + } List<PreparedStatement> insertPreparedStmts = null; TxnType txnType = rqst.isSetTxn_type() ? rqst.getTxn_type() : TxnType.DEFAULT; try { @@ -611,51 +744,44 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { txnType = TxnType.REPL_CREATED; } - String s = sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""); - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction database not properly " + - "configured, can't find next transaction id."); - } - long first = rs.getLong(1); - s = "UPDATE \"NEXT_TXN_ID\" SET \"NTXN_NEXT\" = " + (first + numTxns); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - List<Long> txnIds = new ArrayList<>(numTxns); - - List<String> rows = new ArrayList<>(); - List<String> params = new ArrayList<>(); - params.add(rqst.getUser()); - params.add(rqst.getHostname()); - List<List<String>> paramsList = new ArrayList<>(numTxns); - - for (long i = first; i < first + numTxns; i++) { - txnIds.add(i); - rows.add(i + "," + quoteChar(TXN_OPEN) + "," + TxnDbUtil.getEpochFn(dbProduct) + "," - + TxnDbUtil.getEpochFn(dbProduct) + ",?,?," + txnType.getValue()); - paramsList.add(params); - } - insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "\"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " - + "\"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\")", - rows, paramsList); - for (PreparedStatement pst : insertPreparedStmts) { - pst.execute(); + /* + * getGeneratedKeys is not supported by every dbms after executing a multi-row insert, + * but it is supported by every dbms we use for a single-row insert, even if the metadata says otherwise. + * If getGeneratedKeys is not supported, we first insert the rows with the temporary TXN_TMP state, + * then the generated keys are selected back by that state and the state is flipped to open.
+ */ + boolean genKeySupport = TxnDbUtil.supportsGetGeneratedKeys(dbProduct); + genKeySupport = genKeySupport || (numTxns == 1); + + String insertQuery = String.format(TXNS_INSERT_QRY, TxnDbUtil.getEpochFn(dbProduct), TxnDbUtil.getEpochFn(dbProduct)); + LOG.debug("Going to execute insert <" + insertQuery + ">"); + try (PreparedStatement ps = dbConn.prepareStatement(insertQuery, new String[] {"TXN_ID"})) { + for (int i = 0; i < numTxns; ++i) { + ps.setString(1, genKeySupport ? Character.toString(TXN_OPEN) : Character.toString(TXN_TMP)); + ps.setString(2, rqst.getUser()); + ps.setString(3, rqst.getHostname()); + ps.setInt(4, txnType.getValue()); + ps.addBatch(); + + if ((i + 1) % maxBatchSize == 0) { + executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, txnIds, genKeySupport, ps); + } + } + if (numTxns % maxBatchSize != 0) { + executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, txnIds, genKeySupport, ps); + } } + assert txnIds.size() == numTxns; + if (rqst.isSetReplPolicy()) { - List<String> rowsRepl = new ArrayList<>(); - for (PreparedStatement pst : insertPreparedStmts) { - pst.close(); - } - insertPreparedStmts.clear(); - params.clear(); - paramsList.clear(); + List<String> rowsRepl = new ArrayList<>(numTxns); + List<String> params = new ArrayList<>(1); + List<List<String>> paramsList = new ArrayList<>(numTxns); params.add(rqst.getReplPolicy()); for (int i = 0; i < numTxns; i++) { - rowsRepl.add( "?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i)); + rowsRepl.add("?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i)); paramsList.add(params); } @@ -678,10 +804,117 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { pst.close(); } } - close(rs); } } + private void executeTxnInsertBatchAndExtractGeneratedKeys(Connection dbConn, List<Long> txnIds, + boolean genKeySupport, PreparedStatement ps) throws SQLException { + ps.executeBatch(); + if (genKeySupport) { + try (ResultSet generatedKeys = ps.getGeneratedKeys()) { + while (generatedKeys.next()) { + txnIds.add(generatedKeys.getLong(1)); + } + } + } else { + try (PreparedStatement pstmt = dbConn.prepareStatement("SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = ?")) { + pstmt.setString(1, Character.toString(TXN_TMP)); + try (ResultSet rs = pstmt.executeQuery()) { + while (rs.next()) { + txnIds.add(rs.getLong(1)); + } + } + } + try (PreparedStatement pstmt = dbConn + .prepareStatement("UPDATE \"TXNS\" SET \"TXN_STATE\" = ? WHERE \"TXN_STATE\" = ?")) { + pstmt.setString(1, Character.toString(TXN_OPEN)); + pstmt.setString(2, Character.toString(TXN_TMP)); + pstmt.executeUpdate(); + } + } + }
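For readers unfamiliar with the JDBC feature the new insert path relies on, here is the pattern reduced to its essentials (a sketch, not the patch's code; it assumes a driver that returns generated keys for batched inserts, which is what TxnDbUtil.supportsGetGeneratedKeys checks):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

class GeneratedKeysSketch {
  static List<Long> openBatch(Connection conn, int numTxns) throws SQLException {
    List<Long> ids = new ArrayList<>(numTxns);
    // Naming TXN_ID tells the driver which auto-generated column to return.
    String sql = "INSERT INTO TXNS (TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) "
        + "VALUES (?, 0, 0, 'me', 'localhost')";
    try (PreparedStatement ps = conn.prepareStatement(sql, new String[] {"TXN_ID"})) {
      for (int i = 0; i < numTxns; i++) {
        ps.setString(1, "o");
        ps.addBatch();
      }
      ps.executeBatch();
      // One generated key per inserted row, in insertion order.
      try (ResultSet keys = ps.getGeneratedKeys()) {
        while (keys.next()) {
          ids.add(keys.getLong(1));
        }
      }
    }
    return ids;
  }
}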
WHERE \"TXN_STATE\" = ?")) { + pstmt.setString(1, Character.toString(TXN_OPEN)); + pstmt.setString(2, Character.toString(TXN_TMP)); + pstmt.executeUpdate(); + } + } + } + + public void deleteInvalidOpenTransactions(Connection dbConn, List txnIds) throws MetaException { + if (txnIds.size() == 0) { + return; + } + try { + Statement stmt = null; + try { + stmt = dbConn.createStatement(); + + List queries = new ArrayList<>(); + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + prefix.append("DELETE FROM \"TXNS\" WHERE "); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnIds, "\"TXN_ID\"", false, false); + for (String s : queries) { + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + } + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "deleteInvalidOpenTransactions(" + txnIds + ")"); + throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); + } finally { + closeStmt(stmt); + } + } catch (RetryException ex) { + deleteInvalidOpenTransactions(dbConn, txnIds); + } + } + + @Override + public long getOpenTxnTimeOutMillis() { + return openTxnTimeOutMillis; + } + + @Override + public void setOpenTxnTimeOutMillis(long openTxnTimeOutMillis) { + this.openTxnTimeOutMillis = openTxnTimeOutMillis; + } + + protected long getOpenTxnTimeoutLowBoundaryTxnId(Connection dbConn) throws MetaException, SQLException { + long maxTxnId; + String s = + "SELECT MAX(\"TXN_ID\") FROM \"TXNS\"" + " WHERE \"TXN_STARTED\" < (" + TxnDbUtil.getEpochFn(dbProduct) + " - " + + openTxnTimeOutMillis + ")"; + try (Statement stmt = dbConn.createStatement()) { + LOG.debug("Going to execute query <" + s + ">"); + try (ResultSet maxTxnIdRs = stmt.executeQuery(s)) { + maxTxnIdRs.next(); + maxTxnId = maxTxnIdRs.getLong(1); + if (maxTxnIdRs.wasNull()) { + /* + * TXNS always contains at least one transaction, + * the row where txnid = (select max(txnid) where txn_started < epoch - TXN_OPENTXN_TIMEOUT) is never deleted + */ + throw new MetaException("Transaction tables not properly " + "initialized, null record found in MAX(TXN_ID)"); + } + } + } + return maxTxnId; + } + + protected long getHighWaterMark(Statement stmt) throws SQLException, MetaException { + String s = "SELECT MAX(\"TXN_ID\") FROM \"TXNS\""; + LOG.debug("Going to execute query <" + s + ">"); + long maxOpenTxnId; + try (ResultSet maxOpenTxnIdRs = stmt.executeQuery(s)) { + maxOpenTxnIdRs.next(); + maxOpenTxnId = maxOpenTxnIdRs.getLong(1); + if (maxOpenTxnIdRs.wasNull()) { + /* + * TXNS always contains at least one transaction, + * the row where txnid = (select max(txnid) where txn_started < epoch - TXN_OPENTXN_TIMEOUT) is never deleted + */ + throw new MetaException("Transaction tables not properly " + "initialized, null record found in MAX(TXN_ID)"); + } + } + return maxOpenTxnId; + } + private List getTargetTxnIdList(String replPolicy, List sourceTxnIdList, Connection dbConn) throws SQLException { PreparedStatement pst = null; @@ -707,7 +940,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { } LOG.debug("targetTxnid for srcTxnId " + sourceTxnIdList.toString() + " is " + targetTxnIdList.toString()); return targetTxnIdList; - } catch (SQLException e) { + } catch (SQLException e) { LOG.warn("failed to get target txn ids " + e.getMessage()); throw e; } finally { @@ -1081,6 +1314,7 @@ public void commitTxn(CommitTxnRequest rqst) try { Connection 
@@ -1081,6 +1314,7 @@ public void commitTxn(CommitTxnRequest rqst) try { Connection dbConn = null; Statement stmt = null; + TxnLockHandle txnLock = null; Long commitId = null; try { lockInternal(); @@ -1149,7 +1383,7 @@ public void commitTxn(CommitTxnRequest rqst) * even if it includes all of its columns * * First insert into write_set using a temporary commitID, which will be updated in a separate call, - * see: {@link #updateCommitIdAndCleanUpMetadata(Statement, long, TxnType, Long)}}. + * see: {@link #updateWSCommitIdAndCleanUpMetadata(Statement, long, TxnType, Long)}. * This should decrease the scope of the S4U lock on the next_txn_id table. */ Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); @@ -1163,13 +1397,8 @@ public void commitTxn(CommitTxnRequest rqst) * at the same time and no new txns start until all 3 commit. * We could've incremented the sequence for commitId as well but it doesn't add anything functionally. */ - try (ResultSet commitIdRs = stmt.executeQuery(sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""))) { - if (!commitIdRs.next()) { - throw new IllegalStateException("No rows found in NEXT_TXN_ID"); - } - commitId = commitIdRs.getLong(1); - } - + txnLock = acquireTxnLock(false); + commitId = getHighWaterMark(stmt); /** * see if there are any overlapping txns that wrote the same element, i.e. have a conflict * Since entire commit operation is mutexed wrt other start/commit ops, @@ -1201,9 +1430,8 @@ public void commitTxn(CommitTxnRequest rqst) throw new TxnAbortedException(msg); } } - } - else { - /** + } else { + /* * current txn didn't update/delete anything (may have inserted), so just proceed with commit * * We only care about commit id for write txns, so for RO (when supported) txns we don't @@ -1213,6 +1441,7 @@ public void commitTxn(CommitTxnRequest rqst) * If RO < W, then there is no reads-from relationship. * In replication flow we don't expect any write write conflict as it should have been handled at source.
*/ + assert true; } if (txnRecord.type != TxnType.READ_ONLY && !rqst.isSetReplPolicy()) { @@ -1243,7 +1472,7 @@ } deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy()); } - updateCommitIdAndCleanUpMetadata(stmt, txnid, txnRecord.type, commitId); + updateWSCommitIdAndCleanUpMetadata(stmt, txnid, txnRecord.type, commitId); if (rqst.isSetKeyValue()) { updateKeyValueAssociatedWithTxn(rqst, stmt); } @@ -1262,8 +1491,10 @@ throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { - closeStmt(stmt); - closeDbConn(dbConn); + close(null, stmt, dbConn); + if (txnLock != null) { + txnLock.releaseLocks(); + } unlockInternal(); } } catch (RetryException e) { @@ -1328,7 +1559,8 @@ private void moveTxnComponentsToCompleted(Statement stmt, long txnid, char isUpd } } - private void updateCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, Long commitId) throws SQLException { + private void updateWSCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, Long commitId) + throws SQLException { List<String> queryBatch = new ArrayList<>(5); // update write_set with real commitId if (commitId != null) { @@ -1340,7 +1572,8 @@ private void updateCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnTyp queryBatch.add("DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid); } queryBatch.add("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid); - queryBatch.add("DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid); + // DO NOT remove the transaction from the TXNS table; the cleaner will remove it when appropriate + queryBatch.add("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_COMMITTED) + " WHERE \"TXN_ID\" = " + txnid); queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid); // execute all in one batch
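The behavioral change here in one picture: committing no longer deletes the TXNS row, it only flips its state, and the Initiator's cleanEmptyAbortedAndCommittedTxns removes the row later, once it has aged out of the TXN_OPENTXN_TIMEOUT window. A sketch of the cleanup batch (hypothetical helper mirroring the queryBatch built above, with identifier quoting simplified):

import java.util.ArrayList;
import java.util.List;

class CommitCleanupSketch {
  static List<String> commitCleanupBatch(long txnid, Long commitId, boolean updatedSomething) {
    List<String> batch = new ArrayList<>(4);
    if (commitId != null && updatedSomething) {
      batch.add("UPDATE WRITE_SET SET WS_COMMIT_ID = " + commitId + " WHERE WS_TXNID = " + txnid);
    }
    batch.add("DELETE FROM TXN_COMPONENTS WHERE TC_TXNID = " + txnid);
    batch.add("DELETE FROM HIVE_LOCKS WHERE HL_TXNID = " + txnid);
    // Keep the row: getOpenTxns() still needs it for gap tracking until the
    // TXN_OPENTXN_TIMEOUT window has passed; the cleaner deletes it afterwards.
    batch.add("UPDATE TXNS SET TXN_STATE = 'c' WHERE TXN_ID = " + txnid);
    return batch;
  }
}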
AND \"T2W_WRITEID\" = "+ writeId; pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", quoteString(names[0]), quoteString(names[1])); @@ -1991,46 +2228,37 @@ public void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) @Override @RetrySemantics.SafeToRetry - public void performWriteSetGC() { + public void performWriteSetGC() throws MetaException { Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - rs = stmt.executeQuery("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""); - if(!rs.next()) { - throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted"); - } - long highestAllocatedTxnId = rs.getLong(1); - close(rs); + + long minOpenTxn; rs = stmt.executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + quoteChar(TXN_OPEN)); - if(!rs.next()) { + if (!rs.next()) { throw new IllegalStateException("Scalar query returned no rows?!?!!"); } - long commitHighWaterMark;//all currently open txns (if any) have txnid >= than commitHighWaterMark - long lowestOpenTxnId = rs.getLong(1); - if(rs.wasNull()) { - //if here then there are no Open txns and highestAllocatedTxnId must be - //resolved (i.e. committed or aborted), either way - //there are no open txns with id <= highestAllocatedTxnId - //the +1 is there because "delete ..." below has < (which is correct for the case when - //there is an open txn - //Concurrency: even if new txn starts (or starts + commits) it is still true that - //there are no currently open txns that overlap with any committed txn with - //commitId <= commitHighWaterMark (as set on next line). So plain READ_COMMITTED is enough. - commitHighWaterMark = highestAllocatedTxnId + 1; - } - else { - commitHighWaterMark = lowestOpenTxnId; + minOpenTxn = rs.getLong(1); + if (rs.wasNull()) { + minOpenTxn = Long.MAX_VALUE; } + long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn); + /** + * We try to find the highest transactionId below everything was committed or aborted. + * For that we look for the lowest open transaction in the TXNS and the TxnMinTimeout boundary, + * because it is guaranteed there won't be open transactions below that. + */ + long commitHighWaterMark = Long.min(minOpenTxn, lowWaterMark + 1); + LOG.debug("Perform WriteSet GC with minOpenTxn {}, lowWaterMark {}", minOpenTxn, lowWaterMark); int delCnt = stmt.executeUpdate("DELETE FROM \"WRITE_SET\" WHERE \"WS_COMMIT_ID\" < " + commitHighWaterMark); LOG.info("Deleted {} obsolete rows from WRITE_SET", delCnt); dbConn.commit(); } catch (SQLException ex) { LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex); - } - finally { + } finally { close(rs, stmt, dbConn); } } @@ -4169,7 +4397,7 @@ public int compare(LockType t1, LockType t2) { private void checkQFileTestHack() { boolean hackOn = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || - MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST); + MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST); if (hackOn) { LOG.info("Hacking in canned values for transaction manager"); // Set up the transaction/locking db in the derby metastore @@ -4177,11 +4405,7 @@ private void checkQFileTestHack() { try { TxnDbUtil.prepDb(conf); } catch (Exception e) { - // We may have already created the tables and thus don't need to redo it. 
@@ -4169,7 +4397,7 @@ public int compare(LockType t1, LockType t2) { private void checkQFileTestHack() { boolean hackOn = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || - MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST); + MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST); if (hackOn) { LOG.info("Hacking in canned values for transaction manager"); // Set up the transaction/locking db in the derby metastore @@ -4177,11 +4405,7 @@ private void checkQFileTestHack() { try { TxnDbUtil.prepDb(conf); } catch (Exception e) { - // We may have already created the tables and thus don't need to redo it. - if (e.getMessage() != null && !e.getMessage().contains("already exists")) { - throw new RuntimeException("Unable to set up transaction database for" + - " testing: " + e.getMessage(), e); - } + throw new RuntimeException("Unable to set up transaction database for" + " testing: " + e.getMessage(), e); } } } @@ -4529,7 +4753,7 @@ private void heartbeatTxn(Connection dbConn, long txnid) } /** - * Returns the state of the transaction iff it's able to determine it. Some cases where it cannot: + * Returns the state of the transaction if it's able to determine it. Some cases where it cannot: * 1. txnid was Aborted/Committed and then GC'd (compacted) * 2. txnid was committed but it didn't modify anything (nothing in COMPLETED_TXN_COMPONENTS) */ @@ -4554,6 +4778,9 @@ private TxnStatus findTxnState(long txnid, Statement stmt) throws SQLException, if (txnState == TXN_ABORTED) { return TxnStatus.ABORTED; } + if (txnState == TXN_COMMITTED) { + return TxnStatus.COMMITTED; + } assert txnState == TXN_OPEN : "we found it in TXNS but it's not ABORTED, so must be OPEN"; } return TxnStatus.OPEN; @@ -4954,11 +5181,15 @@ private static synchronized DataSource setupJdbcConnectionPool(Configuration con static boolean isRetryable(Configuration conf, Exception ex) { if(ex instanceof SQLException) { SQLException sqlException = (SQLException)ex; - if("08S01".equalsIgnoreCase(sqlException.getSQLState())) { + if (MANUAL_RETRY.equalsIgnoreCase(sqlException.getSQLState())) { + // Manual retry exception was thrown + return true; + } + if ("08S01".equalsIgnoreCase(sqlException.getSQLState())) { //in MSSQL this means Communication Link Failure return true; } - if("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) || + if ("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) || sqlException.getMessage().contains("consistent read failure; rollback data not available")) { return true; } @@ -5137,10 +5368,47 @@ public LockHandle acquireLock(String key) throws MetaException { return acquireLock(key); } } + + @Override public void acquireLock(String key, LockHandle handle) { //the idea is that this will use LockHandle.dbConn throw new NotImplementedException("acquireLock(String, LockHandle) is not implemented"); } + + /** + * Acquire the global txn lock, used to mutex the openTxn and commitTxn.
+ * @param shared true to acquire the lock in shared mode, false for exclusive + * @return lock handle used to release the lock + * @throws MetaException + */ + public TxnLockHandle acquireTxnLock(boolean shared) throws MetaException { + Connection dbConn = null; + Statement stmt = null; + try { + try { + String sqlStmt = sqlGenerator.createLockTableStatement(TXN_LOCK_TABLE, shared); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex); + stmt = dbConn.createStatement(); + if (LOG.isDebugEnabled()) { + LOG.debug("About to execute SQL: " + sqlStmt); + } + stmt.execute(sqlStmt); + LOG.debug("TXN lock locked by {} in mode {}", quoteString(TxnHandler.hostname), shared); + return new TxnLockHandle(dbConn, stmt); + } catch (SQLException ex) { + rollbackDBConn(dbConn); + closeStmt(stmt); + closeDbConn(dbConn); + checkRetryable(dbConn, ex, "acquireTxnLock(" + shared + ")"); + throw new MetaException( + "Unable to lock TxnLock due to: " + getMessage(ex) + "; " + StringUtils.stringifyException(ex)); + } + } catch (RetryException ex) { + return acquireTxnLock(shared); + } + + } + private static final class LockHandleImpl implements LockHandle { private final Connection dbConn; private final Statement stmt; @@ -5177,6 +5445,33 @@ public void releaseLocks() { } } + private static final class TxnLockHandle { + private final Connection dbConn; + private final Statement stmt; + + TxnLockHandle(Connection conn, Statement stmt) { + this.dbConn = conn; + this.stmt = stmt; + } + + public void releaseLocks() throws MetaException { + try { + Optional<String> s = sqlGenerator.createUnLockTableStatement(); + if (s.isPresent()) { + // Most DBMSs do not require another statement to release the lock; the rollback will do that + stmt.execute(s.get()); + } + } catch (SQLException ex) { + LOG.error("Unable to release table locks", ex); + throw new MetaException("Unable to release table locks " + StringUtils.stringifyException(ex)); + } + rollbackDBConn(dbConn); + closeStmt(stmt); + closeDbConn(dbConn); + LOG.debug("TXN lock unlocked by " + quoteString(TxnHandler.hostname)); + } + } + private static class NoPoolConnectionPool implements DataSource { // Note that this depends on the fact that no-one in this class calls anything but // getConnection. If you want to use any of the Logger or wrap calls you'll have to
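How the new TXN lock behaves at the SQL level, sketched with PostgreSQL's LOCK TABLE syntax as an assumption (the patch delegates the exact statement to SQLGenerator.createLockTableStatement, which differs per backend). The key property is that the lock lives for the duration of the surrounding database transaction, so a rollback releases it:

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

class TxnTableLockSketch {
  static void runUnderTxnLock(Connection conn, boolean shared, Runnable criticalSection) throws SQLException {
    conn.setAutoCommit(false);
    try (Statement stmt = conn.createStatement()) {
      // Shared holders admit each other but block the exclusive holder, which
      // matches the openTxns (shared) vs. commitTxn (exclusive) protocol above.
      stmt.execute("LOCK TABLE \"NEXT_TXN_ID\" IN " + (shared ? "SHARE" : "EXCLUSIVE") + " MODE");
      criticalSection.run();
    } finally {
      conn.rollback(); // ending the DB transaction releases the table lock
    }
  }
}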
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 87130a519d..ad1834338b 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -44,7 +44,7 @@ /** * Prefix for key when committing with a key/value. */ - public static final String TXN_KEY_START = "_meta"; + String TXN_KEY_START = "_meta"; enum MUTEX_KEY { Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock, @@ -386,12 +386,13 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old void cleanTxnToWriteIdTable() throws MetaException; /** - * Clean up aborted transactions from txns that have no components in txn_components. The reson such - * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and - * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called. + * Clean up aborted or committed transactions from txns that have no components in txn_components. The reason such + * txns exist can be that no work was done in this txn (e.g. Streaming opened TransactionBatch and + * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called, + * or the delete from TXNS was delayed because of the TXN_OPENTXN_TIMEOUT window. */ @RetrySemantics.SafeToRetry - void cleanEmptyAbortedTxns() throws MetaException; + void cleanEmptyAbortedAndCommittedTxns() throws MetaException; /** * This will take all entries assigned to workers @@ -442,7 +443,7 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old * transaction metadata once it becomes unnecessary. */ @RetrySemantics.SafeToRetry - void performWriteSetGC(); + void performWriteSetGC() throws MetaException; /** * Determine if there are enough consecutive failures compacting a table or partition that no @@ -461,6 +462,12 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old @VisibleForTesting long setTimeout(long milliseconds); + @VisibleForTesting + long getOpenTxnTimeOutMillis(); + + @VisibleForTesting + void setOpenTxnTimeOutMillis(long openTxnTimeOutMillis); + @RetrySemantics.Idempotent MutexAPI getMutexAPI(); @@ -473,7 +480,7 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old */ interface MutexAPI { /** - * The {@code key} is name of the lock. Will acquire and exclusive lock or block. It retuns + * The {@code key} is the name of the lock. Will acquire an exclusive lock or block. It returns * a handle which must be used to release the lock. Each invocation returns a new handle. */ LockHandle acquireLock(String key) throws MetaException; diff --git standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql index 1ace9d3ef0..6c3deddff1 100644 --- standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql +++ standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql @@ -234,7 +234,8 @@ CREATE TABLE "APP"."CTLGS" ( "CREATE_TIME" INTEGER); -- Insert a default value. The location is TBD.
Hive will fix this when it starts -INSERT INTO "APP"."CTLGS" VALUES (1, 'hive', 'Default catalog for Hive', 'TBD', NULL); +INSERT INTO "APP"."CTLGS" ("CTLG_ID", "NAME", "DESC", "LOCATION_URI", "CREATE_TIME") +VALUES (1, 'hive', 'Default catalog for Hive', 'TBD', NULL); -- ---------------------------------------------- -- DML Statements @@ -522,7 +523,7 @@ ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SQL110318025505550" CHECK (IS_COMPRESSED -- Transaction and Lock Tables -- ---------------------------- CREATE TABLE TXNS ( - TXN_ID bigint PRIMARY KEY, + TXN_ID bigint PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, TXN_STATE char(1) NOT NULL, TXN_STARTED bigint NOT NULL, TXN_LAST_HEARTBEAT bigint NOT NULL, @@ -534,6 +535,9 @@ CREATE TABLE TXNS ( TXN_TYPE integer ); +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + VALUES(0, 'c', 0, 0, '', ''); + CREATE TABLE TXN_COMPONENTS ( TC_TXNID bigint NOT NULL REFERENCES TXNS (TXN_ID), TC_DATABASE varchar(128) NOT NULL, diff --git standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql index 8a3cd56658..27d960ef7c 100644 --- standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql +++ standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql @@ -68,5 +68,17 @@ ALTER TABLE "APP"."DBS" ADD COLUMN "DB_MANAGED_LOCATION_URI" VARCHAR(4000); ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint; DROP TABLE MIN_HISTORY_LEVEL; +-- HIVE-23048 +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '', '' FROM COMPLETED_TXN_COMPONENTS; +ALTER TABLE TXNS ADD COLUMN TXN_ID_TMP bigint; +UPDATE TXNS SET TXN_ID_TMP=TXN_ID; +ALTER TABLE TXNS DROP COLUMN TXN_ID; +ALTER TABLE TXNS ADD COLUMN TXN_ID BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY (START WITH 1, INCREMENT BY 1); +UPDATE TXNS SET TXN_ID=TXN_ID_TMP; +ALTER TABLE TXNS DROP COLUMN TXN_ID_TMP; +-- TODO +ALTER TABLE TXNS ALTER TXN_ID RESTART WITH 1000000000; + -- This needs to be the last thing done. Insert any changes above this line. 
UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; diff --git standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql index 2e0177723d..1631eaa9f8 100644 --- standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql +++ standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql @@ -1103,21 +1103,24 @@ CREATE TABLE NEXT_TXN_ID( INSERT INTO NEXT_TXN_ID VALUES(1); CREATE TABLE TXNS( - TXN_ID bigint NOT NULL, - TXN_STATE char(1) NOT NULL, - TXN_STARTED bigint NOT NULL, - TXN_LAST_HEARTBEAT bigint NOT NULL, - TXN_USER nvarchar(128) NOT NULL, - TXN_HOST nvarchar(128) NOT NULL, + TXN_ID bigint NOT NULL IDENTITY(1,1), + TXN_STATE char(1) NOT NULL, + TXN_STARTED bigint NOT NULL, + TXN_LAST_HEARTBEAT bigint NOT NULL, + TXN_USER nvarchar(128) NOT NULL, + TXN_HOST nvarchar(128) NOT NULL, TXN_AGENT_INFO nvarchar(128) NULL, TXN_META_INFO nvarchar(128) NULL, TXN_HEARTBEAT_COUNT int NULL, TXN_TYPE int NULL, PRIMARY KEY CLUSTERED ( - TXN_ID ASC + TXN_ID ASC ) ); +SET IDENTITY_INSERT TXNS ON; +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + VALUES(0, 'c', 0, 0, '', ''); CREATE TABLE TXN_COMPONENTS( TC_TXNID bigint NOT NULL, diff --git standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql index 9f3951575b..359b085eb4 100644 --- standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql +++ standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql @@ -68,9 +68,55 @@ INSERT INTO NOTIFICATION_SEQUENCE (NNI_ID, NEXT_EVENT_ID) SELECT 1,1 WHERE NOT E ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI nvarchar(4000); -- HIVE-23107 -ALTER TABLE COMPACTION_QUEUE bigint CQ_NEXT_TXN_ID NOT NULL; +ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint NOT NULL; DROP TABLE MIN_HISTORY_LEVEL; +-- HIVE-23048 +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '', '' FROM COMPLETED_TXN_COMPONENTS; + +CREATE TABLE TMP_TXNS( + TXN_ID bigint NOT NULL IDENTITY(1,1), + TXN_STATE char(1) NOT NULL, + TXN_STARTED bigint NOT NULL, + TXN_LAST_HEARTBEAT bigint NOT NULL, + TXN_USER nvarchar(128) NOT NULL, + TXN_HOST nvarchar(128) NOT NULL, + TXN_AGENT_INFO nvarchar(128) NULL, + TXN_META_INFO nvarchar(128) NULL, + TXN_HEARTBEAT_COUNT int NULL, + TXN_TYPE int NULL, +PRIMARY KEY CLUSTERED +( + TXN_ID ASC +) +); + +SET IDENTITY_INSERT TMP_TXNS ON; +INSERT INTO TMP_TXNS (TXN_ID,TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST, TXN_AGENT_INFO, TXN_META_INFO, TXN_HEARTBEAT_COUNT, TXN_TYPE) +SELECT TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST, TXN_AGENT_INFO, TXN_META_INFO, TXN_HEARTBEAT_COUNT, TXN_TYPE FROM TXNS TABLOCKX; + +SET IDENTITY_INSERT TMP_TXNS OFF; + +CREATE TABLE TMP_TXN_COMPONENTS( + TC_TXNID bigint NOT NULL, + TC_DATABASE nvarchar(128) NOT NULL, + TC_TABLE nvarchar(128) NULL, + TC_PARTITION nvarchar(767) NULL, + TC_OPERATION_TYPE char(1) NOT NULL, + TC_WRITEID bigint +); +INSERT INTO TMP_TXN_COMPONENTS SELECT * FROM TXN_COMPONENTS; + +DROP TABLE TXN_COMPONENTS; +DROP TABLE TXNS; + +Exec sp_rename 'TMP_TXNS', 'TXNS'; +Exec sp_rename 'TMP_TXN_COMPONENTS', 
'TXN_COMPONENTS'; + +ALTER TABLE TXN_COMPONENTS WITH CHECK ADD FOREIGN KEY(TC_TXNID) REFERENCES TXNS (TXN_ID); +CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS (TC_TXNID); + -- These lines need to be last. Insert any changes above. UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE; diff --git standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql index 0512a45cad..63b966a51a 100644 --- standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql +++ standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql @@ -989,7 +989,7 @@ CREATE TABLE IF NOT EXISTS WM_MAPPING -- Transaction and Lock Tables -- ---------------------------- CREATE TABLE TXNS ( - TXN_ID bigint PRIMARY KEY, + TXN_ID bigint PRIMARY KEY AUTO_INCREMENT, TXN_STATE char(1) NOT NULL, TXN_STARTED bigint NOT NULL, TXN_LAST_HEARTBEAT bigint NOT NULL, @@ -1001,6 +1001,9 @@ CREATE TABLE TXNS ( TXN_TYPE int ) ENGINE=InnoDB DEFAULT CHARSET=latin1; +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + VALUES(0, 'c', 0, 0, '', ''); + CREATE TABLE TXN_COMPONENTS ( TC_TXNID bigint NOT NULL, TC_DATABASE varchar(128) NOT NULL, diff --git standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql index 4b82e36ab4..5b07f0c3bf 100644 --- standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql +++ standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql @@ -72,6 +72,22 @@ ALTER TABLE DBS ADD COLUMN DB_MANAGED_LOCATION_URI VARCHAR(4000) CHARACTER SET l ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint; DROP TABLE MIN_HISTORY_LEVEL; +-- HIVE-23048 +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + SELECT IFNULL(MAX(CTC_TXNID),0), 'c', 0, 0, '', '' FROM COMPLETED_TXN_COMPONENTS; +ALTER TABLE TXNS ADD COLUMN TXN_ID_TMP BIGINT; +UPDATE TXNS SET TXN_ID_TMP=TXN_ID; +SET FOREIGN_KEY_CHECKS = 0; +ALTER TABLE TXNS MODIFY TXN_ID BIGINT AUTO_INCREMENT; +SET FOREIGN_KEY_CHECKS = 1; +UPDATE TXNS SET TXN_ID=TXN_ID_TMP; +ALTER TABLE TXNS DROP COLUMN TXN_ID_TMP; +SELECT MAX(TXN_ID) + 1 INTO @AutoInc FROM TXNS; +SET @s:=CONCAT('ALTER TABLE TXNS AUTO_INCREMENT=', @AutoInc); +PREPARE stmt FROM @s; +EXECUTE stmt; +DEALLOCATE PREPARE stmt; + -- These lines need to be last. Insert any changes above. 
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE; diff --git standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql index db398e5f66..b1591ee6f8 100644 --- standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql +++ standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql @@ -972,7 +972,7 @@ ALTER TABLE MV_TABLES_USED ADD CONSTRAINT MV_TABLES_USED_FK2 FOREIGN KEY (TBL_ID -- Transaction and lock tables ------------------------------ CREATE TABLE TXNS ( - TXN_ID NUMBER(19) PRIMARY KEY, + TXN_ID NUMBER(19) GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, TXN_STATE char(1) NOT NULL, TXN_STARTED NUMBER(19) NOT NULL, TXN_LAST_HEARTBEAT NUMBER(19) NOT NULL, @@ -984,6 +984,9 @@ CREATE TABLE TXNS ( TXN_TYPE number(10) ) ROWDEPENDENCIES; +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + VALUES(0, 'c', 0, 0, '_', '_'); + CREATE TABLE TXN_COMPONENTS ( TC_TXNID NUMBER(19) NOT NULL REFERENCES TXNS (TXN_ID), TC_DATABASE VARCHAR2(128) NOT NULL, diff --git standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql index 1be83fc4a9..ff908f2db0 100644 --- standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql +++ standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql @@ -72,6 +72,17 @@ ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI VARCHAR2(4000) NULL; ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID NUMBER(19); DROP TABLE MIN_HISTORY_LEVEL; +-- HIVE-23048 +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '_', '_' FROM COMPLETED_TXN_COMPONENTS; +DECLARE max_txn NUMBER; +BEGIN + SELECT MAX(TXN_ID) + 1 INTO max_txn FROM TXNS; + EXECUTE IMMEDIATE 'CREATE SEQUENCE TXNS_TXN_ID_SEQ INCREMENT BY 1 START WITH ' || max_txn || ' CACHE 20'; +END; + +ALTER TABLE TXNS MODIFY TXN_ID default TXNS_TXN_ID_SEQ.nextval; + -- These lines need to be last. Insert any changes above. 
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status from dual; diff --git standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql index e6e30160af..5a04820172 100644 --- standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql +++ standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql @@ -1658,7 +1658,7 @@ GRANT ALL ON SCHEMA public TO PUBLIC; -- Transaction and lock tables ------------------------------ CREATE TABLE "TXNS" ( - "TXN_ID" bigint PRIMARY KEY, + "TXN_ID" bigserial PRIMARY KEY, "TXN_STATE" char(1) NOT NULL, "TXN_STARTED" bigint NOT NULL, "TXN_LAST_HEARTBEAT" bigint NOT NULL, @@ -1669,6 +1669,8 @@ CREATE TABLE "TXNS" ( "TXN_HEARTBEAT_COUNT" integer, "TXN_TYPE" integer ); +INSERT INTO "TXNS" ("TXN_ID", "TXN_STATE", "TXN_STARTED", "TXN_LAST_HEARTBEAT", "TXN_USER", "TXN_HOST") + VALUES(0, 'c', 0, 0, '', ''); CREATE TABLE "TXN_COMPONENTS" ( "TC_TXNID" bigint NOT NULL REFERENCES "TXNS" ("TXN_ID"), diff --git standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql index b90cecb173..b3643c5d99 100644 --- standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql +++ standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql @@ -203,6 +203,13 @@ ALTER TABLE "DBS" ADD "DB_MANAGED_LOCATION_URI" character varying(4000); ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_NEXT_TXN_ID" bigint; DROP TABLE "MIN_HISTORY_LEVEL"; +-- HIVE-23048 +INSERT INTO "TXNS" ("TXN_ID", "TXN_STATE", "TXN_STARTED", "TXN_LAST_HEARTBEAT", "TXN_USER", "TXN_HOST") + SELECT COALESCE(MAX("CTC_TXNID"),0), 'c', 0, 0, '', '' FROM "COMPLETED_TXN_COMPONENTS"; +CREATE SEQUENCE "TXNS_TXN_ID_SEQ" MINVALUE 0 OWNED BY "TXNS"."TXN_ID"; +select setval('"TXNS_TXN_ID_SEQ"', (SELECT MAX("TXN_ID") FROM "TXNS")); +ALTER TABLE "TXNS" ALTER "TXN_ID" SET DEFAULT nextval('"TXNS_TXN_ID_SEQ"'); + -- These lines need to be last. Insert any changes above. 
UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0'; diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java index c1a1629548..2ebba393bc 100644 --- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java @@ -36,6 +36,7 @@ public void upgrade() throws HiveMetaException { Assert.assertEquals(0, getRule().createUser()); Assert.assertEquals(0, getRule().installAVersion(FIRST_VERSION)); Assert.assertEquals(0, getRule().upgradeToLatest()); + Assert.assertEquals(0, getRule().validateSchema()); } protected abstract DatabaseRule getRule(); diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java index a6d22d19ef..e06011f6a5 100644 --- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java @@ -68,11 +68,11 @@ public DatabaseRule() { public DatabaseRule setVerbose(boolean verbose) { this.verbose = verbose; return this; - }; + } public String getDb() { return HIVE_DB; - }; + } /** * URL to use when connecting as root rather than Hive @@ -147,7 +147,7 @@ public void after() { // stopAndRmDockerContainer protected String getDockerContainerName(){ return String.format("metastore-test-%s-install", getDbType()); - }; + } private ProcessResults runCmd(String[] cmd, long secondsToWait) throws IOException, InterruptedException { @@ -275,7 +275,7 @@ public int upgradeToLatest() { "-dbType", getDbType(), "-userName", - HIVE_USER, + getHiveUser(), "-passWord", getHivePassword(), "-url", @@ -289,4 +289,20 @@ public void install() { createUser(); installLatest(); } + + public int validateSchema() { + return new MetastoreSchemaTool().setVerbose(verbose).run(buildArray( + "-validate", + "-dbType", + getDbType(), + "-userName", + getHiveUser(), + "-passWord", + getHivePassword(), + "-url", + getJdbcUrl(), + "-driver", + getJdbcDriver() + )); + } } diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java index 0b070e19ac..21c5de1fb9 100644 --- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java @@ -24,7 +24,7 @@ @Override public String getDockerImageName() { - return "orangehrm/oracle-xe-11g"; + return "pvargacl/oracle-xe-18.4.0"; } @Override @@ -32,10 +32,6 @@ public String getDockerImageName() { return buildArray( "-p", "1521:1521", - "-e", - "DEFAULT_SYS_PASS=" + getDbRootPassword(), - "-e", - "ORACLE_ALLOW_REMOTE=true", "-d" ); } @@ -72,11 +68,16 @@ public String getInitialJdbcUrl() { @Override public boolean 
isContainerReady(String logOutput) { - return logOutput.contains("Oracle started successfully!"); + return logOutput.contains("DATABASE IS READY TO USE!"); } @Override public String getHivePassword() { return HIVE_PASSWORD; } + + @Override + public String getHiveUser() { + return "c##"+ super.getHiveUser(); + } } diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java new file mode 100644 index 0000000000..8e1794a8a6 --- /dev/null +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider; +import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; + +/** + * Test openTxn and getOpenTxnList calls on TxnStore. 
+ */ +public class TestOpenTxn { + + private Configuration conf = MetastoreConf.newMetastoreConf(); + private TxnStore txnHandler; + + @Before + public void setUp() throws Exception { + // This will init the metastore db + txnHandler = TxnUtils.getTxnStore(conf); + TxnDbUtil.prepDb(conf); + } + + @After + public void tearDown() throws Exception { + TxnDbUtil.cleanDb(conf); + } + + @Test + public void testSingleOpen() throws MetaException { + OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost"); + long txnId = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + Assert.assertEquals(1, txnId); + } + + @Test + public void testGap() throws Exception { + OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost"); + txnHandler.openTxns(openTxnRequest); + long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + deleteTransaction(second); + txnHandler.openTxns(openTxnRequest); + GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); + Assert.assertEquals(3, openTxns.getOpen_txnsSize()); + + } + + @Test + public void testGapWithOldOpen() throws Exception { + OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost"); + txnHandler.openTxns(openTxnRequest); + Thread.sleep(1000); + long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + deleteTransaction(second); + txnHandler.openTxns(openTxnRequest); + GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); + Assert.assertEquals(3, openTxns.getOpen_txnsSize()); + } + + @Test + public void testGapWithOldCommit() throws Exception { + OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost"); + long first = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + txnHandler.commitTxn(new CommitTxnRequest(first)); + long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + deleteTransaction(second); + txnHandler.openTxns(openTxnRequest); + GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); + Assert.assertEquals(2, openTxns.getOpen_txnsSize()); + } + + @Test + public void testMultiGapWithOldCommit() throws Exception { + OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost"); + long first = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + txnHandler.commitTxn(new CommitTxnRequest(first)); + long second = txnHandler.openTxns(new OpenTxnRequest(10, "me", "localhost")).getTxn_ids().get(0); + deleteTransaction(second, second + 9); + txnHandler.openTxns(openTxnRequest); + GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); + Assert.assertEquals(11, openTxns.getOpen_txnsSize()); + } + + private void deleteTransaction(long txnId) throws SQLException { + deleteTransaction(txnId, txnId); + } + + private void deleteTransaction(long minTxnId, long maxTxnId) throws SQLException { + DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf); + DataSource ds = dsp.create(conf); + Connection dbConn = ds.getConnection(); + Statement stmt = dbConn.createStatement(); + stmt.executeUpdate("DELETE FROM TXNS WHERE TXN_ID >=" + minTxnId + " AND TXN_ID <=" + maxTxnId); + dbConn.commit(); + stmt.close(); + dbConn.close(); + } + +}