diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 06defdb910..0f50da4fcb 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -44,6 +44,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
@@ -2389,14 +2390,11 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc
     Connection dbConn = null;
     try {
       Statement stmt = null;
-      PreparedStatement pStmt = null;
-      List<PreparedStatement> insertPreparedStmts = null;
-      ResultSet rs = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        long txnid = rqst.getTxnid();
         stmt = dbConn.createStatement();
+        long txnid = rqst.getTxnid();
         if (isValidTxn(txnid)) {
           //this also ensures that txn is still there in expected state
           TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN);
@@ -2405,189 +2403,20 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc
             shouldNeverHappen(txnid);
           }
         }
+        /* Insert txn components and hive locks (with a temp extLockId) first, before getting the next lock ID in a select-for-update. This should minimize the scope of the S4U and decrease the table lock duration. */
+        insertTxnComponents(txnid, rqst, dbConn);
+        long tempLockId = insertHiveLocksWithTemporaryExtLockId(txnid, dbConn, rqst);
+
+
         /** Get the next lock id.
          * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race.
          * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7,
          * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and acquires the locks. Then 7 unblocks,
          * and add it's W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)}
          * doesn't block on locks acquired later than one it's checking*/
-        String s = sqlGenerator.addForUpdateClause("SELECT \"NL_NEXT\" FROM \"NEXT_LOCK_ID\"");
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, no record found in next_lock_id");
-        }
-        long extLockId = rs.getLong(1);
-        s = "UPDATE \"NEXT_LOCK_ID\" SET \"NL_NEXT\" = " + (extLockId + 1);
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
+        long extLockId = getNextLockId(dbConn, stmt);
+        incrementLockIdAndUpdateHiveLocks(stmt, extLockId, tempLockId);
 
