diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 498d59c359..1dfd12fda4 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -158,6 +158,9 @@ private void initialize(String cmRoot, String externalTableWarehouseRoot, String for (Map.Entry entry : overridesForHiveConf.entrySet()) { hiveConf.set(entry.getKey(), entry.getValue()); } + TxnDbUtil.cleanDb(hiveConf); + // This will properly init the metastore db + TxnDbUtil.prepDb(hiveConf); MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf, true, true); @@ -191,8 +194,6 @@ private void initialize(String cmRoot, String externalTableWarehouseRoot, String SessionState.start(new CliSessionState(hiveConf)); client = new HiveMetaStoreClient(hiveConf); - TxnDbUtil.cleanDb(hiveConf); - TxnDbUtil.prepDb(hiveConf); // change the value for the next instance. ++uniqueIdentifier; diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 37a5862791..23512e252e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -151,7 +151,7 @@ public void run() { recoverFailedCompactions(true); // Clean anything from the txns table that has no components left in txn_components. - txnHandler.cleanEmptyAbortedTxns(); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); // Clean TXN_TO_WRITE_ID table for entries under min_uncommitted_txn referred by any open txns. txnHandler.cleanTxnToWriteIdTable(); diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index 15fcfc0e35..69de63b3cf 100644 --- ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -512,9 +512,13 @@ public void testMarkCleanedCleansTxnsAndTxnComponents() // Check that we are cleaning up the empty aborted transactions GetOpenTxnsResponse txnList = txnHandler.getOpenTxns(); assertEquals(3, txnList.getOpen_txnsSize()); - txnHandler.cleanEmptyAbortedTxns(); + // Create one aborted txn for the low water mark + txnid = openTxn(); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnList = txnHandler.getOpenTxns(); - assertEquals(2, txnList.getOpen_txnsSize()); + assertEquals(3, txnList.getOpen_txnsSize()); rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR); rqst.setPartitionname("bar"); @@ -529,7 +533,9 @@ public void testMarkCleanedCleansTxnsAndTxnComponents() txnHandler.markCleaned(ci); txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")); - txnHandler.cleanEmptyAbortedTxns(); + // The open txn will become the low water mark + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnList = txnHandler.getOpenTxns(); assertEquals(3, txnList.getOpen_txnsSize()); } diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 72f095d264..21bb2318cf 100644 ---
ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -192,30 +192,39 @@ public void testAbortTxn() throws Exception { boolean gotException = false; try { txnHandler.abortTxn(new AbortTxnRequest(2)); - } - catch(NoSuchTxnException ex) { + } catch(NoSuchTxnException ex) { gotException = true; - //if this wasn't an empty txn, we'd get a better msg - Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(2), ex.getMessage()); + // this is the last committed txn, so it is still in the TXNS table + Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(2) + " is already committed.", ex.getMessage()); } Assert.assertTrue(gotException); gotException = false; txnHandler.commitTxn(new CommitTxnRequest(3)); try { txnHandler.abortTxn(new AbortTxnRequest(3)); - } - catch(NoSuchTxnException ex) { + } catch(NoSuchTxnException ex) { gotException = true; //txn 3 is not an empty txn, so we get a better msg Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(3) + " is already committed.", ex.getMessage()); } Assert.assertTrue(gotException); + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); gotException = false; try { - txnHandler.abortTxn(new AbortTxnRequest(4)); + txnHandler.abortTxn(new AbortTxnRequest(2)); + } catch(NoSuchTxnException ex) { + gotException = true; + // now the second transaction is cleaned up and, since it was empty, we do not recognize it anymore + Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(2), ex.getMessage()); } - catch(NoSuchTxnException ex) { + Assert.assertTrue(gotException); + + gotException = false; + try { + txnHandler.abortTxn(new AbortTxnRequest(4)); + } catch(NoSuchTxnException ex) { gotException = true; Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(4), ex.getMessage()); } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index a1f59a8927..a82f019b8c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -793,13 +793,7 @@ public void testTimeOutReaper() throws Exception { slr = txnHandler.showLocks(new ShowLocksRequest()); Assert.assertEquals("Unexpected lock count", 0, slr.getLocks().size()); } - private static void pause(int timeMillis) { - try { - Thread.sleep(timeMillis); - } - catch (InterruptedException e) { - } - } + @Test public void exchangePartition() throws Exception { diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index f3834cca6b..5cbd0af492 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -1380,6 +1380,8 @@ protected void testACIDwithSchemaEvolutionForVariousTblProperties(String tblProp // now compact and see if compaction still preserves the data correctness runStatementOnDriver("alter table "+ tblName + " compact 'MAJOR'"); runWorker(hiveConf); + // create a low water mark aborted transaction and clean the older ones + createAbortLowWaterMark(); runCleaner(hiveConf); // Cleaner would remove the obsolete files. // Verify that there is now only 1 new directory: base_xxxxxxx and the rest have been cleaned.
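The test changes above and below all follow the same pattern: empty aborted and committed transactions are only removed once they fall outside the TXN_OPENTXN_TIMEOUT window, and the cleaner always retains the row with the highest txn_id as the low water mark. A minimal sketch of that pattern, assuming a configured TxnStore handle named txnHandler as in these tests (openTxn() stands in for whatever helper opens a single transaction):

    // Sketch: exercising the new cleaner semantics against a TxnStore.
    long txnid = openTxn();                          // hypothetical helper: open one txn
    txnHandler.abortTxn(new AbortTxnRequest(txnid)); // leaves an empty aborted txn behind
    // Nothing inside the TXN_OPENTXN_TIMEOUT window may be deleted, so wait it out first.
    Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
    txnHandler.cleanEmptyAbortedAndCommittedTxns();
    // Older empty aborted/committed txns are now gone, but the maximum txn_id row
    // always survives in TXNS as the low water mark, hence assertOneTxn() below.

This is why assertions that previously expected an empty TXNS table now expect exactly one remaining row.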
@@ -1403,6 +1405,13 @@ protected void testACIDwithSchemaEvolutionForVariousTblProperties(String tblProp Assert.assertEquals(Arrays.asList(expectedResult), rs); } + protected void createAbortLowWaterMark() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); + runStatementOnDriver("select * from " + Table.ACIDTBL); + Thread.sleep(1000); + runInitiator(hiveConf); + } + @Test public void testETLSplitStrategyForACID() throws Exception { hiveConf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, "ETL"); @@ -2102,7 +2111,7 @@ public void testCleanerForTxnToWriteId() throws Exception { // The entry relevant to aborted txns shouldn't be removed from TXN_TO_WRITE_ID as // aborted txn would be removed from TXNS only after the compaction. Also, committed txn > open txn is retained. // As open txn doesn't allocate writeid, the 2 entries for aborted and committed should be retained. - txnHandler.cleanEmptyAbortedTxns(); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnHandler.cleanTxnToWriteIdTable(); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause), 3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause)); @@ -2116,7 +2125,7 @@ public void testCleanerForTxnToWriteId() throws Exception { txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR)); runWorker(hiveConf); runCleaner(hiveConf); - txnHandler.cleanEmptyAbortedTxns(); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnHandler.cleanTxnToWriteIdTable(); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"), 2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID")); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java index 51b0fa336f..950d756296 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java @@ -479,11 +479,14 @@ public void testCompactionAbort() throws Exception { runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'"); runWorker(hiveConf); - assertTableIsEmpty("TXNS"); + // Clean committed txns after TXN_OPENTXN_TIMEOUT; one transaction should always remain + pause(1000); + runInitiator(hiveConf); + assertOneTxn(); assertTableIsEmpty("TXN_COMPONENTS"); runCleaner(hiveConf); - assertTableIsEmpty("TXNS"); + assertOneTxn(); assertTableIsEmpty("TXN_COMPONENTS"); } @@ -500,11 +503,13 @@ public void testCompactionAbort() throws Exception { runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'"); runWorker(hiveConf); - assertTableIsEmpty("TXNS"); + // Clean committed txns after TXN_OPENTXN_TIMEOUT; one transaction should always remain + Thread.sleep(1000); + runInitiator(hiveConf); assertTableIsEmpty("TXN_COMPONENTS"); runCleaner(hiveConf); - assertTableIsEmpty("TXNS"); + assertOneTxn(); assertTableIsEmpty("TXN_COMPONENTS"); } @@ -512,4 +517,8 @@ private void assertTableIsEmpty(String table) throws Exception { Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from " + table), 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from " + table)); } + private void assertOneTxn() throws Exception { + Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), 1, + TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); + } } diff --git
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java index 6525ffc00a..484312ad18 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java @@ -465,8 +465,11 @@ public void testOperationsOnCompletedTxnComponentsForMmTable() throws Exception Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS")); + // Clean committed txns after TXN_OPENTXN_TIMEOUT; one transaction should always remain + pause(1000); + runInitiator(hiveConf); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), - 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); + 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); // Initiate a major compaction request on the table. runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MAJOR'"); @@ -475,13 +478,16 @@ public void testOperationsOnCompletedTxnComponentsForMmTable() throws Exception runWorker(hiveConf); verifyDirAndResult(2, true); + // Clean committed txns after TXN_OPENTXN_TIMEOUT; one transaction should always remain + pause(1000); + runInitiator(hiveConf); // Run Cleaner. runCleaner(hiveConf); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS")); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), - 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); + 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); verifyDirAndResult(0, true); } diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 1435269ed3..c84d471725 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -192,6 +192,13 @@ private static void runCompactorThread(HiveConf hiveConf, CompactorThreadType ty t.init(stop, looped); t.run(); } + protected static void pause(int timeMillis) { + try { + Thread.sleep(timeMillis); + } + catch (InterruptedException e) { + } + } protected List runStatementOnDriver(String stmt) throws Exception { LOG.info("Running the query: " + stmt); diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 80fb1aff78..bb8ed1af37 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -1100,11 +1100,19 @@ public void testWriteSetTracking4() throws Exception { adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn(); - locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 0, locks.size()); + /** + * The last transaction will always remain in the transaction table, so we open another one, + * wait for the timeout period to elapse, then clean the remaining empty txns directly + */ + txnMgr.openTxn(ctx, "Long Running"); + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); + // Now we can clean the write_set houseKeeper.run();
Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET")); + txnMgr.rollbackTxn(); } /** * overlapping txns updating the same resource but 1st one rolls back; 2nd commits @@ -1177,10 +1185,19 @@ public void testWriteSetTracking6() throws Exception { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks); txnMgr.commitTxn(); + /** + * The last transaction will always remain in the transaction table, so we open another one, + * wait for the timeout period to elapse, then clean the remaining empty txns directly + */ + txnMgr.openTxn(ctx, "Long Running"); + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); + // Now we can clean the write_set MetastoreTaskThread writeSetService = new AcidWriteSetService(); writeSetService.setConf(conf); writeSetService.run(); Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET")); + txnMgr.rollbackTxn(); } /** diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index 9a9ab53fcc..fa53962871 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -103,8 +103,9 @@ public void setup() throws Exception { conf = new HiveConf(); TxnDbUtil.setConfValues(conf); TxnDbUtil.cleanDb(conf); - ms = new HiveMetaStoreClient(conf); + // this will initialize the metastore db txnHandler = TxnUtils.getTxnStore(conf); + ms = new HiveMetaStoreClient(conf); tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString()); } diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index 1151466f8c..c38e37ac3e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -218,14 +218,14 @@ public void cleanEmptyAbortedTxns() throws Exception { req.setTxnid(txnid); LockResponse res = txnHandler.lock(req); txnHandler.abortTxn(new AbortTxnRequest(txnid)); - + txnHandler.setOpenTxnTimeOutMillis(30000); conf.setIntVar(HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50); OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest( TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50, "user", "hostname")); txnHandler.abortTxns(new AbortTxnsRequest(resp.getTxn_ids())); GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize()); - + txnHandler.setOpenTxnTimeOutMillis(1000); startInitiator(); openTxns = txnHandler.getOpenTxns(); diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 3bfb0e69cb..35b0a18270 100644 --- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -1193,6 +1193,8 @@ public static ConfVars getMetaConf(String name) { "class is used to store and retrieve transactions and locks"),
TXN_TIMEOUT("metastore.txn.timeout", "hive.txn.timeout", 300, TimeUnit.SECONDS, "time after which transactions are declared aborted if the client has not sent a heartbeat."), + TXN_OPENTXN_TIMEOUT("metastore.txn.opentxn.timeout", "hive.txn.opentxn.timeout", 1000, TimeUnit.MILLISECONDS, + "Time before an open transaction operation should commit, otherwise it is considered invalid and rolled back"), URI_RESOLVER("metastore.uri.resolver", "hive.metastore.uri.resolver", "", "If set, fully qualified class name of resolver for hive metastore uri's"), USERS_IN_ADMIN_ROLE("metastore.users.in.admin.role", "hive.users.in.admin.role", "", false, diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java index 3e56ad513c..609367cac0 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java @@ -72,4 +72,18 @@ public static boolean needsInBatching(DatabaseProduct dbType) { public static boolean hasJoinOperationOrderBug(DatabaseProduct dbType) { return dbType == DERBY || dbType == ORACLE || dbType == POSTGRES; } + public static String getHiveSchemaPostfix(DatabaseProduct dbType) { + switch (dbType) { + case SQLSERVER: + return "mssql"; + case DERBY: + case MYSQL: + case POSTGRES: + case ORACLE: + return dbType.name().toLowerCase(); + case OTHER: + default: + return null; + } + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java index 49b737ecf9..e51339cd00 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java @@ -57,9 +57,24 @@ public SQLGenerator(DatabaseProduct dbProduct, Configuration conf) { * @param paramsList List of parameters which in turn is list of Strings to be set in PreparedStatement object * @return List PreparedStatement objects for fully formed INSERT INTO ... statements */ + public List createInsertValuesPreparedStmt(Connection dbConn, + String tblColumns, List rows, + List> paramsList) throws SQLException { + return createInsertValuesPreparedStmt(dbConn, tblColumns, rows, paramsList, null); + } + + /** + * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB + * + * @param tblColumns e.g. "T(a,b,c)" + * @param rows e.g. list of Strings like 3,4,'d' + * @param paramsList List of parameters which in turn is list of Strings to be set in PreparedStatement object + * @param generatedKeys list of fields to return for generatedKeys + * @return List PreparedStatement objects for fully formed INSERT INTO ... 
statements + */ public List createInsertValuesPreparedStmt(Connection dbConn, String tblColumns, List rows, - List> paramsList) + List> paramsList, String[] generatedKeys) throws SQLException { if (rows == null || rows.size() == 0) { return Collections.emptyList(); } @@ -75,7 +90,7 @@ public SQLGenerator(DatabaseProduct dbProduct, Configuration conf) { try { for (int stmtIdx = 0; stmtIdx < insertStmts.size(); stmtIdx++) { String sql = insertStmts.get(stmtIdx); - PreparedStatement pStmt = prepareStmtWithParameters(dbConn, sql, null); + PreparedStatement pStmt = prepareStmtWithParameters(dbConn, sql, null, generatedKeys); if (paramsList != null) { int paramIdx = 1; int paramsListToIdx = paramsListFromIdx + rowsCountInStmts.get(stmtIdx); @@ -99,7 +114,7 @@ public SQLGenerator(DatabaseProduct dbProduct, Configuration conf) { } /** - * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB + * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB. * * @param tblColumns e.g. "T(a,b,c)" * @param rows e.g. list of Strings like 3,4,'d' @@ -110,7 +125,7 @@ public SQLGenerator(DatabaseProduct dbProduct, Configuration conf) { } /** - * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB + * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB. * * @param tblColumns e.g. "T(a,b,c)" * @param rows e.g. list of Strings like 3,4,'d' @@ -263,8 +278,30 @@ public String addLimitClause(int numRows, String noSelectsqlQuery) throws MetaEx * @throws SQLException */ public PreparedStatement prepareStmtWithParameters(Connection dbConn, String sql, List parameters) - throws SQLException { - PreparedStatement pst = dbConn.prepareStatement(addEscapeCharacters(sql)); + throws SQLException { + return prepareStmtWithParameters(dbConn, sql, parameters, null); + } + + /** + * Make PreparedStatement object with list of String type parameters to be set. + * It is assumed the input sql string has the number of "?" equal to the number of parameters + * passed as input. + * @param dbConn - Connection object + * @param sql - SQL statement with "?" for input parameters. + * @param parameters - List of String type parameters to be set in PreparedStatement object + * @param generatedKeys - list of fields to return from getGeneratedKeys + * @return PreparedStatement type object + * @throws SQLException + */ + public PreparedStatement prepareStmtWithParameters(Connection dbConn, String sql, List parameters, + String[] generatedKeys) throws SQLException { + PreparedStatement pst; + if (generatedKeys != null && generatedKeys.length > 0) { + pst = dbConn.prepareStatement(addEscapeCharacters(sql), generatedKeys); + } else { + pst = dbConn.prepareStatement(addEscapeCharacters(sql)); + } + if ((parameters == null) || parameters.isEmpty()) { return pst; } @@ -292,4 +329,60 @@ public String addEscapeCharacters(String s) { return s; } + /** + * Creates an unlock statement for table locks. + * Most DBMSs do not have this feature; the lock will be released at the end of the transaction.
+ @return the unlock statement to run, or null if no explicit unlock is needed + @throws MetaException if the dbProduct is unknown + */ + public String createUnLockTableStatement() throws MetaException { + switch (dbProduct) { + case MYSQL: + // https://dev.mysql.com/doc/refman/8.0/en/lock-tables.html + return "UNLOCK TABLES"; + case POSTGRES: + // https://www.postgresql.org/docs/9.4/sql-lock.html + case DERBY: + // https://db.apache.org/derby/docs/10.4/ref/rrefsqlj40506.html + case ORACLE: + // https://docs.oracle.com/cd/B19306_01/server.102/b14200/statements_9015.htm + case SQLSERVER: + // https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-table?view=sql-server-ver15 + return null; + default: + String msg = "Unrecognized database product name <" + dbProduct + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + } + + /** + * Creates a lock table statement based on the dbProduct in shared read / exclusive mode. + * @param txnLockTable table to lock + * @param shared shared or exclusive lock + * @return the lock statement for the given dbProduct + * @throws MetaException if the dbProduct is unknown + */ + public String createLockTableStatement(String txnLockTable, boolean shared) throws MetaException { + + switch (dbProduct) { + case MYSQL: + // https://dev.mysql.com/doc/refman/8.0/en/lock-tables.html + return "LOCK TABLES \"" + txnLockTable + "\" " + (shared ? "READ" : "WRITE"); + case POSTGRES: + // https://www.postgresql.org/docs/9.4/sql-lock.html + case DERBY: + // https://db.apache.org/derby/docs/10.4/ref/rrefsqlj40506.html + case ORACLE: + // https://docs.oracle.com/cd/B19306_01/server.102/b14200/statements_9015.htm + return "LOCK TABLE \"" + txnLockTable + "\" IN " + (shared ? "SHARE" : "EXCLUSIVE") + " MODE"; + case SQLSERVER: + // https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-table?view=sql-server-ver15 + return "SELECT * FROM \"" + txnLockTable + "\" WITH (" + (shared ?
"TABLOCK" : "TABLOCKX") + ", HOLDLOCK)"; + default: + String msg = "Unrecognized database product name <" + dbProduct + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 19a95b64db..b86948b3b2 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -333,21 +333,16 @@ public long findMinOpenTxnId() throws MetaException { * which deletes rows from MIN_HISTORY_LEVEL which can only allow minOpenTxn to move higher) */ private long findMinOpenTxnGLB(Statement stmt) throws MetaException, SQLException { - String s = "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""; + long hwm = getHighWaterMark(stmt); + long minOpenTxnId = 0; + String s = "SELECT MIN(\"MHL_MIN_OPEN_TXNID\") FROM \"MIN_HISTORY_LEVEL\""; LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_txn_id"); - } - long hwm = rs.getLong(1); - s = "SELECT MIN(\"MHL_MIN_OPEN_TXNID\") FROM \"MIN_HISTORY_LEVEL\""; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - rs.next(); - long minOpenTxnId = rs.getLong(1); - if(rs.wasNull()) { - return hwm; + try (ResultSet rs = stmt.executeQuery(s)) { + rs.next(); + minOpenTxnId = rs.getLong(1); + if (rs.wasNull()) { + return hwm; + } } //since generating new txnid uses select for update on single row in NEXT_TXN_ID assert hwm >= minOpenTxnId : "(hwm, minOpenTxnId)=(" + hwm + "," + minOpenTxnId + ")"; @@ -586,19 +581,28 @@ public void cleanTxnToWriteIdTable() throws MetaException { */ @Override @RetrySemantics.SafeToRetry - public void cleanEmptyAbortedTxns() throws MetaException { + public void cleanEmptyAbortedAndCommittedTxns() throws MetaException { + LOG.info("Start to clean empty aborted or committed TXNS"); try { Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { - //Aborted is a terminal state, so nothing about the txn can change + //Aborted and committed are terminal states, so nothing about the txn can change //after that, so READ COMMITTED is sufficient. dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); + /** + * Only delete aborted transaction in a way that guarantees two things: + * 1. never deletes anything that is inside the TXN_OPENTXN_TIMEOUT window + * 2. never deletes the maximum TXN that is before the TXN_OPENTXN_TIMEOUT window + */ + String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE " + "\"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") AND " + - "\"TXN_STATE\" = '" + TXN_ABORTED + "'"; + " (\"TXN_STATE\" = '" + TXN_ABORTED + "' OR \"TXN_STATE\" = '" + TXN_COMMITTED + "') AND " + + " \"TXN_ID\" < (SELECT MAX(\"TXN_ID\") FROM \"TXNS\"" + + " WHERE \"TXN_STARTED\" < (" + TxnDbUtil.getEpochFn(getDbProduct()) + " - " + getOpenTxnTimeOutMillis() + "))"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); List txnids = new ArrayList<>(); @@ -615,16 +619,15 @@ public void cleanEmptyAbortedTxns() throws MetaException { // Delete from TXNS. 
prefix.append("DELETE FROM \"TXNS\" WHERE "); - suffix.append(""); TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", false, false); for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); int rc = stmt.executeUpdate(query); - LOG.info("Removed " + rc + " empty Aborted transactions from TXNS"); + LOG.info("Removed " + rc + " empty Aborted and Committed transactions from TXNS"); } - LOG.info("Aborted transactions removed from TXNS: " + txnids); + LOG.info("Aborted and committed transactions removed from TXNS: " + txnids); LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { @@ -638,7 +641,7 @@ public void cleanEmptyAbortedTxns() throws MetaException { close(rs, stmt, dbConn); } } catch (RetryException e) { - cleanEmptyAbortedTxns(); + cleanEmptyAbortedAndCommittedTxns(); } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 620c77e589..f9cf7f104b 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hive.metastore.txn; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; import java.sql.Connection; import java.sql.Driver; import java.sql.PreparedStatement; @@ -29,12 +32,18 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.EnumMap; +import java.util.HashSet; import java.util.List; import java.util.Properties; +import java.util.Scanner; +import java.util.Set; import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.DatabaseProduct; +import org.apache.hadoop.hive.metastore.IMetaStoreSchemaInfo; +import org.apache.hadoop.hive.metastore.MetaStoreSchemaInfoFactory; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; @@ -83,318 +92,38 @@ public static void setConfValues(Configuration conf) { MetastoreConf.setBoolVar(conf, ConfVars.HIVE_SUPPORT_CONCURRENCY, true); } + /** + * Prepares the metastore database for unit tests. + * Runs the latest init schema against the database configured in the CONNECT_URL_KEY param. + * Ignores any duplication (table, index etc.) So it can be called multiple times for the same database. + * @param conf Metastore configuration + * @throws Exception + */ public static synchronized void prepDb(Configuration conf) throws Exception { - // This is a bogus hack because it copies the contents of the SQL file - // intended for creating derby databases, and thus will inexorably get - // out of date with it. I'm open to any suggestions on how to make this - // read the file in a build friendly way. 
- Connection conn = null; Statement stmt = null; try { conn = getConnection(conf); + String s = conn.getMetaData().getDatabaseProductName(); + DatabaseProduct dbProduct = DatabaseProduct.determineDatabaseProduct(s); stmt = conn.createStatement(); - stmt.execute("CREATE TABLE TXNS (" + - " TXN_ID bigint PRIMARY KEY," + - " TXN_STATE char(1) NOT NULL," + - " TXN_STARTED bigint NOT NULL," + - " TXN_LAST_HEARTBEAT bigint NOT NULL," + - " TXN_USER varchar(128) NOT NULL," + - " TXN_HOST varchar(128) NOT NULL," + - " TXN_TYPE integer)"); - - stmt.execute("CREATE TABLE TXN_COMPONENTS (" + - " TC_TXNID bigint NOT NULL REFERENCES TXNS (TXN_ID)," + - " TC_DATABASE varchar(128) NOT NULL," + - " TC_TABLE varchar(128)," + - " TC_PARTITION varchar(767)," + - " TC_OPERATION_TYPE char(1) NOT NULL," + - " TC_WRITEID bigint)"); - stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" + - " CTC_TXNID bigint NOT NULL," + - " CTC_DATABASE varchar(128) NOT NULL," + - " CTC_TABLE varchar(128)," + - " CTC_PARTITION varchar(767)," + - " CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL," + - " CTC_WRITEID bigint," + - " CTC_UPDATE_DELETE char(1) NOT NULL)"); - stmt.execute("CREATE TABLE NEXT_TXN_ID (" + " NTXN_NEXT bigint NOT NULL)"); - stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)"); - - stmt.execute("CREATE TABLE TXN_TO_WRITE_ID (" + - " T2W_TXNID bigint NOT NULL," + - " T2W_DATABASE varchar(128) NOT NULL," + - " T2W_TABLE varchar(256) NOT NULL," + - " T2W_WRITEID bigint NOT NULL)"); - stmt.execute("CREATE TABLE NEXT_WRITE_ID (" + - " NWI_DATABASE varchar(128) NOT NULL," + - " NWI_TABLE varchar(256) NOT NULL," + - " NWI_NEXT bigint NOT NULL)"); - - stmt.execute("CREATE TABLE MIN_HISTORY_LEVEL (" + - " MHL_TXNID bigint NOT NULL," + - " MHL_MIN_OPEN_TXNID bigint NOT NULL," + - " PRIMARY KEY(MHL_TXNID))"); - - stmt.execute("CREATE TABLE HIVE_LOCKS (" + - " HL_LOCK_EXT_ID bigint NOT NULL," + - " HL_LOCK_INT_ID bigint NOT NULL," + - " HL_TXNID bigint NOT NULL," + - " HL_DB varchar(128) NOT NULL," + - " HL_TABLE varchar(128)," + - " HL_PARTITION varchar(767)," + - " HL_LOCK_STATE char(1) NOT NULL," + - " HL_LOCK_TYPE char(1) NOT NULL," + - " HL_LAST_HEARTBEAT bigint NOT NULL," + - " HL_ACQUIRED_AT bigint," + - " HL_USER varchar(128) NOT NULL," + - " HL_HOST varchar(128) NOT NULL," + - " HL_HEARTBEAT_COUNT integer," + - " HL_AGENT_INFO varchar(128)," + - " HL_BLOCKEDBY_EXT_ID bigint," + - " HL_BLOCKEDBY_INT_ID bigint," + - " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))"); - stmt.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)"); - - stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + " NL_NEXT bigint NOT NULL)"); - stmt.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)"); - - stmt.execute("CREATE TABLE COMPACTION_QUEUE (" + - " CQ_ID bigint PRIMARY KEY," + - " CQ_DATABASE varchar(128) NOT NULL," + - " CQ_TABLE varchar(128) NOT NULL," + - " CQ_PARTITION varchar(767)," + - " CQ_STATE char(1) NOT NULL," + - " CQ_TYPE char(1) NOT NULL," + - " CQ_TBLPROPERTIES varchar(2048)," + - " CQ_WORKER_ID varchar(128)," + - " CQ_START bigint," + - " CQ_RUN_AS varchar(128)," + - " CQ_HIGHEST_WRITE_ID bigint," + - " CQ_META_INFO varchar(2048) for bit data," + - " CQ_HADOOP_JOB_ID varchar(32)," + - " CQ_ERROR_MESSAGE clob)"); - - stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)"); - stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)"); - - stmt.execute("CREATE TABLE COMPLETED_COMPACTIONS (" + - " CC_ID bigint PRIMARY KEY," + - " CC_DATABASE varchar(128) NOT NULL," + - " CC_TABLE 
varchar(128) NOT NULL," + - " CC_PARTITION varchar(767)," + - " CC_STATE char(1) NOT NULL," + - " CC_TYPE char(1) NOT NULL," + - " CC_TBLPROPERTIES varchar(2048)," + - " CC_WORKER_ID varchar(128)," + - " CC_START bigint," + - " CC_END bigint," + - " CC_RUN_AS varchar(128)," + - " CC_HIGHEST_WRITE_ID bigint," + - " CC_META_INFO varchar(2048) for bit data," + - " CC_HADOOP_JOB_ID varchar(32)," + - " CC_ERROR_MESSAGE clob)"); - - stmt.execute("CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (" - + "CC_DATABASE,CC_TABLE,CC_PARTITION)"); - - stmt.execute("CREATE TABLE AUX_TABLE (" + - " MT_KEY1 varchar(128) NOT NULL," + - " MT_KEY2 bigint NOT NULL," + - " MT_COMMENT varchar(255)," + - " PRIMARY KEY(MT_KEY1, MT_KEY2))"); - - stmt.execute("CREATE TABLE WRITE_SET (" + - " WS_DATABASE varchar(128) NOT NULL," + - " WS_TABLE varchar(128) NOT NULL," + - " WS_PARTITION varchar(767)," + - " WS_TXNID bigint NOT NULL," + - " WS_COMMIT_ID bigint NOT NULL," + - " WS_OPERATION_TYPE char(1) NOT NULL)" - ); - - stmt.execute("CREATE TABLE REPL_TXN_MAP (" + - " RTM_REPL_POLICY varchar(256) NOT NULL, " + - " RTM_SRC_TXN_ID bigint NOT NULL, " + - " RTM_TARGET_TXN_ID bigint NOT NULL, " + - " PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID))" - ); - - stmt.execute("CREATE TABLE MATERIALIZATION_REBUILD_LOCKS (" + - " MRL_TXN_ID BIGINT NOT NULL, " + - " MRL_DB_NAME VARCHAR(128) NOT NULL, " + - " MRL_TBL_NAME VARCHAR(256) NOT NULL, " + - " MRL_LAST_HEARTBEAT BIGINT NOT NULL, " + - " PRIMARY KEY(MRL_TXN_ID))" - ); - - try { - stmt.execute("CREATE TABLE \"APP\".\"TBLS\" (\"TBL_ID\" BIGINT NOT NULL, " + - " \"CREATE_TIME\" INTEGER NOT NULL, \"DB_ID\" BIGINT, \"LAST_ACCESS_TIME\" INTEGER NOT NULL, " + - " \"OWNER\" VARCHAR(767), \"OWNER_TYPE\" VARCHAR(10), \"RETENTION\" INTEGER NOT NULL, " + - " \"SD_ID\" BIGINT, \"TBL_NAME\" VARCHAR(256), \"TBL_TYPE\" VARCHAR(128), " + - " \"VIEW_EXPANDED_TEXT\" LONG VARCHAR, \"VIEW_ORIGINAL_TEXT\" LONG VARCHAR, " + - " \"IS_REWRITE_ENABLED\" CHAR(1) NOT NULL DEFAULT \'N\', " + - " \"WRITE_ID\" BIGINT DEFAULT 0, " + - " PRIMARY KEY (TBL_ID))" - ); - } catch (SQLException e) { - if (e.getMessage() != null && e.getMessage().contains("already exists")) { - LOG.info("TBLS table already exist, ignoring"); - } else { - throw e; - } - } - - try { - stmt.execute("CREATE TABLE \"APP\".\"DBS\" (\"DB_ID\" BIGINT NOT NULL, \"DESC\" " + - "VARCHAR(4000), \"DB_LOCATION_URI\" VARCHAR(4000) NOT NULL, \"NAME\" VARCHAR(128), " + - "\"OWNER_NAME\" VARCHAR(128), \"OWNER_TYPE\" VARCHAR(10), " + - "\"CTLG_NAME\" VARCHAR(256) NOT NULL, PRIMARY KEY (DB_ID))"); - } catch (SQLException e) { - if (e.getMessage() != null && e.getMessage().contains("already exists")) { - LOG.info("TBLS table already exist, ignoring"); - } else { - throw e; - } - } - - try { - stmt.execute("CREATE TABLE \"APP\".\"PARTITIONS\" (" + - " \"PART_ID\" BIGINT NOT NULL, \"CREATE_TIME\" INTEGER NOT NULL, " + - " \"LAST_ACCESS_TIME\" INTEGER NOT NULL, \"PART_NAME\" VARCHAR(767), " + - " \"SD_ID\" BIGINT, \"TBL_ID\" BIGINT, " + - " \"WRITE_ID\" BIGINT DEFAULT 0, " + - " PRIMARY KEY (PART_ID))" - ); - } catch (SQLException e) { - if (e.getMessage() != null && e.getMessage().contains("already exists")) { - LOG.info("PARTITIONS table already exist, ignoring"); - } else { - throw e; - } - } - - try { - stmt.execute("CREATE TABLE \"APP\".\"TABLE_PARAMS\" (" + - " \"TBL_ID\" BIGINT NOT NULL, \"PARAM_KEY\" VARCHAR(256) NOT NULL, " + - " \"PARAM_VALUE\" CLOB, " + - " PRIMARY KEY (TBL_ID, PARAM_KEY))" - ); - } catch (SQLException e) { - if 
(e.getMessage() != null && e.getMessage().contains("already exists")) { - LOG.info("TABLE_PARAMS table already exist, ignoring"); - } else { - throw e; - } - } - - try { - stmt.execute("CREATE TABLE \"APP\".\"PARTITION_PARAMS\" (" + - " \"PART_ID\" BIGINT NOT NULL, \"PARAM_KEY\" VARCHAR(256) NOT NULL, " + - " \"PARAM_VALUE\" CLOB, " + - " PRIMARY KEY (PART_ID, PARAM_KEY))" - ); - } catch (SQLException e) { - if (e.getMessage() != null && e.getMessage().contains("already exists")) { - LOG.info("PARTITION_PARAMS table already exist, ignoring"); - } else { - throw e; - } - } - - try { - stmt.execute("CREATE TABLE \"APP\".\"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\" VARCHAR(256) NOT " + - - "NULL, \"NEXT_VAL\" BIGINT NOT NULL)" - ); - } catch (SQLException e) { - if (e.getMessage() != null && e.getMessage().contains("already exists")) { - LOG.info("SEQUENCE_TABLE table already exist, ignoring"); - } else { - throw e; - } - } - - try { - stmt.execute("CREATE TABLE \"APP\".\"NOTIFICATION_SEQUENCE\" (\"NNI_ID\" BIGINT NOT NULL, " + - - "\"NEXT_EVENT_ID\" BIGINT NOT NULL)" - ); - } catch (SQLException e) { - if (e.getMessage() != null && e.getMessage().contains("already exists")) { - LOG.info("NOTIFICATION_SEQUENCE table already exist, ignoring"); - } else { - throw e; - } - } - - try { - stmt.execute("CREATE TABLE \"APP\".\"NOTIFICATION_LOG\" (\"NL_ID\" BIGINT NOT NULL, " + - "\"DB_NAME\" VARCHAR(128), \"EVENT_ID\" BIGINT NOT NULL, \"EVENT_TIME\" INTEGER NOT" + - - " NULL, \"EVENT_TYPE\" VARCHAR(32) NOT NULL, \"MESSAGE\" CLOB, \"TBL_NAME\" " + - "VARCHAR" + - "(256), \"MESSAGE_FORMAT\" VARCHAR(16))" - ); - } catch (SQLException e) { - if (e.getMessage() != null && e.getMessage().contains("already exists")) { - LOG.info("NOTIFICATION_LOG table already exist, ignoring"); - } else { - throw e; - } + if (checkDbPrepared(stmt)) { + return; } - - stmt.execute("INSERT INTO \"APP\".\"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", \"NEXT_VAL\") " + - "SELECT * FROM (VALUES ('org.apache.hadoop.hive.metastore.model.MNotificationLog', " + - "1)) tmp_table WHERE NOT EXISTS ( SELECT \"NEXT_VAL\" FROM \"APP\"" + - ".\"SEQUENCE_TABLE\" WHERE \"SEQUENCE_NAME\" = 'org.apache.hadoop.hive.metastore" + - ".model.MNotificationLog')"); - - stmt.execute("INSERT INTO \"APP\".\"NOTIFICATION_SEQUENCE\" (\"NNI_ID\", \"NEXT_EVENT_ID\")" + - " SELECT * FROM (VALUES (1,1)) tmp_table WHERE NOT EXISTS ( SELECT " + - "\"NEXT_EVENT_ID\" FROM \"APP\".\"NOTIFICATION_SEQUENCE\")"); - - try { - stmt.execute("CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (" + - "WNL_ID bigint NOT NULL," + - "WNL_TXNID bigint NOT NULL," + - "WNL_WRITEID bigint NOT NULL," + - "WNL_DATABASE varchar(128) NOT NULL," + - "WNL_TABLE varchar(128) NOT NULL," + - "WNL_PARTITION varchar(1024) NOT NULL," + - "WNL_TABLE_OBJ clob NOT NULL," + - "WNL_PARTITION_OBJ clob," + - "WNL_FILES clob," + - "WNL_EVENT_TIME integer NOT NULL," + - "PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION))" - ); - } catch (SQLException e) { - if (e.getMessage() != null && e.getMessage().contains("already exists")) { - LOG.info("TXN_WRITE_NOTIFICATION_LOG table already exist, ignoring"); - } else { - throw e; - } + String schemaRootPath = getSchemaRootPath(); + IMetaStoreSchemaInfo metaStoreSchemaInfo = + MetaStoreSchemaInfoFactory.get(conf, schemaRootPath, DatabaseProduct.getHiveSchemaPostfix(dbProduct)); + String initFile = metaStoreSchemaInfo.generateInitFileName(null); + try (InputStream is = new FileInputStream( + metaStoreSchemaInfo.getMetaStoreScriptDir() + File.separator + initFile)) { + 
importSQL(stmt, is); } - - stmt.execute("INSERT INTO \"APP\".\"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", \"NEXT_VAL\") " + - "SELECT * FROM (VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', " + - "1)) tmp_table WHERE NOT EXISTS ( SELECT \"NEXT_VAL\" FROM \"APP\"" + - ".\"SEQUENCE_TABLE\" WHERE \"SEQUENCE_NAME\" = 'org.apache.hadoop.hive.metastore" + - ".model.MTxnWriteNotificationLog')"); } catch (SQLException e) { try { conn.rollback(); } catch (SQLException re) { LOG.error("Error rolling back: " + re.getMessage()); } - - // Another thread might have already created these tables. - if (e.getMessage() != null && e.getMessage().contains("already exists")) { - LOG.info("Txn tables already exist, returning"); - return; - } - // This might be a deadlock, if so, let's retry if (e instanceof SQLTransactionRollbackException && deadlockCnt++ < 5) { LOG.warn("Caught deadlock, retrying db creation"); @@ -408,6 +137,77 @@ public static synchronized void prepDb(Configuration conf) throws Exception { } } + private static boolean checkDbPrepared(Statement stmt) { + /* + * The prepDb is called many times in some test cases involuntarily, + * since it is always called in TxnHandler.setConf(). + * This could consume a lot of time, so if the TXNS table is already created we assume the database is ready. + */ + try { + stmt.execute("SELECT * FROM \"TXNS\""); + } catch (SQLException e) { + return false; + } + return true; + } + + private static void importSQL(Statement stmt, InputStream in) throws SQLException { + // function already exists, table already exists, index already exists, duplicate key + Set knownErrors = new HashSet<>(); + // derby + knownErrors.addAll(Arrays.asList("X0Y68", "X0Y32", "X0Y44", "42Z93", "23505")); + // postgres + knownErrors.addAll(Arrays.asList("42P07", "42P16", "42710")); + // mssql + knownErrors.addAll(Arrays.asList("S0000", "S0001", "23000")); + // mysql + knownErrors.addAll(Arrays.asList("42S01", "HY000")); + // oracle + knownErrors.addAll(Arrays.asList("42000")); + Scanner s = new Scanner(in); + s.useDelimiter("(;(\r)?\n)|(--.*\n)"); + while (s.hasNext()) { + String line = s.next(); + + if (line.trim().length() > 0) { + try { + LOG.info(line); + stmt.execute(line); + } catch (SQLException e) { + if (knownErrors.contains(e.getSQLState())) { + LOG.info("Ignoring sql error {}", e.getMessage()); + } else { + throw e; + } + } + } + } + } + private static String getSchemaRootPath() { + String hiveRoot = System.getProperty("hive.root"); + if (StringUtils.isNotEmpty(hiveRoot)) { + return ensurePathEndsInSlash(hiveRoot) + "standalone-metastore/metastore-server/target/tmp/"; + } else { + return ensurePathEndsInSlash(System.getProperty("test.tmp.dir", "target/tmp")); + } + } + + private static String ensurePathEndsInSlash(String path) { + if (path == null) { + throw new NullPointerException("Path cannot be null"); + } + if (path.endsWith(File.separator)) { + return path; + } else { + return path + File.separator; + } + } + + /** + * Drops only the transaction-related tables from the database configured in the CONNECT_URL_KEY param. + * @param conf Metastore configuration + * @throws Exception + */ public static void cleanDb(Configuration conf) throws Exception { int retryCount = 0; while(++retryCount <= 3) { @@ -419,7 +219,7 @@ public static void cleanDb(Configuration conf) throws Exception { stmt = conn.createStatement(); // We want to try these, whether they succeed or fail.
- success &= dropIndex(stmt, "HL_TXNID_INDEX", retryCount); + success &= dropIndex(conn, stmt, "HL_TXNID_INDEX", "HIVE_LOCKS", retryCount); success &= dropTable(stmt, "TXN_COMPONENTS", retryCount); success &= dropTable(stmt, "COMPLETED_TXN_COMPONENTS", retryCount); @@ -453,11 +253,18 @@ public static void cleanDb(Configuration conf) throws Exception { throw new RuntimeException("Failed to clean up txn tables"); } - private static boolean dropIndex(Statement stmt, String index, int retryCount) { + private static boolean dropIndex(Connection conn, Statement stmt, String index, String table, int retryCount) { try { - stmt.execute("DROP INDEX " + index); + String s = conn.getMetaData().getDatabaseProductName(); + DatabaseProduct dbProduct = DatabaseProduct.determineDatabaseProduct(s); + if (dbProduct == MYSQL) { + stmt.execute("DROP INDEX " + index + " ON " + table); + } else { + stmt.execute("DROP INDEX " + index); + } } catch (SQLException e) { - if (!("42X65".equals(e.getSQLState()) && 30000 == e.getErrorCode())) { + if (!(("42X65".equals(e.getSQLState()) && 30000 == e.getErrorCode()) || + ("42000".equals(e.getSQLState()) && 1091 == e.getErrorCode()))) { //42X65/30000 (Derby) or 42000/1091 (MySQL) means the index doesn't exist LOG.error("Unable to drop index {} {} State={} code={} retryCount={}", index, e.getMessage(), e.getSQLState(), e.getErrorCode(), retryCount); @@ -661,4 +468,28 @@ static String getEpochFn(DatabaseProduct dbProduct) throws MetaException { } return affectedRowsByQuery; } + + /** + * Checks if the DBMS supports getGeneratedKeys for multi-line insert statements. + * @param dbProduct DBMS type + * @return true if supported + * @throws MetaException + */ + public static boolean supportsGetGeneratedKeys(DatabaseProduct dbProduct) throws MetaException { + switch (dbProduct) { + case DERBY: + case SQLSERVER: + // getGeneratedKeys is not supported for multi-line inserts + return false; + case ORACLE: + case MYSQL: + case POSTGRES: + return true; + case OTHER: + default: + String msg = "Unknown database product: " + dbProduct.toString(); + LOG.error(msg); + throw new MetaException(msg); + } + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 7d0db0c3a0..a9c2a4ff5f 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -51,6 +51,7 @@ import javax.sql.DataSource; +import org.apache.commons.lang3.time.StopWatch; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.tuple.Pair; @@ -68,7 +69,59 @@ import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier; import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener; -import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; +import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse; +import org.apache.hadoop.hive.metastore.api.CheckLockRequest; +import
org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.CompactionResponse; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.CreationMetadata; +import org.apache.hadoop.hive.metastore.api.DataOperationType; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse; +import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.HiveObjectType; +import org.apache.hadoop.hive.metastore.api.InitializeTableWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.Materialization; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.ReplLastIdInfo; +import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.metastore.api.TxnOpenException; +import org.apache.hadoop.hive.metastore.api.TxnState; +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hadoop.hive.metastore.api.TxnType; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; +import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider; @@ -166,8 +219,10 @@ static final protected char MINOR_TYPE = 'i'; // Transaction states - static final protected char TXN_ABORTED = 'a'; - static final protected char TXN_OPEN = 'o'; + protected static final char TXN_ABORTED = 'a'; + protected static final char TXN_OPEN = 'o'; + protected static final char TXN_COMMITTED = 'c'; + //todo: make these like OperationType and remove above char constants 
enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN} @@ -188,6 +243,9 @@ private static DataSource connPoolMutex; private static boolean doRetryOnConnPool = false; + private static final String MANUAL_RETRY = "ManualRetry"; + private static final String TXN_LOCK_TABLE = "NEXT_TXN_ID"; + // Query definitions private static final String HIVE_LOCKS_INSERT_QRY = "INSERT INTO \"HIVE_LOCKS\" ( " + "\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", " + @@ -269,6 +327,8 @@ char getSqlConst() { // (End user) Transaction timeout, in milliseconds. private long timeout; + // Timeout for opening a transaction + private long openTxnTimeOutMillis; private String identifierQuoteString; // quotes to use for quoting tables, where necessary private long retryInterval; @@ -348,6 +408,8 @@ public void setConf(Configuration conf) { deadlockRetryInterval = retryInterval / 10; maxOpenTxns = MetastoreConf.getIntVar(conf, ConfVars.MAX_OPEN_TXNS); + openTxnTimeOutMillis = MetastoreConf.getTimeVar(conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS); + try { transactionalListeners = MetaStoreServerUtils.getMetaStoreListeners( TransactionalMetaStoreEventListener.class, @@ -358,6 +420,9 @@ public void setConf(Configuration conf) { throw new RuntimeException(e); } } + protected static DatabaseProduct getDbProduct() { + return dbProduct; + } @Override public Configuration getConf() { @@ -368,59 +433,82 @@ public Configuration getConf() { @RetrySemantics.ReadOnly public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { try { - // We need to figure out the current transaction number and the list of + // We need to figure out the HighWaterMark and the list of open transactions. To avoid needing a transaction on the underlying - // database we'll look at the current transaction number first. If it - // subsequently shows up in the open list that's ok. + // database we'll look at the highWaterMark first. Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { - /** - * This method can run at READ_COMMITTED as long as long as - * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic. - * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with - * adding corresponding entries into TXNS. The reason is that any txnid below HWM - * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed. + /* + * This method needs guarantees from + * {@link #openTxns(OpenTxnRequest)} and {@link #commitTxn(CommitTxnRequest)}. + * It will look at the TXNS table and find each transaction between the max(txn_id) as HighWaterMark + * and the max(txn_id) before the TXN_OPENTXN_TIMEOUT period as LowWaterMark. + * Every transaction that is not found between these will be considered open, since it may appear later. + * openTxns must ensure that no new transaction will be opened with txn_id below LWM and + * commitTxn must ensure that no committed transaction will be removed before the time period expires.
*/ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_txn_id"); - } - long hwm = rs.getLong(1); - if (rs.wasNull()) { - throw new MetaException("Transaction tables not properly " + - "initialized, null record found in next_txn_id"); - } - close(rs); List txnInfos = new ArrayList<>(); - //need the WHERE clause below to ensure consistent results with READ_COMMITTED - s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\" FROM " + - "\"TXNS\" WHERE \"TXN_ID\" <= " + hwm; + + String s = + "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\"" + + "FROM \"TXNS\" ORDER BY \"TXN_ID\""; LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); /* The LowWaterMark query must be the last one to be consistent with READ_COMMITTED. + * Between the two queries a cleaner might have run and removed aborted or committed transactions and so + * raised the LWM, but we will just ignore those committed transactions. + */ + long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn); + /* + * We can use the maximum txn_id from the TXNS table as high water mark, since commitTxn and the Initiator + * guarantee that the transaction with the highest txn_id will never be removed from the TXNS table. + * If there is a pending openTxns call that has already acquired its sequenceId but has not yet committed the insert + * into the TXNS table, it will either have a lower txn_id than the HWM and be listed in the openTxn list, + * or have a higher txn_id and not affect this getOpenTxns() call.
+ */ + long hwm = 0; + long openTxnLowBoundary = lowWaterMark; + boolean overLowWaterMark = false; + while (rs.next()) { + long txnId = rs.getLong(1); + hwm = txnId; + if (txnId > lowWaterMark) { + // From this point we will consider every gap as an open transaction + overLowWaterMark = true; + } + if (overLowWaterMark) { + openTxnLowBoundary++; + while (txnId > openTxnLowBoundary) { + // Add an empty open transaction for every missing value + txnInfos.add(new TxnInfo(openTxnLowBoundary, TxnState.OPEN, null, null)); + openTxnLowBoundary++; + } + + } char c = rs.getString(2).charAt(0); TxnState state; switch (c) { - case TXN_ABORTED: - state = TxnState.ABORTED; - break; + case TXN_COMMITTED: + // This is only here to avoid adding this txnId as a possible gap + continue; - case TXN_OPEN: - state = TxnState.OPEN; - break; + case TXN_ABORTED: + state = TxnState.ABORTED; + break; - default: - throw new MetaException("Unexpected transaction state " + c + - " found in txns table"); + case TXN_OPEN: + state = TxnState.OPEN; + break; + + default: + throw new MetaException("Unexpected transaction state " + c + " found in txns table"); } - TxnInfo txnInfo = new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4)); + TxnInfo txnInfo = new TxnInfo(txnId, state, rs.getString(3), rs.getString(4)); txnInfo.setStartedTime(rs.getLong(5)); txnInfo.setLastHeartbeatTime(rs.getLong(6)); txnInfos.add(txnInfo); @@ -432,8 +520,8 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "getOpenTxnsInfo"); - throw new MetaException("Unable to select from transaction database: " + getMessage(e) + StringUtils.stringifyException(e)); + throw new MetaException( + "Unable to select from transaction database: " + getMessage(e) + StringUtils.stringifyException(e)); } finally { close(rs, stmt, dbConn); } @@ -442,46 +530,59 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { } } + @Override @RetrySemantics.ReadOnly public GetOpenTxnsResponse getOpenTxns() throws MetaException { try { - // We need to figure out the current transaction number and the list of - // open transactions. To avoid needing a transaction on the underlying - // database we'll look at the current transaction number first. If it - // subsequently shows up in the open list that's ok. + // We need to figure out the HighWaterMark and the list of open transactions.
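Since this gap-as-open scan is the crux of the new snapshot logic, here is a compact, self-contained sketch of it (an editorial illustration, not part of the patch; OpenTxnScanSketch and TxnRow are invented names): ids missing from the sorted TXNS scan are reported as open once the scan passes the low water mark, and committed rows are skipped exactly like the TXN_COMMITTED branch does.

```java
import java.util.ArrayList;
import java.util.List;

/** Editorial sketch of the gap-as-open scan; TxnRow models one TXNS row. */
final class OpenTxnScanSketch {
  static final class TxnRow {
    final long id;
    final char state; // 'o' open, 'a' aborted, 'c' committed
    TxnRow(long id, char state) { this.id = id; this.state = state; }
  }

  /** rows must be sorted by id, mirroring ORDER BY "TXN_ID". */
  static List<Long> openTxnIds(List<TxnRow> rows, long lowWaterMark) {
    List<Long> open = new ArrayList<>();
    long nextExpected = lowWaterMark + 1; // first id a pending openTxns may still insert
    for (TxnRow row : rows) {
      if (row.id > lowWaterMark) {
        while (nextExpected < row.id) {
          open.add(nextExpected++); // missing sequence value above the LWM: treat as open
        }
        nextExpected = row.id + 1;
      }
      if (row.state != 'c') {
        open.add(row.id); // open and aborted rows both belong in the snapshot; committed rows are skipped
      }
    }
    return open;
  }
}
```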
Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { - /** + /* * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_txn_id"); - } - long hwm = rs.getLong(1); - if (rs.wasNull()) { - throw new MetaException("Transaction tables not properly " + - "initialized, null record found in next_txn_id"); - } - close(rs); + + List<Long> openList = new ArrayList<>(); - //need the WHERE clause below to ensure consistent results with READ_COMMITTED - s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\" FROM \"TXNS\" WHERE \"TXN_ID\" <= " + hwm + " ORDER BY \"TXN_ID\""; + String s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\" FROM \"TXNS\" ORDER BY \"TXN_ID\""; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); + /* + * The LowWaterMark query must be the last one to be consistent with READ_COMMITTED. + * Between the two queries a cleaner might have run and removed aborted or committed transactions and so + * raised the LWM, but we will just ignore those. + */ + long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn); + long hwm = 0; + long openTxnLowBoundary = lowWaterMark; + boolean overLowWaterMark = false; long minOpenTxn = Long.MAX_VALUE; BitSet abortedBits = new BitSet(); while (rs.next()) { long txnId = rs.getLong(1); + hwm = txnId; + if (txnId > lowWaterMark) { + // From this point we will consider every gap as an open transaction + overLowWaterMark = true; + } + if (overLowWaterMark) { + openTxnLowBoundary++; + while (txnId > openTxnLowBoundary) { + // Add an empty open transaction for every missing value + openList.add(openTxnLowBoundary); + minOpenTxn = Math.min(minOpenTxn, openTxnLowBoundary); + openTxnLowBoundary++; + } + + } char txnState = rs.getString(2).charAt(0); + if (txnState == TXN_COMMITTED) { + continue; + } if (txnState == TXN_OPEN) { minOpenTxn = Math.min(minOpenTxn, txnId); } @@ -497,7 +598,7 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { dbConn.rollback(); ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray()); GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer); - if(minOpenTxn < Long.MAX_VALUE) { + if (minOpenTxn < Long.MAX_VALUE) { otr.setMin_open_txn(minOpenTxn); } return otr; @@ -544,14 +645,23 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { try { Connection dbConn = null; Statement stmt = null; + TxnLockHandle txnLock = null; try { - lockInternal(); - /** + /* * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure - * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic. - * Also, advancing the counter must work when multiple metastores are running. - * SELECT ... FOR UPDATE is used to prevent - * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID. + * that, looking at the TXNS table, every open transaction can be identified below a given High Water Mark. + * One way to do it would be to serialize the openTxns call with an S4U lock, but that would cause + * performance degradation under high transaction load.
+ * To enable parallel openTxn calls, we define a time period (TXN_OPENTXN_TIMEOUT) and consider every + * transaction missing from the TXNS table in that period open, and prevent opening transactions outside + * that period. + * Example: At t[0] there is one open transaction in the TXNS table, T[1]. + * T[2] acquires the next sequence at t[1] but only commits into the TXNS table at t[10]. + * T[3] acquires its sequence at t[2], and commits into the TXNS table at t[3]. + * Then T[3] calculates its snapshot at t[4] and puts T[1] and also T[2] in the snapshot's + * open transaction list. T[1] because it is present as open in TXNS, + * T[2] because it is a missing sequence. * * In the current design, there can be several metastore instances running in a given Warehouse. * This makes ideas like reserving a range of IDs to save trips to DB impossible. For example, @@ -569,20 +679,56 @@ if (numTxns > maxTxns) numTxns = maxTxns; stmt = dbConn.createStatement(); + /* + * openTxn and commitTxn must be mutexed when committing a non-read-only transaction. + * This is achieved by requesting a shared table lock here, and an exclusive one at commit. + * Since table locks work even in Derby, we don't need the lockInternal call here. + * Example: Suppose we have two transactions with an update like x = x+1. + * We have T[3,3] that was using a value from a snapshot with T[2,2]. If we allow committing T[3,3] + * and opening T[4] in parallel, it is possible that T[4] will be using the value from a snapshot with T[2,2], + * and we would have a lost update problem. + */ + txnLock = acquireTxnLock(true); + // Measure the time from acquiring the sequence value till the insert is committed into the TXNS table + StopWatch generateTransactionWatch = new StopWatch(); + generateTransactionWatch.start(); + List<Long> txnIds = openTxns(dbConn, stmt, rqst); LOG.debug("Going to commit"); dbConn.commit(); + generateTransactionWatch.stop(); + long elapsedMillis = generateTransactionWatch.getTime(TimeUnit.MILLISECONDS); + TxnType txnType = rqst.isSetTxn_type() ? rqst.getTxn_type() : TxnType.DEFAULT; + if (txnType != TxnType.READ_ONLY && elapsedMillis >= openTxnTimeOutMillis) { + /* + * The commit was too slow, we cannot allow this to continue (unless it is read only, + * since that cannot cause dirty reads). + * When calculating the snapshot for a given transaction, we look back for possible open transactions + * (that are not yet committed in the TXNS table) for the TXN_OPENTXN_TIMEOUT period. + * We cannot allow a write transaction that was slower than that to continue, + * because other transactions may be running that didn't consider this transactionId open, + * and that could cause dirty reads.
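+ * To recap the shared/exclusive pairing described above, a JVM-level analogy (editorial
+ * illustration only; the real mutex is a database table lock, so it also works across
+ * metastore instances sharing the backend db):
+ *   ReadWriteLock rw = new ReentrantReadWriteLock();
+ *   void openTxns()  { rw.readLock().lock();  try { insertIntoTxns(); } finally { rw.readLock().unlock(); } }
+ *   void commitTxn() { rw.writeLock().lock(); try { commitUsingHwm(); } finally { rw.writeLock().unlock(); } }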
+ */ + LOG.info("OpenTxnTimeOut exceeded commit duration {}, deleting transactionIds: {}", elapsedMillis, txnIds); + deleteInvalidOpenTransactions(dbConn, txnIds); + /* + * We do not throw RetryException directly, to not circumvent the max retry limit + */ + throw new SQLException("OpenTxnTimeOut exceeded", MANUAL_RETRY); + } return new OpenTxnsResponse(txnIds); } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "openTxns(" + rqst + ")"); - throw new MetaException("Unable to select from transaction database " - + StringUtils.stringifyException(e)); + throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { close(null, stmt, dbConn); - unlockInternal(); + if (txnLock != null) { + txnLock.releaseLocks(); + } + // unlockInternal(); } } catch (RetryException e) { return openTxns(rqst); @@ -592,7 +738,6 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { private List openTxns(Connection dbConn, Statement stmt, OpenTxnRequest rqst) throws SQLException, MetaException { int numTxns = rqst.getNum_txns(); - ResultSet rs = null; List insertPreparedStmts = null; TxnType txnType = rqst.isSetTxn_type() ? rqst.getTxn_type() : TxnType.DEFAULT; try { @@ -611,40 +756,77 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { txnType = TxnType.REPL_CREATED; } - String s = sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""); - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction database not properly " + - "configured, can't find next transaction id."); - } - long first = rs.getLong(1); - s = "UPDATE \"NEXT_TXN_ID\" SET \"NTXN_NEXT\" = " + (first + numTxns); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - List txnIds = new ArrayList<>(numTxns); + /* + * The getGeneratedKeys are not supported in every dbms, after executing a multi line insert. + * But it is support every used dbms for single line insert, even if the metadata says otherwise. + * If the getGeneratedKeys are not supported first we insert a random batchId in the TXN_META_INFO field, + * then the keys are selected beck with that batchid. 
+ */ + boolean genKeySupport = TxnDbUtil.supportsGetGeneratedKeys(dbProduct); + genKeySupport = genKeySupport || (numTxns == 1); + List<Long> txnIds = new ArrayList<>(numTxns); List<String> rows = new ArrayList<>(); List<String> params = new ArrayList<>(); params.add(rqst.getUser()); params.add(rqst.getHostname()); + String batchId = "noGenKeySupport"; + if (!genKeySupport) { + params.add(batchId); + } List<List<String>> paramsList = new ArrayList<>(numTxns); - for (long i = first; i < first + numTxns; i++) { - txnIds.add(i); - rows.add(i + "," + quoteChar(TXN_OPEN) + "," + TxnDbUtil.getEpochFn(dbProduct) + "," - + TxnDbUtil.getEpochFn(dbProduct) + ",?,?," + txnType.getValue()); + for (long i = 0; i < numTxns; i++) { + if (genKeySupport) { + rows.add(quoteChar(TXN_OPEN) + "," + TxnDbUtil.getEpochFn(dbProduct) + "," + TxnDbUtil.getEpochFn(dbProduct) + + ",?,?," + txnType.getValue()); + } else { + rows.add(quoteChar(TXN_OPEN) + "," + TxnDbUtil.getEpochFn(dbProduct) + "," + TxnDbUtil.getEpochFn(dbProduct) + + ",?,?," + txnType.getValue() + ",?"); + } paramsList.add(params); } + String tblColumns; + if (genKeySupport) { + tblColumns = "\"TXNS\" (\"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " + + "\"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\")"; + } else { + tblColumns = "\"TXNS\" (\"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " + + "\"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\", \"TXN_META_INFO\")"; + } + insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "\"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " - + "\"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\")", - rows, paramsList); + tblColumns, rows, paramsList, genKeySupport ? new String[] {"TXN_ID"} : null); + for (PreparedStatement pst : insertPreparedStmts) { - pst.execute(); + pst.executeUpdate(); + if (genKeySupport) { + try (ResultSet generatedKeys = pst.getGeneratedKeys()) { + while (generatedKeys.next()) { + txnIds.add(generatedKeys.getLong(1)); + } + } + } else { + String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_META_INFO\" = " + quoteString(batchId); + try (ResultSet rs = stmt.executeQuery(s)) { + while (rs.next()) { + txnIds.add(rs.getLong(1)); + } + } + s = "UPDATE \"TXNS\" SET \"TXN_META_INFO\" = NULL WHERE \"TXN_META_INFO\" = " + quoteString(batchId); + stmt.executeUpdate(s); + } + } + + assert txnIds.size() == numTxns; + long first = Long.MAX_VALUE; + for (Long txn : txnIds) { + if (txn < first) { + first = txn; + } } + String s; // Need to register minimum open txnid for current transactions into MIN_HISTORY table. // For a single txn we can do it in a single insert.
With multiple txns calculating the // minOpenTxnId for every insert is not cost effective, so caching the value @@ -697,7 +879,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { paramsList.clear(); params.add(rqst.getReplPolicy()); for (int i = 0; i < numTxns; i++) { - rowsRepl.add( "?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i)); + rowsRepl.add("?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i)); paramsList.add(params); } @@ -720,10 +902,94 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { pst.close(); } } - close(rs); } } + public void deleteInvalidOpenTransactions(Connection dbConn, List<Long> txnIds) throws MetaException { + if (txnIds.isEmpty()) { + return; + } + try { + Statement stmt = null; + try { + stmt = dbConn.createStatement(); + + List<String> queries = new ArrayList<>(); + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + prefix.append("DELETE FROM \"TXNS\" WHERE "); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnIds, "\"TXN_ID\"", false, false); + for (String s : queries) { + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + } + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "deleteInvalidOpenTransactions(" + txnIds + ")"); + throw new MetaException("Unable to delete from transaction database " + StringUtils.stringifyException(e)); + } finally { + closeStmt(stmt); + } + } catch (RetryException ex) { + deleteInvalidOpenTransactions(dbConn, txnIds); + } + } + + @Override + public long getOpenTxnTimeOutMillis() { + return openTxnTimeOutMillis; + } + + @Override + public void setOpenTxnTimeOutMillis(long openTxnTimeOutMillis) { + this.openTxnTimeOutMillis = openTxnTimeOutMillis; + } + + protected long getOpenTxnTimeoutLowBoundaryTxnId(Connection dbConn) throws MetaException, SQLException { + long maxTxnId; + String s = "SELECT MAX(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STARTED\" < (" + TxnDbUtil.getEpochFn(dbProduct) + + " - " + openTxnTimeOutMillis + ")"; + try (Statement stmt = dbConn.createStatement()) { + LOG.debug("Going to execute query <" + s + ">"); + try (ResultSet maxTxnIdRs = stmt.executeQuery(s)) { + if (!maxTxnIdRs.next()) { + throw new IllegalStateException("Scalar query returned no rows?!?!!"); + } + maxTxnId = maxTxnIdRs.getLong(1); + if (maxTxnIdRs.wasNull()) { + /* + * TXNS always contains at least one transaction: + * the row where txnid = (select max(txnid) where txn_started < epoch - TXN_OPENTXN_TIMEOUT) is never deleted + */ + throw new MetaException("Transaction tables not properly initialized, null record found in MAX(TXN_ID)"); + } + } + } + return maxTxnId; + } + + protected long getHighWaterMark(Statement stmt) throws SQLException, MetaException { + String s = "SELECT MAX(\"TXN_ID\") FROM \"TXNS\""; + LOG.debug("Going to execute query <" + s + ">"); + long maxOpenTxnId; + try (ResultSet maxOpenTxnIdRs = stmt.executeQuery(s)) { + if (!maxOpenTxnIdRs.next()) { + throw new IllegalStateException("Scalar query returned no rows?!?!!"); + } + maxOpenTxnId = maxOpenTxnIdRs.getLong(1); + if (maxOpenTxnIdRs.wasNull()) { + /* + * TXNS always contains at least one transaction (the seed row and the transaction with the + * highest txn_id are never deleted), so MAX(TXN_ID) must never be null + */ + throw new MetaException("Transaction tables not properly initialized, null record found in MAX(TXN_ID)"); + } + } + return
maxOpenTxnId; + } + private List<Long> getTargetTxnIdList(String replPolicy, List<Long> sourceTxnIdList, Connection dbConn) throws SQLException { PreparedStatement pst = null; @@ -749,7 +1015,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { } LOG.debug("targetTxnid for srcTxnId " + sourceTxnIdList.toString() + " is " + targetTxnIdList.toString()); return targetTxnIdList; - } catch (SQLException e) { + } catch (SQLException e) { LOG.warn("failed to get target txn ids " + e.getMessage()); throw e; } finally { @@ -1123,7 +1389,8 @@ public void commitTxn(CommitTxnRequest rqst) try { Connection dbConn = null; Statement stmt = null; - ResultSet commitIdRs = null, rs; + ResultSet rs = null; + TxnLockHandle txnLock = null; try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); @@ -1193,11 +1460,8 @@ public void commitTxn(CommitTxnRequest rqst) * at the same time and no new txns start until all 3 commit. * We could've incremented the sequence for commitId as well but it doesn't add anything functionally. */ - commitIdRs = stmt.executeQuery(sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\"")); - if (!commitIdRs.next()) { - throw new IllegalStateException("No rows found in NEXT_TXN_ID"); - } - long commitId = commitIdRs.getLong(1); + txnLock = acquireTxnLock(false); + long commitId = getHighWaterMark(stmt); Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); /** * "select distinct" is used below because @@ -1269,9 +1533,8 @@ public void commitTxn(CommitTxnRequest rqst) } else { //no conflicting operations, proceed with the rest of commit sequence } - } - else { - /** + } else { + /* * current txn didn't update/delete anything (may have inserted), so just proceed with commit * * We only care about commit id for write txns, so for RO (when supported) txns we don't @@ -1281,6 +1544,7 @@ public void commitTxn(CommitTxnRequest rqst) * If RO < W, then there is no reads-from relationship. * In replication flow we don't expect any write-write conflict as it should have been handled at source.
*/ } String s; @@ -1378,7 +1642,10 @@ public void commitTxn(CommitTxnRequest rqst) throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { - close(commitIdRs, stmt, dbConn); + close(null, stmt, dbConn); + if (txnLock != null) { + txnLock.releaseLocks(); + } unlockInternal(); } } catch (RetryException e) { @@ -1390,7 +1657,9 @@ private void cleanUpTxnRelatedMetadata(long txnid, Statement stmt) throws SQLExc List<String> queries = Arrays.asList( "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid, "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid, - "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid, + // DO NOT remove the transaction from the TXNS table, the cleaner will remove it when appropriate + "UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_COMMITTED) + " WHERE \"TXN_ID\" = " + txnid, "DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid, "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid); executeQueriesInBatch(stmt, queries, conf); @@ -1501,7 +1770,7 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx } closeStmt(pStmt); close(rs, stmt, dbConn); - if(handle != null) { + if (handle != null) { handle.releaseLocks(); } unlockInternal(); @@ -1543,7 +1812,9 @@ private ValidTxnList getValidTxnList(Connection dbConn, String fullTableName, Lo String[] names = TxnUtils.getDbTableName(fullTableName); assert names.length == 2; List<String> params = Arrays.asList(names[0], names[1]); - String s = "SELECT \"T2W_TXNID\" FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_WRITEID\" = " + writeId; + String s = + "SELECT \"T2W_TXNID\" FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_DATABASE\" = ? AND " + + "\"T2W_TABLE\" = ? AND \"T2W_WRITEID\" = " + writeId; pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", quoteString(names[0]), quoteString(names[1])); @@ -2011,46 +2282,36 @@ public void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) @Override @RetrySemantics.SafeToRetry - public void performWriteSetGC() { + public void performWriteSetGC() throws MetaException { Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - rs = stmt.executeQuery("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""); - if(!rs.next()) { - throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted"); - } - long highestAllocatedTxnId = rs.getLong(1); - close(rs); + + long minOpenTxn; rs = stmt.executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + quoteChar(TXN_OPEN)); - if(!rs.next()) { + if (!rs.next()) { throw new IllegalStateException("Scalar query returned no rows?!?!!"); } - long commitHighWaterMark;//all currently open txns (if any) have txnid >= than commitHighWaterMark - long lowestOpenTxnId = rs.getLong(1); - if(rs.wasNull()) { - //if here then there are no Open txns and highestAllocatedTxnId must be - //resolved (i.e. committed or aborted), either way - //there are no open txns with id <= highestAllocatedTxnId - //the +1 is there because "delete ..."
below has < (which is correct for the case when - //there is an open txn - //Concurrency: even if new txn starts (or starts + commits) it is still true that - //there are no currently open txns that overlap with any committed txn with - //commitId <= commitHighWaterMark (as set on next line). So plain READ_COMMITTED is enough. - commitHighWaterMark = highestAllocatedTxnId + 1; - } - else { - commitHighWaterMark = lowestOpenTxnId; + minOpenTxn = rs.getLong(1); + if (rs.wasNull()) { + minOpenTxn = Long.MAX_VALUE; } + long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn); + /** + * We try to find the highest transactionId below everything was committed or aborted. + * For that we look for the lowest open transaction in the TXNS and the TxnMinTimeout boundary, + * because it is guaranteed there won't be open transactions below that. + */ + long commitHighWaterMark = Long.min(minOpenTxn, lowWaterMark + 1); int delCnt = stmt.executeUpdate("DELETE FROM \"WRITE_SET\" WHERE \"WS_COMMIT_ID\" < " + commitHighWaterMark); LOG.info("Deleted {} obsolete rows from WRITE_SET", delCnt); dbConn.commit(); } catch (SQLException ex) { LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex); - } - finally { + } finally { close(rs, stmt, dbConn); } } @@ -4219,7 +4480,7 @@ public int compare(LockType t1, LockType t2) { private void checkQFileTestHack() { boolean hackOn = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || - MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST); + MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST); if (hackOn) { LOG.info("Hacking in canned values for transaction manager"); // Set up the transaction/locking db in the derby metastore @@ -4227,11 +4488,7 @@ private void checkQFileTestHack() { try { TxnDbUtil.prepDb(conf); } catch (Exception e) { - // We may have already created the tables and thus don't need to redo it. - if (e.getMessage() != null && !e.getMessage().contains("already exists")) { - throw new RuntimeException("Unable to set up transaction database for" + - " testing: " + e.getMessage(), e); - } + throw new RuntimeException("Unable to set up transaction database for" + " testing: " + e.getMessage(), e); } } } @@ -4585,7 +4842,7 @@ private void heartbeatTxn(Connection dbConn, long txnid) } /** - * Returns the state of the transaction iff it's able to determine it. Some cases where it cannot: + * Returns the state of the transaction if it's able to determine it. Some cases where it cannot: * 1. txnid was Aborted/Committed and then GC'd (compacted) * 2. 
txnid was committed but it didn't modify anything (nothing in COMPLETED_TXN_COMPONENTS) */ @@ -4610,6 +4867,9 @@ private TxnStatus findTxnState(long txnid, Statement stmt) throws SQLException, if (txnState == TXN_ABORTED) { return TxnStatus.ABORTED; } + if (txnState == TXN_COMMITTED) { + return TxnStatus.COMMITTED; + } assert txnState == TXN_OPEN : "we found it in TXNS but it's not ABORTED or COMMITTED, so must be OPEN"; } return TxnStatus.OPEN; @@ -5011,11 +5271,15 @@ private static synchronized DataSource setupJdbcConnectionPool(Configuration con static boolean isRetryable(Configuration conf, Exception ex) { if(ex instanceof SQLException) { SQLException sqlException = (SQLException)ex; - if("08S01".equalsIgnoreCase(sqlException.getSQLState())) { + if (MANUAL_RETRY.equalsIgnoreCase(sqlException.getSQLState())) { + // Manual retry exception was thrown + return true; + } + if ("08S01".equalsIgnoreCase(sqlException.getSQLState())) { //in MSSQL this means Communication Link Failure return true; } - if("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) || + if ("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) || sqlException.getMessage().contains("consistent read failure; rollback data not available")) { return true; } @@ -5124,7 +5388,7 @@ public MutexAPI getMutexAPI() { } @Override public LockHandle acquireLock(String key) throws MetaException { /** * The implementation here is a bit kludgey but done so that code exercised by unit tests * (which run against Derby which has no support for select for update) is as similar to @@ -5194,10 +5458,47 @@ public LockHandle acquireLock(String key) throws MetaException { return acquireLock(key); } } + + @Override public void acquireLock(String key, LockHandle handle) { //the idea is that this will use LockHandle.dbConn throw new NotImplementedException("acquireLock(String, LockHandle) is not implemented"); } + + /** + * Acquire the global txn lock, used to mutex the openTxn and commitTxn.
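+ * The lock is taken on TXN_LOCK_TABLE (the NEXT_TXN_ID table) in the backend database, so unlike
+ * the JVM-level lockInternal it mutexes openTxn and commitTxn across every metastore instance
+ * sharing that database.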
+ * @param shared true to request a shared lock, false for an exclusive one + * @return a lock handle that must be used to release the lock + * @throws MetaException + */ + public TxnLockHandle acquireTxnLock(boolean shared) throws MetaException { + Connection dbConn = null; + Statement stmt = null; + try { + try { + String sqlStmt = sqlGenerator.createLockTableStatement(TXN_LOCK_TABLE, shared); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex); + stmt = dbConn.createStatement(); + if (LOG.isDebugEnabled()) { + LOG.debug("About to execute SQL: " + sqlStmt); + } + stmt.execute(sqlStmt); + LOG.debug("TXN lock acquired by {} (shared = {})", quoteString(TxnHandler.hostname), shared); + return new TxnLockHandle(dbConn, stmt); + } catch (SQLException ex) { + rollbackDBConn(dbConn); + closeStmt(stmt); + closeDbConn(dbConn); + checkRetryable(dbConn, ex, "acquireTxnLock(" + shared + ")"); + throw new MetaException( + "Unable to lock TxnLock due to: " + getMessage(ex) + "; " + StringUtils.stringifyException(ex)); + } + } catch (RetryException ex) { + return acquireTxnLock(shared); + } + + } + private static final class LockHandleImpl implements LockHandle { private final Connection dbConn; private final Statement stmt; @@ -5234,6 +5535,33 @@ public void releaseLocks() { } } + private static final class TxnLockHandle { + private final Connection dbConn; + private final Statement stmt; + + TxnLockHandle(Connection conn, Statement stmt) { + this.dbConn = conn; + this.stmt = stmt; + } + + public void releaseLocks() throws MetaException { + try { + String s = sqlGenerator.createUnLockTableStatement(); + if (s != null) { + // Most dbms do not require another statement to release the lock, the rollback will do that + stmt.execute(s); + } + } catch (SQLException ex) { + LOG.error("Unable to release table locks", ex); + throw new MetaException("Unable to release table locks " + StringUtils.stringifyException(ex)); + } + rollbackDBConn(dbConn); + closeStmt(stmt); + closeDbConn(dbConn); + LOG.debug("TXN lock unlocked by " + quoteString(TxnHandler.hostname)); + } + } + private static class NoPoolConnectionPool implements DataSource { // Note that this depends on the fact that no-one in this class calls anything but // getConnection. If you want to use any of the Logger or wrap calls you'll have to diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 41d2e7924b..cb660443fa 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -396,12 +396,13 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old void cleanTxnToWriteIdTable() throws MetaException; /** - * Clean up aborted transactions from txns that have no components in txn_components. The reson such - * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and - * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called. + * Clean up aborted or committed transactions from txns that have no components in txn_components. The reason such + * txns exist can be that no work was done in this txn (e.g.
Streaming opened TransactionBatch and + * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called, + * or because the delete from TXNS was delayed by the TXN_OPENTXN_TIMEOUT window. */ @RetrySemantics.SafeToRetry - void cleanEmptyAbortedTxns() throws MetaException; + void cleanEmptyAbortedAndCommittedTxns() throws MetaException; /** * This will take all entries assigned to workers @@ -452,7 +453,7 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old * transaction metadata once it becomes unnecessary. */ @RetrySemantics.SafeToRetry - void performWriteSetGC(); + void performWriteSetGC() throws MetaException; /** * Determine if there are enough consecutive failures compacting a table or partition that no @@ -471,6 +472,12 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old @VisibleForTesting long setTimeout(long milliseconds); + @VisibleForTesting + long getOpenTxnTimeOutMillis(); + + @VisibleForTesting + void setOpenTxnTimeOutMillis(long openTxnTimeOutMillis); + @RetrySemantics.Idempotent MutexAPI getMutexAPI(); @@ -483,7 +490,7 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old */ interface MutexAPI { /** - * The {@code key} is name of the lock. Will acquire and exclusive lock or block. It retuns + * The {@code key} is the name of the lock. Will acquire an exclusive lock or block. It returns * a handle which must be used to release the lock. Each invocation returns a new handle. */ LockHandle acquireLock(String key) throws MetaException; diff --git standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql index 05adbe9003..dcc6186a71 100644 --- standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql +++ standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql @@ -234,7 +234,8 @@ CREATE TABLE "APP"."CTLGS" ( "CREATE_TIME" INTEGER); -- Insert a default value. The location is TBD.
Hive will fix this when it starts -INSERT INTO "APP"."CTLGS" VALUES (1, 'hive', 'Default catalog for Hive', 'TBD', NULL); +INSERT INTO "APP"."CTLGS" ("CTLG_ID", "NAME", "DESC", "LOCATION_URI", "CREATE_TIME") +VALUES (1, 'hive', 'Default catalog for Hive', 'TBD', NULL); -- ---------------------------------------------- -- DML Statements @@ -522,7 +523,7 @@ ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SQL110318025505550" CHECK (IS_COMPRESSED -- Transaction and Lock Tables -- ---------------------------- CREATE TABLE TXNS ( - TXN_ID bigint PRIMARY KEY, + TXN_ID bigint PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, TXN_STATE char(1) NOT NULL, TXN_STARTED bigint NOT NULL, TXN_LAST_HEARTBEAT bigint NOT NULL, @@ -534,6 +535,9 @@ CREATE TABLE TXNS ( TXN_TYPE integer ); +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + VALUES(0, 'c', 0, 0, '', ''); + CREATE TABLE TXN_COMPONENTS ( TC_TXNID bigint NOT NULL REFERENCES TXNS (TXN_ID), TC_DATABASE varchar(128) NOT NULL, diff --git standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql index 35a2e641b2..877b97f315 100644 --- standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql +++ standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql @@ -64,5 +64,17 @@ ALTER TABLE "SCHEDULED_QUERIES" ADD "ACTIVE_EXECUTION_ID" bigint; -- HIVE-22995 ALTER TABLE "APP"."DBS" ADD COLUMN "DB_MANAGED_LOCATION_URI" VARCHAR(4000); +-- HIVE-23048 +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '', '' FROM COMPLETED_TXN_COMPONENTS; +ALTER TABLE TXNS ADD COLUMN TXN_ID_TMP bigint; +UPDATE TXNS SET TXN_ID_TMP=TXN_ID; +ALTER TABLE TXNS DROP COLUMN TXN_ID; +ALTER TABLE TXNS ADD COLUMN TXN_ID BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY (START WITH 1, INCREMENT BY 1); +UPDATE TXNS SET TXN_ID=TXN_ID_TMP; +ALTER TABLE TXNS DROP COLUMN TXN_ID_TMP; +-- TODO: restart the identity at MAX(TXN_ID) + 1, as the other upgrade scripts do, instead of this fixed value +ALTER TABLE TXNS ALTER TXN_ID RESTART WITH 1000000000; + -- This needs to be the last thing done. Insert any changes above this line.
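(Editorial aside, not part of any upgrade script: after one of these upgrades, two invariants are cheap to verify over JDBC. TXNS must never be empty, since getOpenTxnTimeoutLowBoundaryTxnId relies on there always being a row, and a plain insert must draw a generated TXN_ID above the previous maximum. A minimal sketch; the Derby JDBC URL is a placeholder:)

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/** Editorial post-upgrade sanity check; the JDBC URL is a placeholder. */
public final class TxnsUpgradeCheck {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection("jdbc:derby:metastore_db");
         Statement stmt = conn.createStatement()) {
      long before;
      try (ResultSet rs = stmt.executeQuery("SELECT MAX(TXN_ID) FROM TXNS")) {
        rs.next();
        before = rs.getLong(1);
        if (rs.wasNull()) {
          throw new IllegalStateException("TXNS must never be empty after the upgrade");
        }
      }
      // TXN_ID is omitted on purpose: the identity/sequence must generate it.
      stmt.executeUpdate("INSERT INTO TXNS (TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)"
          + " VALUES ('o', 0, 0, 'check', 'localhost')");
      try (ResultSet rs = stmt.executeQuery("SELECT MAX(TXN_ID) FROM TXNS")) {
        rs.next();
        long after = rs.getLong(1);
        if (after <= before) {
          throw new IllegalStateException("Generated TXN_ID did not advance past " + before);
        }
        System.out.println("OK: identity advanced from " + before + " to " + after);
      }
    }
  }
}
```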
UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; diff --git standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql index f3c74bf74f..ba99d76a60 100644 --- standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql +++ standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql @@ -1102,21 +1102,24 @@ CREATE TABLE NEXT_TXN_ID( INSERT INTO NEXT_TXN_ID VALUES(1); CREATE TABLE TXNS( - TXN_ID bigint NOT NULL, - TXN_STATE char(1) NOT NULL, - TXN_STARTED bigint NOT NULL, - TXN_LAST_HEARTBEAT bigint NOT NULL, - TXN_USER nvarchar(128) NOT NULL, - TXN_HOST nvarchar(128) NOT NULL, + TXN_ID bigint NOT NULL IDENTITY(1,1), + TXN_STATE char(1) NOT NULL, + TXN_STARTED bigint NOT NULL, + TXN_LAST_HEARTBEAT bigint NOT NULL, + TXN_USER nvarchar(128) NOT NULL, + TXN_HOST nvarchar(128) NOT NULL, TXN_AGENT_INFO nvarchar(128) NULL, TXN_META_INFO nvarchar(128) NULL, TXN_HEARTBEAT_COUNT int NULL, TXN_TYPE int NULL, PRIMARY KEY CLUSTERED ( - TXN_ID ASC + TXN_ID ASC ) ); +SET IDENTITY_INSERT TXNS ON; +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + VALUES(0, 'c', 0, 0, '', ''); CREATE TABLE TXN_COMPONENTS( TC_TXNID bigint NOT NULL, diff --git standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql index 228bb7ca80..6a61aa987d 100644 --- standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql +++ standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql @@ -67,6 +67,52 @@ INSERT INTO NOTIFICATION_SEQUENCE (NNI_ID, NEXT_EVENT_ID) SELECT 1,1 WHERE NOT E -- HIVE-22995 ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI nvarchar(4000); +-- HIVE-23048 +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '', '' FROM COMPLETED_TXN_COMPONENTS; + +CREATE TABLE TMP_TXNS( + TXN_ID bigint NOT NULL IDENTITY(1,1), + TXN_STATE char(1) NOT NULL, + TXN_STARTED bigint NOT NULL, + TXN_LAST_HEARTBEAT bigint NOT NULL, + TXN_USER nvarchar(128) NOT NULL, + TXN_HOST nvarchar(128) NOT NULL, + TXN_AGENT_INFO nvarchar(128) NULL, + TXN_META_INFO nvarchar(128) NULL, + TXN_HEARTBEAT_COUNT int NULL, + TXN_TYPE int NULL, +PRIMARY KEY CLUSTERED +( + TXN_ID ASC +) +); + +SET IDENTITY_INSERT TMP_TXNS ON; +INSERT INTO TMP_TXNS (TXN_ID,TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST, TXN_AGENT_INFO, TXN_META_INFO, TXN_HEARTBEAT_COUNT, TXN_TYPE) +SELECT TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST, TXN_AGENT_INFO, TXN_META_INFO, TXN_HEARTBEAT_COUNT, TXN_TYPE FROM TXNS TABLOCKX; + +SET IDENTITY_INSERT TMP_TXNS OFF; + +CREATE TABLE TMP_TXN_COMPONENTS( + TC_TXNID bigint NOT NULL, + TC_DATABASE nvarchar(128) NOT NULL, + TC_TABLE nvarchar(128) NULL, + TC_PARTITION nvarchar(767) NULL, + TC_OPERATION_TYPE char(1) NOT NULL, + TC_WRITEID bigint +); +INSERT INTO TMP_TXN_COMPONENTS SELECT * FROM TXN_COMPONENTS; + +DROP TABLE TXN_COMPONENTS; +DROP TABLE TXNS; + +Exec sp_rename 'TMP_TXNS', 'TXNS'; +Exec sp_rename 'TMP_TXN_COMPONENTS', 'TXN_COMPONENTS'; + +ALTER TABLE TXN_COMPONENTS WITH CHECK ADD FOREIGN KEY(TC_TXNID) REFERENCES TXNS (TXN_ID); +CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS 
(TC_TXNID); + -- These lines need to be last. Insert any changes above. UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE; diff --git standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql index 626d88899e..7b61591d9c 100644 --- standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql +++ standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql @@ -989,7 +989,7 @@ CREATE TABLE IF NOT EXISTS WM_MAPPING -- Transaction and Lock Tables -- ---------------------------- CREATE TABLE TXNS ( - TXN_ID bigint PRIMARY KEY, + TXN_ID bigint PRIMARY KEY AUTO_INCREMENT, TXN_STATE char(1) NOT NULL, TXN_STARTED bigint NOT NULL, TXN_LAST_HEARTBEAT bigint NOT NULL, @@ -1001,6 +1001,9 @@ CREATE TABLE TXNS ( TXN_TYPE int ) ENGINE=InnoDB DEFAULT CHARSET=latin1; +-- NO_AUTO_VALUE_ON_ZERO keeps the explicit 0 below from being replaced by the next auto-increment value +SET @@session.sql_mode=CONCAT(@@session.sql_mode, ',NO_AUTO_VALUE_ON_ZERO'); +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + VALUES(0, 'c', 0, 0, '', ''); + CREATE TABLE TXN_COMPONENTS ( TC_TXNID bigint NOT NULL, TC_DATABASE varchar(128) NOT NULL, diff --git standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql index 35da7b57b3..5171826c25 100644 --- standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql +++ standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql @@ -68,6 +68,22 @@ ALTER TABLE SCHEDULED_QUERIES ADD COLUMN ACTIVE_EXECUTION_ID INTEGER ; -- HIVE-22995 ALTER TABLE DBS ADD COLUMN DB_MANAGED_LOCATION_URI VARCHAR(4000) CHARACTER SET latin1 COLLATE latin1_bin; +-- HIVE-23048 +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + SELECT IFNULL(MAX(CTC_TXNID),0), 'c', 0, 0, '', '' FROM COMPLETED_TXN_COMPONENTS; +ALTER TABLE TXNS ADD COLUMN TXN_ID_TMP BIGINT; +UPDATE TXNS SET TXN_ID_TMP=TXN_ID; +SET FOREIGN_KEY_CHECKS = 0; +ALTER TABLE TXNS MODIFY TXN_ID BIGINT AUTO_INCREMENT; +SET FOREIGN_KEY_CHECKS = 1; +UPDATE TXNS SET TXN_ID=TXN_ID_TMP; +ALTER TABLE TXNS DROP COLUMN TXN_ID_TMP; +SELECT MAX(TXN_ID) + 1 INTO @AutoInc FROM TXNS; +SET @s:=CONCAT('ALTER TABLE TXNS AUTO_INCREMENT=', @AutoInc); +PREPARE stmt FROM @s; +EXECUTE stmt; +DEALLOCATE PREPARE stmt; + -- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE; diff --git standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql index a25f4e4b71..cdc64fa054 100644 --- standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql +++ standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql @@ -972,7 +972,7 @@ ALTER TABLE MV_TABLES_USED ADD CONSTRAINT MV_TABLES_USED_FK2 FOREIGN KEY (TBL_ID -- Transaction and lock tables ------------------------------ CREATE TABLE TXNS ( - TXN_ID NUMBER(19) PRIMARY KEY, + TXN_ID NUMBER(19) GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, TXN_STATE char(1) NOT NULL, TXN_STARTED NUMBER(19) NOT NULL, TXN_LAST_HEARTBEAT NUMBER(19) NOT NULL, @@ -984,6 +984,9 @@ CREATE TABLE TXNS ( TXN_TYPE number(10) ) ROWDEPENDENCIES; +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + VALUES(0, 'c', 0, 0, '_', '_'); + CREATE TABLE TXN_COMPONENTS ( TC_TXNID NUMBER(19) NOT NULL REFERENCES TXNS (TXN_ID), TC_DATABASE VARCHAR2(128) NOT NULL, diff --git standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql index d462b4a568..283abe15fd 100644 --- standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql +++ standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql @@ -68,6 +68,17 @@ ALTER TABLE SCHEDULED_QUERIES ADD ACTIVE_EXECUTION_ID number(19); -- HIVE-22995 ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI VARCHAR2(4000) NULL; +-- HIVE-23048 +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '_', '_' FROM COMPLETED_TXN_COMPONENTS; +DECLARE max_txn NUMBER; +BEGIN + SELECT MAX(TXN_ID) + 1 INTO max_txn FROM TXNS; + EXECUTE IMMEDIATE 'CREATE SEQUENCE TXNS_TXN_ID_SEQ INCREMENT BY 1 START WITH ' || max_txn || ' CACHE 20'; +END; +/ + +ALTER TABLE TXNS MODIFY TXN_ID DEFAULT TXNS_TXN_ID_SEQ.nextval; + -- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status from dual; diff --git standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql index 206634085c..d6a2ce4d49 100644 --- standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql +++ standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql @@ -1658,7 +1658,7 @@ GRANT ALL ON SCHEMA public TO PUBLIC; -- Transaction and lock tables ------------------------------ CREATE TABLE "TXNS" ( - "TXN_ID" bigint PRIMARY KEY, + "TXN_ID" bigserial PRIMARY KEY, "TXN_STATE" char(1) NOT NULL, "TXN_STARTED" bigint NOT NULL, "TXN_LAST_HEARTBEAT" bigint NOT NULL, @@ -1669,6 +1669,8 @@ CREATE TABLE "TXNS" ( "TXN_HEARTBEAT_COUNT" integer, "TXN_TYPE" integer ); +INSERT INTO "TXNS" ("TXN_ID", "TXN_STATE", "TXN_STARTED", "TXN_LAST_HEARTBEAT", "TXN_USER", "TXN_HOST") + VALUES(0, 'c', 0, 0, '', ''); CREATE TABLE "TXN_COMPONENTS" ( "TC_TXNID" bigint NOT NULL REFERENCES "TXNS" ("TXN_ID"), diff --git standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql index a50a071f34..2f750efa45 100644 --- standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql +++ standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql @@ -199,6 +199,13 @@ ALTER TABLE "SCHEDULED_QUERIES" ADD "ACTIVE_EXECUTION_ID" bigint; -- HIVE-22995 ALTER TABLE "DBS" ADD "DB_MANAGED_LOCATION_URI" character varying(4000); +-- HIVE-23048 (identifiers must be quoted here, since the schema creates the case-sensitive "TXNS") +INSERT INTO "TXNS" ("TXN_ID", "TXN_STATE", "TXN_STARTED", "TXN_LAST_HEARTBEAT", "TXN_USER", "TXN_HOST") + SELECT COALESCE(MAX("CTC_TXNID"),0), 'c', 0, 0, '', '' FROM "COMPLETED_TXN_COMPONENTS"; +CREATE SEQUENCE "TXNS_TXN_ID_SEQ" MINVALUE 0 OWNED BY "TXNS"."TXN_ID"; +SELECT setval('"TXNS_TXN_ID_SEQ"', (SELECT MAX("TXN_ID") FROM "TXNS")); +ALTER TABLE "TXNS" ALTER "TXN_ID" SET DEFAULT nextval('"TXNS_TXN_ID_SEQ"'); + -- These lines need to be last. Insert any changes above.
UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0'; diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java index a6d22d19ef..cd08253c99 100644 --- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java @@ -275,7 +275,7 @@ public int upgradeToLatest() { "-dbType", getDbType(), "-userName", - HIVE_USER, + getHiveUser(), "-passWord", getHivePassword(), "-url", diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java index 0b070e19ac..21c5de1fb9 100644 --- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java @@ -24,7 +24,7 @@ @Override public String getDockerImageName() { - return "orangehrm/oracle-xe-11g"; + return "pvargacl/oracle-xe-18.4.0"; } @Override @@ -32,10 +32,6 @@ public String getDockerImageName() { return buildArray( "-p", "1521:1521", - "-e", - "DEFAULT_SYS_PASS=" + getDbRootPassword(), - "-e", - "ORACLE_ALLOW_REMOTE=true", "-d" ); } @@ -72,11 +68,16 @@ public String getInitialJdbcUrl() { @Override public boolean isContainerReady(String logOutput) { - return logOutput.contains("Oracle started successfully!"); + return logOutput.contains("DATABASE IS READY TO USE!"); } @Override public String getHivePassword() { return HIVE_PASSWORD; } + + @Override + public String getHiveUser() { + return "c##"+ super.getHiveUser(); + } } diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java new file mode 100644 index 0000000000..58c5095a31 --- /dev/null +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider; +import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; + +/** + * Test openTxn and getOpenTxnList calls on TxnStore. + */ +public class TestOpenTxn { + + private static final Logger LOG = LoggerFactory.getLogger(TestOpenTxn.class); + + private Configuration conf = MetastoreConf.newMetastoreConf(); + private TxnStore txnHandler; + + @Before + public void setUp() throws Exception { + // This will init the metastore db + txnHandler = TxnUtils.getTxnStore(conf); + } + + @After + public void tearDown() throws Exception { + TxnDbUtil.cleanDb(conf); + } + + @Test + public void testSingleOpen() throws MetaException { + OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost"); + long txnId = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + Assert.assertEquals(1, txnId); + } + + @Test + public void testGap() throws Exception { + OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost"); + txnHandler.openTxns(openTxnRequest); + long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + deleteTransaction(second); + txnHandler.openTxns(openTxnRequest); + GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); + Assert.assertEquals(3, openTxns.getOpen_txnsSize()); + } + + @Test + public void testGapWithOldOpen() throws Exception { + OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost"); + txnHandler.openTxns(openTxnRequest); + // sleep past the TXN_OPENTXN_TIMEOUT window, so the first txn is already below the low water mark + Thread.sleep(1100); + long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + deleteTransaction(second); + txnHandler.openTxns(openTxnRequest); + GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); + Assert.assertEquals(3, openTxns.getOpen_txnsSize()); + } + + @Test + public void testGapWithOldCommit() throws Exception { + OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost"); + long first = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + // sleep past the TXN_OPENTXN_TIMEOUT window, so the committed txn falls below the low water mark + Thread.sleep(1100); + txnHandler.commitTxn(new CommitTxnRequest(first)); + long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + deleteTransaction(second); + txnHandler.openTxns(openTxnRequest); + GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); + Assert.assertEquals(2, openTxns.getOpen_txnsSize()); + } + + private void deleteTransaction(long txnId) throws SQLException { + DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf); + DataSource ds = dsp.create(conf); + try (Connection dbConn = ds.getConnection(); Statement stmt = dbConn.createStatement()) { + stmt.executeUpdate("DELETE FROM TXNS WHERE TXN_ID=" + txnId); + dbConn.commit(); + } + } + + // TODO before the minimum_history_level rebase, we cannot run parallel tests + /* + @Test + public void
testParallelOpen() throws MetaException { + final OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost"); + + List<Thread> threadsList = new ArrayList<>(); + IntStream.range(1, 10).forEach(i -> { + threadsList.add(new Thread(() -> { + try { + long txnId = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + LOG.info("Next txnId {} {}", txnId, System.currentTimeMillis()); + txnHandler.getOpenTxns(); + Thread.sleep(1000); + CommitTxnRequest ctr = new CommitTxnRequest(txnId); + txnHandler.commitTxn(ctr); + } catch (Exception ex) { + LOG.error("Error", ex); + } + })); + }); + threadsList.forEach(Thread::start); + GetOpenTxnsInfoResponse infos = txnHandler.getOpenTxnsInfo(); + Assert.assertEquals(0, infos.getOpen_txnsSize()); + } + + */ +}
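(Editorial aside: once the minimum_history_level rebase lands and the commented-out test above can be enabled, a join-before-assert shape avoids its race between starting the threads and asserting. A sketch assuming the same TestOpenTxn fixture plus java.util.List/ArrayList imports:)

```java
  @Test
  public void testParallelOpenSketch() throws Exception {
    final OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost");
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      threads.add(new Thread(() -> {
        try {
          long txnId = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
          txnHandler.commitTxn(new CommitTxnRequest(txnId));
        } catch (Exception ex) {
          throw new RuntimeException(ex);
        }
      }));
    }
    threads.forEach(Thread::start);
    for (Thread t : threads) {
      t.join(); // wait for every open/commit to finish before asserting, unlike the draft above
    }
    Assert.assertEquals(0, txnHandler.getOpenTxnsInfo().getOpen_txnsSize());
  }
```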