diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 74ef88545e..7c19b66521 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -190,6 +190,10 @@ "\"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\") " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, %s, ?, ?, ?)"; + private static final String COMPL_TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" " + + "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")" + + " VALUES (%s, ?, ?, ?, ?, %s)"; + private List transactionalListeners; /** @@ -1109,7 +1113,6 @@ public void commitTxn(CommitTxnRequest rqst) try { Connection dbConn = null; Statement stmt = null; - List insertPreparedStmts = null; ResultSet commitIdRs = null, rs; try { lockInternal(); @@ -1291,47 +1294,32 @@ public void commitTxn(CommitTxnRequest rqst) } } else { if (rqst.isSetWriteEventInfos()) { - List rows = new ArrayList<>(); - List> paramsList = new ArrayList<>(); - for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) { - rows.add(txnid + ", ?, ?, ?," + - writeEventInfo.getWriteId() + "," + - quoteChar(isUpdateDelete)); - List params = new ArrayList<>(); - params.add(writeEventInfo.getDatabase()); - params.add(writeEventInfo.getTable()); - params.add(writeEventInfo.getPartition()); - paramsList.add(params); - } - insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "\"COMPLETED_TXN_COMPONENTS\" " + - "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")", - rows, paramsList); - for (PreparedStatement pst : insertPreparedStmts) { - pst.execute(); + String sql = String.format(COMPL_TXN_COMPONENTS_INSERT_QUERY, txnid, quoteChar(isUpdateDelete)); + try (PreparedStatement pstmt = dbConn.prepareStatement(sql)) { + int insertCounter = 0; + int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE); + for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) { + pstmt.setString(1, writeEventInfo.getDatabase()); + pstmt.setString(2, writeEventInfo.getTable()); + pstmt.setString(3, writeEventInfo.getPartition()); + pstmt.setObject(4, writeEventInfo.getWriteId()); + + pstmt.addBatch(); + insertCounter++; + if (insertCounter % batchSize == 0) { + LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + batchSize); + pstmt.executeBatch(); + } + } + if (insertCounter % batchSize != 0) { + LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + insertCounter % batchSize); + pstmt.executeBatch(); + } } } deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy()); } - - // cleanup all txn related metadata - s = "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - s = "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - s = "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - s = "DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL"); - - s = "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); + cleanUpTxnRelatedMetadata(txnid, stmt); // update the key/value associated with the transaction if it has been // set @@ -1380,11 +1368,6 @@ public void commitTxn(CommitTxnRequest rqst) throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { - if (insertPreparedStmts != null) { - for (PreparedStatement pst : insertPreparedStmts) { - closeStmt(pst); - } - } close(commitIdRs, stmt, dbConn); unlockInternal(); } @@ -1393,6 +1376,29 @@ public void commitTxn(CommitTxnRequest rqst) } } + private void cleanUpTxnRelatedMetadata(long txnid, Statement stmt) throws SQLException { + String s; + s = "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid; + LOG.debug("Adding query to batch: <" + s + ">"); + stmt.addBatch(s); + s = "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid; + LOG.debug("Adding query to batch: <" + s + ">"); + stmt.addBatch(s); + s = "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid; + LOG.debug("Adding query to batch: <" + s + ">"); + stmt.addBatch(s); + s = "DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid; + LOG.debug("Adding query to batch: <" + s + ">"); + stmt.addBatch(s); + s = "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid; + LOG.debug("Adding query to batch: <" + s + ">"); + stmt.addBatch(s); + + LOG.debug("Going to execute batch of queries."); + stmt.executeBatch(); + LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL"); + } + /** * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark. * @param rqst info on table/partitions and writeid snapshot to replicate. @@ -4242,10 +4248,7 @@ private int abortTxns(Connection dbConn, List txnids, boolean checkHeartbe TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", true, false); - for (String query : queries) { - LOG.debug("Going to execute update <" + query + ">"); - updateCnt += stmt.executeUpdate(query); - } + updateCnt = executeQueriesInBatch(stmt, queries); // As current txn is aborted, this won't read any data from other txns. So, it is safe to unregister // the min_open_txnid from MIN_HISTORY_LEVEL for the aborted txns. Even if the txns in the list are @@ -4259,11 +4262,8 @@ private int abortTxns(Connection dbConn, List txnids, boolean checkHeartbe TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"MHL_TXNID\"", false, false); - for (String query : queries) { - LOG.debug("Going to execute update <" + query + ">"); - int rc = stmt.executeUpdate(query); - LOG.debug("Deleted " + rc + " records from MIN_HISTORY_LEVEL"); - } + int totalDeletedRecords = executeQueriesInBatch(stmt, queries); + LOG.debug("Deleted a total of " + totalDeletedRecords + " records from MIN_HISTORY_LEVEL"); LOG.info("Removed aborted transactions: (" + txnids + ") from MIN_HISTORY_LEVEL"); if (updateCnt < txnids.size() && isStrict) { @@ -4284,17 +4284,36 @@ private int abortTxns(Connection dbConn, List txnids, boolean checkHeartbe TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false); - for (String query : queries) { - LOG.debug("Going to execute update <" + query + ">"); - int rc = stmt.executeUpdate(query); - LOG.debug("Removed " + rc + " records from HIVE_LOCKS"); - } + totalDeletedRecords = executeQueriesInBatch(stmt, queries); + LOG.debug("Deleted a total of " + totalDeletedRecords + " records from HIVE_LOCKS"); } finally { closeStmt(stmt); } return updateCnt; } + private int executeQueriesInBatch(Statement stmt, List queries) throws SQLException { + int totalRecordsAffected = 0; + int queryCounter = 0; + int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE); + for (String query : queries) { + LOG.debug("Adding query to batch: <" + query + ">"); + queryCounter++; + stmt.addBatch(query); + if (queryCounter % batchSize == 0) { + LOG.debug("Going to execute queries in batch. Batch size: " + batchSize); + int[] affectedRecordsByQuery = stmt.executeBatch(); + totalRecordsAffected += Arrays.stream(affectedRecordsByQuery).sum(); + } + } + if (queryCounter % batchSize != 0) { + LOG.debug("Going to execute queries in batch. Batch size: " + queryCounter % batchSize); + int[] affectedRecordsByQuery = stmt.executeBatch(); + totalRecordsAffected += Arrays.stream(affectedRecordsByQuery).sum(); + } + return totalRecordsAffected; + } + private static boolean isValidTxn(long txnId) { return txnId != 0; }