-        if (txnid > 0) {
-          List<String> rows = new ArrayList<>();
-          List<List<String>> paramsList = new ArrayList<>();
-          // For each component in this lock request,
-          // add an entry to the txn_components table
-          for (LockComponent lc : rqst.getComponent()) {
-            if(lc.isSetIsTransactional() && !lc.isIsTransactional()) {
-              //we don't prevent using non-acid resources in a txn but we do lock them
-              continue;
-            }
-            boolean updateTxnComponents;
-            if(!lc.isSetOperationType()) {
-              //request came from old version of the client
-              updateTxnComponents = true;//this matches old behavior
-            }
-            else {
-              switch (lc.getOperationType()) {
-                case INSERT:
-                case UPDATE:
-                case DELETE:
-                  if(!lc.isSetIsDynamicPartitionWrite()) {
-                    //must be old client talking, i.e. we don't know if it's DP so be conservative
-                    updateTxnComponents = true;
-                  }
-                  else {
-                    /**
-                     * we know this is part of DP operation and so we'll get
-                     * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list
-                     * of partitions actually chaged.
-                     */
-                    updateTxnComponents = !lc.isIsDynamicPartitionWrite();
-                  }
-                  break;
-                case SELECT:
-                  updateTxnComponents = false;
-                  break;
-                case NO_TXN:
-                  /*this constant is a bit of a misnomer since we now always have a txn context. It
-                   just means the operation is such that we don't care what tables/partitions it
-                   affected as it doesn't trigger a compaction or conflict detection. A better name
-                   would be NON_TRANSACTIONAL.*/
-                  updateTxnComponents = false;
-                  break;
-                default:
-                  //since we have an open transaction, only 4 values above are expected
-                  throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType()
-                    + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid));
-              }
-            }
-            if(!updateTxnComponents) {
-              continue;
-            }
-            String dbName = normalizeCase(lc.getDbname());
-            String tblName = normalizeCase(lc.getTablename());
-            String partName = normalizeCase(lc.getPartitionname());
-            Long writeId = null;
-            if (tblName != null) {
-              // It is assumed the caller have already allocated write id for adding/updating data to
-              // the acid tables. However, DDL operatons won't allocate write id and hence this query
-              // may return empty result sets.
-              // Get the write id allocated by this txn for the given table writes
-              s = "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE"
-                      + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = " + txnid;
-              pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(dbName, tblName));
-              LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
-                      quoteString(dbName), quoteString(tblName));
-              rs = pStmt.executeQuery();
-              if (rs.next()) {
-                writeId = rs.getLong(1);
-              }
-            }
-            rows.add(txnid + ", ?, " +
-                    (tblName == null ? "null" : "?") + ", " +
-                    (partName == null ? "null" : "?")+ "," +
-                    quoteString(OperationType.fromDataOperationType(lc.getOperationType()).toString())+ "," +
-                    (writeId == null ? "null" : writeId));
-            List<String> params = new ArrayList<>();
-            params.add(dbName);
-            if (tblName != null) {
-              params.add(tblName);
-            }
-            if (partName != null) {
-              params.add(partName);
-            }
-            paramsList.add(params);
-          }
-          insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
-              "\"TXN_COMPONENTS\" (\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")",
-              rows, paramsList);
-          for(PreparedStatement pst : insertPreparedStmts) {
-            int modCount = pst.executeUpdate();
-            closeStmt(pst);
-          }
-          insertPreparedStmts = null;
-        }
-        List<String> rows = new ArrayList<>();
-        List<List<String>> paramsList = new ArrayList<>();
-        long intLockId = 0;
-        String lastHBString = (isValidTxn(txnid) ? "0" : getDbEpochString());
-        for (LockComponent lc : rqst.getComponent()) {
-          if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET &&
-            (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) {
-            //old version of thrift client should have (lc.isSetOperationType() == false) but they do not
-            //If you add a default value to a variable, isSet() for that variable is true regardless of the where the
-            //message was created (for object variables.
-            // It works correctly for boolean vars, e.g. LockComponent.isTransactional).
-            //in test mode, upgrades are not tested, so client version and server version of thrift always matches so
-            //we see UNSET here it means something didn't set the appropriate value.
-            throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component "
-              + lc + " agentInfo=" + rqst.getAgentInfo());
-          }
-          intLockId++;
-          String dbName = normalizeCase(lc.getDbname());
-          String tblName = normalizeCase(lc.getTablename());
-          String partName = normalizeCase(lc.getPartitionname());
-          LockType lockType = lc.getType();
-          char lockChar = 'z';
-          switch (lockType) {
-            case EXCLUSIVE:
-              lockChar = LOCK_EXCLUSIVE;
-              break;
-            case SHARED_READ:
-              lockChar = LOCK_SHARED;
-              break;
-            case SHARED_WRITE:
-              lockChar = LOCK_SEMI_SHARED;
-              break;
-          }
-          rows.add(extLockId + ", " + intLockId + "," + txnid + ", ?, " +
-                  ((tblName == null) ? "null" : "?") + ", " +
-                  ((partName == null) ? "null" : "?") + ", " +
-                  quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " +
-                  //for locks associated with a txn, we always heartbeat txn and timeout based on that
-                  lastHBString + ", " +
-                  ((rqst.getUser() == null) ? "null" : "?") + ", " +
-                  ((rqst.getHostname() == null) ? "null" : "?") + ", " +
-                  ((rqst.getAgentInfo() == null) ? "null" : "?"));// + ")";
-          List<String> params = new ArrayList<>();
-          params.add(dbName);
-          if (tblName != null) {
-            params.add(tblName);
-          }
-          if (partName != null) {
-            params.add(partName);
-          }
-          if (rqst.getUser() != null) {
-            params.add(rqst.getUser());
-          }
-          if (rqst.getHostname() != null) {
-            params.add(rqst.getHostname());
-          }
-          if (rqst.getAgentInfo() != null) {
-            params.add(rqst.getAgentInfo());
-          }
-          paramsList.add(params);
-        }
-        insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
-          "\"HIVE_LOCKS\" (\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", " +
-            "\"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", " +
-            "\"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\")", rows, paramsList);
-        for(PreparedStatement pst : insertPreparedStmts) {
-          int modCount = pst.executeUpdate();
-        }
         dbConn.commit();
         success = true;
         return new ConnectionLockIdPair(dbConn, extLockId);
@@ -2598,13 +2427,7 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc
         throw new MetaException("Unable to update transaction database "
           + StringUtils.stringifyException(e));
       } finally {
-        if (insertPreparedStmts != null) {
-          for (PreparedStatement pst : insertPreparedStmts) {
-            closeStmt(pst);
-          }
-        }
-        closeStmt(pStmt);
-        close(rs, stmt, null);
+        closeStmt(stmt);
         if (!success) {
           /* This needs to return a "live" connection to be used by operation that follows it.
           Thus it only closes Connection on failure/retry. */
@@ -2617,6 +2440,241 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc
       return enqueueLockWithRetry(rqst);
     }
   }
