diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 37a5862791..23512e252e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -151,7 +151,7 @@ public void run() { recoverFailedCompactions(true); // Clean anything from the txns table that has no components left in txn_components. - txnHandler.cleanEmptyAbortedTxns(); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); // Clean TXN_TO_WRITE_ID table for entries under min_uncommitted_txn referred by any open txns. txnHandler.cleanTxnToWriteIdTable(); diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index 15fcfc0e35..11ceffefa1 100644 --- ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -512,7 +512,7 @@ public void testMarkCleanedCleansTxnsAndTxnComponents() // Check that we are cleaning up the empty aborted transactions GetOpenTxnsResponse txnList = txnHandler.getOpenTxns(); assertEquals(3, txnList.getOpen_txnsSize()); - txnHandler.cleanEmptyAbortedTxns(); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnList = txnHandler.getOpenTxns(); assertEquals(2, txnList.getOpen_txnsSize()); @@ -529,7 +529,7 @@ public void testMarkCleanedCleansTxnsAndTxnComponents() txnHandler.markCleaned(ci); txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")); - txnHandler.cleanEmptyAbortedTxns(); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnList = txnHandler.getOpenTxns(); assertEquals(3, txnList.getOpen_txnsSize()); } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index f3834cca6b..bd51d49f38 100644 --- 
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -2102,7 +2102,7 @@ public void testCleanerForTxnToWriteId() throws Exception { // The entry relevant to aborted txns shouldn't be removed from TXN_TO_WRITE_ID as // aborted txn would be removed from TXNS only after the compaction. Also, committed txn > open txn is retained. // As open txn doesn't allocate writeid, the 2 entries for aborted and committed should be retained. - txnHandler.cleanEmptyAbortedTxns(); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnHandler.cleanTxnToWriteIdTable(); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause), 3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause)); @@ -2116,7 +2116,7 @@ public void testCleanerForTxnToWriteId() throws Exception { txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR)); runWorker(hiveConf); runCleaner(hiveConf); - txnHandler.cleanEmptyAbortedTxns(); + txnHandler.cleanEmptyAbortedAndCommittedTxns(); txnHandler.cleanTxnToWriteIdTable(); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"), 2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID")); diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 80fb1aff78..e681a3d503 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.TestTxnCommands2; +import org.apache.hadoop.hive.ql.txn.compactor.Initiator; import org.junit.After; import org.junit.Assert; import 
org.apache.hadoop.hive.common.FileUtils; @@ -58,6 +59,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** * See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager} @@ -1100,11 +1102,22 @@ public void testWriteSetTracking4() throws Exception { adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn(); - locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 0, locks.size()); + /** + * The last transaction will always remain in the transaction table, so we will open an other one, + * wait for the timeout period to exceed, then start the initiator that will clean + */ + txnMgr.openTxn(ctx, "Long Running"); + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); + Initiator initiator = new Initiator(); + initiator.setConf(conf); + initiator.init(new AtomicBoolean(true), new AtomicBoolean()); + initiator.run(); + // Now we can clean the write_set houseKeeper.run(); Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET")); + txnMgr.rollbackTxn(); } /** * overlapping txns updating the same resource but 1st one rolls back; 2nd commits @@ -1177,10 +1190,22 @@ public void testWriteSetTracking6() throws Exception { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks); txnMgr.commitTxn(); + /** + * The last transaction will always remain in the transaction table, so we will open an other one, + * wait for the timeout period to exceed, then start the initiator that will clean + */ + txnMgr.openTxn(ctx, "Long Running"); + Thread.sleep(txnHandler.getOpenTxnTimeOutMillis()); + Initiator initiator = new Initiator(); + initiator.setConf(conf); + initiator.init(new AtomicBoolean(true), new AtomicBoolean()); + initiator.run(); + // Now we can clean the write_set MetastoreTaskThread 
writeSetService = new AcidWriteSetService(); writeSetService.setConf(conf); writeSetService.run(); Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET")); + txnMgr.rollbackTxn(); } /** diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 3bfb0e69cb..35b0a18270 100644 --- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -1193,6 +1193,8 @@ public static ConfVars getMetaConf(String name) { "class is used to store and retrieve transactions and locks"), TXN_TIMEOUT("metastore.txn.timeout", "hive.txn.timeout", 300, TimeUnit.SECONDS, "time after which transactions are declared aborted if the client has not sent a heartbeat."), + TXN_OPENTXN_TIMEOUT("metastore.txn.opentxn.timeout", "hive.txn.opentxn.timeout", 1000, TimeUnit.MILLISECONDS, + "Time before an open transaction operation should commit, otherwise it is considered invalid and rolled back"), URI_RESOLVER("metastore.uri.resolver", "hive.metastore.uri.resolver", "", "If set, fully qualified class name of resolver for hive metastore uri's"), USERS_IN_ADMIN_ROLE("metastore.users.in.admin.role", "hive.users.in.admin.role", "", false, diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java index 49b737ecf9..64848a4058 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java @@ -29,6 +29,7 @@ 
import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -57,9 +58,24 @@ public SQLGenerator(DatabaseProduct dbProduct, Configuration conf) { * @param paramsList List of parameters which in turn is list of Strings to be set in PreparedStatement object * @return List PreparedStatement objects for fully formed INSERT INTO ... statements */ + public List createInsertValuesPreparedStmt(Connection dbConn, + String tblColumns, List rows, + List> paramsList) throws SQLException { + return createInsertValuesPreparedStmt(dbConn, tblColumns, rows, paramsList, false); + } + + /** + * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB + * + * @param tblColumns e.g. "T(a,b,c)" + * @param rows e.g. list of Strings like 3,4,'d' + * @param paramsList List of parameters which in turn is list of Strings to be set in PreparedStatement object + * @param addGeneratedKeys set RETURN_GENERATED_KEYS param to the prepared stmt + * @return List PreparedStatement objects for fully formed INSERT INTO ... 
statements + */ public List createInsertValuesPreparedStmt(Connection dbConn, String tblColumns, List rows, - List> paramsList) + List> paramsList, boolean addGeneratedKeys) throws SQLException { if (rows == null || rows.size() == 0) { return Collections.emptyList(); @@ -75,7 +91,7 @@ public SQLGenerator(DatabaseProduct dbProduct, Configuration conf) { try { for (int stmtIdx = 0; stmtIdx < insertStmts.size(); stmtIdx++) { String sql = insertStmts.get(stmtIdx); - PreparedStatement pStmt = prepareStmtWithParameters(dbConn, sql, null); + PreparedStatement pStmt = prepareStmtWithParameters(dbConn, sql, null, addGeneratedKeys); if (paramsList != null) { int paramIdx = 1; int paramsListToIdx = paramsListFromIdx + rowsCountInStmts.get(stmtIdx); @@ -263,8 +279,30 @@ public String addLimitClause(int numRows, String noSelectsqlQuery) throws MetaEx * @throws SQLException */ public PreparedStatement prepareStmtWithParameters(Connection dbConn, String sql, List parameters) + throws SQLException { + return prepareStmtWithParameters(dbConn, sql, parameters, false); + } + + /** + * Make PreparedStatement object with list of String type parameters to be set. + * It is assumed the input sql string have the number of "?" equal to number of parameters + * passed as input. + * @param dbConn - Connection object + * @param sql - SQL statement with "?" for input parameters. 
+ * @param parameters - List of String type parameters to be set in PreparedStatement object + * @param addGeneratedKeys - set RETURN_GENERATED_KEYS param on preparedstatement + * @return PreparedStatement type object + * @throws SQLException + */ + public PreparedStatement prepareStmtWithParameters(Connection dbConn, String sql, List parameters, boolean addGeneratedKeys) throws SQLException { - PreparedStatement pst = dbConn.prepareStatement(addEscapeCharacters(sql)); + PreparedStatement pst; + if (addGeneratedKeys) { + pst = dbConn.prepareStatement(addEscapeCharacters(sql), Statement.RETURN_GENERATED_KEYS); + } else { + pst = dbConn.prepareStatement(addEscapeCharacters(sql)); + } + if ((parameters == null) || parameters.isEmpty()) { return pst; } @@ -292,4 +330,26 @@ public String addEscapeCharacters(String s) { return s; } + public String createLockTableStatement(String txnLockTable, boolean shared) throws MetaException{ + + switch (dbProduct) { + case MYSQL: + // https://dev.mysql.com/doc/refman/8.0/en/lock-tables.html + return "LOCK TABLES " + txnLockTable + " " + (shared ? "READ" : "WRITE"); + case POSTGRES: + // https://www.postgresql.org/docs/9.4/sql-lock.html + case DERBY: + // https://db.apache.org/derby/docs/10.4/ref/rrefsqlj40506.html + case ORACLE: + // https://docs.oracle.com/cd/B19306_01/server.102/b14200/statements_9015.htm + return "LOCK TABLE " + txnLockTable + " IN " + (shared ? "SHARE" : "EXCLUSIVE") + " MODE"; + case SQLSERVER: + // https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-table?view=sql-server-ver15 + return "SELECT * FROM " + txnLockTable + " WITH (" + (shared ?
"TABLOCK" : "TABLOCKX") + ", HOLDLOCK)"; + default: + String msg = "Unrecognized database product name <" + dbProduct + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 19a95b64db..84aa9f4481 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -37,6 +37,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; /** * Extends the transaction handler with methods needed only by the compactor threads. These @@ -333,17 +334,10 @@ public long findMinOpenTxnId() throws MetaException { * which deletes rows from MIN_HISTORY_LEVEL which can only allow minOpenTxn to move higher) */ private long findMinOpenTxnGLB(Statement stmt) throws MetaException, SQLException { - String s = "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""; + long hwm = getHighWaterMark(stmt); + String s = "SELECT MIN(\"MHL_MIN_OPEN_TXNID\") FROM \"MIN_HISTORY_LEVEL\""; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_txn_id"); - } - long hwm = rs.getLong(1); - s = "SELECT MIN(\"MHL_MIN_OPEN_TXNID\") FROM \"MIN_HISTORY_LEVEL\""; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); rs.next(); long minOpenTxnId = rs.getLong(1); if(rs.wasNull()) { @@ -586,19 +580,28 @@ public void cleanTxnToWriteIdTable() throws MetaException { */ @Override @RetrySemantics.SafeToRetry - public void cleanEmptyAbortedTxns() throws MetaException 
{ + public void cleanEmptyAbortedAndCommittedTxns() throws MetaException { try { Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { - //Aborted is a terminal state, so nothing about the txn can change + //Aborted and committed are terminal states, so nothing about the txn can change //after that, so READ COMMITTED is sufficient. dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); + /** + * Only delete aborted transaction in a way that guarantees two things: + * 1. never deletes anything that is inside the TXN_OPENTXN_TIMEOUT window + * 2. never deletes the maximum TXN that is before the TXN_OPENTXN_TIMEOUT window + */ + long openTxnTimeOutMillis = MetastoreConf.getTimeVar(conf,ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS); + String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE " + "\"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") AND " + - "\"TXN_STATE\" = '" + TXN_ABORTED + "'"; + " (\"TXN_STATE\" = '" + TXN_ABORTED + "' OR \"TXN_STATE\" = '" + TXN_COMMITTED + "') AND " + + " \"TXN_ID\" < (SELECT MAX(\"TXN_ID\") FROM \"TXNS\"" + + " WHERE \"TXN_STARTED\" < (" + TxnDbUtil.getEpochFn(dbProduct) + " - " + openTxnTimeOutMillis + "))" ; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); List txnids = new ArrayList<>(); @@ -638,7 +641,7 @@ public void cleanEmptyAbortedTxns() throws MetaException { close(rs, stmt, dbConn); } } catch (RetryException e) { - cleanEmptyAbortedTxns(); + cleanEmptyAbortedAndCommittedTxns(); } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 620c77e589..acdab663e8 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -95,13 +95,20 @@ public static synchronized void prepDb(Configuration conf) throws Exception { conn = getConnection(conf); stmt = conn.createStatement(); stmt.execute("CREATE TABLE TXNS (" + - " TXN_ID bigint PRIMARY KEY," + + " TXN_ID bigint PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY," + " TXN_STATE char(1) NOT NULL," + " TXN_STARTED bigint NOT NULL," + " TXN_LAST_HEARTBEAT bigint NOT NULL," + " TXN_USER varchar(128) NOT NULL," + " TXN_HOST varchar(128) NOT NULL," + + " TXN_AGENT_INFO varchar(128)," + + " TXN_META_INFO varchar(128)," + + " TXN_HEARTBEAT_COUNT integer," + " TXN_TYPE integer)"); + // We always need one row in TXNS to work as low water mark for opentransaction calculation + stmt.execute("INSERT INTO TXNS " + + "(TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) " + + "VALUES(0, 'c', 0, 0, '', '')"); stmt.execute("CREATE TABLE TXN_COMPONENTS (" + " TC_TXNID bigint NOT NULL REFERENCES TXNS (TXN_ID)," + @@ -661,4 +668,23 @@ static String getEpochFn(DatabaseProduct dbProduct) throws MetaException { } return affectedRowsByQuery; } + + public static boolean supportsGetGeneratedKeys(DatabaseProduct dbProduct) throws MetaException { + switch (dbProduct) { + + case DERBY: + case SQLSERVER: + // The getGeneratedKeys is not supported for multi line insert + return false; + case ORACLE: + case MYSQL: + case POSTGRES: + return true; + case OTHER: + String msg = "Unknown database product: " + dbProduct.toString(); + LOG.error(msg); + throw new MetaException(msg); + } + return false; + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 7d0db0c3a0..fd64bd00df 100644 --- 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -51,6 +51,7 @@ import javax.sql.DataSource; +import org.apache.commons.lang3.time.StopWatch; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.tuple.Pair; @@ -68,7 +69,59 @@ import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier; import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener; -import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; +import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse; +import org.apache.hadoop.hive.metastore.api.CheckLockRequest; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.CompactionResponse; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.CreationMetadata; +import org.apache.hadoop.hive.metastore.api.DataOperationType; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse; +import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; +import 
org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.HiveObjectType; +import org.apache.hadoop.hive.metastore.api.InitializeTableWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.Materialization; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.ReplLastIdInfo; +import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.metastore.api.TxnOpenException; +import org.apache.hadoop.hive.metastore.api.TxnState; +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hadoop.hive.metastore.api.TxnType; 
+import org.apache.hadoop.hive.metastore.api.UnlockRequest; +import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider; @@ -168,6 +221,8 @@ // Transaction states static final protected char TXN_ABORTED = 'a'; static final protected char TXN_OPEN = 'o'; + static final protected char TXN_COMMITTED = 'c'; + //todo: make these like OperationType and remove above char constants enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN} @@ -188,6 +243,9 @@ private static DataSource connPoolMutex; private static boolean doRetryOnConnPool = false; + private static final String MANUAL_RETRY = "ManualRetry"; + private static final String TXN_LOCK_TABLE = "NEXT_TXN_ID"; + // Query definitions private static final String HIVE_LOCKS_INSERT_QRY = "INSERT INTO \"HIVE_LOCKS\" ( " + "\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", " + @@ -264,11 +322,13 @@ char getSqlConst() { private int deadlockCnt; private long deadlockRetryInterval; protected Configuration conf; - private static DatabaseProduct dbProduct; + protected static DatabaseProduct dbProduct; private static SQLGenerator sqlGenerator; // (End user) Transaction timeout, in milliseconds. 
private long timeout; + // Timeout for opening a transaction + private long openTxnTimeOutMillis; private String identifierQuoteString; // quotes to use for quoting tables, where necessary private long retryInterval; @@ -348,6 +408,8 @@ public void setConf(Configuration conf) { deadlockRetryInterval = retryInterval / 10; maxOpenTxns = MetastoreConf.getIntVar(conf, ConfVars.MAX_OPEN_TXNS); + openTxnTimeOutMillis = MetastoreConf.getTimeVar(conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS); + try { transactionalListeners = MetaStoreServerUtils.getMetaStoreListeners( TransactionalMetaStoreEventListener.class, @@ -368,59 +430,75 @@ public Configuration getConf() { @RetrySemantics.ReadOnly public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { try { - // We need to figure out the current transaction number and the list of + // We need to figure out the HighWaterMark and the list of // open transactions. To avoid needing a transaction on the underlying - // database we'll look at the current transaction number first. If it - // subsequently shows up in the open list that's ok. + // database we'll look at the highWareMark first. Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { - /** - * This method can run at READ_COMMITTED as long as long as - * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic. - * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with - * adding corresponding entries into TXNS. The reason is that any txnid below HWM - * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed. + /* + * This method need guarantees from + * {@link #openTxns(OpenTxnRequest)} and {@link #commitTxn(CommitTxnRequest)}. + * It will look at the TXNS table and find each transaction between the max(txn_id) as HighWaterMark + * and the max(txn_id) before the TXN_OPENTXN_TIMEOUT period as LowWaterMark. 
+ * Every transaction that is not found between these will be considered as open, since it may appear later. + * openTxns must ensure, that no new transaction will be opened with txn_id below LWM and + * commitTxn must ensure, that no committed transaction will be removed before the time period expires. */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_txn_id"); - } - long hwm = rs.getLong(1); - if (rs.wasNull()) { - throw new MetaException("Transaction tables not properly " + - "initialized, null record found in next_txn_id"); - } - close(rs); + long hwm = getHighWaterMark(stmt); List txnInfos = new ArrayList<>(); + //need the WHERE clause below to ensure consistent results with READ_COMMITTED - s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\" FROM " + - "\"TXNS\" WHERE \"TXN_ID\" <= " + hwm; + String s = + "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\" FROM " + + "\"TXNS\" WHERE \"TXN_ID\" <= " + hwm + " ORDER BY \"TXN_ID\""; LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); + /* The LowWaterMark query must be the last one to be consistent with READ_COMMITTED. + * Between the two queries a cleaner might have run and removed aborted or committed transactions and so + * raised the LWM, but we will just ignore those committed transactions.
+ */ + long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn); + long openTxnLowBoundary = lowWaterMark; + boolean overLowWaterMark = false; + while (rs.next()) { + long txnId = rs.getLong(1); + if (txnId > lowWaterMark) { + // From this point we will consider every gap as an open transaction + overLowWaterMark = true; + } + if (overLowWaterMark) { + openTxnLowBoundary++; + while (txnId > openTxnLowBoundary) { + // Add an empty open transaction for every missing value + txnInfos.add(new TxnInfo(openTxnLowBoundary, TxnState.OPEN, null, null)); + openTxnLowBoundary++; + } + + } char c = rs.getString(2).charAt(0); TxnState state; switch (c) { - case TXN_ABORTED: - state = TxnState.ABORTED; - break; + case TXN_COMMITTED: + // This is only here, to avoid adding this txnId as possible gap + continue; + + case TXN_ABORTED: + state = TxnState.ABORTED; + break; - case TXN_OPEN: - state = TxnState.OPEN; - break; + case TXN_OPEN: + state = TxnState.OPEN; + break; - default: - throw new MetaException("Unexpected transaction state " + c + - " found in txns table"); + default: + throw new MetaException("Unexpected transaction state " + c + " found in txns table"); } - TxnInfo txnInfo = new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4)); + TxnInfo txnInfo = new TxnInfo(txnId, state, rs.getString(3), rs.getString(4)); txnInfo.setStartedTime(rs.getLong(5)); txnInfo.setLastHeartbeatTime(rs.getLong(6)); txnInfos.add(txnInfo); @@ -432,8 +510,8 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "getOpenTxnsInfo"); - throw new MetaException("Unable to select from transaction database: " + getMessage(e) - + StringUtils.stringifyException(e)); + throw new MetaException( + "Unable to select from transaction database: " + getMessage(e) + StringUtils.stringifyException(e)); } finally { close(rs, stmt, dbConn); } @@ -442,6 +520,7 @@ public GetOpenTxnsInfoResponse 
getOpenTxnsInfo() throws MetaException { } } + @Override @RetrySemantics.ReadOnly public GetOpenTxnsResponse getOpenTxns() throws MetaException { @@ -454,34 +533,49 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { Statement stmt = null; ResultSet rs = null; try { - /** + /* * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_txn_id"); - } - long hwm = rs.getLong(1); - if (rs.wasNull()) { - throw new MetaException("Transaction tables not properly " + - "initialized, null record found in next_txn_id"); - } - close(rs); + + long hwm = getHighWaterMark(stmt); + List openList = new ArrayList<>(); //need the WHERE clause below to ensure consistent results with READ_COMMITTED - s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\" FROM \"TXNS\" WHERE \"TXN_ID\" <= " + hwm + " ORDER BY \"TXN_ID\""; + String s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\" FROM \"TXNS\" WHERE \"TXN_ID\" <= " + hwm + " ORDER BY \"TXN_ID\""; LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); + /* + * The LowWaterMark query must be the last one to be consistent with READ_COMMITTED. + * Between the two queries a cleaner might have run and removed aborted or committed transactions and so + * raised the LWM, but we will just ignore those.
+ */ + long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn); + long openTxnLowBoundary = lowWaterMark; + boolean overLowWaterMark = false; long minOpenTxn = Long.MAX_VALUE; BitSet abortedBits = new BitSet(); while (rs.next()) { long txnId = rs.getLong(1); + if (txnId > lowWaterMark) { + // From this point we will consider every gap as an open transaction + overLowWaterMark = true; + } + if (overLowWaterMark) { + openTxnLowBoundary++; + while (txnId > openTxnLowBoundary) { + // Add an empty open transaction for every missing value + openList.add(openTxnLowBoundary); + minOpenTxn = Math.min(minOpenTxn, openTxnLowBoundary); + openTxnLowBoundary++; + } + + } char txnState = rs.getString(2).charAt(0); + if (txnState == TXN_COMMITTED) { + continue; + } if (txnState == TXN_OPEN) { minOpenTxn = Math.min(minOpenTxn, txnId); } @@ -497,7 +591,7 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { dbConn.rollback(); ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray()); GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer); - if(minOpenTxn < Long.MAX_VALUE) { + if (minOpenTxn < Long.MAX_VALUE) { otr.setMin_open_txn(minOpenTxn); } return otr; @@ -544,14 +638,23 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { try { Connection dbConn = null; Statement stmt = null; + LockHandle txnLock = null; try { - lockInternal(); - /** + // lockInternal(); + /* * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure - * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic. - * Also, advancing the counter must work when multiple metastores are running. - * SELECT ... FOR UPDATE is used to prevent - * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID. + * that looking at the TXNS table every open transaction could be identified below a given High Water Mark. 
+ * One way to do it, would be to serialize the openTxns call with a S4U lock, but that would cause + * performance degradation with high transaction load. + * To enable parallel openTxn calls, we define a time period (TXN_OPENTXN_TIMEOUT) and consider every + * transaction missing from the TXNS table in that period open, and prevent opening transaction outside + * the period. + * Example: At t[0] there is one open transaction in the TXNS table, T[1]. + * T[2] acquires the next sequence at t[1] but only commits into the TXNS table at t[10]. + * T[3] acquires its sequence at t[2], and commits into the TXNS table at t[3]. + * Then T[3] calculates it’s snapshot at t[4] and puts T[1] and also T[2] in the snapshot’s + * open transaction list. T[1] because it is presented as open in TXNS, + * T[2] because it is a missing sequence. * * In the current design, there can be several metastore instances running in a given Warehouse. * This makes ideas like reserving a range of IDs to save trips to DB impossible. For example, @@ -569,20 +672,56 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { if (numTxns > maxTxns) numTxns = maxTxns; stmt = dbConn.createStatement(); + /* + * The openTxn and commitTxn must be mutexed, when committing a not read only transaction. + * This is achieved by requesting a shared table lock here, and an exclusive one at commit. + * Since table locks are working in Derby, we don't need the lockInternal call here. + * Example: Suppose we have two transactions with update like x = x+1. + * We have T[3,3] that was using a value from a snapshot with T[2,2]. 
If we allow committing T[3,3] + * and opening T[4] parallel it is possible, that T[4] will be using the value from a snapshot with T[2,2], + * and we will have a lost update problem + */ + txnLock = acquireTxnLock(true); + // Measure the time from acquiring the sequence value, till committing in the TXNS table + StopWatch generateTransactionWatch = new StopWatch(); + generateTransactionWatch.start(); + List txnIds = openTxns(dbConn, stmt, rqst); LOG.debug("Going to commit"); dbConn.commit(); + generateTransactionWatch.stop(); + long elapsedMillis = generateTransactionWatch.getTime(TimeUnit.MILLISECONDS); + TxnType txnType = rqst.isSetTxn_type() ? rqst.getTxn_type() : TxnType.DEFAULT; + if (txnType != TxnType.READ_ONLY && elapsedMillis >= openTxnTimeOutMillis) { + /* + * The commit was too slow, we can not allow this to continue (except if it is read only, + * since that can not cause dirty reads). + * When calculating the snapshot for a given transaction, we look back for possible open transactions + * (that are not yet committed in the TXNS table), for TXN_OPENTXN_TIMEOUT period. + * We can not allow a write transaction, that was slower than that to continue, + * because there can be other transactions running, that didn't considered this transactionId open, + * this could cause dirty reads. 
+ */ + LOG.info("OpenTxnTimeOut exceeded, deleting transactionIds: {}", txnIds); + deleteInvalidOpenTransactions(dbConn, txnIds); + /* + * We do not throw RetryException directly, to not circumvent the max retry limit + */ + throw new SQLException("OpenTxnTimeOut exceeded", MANUAL_RETRY); + } return new OpenTxnsResponse(txnIds); } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "openTxns(" + rqst + ")"); - throw new MetaException("Unable to select from transaction database " - + StringUtils.stringifyException(e)); + throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { close(null, stmt, dbConn); - unlockInternal(); + if (txnLock != null) { + txnLock.releaseLocks(); + } + // unlockInternal(); } } catch (RetryException e) { return openTxns(rqst); @@ -592,7 +731,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { private List openTxns(Connection dbConn, Statement stmt, OpenTxnRequest rqst) throws SQLException, MetaException { int numTxns = rqst.getNum_txns(); - ResultSet rs = null; + // ResultSet rs = null; List insertPreparedStmts = null; TxnType txnType = rqst.isSetTxn_type() ? 
rqst.getTxn_type() : TxnType.DEFAULT; try { @@ -611,40 +750,90 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { txnType = TxnType.REPL_CREATED; } - String s = sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""); - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction database not properly " + - "configured, can't find next transaction id."); - } - long first = rs.getLong(1); - s = "UPDATE \"NEXT_TXN_ID\" SET \"NTXN_NEXT\" = " + (first + numTxns); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); + // String s = sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""); + // LOG.debug("Going to execute query <" + s + ">"); + // rs = stmt.executeQuery(s); + // if (!rs.next()) { + // throw new MetaException("Transaction database not properly " + + // "configured, can't find next transaction id."); + // } + // long first = rs.getLong(1); + // s = "UPDATE \"NEXT_TXN_ID\" SET \"NTXN_NEXT\" = " + (first + numTxns); + // LOG.debug("Going to execute update <" + s + ">"); + // stmt.executeUpdate(s); List txnIds = new ArrayList<>(numTxns); + /* + * The getGeneratedKeys are not supported in every dbms, after executing a multi line insert. + * But it is support every used dbms for single line insert, even if the metadata says otherwise. + * If the getGeneratedKeys are not supported first we insert a random batchId in the TXN_META_INFO field, + * then the keys are selected beck with that batchid. 
+ */ + boolean genKeySupport = TxnDbUtil.supportsGetGeneratedKeys(dbProduct); + genKeySupport = genKeySupport || (numTxns == 1); + List rows = new ArrayList<>(); List params = new ArrayList<>(); params.add(rqst.getUser()); params.add(rqst.getHostname()); + String batchId = "noGenKeySupport"; + if (!genKeySupport) { + params.add(batchId); + } List> paramsList = new ArrayList<>(numTxns); - for (long i = first; i < first + numTxns; i++) { - txnIds.add(i); - rows.add(i + "," + quoteChar(TXN_OPEN) + "," + TxnDbUtil.getEpochFn(dbProduct) + "," - + TxnDbUtil.getEpochFn(dbProduct) + ",?,?," + txnType.getValue()); + for (long i = 0; i < numTxns; i++) { + if (genKeySupport) { + rows.add(quoteChar(TXN_OPEN) + "," + TxnDbUtil.getEpochFn(dbProduct) + "," + TxnDbUtil.getEpochFn(dbProduct) + + ",?,?," + txnType.getValue()); + } else { + rows.add(quoteChar(TXN_OPEN) + "," + TxnDbUtil.getEpochFn(dbProduct) + "," + TxnDbUtil.getEpochFn(dbProduct) + + ",?,?," + txnType.getValue() + ",?"); + } paramsList.add(params); } + String tblColumns; + if (genKeySupport) { + tblColumns = "\"TXNS\" (\"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " + + "\"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\")"; + } else { + tblColumns = "\"TXNS\" (\"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " + + "\"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\", \"TXN_META_INFO\")"; + } + insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "\"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " - + "\"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\")", - rows, paramsList); + tblColumns, rows, paramsList, genKeySupport); + for (PreparedStatement pst : insertPreparedStmts) { - pst.execute(); + pst.executeUpdate(); + if (genKeySupport) { + try (ResultSet generatedKeys = pst.getGeneratedKeys()) { + while (generatedKeys.next()) { + txnIds.add(generatedKeys.getLong(1)); + } + } + } else { + String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_META_INFO\" = " + 
quoteString(batchId); + try (ResultSet rs = stmt.executeQuery(s)) { + while (rs.next()) { + txnIds.add(rs.getLong(1)); + } + } + s = "UPDATE \"TXNS\" SET \"TXN_META_INFO\" = NULL WHERE \"TXN_META_INFO\" = " + quoteString(batchId); + stmt.executeUpdate(s); + } } + assert txnIds.size() == numTxns; + long first = Long.MAX_VALUE; + for (Long txn : txnIds) { + if (txn < first) { + first = txn; + } + } + + String s; // Need to register minimum open txnid for current transactions into MIN_HISTORY table. // For a single txn we can do it in a single insert. With multiple txns calculating the // minOpenTxnId for every insert is not cost effective, so caching the value @@ -697,7 +886,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { paramsList.clear(); params.add(rqst.getReplPolicy()); for (int i = 0; i < numTxns; i++) { - rowsRepl.add( "?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i)); + rowsRepl.add("?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i)); paramsList.add(params); } @@ -720,8 +909,85 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { pst.close(); } } - close(rs); + // close(rs); + } + } + + public void deleteInvalidOpenTransactions(Connection dbConn, List txnIds) throws MetaException { + if (txnIds.size() == 0) { + return; + } + try { + try { + Statement stmt = dbConn.createStatement(); + + List queries = new ArrayList<>(); + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + prefix.append("DELETE FROM \"TXNS\" WHERE "); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnIds, "\"TXN_ID\"", false, false); + for (String s : queries) { + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + } + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "deleteInvalidOpenTransactions(" + txnIds + ")"); + throw new MetaException("Unable to select from 
transaction database " + StringUtils.stringifyException(e)); + } + } catch (RetryException ex) { + deleteInvalidOpenTransactions(dbConn, txnIds); + } + } + + @Override + public long getOpenTxnTimeOutMillis() { + return openTxnTimeOutMillis; + } + + protected long getOpenTxnTimeoutLowBoundaryTxnId(Connection dbConn) throws MetaException, SQLException { + long maxTxnId; + String s = + "SELECT MAX(\"TXN_ID\") FROM \"TXNS\"" + " WHERE \"TXN_STARTED\" < (" + TxnDbUtil.getEpochFn(dbProduct) + " - " + + openTxnTimeOutMillis + ")"; + try (Statement stmt = dbConn.createStatement()) { + LOG.debug("Going to execute query <" + s + ">"); + try (ResultSet maxTxnIdRs = stmt.executeQuery(s)) { + if (!maxTxnIdRs.next()) { + throw new IllegalStateException("Scalar query returned no rows?!?!!"); + } + maxTxnId = maxTxnIdRs.getLong(1); + if (maxTxnIdRs.wasNull()) { + /* + * TXNS always contains at least one transaction, + * the row where txnid = (select max(txnid) where txn_started < epoch - TXN_OPENTXN_TIMEOUT) is never deleted + */ + throw new MetaException("Transaction tables not properly " + "initialized, null record found in MAX(TXN_ID)"); + } + } + } + return maxTxnId; + } + + protected long getHighWaterMark(Statement stmt) throws SQLException, MetaException { + String s = "SELECT MAX(\"TXN_ID\") FROM \"TXNS\""; + LOG.debug("Going to execute query <" + s + ">"); + long maxOpenTxnId; + try (ResultSet maxOpenTxnIdRs = stmt.executeQuery(s)) { + if (!maxOpenTxnIdRs.next()) { + throw new IllegalStateException("Scalar query returned no rows?!?!!"); + } + maxOpenTxnId = maxOpenTxnIdRs.getLong(1); + if (maxOpenTxnIdRs.wasNull()) { + /* + * TXNS always contains at least one transaction, + * the row where txnid = (select max(txnid) where txn_started < epoch - TXN_OPENTXN_TIMEOUT) is never deleted + */ + throw new MetaException("Transaction tables not properly " + "initialized, null record found in MAX(TXN_ID)"); + } } + return maxOpenTxnId; } private List getTargetTxnIdList(String 
replPolicy, List sourceTxnIdList, Connection dbConn) @@ -749,7 +1015,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { } LOG.debug("targetTxnid for srcTxnId " + sourceTxnIdList.toString() + " is " + targetTxnIdList.toString()); return targetTxnIdList; - } catch (SQLException e) { + } catch (SQLException e) { LOG.warn("failed to get target txn ids " + e.getMessage()); throw e; } finally { @@ -1123,7 +1389,8 @@ public void commitTxn(CommitTxnRequest rqst) try { Connection dbConn = null; Statement stmt = null; - ResultSet commitIdRs = null, rs; + ResultSet rs = null; + LockHandle txnLock = null; try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); @@ -1193,11 +1460,8 @@ public void commitTxn(CommitTxnRequest rqst) * at the same time and no new txns start until all 3 commit. * We could've incremented the sequence for commitId is well but it doesn't add anything functionally. */ - commitIdRs = stmt.executeQuery(sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\"")); - if (!commitIdRs.next()) { - throw new IllegalStateException("No rows found in NEXT_TXN_ID"); - } - long commitId = commitIdRs.getLong(1); + txnLock = acquireTxnLock(false); + long commitId = getHighWaterMark(stmt); Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); /** * "select distinct" is used below because @@ -1269,8 +1533,7 @@ public void commitTxn(CommitTxnRequest rqst) } else { //no conflicting operations, proceed with the rest of commit sequence } - } - else { + } else { /** * current txn didn't update/delete anything (may have inserted), so just proceed with commit * @@ -1378,7 +1641,10 @@ public void commitTxn(CommitTxnRequest rqst) throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { - close(commitIdRs, stmt, dbConn); + close(null, stmt, dbConn); + if (txnLock != null) { + txnLock.releaseLocks(); + } unlockInternal(); } } catch 
(RetryException e) { @@ -1390,6 +1656,7 @@ private void cleanUpTxnRelatedMetadata(long txnid, Statement stmt) throws SQLExc List queries = Arrays.asList( "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid, "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid, + // DO NOT remove the transaction from the TXN table, the cleaner will remove it when appropriate + "UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_COMMITTED) + " WHERE \"TXN_ID\" = " + txnid, - "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid, "DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid, "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid); @@ -1501,7 +1769,7 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx } closeStmt(pStmt); close(rs, stmt, dbConn); - if(handle != null) { + if (handle != null) { handle.releaseLocks(); } unlockInternal(); @@ -1543,7 +1811,9 @@ private ValidTxnList getValidTxnList(Connection dbConn, String fullTableName, Lo String[] names = TxnUtils.getDbTableName(fullTableName); assert names.length == 2; List params = Arrays.asList(names[0], names[1]); - String s = "SELECT \"T2W_TXNID\" FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_WRITEID\" = " + writeId; + String s = + "SELECT \"T2W_TXNID\" FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? 
AND \"T2W_WRITEID\" = " + + writeId; pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", quoteString(names[0]), quoteString(names[1])); @@ -2011,46 +2281,36 @@ public void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) @Override @RetrySemantics.SafeToRetry - public void performWriteSetGC() { + public void performWriteSetGC() throws MetaException{ Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - rs = stmt.executeQuery("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""); - if(!rs.next()) { - throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted"); - } - long highestAllocatedTxnId = rs.getLong(1); - close(rs); + + long minOpenTxn; rs = stmt.executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + quoteChar(TXN_OPEN)); - if(!rs.next()) { + if (!rs.next()) { throw new IllegalStateException("Scalar query returned no rows?!?!!"); } - long commitHighWaterMark;//all currently open txns (if any) have txnid >= than commitHighWaterMark - long lowestOpenTxnId = rs.getLong(1); - if(rs.wasNull()) { - //if here then there are no Open txns and highestAllocatedTxnId must be - //resolved (i.e. committed or aborted), either way - //there are no open txns with id <= highestAllocatedTxnId - //the +1 is there because "delete ..." below has < (which is correct for the case when - //there is an open txn - //Concurrency: even if new txn starts (or starts + commits) it is still true that - //there are no currently open txns that overlap with any committed txn with - //commitId <= commitHighWaterMark (as set on next line). So plain READ_COMMITTED is enough. 
- commitHighWaterMark = highestAllocatedTxnId + 1; - } - else { - commitHighWaterMark = lowestOpenTxnId; + minOpenTxn = rs.getLong(1); + if (rs.wasNull()) { + minOpenTxn = Long.MAX_VALUE; } + long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn); + /** + * We try to find the highest transactionId below everything was committed or aborted. + * For that we look for the lowest open transaction in the TXNS and the TxnMinTimeout boundary, + * because it is guaranteed there won't be open transactions below that. + */ + long commitHighWaterMark = Long.min(minOpenTxn, lowWaterMark + 1); int delCnt = stmt.executeUpdate("DELETE FROM \"WRITE_SET\" WHERE \"WS_COMMIT_ID\" < " + commitHighWaterMark); LOG.info("Deleted {} obsolete rows from WRITE_SET", delCnt); dbConn.commit(); } catch (SQLException ex) { LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex); - } - finally { + } finally { close(rs, stmt, dbConn); } } @@ -5011,11 +5271,15 @@ private static synchronized DataSource setupJdbcConnectionPool(Configuration con static boolean isRetryable(Configuration conf, Exception ex) { if(ex instanceof SQLException) { SQLException sqlException = (SQLException)ex; - if("08S01".equalsIgnoreCase(sqlException.getSQLState())) { + if (MANUAL_RETRY.equalsIgnoreCase(sqlException.getSQLState())) { + // Manual retry exception was thrown + return true; + } + if ("08S01".equalsIgnoreCase(sqlException.getSQLState())) { //in MSSQL this means Communication Link Failure return true; } - if("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) || + if ("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) || sqlException.getMessage().contains("consistent read failure; rollback data not available")) { return true; } @@ -5194,10 +5458,50 @@ public LockHandle acquireLock(String key) throws MetaException { return acquireLock(key); } } + + @Override public void acquireLock(String key, LockHandle handle) { //the idea is that this will use LockHandle.dbConn throw new 
NotImplementedException("acquireLock(String, LockHandle) is not implemented"); } + + /** + * Acquire the global txn lock, used to mutex the openTxn and commitTxn + * @param shared either SHARED_READ or EXCLUSIVE + * @return lockhandle to release the lock + * @throws MetaException + */ + public LockHandle acquireTxnLock(boolean shared) throws MetaException { + Connection dbConn = null; + Statement stmt = null; + try { + try { + String sqlStmt = sqlGenerator.createLockTableStatement(TXN_LOCK_TABLE, shared); + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex); + stmt = dbConn.createStatement(); + if (LOG.isDebugEnabled()) { + LOG.debug("About to execute SQL: " + sqlStmt); + } + stmt.execute(sqlStmt); + LOG.debug("TXN lock locked by {} in mode {}", quoteString(TxnHandler.hostname), shared); + return new TxnLockHandleImpl(dbConn, stmt); + } catch (SQLException ex) { + rollbackDBConn(dbConn); + closeStmt(stmt); + closeDbConn(dbConn); + checkRetryable(dbConn, ex, "acquireTxnLock(" + shared + ")"); + throw new MetaException( + "Unable to lock TxnLock due to: " + getMessage(ex) + "; " + StringUtils.stringifyException(ex)); + } finally { + unlockInternal(); + } + } catch (RetryException ex) { + return acquireTxnLock(shared); + } + + } + private static final class LockHandleImpl implements LockHandle { private final Connection dbConn; private final Statement stmt; @@ -5234,6 +5538,24 @@ public void releaseLocks() { } } + private static final class TxnLockHandleImpl implements LockHandle { + private final Connection dbConn; + private final Statement stmt; + + TxnLockHandleImpl(Connection conn, Statement stmt) { + this.dbConn = conn; + this.stmt = stmt; + } + + @Override + public void releaseLocks() { + rollbackDBConn(dbConn); + closeStmt(stmt); + closeDbConn(dbConn); + LOG.debug("TXN lock unlocked by " + quoteString(TxnHandler.hostname)); + } + } + private static class NoPoolConnectionPool implements DataSource { // Note that this 
depends on the fact that no-one in this class calls anything but // getConnection. If you want to use any of the Logger or wrap calls you'll have to diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 41d2e7924b..1d30593ad7 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -396,12 +396,13 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old void cleanTxnToWriteIdTable() throws MetaException; /** - * Clean up aborted transactions from txns that have no components in txn_components. The reson such - * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and - * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called. + * Clean up aborted or committed transactions from txns that have no components in txn_components. The reason such + * txns exist can be that no work was done in this txn (e.g. Streaming opened TransactionBatch and + * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called, + * or the delete from the txns was delayed because of TXN_OPENTXN_TIMEOUT window. */ @RetrySemantics.SafeToRetry - void cleanEmptyAbortedTxns() throws MetaException; + void cleanEmptyAbortedAndCommittedTxns() throws MetaException; /** * This will take all entries assigned to workers @@ -452,7 +453,7 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old * transaction metadata once it becomes unnecessary. 
*/ @RetrySemantics.SafeToRetry - void performWriteSetGC(); + void performWriteSetGC() throws MetaException; /** * Determine if there are enough consecutive failures compacting a table or partition that no @@ -471,6 +472,9 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old @VisibleForTesting long setTimeout(long milliseconds); + @VisibleForTesting + long getOpenTxnTimeOutMillis(); + @RetrySemantics.Idempotent MutexAPI getMutexAPI(); @@ -483,7 +487,7 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old */ interface MutexAPI { /** - * The {@code key} is name of the lock. Will acquire and exclusive lock or block. It retuns + * The {@code key} is name of the lock. Will acquire an exclusive lock or block. It returns * a handle which must be used to release the lock. Each invocation returns a new handle. */ LockHandle acquireLock(String key) throws MetaException; diff --git standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql index 05adbe9003..5fc9f6b0c0 100644 --- standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql +++ standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql @@ -522,7 +522,7 @@ ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SQL110318025505550" CHECK (IS_COMPRESSED -- Transaction and Lock Tables -- ---------------------------- CREATE TABLE TXNS ( - TXN_ID bigint PRIMARY KEY, + TXN_ID bigint PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, TXN_STATE char(1) NOT NULL, TXN_STARTED bigint NOT NULL, TXN_LAST_HEARTBEAT bigint NOT NULL, @@ -534,6 +534,9 @@ CREATE TABLE TXNS ( TXN_TYPE integer ); +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + VALUES(0, 'c', 0, 0, '', ''); + CREATE TABLE TXN_COMPONENTS ( TC_TXNID bigint NOT NULL REFERENCES TXNS (TXN_ID), TC_DATABASE varchar(128) NOT NULL, 
diff --git standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql index 35a2e641b2..f3d3afd403 100644 --- standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql +++ standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql @@ -64,5 +64,14 @@ ALTER TABLE "SCHEDULED_QUERIES" ADD "ACTIVE_EXECUTION_ID" bigint; -- HIVE-22995 ALTER TABLE "APP"."DBS" ADD COLUMN "DB_MANAGED_LOCATION_URI" VARCHAR(4000); +-- HIVE-23048 +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '', '' FROM COMPLETED_TXN_COMPONENTS; +ALTER TABLE TXNS ADD COLUMN TXN_ID_TMP bigint; +UPDATE TXNS SET TXN_ID_TMP=TXN_ID; +ALTER TABLE TXNS DROP COLUMN TXN_ID; +-- TODO +ALTER TABLE TXNS ALTER TXN_ID RESTART WITH 1000000000; + -- This needs to be the last thing done. Insert any changes above this line. 
UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; diff --git standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql index f3c74bf74f..a4ec120ab3 100644 --- standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql +++ standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql @@ -1102,7 +1102,7 @@ CREATE TABLE NEXT_TXN_ID( INSERT INTO NEXT_TXN_ID VALUES(1); CREATE TABLE TXNS( - TXN_ID bigint NOT NULL, + TXN_ID bigint NOT NULL IDENTITY(1,1), TXN_STATE char(1) NOT NULL, TXN_STARTED bigint NOT NULL, TXN_LAST_HEARTBEAT bigint NOT NULL, @@ -1117,6 +1117,9 @@ PRIMARY KEY CLUSTERED TXN_ID ASC ) ); +SET IDENTITY_INSERT TXNS ON; +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + VALUES(0, 'c', 0, 0, '', ''); CREATE TABLE TXN_COMPONENTS( TC_TXNID bigint NOT NULL, diff --git standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql index 228bb7ca80..49bbe12592 100644 --- standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql +++ standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql @@ -67,6 +67,52 @@ INSERT INTO NOTIFICATION_SEQUENCE (NNI_ID, NEXT_EVENT_ID) SELECT 1,1 WHERE NOT E -- HIVE-22995 ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI nvarchar(4000); +-- HIVE-23048 +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '', '' FROM COMPLETED_TXN_COMPONENTS; + +CREATE TABLE TMP_TXNS( + TXN_ID bigint NOT NULL IDENTITY(1,1), + TXN_STATE char(1) NOT NULL, + TXN_STARTED bigint NOT NULL, + TXN_LAST_HEARTBEAT bigint NOT NULL, + TXN_USER 
nvarchar(128) NOT NULL, + TXN_HOST nvarchar(128) NOT NULL, + TXN_AGENT_INFO nvarchar(128) NULL, + TXN_META_INFO nvarchar(128) NULL, + TXN_HEARTBEAT_COUNT int NULL, + TXN_TYPE int NULL, +PRIMARY KEY CLUSTERED +( + TXN_ID ASC +) +); + +SET IDENTITY_INSERT TMP_TXNS ON; +INSERT INTO TMP_TXNS (TXN_ID,TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST, TXN_AGENT_INFO, TXN_META_INFO, TXN_HEARTBEAT_COUNT, TXN_TYPE) +SELECT TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST, TXN_AGENT_INFO, TXN_META_INFO, TXN_HEARTBEAT_COUNT, TXN_TYPE FROM TXNS TABLOCKX; + +SET IDENTITY_INSERT TMP_TXNS OFF; + +CREATE TABLE TMP_TXN_COMPONENTS( + TC_TXNID bigint NOT NULL, + TC_DATABASE nvarchar(128) NOT NULL, + TC_TABLE nvarchar(128) NULL, + TC_PARTITION nvarchar(767) NULL, + TC_OPERATION_TYPE char(1) NOT NULL, + TC_WRITEID bigint +); +INSERT INTO TMP_TXN_COMPONENTS SELECT * FROM TXN_COMPONENTS; + +DROP TABLE TXN_COMPONENTS; +DROP TABLE TXNS; + +Exec sp_rename 'TMP_TXNS', 'TXNS'; +Exec sp_rename 'TMP_TXN_COMPONENTS', 'TXN_COMPONENTS'; + +ALTER TABLE TXN_COMPONENTS WITH CHECK ADD FOREIGN KEY(TC_TXNID) REFERENCES TXNS (TXN_ID); +CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS (TC_TXNID); + -- These lines need to be last. Insert any changes above. 
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE; diff --git standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql index 626d88899e..7b61591d9c 100644 --- standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql +++ standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql @@ -989,7 +989,7 @@ CREATE TABLE IF NOT EXISTS WM_MAPPING -- Transaction and Lock Tables -- ---------------------------- CREATE TABLE TXNS ( - TXN_ID bigint PRIMARY KEY, + TXN_ID bigint PRIMARY KEY AUTO_INCREMENT, TXN_STATE char(1) NOT NULL, TXN_STARTED bigint NOT NULL, TXN_LAST_HEARTBEAT bigint NOT NULL, @@ -1001,6 +1001,9 @@ CREATE TABLE TXNS ( TXN_TYPE int ) ENGINE=InnoDB DEFAULT CHARSET=latin1; +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + VALUES(0, 'c', 0, 0, '', ''); + CREATE TABLE TXN_COMPONENTS ( TC_TXNID bigint NOT NULL, TC_DATABASE varchar(128) NOT NULL, diff --git standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql index 35da7b57b3..5171826c25 100644 --- standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql +++ standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql @@ -68,6 +68,22 @@ ALTER TABLE SCHEDULED_QUERIES ADD COLUMN ACTIVE_EXECUTION_ID INTEGER ; -- HIVE-22995 ALTER TABLE DBS ADD COLUMN DB_MANAGED_LOCATION_URI VARCHAR(4000) CHARACTER SET latin1 COLLATE latin1_bin; +-- HIVE-23048 +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + SELECT IFNULL(MAX(CTC_TXNID),0), 'c', 0, 0, '', '' FROM 
COMPLETED_TXN_COMPONENTS; +ALTER TABLE TXNS ADD COLUMN TXN_ID_TMP BIGINT; +UPDATE TXNS SET TXN_ID_TMP=TXN_ID; +SET FOREIGN_KEY_CHECKS = 0; +ALTER TABLE TXNS MODIFY TXN_ID BIGINT AUTO_INCREMENT; +SET FOREIGN_KEY_CHECKS = 1; +UPDATE TXNS SET TXN_ID=TXN_ID_TMP; +ALTER TABLE TXNS DROP COLUMN TXN_ID_TMP; +SELECT MAX(TXN_ID) + 1 INTO @AutoInc FROM TXNS; +SET @s:=CONCAT('ALTER TABLE TXNS AUTO_INCREMENT=', @AutoInc); +PREPARE stmt FROM @s; +EXECUTE stmt; +DEALLOCATE PREPARE stmt; + -- These lines need to be last. Insert any changes above. UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE; diff --git standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql index 206634085c..d6a2ce4d49 100644 --- standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql +++ standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql @@ -1658,7 +1658,7 @@ GRANT ALL ON SCHEMA public TO PUBLIC; -- Transaction and lock tables ------------------------------ CREATE TABLE "TXNS" ( - "TXN_ID" bigint PRIMARY KEY, + "TXN_ID" bigserial PRIMARY KEY, "TXN_STATE" char(1) NOT NULL, "TXN_STARTED" bigint NOT NULL, "TXN_LAST_HEARTBEAT" bigint NOT NULL, @@ -1669,6 +1669,8 @@ CREATE TABLE "TXNS" ( "TXN_HEARTBEAT_COUNT" integer, "TXN_TYPE" integer ); +INSERT INTO "TXNS" ("TXN_ID", "TXN_STATE", "TXN_STARTED", "TXN_LAST_HEARTBEAT", "TXN_USER", "TXN_HOST") + VALUES(0, 'c', 0, 0, '', ''); CREATE TABLE "TXN_COMPONENTS" ( "TC_TXNID" bigint NOT NULL REFERENCES "TXNS" ("TXN_ID"), diff --git standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql index a50a071f34..2f750efa45 
100644 --- standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql +++ standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql @@ -199,6 +199,13 @@ ALTER TABLE "SCHEDULED_QUERIES" ADD "ACTIVE_EXECUTION_ID" bigint; -- HIVE-22995 ALTER TABLE "DBS" ADD "DB_MANAGED_LOCATION_URI" character varying(4000); +-- HIVE-23048 +INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST) + SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '', '' FROM COMPLETED_TXN_COMPONENTS; +CREATE SEQUENCE TXNS_TXN_ID_SEQ MINVALUE 0 OWNED BY TXNS.TXN_ID; +select setval('TXNS_TXN_ID_SEQ', (SELECT MAX(TXN_ID) FROM TXNS)); +ALTER TABLE TXNS ALTER TXN_ID SET DEFAULT nextval('TXNS_TXN_ID_SEQ'); + -- These lines need to be last. Insert any changes above. UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0'; diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java new file mode 100644 index 0000000000..552111dae3 --- /dev/null +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider; +import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; + +/** + * Test openTxn and getOpenTxnList calls on TxnStore. 
+ */ +public class TestOpenTxn { + + static final private Logger LOG = LoggerFactory.getLogger(TestOpenTxn.class); + + private Configuration conf = MetastoreConf.newMetastoreConf(); + private TxnStore txnHandler; + + @Before + public void setUp() throws Exception { + TxnDbUtil.prepDb(conf); + txnHandler = TxnUtils.getTxnStore(conf); + } + + @After + public void tearDown() throws Exception { + TxnDbUtil.cleanDb(conf); + } + + @Test + public void testSingleOpen() throws MetaException { + OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost"); + long txnId = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + Assert.assertEquals(1, txnId); + } + + @Test + public void testGap() throws Exception { + OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost"); + txnHandler.openTxns(openTxnRequest); + long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + deleteTransaction(second); + txnHandler.openTxns(openTxnRequest); + GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); + Assert.assertEquals(3, openTxns.getOpen_txnsSize()); + + } + + @Test + public void testGapWithOldOpen() throws Exception { + OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost"); + txnHandler.openTxns(openTxnRequest); + Thread.sleep(1100); + long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + deleteTransaction(second); + txnHandler.openTxns(openTxnRequest); + GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); + Assert.assertEquals(3, openTxns.getOpen_txnsSize()); + } + + @Test + public void testGapWithOldCommit() throws Exception { + OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost"); + long first = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + Thread.sleep(1100); + txnHandler.commitTxn(new CommitTxnRequest(first)); + long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + deleteTransaction(second); + txnHandler.openTxns(openTxnRequest); + 
GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); + Assert.assertEquals(2, openTxns.getOpen_txnsSize()); + } + + private void deleteTransaction(long txnId) throws SQLException { + DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf); + DataSource ds = dsp.create(conf); + Connection dbConn = ds.getConnection(); + Statement stmt = dbConn.createStatement(); + stmt.executeUpdate("DELETE FROM TXNS WHERE TXN_ID=" + txnId); + dbConn.commit(); + stmt.close(); + dbConn.close(); + } + + // TODO before the minimum_history_level rebase, we cannot run parallel tests + /* + @Test + public void testParallelOpen() throws MetaException { + final OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost"); + + List threadsList = new ArrayList<>(); + IntStream.range(1,10).forEach(i -> { + threadsList.add(new Thread(() -> { + try { + long txnId = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0); + LOG.info("Next txnId {} {}", txnId, System.currentTimeMillis()); + txnHandler.getOpenTxns(); + Thread.sleep(1000); + CommitTxnRequest ctr = new CommitTxnRequest(txnId); + txnHandler.commitTxn(ctr); + } catch (Exception ex) { + LOG.error("Error", ex); + } + })); + }); + threadsList.forEach( t -> t.start()); + GetOpenTxnsInfoResponse infos = txnHandler.getOpenTxnsInfo(); + Assert.assertEquals(0, infos.getOpen_txnsSize()); + } + + */ +}