diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index ef88240a79..620c77e589 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -26,7 +26,10 @@
 import java.sql.SQLTransactionRollbackException;
 import java.sql.Statement;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumMap;
+import java.util.List;
 import java.util.Properties;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -629,4 +632,33 @@ static String getEpochFn(DatabaseProduct dbProduct) throws MetaException {
       throw new MetaException(msg);
     }
   }
+
+  /**
+   * @param stmt Statement which will be used for batching and execution.
+   * @param queries List of SQL queries to execute in a Statement batch.
+   * @param conf Configuration for retrieving the max batch size param.
+   * @return A list with the number of rows affected by each query in queries.
+   * @throws SQLException Thrown if an execution error occurs.
+   */
+  static List<Integer> executeQueriesInBatch(Statement stmt, List<String> queries, Configuration conf) throws SQLException {
+    List<Integer> affectedRowsByQuery = new ArrayList<>();
+    int queryCounter = 0;
+    int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+    for (String query : queries) {
+      LOG.debug("Adding query to batch: <" + query + ">");
+      queryCounter++;
+      stmt.addBatch(query);
+      if (queryCounter % batchSize == 0) {
+        LOG.debug("Going to execute queries in batch. Batch size: " + batchSize);
+        int[] affectedRecordsByQuery = stmt.executeBatch();
+        Arrays.stream(affectedRecordsByQuery).forEach(affectedRowsByQuery::add);
+      }
+    }
+    if (queryCounter % batchSize != 0) {
+      LOG.debug("Going to execute queries in batch. Batch size: " + queryCounter % batchSize);
+      int[] affectedRecordsByQuery = stmt.executeBatch();
+      Arrays.stream(affectedRecordsByQuery).forEach(affectedRowsByQuery::add);
+    }
+    return affectedRowsByQuery;
+  }
 }
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 74ef88545e..c42da1f264 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -88,6 +88,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.hive.metastore.DatabaseProduct.MYSQL;
+import static org.apache.hadoop.hive.metastore.txn.TxnDbUtil.executeQueriesInBatch;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
 import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
@@ -190,6 +191,10 @@
       "\"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\") " +
       "VALUES (?, ?, ?, ?, ?, ?, ?, ?, %s, ?, ?, ?)";
 
+  private static final String COMPL_TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" " +
+      "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")" +
+      " VALUES (%s, ?, ?, ?, ?, %s)";
+
   private List<TransactionalMetaStoreEventListener> transactionalListeners;
 
   /**
@@ -1109,7 +1114,6 @@ public void commitTxn(CommitTxnRequest rqst)
     try {
       Connection dbConn = null;
       Statement stmt = null;
-      List<PreparedStatement> insertPreparedStmts = null;
       ResultSet commitIdRs = null, rs;
       try {
         lockInternal();
@@ -1291,47 +1295,32 @@ public void commitTxn(CommitTxnRequest rqst)
           }
         } else {
           if (rqst.isSetWriteEventInfos()) {
-            List<String> rows = new ArrayList<>();
-            List<List<String>> paramsList = new ArrayList<>();
-            for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
-              rows.add(txnid + ", ?, ?, ?," +
-                  writeEventInfo.getWriteId() + "," +
-                  quoteChar(isUpdateDelete));
-              List<String> params = new ArrayList<>();
-              params.add(writeEventInfo.getDatabase());
-              params.add(writeEventInfo.getTable());
-              params.add(writeEventInfo.getPartition());
-              paramsList.add(params);
-            }
-            insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
-                "\"COMPLETED_TXN_COMPONENTS\" " +
-                "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")",
-                rows, paramsList);
-            for (PreparedStatement pst : insertPreparedStmts) {
-              pst.execute();
+            String sql = String.format(COMPL_TXN_COMPONENTS_INSERT_QUERY, txnid, quoteChar(isUpdateDelete));
+            try (PreparedStatement pstmt = dbConn.prepareStatement(sql)) {
+              int insertCounter = 0;
+              int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+              for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
+                pstmt.setString(1, writeEventInfo.getDatabase());
+                pstmt.setString(2, writeEventInfo.getTable());
+                pstmt.setString(3, writeEventInfo.getPartition());
+                pstmt.setLong(4, writeEventInfo.getWriteId());
+
+                pstmt.addBatch();
+                insertCounter++;
+                if (insertCounter % batchSize == 0) {
+                  LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + batchSize);
+                  pstmt.executeBatch();
+                }
+              }
+              if (insertCounter % batchSize != 0) {
+                LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + insertCounter % batchSize);
+                pstmt.executeBatch();
+              }
             }
           }
           deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
         }
-
-        // cleanup all txn related metadata
-        s = "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-        s = "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-        s = "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-        s = "DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-        LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL");
-
-        s = "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
+        cleanUpTxnRelatedMetadata(txnid, stmt);
 
         // update the key/value associated with the transaction if it has been
         // set
@@ -1380,11 +1369,6 @@ public void commitTxn(CommitTxnRequest rqst)
         throw new MetaException("Unable to update transaction database "
           + StringUtils.stringifyException(e));
       } finally {
-        if (insertPreparedStmts != null) {
-          for (PreparedStatement pst : insertPreparedStmts) {
-            closeStmt(pst);
-          }
-        }
         close(commitIdRs, stmt, dbConn);
         unlockInternal();
       }
@@ -1393,6 +1377,17 @@
     }
   }
 
+  private void cleanUpTxnRelatedMetadata(long txnid, Statement stmt) throws SQLException {
+    List<String> queries = Arrays.asList(
+        "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid,
+        "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid,
+        "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid,
+        "DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid,
+        "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid);
+    executeQueriesInBatch(stmt, queries, conf);
+    LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL");
+  }
+
   /**
    * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark.
    * @param rqst info on table/partitions and writeid snapshot to replicate.
@@ -2648,12 +2643,12 @@ private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long
    * The clients that operate in blocking mode, can't heartbeat a lock until the lock is acquired.
    * We should make CheckLockRequest include timestamp or last request to skip unnecessary heartbeats. Thrift change.
    *
-   * {@link #checkLock(java.sql.Connection, long)} must run at SERIALIZABLE (make sure some lock we are checking
+   * {@link #checkLock(java.sql.Connection, long, long)} must run at SERIALIZABLE (make sure some lock we are checking
    * against doesn't move from W to A in another txn) but this method can heartbeat in
    * separate txn at READ_COMMITTED.
    *
    * Retry-by-caller note:
-   * Retryable because {@link #checkLock(Connection, long)} is
+   * Retryable because {@link #checkLock(Connection, long, long)} is
    */
   @Override
   @RetrySemantics.SafeToRetry
@@ -4220,7 +4215,6 @@ private int abortTxns(Connection dbConn, List<Long> txnids, boolean isStrict) th
   private int abortTxns(Connection dbConn, List<Long> txnids, boolean checkHeartbeat,
                         boolean isStrict) throws SQLException, MetaException {
     Statement stmt = null;
-    int updateCnt = 0;
     if (txnids.isEmpty()) {
       return 0;
     }
@@ -4229,70 +4223,44 @@
       //This is an update statement, thus at any Isolation level will take Write locks so will block
       //all other ops using S4U on TXNS row.
       List<String> queries = new ArrayList<>();
-
       StringBuilder prefix = new StringBuilder();
       StringBuilder suffix = new StringBuilder();
 
-      prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_ABORTED) +
-        " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND ");
-      if(checkHeartbeat) {
-        suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ");
-        suffix.append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout);
+      // add update txns queries to query list
+      prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = ").append(quoteChar(TXN_ABORTED))
+          .append(" WHERE \"TXN_STATE\" = ").append(quoteChar(TXN_OPEN)).append(" AND ");
+      if (checkHeartbeat) {
+        suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ")
+            .append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout);
       }
-
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", true, false);
+      int numUpdateQueries = queries.size();
 
-      for (String query : queries) {
-        LOG.debug("Going to execute update <" + query + ">");
-        updateCnt += stmt.executeUpdate(query);
-      }
-
-      // As current txn is aborted, this won't read any data from other txns. So, it is safe to unregister
-      // the min_open_txnid from MIN_HISTORY_LEVEL for the aborted txns. Even if the txns in the list are
-      // partially aborted, it is safe to delete from MIN_HISTORY_LEVEL as the remaining txns are either
-      // already committed or aborted.
-      queries.clear();
+      // add delete min history queries to query list
       prefix.setLength(0);
       suffix.setLength(0);
-
       prefix.append("DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE ");
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"MHL_TXNID\"", false, false);
-      for (String query : queries) {
-        LOG.debug("Going to execute update <" + query + ">");
-        int rc = stmt.executeUpdate(query);
-        LOG.debug("Deleted " + rc + " records from MIN_HISTORY_LEVEL");
-      }
-      LOG.info("Removed aborted transactions: (" + txnids + ") from MIN_HISTORY_LEVEL");
-
-      if (updateCnt < txnids.size() && isStrict) {
-        /**
-         * have to bail in this case since we don't know which transactions were not Aborted and
-         * thus don't know which locks to delete
-         * This may happen due to a race between {@link #heartbeat(HeartbeatRequest)} operation and
-         * {@link #performTimeOuts()}
-         */
-        return updateCnt;
-      }
-
-      queries.clear();
+      // add delete hive locks queries to query list
       prefix.setLength(0);
-      suffix.setLength(0);
-
       prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE ");
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false);
-      for (String query : queries) {
-        LOG.debug("Going to execute update <" + query + ">");
-        int rc = stmt.executeUpdate(query);
-        LOG.debug("Removed " + rc + " records from HIVE_LOCKS");
-      }
+
+      // execute all queries in the list in one batch
+      List<Integer> affectedRowsByQuery = executeQueriesInBatch(stmt, queries, conf);
+      LOG.info("Removed aborted transactions: (" + txnids + ") from MIN_HISTORY_LEVEL");
+      return getUpdateCount(numUpdateQueries, affectedRowsByQuery);
     } finally {
       closeStmt(stmt);
     }
-    return updateCnt;
+  }
+
+  private int getUpdateCount(int numUpdateQueries, List<Integer> affectedRowsByQuery) {
+    return affectedRowsByQuery.stream()
+        .limit(numUpdateQueries)
+        .mapToInt(Integer::intValue)
+        .sum();
   }
 
   private static boolean isValidTxn(long txnId) {
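
Note on the batching pattern introduced by this patch: executeQueriesInBatch() flushes a full batch whenever the running query counter reaches a multiple of DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE, and then flushes whatever remains after the loop. The standalone sketch below illustrates the same flush-on-modulo logic without a JDBC connection; the class and method names (BatchingSketch, flush) are illustrative only and are not part of this patch.

import java.util.ArrayList;
import java.util.List;

public class BatchingSketch {

  public static void main(String[] args) {
    // seven queries with a batch size of three should flush as 3 + 3 + 1
    List<String> queries = new ArrayList<>();
    for (int i = 0; i < 7; i++) {
      queries.add("DELETE FROM \"T\" WHERE \"ID\" = " + i);
    }
    int batchSize = 3; // stands in for DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE

    List<String> batch = new ArrayList<>(); // stands in for the Statement's internal batch
    int queryCounter = 0;
    for (String query : queries) {
      batch.add(query); // stands in for stmt.addBatch(query)
      queryCounter++;
      if (queryCounter % batchSize == 0) {
        // a full batch has accumulated: execute it and start a new one
        flush(batch);
      }
    }
    if (queryCounter % batchSize != 0) {
      // leftover queries (counter not divisible by batchSize) still need a
      // final flush, like the trailing executeBatch() call in the patch
      flush(batch);
    }
  }

  // stands in for stmt.executeBatch(): reports the batch contents and clears it
  private static void flush(List<String> batch) {
    System.out.println("Executing batch of " + batch.size() + " queries");
    batch.clear();
  }
}

Running the sketch prints batch sizes of 3, 3 and 1 for the seven queries, mirroring the two flush sites (in-loop and trailing) in both executeQueriesInBatch() and the COMPLETED_TXN_COMPONENTS insert loop in commitTxn().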