diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 7d0db0c3a0..5bc4ae4b33 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -183,6 +183,7 @@ private static final int ALLOWED_REPEATED_DEADLOCKS = 10; private static final Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName()); private static final Long TEMP_HIVE_LOCK_ID = -1L; + private static final Long TEMP_COMMIT_ID = -1L; private static DataSource connPool; private static DataSource connPoolMutex; @@ -1123,7 +1124,8 @@ public void commitTxn(CommitTxnRequest rqst) try { Connection dbConn = null; Statement stmt = null; - ResultSet commitIdRs = null, rs; + ResultSet commitIdRs = null, rs = null; + Long commitId = null; try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); @@ -1174,9 +1176,7 @@ public void commitTxn(CommitTxnRequest rqst) } String conflictSQLSuffix = null; - if (rqst.isSetReplPolicy()) { - rs = null; - } else { + if (txnRecord.type != TxnType.READ_ONLY && !rqst.isSetReplPolicy()) { conflictSQLSuffix = "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"=" + txnid + " AND \"TC_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) + "," + quoteChar(OperationType.DELETE.sqlConst) + ")"; rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, @@ -1186,32 +1186,35 @@ public void commitTxn(CommitTxnRequest rqst) isUpdateDelete = 'Y'; close(rs); //if here it means currently committing txn performed update/delete and we should check WW conflict + + // insert write_set with dummy commitId + /** + * "select distinct" is used below because + * 1. 
once we get to multi-statement txns, we only care to record that something was updated once + * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried by caller it may create + * duplicate entries in TXN_COMPONENTS + * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct + * even if it includes all of its columns + */ + Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); + stmt.executeUpdate("INSERT INTO \"WRITE_SET\" (\"WS_DATABASE\", \"WS_TABLE\", \"WS_PARTITION\", \"WS_TXNID\", \"WS_COMMIT_ID\", \"WS_OPERATION_TYPE\")" + + " SELECT DISTINCT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_TXNID\", " + TEMP_COMMIT_ID + ", \"TC_OPERATION_TYPE\" " + conflictSQLSuffix); + /** * This S4U will mutex with other commitTxn() and openTxns(). * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial * Note: it's possible to have several txns have the same commit id. Suppose 3 txns start * at the same time and no new txns start until all 3 commit. - * We could've incremented the sequence for commitId is well but it doesn't add anything functionally. + * We could've incremented the sequence for commitId as well but it doesn't add anything functionally. */ commitIdRs = stmt.executeQuery(sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\"")); if (!commitIdRs.next()) { throw new IllegalStateException("No rows found in NEXT_TXN_ID"); } - long commitId = commitIdRs.getLong(1); - Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); - /** - * "select distinct" is used below because - * 1. once we get to multi-statement txns, we only care to record that something was updated once - * 2. 
if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried by caller it my create - * duplicate entries in TXN_COMPONENTS - * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct - * even if it includes all of it's columns - */ - stmt.executeUpdate( - "INSERT INTO \"WRITE_SET\" (\"WS_DATABASE\", \"WS_TABLE\", \"WS_PARTITION\", \"WS_TXNID\", \"WS_COMMIT_ID\", \"WS_OPERATION_TYPE\")" + - " SELECT DISTINCT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_TXNID\", " + commitId + ", \"TC_OPERATION_TYPE\" " + conflictSQLSuffix); + commitId = commitIdRs.getLong(1); + /** - * see if there are any overlapping txns wrote the same element, i.e. have a conflict + * see if there are any overlapping txns that wrote the same element, i.e. have a conflict * Since entire commit operation is mutexed wrt other start/commit ops, * committed.ws_commit_id <= current.ws_commit_id for all txns * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap @@ -1219,34 +1222,9 @@ public void commitTxn(CommitTxnRequest rqst) * [17,20] committed and [21,21] committing now - these do not overlap. 
* [17,18] committed and [18,19] committing now - these overlap (here 18 started while 17 was still running) */ - rs = stmt.executeQuery - (sqlGenerator.addLimitClause(1, "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " + - "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\", \"COMMITTED\".\"WS_PARTITION\", " + - "\"CUR\".\"WS_COMMIT_ID\" \"CUR_WS_COMMIT_ID\", \"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", " + - "\"COMMITTED\".\"WS_OPERATION_TYPE\" \"COMMITTED_OP\" FROM \"WRITE_SET\" \"COMMITTED\" INNER JOIN \"WRITE_SET\" \"CUR\" " + - "ON \"COMMITTED\".\"WS_DATABASE\"=\"CUR\".\"WS_DATABASE\" AND \"COMMITTED\".\"WS_TABLE\"=\"CUR\".\"WS_TABLE\" " + - //For partitioned table we always track writes at partition level (never at table) - //and for non partitioned - always at table level, thus the same table should never - //have entries with partition key and w/o - "AND (\"COMMITTED\".\"WS_PARTITION\"=\"CUR\".\"WS_PARTITION\" OR (\"COMMITTED\".\"WS_PARTITION\" IS NULL AND \"CUR\".\"WS_PARTITION\" IS NULL)) " + - "WHERE \"CUR\".\"WS_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\"" + //txns overlap; could replace ws_txnid - // with txnid, though any decent DB should infer this - " AND \"CUR\".\"WS_TXNID\"=" + txnid + //make sure RHS of join only has rows we just inserted as - // part of this commitTxn() op - " AND \"COMMITTED\".\"WS_TXNID\" <> " + txnid + //and LHS only has committed txns - //U+U and U+D and D+D is a conflict and we don't currently track I in WRITE_SET at all - //it may seem like D+D should not be in conflict but consider 2 multi-stmt txns - //where each does "delete X + insert X, where X is a row with the same PK. This is - //equivalent to an update of X but won't be in conflict unless D+D is in conflict. - //The same happens when Hive splits U=I+D early so it looks like 2 branches of a - //multi-insert stmt (an Insert and a Delete branch). 
It also 'feels' - // un-serializable to allow concurrent deletes - " and (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) + - ", " + quoteChar(OperationType.DELETE.sqlConst) + - ") AND \"CUR\".\"WS_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) + ", " - + quoteChar(OperationType.DELETE.sqlConst) + "))")); - if (rs.next()) { - //found a conflict + rs = checkWriteConflict(stmt, txnid); + + if (rs.next()) { String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]"; StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4)); String partitionName = rs.getString(5); @@ -1255,7 +1233,6 @@ public void commitTxn(CommitTxnRequest rqst) } String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource + " committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8); - close(rs); //remove WRITE_SET info for current txn since it's about to abort dbConn.rollback(undoWriteSetForCurrentTxn); LOG.info(msg); @@ -1264,10 +1241,7 @@ public void commitTxn(CommitTxnRequest rqst) throw new IllegalStateException(msg + " FAILED!"); } dbConn.commit(); - close(null, stmt, dbConn); throw new TxnAbortedException(msg); - } else { - //no conflicting operations, proceed with the rest of commit sequence } } else { @@ -1284,7 +1258,7 @@ public void commitTxn(CommitTxnRequest rqst) } String s; - if (!rqst.isSetReplPolicy()) { + if (!rqst.isSetReplPolicy() && txnRecord.type != TxnType.READ_ONLY) { // Move the record from txn_components into completed_txn_components so that the compactor // knows where to look to compact. 
s = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" (\"CTC_TXNID\", \"CTC_DATABASE\", " + @@ -1302,7 +1276,7 @@ public void commitTxn(CommitTxnRequest rqst) LOG.info("Expected to move at least one record from txn_components to " + "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid)); } - } else { + } else if (rqst.isSetReplPolicy()) { if (rqst.isSetWriteEventInfos()) { String sql = String.format(COMPL_TXN_COMPONENTS_INSERT_QUERY, txnid, quoteChar(isUpdateDelete)); try (PreparedStatement pstmt = dbConn.prepareStatement(sql)) { @@ -1329,10 +1303,24 @@ public void commitTxn(CommitTxnRequest rqst) } deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy()); } - cleanUpTxnRelatedMetadata(txnid, stmt); - // update the key/value associated with the transaction if it has been - // set + // update write_set with real commitId (if it's update/delete op) and clean up txn related metadata + List<String> batchedQueries = new ArrayList<>(); + batchedQueries.add("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid); + batchedQueries.add("DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid); + batchedQueries.add("DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid); + batchedQueries.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid); + if (txnRecord.type != TxnType.READ_ONLY) { + batchedQueries.add("DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid); + } + if (commitId != null) { + batchedQueries.add("UPDATE \"WRITE_SET\" SET \"WS_COMMIT_ID\" = " + commitId + + " WHERE \"WS_COMMIT_ID\" = " + TEMP_COMMIT_ID + " AND \"WS_TXNID\" = " + txnid); + } + executeQueriesInBatch(stmt, batchedQueries, conf); + LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL"); + + // update the key/value associated with the transaction if it has been set if (rqst.isSetKeyValue()) { if (!rqst.getKeyValue().getKey().startsWith(TxnStore.TXN_KEY_START)) { String errorMsg = "Error updating 
key/value in the sql backend with" @@ -1369,7 +1357,6 @@ public void commitTxn(CommitTxnRequest rqst) } LOG.debug("Going to commit"); - close(rs); dbConn.commit(); } catch (SQLException e) { LOG.debug("Going to rollback"); @@ -1378,6 +1365,7 @@ public void commitTxn(CommitTxnRequest rqst) throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { + close(rs); close(commitIdRs, stmt, dbConn); unlockInternal(); } @@ -1386,15 +1374,36 @@ public void commitTxn(CommitTxnRequest rqst) } } - private void cleanUpTxnRelatedMetadata(long txnid, Statement stmt) throws SQLException { - List queries = Arrays.asList( - "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid, - "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid, - "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid, - "DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid, - "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid); - executeQueriesInBatch(stmt, queries, conf); - LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL"); + /** + * Runs the write-conflict query for txnid on stmt; the caller reads the row describing the conflict, if any, and must close the returned ResultSet. 
+ */ + private ResultSet checkWriteConflict(Statement stmt, long txnid) throws SQLException, MetaException { + return stmt.executeQuery + (sqlGenerator.addLimitClause(1, "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " + + "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\", \"COMMITTED\".\"WS_PARTITION\", " + + "\"CUR\".\"WS_COMMIT_ID\" \"CUR_WS_COMMIT_ID\", \"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", " + + "\"COMMITTED\".\"WS_OPERATION_TYPE\" \"COMMITTED_OP\" FROM \"WRITE_SET\" \"COMMITTED\" INNER JOIN \"WRITE_SET\" \"CUR\" " + + "ON \"COMMITTED\".\"WS_DATABASE\"=\"CUR\".\"WS_DATABASE\" AND \"COMMITTED\".\"WS_TABLE\"=\"CUR\".\"WS_TABLE\" " + + //For partitioned table we always track writes at partition level (never at table) + //and for non partitioned - always at table level, thus the same table should never + //have entries 
with partition key and w/o + "AND (\"COMMITTED\".\"WS_PARTITION\"=\"CUR\".\"WS_PARTITION\" OR (\"COMMITTED\".\"WS_PARTITION\" IS NULL AND \"CUR\".\"WS_PARTITION\" IS NULL)) " + + "WHERE \"CUR\".\"WS_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\"" + //txns overlap; could replace ws_txnid + // with txnid, though any decent DB should infer this + " AND \"CUR\".\"WS_TXNID\"=" + txnid + //make sure RHS of join only has rows we just inserted as + // part of this commitTxn() op + " AND \"COMMITTED\".\"WS_TXNID\" <> " + txnid + //and LHS only has committed txns + //U+U and U+D and D+D is a conflict and we don't currently track I in WRITE_SET at all + //it may seem like D+D should not be in conflict but consider 2 multi-stmt txns + //where each does "delete X + insert X, where X is a row with the same PK. This is + //equivalent to an update of X but won't be in conflict unless D+D is in conflict. + //The same happens when Hive splits U=I+D early so it looks like 2 branches of a + //multi-insert stmt (an Insert and a Delete branch). It also 'feels' + // un-serializable to allow concurrent deletes + " and (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) + + ", " + quoteChar(OperationType.DELETE.sqlConst) + + ") AND \"CUR\".\"WS_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) + ", " + + quoteChar(OperationType.DELETE.sqlConst) + "))")); } /**