diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index f53aebe4ad..90bb1c0239 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -186,6 +186,8 @@ private static DataSource connPoolMutex; static private boolean doRetryOnConnPool = false; + private static final String EXT_LOCK_ID = "_EXT_LOCK_ID_"; + private List transactionalListeners; /** @@ -2361,6 +2363,204 @@ private TxnRecord(int txnType) { } } + /** + * Populate TXN_COMPONENTS for the components of the lock request that need tracking; does nothing when txnid <= 0. + * + * @param txnid id of the transaction the lock request belongs to; values <= 0 make this a no-op + * @param rqst lock request whose components are examined + * @param dbConn connection used for the write id lookup and the inserts + * @throws SQLException if the write id lookup or an insert fails + */ + private void populateTxnComponents(long txnid, LockRequest rqst, + Connection dbConn) throws SQLException { + if (txnid <= 0) { + return; + } + + List rows = new ArrayList<>(); + List> paramsList = new ArrayList<>(); + // For each component in this lock request, + // add an entry to the txn_components table + for (LockComponent lc : rqst.getComponent()) { + if(lc.isSetIsTransactional() && !lc.isIsTransactional()) { + //we don't prevent using non-acid resources in a txn but we do lock them + continue; + } + boolean updateTxnComponents; + if(!lc.isSetOperationType()) { + //request came from old version of the client + updateTxnComponents = true;//this matches old behavior + } + else { + switch (lc.getOperationType()) { + case INSERT: + case UPDATE: + case DELETE: + if(!lc.isSetIsDynamicPartitionWrite()) { + //must be old client talking, i.e. we don't know if it's DP so be conservative + updateTxnComponents = true; + } + else { + /** + * we know this is part of DP operation and so we'll get + * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list + * of partitions actually changed. 
+ */ + updateTxnComponents = !lc.isIsDynamicPartitionWrite(); + } + break; + case SELECT: + updateTxnComponents = false; + break; + case NO_TXN: + /*this constant is a bit of a misnomer since we now always have a txn context. It + just means the operation is such that we don't care what tables/partitions it + affected as it doesn't trigger a compaction or conflict detection. A better name + would be NON_TRANSACTIONAL.*/ + updateTxnComponents = false; + break; + default: + //since we have an open transaction, only the 5 values above are expected + throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType() + + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid)); + } + } + if(!updateTxnComponents) { + continue; + } + String dbName = normalizeCase(lc.getDbname()); + String tblName = normalizeCase(lc.getTablename()); + String partName = normalizeCase(lc.getPartitionname()); + Long writeId = null; + if (tblName != null) { + // It is assumed the caller has already allocated write id for adding/updating data to + // the acid tables. However, DDL operations won't allocate write id and hence this query + // may return empty result sets. + // Get the write id allocated by this txn for the given table writes + String s = "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE" + + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = " + txnid; + PreparedStatement pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, + Arrays.asList(dbName, tblName)); + LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", + quoteString(dbName), quoteString(tblName)); + ResultSet rs = pStmt.executeQuery(); + if (rs.next()) { + writeId = rs.getLong(1); + } + closeStmt(pStmt); + } + rows.add(txnid + ", ?, " + + (tblName == null ? "null" : "?") + ", " + + (partName == null ? "null" : "?")+ "," + + quoteString(OperationType.fromDataOperationType(lc.getOperationType()).toString())+ "," + + (writeId == null ? 
"null" : writeId)); + List params = new ArrayList<>(); + params.add(dbName); + if (tblName != null) { + params.add(tblName); + } + if (partName != null) { + params.add(partName); + } + paramsList.add(params); + } + List insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, + "\"TXN_COMPONENTS\" (\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")", + rows, paramsList); + for(PreparedStatement pst : insertPreparedStmts) { + pst.executeUpdate(); + closeStmt(pst); + } + } + + /** + * Prepare rows and param list for inserting into HIVE_LOCKS table for all + * components in lock request. + * + * @param rqst lock request whose components become HIVE_LOCKS rows + * @param txnid transaction id written into every row + * @param dbConn connection passed to getDbTime when txnid is not a valid txn + * @return rows and parameter list pair; every row starts with the EXT_LOCK_ID placeholder instead of a real lock id + * @throws MetaException declared for the helper calls made while building the rows + */ + private RowsParamListPair prepareRowsForHiveLocks(LockRequest rqst, + long txnid, Connection dbConn) throws MetaException { + + List rows = new ArrayList<>(); + List> paramsList = new ArrayList<>(); + long intLockId = 0; + long lastHB = (isValidTxn(txnid) ? 0 : getDbTime(dbConn)); + + for (LockComponent lc : rqst.getComponent()) { + if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET && + (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) { + //old version of thrift client should have (lc.isSetOperationType() == false) but they do not + //If you add a default value to a variable, isSet() for that variable is true regardless of where the + //message was created (for object variables. + // It works correctly for boolean vars, e.g. LockComponent.isTransactional). + //in test mode, upgrades are not tested, so client version and server version of thrift always matches so + //if we see UNSET here it means something didn't set the appropriate value. 
+ throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component " + + lc + " agentInfo=" + rqst.getAgentInfo()); + } + intLockId++; + String dbName = normalizeCase(lc.getDbname()); + String tblName = normalizeCase(lc.getTablename()); + String partName = normalizeCase(lc.getPartitionname()); + LockType lockType = lc.getType(); + char lockChar = 'z'; + switch (lockType) { + case EXCLUSIVE: + lockChar = LOCK_EXCLUSIVE; + break; + case SHARED_READ: + lockChar = LOCK_SHARED; + break; + case SHARED_WRITE: + lockChar = LOCK_SEMI_SHARED; + break; + } + rows.add(EXT_LOCK_ID + ", " + intLockId + "," + txnid + ", ?, " + + ((tblName == null) ? "null" : "?") + ", " + + ((partName == null) ? "null" : "?") + ", " + + quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " + + //for locks associated with a txn, we always heartbeat txn and timeout based on that + lastHB + ", " + + ((rqst.getUser() == null) ? "null" : "?") + ", " + + ((rqst.getHostname() == null) ? "null" : "?") + ", " + + ((rqst.getAgentInfo() == null) ? "null" : "?"));// + ")"; + List params = new ArrayList<>(); + params.add(dbName); + if (tblName != null) { + params.add(tblName); + } + if (partName != null) { + params.add(partName); + } + if (rqst.getUser() != null) { + params.add(rqst.getUser()); + } + if (rqst.getHostname() != null) { + params.add(rqst.getHostname()); + } + if (rqst.getAgentInfo() != null) { + params.add(rqst.getAgentInfo()); + } + paramsList.add(params); + } + return new RowsParamListPair(rows, paramsList); + } + + private static final class RowsParamListPair { + private final List rows; + private final List> paramsList; + + private RowsParamListPair(List rows, List> paramList) { + this.rows = rows; + this.paramsList = paramList; + } + } + /** * This enters locks into the queue in {@link #LOCK_WAITING} mode. 
* @@ -2391,6 +2591,12 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc shouldNeverHappen(txnid); } } + + populateTxnComponents(txnid, rqst, dbConn); + // Prepare data needed to be inserted into HIVE_LOCKS upfront. + // Add ext_lock_id later, just before query execution. + RowsParamListPair preparedRows = prepareRowsForHiveLocks(rqst, txnid, dbConn); + /** Get the next lock id. * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race. * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7, @@ -2411,162 +2617,13 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); - if (txnid > 0) { - List rows = new ArrayList<>(); - List> paramsList = new ArrayList<>(); - // For each component in this lock request, - // add an entry to the txn_components table - for (LockComponent lc : rqst.getComponent()) { - if(lc.isSetIsTransactional() && !lc.isIsTransactional()) { - //we don't prevent using non-acid resources in a txn but we do lock them - continue; - } - boolean updateTxnComponents; - if(!lc.isSetOperationType()) { - //request came from old version of the client - updateTxnComponents = true;//this matches old behavior - } - else { - switch (lc.getOperationType()) { - case INSERT: - case UPDATE: - case DELETE: - if(!lc.isSetIsDynamicPartitionWrite()) { - //must be old client talking, i.e. we don't know if it's DP so be conservative - updateTxnComponents = true; - } - else { - /** - * we know this is part of DP operation and so we'll get - * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list - * of partitions actually chaged. 
- */ - updateTxnComponents = !lc.isIsDynamicPartitionWrite(); - } - break; - case SELECT: - updateTxnComponents = false; - break; - case NO_TXN: - /*this constant is a bit of a misnomer since we now always have a txn context. It - just means the operation is such that we don't care what tables/partitions it - affected as it doesn't trigger a compaction or conflict detection. A better name - would be NON_TRANSACTIONAL.*/ - updateTxnComponents = false; - break; - default: - //since we have an open transaction, only 4 values above are expected - throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType() - + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid)); - } - } - if(!updateTxnComponents) { - continue; - } - String dbName = normalizeCase(lc.getDbname()); - String tblName = normalizeCase(lc.getTablename()); - String partName = normalizeCase(lc.getPartitionname()); - Long writeId = null; - if (tblName != null) { - // It is assumed the caller have already allocated write id for adding/updating data to - // the acid tables. However, DDL operatons won't allocate write id and hence this query - // may return empty result sets. - // Get the write id allocated by this txn for the given table writes - s = "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE" - + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = " + txnid; - pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(dbName, tblName)); - LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", - quoteString(dbName), quoteString(tblName)); - rs = pStmt.executeQuery(); - if (rs.next()) { - writeId = rs.getLong(1); - } - } - rows.add(txnid + ", ?, " + - (tblName == null ? "null" : "?") + ", " + - (partName == null ? "null" : "?")+ "," + - quoteString(OperationType.fromDataOperationType(lc.getOperationType()).toString())+ "," + - (writeId == null ? 
"null" : writeId)); - List params = new ArrayList<>(); - params.add(dbName); - if (tblName != null) { - params.add(tblName); - } - if (partName != null) { - params.add(partName); - } - paramsList.add(params); - } - insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "\"TXN_COMPONENTS\" (\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")", - rows, paramsList); - for(PreparedStatement pst : insertPreparedStmts) { - int modCount = pst.executeUpdate(); - closeStmt(pst); - } - insertPreparedStmts = null; - } - List rows = new ArrayList<>(); - List> paramsList = new ArrayList<>(); - long intLockId = 0; - long lastHB = (isValidTxn(txnid) ? 0 : getDbTime(dbConn)); - for (LockComponent lc : rqst.getComponent()) { - if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET && - (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) { - //old version of thrift client should have (lc.isSetOperationType() == false) but they do not - //If you add a default value to a variable, isSet() for that variable is true regardless of the where the - //message was created (for object variables. - // It works correctly for boolean vars, e.g. LockComponent.isTransactional). - //in test mode, upgrades are not tested, so client version and server version of thrift always matches so - //we see UNSET here it means something didn't set the appropriate value. 
- throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component " - + lc + " agentInfo=" + rqst.getAgentInfo()); - } - intLockId++; - String dbName = normalizeCase(lc.getDbname()); - String tblName = normalizeCase(lc.getTablename()); - String partName = normalizeCase(lc.getPartitionname()); - LockType lockType = lc.getType(); - char lockChar = 'z'; - switch (lockType) { - case EXCLUSIVE: - lockChar = LOCK_EXCLUSIVE; - break; - case SHARED_READ: - lockChar = LOCK_SHARED; - break; - case SHARED_WRITE: - lockChar = LOCK_SEMI_SHARED; - break; - } - rows.add(extLockId + ", " + intLockId + "," + txnid + ", ?, " + - ((tblName == null) ? "null" : "?") + ", " + - ((partName == null) ? "null" : "?") + ", " + - quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " + - //for locks associated with a txn, we always heartbeat txn and timeout based on that - lastHB + ", " + - ((rqst.getUser() == null) ? "null" : "?") + ", " + - ((rqst.getHostname() == null) ? "null" : "?") + ", " + - ((rqst.getAgentInfo() == null) ? 
"null" : "?"));// + ")"; - List params = new ArrayList<>(); - params.add(dbName); - if (tblName != null) { - params.add(tblName); - } - if (partName != null) { - params.add(partName); - } - if (rqst.getUser() != null) { - params.add(rqst.getUser()); - } - if (rqst.getHostname() != null) { - params.add(rqst.getHostname()); - } - if (rqst.getAgentInfo() != null) { - params.add(rqst.getAgentInfo()); - } - paramsList.add(params); + // Replace EXT_LOCK_ID with real extLockId in all the rows (literal, non-regex substitution) + List rows = new ArrayList<>(preparedRows.rows.size()); + List> paramsList = preparedRows.paramsList; + for(String row : preparedRows.rows) { + rows.add(row.replace(EXT_LOCK_ID, String.valueOf(extLockId))); } + insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, "\"HIVE_LOCKS\" (\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", " + "\"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", " + @@ -4420,6 +4477,7 @@ private LockResponse checkLock(Connection dbConn, long extLockId) // table locks for this db. boolean sawNull = false; strings.clear(); + Set partitions = new HashSet<>(); for (LockInfo info : locksBeingChecked) { if (info.table == null) { sawNull = true; @@ -4449,24 +4507,32 @@ private LockResponse checkLock(Connection dbConn, long extLockId) sawNull = true; break; } else { - strings.add(info.partition); - } - } - if (!sawNull) { - query.append(" AND (\"HL_PARTITION\" IS NULL OR \"HL_PARTITION\" IN("); - first = true; - for (String s : strings) { - if (first) first = false; - else query.append(", "); - query.append('\''); - query.append(s); - query.append('\''); + partitions.add(info.partition); } - query.append("))"); } } query.append(" AND \"HL_LOCK_EXT_ID\" < ").append(extLockId); + /* + TODO: Additional optimisation to consider later. + SHARED_WRITE --> SHARED_READ::ACQUIRED --> (KEEP_LOOKING) + Above state is slightly misleading. This can be optimised later and + can be included in the filter. 
+ */ + // Check whether requested locks are SHARED_READs. + boolean areAllLocksBeingCheckedInSharedRead = true; + for(LockInfo lockInfo : locksBeingChecked) { + if (lockInfo.type != LockType.SHARED_READ) { + areAllLocksBeingCheckedInSharedRead = false; + break; + } + } + + if (areAllLocksBeingCheckedInSharedRead) { + // add additional filters + query.append(" AND \"HL_LOCK_TYPE\" = '").append(LOCK_EXCLUSIVE).append("'"); + } + LOG.debug("Going to execute query <" + query.toString() + ">"); stmt = dbConn.createStatement(); rs = stmt.executeQuery(query.toString()); @@ -4474,6 +4540,12 @@ private LockResponse checkLock(Connection dbConn, long extLockId) while (rs.next()) { lockSet.add(new LockInfo(rs)); } + + if (!lockSet.isEmpty() && !sawNull) { + // filter out partition-level locks whose partition name does not match; locks with a null partition are kept, as the replaced "HL_PARTITION" IS NULL predicate did + lockSet.removeIf(info -> info.partition != null && !partitions.contains(info.partition)); + } + // Turn the tree set into an array so we can move back and forth easily // in it. LockInfo[] locks = lockSet.toArray(new LockInfo[lockSet.size()]); @@ -4522,7 +4594,10 @@ private LockResponse checkLock(Connection dbConn, long extLockId) // We've found something that matches what we're trying to lock, // so figure out if we can lock it too. LockAction lockAction = jumpTable.get(info.type).get(locks[i].type).get(locks[i].state); - LOG.debug("desired Lock: " + info + " checked Lock: " + locks[i] + " action: " + lockAction); + if (LOG.isDebugEnabled()) { + LOG.debug("desired Lock: " + info + " checked Lock: " + locks[i] + + " action: " + lockAction); + } switch (lockAction) { case WAIT: if(!ignoreConflict(info, locks[i])) { @@ -4571,6 +4646,7 @@ private LockResponse checkLock(Connection dbConn, long extLockId) } return response; } + private void acquire(Connection dbConn, Statement stmt, List locksBeingChecked) throws SQLException, NoSuchLockException, MetaException { if(locksBeingChecked == null || locksBeingChecked.isEmpty()) {