diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 2995afad23..405d738659 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -45,10 +45,12 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; +import java.util.stream.Stream; import javax.sql.DataSource; @@ -2375,14 +2377,11 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc Connection dbConn = null; try { Statement stmt = null; - PreparedStatement pStmt = null; - List insertPreparedStmts = null; - ResultSet rs = null; try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - long txnid = rqst.getTxnid(); stmt = dbConn.createStatement(); + long txnid = rqst.getTxnid(); if (isValidTxn(txnid)) { //this also ensures that txn is still there in expected state TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN); @@ -2391,189 +2390,21 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc shouldNeverHappen(txnid); } } + // we insert txn components before getting the lock ID to avoid locking the next_lock_id table for too long + insertTxnComponents(txnid, rqst, dbConn); + // we insert all hive locks with a generated, negative extLockId, which later will be replaced in a single update + long tempLockId = insertHiveLocksWithTemporaryExtLockId(txnid, dbConn, rqst); + /** Get the next lock id. * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race. * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7, * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and acquires the locks. Then 7 unblocks, * and add it's W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)} * doesn't block on locks acquired later than one it's checking*/ - String s = sqlGenerator.addForUpdateClause("SELECT \"NL_NEXT\" FROM \"NEXT_LOCK_ID\""); - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_lock_id"); - } - long extLockId = rs.getLong(1); - s = "UPDATE \"NEXT_LOCK_ID\" SET \"NL_NEXT\" = " + (extLockId + 1); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); + long extLockId = getNextLockId(dbConn, stmt); + // increment lock ID, and replace the temporary ext lock ID with the real one in hive_locks + incrementLockIdAndUpdateHiveLocks(stmt, extLockId, tempLockId); - if (txnid > 0) { - List rows = new ArrayList<>(); - List> paramsList = new ArrayList<>(); - // For each component in this lock request, - // add an entry to the txn_components table - for (LockComponent lc : rqst.getComponent()) { - if(lc.isSetIsTransactional() && !lc.isIsTransactional()) { - //we don't prevent using non-acid resources in a txn but we do lock them - continue; - } - boolean updateTxnComponents; - if(!lc.isSetOperationType()) { - //request came from old version of the client - updateTxnComponents = true;//this matches old behavior - } - else { - switch (lc.getOperationType()) { - case INSERT: - case UPDATE: - case DELETE: - if(!lc.isSetIsDynamicPartitionWrite()) { - //must be old client talking, i.e. we don't know if it's DP so be conservative - updateTxnComponents = true; - } - else { - /** - * we know this is part of DP operation and so we'll get - * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list - * of partitions actually chaged. - */ - updateTxnComponents = !lc.isIsDynamicPartitionWrite(); - } - break; - case SELECT: - updateTxnComponents = false; - break; - case NO_TXN: - /*this constant is a bit of a misnomer since we now always have a txn context. It - just means the operation is such that we don't care what tables/partitions it - affected as it doesn't trigger a compaction or conflict detection. A better name - would be NON_TRANSACTIONAL.*/ - updateTxnComponents = false; - break; - default: - //since we have an open transaction, only 4 values above are expected - throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType() - + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid)); - } - } - if(!updateTxnComponents) { - continue; - } - String dbName = normalizeCase(lc.getDbname()); - String tblName = normalizeCase(lc.getTablename()); - String partName = normalizeCase(lc.getPartitionname()); - Long writeId = null; - if (tblName != null) { - // It is assumed the caller have already allocated write id for adding/updating data to - // the acid tables. However, DDL operatons won't allocate write id and hence this query - // may return empty result sets. - // Get the write id allocated by this txn for the given table writes - s = "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE" - + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = " + txnid; - pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(dbName, tblName)); - LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", - quoteString(dbName), quoteString(tblName)); - rs = pStmt.executeQuery(); - if (rs.next()) { - writeId = rs.getLong(1); - } - } - rows.add(txnid + ", ?, " + - (tblName == null ? "null" : "?") + ", " + - (partName == null ? "null" : "?")+ "," + - quoteString(OperationType.fromDataOperationType(lc.getOperationType()).toString())+ "," + - (writeId == null ? "null" : writeId)); - List params = new ArrayList<>(); - params.add(dbName); - if (tblName != null) { - params.add(tblName); - } - if (partName != null) { - params.add(partName); - } - paramsList.add(params); - } - insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "\"TXN_COMPONENTS\" (\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")", - rows, paramsList); - for(PreparedStatement pst : insertPreparedStmts) { - int modCount = pst.executeUpdate(); - closeStmt(pst); - } - insertPreparedStmts = null; - } - List rows = new ArrayList<>(); - List> paramsList = new ArrayList<>(); - long intLockId = 0; - long lastHB = (isValidTxn(txnid) ? 0 : getDbTime(dbConn)); - for (LockComponent lc : rqst.getComponent()) { - if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET && - (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) { - //old version of thrift client should have (lc.isSetOperationType() == false) but they do not - //If you add a default value to a variable, isSet() for that variable is true regardless of the where the - //message was created (for object variables. - // It works correctly for boolean vars, e.g. LockComponent.isTransactional). - //in test mode, upgrades are not tested, so client version and server version of thrift always matches so - //we see UNSET here it means something didn't set the appropriate value. - throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component " - + lc + " agentInfo=" + rqst.getAgentInfo()); - } - intLockId++; - String dbName = normalizeCase(lc.getDbname()); - String tblName = normalizeCase(lc.getTablename()); - String partName = normalizeCase(lc.getPartitionname()); - LockType lockType = lc.getType(); - char lockChar = 'z'; - switch (lockType) { - case EXCLUSIVE: - lockChar = LOCK_EXCLUSIVE; - break; - case SHARED_READ: - lockChar = LOCK_SHARED; - break; - case SHARED_WRITE: - lockChar = LOCK_SEMI_SHARED; - break; - } - rows.add(extLockId + ", " + intLockId + "," + txnid + ", ?, " + - ((tblName == null) ? "null" : "?") + ", " + - ((partName == null) ? "null" : "?") + ", " + - quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " + - //for locks associated with a txn, we always heartbeat txn and timeout based on that - lastHB + ", " + - ((rqst.getUser() == null) ? "null" : "?") + ", " + - ((rqst.getHostname() == null) ? "null" : "?") + ", " + - ((rqst.getAgentInfo() == null) ? "null" : "?"));// + ")"; - List params = new ArrayList<>(); - params.add(dbName); - if (tblName != null) { - params.add(tblName); - } - if (partName != null) { - params.add(partName); - } - if (rqst.getUser() != null) { - params.add(rqst.getUser()); - } - if (rqst.getHostname() != null) { - params.add(rqst.getHostname()); - } - if (rqst.getAgentInfo() != null) { - params.add(rqst.getAgentInfo()); - } - paramsList.add(params); - } - insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "\"HIVE_LOCKS\" (\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", " + - "\"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", " + - "\"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\")", rows, paramsList); - for(PreparedStatement pst : insertPreparedStmts) { - int modCount = pst.executeUpdate(); - } dbConn.commit(); success = true; return new ConnectionLockIdPair(dbConn, extLockId); @@ -2584,13 +2415,7 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { - if (insertPreparedStmts != null) { - for (PreparedStatement pst : insertPreparedStmts) { - closeStmt(pst); - } - } - closeStmt(pStmt); - close(rs, stmt, null); + closeStmt(stmt); if (!success) { /* This needs to return a "live" connection to be used by operation that follows it. Thus it only closes Connection on failure/retry. */ @@ -2603,6 +2428,226 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc return enqueueLockWithRetry(rqst); } } + + private long getNextLockId(Connection dbConn, Statement stmt) throws SQLException, MetaException { + String s = sqlGenerator.addForUpdateClause("SELECT \"NL_NEXT\" FROM \"NEXT_LOCK_ID\""); + LOG.debug("Going to execute query <" + s + ">"); + try (ResultSet rs = stmt.executeQuery(s)) { + if (!rs.next()) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_lock_id"); + } + return rs.getLong(1); + } + } + + private void incrementLockIdAndUpdateHiveLocks(Statement stmt, long extLockId, long tempLockId) throws SQLException { + String incrCmd = "UPDATE \"NEXT_LOCK_ID\" SET \"NL_NEXT\" = " + (extLockId + 1); + String updateLocksCmd = "UPDATE \"HIVE_LOCKS\" SET \"HL_LOCK_EXT_ID\" = " + extLockId + " WHERE \"HL_LOCK_EXT_ID\" = " + tempLockId; + LOG.debug("Going to execute updates in batch: <" + incrCmd + ">, and <" + updateLocksCmd + ">"); + stmt.addBatch(incrCmd); + stmt.addBatch(updateLocksCmd); + stmt.executeBatch(); + } + + private void insertTxnComponents(long txnid, LockRequest rqst, Connection dbConn) throws SQLException { + if (txnid > 0) { + try (PreparedStatement pstmt = dbConn.prepareStatement("INSERT INTO \"TXN_COMPONENTS\" (" + + "\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")" + + " VALUES (?, ?, ?, ?, ?, ?)")) { + // For each component in this lock request, + // add an entry to the txn_components table + for (LockComponent lc : rqst.getComponent()) { + if (lc.isSetIsTransactional() && !lc.isIsTransactional()) { + //we don't prevent using non-acid resources in a txn but we do lock them + continue; + } + if (!shouldUpdateTxnComponents(txnid, rqst, lc)) { + continue; + } + collectNextTxnComponentRow(txnid, dbConn, pstmt, lc); + } + pstmt.executeBatch(); + } + } + } + + private void collectNextTxnComponentRow(long txnid, Connection dbConn, PreparedStatement pstmt, LockComponent lc) throws SQLException { + String dbName = normalizeCase(lc.getDbname()); + String tblName = normalizeCase(lc.getTablename()); + String partName = normalizeCase(lc.getPartitionname()); + Long writeId = getWriteId(txnid, dbConn, dbName, tblName); + + pstmt.setLong(1, txnid); + pstmt.setString(2, dbName); + pstmt.setString(3, tblName); + pstmt.setString(4, partName); + pstmt.setString(5, OperationType.fromDataOperationType(lc.getOperationType()).toString()); + pstmt.setLong(6, writeId); + pstmt.addBatch(); + } + + private void executeTxnComponentInserts(Connection dbConn, List rows, List> paramsList) throws SQLException { + List insertPreparedStmts = Collections.emptyList(); + try { + insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, + "\"TXN_COMPONENTS\" (\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")", + rows, paramsList); + for (PreparedStatement pst : insertPreparedStmts) { + pst.executeUpdate(); + } + } finally { + for (PreparedStatement pst : insertPreparedStmts) { + closeStmt(pst); + } + } + } + + private Long getWriteId(long txnid, Connection dbConn, String dbName, String tblName) throws SQLException { + if (tblName != null) { + // It is assumed the caller have already allocated write id for adding/updating data to + // the acid tables. However, DDL operatons won't allocate write id and hence this query + // may return empty result sets. + // Get the write id allocated by this txn for the given table writes + String s = "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE" + + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = " + txnid; + LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", + quoteString(dbName), quoteString(tblName)); + try (PreparedStatement pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(dbName, tblName)); + ResultSet rs = pStmt.executeQuery()) { + if (rs.next()) { + return rs.getLong(1); + } + } + } + return null; + } + + private boolean shouldUpdateTxnComponents(long txnid, LockRequest rqst, LockComponent lc) { + if(!lc.isSetOperationType()) { + //request came from old version of the client + return true; //this matches old behavior + } + else { + switch (lc.getOperationType()) { + case INSERT: + case UPDATE: + case DELETE: + if(!lc.isSetIsDynamicPartitionWrite()) { + //must be old client talking, i.e. we don't know if it's DP so be conservative + return true; + } + else { + /** + * we know this is part of DP operation and so we'll get + * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list + * of partitions actually chaged. + */ + return !lc.isIsDynamicPartitionWrite(); + } + case SELECT: + return false; + case NO_TXN: + /*this constant is a bit of a misnomer since we now always have a txn context. It + just means the operation is such that we don't care what tables/partitions it + affected as it doesn't trigger a compaction or conflict detection. A better name + would be NON_TRANSACTIONAL.*/ + return false; + default: + //since we have an open transaction, only 4 values above are expected + throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType() + + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid)); + } + } + } + + private long insertHiveLocksWithTemporaryExtLockId(long txnid, Connection dbConn, LockRequest rqst) throws MetaException, SQLException { + List rows = new ArrayList<>(); + List> paramsList = new ArrayList<>(); + long generatedExtLockId = generateTemporaryLockId(); + long intLockId = 0; + long lastHB = (isValidTxn(txnid) ? 0 : getDbTime(dbConn)); + for (LockComponent lc : rqst.getComponent()) { + if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET && + (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) { + //old version of thrift client should have (lc.isSetOperationType() == false) but they do not + //If you add a default value to a variable, isSet() for that variable is true regardless of the where the + //message was created (for object variables. + // It works correctly for boolean vars, e.g. LockComponent.isTransactional). + //in test mode, upgrades are not tested, so client version and server version of thrift always matches so + //we see UNSET here it means something didn't set the appropriate value. + throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component " + + lc + " agentInfo=" + rqst.getAgentInfo()); + } + intLockId++; + collectNextHiveLockRow(txnid, rqst, rows, paramsList, generatedExtLockId, intLockId, lastHB, lc); + } + executeHiveLockInserts(dbConn, rows, paramsList); + return generatedExtLockId; + } + + /* We generate a negative value for temp lock ID so that lock checks can safely ignore these temporary entries */ + private long generateTemporaryLockId() { + return -1 * ThreadLocalRandom.current().nextLong(1000, Long.MAX_VALUE); + } + + private void collectNextHiveLockRow(long txnid, LockRequest rqst, List rows, List> paramsList, + long extLockId, long intLockId, long lastHB, LockComponent lc) { + String dbName = normalizeCase(lc.getDbname()); + String tblName = normalizeCase(lc.getTablename()); + String partName = normalizeCase(lc.getPartitionname()); + char lockChar = getLockChar(lc.getType()); + + rows.add(extLockId + ", " + intLockId + "," + txnid + ", ?, " + + ((tblName == null) ? "null" : "?") + ", " + + ((partName == null) ? "null" : "?") + ", " + + quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " + + //for locks associated with a txn, we always heartbeat txn and timeout based on that + lastHB + ", " + + ((rqst.getUser() == null) ? "null" : "?") + ", " + + ((rqst.getHostname() == null) ? "null" : "?") + ", " + + ((rqst.getAgentInfo() == null) ? "null" : "?"));// + ")"; + + List params = new ArrayList<>(); + params.add(dbName); + Stream.of(tblName, partName, rqst.getUser(), rqst.getHostname(), rqst.getAgentInfo()) + .filter(Objects::nonNull) + .forEach(params::add); + paramsList.add(params); + } + + private void executeHiveLockInserts(Connection dbConn, List rows, List> paramsList) throws SQLException { + List insertPreparedStmts = Collections.emptyList(); + try { + insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, + "\"HIVE_LOCKS\" (\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", " + + "\"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", " + + "\"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\")", rows, paramsList); + for (PreparedStatement pst : insertPreparedStmts) { + pst.executeUpdate(); + } + } finally { + for (PreparedStatement pst : insertPreparedStmts) { + closeStmt(pst); + } + } + } + + private char getLockChar(LockType lockType) { + switch (lockType) { + case EXCLUSIVE: + return LOCK_EXCLUSIVE; + case SHARED_READ: + return LOCK_SHARED; + case SHARED_WRITE: + return LOCK_SEMI_SHARED; + default: + return 'z'; + } + } + private static String normalizeCase(String s) { return s == null ? null : s.toLowerCase(); }