+
+  /**
+   * Reads the current NL_NEXT value from NEXT_LOCK_ID with a select-for-update. The counter is
+   * not advanced here; {@link #incrementLockIdAndUpdateHiveLocks} does that.
+   *
+   * @param dbConn connection that is rolled back when NEXT_LOCK_ID holds no row
+   * @param stmt statement used to run the query
+   * @return the NL_NEXT value that was read
+   * @throws MetaException if NEXT_LOCK_ID holds no row
+   */
+  private long getNextLockId(Connection dbConn, Statement stmt) throws SQLException, MetaException {
+    String s = sqlGenerator.addForUpdateClause("SELECT \"NL_NEXT\" FROM \"NEXT_LOCK_ID\"");
+    LOG.debug("Going to execute query <" + s + ">");
+    try (ResultSet rs = stmt.executeQuery(s)) {
+      if (!rs.next()) {
+        LOG.debug("Going to rollback");
+        dbConn.rollback();
+        throw new MetaException("Transaction tables not properly " +
+                "initialized, no record found in next_lock_id");
+      }
+      return rs.getLong(1);
+    }
+  }
+
+  /**
+   * Sets NL_NEXT to extLockId + 1 and rewrites the HIVE_LOCKS rows that were inserted under
+   * tempLockId so that they carry extLockId. Both updates are sent as one JDBC batch.
+   */
+  private void incrementLockIdAndUpdateHiveLocks(Statement stmt, long extLockId, long tempLockId) throws SQLException {
+    String incrCmd = "UPDATE \"NEXT_LOCK_ID\" SET \"NL_NEXT\" = " + (extLockId + 1);
+    // update hive locks entries with the real LOCK_EXT_ID (replace temp ID)
+    String updateLocksCmd = "UPDATE \"HIVE_LOCKS\" SET \"HL_LOCK_EXT_ID\" = " + extLockId + " WHERE \"HL_LOCK_EXT_ID\" = " + tempLockId;
+    LOG.debug("Going to execute updates in batch: <" + incrCmd + ">, and <" + updateLocksCmd + ">");
+    stmt.addBatch(incrCmd);
+    stmt.addBatch(updateLocksCmd);
+    stmt.executeBatch();
+  }
+
+  /**
+   * Batch-inserts a TXN_COMPONENTS row for every lock component of the request that
+   * shouldUpdateTxnComponent accepts. Does nothing when txnid is not positive.
+   */
+  private void insertTxnComponents(long txnid, LockRequest rqst, Connection dbConn) throws SQLException {
+    if (txnid > 0) {
+      try (PreparedStatement pstmt = dbConn.prepareStatement("INSERT INTO \"TXN_COMPONENTS\" (" +
+          "\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")" +
+          " VALUES (?, ?, ?, ?, ?, ?)")) {
+        // For each component in this lock request,
+        // add an entry to the txn_components table
+        int insertCounter = 0;
+        int maxInsertsPerBatch = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+        for (LockComponent lc : rqst.getComponent()) {
+          if (lc.isSetIsTransactional() && !lc.isIsTransactional()) {
+            //we don't prevent using non-acid resources in a txn but we do lock them
+            continue;
+          }
+          if (!shouldUpdateTxnComponent(txnid, rqst, lc)) {
+            continue;
+          }
+          String dbName = normalizeCase(lc.getDbname());
+          String tblName = normalizeCase(lc.getTablename());
+          String partName = normalizeCase(lc.getPartitionname());
+          Long writeId = getWriteId(txnid, dbConn, dbName, tblName);
+
+          pstmt.setLong(1, txnid);
+          pstmt.setString(2, dbName);
+          pstmt.setString(3, tblName);
+          pstmt.setString(4, partName);
+          pstmt.setString(5, OperationType.fromDataOperationType(lc.getOperationType()).toString());
+          pstmt.setObject(6, writeId);
+          pstmt.addBatch();
+          insertCounter++;
+          if (insertCounter % maxInsertsPerBatch == 0) {
+            LOG.debug("Executing TXN_COMPONENTS inserts in batch. Batch size: " + maxInsertsPerBatch);
+            pstmt.executeBatch();
+            pstmt.clearBatch();
+          }
+        }
+        // flush rows added since the last full batch; skipped when nothing is pending
+        if (insertCounter % maxInsertsPerBatch != 0) {
+          LOG.debug("Executing TXN_COMPONENTS inserts in batch. Batch size: " + insertCounter % maxInsertsPerBatch);
+          pstmt.executeBatch();
+        }
+      }
+    }
+  }
+
+  /**
+   * Looks up the write id recorded in TXN_TO_WRITE_ID for this transaction and table.
+   *
+   * @return the write id, or null when tblName is null or no row matches
+   */
+  private Long getWriteId(long txnid, Connection dbConn, String dbName, String tblName) throws SQLException {
+    if (tblName != null) {
+      // It is assumed the caller has already allocated write id for adding/updating data to
+      // the acid tables. However, DDL operations won't allocate write id and hence this query
+      // may return empty result sets.
+      // Get the write id allocated by this txn for the given table writes
+      String s = "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE"
+          + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = " + txnid;
+      LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
+          quoteString(dbName), quoteString(tblName));
+      try (PreparedStatement pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(dbName, tblName));
+          ResultSet rs = pStmt.executeQuery()) {
+        if (rs.next()) {
+          return rs.getLong(1);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Decides, from the component's operation type, whether it needs a TXN_COMPONENTS entry.
+   *
+   * @throws IllegalStateException if the type is not INSERT, UPDATE, DELETE, SELECT or NO_TXN
+   */
+  private boolean shouldUpdateTxnComponent(long txnid, LockRequest rqst, LockComponent lc) {
+    if(!lc.isSetOperationType()) {
+      //request came from old version of the client
+      return true; //this matches old behavior
+    }
+    else {
+      switch (lc.getOperationType()) {
+        case INSERT:
+        case UPDATE:
+        case DELETE:
+          if(!lc.isSetIsDynamicPartitionWrite()) {
+            //must be old client talking, i.e. we don't know if it's DP so be conservative
+            return true;
+          }
+          else {
+            /**
+             * we know this is part of DP operation and so we'll get
+             * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list
+             * of partitions actually changed.
+             */
+            return !lc.isIsDynamicPartitionWrite();
+          }
+        case SELECT:
+          return false;
+        case NO_TXN:
+          /*this constant is a bit of a misnomer since we now always have a txn context. It
+           just means the operation is such that we don't care what tables/partitions it
+           affected as it doesn't trigger a compaction or conflict detection. A better name
+           would be NON_TRANSACTIONAL.*/
+          return false;
+        default:
+          //since we have an open transaction, only 4 values above are expected
+          throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType()
+              + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid));
+      }
+    }
+  }
+
+  /**
+   * Batch-inserts one HIVE_LOCKS row per lock component, in the waiting state, under a
+   * temporary negative external lock id that is replaced once the real id has been read.
+   *
+   * @return the temporary external lock id the rows were inserted with
+   */
+  private long insertHiveLocksWithTemporaryExtLockId(long txnid, Connection dbConn, LockRequest rqst) throws MetaException, SQLException {
+    long generatedExtLockId = generateTemporaryLockId();
+    long intLockId = 0;
+    String lastHBString = (isValidTxn(txnid) ? "0" : getDbEpochString());
+    int maxInsertsPerBatch = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+    // lastHBString is placed in the statement text, unquoted, as the string-built insert did:
+    // it is an SQL fragment, and binding it with setString would send it as character data.
+    try (PreparedStatement pstmt = dbConn.prepareStatement(
+        "INSERT INTO \"HIVE_LOCKS\" (" +
+        " \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\"," +
+        " \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\")" +
+        " VALUES (?, ?, ?, ?, ?, ?, ?, ?, " + lastHBString + ", ?, ?, ?)")) {
+      for (LockComponent lc : rqst.getComponent()) {
+        if (lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET &&
+            (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) {
+          //old version of thrift client should have (lc.isSetOperationType() == false) but they do not
+          //If you add a default value to a variable, isSet() for that variable is true regardless of the where the
+          //message was created (for object variables.
+          // It works correctly for boolean vars, e.g. LockComponent.isTransactional).
+          //in test mode, upgrades are not tested, so client version and server version of thrift always matches so
+          //we see UNSET here it means something didn't set the appropriate value.
+          throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component "
+              + lc + " agentInfo=" + rqst.getAgentInfo());
+        }
+        intLockId++;
+        char lockChar = getLockChar(lc.getType());
+
+        pstmt.setLong(1, generatedExtLockId);
+        pstmt.setLong(2, intLockId);
+        pstmt.setLong(3, txnid);
+        pstmt.setString(4, normalizeCase(lc.getDbname()));
+        pstmt.setString(5, normalizeCase(lc.getTablename()));
+        pstmt.setString(6, normalizeCase(lc.getPartitionname()));
+        pstmt.setString(7, Character.toString(LOCK_WAITING));
+        pstmt.setString(8, Character.toString(lockChar));
+        pstmt.setString(9, rqst.getUser());
+        pstmt.setString(10, rqst.getHostname());
+        pstmt.setString(11, rqst.getAgentInfo());
+
+        pstmt.addBatch();
+        if (intLockId % maxInsertsPerBatch == 0) {
+          LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + maxInsertsPerBatch);
+          pstmt.executeBatch();
+          pstmt.clearBatch();
+        }
+      }
+      // flush rows added since the last full batch; skipped when nothing is pending
+      if (intLockId % maxInsertsPerBatch != 0) {
+        LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + intLockId % maxInsertsPerBatch);
+        pstmt.executeBatch();
+      }
+    }
+    return generatedExtLockId;
+  }
+
+  /* We generate a negative value for temp lock ID so that lock checks can safely ignore these temporary entries */
+  private long generateTemporaryLockId() {
+    return -1 * ThreadLocalRandom.current().nextLong(1000, Long.MAX_VALUE);
+  }
+
+  /** Maps a lock type to the character stored in HL_LOCK_TYPE; any other type yields 'z'. */
+  private char getLockChar(LockType lockType) {
+    switch (lockType) {
+      case EXCLUSIVE:
+        return LOCK_EXCLUSIVE;
+      case SHARED_READ:
+        return LOCK_SHARED;
+      case SHARED_WRITE:
+        return LOCK_SEMI_SHARED;
+      default:
+        return 'z';
+    }
+  }
+
   private static String normalizeCase(String s) {
     return s == null ? null : s.toLowerCase();
   }
