diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index be4d63c4c9..536571772b 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -183,6 +183,7 @@ private static final int ALLOWED_REPEATED_DEADLOCKS = 10; private static final Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName()); private static final Long TEMP_HIVE_LOCK_ID = -1L; + private static final Long TEMP_COMMIT_ID = -1L; private static DataSource connPool; private static DataSource connPoolMutex; @@ -1081,7 +1082,7 @@ public void commitTxn(CommitTxnRequest rqst) try { Connection dbConn = null; Statement stmt = null; - ResultSet commitIdRs = null, rs; + Long commitId = null; try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); @@ -1131,45 +1132,47 @@ public void commitTxn(CommitTxnRequest rqst) raiseTxnUnexpectedState(actualTxnStatus, txnid); } - String conflictSQLSuffix = null; - if (rqst.isSetReplPolicy()) { - rs = null; - } else { - conflictSQLSuffix = "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"=" + txnid + " AND \"TC_OPERATION_TYPE\" IN(" + - quoteChar(OperationType.UPDATE.sqlConst) + "," + quoteChar(OperationType.DELETE.sqlConst) + ")"; - rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, - "\"TC_OPERATION_TYPE\" " + conflictSQLSuffix)); - } - if (rs != null && rs.next()) { + String conflictSQLSuffix = "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"=" + txnid + " AND \"TC_OPERATION_TYPE\" IN(" + + quoteChar(OperationType.UPDATE.sqlConst) + "," + quoteChar(OperationType.DELETE.sqlConst) + ")"; + + if (txnRecord.type != TxnType.READ_ONLY + && !rqst.isSetReplPolicy() + && isUpdateOrDelete(stmt, conflictSQLSuffix)) { + isUpdateDelete = 'Y'; - close(rs); //if here it means currently committing txn performed update/delete and we should check WW conflict + /** + * "select distinct" is used below because + * 1. once we get to multi-statement txns, we only care to record that something was updated once + * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried by caller it may create + * duplicate entries in TXN_COMPONENTS + * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct + * even if it includes all of its columns + * + * First insert into write_set using a temporary commitID, which will be updated in a separate call, + * see: {@link #updateCommitIdAndCleanUpMetadata(Statement, long, TxnType, Long)}}. + * This should decrease the scope of the S4U lock on the next_txn_id table. + */ + Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); + stmt.executeUpdate("INSERT INTO \"WRITE_SET\" (\"WS_DATABASE\", \"WS_TABLE\", \"WS_PARTITION\", \"WS_TXNID\", \"WS_COMMIT_ID\", \"WS_OPERATION_TYPE\")" + + " SELECT DISTINCT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_TXNID\", " + TEMP_COMMIT_ID + ", \"TC_OPERATION_TYPE\" " + conflictSQLSuffix); + /** * This S4U will mutex with other commitTxn() and openTxns(). * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial * Note: it's possible to have several txns have the same commit id. Suppose 3 txns start * at the same time and no new txns start until all 3 commit. - * We could've incremented the sequence for commitId is well but it doesn't add anything functionally. + * We could've incremented the sequence for commitId as well but it doesn't add anything functionally. */ - commitIdRs = stmt.executeQuery(sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\"")); - if (!commitIdRs.next()) { - throw new IllegalStateException("No rows found in NEXT_TXN_ID"); + try (ResultSet commitIdRs = stmt.executeQuery(sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""))) { + if (!commitIdRs.next()) { + throw new IllegalStateException("No rows found in NEXT_TXN_ID"); + } + commitId = commitIdRs.getLong(1); } - long commitId = commitIdRs.getLong(1); - Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); - /** - * "select distinct" is used below because - * 1. once we get to multi-statement txns, we only care to record that something was updated once - * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried by caller it my create - * duplicate entries in TXN_COMPONENTS - * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct - * even if it includes all of it's columns - */ - stmt.executeUpdate( - "INSERT INTO \"WRITE_SET\" (\"WS_DATABASE\", \"WS_TABLE\", \"WS_PARTITION\", \"WS_TXNID\", \"WS_COMMIT_ID\", \"WS_OPERATION_TYPE\")" + - " SELECT DISTINCT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_TXNID\", " + commitId + ", \"TC_OPERATION_TYPE\" " + conflictSQLSuffix); + /** - * see if there are any overlapping txns wrote the same element, i.e. have a conflict + * see if there are any overlapping txns that wrote the same element, i.e. have a conflict * Since entire commit operation is mutexed wrt other start/commit ops, * committed.ws_commit_id <= current.ws_commit_id for all txns * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap @@ -1177,55 +1180,27 @@ public void commitTxn(CommitTxnRequest rqst) * [17,20] committed and [21,21] committing now - these do not overlap. * [17,18] committed and [18,19] committing now - these overlap (here 18 started while 17 was still running) */ - rs = stmt.executeQuery - (sqlGenerator.addLimitClause(1, "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " + - "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\", \"COMMITTED\".\"WS_PARTITION\", " + - "\"CUR\".\"WS_COMMIT_ID\" \"CUR_WS_COMMIT_ID\", \"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", " + - "\"COMMITTED\".\"WS_OPERATION_TYPE\" \"COMMITTED_OP\" FROM \"WRITE_SET\" \"COMMITTED\" INNER JOIN \"WRITE_SET\" \"CUR\" " + - "ON \"COMMITTED\".\"WS_DATABASE\"=\"CUR\".\"WS_DATABASE\" AND \"COMMITTED\".\"WS_TABLE\"=\"CUR\".\"WS_TABLE\" " + - //For partitioned table we always track writes at partition level (never at table) - //and for non partitioned - always at table level, thus the same table should never - //have entries with partition key and w/o - "AND (\"COMMITTED\".\"WS_PARTITION\"=\"CUR\".\"WS_PARTITION\" OR (\"COMMITTED\".\"WS_PARTITION\" IS NULL AND \"CUR\".\"WS_PARTITION\" IS NULL)) " + - "WHERE \"CUR\".\"WS_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\"" + //txns overlap; could replace ws_txnid - // with txnid, though any decent DB should infer this - " AND \"CUR\".\"WS_TXNID\"=" + txnid + //make sure RHS of join only has rows we just inserted as - // part of this commitTxn() op - " AND \"COMMITTED\".\"WS_TXNID\" <> " + txnid + //and LHS only has committed txns - //U+U and U+D and D+D is a conflict and we don't currently track I in WRITE_SET at all - //it may seem like D+D should not be in conflict but consider 2 multi-stmt txns - //where each does "delete X + insert X, where X is a row with the same PK. This is - //equivalent to an update of X but won't be in conflict unless D+D is in conflict. - //The same happens when Hive splits U=I+D early so it looks like 2 branches of a - //multi-insert stmt (an Insert and a Delete branch). It also 'feels' - // un-serializable to allow concurrent deletes - " and (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) + - ", " + quoteChar(OperationType.DELETE.sqlConst) + - ") AND \"CUR\".\"WS_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) + ", " - + quoteChar(OperationType.DELETE.sqlConst) + "))")); - if (rs.next()) { - //found a conflict - String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]"; - StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4)); - String partitionName = rs.getString(5); - if (partitionName != null) { - resource.append('/').append(partitionName); - } - String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource + - " committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8); - close(rs); - //remove WRITE_SET info for current txn since it's about to abort - dbConn.rollback(undoWriteSetForCurrentTxn); - LOG.info(msg); - //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this - if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) { - throw new IllegalStateException(msg + " FAILED!"); + try (ResultSet rs = checkForWriteConflict(stmt, txnid)) { + if (rs.next()) { + //found a conflict, so let's abort the txn + String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]"; + StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4)); + String partitionName = rs.getString(5); + if (partitionName != null) { + resource.append('/').append(partitionName); + } + String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + commitId + "]" + " due to a write conflict on " + resource + + " committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8); + //remove WRITE_SET info for current txn since it's about to abort + dbConn.rollback(undoWriteSetForCurrentTxn); + LOG.info(msg); + //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this + if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) { + throw new IllegalStateException(msg + " FAILED!"); + } + dbConn.commit(); + throw new TxnAbortedException(msg); } - dbConn.commit(); - close(null, stmt, dbConn); - throw new TxnAbortedException(msg); - } else { - //no conflicting operations, proceed with the rest of commit sequence } } else { @@ -1241,26 +1216,9 @@ public void commitTxn(CommitTxnRequest rqst) */ } - String s; - if (!rqst.isSetReplPolicy()) { - // Move the record from txn_components into completed_txn_components so that the compactor - // knows where to look to compact. - s = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" (\"CTC_TXNID\", \"CTC_DATABASE\", " + - "\"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\") SELECT \"TC_TXNID\"," + - " \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_WRITEID\", '" + isUpdateDelete + - "' FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid + - //we only track compactor activity in TXN_COMPONENTS to handle the case where the - //compactor txn aborts - so don't bother copying it to COMPLETED_TXN_COMPONENTS - " AND \"TC_OPERATION_TYPE\" <> " + quoteChar(OperationType.COMPACT.sqlConst); - LOG.debug("Going to execute insert <" + s + ">"); - - if ((stmt.executeUpdate(s)) < 1) { - //this can be reasonable for an empty txn START/COMMIT or read-only txn - //also an IUD with DP that didn't match any rows. - LOG.info("Expected to move at least one record from txn_components to " + - "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid)); - } - } else { + if (txnRecord.type != TxnType.READ_ONLY && !rqst.isSetReplPolicy()) { + moveTxnComponentsToCompleted(stmt, txnid, isUpdateDelete); + } else if (rqst.isSetReplPolicy()) { if (rqst.isSetWriteEventInfos()) { String sql = String.format(COMPL_TXN_COMPONENTS_INSERT_QUERY, txnid, quoteChar(isUpdateDelete)); try (PreparedStatement pstmt = dbConn.prepareStatement(sql)) { @@ -1287,37 +1245,9 @@ public void commitTxn(CommitTxnRequest rqst) } deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy()); } - cleanUpTxnRelatedMetadata(txnid, stmt); - // update the key/value associated with the transaction if it has been - // set + updateCommitIdAndCleanUpMetadata(stmt, txnid, txnRecord.type, commitId); if (rqst.isSetKeyValue()) { - if (!rqst.getKeyValue().getKey().startsWith(TxnStore.TXN_KEY_START)) { - String errorMsg = "Error updating key/value in the sql backend with" - + " txnId=" + rqst.getTxnid() + "," - + " tableId=" + rqst.getKeyValue().getTableId() + "," - + " key=" + rqst.getKeyValue().getKey() + "," - + " value=" + rqst.getKeyValue().getValue() + "." - + " key should start with " + TXN_KEY_START + "."; - LOG.warn(errorMsg); - throw new IllegalArgumentException(errorMsg); - } - s = "UPDATE \"TABLE_PARAMS\" SET" - + " \"PARAM_VALUE\" = " + quoteString(rqst.getKeyValue().getValue()) - + " WHERE \"TBL_ID\" = " + rqst.getKeyValue().getTableId() - + " AND \"PARAM_KEY\" = " + quoteString(rqst.getKeyValue().getKey()); - LOG.debug("Going to execute update <" + s + ">"); - int affectedRows = stmt.executeUpdate(s); - if (affectedRows != 1) { - String errorMsg = "Error updating key/value in the sql backend with" - + " txnId=" + rqst.getTxnid() + "," - + " tableId=" + rqst.getKeyValue().getTableId() + "," - + " key=" + rqst.getKeyValue().getKey() + "," - + " value=" + rqst.getKeyValue().getValue() + "." - + " Only one row should have been affected but " - + affectedRows + " rows where affected."; - LOG.warn(errorMsg); - throw new IllegalStateException(errorMsg); - } + updateKeyValueAssociatedWithTxn(rqst, stmt); } if (transactionalListeners != null) { @@ -1326,7 +1256,6 @@ public void commitTxn(CommitTxnRequest rqst) } LOG.debug("Going to commit"); - close(rs); dbConn.commit(); } catch (SQLException e) { LOG.debug("Going to rollback"); @@ -1335,7 +1264,8 @@ public void commitTxn(CommitTxnRequest rqst) throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { - close(commitIdRs, stmt, dbConn); + closeStmt(stmt); + closeDbConn(dbConn); unlockInternal(); } } catch (RetryException e) { @@ -1343,13 +1273,110 @@ public void commitTxn(CommitTxnRequest rqst) } } - private void cleanUpTxnRelatedMetadata(long txnid, Statement stmt) throws SQLException { - List queries = Arrays.asList( - "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid, - "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid, - "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid, - "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid); - executeQueriesInBatch(stmt, queries, conf); + private boolean isUpdateOrDelete(Statement stmt, String conflictSQLSuffix) throws SQLException, MetaException { + try (ResultSet rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, + "\"TC_OPERATION_TYPE\" " + conflictSQLSuffix))) { + return rs.next(); + } + } + + private ResultSet checkForWriteConflict(Statement stmt, long txnid) throws SQLException, MetaException { + String writeConflictQuery = sqlGenerator.addLimitClause(1, "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " + + "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\", \"COMMITTED\".\"WS_PARTITION\", " + + "\"CUR\".\"WS_COMMIT_ID\" \"CUR_WS_COMMIT_ID\", \"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", " + + "\"COMMITTED\".\"WS_OPERATION_TYPE\" \"COMMITTED_OP\" FROM \"WRITE_SET\" \"COMMITTED\" INNER JOIN \"WRITE_SET\" \"CUR\" " + + "ON \"COMMITTED\".\"WS_DATABASE\"=\"CUR\".\"WS_DATABASE\" AND \"COMMITTED\".\"WS_TABLE\"=\"CUR\".\"WS_TABLE\" " + + //For partitioned table we always track writes at partition level (never at table) + //and for non partitioned - always at table level, thus the same table should never + //have entries with partition key and w/o + "AND (\"COMMITTED\".\"WS_PARTITION\"=\"CUR\".\"WS_PARTITION\" OR (\"COMMITTED\".\"WS_PARTITION\" IS NULL AND \"CUR\".\"WS_PARTITION\" IS NULL)) " + + "WHERE \"CUR\".\"WS_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\"" + //txns overlap; could replace ws_txnid + // with txnid, though any decent DB should infer this + " AND \"CUR\".\"WS_TXNID\"=" + txnid + //make sure RHS of join only has rows we just inserted as + // part of this commitTxn() op + " AND \"COMMITTED\".\"WS_TXNID\" <> " + txnid + //and LHS only has committed txns + //U+U and U+D and D+D is a conflict and we don't currently track I in WRITE_SET at all + //it may seem like D+D should not be in conflict but consider 2 multi-stmt txns + //where each does "delete X + insert X, where X is a row with the same PK. This is + //equivalent to an update of X but won't be in conflict unless D+D is in conflict. + //The same happens when Hive splits U=I+D early so it looks like 2 branches of a + //multi-insert stmt (an Insert and a Delete branch). It also 'feels' + // un-serializable to allow concurrent deletes + " and (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) + + ", " + quoteChar(OperationType.DELETE.sqlConst) + + ") AND \"CUR\".\"WS_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) + ", " + + quoteChar(OperationType.DELETE.sqlConst) + "))"); + LOG.debug("Going to execute query: <" + writeConflictQuery + ">"); + return stmt.executeQuery(writeConflictQuery); + } + + private void moveTxnComponentsToCompleted(Statement stmt, long txnid, char isUpdateDelete) throws SQLException { + // Move the record from txn_components into completed_txn_components so that the compactor + // knows where to look to compact. + String s = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" (\"CTC_TXNID\", \"CTC_DATABASE\", " + + "\"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\") SELECT \"TC_TXNID\"," + + " \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_WRITEID\", '" + isUpdateDelete + + "' FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid + + //we only track compactor activity in TXN_COMPONENTS to handle the case where the + //compactor txn aborts - so don't bother copying it to COMPLETED_TXN_COMPONENTS + " AND \"TC_OPERATION_TYPE\" <> " + quoteChar(OperationType.COMPACT.sqlConst); + LOG.debug("Going to execute insert <" + s + ">"); + + if ((stmt.executeUpdate(s)) < 1) { + //this can be reasonable for an empty txn START/COMMIT or read-only txn + //also an IUD with DP that didn't match any rows. + LOG.info("Expected to move at least one record from txn_components to " + + "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid)); + } + } + + private void updateCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, Long commitId) throws SQLException { + List queryBatch = new ArrayList<>(5); + // update write_set with real commitId + if (commitId != null) { + queryBatch.add("UPDATE \"WRITE_SET\" SET \"WS_COMMIT_ID\" = " + commitId + + " WHERE \"WS_COMMIT_ID\" = " + TEMP_COMMIT_ID + " AND \"WS_TXNID\" = " + txnid); + } + // clean up txn related metadata + if (txnType != TxnType.READ_ONLY) { + queryBatch.add("DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid); + } + queryBatch.add("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid); + queryBatch.add("DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid); + queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid); + + // execute all in one batch + executeQueriesInBatch(stmt, queryBatch, conf); + } + + private void updateKeyValueAssociatedWithTxn(CommitTxnRequest rqst, Statement stmt) throws SQLException { + if (!rqst.getKeyValue().getKey().startsWith(TxnStore.TXN_KEY_START)) { + String errorMsg = "Error updating key/value in the sql backend with" + + " txnId=" + rqst.getTxnid() + "," + + " tableId=" + rqst.getKeyValue().getTableId() + "," + + " key=" + rqst.getKeyValue().getKey() + "," + + " value=" + rqst.getKeyValue().getValue() + "." + + " key should start with " + TXN_KEY_START + "."; + LOG.warn(errorMsg); + throw new IllegalArgumentException(errorMsg); + } + String s = "UPDATE \"TABLE_PARAMS\" SET" + + " \"PARAM_VALUE\" = " + quoteString(rqst.getKeyValue().getValue()) + + " WHERE \"TBL_ID\" = " + rqst.getKeyValue().getTableId() + + " AND \"PARAM_KEY\" = " + quoteString(rqst.getKeyValue().getKey()); + LOG.debug("Going to execute update <" + s + ">"); + int affectedRows = stmt.executeUpdate(s); + if (affectedRows != 1) { + String errorMsg = "Error updating key/value in the sql backend with" + + " txnId=" + rqst.getTxnid() + "," + + " tableId=" + rqst.getKeyValue().getTableId() + "," + + " key=" + rqst.getKeyValue().getKey() + "," + + " value=" + rqst.getKeyValue().getValue() + "." + + " Only one row should have been affected but " + + affectedRows + " rows where affected."; + LOG.warn(errorMsg); + throw new IllegalStateException(errorMsg); + } } /**