diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 4a6fa6f620..580786832e 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -45,6 +45,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -182,8 +183,6 @@ private static final int ALLOWED_REPEATED_DEADLOCKS = 10; private static final Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName()); - private static final Long TEMP_HIVE_LOCK_ID = -1L; - private static final Long TEMP_COMMIT_ID = -1L; private static DataSource connPool; private static DataSource connPoolMutex; @@ -1136,6 +1135,7 @@ public void commitTxn(CommitTxnRequest rqst) String conflictSQLSuffix = "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"=" + txnid + " AND \"TC_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) + "," + quoteChar(OperationType.DELETE.sqlConst) + ")"; + long tempCommitId = generateTemporaryId(); if (txnRecord.type != TxnType.READ_ONLY && !rqst.isSetReplPolicy() && isUpdateOrDelete(stmt, conflictSQLSuffix)) { @@ -1151,12 +1151,12 @@ public void commitTxn(CommitTxnRequest rqst) * even if it includes all of its columns * * First insert into write_set using a temporary commitID, which will be updated in a separate call, - * see: {@link #updateCommitIdAndCleanUpMetadata(Statement, long, TxnType, Long)}}. + * see: {@link #updateCommitIdAndCleanUpMetadata(Statement, long, TxnType, Long, long)}}. * This should decrease the scope of the S4U lock on the next_txn_id table. */ Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); stmt.executeUpdate("INSERT INTO \"WRITE_SET\" (\"WS_DATABASE\", \"WS_TABLE\", \"WS_PARTITION\", \"WS_TXNID\", \"WS_COMMIT_ID\", \"WS_OPERATION_TYPE\")" + - " SELECT DISTINCT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_TXNID\", " + TEMP_COMMIT_ID + ", \"TC_OPERATION_TYPE\" " + conflictSQLSuffix); + " SELECT DISTINCT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_TXNID\", " + tempCommitId + ", \"TC_OPERATION_TYPE\" " + conflictSQLSuffix); /** * This S4U will mutex with other commitTxn() and openTxns(). @@ -1245,7 +1245,7 @@ public void commitTxn(CommitTxnRequest rqst) } deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy()); } - updateCommitIdAndCleanUpMetadata(stmt, txnid, txnRecord.type, commitId); + updateCommitIdAndCleanUpMetadata(stmt, txnid, txnRecord.type, commitId, tempCommitId); if (rqst.isSetKeyValue()) { updateKeyValueAssociatedWithTxn(rqst, stmt); } @@ -1330,12 +1330,12 @@ private void moveTxnComponentsToCompleted(Statement stmt, long txnid, char isUpd } } - private void updateCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, Long commitId) throws SQLException { + private void updateCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, Long commitId, long tempId) throws SQLException { List queryBatch = new ArrayList<>(5); // update write_set with real commitId if (commitId != null) { queryBatch.add("UPDATE \"WRITE_SET\" SET \"WS_COMMIT_ID\" = " + commitId + - " WHERE \"WS_COMMIT_ID\" = " + TEMP_COMMIT_ID + " AND \"WS_TXNID\" = " + txnid); + " WHERE \"WS_COMMIT_ID\" = " + tempId + " AND \"WS_TXNID\" = " + txnid); } // clean up txn related metadata if (txnType != TxnType.READ_ONLY) { @@ -2394,7 +2394,7 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc /* Insert txn components and hive locks (with a temp extLockId) first, before getting the next lock ID in a select-for-update. This should minimize the scope of the S4U and decrease the table lock duration. */ insertTxnComponents(txnid, rqst, dbConn); - insertHiveLocksWithTemporaryExtLockId(txnid, dbConn, rqst); + long tempExtLockId = insertHiveLocksWithTemporaryExtLockId(txnid, dbConn, rqst); /** Get the next lock id. * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race. @@ -2403,7 +2403,7 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc * and add it's W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)} * doesn't block on locks acquired later than one it's checking*/ long extLockId = getNextLockIdForUpdate(dbConn, stmt); - incrementLockIdAndUpdateHiveLocks(stmt, extLockId); + incrementLockIdAndUpdateHiveLocks(stmt, extLockId, tempExtLockId); dbConn.commit(); success = true; @@ -2443,10 +2443,10 @@ private long getNextLockIdForUpdate(Connection dbConn, Statement stmt) throws SQ } } - private void incrementLockIdAndUpdateHiveLocks(Statement stmt, long extLockId) throws SQLException { + private void incrementLockIdAndUpdateHiveLocks(Statement stmt, long extLockId, long tempId) throws SQLException { String incrCmd = String.format(INCREMENT_NEXT_LOCK_ID_QUERY, (extLockId + 1)); // update hive locks entries with the real EXT_LOCK_ID (replace temp ID) - String updateLocksCmd = String.format(UPDATE_HIVE_LOCKS_EXT_ID_QUERY, extLockId, TEMP_HIVE_LOCK_ID); + String updateLocksCmd = String.format(UPDATE_HIVE_LOCKS_EXT_ID_QUERY, extLockId, tempId); LOG.debug("Going to execute updates in batch: <" + incrCmd + ">, and <" + updateLocksCmd + ">"); stmt.addBatch(incrCmd); stmt.addBatch(updateLocksCmd); @@ -2567,11 +2567,12 @@ private boolean shouldUpdateTxnComponent(long txnid, LockRequest rqst, LockCompo } } - private void insertHiveLocksWithTemporaryExtLockId(long txnid, Connection dbConn, LockRequest rqst) throws MetaException, SQLException { + private long insertHiveLocksWithTemporaryExtLockId(long txnid, Connection dbConn, LockRequest rqst) throws MetaException, SQLException { String lastHB = isValidTxn(txnid) ? "0" : TxnDbUtil.getEpochFn(dbProduct); String insertLocksQuery = String.format(HIVE_LOCKS_INSERT_QRY, lastHB); long intLockId = 0; + long tempExtLockId = generateTemporaryId(); try (PreparedStatement pstmt = dbConn.prepareStatement(insertLocksQuery)) { for (LockComponent lc : rqst.getComponent()) { @@ -2589,7 +2590,7 @@ private void insertHiveLocksWithTemporaryExtLockId(long txnid, Connection dbConn intLockId++; String lockType = LockTypeUtil.getEncodingAsStr(lc.getType()); - pstmt.setLong(1, TEMP_HIVE_LOCK_ID); + pstmt.setLong(1, tempExtLockId); pstmt.setLong(2, intLockId); pstmt.setLong(3, txnid); pstmt.setString(4, normalizeCase(lc.getDbname())); @@ -2612,6 +2613,11 @@ private void insertHiveLocksWithTemporaryExtLockId(long txnid, Connection dbConn pstmt.executeBatch(); } } + return tempExtLockId; + } + + private long generateTemporaryId() { + return -1 * ThreadLocalRandom.current().nextLong(); } private static String normalizeCase(String s) {