@@ -3280,7 +3338,6 @@ public void addDynamicPartitions(AddDynamicPartitions rqst)
       throws NoSuchTxnException, TxnAbortedException, MetaException {
     Connection dbConn = null;
     Statement stmt = null;
-    List<PreparedStatement> insertPreparedStmts = null;
     try {
       try {
         lockInternal();
@@ -3299,23 +3356,32 @@ public void addDynamicPartitions(AddDynamicPartitions rqst)
         }
 
         Long writeId = rqst.getWriteid();
-        List<String> rows = new ArrayList<>();
-        List<List<String>> paramsList = new ArrayList<>();
-        for (String partName : rqst.getPartitionnames()) {
-          rows.add(rqst.getTxnid() + ",?,?,?," + quoteChar(ot.sqlConst) + "," + writeId);
-          List<String> params = new ArrayList<>();
-          params.add(normalizeCase(rqst.getDbname()));
-          params.add(normalizeCase(rqst.getTablename()));
-          params.add(partName);
-          paramsList.add(params);
-        }
-        int modCount = 0;
-        //record partitions that were written to
-        insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
-            "\"TXN_COMPONENTS\" (\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")",
-            rows, paramsList);
-        for(PreparedStatement pst : insertPreparedStmts) {
-          modCount = pst.executeUpdate();
+        try (PreparedStatement pstmt = dbConn.prepareStatement("INSERT INTO \"TXN_COMPONENTS\" (" +
+            "\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")" +
+            " VALUES (?, ?, ?, ?, ?, ?)")) {
+          int insertCounter = 0;
+          int maxInsertsPerBatch = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+          for (String partName : rqst.getPartitionnames()) {
+            pstmt.setLong(1, rqst.getTxnid());
+            pstmt.setString(2, normalizeCase(rqst.getDbname()));
+            pstmt.setString(3, normalizeCase(rqst.getTablename()));
+            pstmt.setString(4, partName);
+            // bound parameter: pass the raw character, not the quoteChar() form used for string-built SQL
+            pstmt.setString(5, Character.toString(ot.sqlConst));
+            pstmt.setObject(6, writeId);
+            pstmt.addBatch();
+            insertCounter++;
+            if (insertCounter % maxInsertsPerBatch == 0) {
+              LOG.debug("Executing TXN_COMPONENTS inserts in batch. Batch size: " + maxInsertsPerBatch);
+              pstmt.executeBatch();
+              pstmt.clearBatch();
+            }
+          }
+          // flush rows added since the last full batch; skipped when nothing is pending
+          if (insertCounter % maxInsertsPerBatch != 0) {
+            LOG.debug("Executing TXN_COMPONENTS inserts in batch. Batch size: " + insertCounter % maxInsertsPerBatch);
+            pstmt.executeBatch();
+          }
         }
         LOG.debug("Going to commit");
         dbConn.commit();
@@ -3326,11 +3392,6 @@ public void addDynamicPartitions(AddDynamicPartitions rqst)
         throw new MetaException("Unable to insert into from transaction database " +
           StringUtils.stringifyException(e));
       } finally {
-        if (insertPreparedStmts != null) {
-          for(PreparedStatement pst : insertPreparedStmts) {
-            closeStmt(pst);
-          }
-        }
         close(null, stmt, dbConn);
         unlockInternal();
       }