diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 3bfb0e69cb..f6097f7520 100644
--- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -644,6 +644,12 @@ public static ConfVars getMetaConf(String name) {
         "However, it doesn't work correctly with integral values that are not normalized (e.g. have\n" +
         "leading zeroes, like 0012). If metastore direct SQL is enabled and works, this optimization\n" +
         "is also irrelevant."),
+    // Once exceeded, the queries should be broken into separate batches.
+    // Note: This value is not passed into the JDBC driver, therefore this batch size limit is not automatically enforced.
+    // Batch construction/splits should be done manually in code using this config value.
+    JDBC_MAX_BATCH_SIZE("metastore.jdbc.max.batch.size", "hive.metastore.jdbc.max.batch.size",
+        1000, new RangeValidator(1, null),
+        "Maximum number of update/delete/insert queries in a single JDBC batch statement (including Statement/PreparedStatement)."),
     KERBEROS_KEYTAB_FILE("metastore.kerberos.keytab.file",
         "hive.metastore.kerberos.keytab.file", "",
         "The path to the Kerberos Keytab file containing the metastore Thrift server's service principal."),
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index a66e16973f..bb29410e7d 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -637,14 +637,13 @@ static String getEpochFn(DatabaseProduct dbProduct) throws MetaException {
   /**
    * @param stmt Statement which will be used for batching and execution.
    * @param queries List of sql queries to execute in a Statement batch.
-   * @param conf Configuration for retrieving max batch size param
+   * @param batchSize maximum number of queries in a single batch
    * @return A list with the number of rows affected by each query in queries.
    * @throws SQLException Thrown if an execution error occurs.
    */
-  static List<Integer> executeQueriesInBatch(Statement stmt, List<String> queries, Configuration conf) throws SQLException {
+  static List<Integer> executeQueriesInBatch(Statement stmt, List<String> queries, int batchSize) throws SQLException {
     List<Integer> affectedRowsByQuery = new ArrayList<>();
     int queryCounter = 0;
-    int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
     for (String query : queries) {
       LOG.debug("Adding query to batch: <" + query + ">");
       queryCounter++;
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 962a63d418..d080df417b 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -269,6 +269,7 @@ char getSqlConst() {
   // (End user) Transaction timeout, in milliseconds.
   private long timeout;
+  private int maxBatchSize;
 
   private String identifierQuoteString; // quotes to use for quoting tables, where necessary
   private long retryInterval;
   private int retryLimit;
@@ -346,6 +347,7 @@ public void setConf(Configuration conf) {
     retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS);
     deadlockRetryInterval = retryInterval / 10;
     maxOpenTxns = MetastoreConf.getIntVar(conf, ConfVars.MAX_OPEN_TXNS);
+    maxBatchSize = MetastoreConf.getIntVar(conf, ConfVars.JDBC_MAX_BATCH_SIZE);
 
     try {
       transactionalListeners = MetaStoreServerUtils.getMetaStoreListeners(
@@ -1221,7 +1223,6 @@ public void commitTxn(CommitTxnRequest rqst)
           String sql = String.format(COMPL_TXN_COMPONENTS_INSERT_QUERY, txnid, quoteChar(isUpdateDelete));
           try (PreparedStatement pstmt = dbConn.prepareStatement(sql)) {
             int insertCounter = 0;
-            int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
             for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
               pstmt.setString(1, writeEventInfo.getDatabase());
               pstmt.setString(2, writeEventInfo.getTable());
@@ -1230,13 +1231,13 @@ public void commitTxn(CommitTxnRequest rqst)
               pstmt.addBatch();
               insertCounter++;
-              if (insertCounter % batchSize == 0) {
-                LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + batchSize);
+              if (insertCounter % maxBatchSize == 0) {
+                LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + maxBatchSize);
                 pstmt.executeBatch();
               }
             }
 
-            if (insertCounter % batchSize != 0) {
-              LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + insertCounter % batchSize);
+            if (insertCounter % maxBatchSize != 0) {
+              LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + insertCounter % maxBatchSize);
               pstmt.executeBatch();
             }
           }
@@ -1344,7 +1345,7 @@ private void updateCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnTyp
     queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid);
 
     // execute all in one batch
-    executeQueriesInBatch(stmt, queryBatch, conf);
+    executeQueriesInBatch(stmt, queryBatch, maxBatchSize);
   }
 
   private void updateKeyValueAssociatedWithTxn(CommitTxnRequest rqst, Statement stmt) throws SQLException {
@@ -2460,7 +2461,6 @@ private void insertTxnComponents(long txnid, LockRequest rqst, Connection dbConn
     // For each component in this lock request,
     // add an entry to the txn_components table
     int insertCounter = 0;
-    int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
     for (LockComponent lc : rqst.getComponent()) {
       if (lc.isSetIsTransactional() && !lc.isIsTransactional()) {
         //we don't prevent using non-acid resources in a txn but we do lock them
@@ -2483,13 +2483,13 @@ private void insertTxnComponents(long txnid, LockRequest rqst, Connection dbConn
         pstmt.addBatch();
         insertCounter++;
-        if (insertCounter % batchSize == 0) {
-          LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + batchSize);
+        if (insertCounter % maxBatchSize == 0) {
+          LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + maxBatchSize);
           pstmt.executeBatch();
         }
       }
 
-      if (insertCounter % batchSize != 0) {
-        LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % batchSize);
+      if (insertCounter % maxBatchSize != 0) {
+        LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % maxBatchSize);
         pstmt.executeBatch();
       }
     }
@@ -2572,7 +2572,6 @@ private void insertHiveLocksWithTemporaryExtLockId(long txnid, Connection dbConn
     String lastHB = isValidTxn(txnid) ? "0" : TxnDbUtil.getEpochFn(dbProduct);
     String insertLocksQuery = String.format(HIVE_LOCKS_INSERT_QRY, lastHB);
-    int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
     long intLockId = 0;
 
     try (PreparedStatement pstmt = dbConn.prepareStatement(insertLocksQuery)) {
@@ -2604,13 +2603,13 @@ private void insertHiveLocksWithTemporaryExtLockId(long txnid, Connection dbConn
         pstmt.setString(11, rqst.getAgentInfo());
         pstmt.addBatch();
 
-        if (intLockId % batchSize == 0) {
-          LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + batchSize);
+        if (intLockId % maxBatchSize == 0) {
+          LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + maxBatchSize);
           pstmt.executeBatch();
         }
       }
-      if (intLockId % batchSize != 0) {
-        LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + intLockId % batchSize);
+      if (intLockId % maxBatchSize != 0) {
+        LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + intLockId % maxBatchSize);
         pstmt.executeBatch();
       }
     }
@@ -3299,7 +3298,6 @@ public void addDynamicPartitions(AddDynamicPartitions rqst)
         Long writeId = rqst.getWriteid();
         try (PreparedStatement pstmt = dbConn.prepareStatement(TXN_COMPONENTS_INSERT_QUERY)) {
           int insertCounter = 0;
-          int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
           for (String partName : rqst.getPartitionnames()) {
             pstmt.setLong(1, rqst.getTxnid());
             pstmt.setString(2, normalizeCase(rqst.getDbname()));
@@ -3310,13 +3308,13 @@ public void addDynamicPartitions(AddDynamicPartitions rqst)
             pstmt.addBatch();
             insertCounter++;
-            if (insertCounter % batchSize == 0) {
-              LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + batchSize);
+            if (insertCounter % maxBatchSize == 0) {
+              LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + maxBatchSize);
               pstmt.executeBatch();
             }
           }
 
-          if (insertCounter % batchSize != 0) {
-            LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % batchSize);
+          if (insertCounter % maxBatchSize != 0) {
+            LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % maxBatchSize);
             pstmt.executeBatch();
           }
         }
@@ -4246,7 +4244,7 @@ private int abortTxns(Connection dbConn, List<Long> txnids, boolean checkHeartbe
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false);
 
       // execute all queries in the list in one batch
-      List<Integer> affectedRowsByQuery = executeQueriesInBatch(stmt, queries, conf);
+      List<Integer> affectedRowsByQuery = executeQueriesInBatch(stmt, queries, maxBatchSize);
       return getUpdateCount(numUpdateQueries, affectedRowsByQuery);
     } finally {
       closeStmt(stmt);